# Spark Machine Learning Pipeline (MMLSpark replaced with SynapseML)

In this exercise, we walk through a more practical machine learning example using a **Spark ML pipeline**.

### Here we create a model to predict whether a flight arrives more than 15 minutes late (ARR_DEL15) using other attributes (airport code, carrier, weather conditions, etc.).

Before starting,

- You must upload [flight_weather.csv](https://1drv.ms/u/s!AuopXnMb-AqcgbZD7jEX6OTb4j8CTQ?e=KkeDdT) to /FileStore/tables/ in DBFS.

In [0]:
# Databricks (MMLSpark has been replaced with SynapseML; follow the steps below)
To install SynapseML on Databricks, create a new library from Maven coordinates in your workspace. See the installation guide:
https://microsoft.github.io/SynapseML/docs/getting_started/installation/

Note: I used Databricks Runtime 12 Beta with the coordinates for Spark 3.2, and that worked.
For the coordinates, use com.microsoft.azure:synapseml_2.12:0.10.2 for a Spark 3.2 cluster, or com.microsoft.azure:synapseml_2.12:0.9.5-13-d1b51517-SNAPSHOT for a Spark 3.1 cluster. Add the resolver https://mmlspark.azureedge.net/maven, and ensure the library is attached to your target cluster(s).

Finally, ensure that your Spark cluster has at least Spark 3.2 and Scala 2.12.

You can use SynapseML in both your Scala and PySpark notebooks.

In [0]:
# Import the classifier (this also checks that SynapseML is installed)
# Legacy MMLSpark setup, kept for reference only:
#   coordinates com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc3, resolver https://mmlspark.azureedge.net/maven,
#   with at least Spark 2.4 and Scala 2.11
#from mmlspark.lightgbm import LightGBMClassifier
from synapse.ml.lightgbm import LightGBMClassifier


In [0]:
# Read dataset from /FileStore/tables/flight_weather.csv
# "NA" strings are read as nulls, and column types are inferred from the data
df = (spark.read.format("csv")
  .option("header", "true")
  .option("nullValue", "NA")
  .option("inferSchema", True)
  .load("/FileStore/tables/flight_weather.csv"))

See original data

In [0]:
# See data
display(df)

X.1,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,UNIQUE_CARRIER,TAIL_NUM,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,ORIGIN_STATE_ABR,DEST_AIRPORT_ID,DEST,DEST_STATE_ABR,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,X,VisibilityOrigin,DryBulbCelsiusOrigin,DewPointCelsiusOrigin,RelativeHumidityOrigin,WindSpeedOrigin,AltimeterOrigin,VisibilityDest,DryBulbCelsiusDest,DewPointCelsiusDest,RelativeHumidityDest,WindSpeedDest,AltimeterDest
1,2012,1,4,3,2012-01-04,AA,N320AA,1,12478,JFK,NY,12892,LAX,CA,9,904.0,4.0,4.0,0.0,0.0,18.0,922.0,1131.0,20.0,12,1151.0,-34.0,0.0,0.0,-2.0,0,,0,385,347.0,309.0,1,2475,10,,,,,,,10.0,-3.9,-19.4,29.0,7.0,30.1,10.0,17.2,10.6,65.0,3.0,30.18
2,2012,1,20,5,2012-01-20,AA,N327AA,1,12478,JFK,NY,12892,LAX,CA,9,858.0,-2.0,0.0,0.0,-1.0,21.0,919.0,1212.0,15.0,12,1227.0,2.0,2.0,0.0,0.0,0,,0,385,389.0,353.0,1,2475,10,,,,,,,10.0,0.0,-16.7,28.0,15.0,30.21,10.0,13.9,10.0,78.0,6.0,30.09
3,2012,1,5,4,2012-01-05,AA,N329AA,2,12892,LAX,CA,12478,JFK,NY,9,925.0,-5.0,0.0,0.0,-1.0,18.0,943.0,1741.0,13.0,18,1754.0,-6.0,0.0,0.0,-1.0,0,,0,330,329.0,298.0,1,2475,10,,,,,,,10.0,18.3,7.2,49.0,8.0,30.02,10.0,-0.6,-5.0,72.0,9.0,29.99
4,2012,1,21,6,2012-01-21,AA,N327AA,2,12892,LAX,CA,12478,JFK,NY,9,,,,,,,,,,18,,,,,,1,B,0,330,,,1,2475,10,,,,,,,10.0,15.6,6.7,56.0,18.0,29.99,10.0,-3.3,-8.3,69.0,13.0,30.28
5,2012,1,6,5,2012-01-06,AA,N336AA,3,12478,JFK,NY,12892,LAX,CA,12,1151.0,-9.0,0.0,0.0,-1.0,14.0,1205.0,1436.0,7.0,15,1443.0,-32.0,0.0,0.0,-2.0,0,,0,375,352.0,331.0,1,2475,10,,,,,,,10.0,8.9,0.0,54.0,13.0,29.78,6.0,13.3,11.7,90.0,0.0,29.99
6,2012,1,22,7,2012-01-22,AA,N323AA,3,12478,JFK,NY,12892,LAX,CA,12,1154.0,-6.0,0.0,0.0,-1.0,23.0,1217.0,1455.0,13.0,15,1508.0,-7.0,0.0,0.0,-1.0,0,,0,375,374.0,338.0,1,2475,10,,,,,,,10.0,0.0,-4.4,72.0,6.0,30.5,10.0,12.8,9.4,80.0,3.0,30.14
7,2012,1,7,6,2012-01-07,AA,N336AA,4,12892,LAX,CA,12478,JFK,NY,12,1223.0,8.0,8.0,0.0,0.0,15.0,1238.0,2018.0,3.0,20,2021.0,-24.0,0.0,0.0,-2.0,0,,0,330,298.0,280.0,1,2475,10,,,,,,,6.0,12.8,10.0,83.0,0.0,30.02,10.0,7.2,-4.4,44.0,15.0,30.01
8,2012,1,23,1,2012-01-23,AA,N336AA,4,12892,LAX,CA,12478,JFK,NY,12,1207.0,-8.0,0.0,0.0,-1.0,19.0,1226.0,2022.0,9.0,20,2031.0,-14.0,0.0,0.0,-1.0,0,,0,330,324.0,296.0,1,2475,10,,,,,,,10.0,12.2,9.4,83.0,0.0,29.98,6.0,8.15,7.9,98.0,8.0,29.91
9,2012,1,8,7,2012-01-08,AA,N357AA,5,11298,DFW,TX,12173,HNL,HI,13,1413.0,73.0,73.0,1.0,4.0,12.0,1425.0,1759.0,2.0,17,1801.0,26.0,26.0,1.0,1.0,0,,0,515,468.0,454.0,1,3784,11,26.0,0.0,0.0,0.0,0.0,,9.0,11.9,8.966666667,81.0,8.333333333,30.07666667,10.0,20.0,17.8,87.0,0.0,30.09
10,2012,1,24,2,2012-01-24,AA,N380AA,5,11298,DFW,TX,12173,HNL,HI,13,1256.0,-4.0,0.0,0.0,-1.0,16.0,1312.0,1638.0,3.0,17,1641.0,-54.0,0.0,0.0,-2.0,0,,0,515,465.0,446.0,1,3784,11,,,,,,,7.5,12.35,9.5,83.5,8.5,29.94,10.0,21.7,18.9,84.0,0.0,30.01


In [0]:
df["ARR_DEL15"]

Mark canceled flights as "delayed over 15 minutes".

In [0]:
# Set ARR_DEL15 = 1 for canceled flights (their ARR_DEL15 is otherwise null)
from pyspark.sql.functions import when
df = df.withColumn("ARR_DEL15", when(df["CANCELLED"] == 1, 1).otherwise(df["ARR_DEL15"]))
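
The effect of this step can be sketched in plain Python (not Spark), assuming each row is a dictionary and `None` stands for a null. The helper name `mark_cancelled_as_delayed` is hypothetical, and the sample values mirror rows 3, 4 and 9 of the data shown above. The canceled flight has no ARR_DEL15 value, so without this step it would be lost when rows with nulls are dropped later.

```python
def mark_cancelled_as_delayed(row):
    """Return a copy of row with ARR_DEL15 forced to 1 when the flight was canceled."""
    updated = dict(row)
    if row["CANCELLED"] == 1:
        updated["ARR_DEL15"] = 1
    return updated

# Illustrative rows: on time, canceled (no arrival data), delayed
rows = [
    {"FL_NUM": 2, "CANCELLED": 0, "ARR_DEL15": 0.0},
    {"FL_NUM": 2, "CANCELLED": 1, "ARR_DEL15": None},
    {"FL_NUM": 5, "CANCELLED": 0, "ARR_DEL15": 1.0},
]

labelled = [mark_cancelled_as_delayed(r) for r in rows]
print([r["ARR_DEL15"] for r in labelled])  # [0.0, 1, 1.0]
```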

Remove diverted flights.

In [0]:
# Remove diverted flights
df = df.filter(df["DIVERTED"] == 0)

Narrow to required columns.

In [0]:
# Select required columns
df = df.select(
  "ARR_DEL15",
  "MONTH",
  "DAY_OF_WEEK",
  "UNIQUE_CARRIER",
  "ORIGIN",
  "DEST",
  "CRS_DEP_TIME",
  "CRS_ARR_TIME",
  "RelativeHumidityOrigin",
  "AltimeterOrigin",
  "DryBulbCelsiusOrigin",
  "WindSpeedOrigin",
  "VisibilityOrigin",
  "DewPointCelsiusOrigin",
  "RelativeHumidityDest",
  "AltimeterDest",
  "DryBulbCelsiusDest",
  "WindSpeedDest",
  "VisibilityDest",
  "DewPointCelsiusDest")

Drop rows that have a null value in any column.

In [0]:
# Drop rows with a null value in any column (the default, how="any")
df = df.dropna()
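
`dropna()` with no arguments uses `how="any"`, so a single null is enough to remove a row. The difference from `how="all"` can be sketched in plain Python (not Spark), assuming rows are dictionaries and `None` stands for a null. The helper `drop_rows_with_nulls` and the sample values are made up for illustration.

```python
def drop_rows_with_nulls(rows, how="any"):
    """Mimic the 'any' / 'all' modes of dropping rows that contain nulls."""
    if how == "any":
        # Keep a row only if every value is present
        return [r for r in rows if all(v is not None for v in r.values())]
    if how == "all":
        # Keep a row unless every value is missing
        return [r for r in rows if any(v is not None for v in r.values())]
    raise ValueError("how must be 'any' or 'all'")

rows = [
    {"ARR_DEL15": 0.0, "WindSpeedOrigin": 7.0},
    {"ARR_DEL15": 1.0, "WindSpeedOrigin": None},
    {"ARR_DEL15": None, "WindSpeedOrigin": None},
]

kept_any = drop_rows_with_nulls(rows, how="any")
kept_all = drop_rows_with_nulls(rows, how="all")
print(len(kept_any), len(kept_all))  # 1 2
```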

Split data into training data and evaluation data (ratio is 80% : 20%)

In [0]:
# Split data into train data and test data
(traindf, testdf) = df.randomSplit([0.8, 0.2])
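
`randomSplit` assigns rows by random draws, so the 80% : 20% ratio holds only approximately, and without a seed the split differs between runs. In PySpark a seed can be passed as `df.randomSplit([0.8, 0.2], seed=42)`. The idea is sketched below in plain Python (not Spark); the helper `random_split` and the seed value 42 are illustrative assumptions.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(1000))
train_a, test_a = random_split(rows, 0.8, seed=42)
train_b, test_b = random_split(rows, 0.8, seed=42)

print(len(train_a), len(test_a))  # roughly 800 and 200, not exactly
print(train_a == train_b)         # same seed, same split
```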

Convert categorical values to index values (0, 1, ...) for carrier code (UNIQUE_CARRIER), airport code (ORIGIN, DEST), flag for delay over 15 minutes (ARR_DEL15).

In [0]:
# Convert categorical values to index values (0, 1, ...)
from pyspark.ml.feature import StringIndexer
uniqueCarrierIndexer = StringIndexer(inputCol="UNIQUE_CARRIER", outputCol="Indexed_UNIQUE_CARRIER").fit(df)
originIndexer = StringIndexer(inputCol="ORIGIN", outputCol="Indexed_ORIGIN").fit(df)
destIndexer = StringIndexer(inputCol="DEST", outputCol="Indexed_DEST").fit(df)
arrDel15Indexer = StringIndexer(inputCol="ARR_DEL15", outputCol="Indexed_ARR_DEL15").fit(df)
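
By default, StringIndexer orders labels by descending frequency, so the most frequent value gets index 0, and in Spark 3.x ties are broken alphabetically. A plain-Python sketch of that ordering follows (not Spark). The helper `fit_string_index` and the carrier list are made up for illustration. By default an indexer raises an error on labels it did not see during fitting, which may be why the indexers above are fit on the full `df` rather than on `traindf`.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to an index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

carriers = ["AA", "DL", "AA", "UA", "DL", "AA"]  # illustrative values
index = fit_string_index(carriers)

print(index)                         # {'AA': 0.0, 'DL': 1.0, 'UA': 2.0}
print([index[c] for c in carriers])  # [0.0, 1.0, 0.0, 2.0, 1.0, 0.0]
```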

### In Spark machine learning, the feature columns must be combined into a single vector column.    
We create a new vector column named "features".

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type.

For example (following the Spark documentation), assume a DataFrame with the columns id, hour, mobile, userFeatures, and clicked, where userFeatures is a vector column that contains three user features. We want to combine hour, mobile, and userFeatures into a single feature vector called features and use it to predict clicked or not. If we set VectorAssembler's input columns to hour, mobile, and userFeatures and the output column to features, after transformation we should get the following DataFrame:

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]
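
The table row above can be reproduced with a plain-Python sketch of the concatenation (not the Spark API). The helper `assemble` is hypothetical and only handles numbers and lists.

```python
def assemble(row, input_cols):
    """Concatenate scalar and vector columns into one flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: append each element
        else:
            features.append(float(value))             # numeric column: append one value
    return features

row = {"id": 0, "hour": 18, "mobile": 1.0, "userFeatures": [0.0, 10.0, 0.5], "clicked": 1.0}
features = assemble(row, ["hour", "mobile", "userFeatures"])
print(features)  # [18.0, 1.0, 0.0, 10.0, 0.5]
```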

In [0]:
# Assemble feature columns
# For more on feature engineering, see https://spark.apache.org/docs/latest/ml-features
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
  inputCols = [
    "MONTH",
    "DAY_OF_WEEK",
    "Indexed_UNIQUE_CARRIER",
    "Indexed_ORIGIN",
    "Indexed_DEST",
    "CRS_DEP_TIME",
    "CRS_ARR_TIME",
    "RelativeHumidityOrigin",
    "AltimeterOrigin",
    "DryBulbCelsiusOrigin",
    "WindSpeedOrigin",
    "VisibilityOrigin",
    "DewPointCelsiusOrigin",
    "RelativeHumidityDest",
    "AltimeterDest",
    "DryBulbCelsiusDest",
    "WindSpeedDest",
    "VisibilityDest",
    "DewPointCelsiusDest"],
  outputCol = "features")

## Microsoft Machine Learning for Apache Spark
MMLSpark (now SynapseML) is an ecosystem of tools aimed at expanding the distributed computing framework Apache Spark in several new directions. It adds many deep learning and data science tools to the Spark ecosystem, including seamless integration of Spark Machine Learning pipelines with Microsoft Cognitive Toolkit (CNTK), LightGBM and OpenCV. These tools enable powerful and highly scalable predictive and analytical models for a variety of data sources.

## LightGBM classifier in SynapseML
Generate the classifier. Here we use the LightGBM classifier from SynapseML (formerly MMLSpark).

LightGBM, short for Light Gradient Boosting Machine, is a free and open source distributed gradient boosting framework for machine learning originally developed by Microsoft. It is based on decision tree algorithms and used for ranking, classification and other machine learning tasks.

numLeaves (num_leaves in LightGBM itself) is the main parameter to control the complexity of the tree model.

In [0]:
# Generate classifier
#from synapse.ml.lightgbm import LightGBMClassifier
classifier = LightGBMClassifier(
  featuresCol="features",
  labelCol="ARR_DEL15",
  learningRate=0.3,
  numIterations=15,
  numLeaves=100)
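
To get a feel for what numLeaves=100 means: a binary tree of depth d has at most 2^d leaves, so a tree with 100 leaves must be at least 7 levels deep. The sketch below is plain arithmetic, not a LightGBM API, and the helper names are hypothetical. Because LightGBM grows trees leaf by leaf, actual trees can be deeper than this minimum.

```python
def min_depth_for_leaves(num_leaves):
    """Smallest depth d such that a binary tree of depth d can hold num_leaves leaves."""
    return (num_leaves - 1).bit_length()

def max_leaves_for_depth(depth):
    """A binary tree of the given depth has at most 2**depth leaves."""
    return 2 ** depth

print(min_depth_for_leaves(100))  # 7
print(max_leaves_for_depth(7))    # 128
```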

Generate the Spark ML pipeline and run training.    
The fitted pipeline, including the trained model, is stored in "model".

ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines. Refer to the Spark ML pipeline documentation: https://spark.apache.org/docs/latest/ml-pipeline.html

MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. The pipeline concept is mostly inspired by the scikit-learn project.

DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.

Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

Parameter: All Transformers and Estimators now share a common API for specifying parameters.

In [0]:
# Create pipeline and Train
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[uniqueCarrierIndexer, originIndexer, destIndexer, arrDel15Indexer, assembler, classifier])
model = pipeline.fit(traindf)
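
The Transformer / Estimator / Pipeline relationship described above can be sketched in plain Python, using lists of numbers in place of DataFrames. All class names here (`AddConstant`, `MeanCenterer`, `SketchPipeline`, `SketchPipelineModel`) are hypothetical and are not the Spark classes. They only illustrate that fitting a pipeline turns every Estimator into a Transformer and returns a model that applies all stages in order.

```python
class AddConstant:
    """Transformer: maps one dataset to another."""
    def __init__(self, amount):
        self.amount = amount
    def transform(self, rows):
        return [r + self.amount for r in rows]

class MeanCenterer:
    """Estimator: learns the mean from data and produces a Transformer."""
    def fit(self, rows):
        mean = sum(rows) / len(rows)
        return AddConstant(-mean)

class SketchPipelineModel:
    """Fitted pipeline: every stage is a Transformer."""
    def __init__(self, stages):
        self.stages = stages
    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

class SketchPipeline:
    """Chains stages; fit() replaces each Estimator by the Transformer it produces."""
    def __init__(self, stages):
        self.stages = stages
    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)   # Estimator -> Transformer
            fitted.append(stage)
            rows = stage.transform(rows)  # feed transformed data to the next stage
        return SketchPipelineModel(fitted)

sketch_model = SketchPipeline([AddConstant(10.0), MeanCenterer()]).fit([1.0, 2.0, 3.0])
print(sketch_model.transform([1.0, 2.0, 3.0]))  # [-1.0, 0.0, 1.0]
print(sketch_model.transform([4.0]))            # [2.0]
```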

Predict with evaluation data

In [0]:
# Predict with evaluation data
pred = model.transform(testdf)

Show the evaluation result. (I'm sorry, but the result might not be good in this example ...)

Another good article covers several ML model implementations and their evaluation with MulticlassClassificationEvaluator (accuracy, F1 score, etc.): https://datascience-enthusiast.com/Python/PySpark_ML_with_Text_part1.html

In [0]:
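# Evaluate results
# Note: with no metricName set, MulticlassClassificationEvaluator reports the
# weighted F1 score (its default), not accuracy. Pass metricName="accuracy" for accuracy.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="ARR_DEL15", predictionCol="prediction")
f1 = evaluator.evaluate(pred)
print("F1 = %g" % f1)

F1 = 0.841838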
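
MulticlassClassificationEvaluator reports whichever metric `metricName` selects; it defaults to "f1" (a weighted F1 score) unless set to, for example, "accuracy". The two can differ noticeably when labels are imbalanced, as they likely are for flight delays. Below is a plain-Python sketch with made-up labels, not the Spark implementation, in which a model that always predicts "not delayed" is scored both ways.

```python
def accuracy(labels, preds):
    """Fraction of predictions that match the label."""
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def weighted_f1(labels, preds):
    """Per-class F1, averaged with weights equal to each class's share of the labels."""
    total = 0.0
    for cls in set(labels):
        tp = sum(l == cls and p == cls for l, p in zip(labels, preds))
        fp = sum(l != cls and p == cls for l, p in zip(labels, preds))
        fn = sum(l == cls and p != cls for l, p in zip(labels, preds))
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * labels.count(cls) / len(labels)
    return total

labels = [0] * 8 + [1] * 2  # 80% not delayed, 20% delayed (illustrative)
preds = [0] * 10            # always predict "not delayed"

print("accuracy    = %.4f" % accuracy(labels, preds))     # 0.8000
print("weighted F1 = %.4f" % weighted_f1(labels, preds))  # 0.7111
```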


Save (export) the pipeline model with its trained parameters.    
The saved pipeline model can be loaded with Azure Machine Learning service for inference serving.

(This is not required here, as I cannot access the Azure serving engine due to subscription limitations.)
**Before saving to blob storage, you must create the mount point (/mnt/testblob).**

In [0]:
# Save the pipeline to storage reachable by a serving engine, e.g. Microsoft Azure
#model.write().overwrite().save("/mnt/testblob/flight_model")
# Here I save locally to DBFS instead
model.write().overwrite().save("/FileStore/tables/flight_model")

In [0]:
### ALL SET for TECHNICAL PROJECT