<a href="https://colab.research.google.com/github/jcestevezc/Machine-Learning-Big-Data/blob/master/Machine%20Learning%20on%20Spark/Introduction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<img src="https://github.com/jcestevezc/Machine-Learning-Big-Data/blob/master/Machine%20Learning%20on%20Spark/ASB.jpeg?raw=true" width="1000"> 
# Machine Learning on Spark

## 0.1 Methodology [CRISP-DM](https://github.com/jcestevezc/Machine-Learning-Big-Data/blob/master/Machine%20Learning%20on%20Spark/CRISP_DM.pdf) and [ASUM-DM](http://gforge.icesi.edu.co/ASUM-DM_External/index.htm#cognos.external.asum-DM_Teaser/deliveryprocesses/ASUM-DM_8A5C87D5.html)

Cross-industry standard process for data mining, known as CRISP-DM,[1] is an open standard process model that describes common approaches used by data mining experts. It is the most widely-used analytics model.

<img src="https://github.com/jcestevezc/Machine-Learning-Big-Data/blob/master/Machine%20Learning%20on%20Spark/CRISP.png?raw=true" width="400">

In 2015, IBM released a new methodology called Analytics Solutions Unified Method for Data Mining/Predictive Analytics (also known as ASUM-DM) which refines and extends CRISP-DM. Include *project management, more detailed deployment and operational description*.

<img src="https://github.com/jcestevezc/Machine-Learning-Big-Data/blob/master/Machine%20Learning%20on%20Spark/ASUM.jpg?raw=true" width="610">




## 0.2. Enviroment preparation

In [1]:
 ! pip install pyspark



In [2]:
# Import the PySpark module
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import round
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [3]:
##Create SparkContext
sc = SparkContext.getOrCreate()

# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('SparkML') \
                    .getOrCreate()

In [4]:
# What version of Spark?
print(spark.version)

3.0.0


## 1. Business Understanding

* Determine business objectives
* Assess situation
* Determine data mining goals
* Produce project plan


*Short data description*: airline data 

*Data mining goals*: build a model for predict the delay in the differents flights.

Note: a late arrival corresponds to a delay > 15 min

## 2. Data Understanding

### 2.1 Collect initial data

In [5]:
## Is not the best choose for large data sets
flights = spark.read.csv('/content/sample_data/flights_small.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA'
                        )

### 2.2 Describe data

In [6]:
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



Exploring the data dictionary:

<img src="https://github.com/jcestevezc/Machine-Learning-Big-Data/blob/master/Machine%20Learning%20on%20Spark/DD.PNG?raw=true" width="610">


### 2.3 Explore data

In [7]:
# Get number of records
print("The data contain %d records." % flights.count())

The data contain 10000 records.


In [8]:
# View the first five records
flights.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+-----

In [9]:
flights.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
year,10000,2014.0,0.0,2014,2014
month,10000,6.6438,3.3191600205962097,1,12
day,10000,15.7009,8.895142019392079,1,31
dep_time,9952,1277.1158561093248,524.1142951055886,1,2400
dep_delay,9952,6.068629421221865,28.808608062751805,-19,886
arr_time,9945,1477.7236802413272,526.5936522261665,1,2400
arr_delay,9925,2.2530982367758186,31.074918600451877,-58,900
carrier,10000,,,AA,WN
tailnum,9986,,,D942DN,N988CA


In [10]:
# Check column data types
flights.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('day', 'int'),
 ('dep_time', 'int'),
 ('dep_delay', 'int'),
 ('arr_time', 'int'),
 ('arr_delay', 'int'),
 ('carrier', 'string'),
 ('tailnum', 'string'),
 ('flight', 'int'),
 ('origin', 'string'),
 ('dest', 'string'),
 ('air_time', 'int'),
 ('distance', 'int'),
 ('hour', 'int'),
 ('minute', 'int')]

### 2.4 Verify data quality

Assement about the data quality, a report similar to pandas profiling could be very usefuld.

## 3. Data preparation

### 3.1 Select data

hour + minute = dep_time

In [11]:
flights = flights.drop('year','hour','minute','flight')
flights.show(5)

+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+--------+
|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|origin|dest|air_time|distance|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+--------+
|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|   SEA| LAX|     132|     954|
|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   SEA| HNL|     360|    2677|
|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   SEA| SFO|     111|     679|
|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   PDX| SJC|      83|     569|
|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   SEA| BUR|     127|     937|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+--------+
only showing top 5 rows



### 3.2 Clean data

#### 3.2.1 Filtering out missing data

In [12]:
flights.filter('dep_delay is null').count()

48

In [13]:
flights.filter('arr_delay is null').count()

75

In [14]:
flights = flights.filter('dep_delay is not null')
flights = flights.filter('arr_delay is not null')
flights.show(5)

