In [None]:
MLlib is Apache Spark's machine learning library

Various tools provided by MLlib include:
    
ML Algorithms: collaborative filtering, classification and clustering
    
Featurization: feature extraction, transformation, dimensionality reduction, and selection
    
Pipelines: tools for constructing, evaluating, and tuning ML Pipelines

In [None]:
Scikit-learn is a popular Python library for data mining and machine learning

Scikit-learn algorithms run on a single machine, so they are best suited to datasets that fit in that machine's memory

Spark's MLlib algorithms are designed for parallel processing on a cluster

Besides Python, MLlib can be used from Scala, Java, and R

MLlib provides a high-level API for building machine learning pipelines

In [None]:
The three C's of machine learning in PySpark MLlib

Collaborative Filtering: Produce recommendations, useful in recommendation engines

Classification: Identifying which of a set of categories a new observation belongs to

Clustering: Grouping data based on similar characteristics

In [5]:
# Recommendation: pyspark.mllib.recommendation
from pyspark.mllib.recommendation import ALS

# Classification: pyspark.mllib.classification
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Clustering: pyspark.mllib.clustering
from pyspark.mllib.clustering import KMeans

# Collaborative Filtering

Collaborative filtering finds users that share common interests

It can be user-based or item-based

In [None]:
The Rating class in the pyspark.mllib.recommendation submodule

The Rating class is a wrapper around a (user, product, rating) tuple

It is useful for parsing an RDD into tuples of user ID, product ID and rating
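A plain-Python sketch of what the Rating wrapper gives you, assuming only that Rating behaves like a namedtuple with the fields user, product and rating (PySpark defines it as a namedtuple subclass). The parse_rating helper and the comma-separated "user,product,rating" line format are hypothetical, for illustration only.

```python
from collections import namedtuple

# Plain-Python stand-in for pyspark.mllib.recommendation.Rating
Rating = namedtuple("Rating", ["user", "product", "rating"])

def parse_rating(line):
    """Parse a hypothetical 'user,product,rating' text line into a Rating."""
    user, product, rating = line.split(",")
    return Rating(int(user), int(product), float(rating))

r = parse_rating("1,2,5.0")
print(r.user, r.product, r.rating)  # fields by name
print(r[0], r[1], r[2])             # or by position, as in the cell below
```

In Spark the same parser could be applied to every line of a text RDD with map(), e.g. `sc.textFile(path).map(parse_rating)`, where `path` is a placeholder.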

In [None]:
from pyspark.mllib.recommendation import Rating
r = Rating(user = 1, product = 2, rating = 5.0)
(r[0], r[1], r[2])

In [None]:
Splitting the data into training and test sets using randomSplit()

In [None]:
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# Randomly assign roughly 60% of the elements to training and 40% to test
training, test = data.randomSplit([0.6, 0.4])
print(training.collect())
print(test.collect())
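randomSplit() does not cut the data at fixed positions: conceptually, each element is assigned to one of the splits at random, with probability proportional to the weights, so split sizes vary unless a seed is fixed. The sketch below models that behaviour in plain Python; random_split is a hypothetical helper and a simplification, not Spark's implementation.

```python
import random

def random_split(data, weights, seed=None):
    """Assign each element to one split at random, with probability proportional
    to its weight (a simplified model of randomSplit, not Spark's code)."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # e.g. [0.6, 0.4] -> [0.6, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for x in data:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(x)
                break
        else:
            splits[-1].append(x)  # guards against round-off in the last bound
    return splits

train_part, test_part = random_split(range(1, 11), [0.6, 0.4], seed=42)
print(train_part, test_part)
```

With only 10 elements the split sizes can differ noticeably from 6 and 4; passing a seed (as done later with seed=17) makes the split repeatable.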

# Alternating Least Squares (ALS)

The Alternating Least Squares (ALS) algorithm in spark.mllib provides collaborative filtering

ALS.train(ratings, rank, iterations)

rank: the number of latent features (factors) used to represent each user and each product

iterations: the number of alternating least-squares iterations to run
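The idea behind ALS can be shown with a small plain-Python sketch. Each rating is approximated by the product of a user factor and a product factor; ALS alternates between fixing the product factors and solving a regularised least-squares problem for each user, and vice versa. For readability the sketch uses rank 1 (one latent factor), so each least-squares solve reduces to one division. The regularisation weight lam and the starting values of 1.0 are assumptions; Spark's implementation supports any rank and differs in details such as how the regularisation is weighted.

```python
def als_rank1(ratings, iterations=10, lam=0.01):
    """Rank-1 alternating least squares on (user, product, rating) triples.

    Returns the user factors, the product factors and the objective value
    (squared error plus L2 penalty) before and after every iteration.
    """
    users = sorted({u for u, _, _ in ratings})
    products = sorted({p for _, p, _ in ratings})
    user_f = {u: 1.0 for u in users}
    prod_f = {p: 1.0 for p in products}

    def objective():
        err = sum((r - user_f[u] * prod_f[p]) ** 2 for u, p, r in ratings)
        penalty = lam * (sum(x * x for x in user_f.values()) +
                         sum(x * x for x in prod_f.values()))
        return err + penalty

    history = [objective()]
    for _ in range(iterations):
        # Step 1: hold product factors fixed, solve each user's least-squares problem
        for u in users:
            mine = [(p, r) for uu, p, r in ratings if uu == u]
            user_f[u] = (sum(r * prod_f[p] for p, r in mine) /
                         (sum(prod_f[p] ** 2 for p, _ in mine) + lam))
        # Step 2: hold user factors fixed, solve each product's least-squares problem
        for p in products:
            mine = [(u, r) for u, pp, r in ratings if pp == p]
            prod_f[p] = (sum(r * user_f[u] for u, r in mine) /
                         (sum(user_f[u] ** 2 for u, _ in mine) + lam))
        history.append(objective())
    return user_f, prod_f, history


# The same three ratings as r1, r2 and r3 in the next cells
toy_ratings = [(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)]
user_f, prod_f, history = als_rank1(toy_ratings, iterations=20)
print(history[0], history[-1])
print(user_f[1] * prod_f[2])  # reconstructed rating for user 1, product 2
```

Because each half-step solves its sub-problem exactly, the objective in `history` never increases from one iteration to the next.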

In [None]:
r1 = Rating(1, 1, 1.0)
r2 = Rating(1, 2, 2.0)
r3 = Rating(2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
ratings.collect()

In [None]:
model = ALS.train(ratings, rank=10, iterations=10)

In [None]:
predictAll(): returns an RDD of Rating objects

The predictAll() method returns predicted ratings for the input (user, product) pairs

The method takes an RDD of (user, product) pairs without ratings and generates a rating for each pair
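A matrix factorisation model predicts a rating as the dot product of the user's latent factor vector and the product's latent factor vector. The sketch below shows that computation for a list of (user, product) pairs; predict_all and the rank-2 factor values are hypothetical, chosen only for illustration.

```python
def predict_all(user_factors, product_factors, pairs):
    """Predicted rating = dot product of the user's and the product's factor vectors."""
    return [
        (u, p, sum(a * b for a, b in zip(user_factors[u], product_factors[p])))
        for u, p in pairs
    ]

# Hypothetical rank-2 factors, for illustration only
user_factors = {1: [1.0, 0.5]}
product_factors = {1: [0.8, 0.4], 2: [1.6, 0.2]}

preds = predict_all(user_factors, product_factors, [(1, 2), (1, 1)])
print(preds)
```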

In [None]:
unrated_RDD = sc.parallelize([(1, 2), (1, 1)])

In [None]:
predictions = model.predictAll(unrated_RDD)
predictions.collect()

# Transformations

In [6]:
import pandas as pd

In [7]:
flights = pd.read_csv('flights.csv')

In [8]:
print(flights.head(n=5))

   mon  dom  dow carrier  flight  org  mile  depart  duration  delay
0   11   20    6      US      19  JFK  2153    9.48       351    NaN
1    0   22    2      UA    1107  ORD   316   16.33        82   30.0
2    2   20    4      UA     226  SFO   337    6.17        82   -8.0
3    9   13    1      AA     419  ORD  1236   10.33       195   -5.0
4    4    2    5      AA     325  ORD   258    8.92        65    NaN


In [9]:
# Remove the 'flight' column
flights = flights.drop(columns='flight')

In [10]:
print(flights.head(n=5))

   mon  dom  dow carrier  org  mile  depart  duration  delay
0   11   20    6      US  JFK  2153    9.48       351    NaN
1    0   22    2      UA  ORD   316   16.33        82   30.0
2    2   20    4      UA  SFO   337    6.17        82   -8.0
3    9   13    1      AA  ORD  1236   10.33       195   -5.0
4    4    2    5      AA  ORD   258    8.92        65    NaN


In [11]:
# Drop rows with missing values (e.g. a NaN delay)
flights = flights.dropna()

In [12]:
print(flights.head(n=5))

   mon  dom  dow carrier  org  mile  depart  duration  delay
1    0   22    2      UA  ORD   316   16.33        82   30.0
2    2   20    4      UA  SFO   337    6.17        82   -8.0
3    9   13    1      AA  ORD  1236   10.33       195   -5.0
5    5    2    1      UA  SFO   550    7.98       102    2.0
6    7    2    6      AA  ORD   733   10.83       135   54.0


In [None]:
# Column manipulation

In [13]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session and convert the pandas DataFrame into a Spark DataFrame
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
flights = spark.createDataFrame(flights)

In [14]:
# Import Spark's round function (this shadows Python's built-in round in this notebook)
from pyspark.sql.functions import round

In [15]:
# Derive a new km column from the mile column, rounding to zero decimal places. One mile is 1.60934 km.

# Remove the mile column.

flights_km = flights.withColumn('km', round(flights.mile * 1.60934, 0)) \
                    .drop('mile')

In [16]:
# Create a label column with a value of 1 indicating the delay was 15 minutes or more and 0 otherwise.

flights = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))
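The two derived columns can be checked by hand. The sketch below recomputes km and label in plain Python for the five rows printed after dropna(); `math.floor(x + 0.5)` stands in for Spark's round, which rounds halves up (Python's built-in round rounds halves to even, and is shadowed by the import above anyway).

```python
import math

MILE_TO_KM = 1.60934

def round_half_up(x):
    """Round a non-negative number to the nearest whole number, halves going up."""
    return float(math.floor(x + 0.5))

# (mile, delay) pairs from the five rows printed after dropna() above
mile_delay = [(316, 30.0), (337, -8.0), (1236, -5.0), (550, 2.0), (733, 54.0)]

# km rounded to zero decimals; label = 1 when the delay is 15 minutes or more
derived = [(round_half_up(mile * MILE_TO_KM), int(delay >= 15)) for mile, delay in mile_delay]
for km, label in derived:
    print(km, label)
```

The results match the km and label columns in the next output.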

In [17]:
# Check first five records
flights.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
|  0| 22|  2|     UA|ORD| 16.33|      82| 30.0| 509.0|    1|
|  2| 20|  4|     UA|SFO|  6.17|      82| -8.0| 542.0|    0|
|  9| 13|  1|     AA|ORD| 10.33|     195| -5.0|1989.0|    0|
|  5|  2|  1|     UA|SFO|  7.98|     102|  2.0| 885.0|    0|
|  7|  2|  6|     AA|ORD| 10.83|     135| 54.0|1180.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [None]:
In the flights data there are two columns, carrier and org, which hold categorical data. 

You need to transform those columns into indexed numerical values.

In [18]:
from pyspark.ml.feature import StringIndexer

# Create an indexer object to transform the carrier column from a string to a numeric index.
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Fit the indexer to the flights data: it identifies the categories and assigns each one an index (by default the most frequent category gets 0).
indexer_model = indexer.fit(flights)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights)

# Repeat the process for the other categorical feature
flights = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)

In [19]:
flights.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|  0| 22|  2|     UA|ORD| 16.33|      82| 30.0| 509.0|    1|        0.0|    0.0|
|  2| 20|  4|     UA|SFO|  6.17|      82| -8.0| 542.0|    0|        0.0|    1.0|
|  9| 13|  1|     AA|ORD| 10.33|     195| -5.0|1989.0|    0|        1.0|    0.0|
|  5|  2|  1|     UA|SFO|  7.98|     102|  2.0| 885.0|    0|        0.0|    1.0|
|  7|  2|  6|     AA|ORD| 10.83|     135| 54.0|1180.0|    1|        1.0|    0.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows
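By default, StringIndexer orders categories by frequency, so the most frequent string gets index 0.0. The plain-Python sketch below (fit_string_indexer is a hypothetical helper) applies that rule to the five rows shown above and reproduces their carrier_idx and org_idx values. The real indexer was fitted on the whole dataset, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct string to a float index, most frequent first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}

# carrier and org columns of the five rows shown above
carriers = ['UA', 'UA', 'AA', 'UA', 'AA']
orgs = ['ORD', 'SFO', 'ORD', 'SFO', 'ORD']

carrier_index = fit_string_indexer(carriers)
org_index = fit_string_indexer(orgs)
print([carrier_index[c] for c in carriers])
print([org_index[o] for o in orgs])
```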



# Assembling columns

The final stage of data preparation is to consolidate all of the predictor columns into a single column.

An updated version of the flights data, which takes into account all of the changes from the previous few exercises, has the following predictor columns:

mon, dom and dow
carrier_idx (indexed value from carrier)
org_idx (indexed value from org)
km
depart
duration

In [20]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow','depart', 'duration','km','carrier_idx', 'org_idx'
], outputCol='features')

# Consolidate predictor columns
flights = assembler.transform(flights)

# Check the resulting column
flights.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[0.0,22.0,2.0,16.33,82.0,509.0,0.0,0.0]  |30.0 |
|[2.0,20.0,4.0,6.17,82.0,542.0,0.0,1.0]   |-8.0 |
|[9.0,13.0,1.0,10.33,195.0,1989.0,1.0,0.0]|-5.0 |
|[5.0,2.0,1.0,7.98,102.0,885.0,0.0,1.0]   |2.0  |
|[7.0,2.0,6.0,10.83,135.0,1180.0,1.0,0.0] |54.0 |
+-----------------------------------------+-----+
only showing top 5 rows
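VectorAssembler concatenates the input columns, in the order given by inputCols, into a single vector. The plain-Python sketch below (assemble is a hypothetical helper) reproduces the first features vector shown above. Note that delay and label are left out of inputCols, so the classification target is not among the predictors. Spark itself may store the assembled vector in dense or sparse form.

```python
INPUT_COLS = ['mon', 'dom', 'dow', 'depart', 'duration', 'km', 'carrier_idx', 'org_idx']

def assemble(row, input_cols=INPUT_COLS):
    """Concatenate the input columns, in order, into one list of floats."""
    return [float(row[c]) for c in input_cols]

# First row of the flights data shown above (delay is present but not assembled)
first_row = {'mon': 0, 'dom': 22, 'dow': 2, 'depart': 16.33, 'duration': 82,
             'km': 509.0, 'carrier_idx': 0.0, 'org_idx': 0.0, 'delay': 30.0}
print(assemble(first_row))
```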



# Decision Tree

### Train/test split

In [21]:
# Split into training and testing sets in an 80:20 ratio

# For repeatability set a random number seed of 17 for the split.

flights_train, flights_test = flights.randomSplit([0.8, 0.2], seed=17)

In [22]:
flights.count()

47022

In [23]:
flights_train.count()

37592

In [24]:
[flights_train.count(),flights_test.count()]

[37592, 9430]

In [25]:
flights_train.show()

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|  0|  1|  2|     AA|LGA| 20.42|     185| 31.0|1765.0|    1|        1.0|    3.0|[0.0,1.0,2.0,20.4...|
|  0|  1|  2|     AA|ORD|  8.75|     120|  8.0|1180.0|    0|        1.0|    0.0|[0.0,1.0,2.0,8.75...|
|  0|  1|  2|     AA|ORD| 11.08|     105| 18.0| 985.0|    1|        1.0|    0.0|[0.0,1.0,2.0,11.0...|
|  0|  1|  2|     AA|ORD| 12.67|     160| 68.0|1429.0|    1|        1.0|    0.0|[0.0,1.0,2.0,12.6...|
|  0|  1|  2|     AA|ORD|  15.0|      70| 62.0| 415.0|    1|        1.0|    0.0|[0.0,1.0,2.0,15.0...|
|  0|  1|  2|     AA|ORD| 16.75|     260|137.0|2778.0|    1|        1.0|    0.0|[0.0,1.0,2.0,16.7...|
|  0|  1|  2|     AA|ORD| 17.75|     180|237.0|1926.0|    1|        1.0|    0.0|[0

### Build a Decision Tree

In [26]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

In [27]:
# Create a decision tree classifier with default parameters (it is fitted in the next cell)
tree = DecisionTreeClassifier()

In [28]:
tree_model = tree.fit(flights_train)

In [29]:
# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1    |1.0       |[0.37313780260707635,0.6268621973929237]|
|1    |0.0       |[0.5275247524752475,0.47247524752475245]|
|1    |1.0       |[0.43974159055468925,0.5602584094453108]|
|1    |1.0       |[0.43974159055468925,0.5602584094453108]|
|1    |1.0       |[0.43974159055468925,0.5602584094453108]|
+-----+----------+----------------------------------------+
only showing top 5 rows
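When no custom thresholds are set, the prediction column holds the class with the highest probability (for a decision tree, the probabilities are essentially the class proportions in the leaf an observation lands in). The sketch below reproduces the first three predictions from the probability column shown above; predict_from_probability is a hypothetical helper.

```python
def predict_from_probability(probability):
    """Index of the most probable class, as a float like Spark's prediction column."""
    return float(max(range(len(probability)), key=lambda i: probability[i]))

# First three probability vectors from the output above
probabilities = [
    [0.37313780260707635, 0.6268621973929237],
    [0.5275247524752475, 0.47247524752475245],
    [0.43974159055468925, 0.5602584094453108],
]
predicted = [predict_from_probability(p) for p in probabilities]
print(predicted)
```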



### Evaluate the Decision Tree

In [None]:
A confusion matrix gives a useful breakdown of predictions versus known values. 
It has four cells which represent the counts of:

True Negatives (TN) — model predicts negative outcome & known outcome is negative
True Positives (TP) — model predicts positive outcome & known outcome is positive
False Negatives (FN) — model predicts negative outcome but known outcome is positive
False Positives (FP) — model predicts positive outcome but known outcome is negative.

The accuracy is the ratio of correct predictions (TP and TN) to all predictions (TP, TN, FP and FN).
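The confusion-matrix counts and the accuracy can also be computed in plain Python. The sketch below rebuilds (label, prediction) pairs from the counts printed by the next cell and derives TN, TP, FN, FP and the accuracy from them; confusion_counts and accuracy_from_counts are hypothetical helpers.

```python
from collections import Counter

def confusion_counts(pairs):
    """Count TN, TP, FN and FP from (label, prediction) pairs with classes 0 and 1."""
    c = Counter((int(label), int(pred)) for label, pred in pairs)
    return {"TN": c[(0, 0)], "TP": c[(1, 1)], "FN": c[(1, 0)], "FP": c[(0, 1)]}

def accuracy_from_counts(m):
    """Correct predictions (TP + TN) divided by all predictions."""
    correct = m["TN"] + m["TP"]
    total = correct + m["FN"] + m["FP"]
    return correct / float(total)  # float() keeps this a true division on Python 2 as well

# Rebuild (label, prediction) pairs from the counts printed by the next cell
pairs = ([(1, 0.0)] * 1664 + [(0, 0.0)] * 2596 +
         [(1, 1.0)] * 3130 + [(0, 1.0)] * 2040)
m = confusion_counts(pairs)
print(m, accuracy_from_counts(m))
```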

In [36]:
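# Create a confusion matrix by counting the combinations of label and prediction.
# Note: this cell ran (In [36]) after the logistic regression cell (In [32]) reassigned
# `prediction`, so the counts recorded below match that model's confusion matrix.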
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

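# Accuracy measures the proportion of correct predictions.
# float() forces true division on Python 2; without it, integer division gave the 0 recorded below.
accuracy = (TN + TP) / float(TN + TP + FN + FP)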
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1664|
|    0|       0.0| 2596|
|    1|       1.0| 3130|
|    0|       1.0| 2040|
+-----+----------+-----+

0


In [38]:
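# Python 2 only: make / perform true division in the cells that follow
from __future__ import division

In [167]:
# NOTE: these hard-coded counts do not match the confusion matrix printed above
# (they sum to 9495, while the test set has 9430 rows)
(2279+3745)/(1079+2279+3745+2392)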

0.6344391785150079

# Ensembles

An ensemble model combines the results from multiple models and can produce better predictions than any one of those models acting alone.

The concept is based on the idea of the "Wisdom of the Crowd": the aggregated opinion of a group is often better than the opinion of any individual in that group, even if the individuals are experts.

Ideally, the models in the ensemble should be diverse, so that their errors differ and can cancel out.
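The simplest way to combine classifiers is a majority vote. In the sketch below, three hypothetical models are each wrong on a different row, so each is only 2/3 accurate, yet the vote is correct on every row; this is why diversity among the models matters. Majority vote is shown only as an illustration and is not necessarily the exact rule Spark's ensemble models use.

```python
from collections import Counter

def majority_vote(votes):
    """Return the most common class among one row's votes."""
    return Counter(votes).most_common(1)[0][0]

truth   = [1, 0, 1]
model_a = [1, 0, 0]  # wrong on the third row
model_b = [1, 1, 1]  # wrong on the second row
model_c = [0, 0, 1]  # wrong on the first row

ensemble = [majority_vote(votes) for votes in zip(model_a, model_b, model_c)]
print(ensemble)
```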

In [None]:
### Random Forest

- each tree is trained on a different random subset of the data and 

- within each tree a random subset of features is used for splitting at each node. 

The result is a collection of trees where no two trees are the same. 

Because the trees are built independently of one another, they can be trained in parallel.
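The two sources of randomness listed above can be sketched in plain Python: a bootstrap sample of rows (drawn with replacement) for each tree, and a random subset of features considered for a split. forest_plans, the 100-row dataset size and the subset size of 3 are hypothetical; in Spark these choices are governed by RandomForestClassifier parameters such as numTrees, subsamplingRate and featureSubsetStrategy.

```python
import random

FEATURES = ['mon', 'dom', 'dow', 'depart', 'duration', 'km', 'carrier_idx', 'org_idx']

def forest_plans(n_rows, features, n_trees, n_split_features, seed=17):
    """For each tree, draw a bootstrap sample of row indices and one random
    feature subset (a real forest re-draws the subset at every node)."""
    rng = random.Random(seed)
    plans = []
    for _ in range(n_trees):
        rows = [rng.randrange(n_rows) for _ in range(n_rows)]   # with replacement
        split_features = rng.sample(features, n_split_features)  # without replacement
        plans.append((rows, split_features))
    return plans

plans = forest_plans(n_rows=100, features=FEATURES, n_trees=10, n_split_features=3)
for rows, split_features in plans[:2]:
    print(len(set(rows)), 'distinct rows; features:', split_features)
```

Because each bootstrap sample repeats some rows and omits others, no two trees see exactly the same data.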

In [31]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define a random forest with 10 trees (it is trained when the pipeline is fitted)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(flights)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(flights)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(flights_train)

# Make predictions.
predictions = model.transform(flights_test)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|             1|    1|[0.0,1.0,2.0,13.7...|
|             0|    1|[0.0,1.0,2.0,6.75...|
|             1|    1|[0.0,1.0,2.0,17.5...|
|             1|    1|[0.0,1.0,2.0,20.2...|
|             1|    1|[0.0,1.0,2.0,20.5...|
+--------------+-----+--------------------+
only showing top 5 rows



# LOGISTIC REGRESSION MODEL

In [None]:
The objective is to predict whether a flight is likely to be delayed by at least 15 minutes (label 1) or not (label 0).

In [None]:
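The original exercise uses only the mon, depart and duration columns, which are numerical features that can be used directly in a Logistic Regression model.

The code below, however, trains on the full features vector assembled earlier (all eight predictor columns).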

In [None]:
The data have been split into training and testing sets and are available as flights_train and flights_test.
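Logistic regression turns a linear combination of the features into a probability with the sigmoid (logistic) function and then applies a threshold, which defaults to 0.5 in Spark. The weights and intercept below are made-up values for the mon, depart and duration columns, purely for illustration; predict_delay is a hypothetical helper.

```python
import math

def sigmoid(z):
    """Logistic function: maps any real number into the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_delay(x, weights, intercept, threshold=0.5):
    """Return (probability of label 1, predicted label) for one feature vector."""
    z = intercept + sum(w * xi for w, xi in zip(weights, x))
    p = sigmoid(z)
    return p, int(p > threshold)

# Hypothetical coefficients for [mon, depart, duration]
weights = [0.01, 0.1, 0.002]
intercept = -2.0

p_a, label_a = predict_delay([0, 16.33, 82], weights, intercept)
p_b, label_b = predict_delay([7, 20.5, 135], weights, intercept)
print(p_a, label_a)
print(p_b, label_b)
```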

In [32]:
# Import the logistic regression class
from pyspark.ml.classification import LogisticRegression

# Create a classifier object and train on training data
logistic = LogisticRegression().fit(flights_train)

# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(flights_test)
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1664|
|    0|       0.0| 2596|
|    1|       1.0| 3130|
|    0|       1.0| 2040|
+-----+----------+-----+



In [None]:
Evaluate the Logistic Regression model

Accuracy can be misleading when the classes are imbalanced, because a model can score well simply by favouring the most common target class.

There are two other useful metrics: precision and recall.

In [None]:
Precision is the proportion of positive predictions which are correct. 
For all flights which are predicted to be delayed, what proportion is actually delayed?

Recall is the proportion of positive outcomes which are correctly predicted. 
For all delayed flights, what proportion is correctly predicted by the model?

In [39]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Calculate precision and recall from the TP, FP and FN counts computed in the confusion-matrix cell above
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))


precision = 0.61
recall    = 0.65


# Linear regression

In [None]:
Encoding flight origin
The org column in the flights data is a categorical variable giving the airport from which a flight departs.

ORD — O'Hare International Airport (Chicago)
SFO — San Francisco International Airport
JFK — John F Kennedy International Airport (New York)
LGA — La Guardia Airport (New York)
SMF — Sacramento
SJC — San Jose
TUS — Tucson International Airport
OGG — Kahului (Hawaii)

Obviously this is only a small subset of airports. Nevertheless, since this is a categorical variable, it needs to be one-hot encoded before it can be used in a regression model.

The data are in a variable called flights. You have already used a string indexer to create a column of indexed values corresponding to the strings in org.
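One-hot encoding replaces an index with a vector containing a single 1.0. Spark's OneHotEncoder drops the last category by default (dropLast=True), so with 8 airports each vector has 7 elements and the last category is encoded as all zeros, which avoids a redundant column in a model with an intercept. A plain-Python sketch; one_hot is a hypothetical helper and the index assigned to each airport here is arbitrary (in the notebook it would come from the string indexer).

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a list of 0.0/1.0 values.

    With drop_last=True the last category becomes the all-zeros vector.
    """
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[int(index)] = 1.0
    return vec

AIRPORTS = ['ORD', 'SFO', 'JFK', 'LGA', 'SMF', 'SJC', 'TUS', 'OGG']
# Arbitrary index order for illustration; a string indexer would order by frequency
encoded = {code: one_hot(i, len(AIRPORTS)) for i, code in enumerate(AIRPORTS)}
print(encoded['ORD'])
print(encoded['OGG'])
```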

In [None]:
Flight duration model: Just distance
In this exercise you'll build a regression model to predict flight duration (the duration column) using distance (km) as the only predictor.

In [34]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a regression object and train on training data
# (note: features includes duration itself, the value being predicted)
regression = LinearRegression(featuresCol='features',labelCol='duration').fit(flights_train)

# Create predictions for the testing data and take a look at the predictions
predictions = regression.transform(flights_test)
predictions.select('duration', 'prediction').show(5, False)

# Calculate the RMSE
RegressionEvaluator(labelCol='duration').evaluate(predictions)

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|155     |155.00000000000006|
|341     |341.00000000000017|
|155     |155.00000000000006|
|157     |157.0             |
|114     |114.0             |
+--------+------------------+
only showing top 5 rows



1.0811031774548243e-13
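The near-zero RMSE, and the coefficient of 1.0 on the fifth element of the coefficient vector further down, arise because the features vector assembled earlier contains duration itself, so the model can simply copy the target (target leakage). The "just distance" model is meant to use km as its only predictor. The plain-Python sketch below fits that single-feature model by ordinary least squares on the five rows shown earlier; five rows are far too few for real estimates, so the numbers only illustrate the interpretation. In Spark, one way to do this would be a second VectorAssembler with inputCols=['km'] feeding LinearRegression; that wiring is an assumption, not something this notebook does.

```python
import math

# (km, duration) for the five rows shown earlier in the notebook
km_duration = [(509.0, 82), (542.0, 82), (1989.0, 195), (885.0, 102), (1180.0, 135)]

def fit_simple_ols(points):
    """Least-squares fit of y = intercept + slope * x for one predictor."""
    n = float(len(points))
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in points)
    sxx = sum((x - mean_x) ** 2 for x, _ in points)
    slope = sxy / sxx
    return mean_y - slope * mean_x, slope

def rmse(points, intercept, slope):
    """Root mean squared error of the fitted line on the given points."""
    sq = sum((y - (intercept + slope * x)) ** 2 for x, y in points)
    return math.sqrt(sq / float(len(points)))

ground_minutes, minutes_per_km_sketch = fit_simple_ols(km_duration)
speed_kmh = 60 / minutes_per_km_sketch  # average speed in km per hour
fit_rmse = rmse(km_duration, ground_minutes, minutes_per_km_sketch)
print(ground_minutes, minutes_per_km_sketch, speed_kmh, fit_rmse)
```

In a distance-only model the intercept estimates the time spent on the ground regardless of distance (in minutes), the slope gives minutes per km, and 60 divided by the slope gives an average speed in km per hour.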

### Interpreting the coefficients

In [35]:
# Intercept (average minutes on the ground, for a model with distance as the only predictor)
inter = regression.intercept
print(inter)

# Coefficients
coefs = regression.coefficients
print(coefs)

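# Average minutes per km. This is only meaningful for a model trained on km alone;
# with the 8-column features vector used here, coefficients[0] is the coefficient of mon.
minutes_per_km = regression.coefficients[0]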
print(minutes_per_km)

# Average speed in km per hour
avg_speed = 60 / minutes_per_km
print(avg_speed)

-2.43905683111e-13
[-9.5374281055128e-16,-7.291202184260976e-16,-2.5178785244852695e-15,-1.4398988386382168e-15,1.0000000000000067,-5.070569873311002e-16,-1.5725358249002988e-14,1.3377064015779953e-14]
-9.5374281055128e-16
-6.291004171797526e+16


In [40]:
regression.coefficients

DenseVector([-0.0, -0.0, -0.0, -0.0, 1.0, -0.0, -0.0, 0.0])

In [41]:
regression.intercept

-2.4390568311079943e-13