<a href="https://colab.research.google.com/github/spyrgalaz/Deep_Neural_Networks/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Running Pyspark in Colab
To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 2.3.2 with hadoop 2.7, Java 8 and Findspark to locate the spark in the system. The tools installation can be carried out inside the Jupyter Notebook of the Colab. One important note is that if you are new in Spark, it is better to avoid Spark 2.4.0 version since some people have already complained about its compatibility issue with python. Follow the steps to install the dependencies:

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz


In [3]:
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark



In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [5]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# New Section

# Linear Regression Model
Linear Regression model is one the oldest and widely used machine learning approach which assumes a relationship between dependent and independent variables. For example, a modeler might want to predict the forecast of the rain based on the humidity ratio. Linear Regression consists of the best fitting line through the scattered points on the graph and the best fitting line is known as the regression line.

The goal of this exercise to predict the housing prices by the given features. Let's predict the prices of the Boston Housing dataset by considering MEDV as the output variable and all the other variables as input.

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

flights = spark.read.csv('/content/flights-larger.csv',inferSchema=True, header =True,nullValue='NA')

In [7]:
flights.show(5)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



In [8]:
flights.printSchema()

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)



In [9]:
print(flights.dtypes)

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


In [10]:
print("The data contain %d records." % flights.count())

The data contain 29418 records.


In [11]:
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')
print(flights_drop_column.show(5))
# Number of records with missing 'delay' values
flights_drop_column.filter('delay IS NULL').count()
print(flights_drop_column.show(5))
# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')
print(flights_valid_delay.show(5))
# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()
print(flights_none_missing.count())

+---+---+---+-------+---+----+------+--------+-----+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|
+---+---+---+-------+---+----+------+--------+-----+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+---+----+------+--------+-----+
only showing top 5 rows

None
+---+---+---+-------+---+----+------+--------+-----+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|
+---+---+---+-------+---+----+------+--------+-----+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+---+----+------+--------+-----+
only showing top

In [12]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [13]:

flights=flights_none_missing

In [14]:
# Import the required function
from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column
flights_km = flights.withColumn('km', round(flights.mile * 1.60934, 0)) \
                    .drop('mile')

# Create 'label' column indicating whether flight delayed (1) or not (0)
flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))

# Check first five records
flights_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [15]:
flights=flights_km

In [16]:
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flights)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights)

# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)

In [17]:
flights_indexed.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows



In [18]:
flights=flights_indexed

In [19]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=['mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'],outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(flights)

# Check the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0]  |27   |
|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0]|-19  |
|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0]|60   |
|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0]  |22   |
|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0]|70   |
+-----------------------------------------+-----+
only showing top 5 rows



ONE HOT ENCODER   DON'T USE IT FOR FLIGHT-LARGER SET

In [None]:
# Import the one hot encoder class
from pyspark.ml.feature import OneHotEncoderEstimator

# Create an instance of the one hot encoder
onehot = OneHotEncoderEstimator(inputCols=['org_idx'], outputCols=['org_dummy'])

# Apply the one hot encoder to the flights data
onehot = onehot.fit(flights)
flights_onehot = onehot.transform(flights)

# Check the results
flights_onehot.select('org', 'org_idx', 'org_dummy').distinct().sort('org_idx').show()

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SMF|    4.0|(7,[4],[1.0])|
|SJC|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+



In [20]:
flights=flights_assembled

In [21]:
# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio = flights_train.count() / flights.count()
print(training_ratio)

0.798506921794593


In [22]:
flights_train.show(3)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|  0|  1|  2|     AA|JFK|  6.58|     230|   50|2570.0|    1|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|JFK| 15.75|     185|   16|1519.0|    1|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|LGA|  11.5|     195|  -11|1765.0|    0|        1.0|    3.0|[0.0,1.0,2.0,1.0,...|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
only showing top 3 rows



In [23]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a regression object and train on training data
regression = LinearRegression(labelCol='delay').fit(flights_train)



In [24]:
# Create predictions for the testing data and take a look at the predictions
predictions = regression.transform(flights_test)
predictions.select('delay', 'prediction').show(5, False)

# Calculate the RMSE
RegressionEvaluator(labelCol='delay').evaluate(predictions)

+-----+------------------+
|delay|prediction        |
+-----+------------------+
|31   |33.330984436174916|
|52   |12.818073524277002|
|109  |16.498319808799558|
|21   |35.00913620284559 |
|22   |39.750432756541635|
+-----+------------------+
only showing top 5 rows



51.11496825480485