# Creating a Classification Model

In this exercise, you will implement a classification model that uses features of a flight to predict whether or not it will be late.

## Import Spark SQL and Spark MLlib Libraries

First, import the libraries you will need to train the model:

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler

filestore = "abfss://files@datalakeXXXXXXX.dfs.core.windows.net"

print("Libraries imported!")

### Load Source Data
The data for this exercise is provided as a CSV file of flight details that has already been cleaned for modeling. The data includes specific characteristics (or *features*) of each flight, as well as a *label* column (**Late**) indicating whether or not the flight was late (a flight with an arrival delay of more than 25 minutes is considered *late*).
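The **Late** label is already present in the CSV file, so you don't need to derive it. Purely to make the 25-minute rule concrete, here is a minimal plain-Python sketch. The `late_label` function and the sample delays are hypothetical, and the sketch assumes delays are recorded as whole minutes.

```python
# Hypothetical helper that applies the 25-minute "late" rule described above.
LATE_THRESHOLD_MINUTES = 25


def late_label(arr_delay: int) -> int:
    """Return 1 if the arrival delay is more than the threshold, else 0."""
    return 1 if arr_delay > LATE_THRESHOLD_MINUTES else 0


# Made-up arrival delays (negative values mean the flight arrived early).
for delay in (-5, 25, 26, 90):
    print(delay, late_label(delay))
```

In PySpark, the same rule could be written as a column expression such as `when(col("ArrDelay") > 25, 1).otherwise(0)`, using `when` and `col` from `pyspark.sql.functions`.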

You will load this data into a dataframe and display it.

In [None]:
flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
  StructField("Late", IntegerType(), False),
])

data = spark.read.csv(filestore + '/data/flights.csv', schema=flightSchema, header=True)
display(data.limit(20))

### Split the Data
It is common practice when building supervised machine learning models to split the source data, using some of it to train the model and reserving some to test the trained model. In this exercise, you will use 70% of the data for training, and reserve 30% for testing.
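**randomSplit** does not cut the data at an exact row count. Conceptually, it assigns each row to one of the splits at random, with probability proportional to the weights, so the resulting sizes are only approximately 70% and 30%. The plain-Python sketch below illustrates that idea under simplifying assumptions; `random_split` is a hypothetical helper, not Spark's actual implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability
    proportional to its weight (a simplified model of randomSplit)."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()  # uniform value in [0, 1)
        for i, upper in enumerate(bounds):
            if r < upper:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding in the last bound.
            splits[-1].append(row)
    return splits


train, test = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 7000 and 3000
```

If you need the same split each time the notebook runs, **randomSplit** also accepts a `seed` argument (for example, `data.randomSplit([0.7, 0.3], seed=42)`), which makes the split repeatable for the same data and partitioning.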

In [None]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print ("Training Rows:", train_rows, " Testing Rows:", test_rows)

### Prepare the Training Data

A predictive model often requires multiple stages of feature preparation. For example, some algorithms need you to distinguish between continuous features (which have a calculable numeric value) and categorical features (which are numeric representations of discrete categories). It is also common to *normalize* continuous numeric features to a common scale, for example by scaling all values proportionally to a decimal between 0 and 1. Strictly speaking, normalization only really makes sense when you have multiple numeric columns: scaling them all to similar ranges prevents a feature with particularly large values from dominating the training of the model. In this case, **DepDelay** is the only continuous numeric feature, but the scaling step is included so you can see how it's done.
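**MinMaxScaler** rescales each feature with the formula (x - min) / (max - min), mapped onto a target range that defaults to 0 to 1, where min and max are learned from the training data when the pipeline is fitted. The plain-Python sketch below applies that formula to a single feature; `min_max_scale` is a hypothetical helper and the **DepDelay** values are invented.

```python
def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Linearly rescale values into [new_min, new_max]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant feature: map every value to the midpoint of the target
        # range (Spark's MinMaxScaler documents the same behavior).
        mid = 0.5 * (new_max + new_min)
        return [mid for _ in values]
    span = hi - lo
    return [(v - lo) / span * (new_max - new_min) + new_min for v in values]


dep_delays = [-10, 0, 15, 90]  # hypothetical departure delays in minutes
print(min_max_scale(dep_delays))
```

Because the minimum and maximum come from the training data, values in the test data that fall outside that range are scaled to numbers below 0 or above 1.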

A pipeline consists of a series of *transformer* and *estimator* components that typically prepare a dataframe for modeling and then train a predictive model. In this case, you will create a pipeline with the following components:
- A **StringIndexer** estimator for each categorical variable, to generate numeric indexes for the categorical features
- A **VectorAssembler** that creates a vector of continuous numeric features
- A **MinMaxScaler** that normalizes the vector of numeric features
- A **VectorAssembler** that creates a single vector of the categorical and continuous features
- A **LogisticRegression** algorithm that trains a classification model
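By default, **StringIndexer** orders labels by descending frequency, so the most common value gets index 0.0. Numeric input columns are cast to strings first, and (in recent Spark versions) ties are broken alphabetically. The sketch below mimics that default ordering in plain Python; `fit_string_indexer` is a hypothetical helper and the carrier codes are made-up sample data.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct value to an index, most frequent first, with ties
    broken alphabetically (modeled on StringIndexer's default ordering)."""
    # Numeric values are indexed by their string form.
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


carriers = ["DL", "AA", "DL", "UA", "AA", "DL", "WN"]
print(fit_string_indexer(carriers))
```

One consequence is that indexes for numeric columns such as **DayofMonth** follow how often each value occurs rather than numeric order, and those indexes are what the final **VectorAssembler** passes to the logistic regression.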


In [None]:
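# StringIndexer accepts numeric input columns as well as string ones (numeric values
# are cast to strings before indexing), so the column types do not need to match.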

monthdayIndexer = StringIndexer(inputCol="DayofMonth", outputCol="DayofMonthIdx")
weekdayIndexer = StringIndexer(inputCol="DayOfWeek", outputCol="DayOfWeekIdx")
carrierIndexer = StringIndexer(inputCol="Carrier", outputCol="CarrierIdx")
originIndexer = StringIndexer(inputCol="OriginAirportID", outputCol="OriginAirportIdx")
destIndexer = StringIndexer(inputCol="DestAirportID", outputCol="DestAirportIdx")
numVect = VectorAssembler(inputCols = ["DepDelay"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normNums")
featVect = VectorAssembler(inputCols=["DayofMonthIdx", "DayOfWeekIdx", "CarrierIdx", "OriginAirportIdx", "DestAirportIdx", "normNums"], outputCol="features")
lr = LogisticRegression(labelCol="Late", featuresCol="features")
pipeline = Pipeline(stages=[monthdayIndexer, weekdayIndexer, carrierIndexer, originIndexer, destIndexer, numVect, minMax, featVect, lr])
print ("Pipeline defined!")

### Train a Classification Model

The pipeline itself is an estimator, and so it has a **fit** method that you can call to run the pipeline on a specified dataframe. In this case, you will run the pipeline on the training data to train a model.
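When you call **fit** on a pipeline, each stage runs in order: an estimator stage is first fitted on the data it receives, producing a model (a transformer), and the output of each stage becomes the input to the next. The toy sketch below imitates that flow on plain Python lists. All of the class names are hypothetical stand-ins, not Spark classes, and the sketch simplifies by also transforming the data after the final stage.

```python
class Transformer:
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    def fit(self, rows):
        raise NotImplementedError


class AddOne(Transformer):
    """Toy transformer: adds 1 to every value."""
    def transform(self, rows):
        return [r + 1 for r in rows]


class MeanCenterer(Estimator):
    """Toy estimator: learns the mean in fit, subtracts it in transform."""
    def fit(self, rows):
        mean = sum(rows) / len(rows)

        class MeanCentererModel(Transformer):
            def transform(self, rows):
                return [r - mean for r in rows]

        return MeanCentererModel()


class ToyPipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)   # estimator -> fitted model
            rows = stage.transform(rows)  # output feeds the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


class ToyPipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


toy_model = ToyPipeline([AddOne(), MeanCenterer()]).fit([1, 2, 3])
print(toy_model.transform([10]))
```

The fitted stages are kept in order, which is why the notebook can reach the trained logistic regression model later through `model.stages[-1]`.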

In [None]:
model = pipeline.fit(train)
print ("Model trained!")

### Test the Model
Now you're ready to use the **transform** method of the model to generate some predictions. You can use this approach to predict delay status for flights where the label is unknown; but in this case you are using the test data, which includes a known true label value, so you can compare the predicted status to the actual status.

In [None]:
prediction = model.transform(test)
predicted = prediction.select("features", "probability", col("prediction").cast("Int"), col("Late").alias("trueLabel"))
display(predicted.limit(100))

Looking at the result, the **prediction** column contains the predicted value for the label, and the **trueLabel** column contains the actual known value from the testing data. The **probability** column shows the probability score for each class (0 or 1). It looks like there is a mix of correct and incorrect predictions, and the incorrect ones tend to have fairly close probabilities for each class. The following sections show how to measure the model's performance.

## Compute Confusion Matrix Metrics
Classifiers are typically evaluated by creating a *confusion matrix*, which indicates the number of:
- True Positives
- True Negatives
- False Positives
- False Negatives

From these core measures, other evaluation metrics such as *precision* and *recall* can be calculated.
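Precision is TP / (TP + FP), the proportion of predicted late flights that really were late, and recall is TP / (TP + FN), the proportion of actually late flights that the model caught. The plain-Python sketch below computes these from hypothetical prediction and label lists, and also reports accuracy, (TP + TN) / total, for comparison. `confusion_metrics` is an illustrative helper, not part of Spark MLlib.

```python
def confusion_metrics(predictions, labels):
    """Count TP/FP/TN/FN for binary 0/1 values and derive precision, recall
    and accuracy (None where a ratio is undefined)."""
    pairs = list(zip(predictions, labels))
    tp = sum(1 for p, y in pairs if p == 1 and y == 1)
    fp = sum(1 for p, y in pairs if p == 1 and y == 0)
    tn = sum(1 for p, y in pairs if p == 0 and y == 0)
    fn = sum(1 for p, y in pairs if p == 0 and y == 1)
    precision = tp / (tp + fp) if tp + fp else None
    recall = tp / (tp + fn) if tp + fn else None
    accuracy = (tp + tn) / len(pairs) if pairs else None
    return {"TP": tp, "FP": fp, "TN": tn, "FN": fn,
            "Precision": precision, "Recall": recall, "Accuracy": accuracy}


# Made-up predictions and true labels for six flights.
m = confusion_metrics([1, 0, 1, 1, 0, 0], [1, 0, 0, 1, 1, 0])
print(m)
```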

In [None]:
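# Count each cell of the confusion matrix (1 = late, 0 = not late)
tp = float(predicted.filter("prediction == 1 AND trueLabel == 1").count())
fp = float(predicted.filter("prediction == 1 AND trueLabel == 0").count())
tn = float(predicted.filter("prediction == 0 AND trueLabel == 0").count())
fn = float(predicted.filter("prediction == 0 AND trueLabel == 1").count())

# Avoid dividing by zero if there are no predicted (or no actual) positives;
# reporting 0.0 in that case is a convention chosen here
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", precision),
 ("Recall", recall)], ["metric", "value"])
display(metrics)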

## View the Raw Prediction and Probability
For logistic regression, the prediction starts from a raw prediction score (the **rawPrediction** column), which is derived from the model's linear combination of the feature values (its *margin*). The logistic function converts this raw score into a probability vector that indicates the confidence for each possible label value (in this case, 0 and 1), and the probability is then converted to a predicted label of 0 or 1. With the default settings, the label with the highest probability is selected as the prediction.
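As a sketch of how these columns relate, assume the binary case where Spark's **rawPrediction** vector is [-m, m] for a margin m = w·x + b (our understanding of Spark's binary logistic regression). The probability of label 1 is then the logistic function 1 / (1 + e^(-m)), and label 1 is predicted when that probability is above the threshold. `binary_lr_outputs` is a hypothetical helper, and the margins are arbitrary.

```python
import math


def binary_lr_outputs(margin, threshold=0.5):
    """Turn a binary logistic-regression margin into (rawPrediction,
    probability, prediction) values."""
    raw = [-margin, margin]
    # Numerically stable logistic (sigmoid) function.
    if margin >= 0:
        p1 = 1.0 / (1.0 + math.exp(-margin))
    else:
        e = math.exp(margin)
        p1 = e / (1.0 + e)
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw, probability, prediction


for m in (-2.0, 0.0, 0.3):
    print(binary_lr_outputs(m))
```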

In [None]:
prediction.select("rawPrediction", "probability", col("prediction").cast("int"), col("Late").alias("trueLabel")).show(100, truncate=False)

Note that the results include rows where the probability for 0 (the first value in the **probability** vector) is only slightly higher than the probability for 1 (the second value in the **probability** vector). The default *discrimination threshold* (the boundary that decides whether a probability is predicted as a 1 or a 0) is 0.5, so the label with the higher probability is always predicted, no matter how close the two probabilities are.
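To see how the threshold changes the outcome, the plain-Python sketch below applies two different thresholds to the same made-up probability vectors. `predict_with_threshold` is a hypothetical helper that predicts 1 when the second value (the probability of label 1) is above the threshold.

```python
def predict_with_threshold(probabilities, threshold):
    """Predict 1 when P(label = 1) is above the threshold, else 0."""
    return [1 if p1 > threshold else 0 for _, p1 in probabilities]


# Hypothetical [P(0), P(1)] vectors, like those in the probability column.
probs = [[0.52, 0.48], [0.70, 0.30], [0.45, 0.55], [0.10, 0.90]]
print(predict_with_threshold(probs, 0.5))
print(predict_with_threshold(probs, 0.4))
```

Spark's **LogisticRegression** exposes the same idea through its `threshold` parameter (for example, `LogisticRegression(labelCol="Late", featuresCol="features", threshold=0.4)`). Lowering it predicts more flights as late, which typically raises recall at the cost of precision.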

### Review the Area Under ROC
Another way to assess the performance of a classification model is to measure the area under a *receiver operating characteristic (ROC) curve* for the model. Spark MLlib includes a **BinaryClassificationEvaluator** class that you can use to compute this. An ROC curve plots the true positive rate against the false positive rate for varying threshold values (the probability value above which a positive label is predicted). The area under this curve (AUC) gives an overall indication of the model's predictive performance as a value between 0 and 1. An AUC of 0.5 means that a binary classification model (which predicts one of two possible labels) is no better at predicting the right class than a random 50/50 guess; the closer the value is to 1, the better the model.
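To make the ROC idea concrete, the plain-Python sketch below sweeps the threshold down through every distinct score, records the (false positive rate, true positive rate) point at each step, and sums the area with the trapezoidal rule. `roc_auc` is a hypothetical helper and the scores are invented. It illustrates the metric rather than reproducing **BinaryClassificationEvaluator**, which may down-sample the curve on large datasets.

```python
def roc_auc(scores, labels):
    """Area under the ROC curve for binary 0/1 labels, computed by sweeping
    the threshold over every distinct score (trapezoidal rule)."""
    pos = sum(labels)
    neg = len(labels) - pos
    if pos == 0 or neg == 0:
        raise ValueError("need at least one positive and one negative label")
    # Highest scores first, so lowering the threshold admits rows in order.
    pairs = sorted(zip(scores, labels), key=lambda t: t[0], reverse=True)
    tp = fp = 0
    points = [(0.0, 0.0)]  # (FPR, TPR) for a threshold above every score
    i = 0
    while i < len(pairs):
        score = pairs[i][0]
        # Rows with tied scores cross the threshold together.
        while i < len(pairs) and pairs[i][0] == score:
            if pairs[i][1] == 1:
                tp += 1
            else:
                fp += 1
            i += 1
        points.append((fp / neg, tp / pos))
    # Trapezoidal rule over consecutive ROC points.
    return sum((x2 - x1) * (y1 + y2) / 2
               for (x1, y1), (x2, y2) in zip(points, points[1:]))


print(roc_auc([0.9, 0.7, 0.6, 0.2], [1, 0, 1, 0]))
```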

In [None]:
import matplotlib.pyplot as plt
from pyspark.ml.evaluation import BinaryClassificationEvaluator

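# Plot the ROC curve. model.stages[-1].summary is the training summary of the
# LogisticRegression stage, so this curve reflects the *training* data.
modelSummary = model.stages[-1].summary
roc = modelSummary.roc.toPandas()  # columns: FPR, TPR
plt.plot([0, 1], [0, 1], 'r--')  # diagonal = random guessing
plt.plot(roc['FPR'], roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve (training data)')
plt.show()

# Compute the AUC on the test predictions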
evaluator = BinaryClassificationEvaluator(labelCol="Late", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(prediction)
print ("AUC = ", auc)

In [None]:
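from onnxmltools import convert_sparkml
from onnxconverter_common.data_types import StringTensorType, FloatTensorType

# Convert the fitted Spark pipeline to ONNX. Each entry in initial_types names one of
# the pipeline's input columns and its tensor type: the columns indexed by the
# StringIndexer stages are declared as string tensors, and DepDelay as a float tensor.
initial_types = [('DayofMonth', StringTensorType(shape=[1, 1])),
                 ('DayOfWeek', StringTensorType(shape=[1, 1])),
                 ('Carrier', StringTensorType(shape=[1, 1])),
                 ('OriginAirportID', StringTensorType(shape=[1, 1])),
                 ('DestAirportID', StringTensorType(shape=[1, 1])),
                 ('DepDelay', FloatTensorType(shape=[1, 1]))]
onnx_model = convert_sparkml(model, 'Pyspark model', initial_types, target_opset = 7)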

In [None]:
print(onnx_model)

In [None]:
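import mlflow
import mlflow.onnx  # make sure the ONNX flavor module is loaded

# Save the ONNX model in MLflow format so that it can be bound for PREDICT later
modelpath = filestore + "/flight_model4/"
mlflow.onnx.save_model(onnx_model, modelpath)
print("Model saved to ", modelpath)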

In [None]:
from pyspark.sql.functions import col, pandas_udf, udf, lit
import azure.synapse.ml.predict as pcontext
from mlflow.utils import model_utils
import azure.synapse.ml.predict.utils._logger as synapse_predict_logger

# Enable SynapseML predict
spark.conf.set("spark.synapse.ml.predict.enabled","true")

In [None]:
# Set input data path
DATA_FILE = "/data/new-flights.csv"

# Set the ADLS URI of the trained model (if the model was uploaded to ADLS)
MODEL_URI = modelpath

# Define model return type
RETURN_TYPES = "int" # for ex: int, float etc. PySpark data types are supported

# Define model runtime. Only mlflow is supported
RUNTIME = "mlflow"

# Bind model within Spark session
mlflow_model = pcontext.bind_model(
    return_types=RETURN_TYPES,
    runtime=RUNTIME,
    model_alias="flight_model", # This alias is used in PREDICT calls to refer to this model
    model_uri=MODEL_URI # In case of AML, it will be AML_MODEL_URI
    ).register()

In [None]:
# Read the new flight data from ADLS and register it as a temporary view
df = spark.read.csv(DATA_FILE, header=True, inferSchema=True)
df.createOrReplaceTempView('new_flights')

In [None]:
# Call PREDICT using the Spark SQL API; the first argument is the model alias registered above

predictions = spark.sql(
                """
                    SELECT PREDICT("flight_model", DayofMonth, DayOfWeek, Carrier, OriginAirportID, DestAirportID, DepDelay) AS predict
                    FROM new_flights
                """
            )
predictions.show()

In [None]:
# Call PREDICT using a user-defined function (UDF); the first argument is the model alias

df = df.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay")

df.withColumn("PREDICT", mlflow_model.udf(lit("flight_model"), *df.columns)).show()

In [None]:
# Call PREDICT using the Transformer API

columns = ["DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay"]

transformer = mlflow_model.create_transformer().setInputCols(columns).setOutputCol("PREDICT")

transformer.transform(df).show()

In [None]:
type(mlflow_model)