+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+--------+
|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|origin|dest|air_time|distance|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+--------+
|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|   SEA| LAX|     132|     954|
|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   SEA| HNL|     360|    2677|
|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   SEA| SFO|     111|     679|
|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   PDX| SJC|      83|     569|
|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   SEA| BUR|     127|     937|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+--------+
only showing top 5 rows



In [15]:
flights = flights.dropna()
flights.show(5)

+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+--------+
|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|origin|dest|air_time|distance|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+--------+
|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|   SEA| LAX|     132|     954|
|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   SEA| HNL|     360|    2677|
|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   SEA| SFO|     111|     679|
|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   PDX| SJC|      83|     569|
|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   SEA| BUR|     127|     937|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+--------+
only showing top 5 rows



### 3.3 Construct data

Convert 'mile' to 'km' and drop 'mile' column

In [16]:
flights = flights.withColumn('distance_km', round(flights.distance * 1.60934, 0)).drop('distance')
flights.show(5)

+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+
|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|origin|dest|air_time|distance_km|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+
|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|   SEA| LAX|     132|     1535.0|
|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   SEA| HNL|     360|     4308.0|
|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   SEA| SFO|     111|     1093.0|
|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   PDX| SJC|      83|      916.0|
|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   SEA| BUR|     127|     1508.0|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+
only showing top 5 rows



Create 'label' column indicating whether flight delayed (1) or not (0)

In [17]:
flights = flights.withColumn('label', (flights.arr_delay >=15).cast('integer'))
flights.show(5)

+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+-----+
|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|origin|dest|air_time|distance_km|label|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+-----+
|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|   SEA| LAX|     132|     1535.0|    0|
|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   SEA| HNL|     360|     4308.0|    0|
|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   SEA| SFO|     111|     1093.0|    0|
|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   PDX| SJC|      83|      916.0|    1|
|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   SEA| BUR|     127|     1508.0|    0|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+-----+
only showing top 5 rows



### 3.4 Integrate data

Is not necessary for this dataset

### 3.5 Format data

#### 3.5.1 Indexing categorical data

In [18]:
# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flights)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights)

# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='origin', outputCol='origin_idx').fit(flights_indexed).transform(flights_indexed)
flights_indexed = StringIndexer(inputCol='dest', outputCol='dest_idx').fit(flights_indexed).transform(flights_indexed)
flights_indexed = StringIndexer(inputCol='tailnum', outputCol='tailnum_idx').fit(flights_indexed).transform(flights_indexed)

In [19]:
flights_indexed.show(5)

+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+-----+-----------+----------+--------+-----------+
|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|origin|dest|air_time|distance_km|label|carrier_idx|origin_idx|dest_idx|tailnum_idx|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+-----+-----------+----------+--------+-----------+
|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|   SEA| LAX|     132|     1535.0|    0|        8.0|       0.0|     1.0|      320.0|
|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   SEA| HNL|     360|     4308.0|    0|        0.0|       0.0|    21.0|      119.0|
|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   SEA| SFO|     111|     1093.0|    0|        8.0|       0.0|     0.0|      433.0|
|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   PDX| SJC|      83|      916.0|    1|  

#### 3.5.2 Assembling columns

In [20]:
# Create an assembler object
assembler = VectorAssembler(
    inputCols=['month', 
               'day', 
               'dep_time', 
               'dep_delay', 
               'arr_time', 
               'arr_delay', 
               'air_time', 
               'distance_km', 
               'carrier_idx', 
               'origin_idx', 
               'dest_idx',
               'tailnum_idx',
              ], outputCol='features')

# Consolidate predictor columns
flights = assembler.transform(flights_indexed)

In [21]:
flights.show(5)

+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+-----+-----------+----------+--------+-----------+--------------------+
|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|origin|dest|air_time|distance_km|label|carrier_idx|origin_idx|dest_idx|tailnum_idx|            features|
+-----+---+--------+---------+--------+---------+-------+-------+------+----+--------+-----------+-----+-----------+----------+--------+-----------+--------------------+
|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|   SEA| LAX|     132|     1535.0|    0|        8.0|       0.0|     1.0|      320.0|[12.0,8.0,658.0,-...|
|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   SEA| HNL|     360|     4308.0|    0|        0.0|       0.0|    21.0|      119.0|[1.0,22.0,1040.0,...|
|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   SEA| SFO|     111|     1093.0|    0|        8.0|       0.0|     0.0|      433.0|[3

#### 3.5.3 Train/test split

In [22]:
# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio = flights_train.count() / flights.count()
print(training_ratio)

0.7949622166246851


## 4. Modeling

### 4.1 Select modeling technique: Decision Tree

In [23]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

IllegalArgumentException: ignored

### 4.2 Assess model

In [None]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP) 
print(accuracy)