## PREDICTING FLIGHT DELAYS AT ARRIVAL

*Powered by the <span style="color:rgb(0,94,184)">Data Science Platform and Romain</span>* <img src="images/logo-dsp-100x100.png" style="vertical-align:middle" width="25" height="25" />

Level: <span style="color:green">Beginner</span>

Duration: *40 min*

## Introduction

The goal of this notebook is to build, train and evaluate a Random Forest model that predicts the arrival delays of Southwest Airlines (WN) flights at Los Angeles International Airport (LAX) in 2015. The purpose is not to obtain the best possible prediction but rather to emphasize the various steps needed to build such a model.

More broadly, this notebook aims to illustrate the principles of a machine learning workflow using a tool called Azure Machine Learning (Azure ML). We will go through every step together to discover what Azure ML offers.

This notebook covers the following features of the Data Science Platform:
* PySpark notebook
* Data access in MaprFS Raw Data Archive using spark
* Data preparation using SQL and spark-sql
* Data visualization
* Build a Random Forest model using spark-ml
* Train this random forest model using spark-ml
* Evaluate the model using spark-ml
* Using Azure ML
* Creating a component
* Creating an environment with Dockerfile
* Creating a Pipeline

<img src="images/global_free_trial_spark_delay.png" width="700" height="350" />

## 1. Data analysis

Before starting, we need a handle to our workspace. We first install the `azure-ai-ml` package.

In [1]:
%pip install azure-ai-ml

Note: you may need to restart the kernel to use updated packages.


In [1]:
# Authentication package
from azure.identity import DefaultAzureCredential
import os
from azure.ai.ml import MLClient
credential = DefaultAzureCredential()

# Execute the script
%run setenv.py

# Get a handle to the workspace
ml_client = MLClient(
    credential=credential,
    subscription_id= os.environ['subscription_id'],
    resource_group_name= os.environ['resource_group'],
    workspace_name= os.environ['workspace_name']
)
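
The cell above reads three settings from `os.environ`, which `setenv.py` is expected to populate. The contents of `setenv.py` are not shown here, so the following is only a sketch of a pre-flight check one could run before building the client. The helper name `missing_settings` is hypothetical; the three variable names are the ones used in the cell above.

```python
import os

# Names read from os.environ when building MLClient in the cell above.
REQUIRED_SETTINGS = ("subscription_id", "resource_group", "workspace_name")


def missing_settings(environ=os.environ, required=REQUIRED_SETTINGS):
    """Return the required settings that are absent or empty in `environ`."""
    return [name for name in required if not environ.get(name)]


# Example with a plain dict standing in for os.environ (placeholder values).
example_env = {"subscription_id": "0000", "resource_group": "rg-demo"}
print(missing_settings(example_env))
```

An empty list means every setting is present; it says nothing about whether the values are correct, which is what the credential check in the next cell is for.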

* Checking Credentials

`ml_client` is lazy: creating it does not connect to Azure, so invalid credentials only surface on the first real call. Run this cell to make sure your credentials are correct:

In [2]:
# Check if credentials are valid
from IPython.display import Image
from colorama import Fore

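try:
    # Fetching the workspace forces a real, authenticated call to Azure
    # without modifying the workspace.
    ml_client.workspaces.get(ml_client.workspace_name)
    print(Fore.GREEN + "Credentials are valid")
except Exception:
    print(Fore.RED + "Credentials are invalid - please check the TODO CELL")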
    print("Please check your credentials : subscription_id, resource_group_name, workspace_name must be correct")
    display(Image(filename='images/credentials.PNG'))
    print("You can find your credentials by clicking on the TOP LEFT of the Azure Portal ML Studio")
    display(Image(filename='images/values.png'))


Credentials are valid


## Create a compute resource to run your job

You'll need a compute resource for running a job. It can be single or multi-node machines with Linux or Windows OS, or a specific compute fabric like Spark.

You'll provision a Linux compute cluster. See the [full list of VM sizes and prices](https://azure.microsoft.com/pricing/details/machine-learning/).

For this example, you only need a basic cluster, so you'll use the Standard_DS11_v2 VM size and create an Azure ML Compute.

In [3]:
from azure.ai.ml.entities import AmlCompute

# Name assigned to the compute cluster
cpu_compute_target = "cpu-cluster-flights"

try:
    # let's see if the compute target already exists
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    print("Creating a new cpu compute target...")

    # Let's create the Azure ML compute object with the intended parameters
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure ML Compute is the on-demand VM service
        type="amlcompute",
        # VM Family
        size="STANDARD_DS11_V2",
        # Minimum running nodes when there is no job running
        min_instances=0,
        # Nodes in cluster
        max_instances=4,
        # Seconds a node stays idle after a job ends before it is scaled down
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
        tier="Dedicated",
    )

    # Now, we pass the object to MLClient's begin_create_or_update method
    # and wait for the returned poller to finish provisioning the cluster
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster).result()
    

print(
    f"AMLCompute with name {cpu_cluster.name} is created, the compute size is {cpu_cluster.size}"
)

You already have a cluster named cpu-cluster-flights, we'll reuse it as is.
AMLCompute with name cpu-cluster-flights is created, the compute size is STANDARD_DS11_V2


### 1.1 Create an environment for our job

To run your AzureML job on your compute resource, you'll need an environment. An environment lists the software runtime and libraries that you want installed on the compute where you’ll be training. It's similar to your python environment on your local machine.

AzureML provides many curated or ready-made environments, which are useful for common training and inference scenarios. You can also create your own custom environments using a docker image, or a conda configuration.

In this example, you'll create a custom environment for your jobs, built from a Dockerfile.

First, create a directory to store the file in.

In [4]:
import os

env_dir = "./env"
os.makedirs(env_dir, exist_ok=True)

Now we write the Dockerfile that defines the image in which our pipeline will run.

In [5]:
%%writefile {env_dir}/Dockerfile
FROM mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:20220902.v1

ENV AZUREML_CONDA_ENVIRONMENT_PATH /azureml-envs/responsibleai-0.21

# Install wkhtmltopdf for pdf rendering from html
RUN apt-get -y update && apt-get -y install wkhtmltopdf

# Create conda environment
RUN conda create -p $AZUREML_CONDA_ENVIRONMENT_PATH \
    python=3.8 pip=21.3.1 -c anaconda -c conda-forge

# Prepend path to AzureML conda environment
ENV PATH $AZUREML_CONDA_ENVIRONMENT_PATH/bin:$PATH

# Install pip dependencies
# markupsafe and itsdangerous are bug workarounds
# install azureml-defaults==1.47.0
    # inference-schema[numpy-support]==1.5
    # joblib==1.0.1
RUN pip install 'responsibleai~=0.21.0' \
                'raiwidgets~=0.21.0' \
                'pyarrow' \
                'markupsafe<=2.0.1' \
                'itsdangerous==2.0.1' \
                'mlflow==1.30.0' \
                'scikit-learn<1.1' \
                'pdfkit==1.0.0' \
                'plotly==5.6.0' \
                'kaleido==0.2.1' \
                'azureml-core==1.47.0' \
                'azureml-dataset-runtime==1.47.0' \
                'azureml-mlflow==1.47.0' \
                'azureml-telemetry==1.47.0'\
                'seaborn'\
                'matplotlib'\
                'pyspark>=3.1,<3.2'\
                'azureml-defaults==1.47.0'\
                'inference-schema[numpy-support]==1.5'\
                'joblib==1.0.1'
                
                    

RUN pip install --pre 'azure-ai-ml'

# no-deps install for domonic due to unresolvable dependency requirements on urllib3 and requests.
# Score card rendering uses domonic only for the HTML element composer, which involves neither requests nor urllib3.
RUN pip install --no-deps 'charset-normalizer==2.0.12' \
                          'cssselect==1.1.0' \
                          'elementpath==2.5.0' \
                          'html5lib==1.1' \
                          'webencodings==0.5.1' \
                          'domonic==0.9.10'

# This is needed for mpi to locate libpython
ENV LD_LIBRARY_PATH $AZUREML_CONDA_ENVIRONMENT_PATH/lib:$LD_LIBRARY_PATH

# This is needed for pyspark to locate Java
RUN apt-get update && \
    mkdir -p /usr/share/man/man1 && \
    apt-get install -y openjdk-8-jdk && \
    apt-get install -y ant && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/ && \
    rm -rf /var/cache/oracle-jdk8-installer;
    
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
RUN export JAVA_HOME


Writing ./env/Dockerfile


Now we register the environment in our workspace, using the `env` directory as the Docker build context.

In [6]:
from azure.ai.ml.entities import Environment
from azure.ai.ml.entities import BuildContext
import os

custom_env_name = "flight-delays-custom-env"

buildcontext = BuildContext(
    path=env_dir
)

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for spark flight delays",
    tags={"owner": os.environ["owner"], "created": "2022-11-23"},
    build=buildcontext,
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Uploading env (0.0 MBs): 100%|██████████| 2484/2484 [00:00<00:00, 246531.11it/s]



Environment with name flight-delays-custom-env is registered to workspace, the environment version is 1


### 1.2 Writing our component
First, we create a YAML file that describes our Azure ML component: its inputs, outputs, source code, environment and the command it runs.

In [7]:
import os

component_dir = "./components"
os.makedirs(component_dir, exist_ok=True)

analysis_src_dir = "./components/src"
os.makedirs(analysis_src_dir, exist_ok=True)

In [8]:
%%writefile $component_dir/flight_delays_analysis.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: analysis
display_name: analysis
version: 30
type: command
inputs:
  public_data_flight_delays: 
    type: uri_file
outputs:
  eval_output:
    type: uri_folder
code: ./src
environment: azureml:flight-delays-custom-env@latest
command: >-
  python analysis.py
  --public_data_flight_delays ${{inputs.public_data_flight_delays}} 
  --eval_output ${{outputs.eval_output}}

Writing ./components/flight_delays_analysis.yml
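
At run time, Azure ML replaces the `${{inputs...}}` and `${{outputs...}}` placeholders of the `command` with concrete paths and runs the resulting command line. The sketch below shows how the script sees those values through `argparse`. The two paths are made up for illustration, and `parse_component_args` is a hypothetical helper, not part of the component.

```python
import argparse


def parse_component_args(argv):
    """Parse the two arguments declared in the `command` of the component."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--public_data_flight_delays", type=str, help="path to input data")
    parser.add_argument("--eval_output", type=str, help="path to eval output")
    return parser.parse_args(argv)


# Hypothetical paths, standing in for what Azure ML would substitute.
args = parse_component_args(
    ["--public_data_flight_delays", "/tmp/inputs/flights", "--eval_output", "/tmp/outputs/eval"]
)
print(args.public_data_flight_delays, args.eval_output)
```

The argument names must match the ones used in the `command` section of the YAML file, otherwise `argparse` rejects the command line.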


Now we need to actually create the code that will be executed by our component

In [8]:
%%writefile {analysis_src_dir}/analysis.py
import os
import argparse
import pandas as pd
import mlflow
import numpy as np
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.patches import ConnectionPatch
from collections import OrderedDict
from matplotlib.gridspec import GridSpec

import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pathlib import Path

sc = SparkContext('local')
spark = SparkSession(sc)

def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--public_data_flight_delays", type=str, help="path to input data")
    parser.add_argument("--eval_output", type=str, help="path to eval output")
    args = parser.parse_args()
   
    # Start Logging
    mlflow.start_run()

    ###################
    #<load the data>
    ###################
    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.public_data_flight_delays)

    flights = spark.read.option("inferschema", "true").csv(os.path.join(args.public_data_flight_delays, "flights.csv"), header=True)
    airlines_names = spark.read.option("inferschema", "true").csv(os.path.join(args.public_data_flight_delays,"airlines.csv"), header=True).toPandas()
    
    ####################
    #</load the data>
    ####################

    ####################
    #<Metrics rows and columns>
    ####################
    mlflow.log_metric("nb_rows", flights.count())
    mlflow.log_metric("nb_columns", len(flights.columns))

    ####################
    #</Metrics rows and columns>
    ####################

    ####################
    #<sample csv load>
    ####################
    tab_info=pd.DataFrame(flights.dtypes).T.rename(index={0:'column type'})

    sample = flights.limit(10).toPandas()

    sample.to_csv("sample.csv")

    mlflow.log_artifact("sample.csv")

    flights.createOrReplaceTempView("flights")

    airline_delays = spark.sql( \
        "SELECT AIRLINE, MIN(ARRIVAL_DELAY) as min, MAX(ARRIVAL_DELAY) as max, AVG(ARRIVAL_DELAY) as avg, count(*) as count \
        from flights WHERE MONTH == 1 AND ARRIVAL_DELAY is not null \
        group by AIRLINE order by count")

    tab_pandas = airline_delays.toPandas()
    
    tab_pandas.to_csv("airline_delays.csv")

    mlflow.log_artifact("airline_delays.csv")

    ####################
    #</sample csv load>
    ####################


    ####################
    #<Graph Visualisation>
    ####################
    # 1.6 Flight & Mean Flight Delay distribution per Airline

    abbr_companies = airlines_names.set_index('IATA_CODE')['AIRLINE'].to_dict()
    global_stats = airline_delays.toPandas()
    font = {'family' : 'normal', 'weight' : 'bold', 'size'   : 15}
    mpl.rc('font', **font)
    #__________________________________________________________________
    colors = ['royalblue', 'grey', 'wheat', 'c', 'firebrick', 'seagreen', 'lightskyblue',
            'lightcoral', 'yellowgreen', 'gold', 'tomato', 'violet', 'aquamarine', 'chartreuse']
    #___________________________________
    fig = plt.figure(1, figsize=(16,15))
    gs=GridSpec(2,2)             
    ax1=fig.add_subplot(gs[0,0]) 
    ax2=fig.add_subplot(gs[0,1])  
    #------------------------------
    # Pie chart nº1: nb of flights
    #------------------------------
    # Label each slice with its IATA airline code
    labels = global_stats['AIRLINE'].tolist()
    sizes  = global_stats['count'].values
    # Pull out the slices of airlines with fewer than 20,000 flights
    explode = [0.3 if size < 20000 else 0.0 for size in sizes]
    patches, texts, autotexts = ax1.pie(sizes, explode = explode,
                                    labels=labels, colors = colors,  autopct='%1.0f%%',
                                    shadow=False, startangle=0)
    for text in texts:
        text.set_fontsize(14)
    ax1.axis('equal')
    ax1.set_title('% of flights per company', bbox={'facecolor':'midnightblue', 'pad':5},
                color = 'w',fontsize=18)
    #_______________________________________________
    # Legend: IATA code -> airline name, in the same order and colors as the slices
    comp_handler = [
        mpatches.Patch(color=colors[i % len(colors)],
                label=abbr_companies.get(code, code))
        for i, code in enumerate(global_stats['AIRLINE'])
    ]
    ax1.legend(handles=comp_handler, bbox_to_anchor=(0.2, 0.9), 
            fontsize = 13, bbox_transform=plt.gcf().transFigure)
    #----------------------------------------
    # Pie chart nº2: mean delay at arrival
    #----------------------------------------
    sizes  = global_stats['avg'].values
    # A pie chart cannot show negative values, so early arrivals are clipped to 0
    sizes  = [max(s,0) for s in sizes]
    explode = [0.0 if size < 20000 else 0.01 for size in sizes]
    patches, texts, autotexts = ax2.pie(sizes, explode = explode, labels = labels,
                                    colors = colors, shadow=False, startangle=0,
                                    autopct = lambda p :  '{:.0f}'.format(p * sum(sizes) / 100))
    for text in texts:
        text.set_fontsize(14)
    ax2.axis('equal')
    ax2.set_title('Mean delay at arrival', bbox={'facecolor':'midnightblue', 'pad':5},
                color='w', fontsize=18)
    #________________________
    plt.tight_layout(w_pad=3)
    plt.subplots()
    #________________________
    mlflow.log_figure(fig, "flight_delay_per_airline.png")

    ####################
    #</Graph Visualisation>
    ####################

    # 1.7. How airport impacts delays - Mean delays at arrival for each airport for Southwest airline

    # Reset plot
    plt.clf()

    airline = "'WN'" # South West

    airports_delays = spark.sql( \
    "SELECT DESTINATION_AIRPORT, MIN(ARRIVAL_DELAY) as min, MAX(ARRIVAL_DELAY) as max, AVG(ARRIVAL_DELAY) as avg, count(*) as count \
    from flights WHERE MONTH == 1 AND AIRLINE == " + airline + " AND ARRIVAL_DELAY is not null \
    group by DESTINATION_AIRPORT order by DESTINATION_AIRPORT")

    airports_delays.limit(10).toPandas().to_csv("airports_delays.csv")
    mlflow.log_artifact("airports_delays.csv")
    

    # Output of our component
    eval_msg = f"Eval done\n"
    (Path(args.eval_output) / "eval_result.txt").write_text(eval_msg)
   
    # Stop Logging
    mlflow.end_run()

if __name__ == "__main__":
    main()

Writing ./components/src/analysis.py


### 1.3 Creating our component with a command
* Open a terminal

<img src="images/openTerminal.PNG" width="800" height="600" />

* Go into the right folder
```
$ cd flightdelaytuto/tutoAzureML/
```
* Execute the setenv.sh script, which sets up the environment variables
```
$ source setenv.sh
```
* Go into the components folder 
```
$ cd components
```
* Log in to Azure with this command
```
$ az login --tenant $tenant_id
```
* After you successfully log in, set the subscription ID you are currently using in Microsoft Azure Machine Learning Studio.
```
$ az account set --subscription $subscription_id
```
* Set your workspace and resource group
```
$ az configure --defaults workspace=$workspace_name group=$resource_group
```
* You are now set up and can create the component from the .yml file that we created earlier:
```
$ az ml component create --file flight_delays_analysis.yml
```
* It should display a JSON document with information about the component you just created

### 1.4 Creating a pipeline with the designer tool

Open a new window of Azure ML Studio

Go to Pipelines and create a new pipeline draft

Select Custom using custom components

You should now see the designer tool

In Data, select dataset-delays-flights and drag it into the pipeline model

Select Component and drop the analysis component into the pipeline model

Now drag the Data output port onto the public_data_flight_delays input port

Rename the pipeline to: **Analysis Delay Flights**

Go to Settings, select the Compute cluster type and choose **cpu-cluster-flights**, which we created earlier

Your screen should look like this :

<img src="images/designer_tool.PNG" width="800" height="390" />

Press Submit to launch the pipeline for the first time, creating a new experiment called ExperimentDelayFlight

**Note that a .yml file can completely replace the designer tool**

In the left navigation bar, under Assets > Jobs, you should see your job running about 30 seconds after submitting

Click on the job and wait for it to finish. This can take 5 to 15 minutes, mostly because the environment has to be installed on a node

While waiting, you can start creating the second component in part 2

When it is complete, click on the analysis component.
The pipeline job overview is where you can follow your pipeline while it runs:

<img src="images/overview.PNG" width="700" height="600" />

**Important**

Once the job is complete, the finished analysis component exposes a lot of information. In the top navigation bar:

* Outputs + logs: the CSV files generated by the pipeline and the output file
* Metrics: values that have been logged, such as the number of rows and columns
  * 5,819,079 rows
  * 31 columns
* Images: figures that have been logged during the execution

## 2 Model training: Single airport LAX, Single airline WN

The previous sections dealt with an exploration of the dataset.  
From here, we start modeling flight delays.

**Motivations**

Average delays vary widely, both between airports and between airlines. It is therefore necessary to learn a model that is specific to one airline and one destination airport.

We will learn a model that predicts the flight delay at the destination airport for a given arrival time in January. We will work on Southwest Airlines (WN) flights arriving at Los Angeles International Airport (LAX). We will use the first three weeks of January as the training set and the following week as the test set.
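
The split is purely chronological: earlier days train the model, later days test it. Here is a minimal sketch on toy data, assuming the test period starts on day 23; the rows and the name `SPLIT_DAY` are made up for illustration.

```python
# Toy rows: (DAY, ARRIVAL_DELAY in minutes); the values are made up.
rows = [(1, 5.0), (10, -3.0), (22, 12.0), (23, 0.0), (24, 7.0), (31, -1.0)]

SPLIT_DAY = 23  # first day of the test period (assumption for this sketch)

train = [r for r in rows if r[0] < SPLIT_DAY]
test = [r for r in rows if r[0] >= SPLIT_DAY]

# Every row lands in exactly one of the two sets.
assert len(train) + len(test) == len(rows)
print(len(train), len(test))
```

Using `<` on one side and `>=` on the other guarantees that no day is dropped or counted twice.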

### 2.1 Training Component

Now we create our second component, which builds and trains the model.

In [10]:
%%writefile $component_dir/flight_delays_training.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_delay_model
display_name: train_delay_model
version: 11
type: command
inputs:
  public_data_flight_delays: 
    type: uri_file
  num_trees:
    type: integer
    default: 100
outputs:
  eval_output:
    type: uri_folder
code: ./src
environment: azureml:flight-delays-custom-env@latest
command: >-
  python train.py
  --public_data_flight_delays ${{inputs.public_data_flight_delays}} 
  --num_trees ${{inputs.num_trees}}
  --eval_output ${{outputs.eval_output}}

Writing ./components/flight_delays_training.yml


Here is the code of our component

In [12]:
%%writefile {analysis_src_dir}/train.py
import os
import argparse
import pandas as pd
import mlflow
import datetime
import numpy as np
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.patches import ConnectionPatch
from collections import OrderedDict
from matplotlib.gridspec import GridSpec
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pathlib import Path
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

sc = SparkContext('local')
spark = SparkSession(sc)


def format_date(year, month, day, scheduled):
    if scheduled == 2400: 
        scheduled = 0
    scheduled = "{0:04d}".format(int(scheduled))
    return datetime.datetime(year, month, day, int(scheduled[0:2]), int(scheduled[2:4])).strftime("%Y-%m-%d %H:%M:%S")

def format_hour(scheduled):
    if scheduled == 2400: 
        scheduled = 0
    scheduled = "{0:04d}".format(int(scheduled))
    return int(scheduled[0:2])
    
def format_seconds(scheduled):
    if scheduled == 2400: 
        scheduled = 0
    scheduled = "{0:04d}".format(int(scheduled))
    return (3600 * int(scheduled[0:2])) + (60 * int(scheduled[2:4]))


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--public_data_flight_delays", type=str, help="path to input data")
    parser.add_argument("--num_trees", type=int, required=False, default=100)
    parser.add_argument("--eval_output", type=str, help="path to eval output")
    args = parser.parse_args()
   
    # Start Logging
    mlflow.start_run()

    ###################
    #<load the data>
    ###################
    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.public_data_flight_delays)

    flights = spark.read.option("inferschema", "true").csv(os.path.join(args.public_data_flight_delays, "flights.csv"), header=True)
    airlines_names = spark.read.option("inferschema", "true").csv(os.path.join(args.public_data_flight_delays,"airlines.csv"), header=True).toPandas()
    
    ####################
    #</load the data>
    ####################
    udf_format_date = F.udf(format_date, StringType())
    udf_format_hour = F.udf(format_hour, IntegerType())
    udf_format_seconds = F.udf(format_seconds, IntegerType())
    flights = flights \
        .withColumn('SCHEDULED_DEPARTURE_FORMATTED',udf_format_date(flights.YEAR, flights.MONTH, flights.DAY, flights.SCHEDULED_DEPARTURE)) \
        .withColumn('SCHEDULED_ARRIVAL_FORMATTED',udf_format_date(flights.YEAR, flights.MONTH, flights.DAY, flights.SCHEDULED_ARRIVAL)) \
        .withColumn('SCHEDULED_ARRIVAL_hour',udf_format_hour(flights.SCHEDULED_ARRIVAL)) \
        .withColumn('SCHEDULED_ARRIVAL_sec',udf_format_seconds(flights.SCHEDULED_ARRIVAL))

    flights.limit(10).toPandas().to_csv("flights.csv")
    mlflow.log_artifact("flights.csv")

    airline = "'WN'"
    airport = "'LAX'"

    flights.createOrReplaceTempView("flights")

    df_fi = spark.sql( \
        "SELECT DAY_OF_WEEK, SCHEDULED_ARRIVAL_sec, DAY, ARRIVAL_DELAY as label \
        from flights WHERE AIRLINE == " + airline + " AND DESTINATION_AIRPORT == " + airport +  " AND ARRIVAL_DELAY is not null AND MONTH == 1")

    df_fi.limit(10).toPandas().to_csv("df_fi.csv")
    mlflow.log_artifact("df_fi.csv")

    # Chronological split: days 1-22 for training, days 23-31 for testing
    df_train = df_fi.filter(F.col('DAY') < 23).drop('DAY')
    df_test = df_fi.filter(F.col('DAY') >= 23).drop('DAY')
    print("training dataset size: " + str(df_train.count()))
    print("test dataset size: " + str(df_test.count()))
    df_train.limit(10).toPandas().to_csv("df_train.csv")
    mlflow.log_artifact("df_train.csv")

    assembler = VectorAssembler(
    inputCols=["DAY_OF_WEEK", "SCHEDULED_ARRIVAL_sec"], outputCol="features"
    )

    X_train = assembler.transform(df_train)
    X_train.limit(10).toPandas().to_csv("X_train.csv")
    mlflow.log_artifact("X_train.csv")

    # 2.2.1 Build, Train Model and visualize feature importances
    # Train a RandomForest model
    rf = RandomForestRegressor(numTrees=args.num_trees, featuresCol='features',labelCol='label',predictionCol='prediction')
    rfModel = rf.fit(X_train)

    # featureImportances is a Spark ML vector; convert it to a NumPy array for matplotlib
    importances = rfModel.featureImportances.toArray()

    x_values = list(range(len(importances)))

    # Visualize the feature importances
    plt.bar(x_values, importances, orientation = 'vertical')
    plt.xticks(x_values, ["DAY_OF_WEEK", "SCHEDULED_ARRIVAL_sec"], rotation=40)
    plt.ylabel('Importance')
    plt.xlabel('Feature')
    plt.title('Feature Importances')

    plt.savefig("feature_importances.png")
    mlflow.log_artifact("feature_importances.png")

    # 2.3 Model evaluation
    X_test = assembler.transform(df_test)
    predictions = rfModel.transform(X_test)
    predictions.select("prediction", "label", "features").limit(10).toPandas().to_csv("predictions.csv")
    mlflow.log_artifact("predictions.csv")

    evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="mse")
    mse = evaluator.evaluate(predictions)
    print("Mean Squared Error (MSE) on test data = %g" % mse)

    mlflow.log_metric("mse", mse)

    # 2.4 Model registry

    # get the current run id
    run_id = mlflow.active_run().info.run_id

    # Save the model
    mlflow.spark.log_model(rfModel, "model_delays_flight")
    mlflow.spark.save_model(rfModel, "model_delays_flight")
    
    mlflow.register_model("runs:/" + run_id + "/model_delays_flight", "model_delays_flight")


    # Output of our component
    eval_msg = f"Eval done\n"
    (Path(args.eval_output) / "eval_result.txt").write_text(eval_msg)
   
    # Stop Logging
    mlflow.end_run()

if __name__ == "__main__":
    main()

Overwriting ./components/src/train.py
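
The `SCHEDULED_ARRIVAL` column stores the time as an HHMM integer, and `format_seconds` turns it into seconds since midnight, treating 2400 as midnight. The sketch below reproduces that conversion with `divmod` instead of string slicing; `hhmm_to_seconds` is a hypothetical name used only for this illustration.

```python
def hhmm_to_seconds(scheduled):
    """Convert an HHMM integer (e.g. 1430) to seconds since midnight."""
    scheduled = int(scheduled)
    if scheduled == 2400:  # the script treats 2400 as midnight
        scheduled = 0
    hours, minutes = divmod(scheduled, 100)
    return 3600 * hours + 60 * minutes


for value in (5, 1430, 2359, 2400):
    print(value, "->", hhmm_to_seconds(value))
```

Expressing the arrival time as a single number lets the Random Forest split on time of day directly.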


### 2.2 Creating another component with a command
* Open a terminal

<img src="images/openTerminal.PNG" width="800" height="600" />

* Go into the right folder
```
$ cd flightdelaytuto/tutoAzureML/
```
* Execute the setenv.sh script, which sets up the environment variables
```
$ source setenv.sh
```
* Go into the components folder 
```
$ cd components
```
* You should already be logged in; if there is any problem, go back to the login instructions above. Create the component:
```
$ az ml component create --file flight_delays_training.yml
```
* It should display a JSON document with information about the component you just created

### 2.3 Updating our pipeline with the designer tool

* Open a new window of Azure ML Studio

* Go to Pipelines on the left nav bar

* Select Pipeline Drafts and modify the pipeline that you created earlier (Click on it)

* You should now see the designer tool

* Select Component and drop train_delay_model component into the pipeline model

* Now drag the Data output port onto the public_data_flight_delays input port of the train_delay_model component

* Rename the pipeline to: **Analysis and Train Delay Flights**

* Click on the train_delay_model component: you should see a parameter called num_trees, which sets the number of trees our component uses when it trains the Random Forest model


* Press Submit to launch the pipeline, selecting the same experiment as before

* The analysis component finishes instantly because its results are reused from the previous run

(The run can take 5 to 15 minutes.)

**Note that a .yml file can completely replace the designer tool**

**Important**

Once the job is complete, the finished analysis and train_delay_model components expose a lot of information:

* Outputs + logs: the CSV files generated by the pipeline and the output file
* Metrics: values that have been logged, such as the number of rows and columns for analysis and the MSE for train_delay_model
  * 5,819,079 rows
  * 31 columns
* Images: figures that have been logged during the execution

# 2. Create endpoint and deployment

Online endpoints are used for online (real-time) inferencing.
Our goal here is to deploy our model so that anyone with access can request a prediction from it.
An endpoint is a server that runs on Azure CPU or GPU compute. Once it is deployed, you can request a prediction through a REST API.

## 2.1 Create endpoint

First we create an empty endpoint. This is like an empty shell.

In [1]:
# Defining a managed online endpoint
import datetime
import os
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    Model,
    Environment,
    CodeConfiguration,
)

online_endpoint_name = "EndptDFlt" + datetime.datetime.now().strftime("%Y%m%d%H%M%S")

print(online_endpoint_name)

# create an online endpoint
endpoint = ManagedOnlineEndpoint(
    name=online_endpoint_name,
    description="this is an online endpoint",
    auth_mode="key",
    tags={"owner": os.environ["owner"]},
)

EndpointDelayFlight20221215125942


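KeyError: 'owner'

If this cell raises `KeyError: 'owner'`, the `owner` environment variable is not set in the current kernel. Re-run `%run setenv.py` (assuming, as in the environment registration cell earlier, that it defines `owner`) before continuing.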
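
The endpoint name is built from a short prefix plus a timestamp, which keeps it unique across runs. Below is a minimal sketch of that construction with a validity check. The 3-to-32-character rule encoded in `NAME_PATTERN` is an assumption to verify against the Azure ML documentation, and `make_endpoint_name` is a hypothetical helper, not part of the SDK.

```python
import datetime
import re

# Assumed naming rule for this sketch: 3 to 32 characters, starting with a
# letter, then letters, digits or hyphens. Check the Azure ML documentation
# for the authoritative rule.
NAME_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9-]{2,31}$")


def make_endpoint_name(prefix, now=None):
    """Append a YYYYmmddHHMMSS timestamp to `prefix` and validate the result."""
    now = now or datetime.datetime.now()
    name = prefix + now.strftime("%Y%m%d%H%M%S")
    if not NAME_PATTERN.match(name):
        raise ValueError(f"invalid endpoint name ({len(name)} characters): {name}")
    return name


fixed = datetime.datetime(2022, 12, 15, 12, 59, 42)
print(make_endpoint_name("EndptDFlt", fixed))
```

Since the timestamp alone takes 14 characters, under this assumption the prefix has to stay short.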

Let's create this endpoint in our workspace. The call returns a poller immediately, while the endpoint itself is provisioned in the background.

In [18]:
ml_client.online_endpoints.begin_create_or_update(endpoint)

<azure.core.polling._poller.LROPoller at 0x7f5a76de5310>

Now let's make a new directory for our script that will get the data, predict and return the prediction

In [19]:
import os

score_dir = "./score"
os.makedirs(score_dir, exist_ok=True)

Finally we put our score.py into our directory.
A score.py always has an `init` function, which is called only once, when the deployment starts.
It also has a `run` function, which is called every time a prediction has to be made.
Here, `init` loads the model and `run` reads the input data, uses the model to predict and returns the prediction. This is exactly what we expect from an endpoint deployment.

In [20]:
%%writefile {score_dir}/score.py
import os
import logging
import json
import numpy
import joblib
import mlflow
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.regression import RandomForestRegressor
from pyspark import SparkConf, SparkContext



def init():
    """
    This function is called when the container is initialized/started, typically after create/update of the deployment.
    You can write the logic here to perform init operations like caching the model in memory
    """
    global model
    global spark
    # AZUREML_MODEL_DIR is an environment variable created during deployment.
    # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)
    model_path = os.path.join(
        os.getenv("AZUREML_MODEL_DIR"), "model_delays_flight"
    )
    # Load the MLflow model (a pyfunc wrapper around the Spark ML model)
    model = mlflow.pyfunc.load_model(model_path)
    # Start a local Spark session; run() needs it to build DataFrames
    conf = SparkConf().setAppName("appName").setMaster("local")
    sc = SparkContext.getOrCreate(conf=conf)
    spark = SparkSession(sc)
    logging.info("Init complete")


def run(raw_data):
    """
    This function is called for every invocation of the endpoint to perform the actual scoring/prediction.
    In this example we extract the data from the JSON input, run the Spark model's
    transform() on it and return the result

    :param raw_data: The input data in json format
    :return: The prediction result in json format

    the format of the input data is:

    {
        "data": [
            {
                "DAY_OF_WEEK": 1,
                "SCHEDULED_ARRIVAL_sec": 0
            },
            {
                "DAY_OF_WEEK": 2,
                "SCHEDULED_ARRIVAL_sec": 1
            }
        ]
    }
    """
    logging.info("Request received")
    data = json.loads(raw_data)["data"]
    # Turn the list of records into a Spark DataFrame
    df = spark.createDataFrame(data)
    # Assemble the input columns into the single "features" vector the model expects
    assembler = VectorAssembler(
        inputCols=["DAY_OF_WEEK", "SCHEDULED_ARRIVAL_sec"], outputCol="features"
    )

    X_test = assembler.transform(df)
    print(X_test.limit(10).toPandas())
    # Make the prediction with the underlying Spark model
    rfModel = model._model_impl.spark_model

    result = rfModel.transform(X_test)

    logging.info("Request processed")

    # Return the predictions as a JSON string of records
    return result.select("prediction").toPandas().to_json(orient="records")

Writing ./score/score.py
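
Since run expects a JSON body with a "data" list whose records carry both input columns, it can help to check a payload locally before sending it. The following is only a minimal sketch: `validate_payload` and `REQUIRED_COLUMNS` are hypothetical names that are not part of score.py, and the column names are taken from the docstring above.

```python
import json

# Column names taken from the score.py docstring; adjust if the model changes
REQUIRED_COLUMNS = ("DAY_OF_WEEK", "SCHEDULED_ARRIVAL_sec")


def validate_payload(raw_data):
    """Hypothetical helper: return the list of records or raise ValueError."""
    payload = json.loads(raw_data)
    records = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(records, list) or not records:
        raise ValueError('payload must look like {"data": [{...}, ...]}')
    for i, record in enumerate(records):
        if not isinstance(record, dict):
            raise ValueError(f"record {i} is not a JSON object")
        missing = [c for c in REQUIRED_COLUMNS if c not in record]
        if missing:
            raise ValueError(f"record {i} is missing {missing}")
    return records


sample = json.dumps({"data": [{"DAY_OF_WEEK": 1, "SCHEDULED_ARRIVAL_sec": 0}]})
records = validate_payload(sample)
print(f"{len(records)} valid record(s)")
```

A check like this could also be called at the top of run, so that a malformed request fails with a readable message instead of a Spark error.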


* We create an object that represents our online deployment

In [27]:
from azure.ai.ml.entities import ManagedOnlineDeployment, CodeConfiguration

blue_deployment = ManagedOnlineDeployment(
    name="blue",
    endpoint_name=online_endpoint_name,
    model="model_delays_flight@latest",
    environment="flight-delays-custom-env@latest",
    code_configuration=CodeConfiguration(
        code="score/", scoring_script="score.py"
    ),
    instance_type="Standard_DS2_v2",
    instance_count=1,
)

* And now we deploy to our endpoint (the endpoint has to be fully created before it can receive a deployment)

In [28]:
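import time

# Wait until the endpoint is provisioned, then start the deployment
while True:
    state = ml_client.online_endpoints.get(online_endpoint_name).provisioning_state
    if state == "Succeeded":
        print("Endpoint is ready")
        ml_client.online_deployments.begin_create_or_update(blue_deployment)
        break
    if state == "Failed":
        # Stop instead of looping forever on a failed endpoint
        raise RuntimeError("Endpoint provisioning failed")
    print("Endpoint is not ready, waiting...")
    time.sleep(10)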

Check: endpoint EndpointDelayFlight25 exists
Uploading score (0.0 MBs): 100%|██████████| 2878/2878 [00:00<00:00, 65881.16it/s]

data_collector is not a known attribute of class <class 'azure.ai.ml._restclient.v2022_02_01_preview.models._models_py3.ManagedOnlineDeployment'> and will be ignored
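
The waiting loop above has no upper bound on how long it waits. A bounded variant could look like the sketch below. `wait_for` is a hypothetical helper, not part of the Azure SDK, and the state names `Succeeded` and `Failed` are assumed to be the ones reported by `provisioning_state`. The demo at the end uses fake states so that it runs without a workspace.

```python
import time


def wait_for(get_state, done="Succeeded", failed=("Failed",), timeout=600, interval=10):
    """Hypothetical helper: poll get_state() until `done`, a failure, or a timeout."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state == done:
            return state
        if state in failed:
            raise RuntimeError(f"provisioning ended in state {state!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {state!r} after {timeout} seconds")
        time.sleep(interval)


# Demo with fake states, so no Azure workspace is needed
fake_states = iter(["Creating", "Creating", "Succeeded"])
print(wait_for(lambda: next(fake_states), interval=0))

# In this notebook (assumes ml_client and online_endpoint_name from the cells above):
# wait_for(lambda: ml_client.online_endpoints.get(online_endpoint_name).provisioning_state)
```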


* To see whether it is completed, go to the nav bar on the left and select Endpoints

<img src="images/endpoint.PNG" width="850" height="420" />

* Select the endpoint that you created

* Deploying blue, which is our deployment, should take about 5 minutes

* When it is done, select Test on the top nav bar

<img src="images/testEndpoint.PNG" width="700" height="420" />

* Now paste this data into the test input box and press Test:
```
{
    "data": [
        {
            "DAY_OF_WEEK": 6,
            "SCHEDULED_ARRIVAL_sec": 39000
        },
        {
            "DAY_OF_WEEK": 2,
            "SCHEDULED_ARRIVAL_sec": 38000
        }
    ]
}
```
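
The same request can also be sent from the notebook instead of the Test tab. The sketch below assumes the `ml_client` and `online_endpoint_name` defined earlier and uses the SDK's `online_endpoints.invoke` call, which reads the request body from a file. `score_with_endpoint` is a hypothetical helper name, and the actual call is left commented out because it needs a live deployment.

```python
import json
import os
import tempfile


def score_with_endpoint(ml_client, endpoint_name, payload, deployment_name="blue"):
    """Hypothetical helper: send `payload` to the deployment and return the raw response."""
    # invoke() takes the request body as a file, so write the payload to a temp file
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
        json.dump(payload, f)
        request_file = f.name
    try:
        return ml_client.online_endpoints.invoke(
            endpoint_name=endpoint_name,
            deployment_name=deployment_name,
            request_file=request_file,
        )
    finally:
        # Remove the temp file whether or not the call succeeded
        os.remove(request_file)


payload = {
    "data": [
        {"DAY_OF_WEEK": 6, "SCHEDULED_ARRIVAL_sec": 39000},
        {"DAY_OF_WEEK": 2, "SCHEDULED_ARRIVAL_sec": 38000},
    ]
}

# In this notebook (requires the deployed endpoint):
# print(score_with_endpoint(ml_client, online_endpoint_name, payload))
```

Given the return statement in score.py, the response should be a JSON string with one "prediction" record per input row.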

## 3. Congratulations

You've used the Data Science Platform for an exploratory use case in which you analysed a data set and built a machine learning model to predict flight delays at arrival.

**You are now**

  ✔️ AzureML Grand Master

**IMPORTANT =>**

⚠️ Don't forget to shut down or delete the resources that you created: the compute instance and compute cluster (under Compute in the left nav bar) and the endpoint
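
The cleanup can also be started from code. This is a sketch only: `delete_resources` is a hypothetical helper, the compute names in the usage comment are placeholders to replace with the names you chose, and the calls are assumed to go through the `ml_client` created at the start of the notebook.

```python
def delete_resources(ml_client, endpoint_name, compute_names):
    """Hypothetical helper: start deleting the endpoint and the listed computes."""
    # Deleting the endpoint also removes the deployments attached to it
    pollers = [ml_client.online_endpoints.begin_delete(name=endpoint_name)]
    for name in compute_names:
        pollers.append(ml_client.compute.begin_delete(name=name))
    return pollers


# In this notebook (placeholder compute names, replace with your own):
# pollers = delete_resources(
#     ml_client, online_endpoint_name, ["<compute-instance>", "<compute-cluster>"]
# )
# for poller in pollers:
#     poller.result()  # block until each deletion has finished
```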