# Model Prototype

### This notebook shows how to create a baseline model pipeline and save it

##### We save the Spark DataFrame as an Iceberg table. Iceberg is a new open table format backed by Apple, Netflix and Cloudera.
##### For MLOps, the most anticipated feature is Time Travel: the ability to reproduce both the data and the schema as they existed at earlier points in time.
##### Finally, we create a simple PySpark pipeline and train a classifier with Keras/TensorFlow.

* For a more comprehensive demo of Iceberg in CML, please visit the [Spark3 Iceberg CML GitHub Repository](https://github.com/pdefusco/Spark3_Iceberg_CML)
* For a more detailed introduction to CML Sessions, Notebooks, and Spark tips and tricks, please visit the [CML Total Beginner GitHub Repository](https://github.com/pdefusco/CML-Total-Beginner)
* For a more comprehensive example of the Atlas Python client mentioned below, please visit the [Atlas Client Example Notebook in the Data Integration with ML GitHub Repository](https://github.com/pdefusco/Data_Integration_wMachineLearning/blob/main/2_A_Atlas_Client_Example.ipynb)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from sklearn.datasets import make_circles
import tensorflow as tf
import pandas as pd
from helpers.plot_decision_boundary import *

#### The Spark Session is created with the following configurations. If you get an error, make sure your CML Session uses Runtimes with Spark 3.1.

In [2]:
spark = SparkSession.builder.master('local[*]')\
  .config("spark.jars.packages","org.apache.iceberg:iceberg-spark3-runtime:0.12.1")\
  .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")\
  .config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog")\
  .config("spark.sql.catalog.spark_catalog.type","hive")\
  .config("spark.hadoop.fs.s3a.s3guard.ddb.region","us-east-2")\
  .config("spark.yarn.access.hadoopFileSystems","s3a://gd01-uat2/")\
  .getOrCreate()

#### Just some fake data...

In [4]:
# Make 1000 examples
n_samples = 1000

# Create circles
X, y = make_circles(n_samples, 
                    noise=0.03, 
                    random_state=42)

circles = pd.DataFrame({"var1":X[:, 0], "var2":X[:, 1], "label":y})
circles.head()

|   | var1 | var2 | label |
|---|------|------|-------|
| 0 | 0.754246 | 0.231481 | 1 |
| 1 | -0.756159 | 0.153259 | 1 |
| 2 | -0.815392 | 0.173282 | 1 |
| 3 | -0.393731 | 0.692883 | 1 |
| 4 | 0.442208 | -0.896723 | 0 |


#### We can save the DataFrame as an Iceberg Table using Spark

In [5]:
# Creating a Spark Dataframe from the Pandas Dataframe
sparkDF=spark.createDataFrame(circles) 
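
Before creating the Iceberg table, it helps to check which column types Spark inferred, since the table definition should match them. The sketch below builds a `CREATE TABLE` statement from a list of `(name, type)` pairs, which is the shape of the `dtypes` attribute of a Spark DataFrame. The helper `build_create_table_ddl` and the example type list are illustrative assumptions, not part of the original notebook; in a live session you would pass `sparkDF.dtypes` instead.

```python
def build_create_table_ddl(table_name, dtypes):
    """Build a CREATE TABLE statement for an Iceberg table.

    `dtypes` is a list of (column_name, spark_sql_type) pairs, the same
    shape as the `dtypes` attribute of a Spark DataFrame.
    """
    columns = ", ".join(f"{name} {sql_type}" for name, sql_type in dtypes)
    return f"CREATE TABLE IF NOT EXISTS {table_name} ({columns}) USING iceberg"


# Assumed types for the circles data: pandas float64 usually becomes Spark
# double and pandas int64 usually becomes Spark bigint. Verify with
# sparkDF.dtypes in your own session.
example_dtypes = [("var1", "double"), ("var2", "double"), ("label", "bigint")]

ddl = build_create_table_ddl("ice_cml", example_dtypes)
print(ddl)
```

The resulting string can be passed to `spark.sql(...)`.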

In [12]:
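# Saving the Spark DataFrame as an Iceberg table
# var1 and var2 hold floats and label holds 64-bit integers in the source data,
# so the columns are declared as double/bigint to match the DataFrame schema
spark.sql("CREATE TABLE IF NOT EXISTS ice_cml (var1 double, var2 double, label bigint) USING iceberg")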

sparkDF.write.format("iceberg").mode("overwrite").save("default.ice_cml")
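
Once the table has been written at least once, the Time Travel feature mentioned in the introduction can be tried out. The following is a minimal sketch, assuming the Iceberg `snapshot-id` and `as-of-timestamp` read options and the `snapshots` metadata table are available in the Iceberg version configured for this session. The helper names `time_travel_options` and `read_table_version` are hypothetical, and the snapshot id `123` is a placeholder rather than a real value.

```python
def time_travel_options(snapshot_id=None, as_of_timestamp_ms=None):
    """Return the Iceberg reader option that selects a past table state.

    Exactly one selector must be given: a snapshot id, or a timestamp in
    milliseconds since the Unix epoch.
    """
    if (snapshot_id is None) == (as_of_timestamp_ms is None):
        raise ValueError("Pass exactly one of snapshot_id or as_of_timestamp_ms")
    if snapshot_id is not None:
        return {"snapshot-id": str(snapshot_id)}
    return {"as-of-timestamp": str(as_of_timestamp_ms)}


def read_table_version(spark, table, **selector):
    """Read `table` as it was at the snapshot or timestamp in `selector`."""
    reader = spark.read.format("iceberg")
    for key, value in time_travel_options(**selector).items():
        reader = reader.option(key, value)
    return reader.load(table)


# Usage inside the notebook (needs the Spark session created earlier):
#   spark.read.format("iceberg").load("default.ice_cml.snapshots").show()
#   old_df = read_table_version(spark, "default.ice_cml", snapshot_id=123)
print(time_travel_options(snapshot_id=123))
```

Listing the `snapshots` metadata table first gives the snapshot ids and commit times that can be passed to the reader.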

#### The table is automatically tracked by the Data Lake associated with the CML Workspace

#### To check that a new entry for the table has been added to Atlas in the Data Lake, go back to the CDP Homepage and open Data Catalog. 

#### Select the Data Lake (i.e. Cloud Environment) that your workspace was built in.

#### Use the Atlas Search bar at the top to browse for the table and click on it

#### Notice that Atlas tracks a lot of useful metadata for the table, including Table Attributes, Lineage, and more.

#### The Metadata can even be customized. [This notebook](https://github.com/pdefusco/Data_Integration_wMachineLearning/blob/main/2_A_Atlas_Client_Example.ipynb) shows how you can use the Atlas Python Client to build custom lineage flows.

In [None]:


# Visualize with a plot
import matplotlib.pyplot as plt
plt.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.RdYlBu);


# Split data into train and test sets
X_train, y_train = X[:800], y[:800] # 80% of the data for the training set
X_test, y_test = X[800:], y[800:] # 20% of the data for the test set

# Check the shapes of the data
X_train.shape, X_test.shape # 800 examples in the training set, 200 examples in the test set

# Set random seed
tf.random.set_seed(42)

# Create the model: two hidden layers of 4 units each and a single sigmoid output
model = tf.keras.Sequential([
  tf.keras.layers.Dense(4, activation="relu"), # hidden layer 1, using "relu" for activation (same as tf.keras.activations.relu)
  tf.keras.layers.Dense(4, activation="relu"),
  tf.keras.layers.Dense(1, activation="sigmoid") # output layer, using 'sigmoid' for the output
])

# Compile the model
model.compile(loss=tf.keras.losses.binary_crossentropy,
                optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), # increase learning rate from the default 0.001 to 0.01 for faster learning
                metrics=['accuracy'])

# Fit the model
history = model.fit(X_train, y_train, epochs=25)

# Evaluate our model on the test set
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Model loss on the test set: {loss}")
print(f"Model accuracy on the test set: {100*accuracy:.2f}%")
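
To make the two printed numbers easier to interpret, here is a small pure-Python sketch of what binary cross-entropy and accuracy measure for a sigmoid classifier. It is an illustration only: the clipping constant `eps` and the 0.5 threshold are assumptions chosen to resemble common Keras defaults, and the function names are hypothetical.

```python
import math


def binary_crossentropy(y_true, y_prob, eps=1e-7):
    """Mean negative log-likelihood of the true labels under y_prob."""
    total = 0.0
    for y, p in zip(y_true, y_prob):
        # Clip probabilities away from 0 and 1 so log() stays finite
        p = min(max(p, eps), 1 - eps)
        total += -(y * math.log(p) + (1 - y) * math.log(1 - p))
    return total / len(y_true)


def accuracy(y_true, y_prob, threshold=0.5):
    """Fraction of examples whose thresholded prediction matches the label."""
    hits = sum(int(p > threshold) == y for y, p in zip(y_true, y_prob))
    return hits / len(y_true)


labels = [1, 0, 1]
probs = [0.9, 0.2, 0.4]  # made-up sigmoid outputs for illustration
print(binary_crossentropy(labels, probs))
print(accuracy(labels, probs))
```

A model that outputs 0.5 for every example scores a loss of about 0.693 (the natural log of 2), so a test loss well below that indicates the classifier has learned something beyond guessing.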

# Plot the decision boundaries for the training and test sets
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.title("Train")
plot_decision_boundary(model, X=X_train, y=y_train)
plt.subplot(1, 2, 2)
plt.title("Test")
plot_decision_boundary(model, X=X_test, y=y_test)
plt.show()

# You can access the information in the history variable using the .history attribute
pd.DataFrame(history.history)

# Plot the loss curves
pd.DataFrame(history.history).plot()
plt.title("Model training curves")


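# Save the trained model in HDF5 format, creating the target directory if needed
import os
os.makedirs('models', exist_ok=True)
model.save('models/my_model.h5')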
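
A saved model is only useful if it can be loaded again for scoring. The sketch below shows one way to reload the HDF5 file and turn the sigmoid outputs into class labels. It assumes the path used in the save call above; `load_and_classify` and `probabilities_to_labels` are hypothetical helper names, and the 0.5 threshold is an assumption rather than something the notebook prescribes.

```python
def probabilities_to_labels(probabilities, threshold=0.5):
    """Map sigmoid outputs to 0/1 class labels."""
    return [1 if p > threshold else 0 for p in probabilities]


def load_and_classify(model_path, points):
    """Load a saved Keras model and classify a list of [var1, var2] points."""
    # Imported lazily so the threshold helper can be used without TensorFlow
    import numpy as np
    import tensorflow as tf

    model = tf.keras.models.load_model(model_path)
    features = np.asarray(points, dtype="float32")
    probabilities = model.predict(features).ravel().tolist()
    return probabilities_to_labels(probabilities)


# Usage inside the notebook, after the model has been saved:
#   load_and_classify('models/my_model.h5', [[0.75, 0.23], [0.44, -0.90]])
print(probabilities_to_labels([0.1, 0.9, 0.5]))
```

Keeping the thresholding step separate from the model call makes it easy to adjust the cut-off later without retraining.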