# Exercise 5: Machine Learning with Spark

This Notebook will introduce you to the Machine Learning tools available in Spark and will show how you can use them to train and evaluate a model based on the data that you are examining.

#### Step 1 - Accessing the data

For this lab, you will once again be working with the Food Inspection data collected by the City of Chicago.  Thus the steps that you will follow to access the data are effectively the same as they were in the previous exercise.

Run the code in the following cells.  This code will prepare the `inspectionDataframe` with the Food Inspection data that you will be using during this Machine Learning exercise.

*(Note - this very first cell run might take a little extra time, as environment resources are being provisioned.)*

In [None]:
# Obtain a reference to the text file
filename = 'wasbs://sparklab@a4rsparkresources.blob.core.windows.net/FoodInspectionsNoHeader.csv'
inspectionsFile = sc.textFile(filename)

# Process each line of the CSV file by mapping them through a parse function.
def csvParse(s):
    import csv
    from StringIO import StringIO
    sio = StringIO(s)
    value = csv.reader(sio).next()
    sio.close()
    return value
inspectionDataset = inspectionsFile.map(csvParse)
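
The `csvParse` function above relies on Python 2 idioms (the `StringIO` module and the `.next()` method). If your kernel happens to run Python 3, an equivalent parser could be sketched as follows. The name `csv_parse_py3` and the sample line are hypothetical and are not part of the lab data.

```python
import csv
from io import StringIO

def csv_parse_py3(line):
    """Parse one CSV-formatted line into a list of string fields."""
    # csv.reader correctly handles quoted fields that contain commas
    return next(csv.reader(StringIO(line)))

# Made-up sample line, used only to illustrate the parsing behavior
sample = '12345,"ACME DELI, INC.",Restaurant,Pass'
fields = csv_parse_py3(sample)
print(fields)
```

Note that the quoted business name stays in one field even though it contains a comma, which is why the lab uses the `csv` module instead of a plain `split(',')`.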

In [None]:
from pyspark.sql.types import *

# Configure the schema that describes the data being imported 
schema=StructType([
        StructField('Inspection ID', LongType(), False),
        StructField('DBA Name', StringType(), False),
        StructField('AKA Name', StringType(), True),
        StructField('License#', LongType(), False),
        StructField('FacilityType', StringType(), True),
        StructField('Risk', StringType(), True),
        StructField('Address', StringType(), True),
        StructField('City', StringType(), True),
        StructField('State', StringType(), True),
        StructField('Zip', StringType(), True),
        StructField('InspectionDate', StringType(), False),
        StructField('InspectionType', StringType(), False),
        StructField('Results', StringType(), False),
        StructField('Latitude', DoubleType(), True),
        StructField('Longitude', DoubleType(), True)])

# Create a dataframe by bringing the data and the schema together
inspectionDataframe = sqlContext.createDataFrame(inspectionDataset, schema)

#### Step 2 - Prepare the data

In general, the first step in a Machine Learning exercise (once the data has been acquired) is to pre-process the data in order to align the data content with the machine learning tools being used.  This sometimes includes replacing or removing extraneous or outlying data, identifying label and feature values, and other preparatory steps.

The following several cells perform the data preparation tasks necessary for this lab.  These steps include:
- Reducing the inspection results to a pass/fail/other category, then dropping rows from the "other" category in order to get to a set of binary pass/fail records.
- Mapping text-based feature values to numerical values required by the Spark ML Machine Learning algorithms.
- Grouping the features to be used by Spark ML to develop a model.

Run the code in the following three cells to perform these data preparation tasks.

In [None]:
from pyspark.ml import Pipeline
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction

# Function to map Pass/Fail to 1.0 or 0.0 (and -1 for other values)
def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
label = UserDefinedFunction(labelForResults, DoubleType())

# Create a new dataframe that uses the computed "label" instead of the "results", and excludes "other values"
labeledData = inspectionDataframe.select(
    label(inspectionDataframe['Results']).alias('label'), 
    inspectionDataframe['FacilityType'], 
    inspectionDataframe['InspectionType'], 
    inspectionDataframe['Zip']).where('label >= 0')

print('After filtering, there are ' + str(labeledData.count()) + ' records.')
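
To see how the labeling rule behaves before running it on the full dataset, you can try the same function on a few values in plain Python (no Spark required). This is only an illustrative sketch: the result strings `'Out of Business'` and `'No Entry'` are assumed examples of "other" values, not a confirmed list of what the dataset contains.

```python
def labelForResults(s):
    """Map an inspection result to 1.0 (pass), 0.0 (fail), or -1.0 (other)."""
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0

# Hypothetical sample of result strings
sample_results = ['Pass', 'Fail', 'Pass w/ Conditions', 'Out of Business', 'No Entry']

labels = [labelForResults(r) for r in sample_results]
# Keep only the binary pass/fail labels, mirroring the where('label >= 0') filter
kept = [value for value in labels if value >= 0]

print(labels)  # [1.0, 0.0, 1.0, -1.0, -1.0]
print(kept)    # [1.0, 0.0, 1.0]
```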

In [None]:
# Use indexers to convert from string values to a numeric index value
from pyspark.ml.feature import StringIndexer

facilityIndexer = StringIndexer(inputCol="FacilityType", outputCol="FacilityTypeIndex")
inspectionIndexer = StringIndexer(inputCol="InspectionType", outputCol="InspectionTypeIndex")
zipIndexer = StringIndexer(inputCol="Zip", outputCol="ZipIndex")

# Run the indexers to create a new dataframe
pipeline = Pipeline(stages=[facilityIndexer, inspectionIndexer, zipIndexer])
indexedData = pipeline.fit(labeledData).transform(labeledData)
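
By default, `StringIndexer` assigns indices in order of descending frequency, so the most common string receives index 0.0. The plain-Python sketch below imitates that idea on a made-up column. It is not Spark's implementation: `fit_string_index` is a hypothetical helper, and breaking ties alphabetically is an assumption made here so that the result is deterministic.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to a float index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending frequency; break ties alphabetically (an assumption of this sketch)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Made-up facility types, used only for illustration
facility_types = ['Restaurant', 'Grocery Store', 'Restaurant',
                  'School', 'Restaurant', 'Grocery Store']

mapping = fit_string_index(facility_types)
indexed = [mapping[v] for v in facility_types]

print(indexed)  # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```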

In [None]:
# Convert from several discrete feature columns to a single vector feature column
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["FacilityTypeIndex", "InspectionTypeIndex", "ZipIndex"], outputCol="features")
preparedData = assembler.transform(indexedData)
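
Conceptually, the assembler copies the values of the input columns, in the order listed, into one vector per row. A rough plain-Python picture of that step is shown below. The `assemble` helper and the row values are hypothetical, and a Python list stands in for Spark's vector type.

```python
def assemble(row, input_cols):
    """Collect the named columns of a row into a single feature list, preserving order."""
    return [float(row[col]) for col in input_cols]

# Made-up row resembling one record of indexedData
row = {'label': 1.0, 'FacilityTypeIndex': 0.0, 'InspectionTypeIndex': 2.0, 'ZipIndex': 17.0}

features = assemble(row, ['FacilityTypeIndex', 'InspectionTypeIndex', 'ZipIndex'])
print(features)  # [0.0, 2.0, 17.0]
```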

#### Step 3 - Train and Score the Model

Once the data has been prepared, you can then build, train, and score your model.  To do this, you generally need to identify two data sets.  The first data set will be used for training the model.  The second data set is usually independent of the first one and is used to score the model: the trained model is applied to each record in the scoring data set, and its predictions are compared with the known outcomes to see how effective it is.

Run the code in the following three cells.  The first cell splits the data that you have prepared 80%/20% into two other collections, one for training and one for scoring.  The second cell creates an instance of the *Naive Bayes* classifier Machine Learning algorithm and trains it with your training data.  Finally, the third cell applies your trained model to the scoring data and prints out the relative success percentage.

In [None]:
# Split the sample data into 80% training set, 20% scoring/evaluation set
splits = preparedData.randomSplit([0.8, 0.2], 24)
trainingData = splits[0]
scoringData = splits[1]
print("Training: " + str(trainingData.count()) + ' records.')
print("Scoring: " + str(scoringData.count()) + ' records.')
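
The two counts printed above will usually be close to, but not exactly, 80% and 20% of the total, because a weighted random split assigns each row independently. The sketch below shows the idea in plain Python. It is not Spark's algorithm, `random_split` is a hypothetical helper, and a given seed will not reproduce the same split that Spark produces.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split at random, in proportion to the weights."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds of each split on the interval [0, 1)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding gap
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

training, scoring = random_split(list(range(10000)), [0.8, 0.2], 24)
print("Training: " + str(len(training)) + " records.")
print("Scoring: " + str(len(scoring)) + " records.")
```

Passing a fixed seed (24 in the lab) makes the split repeatable from run to run, which helps when you want to compare models on the same data.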

In [None]:
# Train the ML model
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes()
nbModel = nb.fit(trainingData)
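
For intuition about what the training step computes, here is a small plain-Python sketch of a multinomial Naive Bayes classifier with additive smoothing. It assumes that variant and a smoothing value of 1.0, which to my understanding correspond to the defaults of `NaiveBayes()`. The helper names and the toy data are hypothetical, and this is not Spark's implementation.

```python
import math

def train_multinomial_nb(rows, smoothing=1.0):
    """Train on rows of (label, features), where features are non-negative numbers."""
    labels = sorted(set(label for label, _ in rows))
    num_features = len(rows[0][1])
    model = {}
    for c in labels:
        class_rows = [features for label, features in rows if label == c]
        # Sum of each feature over all rows of this class
        totals = [sum(f[j] for f in class_rows) for j in range(num_features)]
        grand_total = sum(totals)
        # Smoothed log prior of the class
        log_prior = math.log((len(class_rows) + smoothing) /
                             (len(rows) + smoothing * len(labels)))
        # Smoothed log likelihood of each feature given the class
        log_theta = [math.log((t + smoothing) / (grand_total + smoothing * num_features))
                     for t in totals]
        model[c] = (log_prior, log_theta)
    return model

def predict(model, features):
    """Return the label whose log posterior score is highest."""
    def score(c):
        log_prior, log_theta = model[c]
        return log_prior + sum(x * t for x, t in zip(features, log_theta))
    return max(model, key=score)

# Toy data: label 1.0 is dominated by feature 0, label 0.0 by feature 1
toy_rows = [(1.0, [3, 0]), (1.0, [2, 1]), (0.0, [0, 3]), (0.0, [1, 2])]
toy_model = train_multinomial_nb(toy_rows)

print(predict(toy_model, [4, 0]))  # 1.0
print(predict(toy_model, [0, 4]))  # 0.0
```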

In [None]:
# Score/evaluate the model
predictions = nbModel.transform(scoringData)
predictions.registerTempTable('predictions')

# Display the success rate
numSuccesses = predictions.where("""(prediction = 0 AND label = 0) OR 
                                      (prediction = 1 AND label = 1)""").count()
numInspections = predictions.count()

print("There were {0} inspections and there were {1} successful predictions".format(numInspections, numSuccesses))
print("This is a {0}% success rate".format((float(numSuccesses) / float(numInspections)) * 100))

#### Step 4 - Graph the model's accuracy

The accuracy percentage displayed at the end of the previous step is interesting, but it can be more useful to visualize where the algorithm succeeded and where it failed.

Run the code in the following five cells. The first four use the `%%sql` Magic Instruction to count the true positives, false positives, true negatives, and false negatives produced when scoring your Machine Learning model.  The code in the final cell will then plot these values in a pie chart so you can visualize the strengths and weaknesses of your trained model.

In [None]:
%%sql -q -o true_positive
SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND label = 1

In [None]:
%%sql -q -o false_positive
SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND label = 0

In [None]:
%%sql -q -o true_negative
SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND label = 0

In [None]:
%%sql -q -o false_negative
SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND label = 1

In [None]:
%%local
%matplotlib inline
import matplotlib.pyplot as plt

labels = ['True positive', 'False positive', 'True negative', 'False negative']
# The order of the sizes must match the order of the labels above
sizes = [true_positive['cnt'], false_positive['cnt'], true_negative['cnt'], false_negative['cnt']]
plt.pie(sizes, labels=labels, autopct='%1.1f%%')
plt.axis('equal')

At this point, you could work on interpreting the chart above and use the insight it provides to refine your data collection, preparation, and training work until you have a Machine Learning model whose accuracy you are satisfied with.
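
One way to start interpreting the four counts is to turn them into summary metrics. The sketch below computes accuracy, precision, and recall in plain Python. The counts are made-up placeholders, not results from this lab, and `summarize` is a hypothetical helper.

```python
def summarize(tp, fp, tn, fn):
    """Compute accuracy, precision, and recall from confusion-matrix counts."""
    total = tp + fp + tn + fn
    accuracy = float(tp + tn) / total if total else 0.0
    # Precision: of the inspections predicted to pass, how many actually passed
    precision = float(tp) / (tp + fp) if (tp + fp) else 0.0
    # Recall: of the inspections that actually passed, how many were predicted to pass
    recall = float(tp) / (tp + fn) if (tp + fn) else 0.0
    return accuracy, precision, recall

# Placeholder counts for illustration only
accuracy, precision, recall = summarize(tp=80, fp=10, tn=5, fn=5)
print("Accuracy: {0:.3f}".format(accuracy))
print("Precision: {0:.3f}".format(precision))
print("Recall: {0:.3f}".format(recall))
```

The accuracy computed this way is the same quantity as the success rate printed in Step 3, since both divide the number of correct predictions by the total number of scored inspections.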