For batch scoring, we will persist the scored values in a new global (persistent) Databricks table. In production data workloads, you may instead save the scored data to Blob Storage, Azure Cosmos DB, or another serving layer. Another implementation detail we skip in this lab is processing only new files. You can do this by adding a widget to the notebook that accepts a path parameter, which Azure Data Factory passes in when it runs the notebook.
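A minimal sketch of the widget approach, assuming an Azure Data Factory Notebook activity passes a base parameter named `input_path` (a hypothetical name; it must match whatever you configure in ADF). `dbutils.widgets.text` declares the widget with a default value and `dbutils.widgets.get` returns the value supplied for the run. Outside Databricks, where `dbutils` is not defined, the helper falls back to the default glob that this notebook reads.

```python
# Hypothetical widget name; it must match the base parameter name set on the ADF activity.
WIDGET_NAME = "input_path"

# Default: the same glob this notebook reads when no path is passed in.
DEFAULT_INPUT = "/mnt/sparkcontainer/Triage/FlightsAndWeather/*/*/FlightsAndWeather.csv"


def get_input_path(widget_name=WIDGET_NAME, default=DEFAULT_INPUT):
    """Return the path passed to the notebook, or `default` if none was given."""
    try:
        # dbutils is injected into Databricks notebooks; it is not defined elsewhere.
        dbutils.widgets.text(widget_name, default)  # noqa: F821
        value = dbutils.widgets.get(widget_name)    # noqa: F821
    except NameError:
        # Not running inside Databricks, so no widgets are available.
        value = ""
    # An empty widget value also falls back to the default path.
    return value.strip() or default
```

In the notebook, the result would replace the hard-coded path, for example `spark.read.csv(get_input_path(), schema=data_schema, sep=",", header=True)`, so that ADF can point each run at just the newly arrived folder.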

In [0]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Bucketizer
from pyspark.sql.functions import array, col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

Replace STORAGE-ACCOUNT-NAME with the name of your storage account. You can find it in the Azure portal, in the resource group where you created the storage account during the lab setup. The container name is set to the default used in this lab; if yours is different, update the containerName variable accordingly.
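The cells below read through the `/mnt/sparkcontainer` mount, but Spark can also address Blob Storage directly with a fully qualified `wasbs://` URI built from the account and container names. This sketch only builds the URI string; `storage_account_name` and `container_name` are placeholders you would replace, and reading from the URI assumes the cluster already has credentials configured for that storage account.

```python
# Placeholders: substitute your own storage account and container names.
storage_account_name = "STORAGE-ACCOUNT-NAME"
container_name = "CONTAINER-NAME"


def blob_uri(account, container, path=""):
    """Build a wasbs:// URI for a path inside an Azure Blob Storage container."""
    # wasbs:// is the TLS-secured scheme of the Hadoop Azure Blob Storage connector.
    base = f"wasbs://{container}@{account}.blob.core.windows.net"
    # Strip leading slashes so "Triage" and "/Triage" produce the same URI.
    return f"{base}/{path.lstrip('/')}" if path else base + "/"


triage_uri = blob_uri(storage_account_name, container_name, "Triage")
```

For example, `blob_uri("myaccount", "mycontainer", "Triage")` returns `wasbs://mycontainer@myaccount.blob.core.windows.net/Triage`.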

Define the schema for the CSV files

In [0]:
data_schema = StructType([
    StructField('OriginAirportCode', StringType()),
    StructField('Month', IntegerType()),
    StructField('DayofMonth', IntegerType()),
    StructField('CRSDepHour', IntegerType()),
    StructField('DayOfWeek', IntegerType()),
    StructField('Carrier', StringType()),
    StructField('DestAirportCode', StringType()),
    StructField('DepDel15', IntegerType()),
    StructField('WindSpeed', DoubleType()),
    StructField('SeaLevelPressure', DoubleType()),
    StructField('HourlyPrecip', DoubleType()),
])
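`spark.read.csv` also accepts the schema as a DDL-formatted string instead of a `StructType`. The sketch below builds such a string from the same column names and types; `FLIGHTS_COLUMNS` and `to_ddl` are illustrative names, not part of the lab.

```python
# Same columns as data_schema, as (column name, Spark SQL type) pairs.
FLIGHTS_COLUMNS = [
    ("OriginAirportCode", "STRING"),
    ("Month", "INT"),
    ("DayofMonth", "INT"),
    ("CRSDepHour", "INT"),
    ("DayOfWeek", "INT"),
    ("Carrier", "STRING"),
    ("DestAirportCode", "STRING"),
    ("DepDel15", "INT"),
    ("WindSpeed", "DOUBLE"),
    ("SeaLevelPressure", "DOUBLE"),
    ("HourlyPrecip", "DOUBLE"),
]


def to_ddl(columns):
    """Join (name, type) pairs into a DDL schema string such as 'a INT, b STRING'."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in columns)


FLIGHTS_DDL = to_ddl(FLIGHTS_COLUMNS)
```

Passing `schema=FLIGHTS_DDL` to `spark.read.csv` yields the same column names and types as `data_schema` (all nullable). The `StructType` form is easier to inspect field by field, while the string form is compact enough to pass around as a job parameter.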

Create a new DataFrame from the CSV files, applying the schema

In [0]:
data_location = "/mnt/sparkcontainer/Triage"

dfDelays = spark.read.csv(
    "{0}/FlightsAndWeather/*/*/FlightsAndWeather.csv".format(data_location),
    schema=data_schema,
    sep=",",
    header=True,
)
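In the path passed to `spark.read.csv`, each `*` matches exactly one directory level, so only files named `FlightsAndWeather.csv` exactly two folders below `FlightsAndWeather` are read. The sketch below mimics that per-segment matching with Python's `PurePosixPath.match`, which likewise never lets `*` cross a `/`; the folder names `a`, `b`, and `c` in the sample paths are hypothetical.

```python
from pathlib import PurePosixPath

# The glob used above, with data_location expanded.
PATTERN = "/mnt/sparkcontainer/Triage/FlightsAndWeather/*/*/FlightsAndWeather.csv"


def is_read(path, pattern=PATTERN):
    """Return True if `path` matches the absolute, per-segment glob `pattern`."""
    # With an absolute pattern the whole path must match, one segment per wildcard.
    return PurePosixPath(path).match(pattern)


base = "/mnt/sparkcontainer/Triage/FlightsAndWeather"
print(is_read(f"{base}/a/b/FlightsAndWeather.csv"))    # True: two levels deep
print(is_read(f"{base}/a/FlightsAndWeather.csv"))      # False: one level too shallow
print(is_read(f"{base}/a/b/c/FlightsAndWeather.csv"))  # False: one level too deep
print(is_read(f"{base}/a/b/Other.csv"))                # False: different file name
```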

Load the trained machine learning model you created earlier in the lab

In [0]:
# Load the saved pipeline model
model = PipelineModel.load("/flightDelayModel")

In [0]:
display(dfDelays)

Make a prediction against the loaded dataset

In [0]:
# Make a prediction against the dataset
prediction = model.transform(dfDelays)

Save the scored data as Delta files, then register them as a new global table called **scoredflights**

In [0]:
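# Write explicitly in Delta format so the CREATE TABLE ... USING DELTA statement can read the files,
# regardless of the cluster's default data source
prediction.write.format("delta").mode("overwrite").save("/mnt/sparkcontainer/Silver/scoredflights")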

In [0]:
%sql
DROP TABLE IF EXISTS scoredflights;

CREATE TABLE scoredflights
USING DELTA LOCATION '/mnt/sparkcontainer/Silver/scoredflights'
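The `%sql` cell above can also be run from Python with `spark.sql`, which executes one statement per call. That is convenient when the table name or location arrives as a notebook parameter. The helper below is a hypothetical sketch that only builds the two statements; it rejects table names that are not simple identifiers, so a parameter cannot inject extra SQL, and it assumes the location contains no single quotes.

```python
import re

# Only plain identifiers such as scoredflights are accepted (an assumption of this sketch).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def register_delta_table_sql(table, location):
    """Return the statements that (re)register a Delta table over an existing path."""
    if not _IDENTIFIER.fullmatch(table):
        raise ValueError(f"unsupported table name: {table!r}")
    if "'" in location:
        raise ValueError("location must not contain single quotes")
    return [
        f"DROP TABLE IF EXISTS {table}",
        f"CREATE TABLE {table} USING DELTA LOCATION '{location}'",
    ]


# In the notebook, each statement would be run separately:
# for statement in register_delta_table_sql("scoredflights", "/mnt/sparkcontainer/Silver/scoredflights"):
#     spark.sql(statement)
```

Because the table is created with an explicit `LOCATION`, it is an external (unmanaged) table, so `DROP TABLE` removes only the metastore entry and leaves the Delta files in place for the `CREATE TABLE` that follows.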