<h1> Install pyspark Library </h1>

In [1]:
!pip install pyspark 



In [2]:
#import pyspark library
import pyspark 

In [3]:
#import spark session library 
from pyspark.sql import SparkSession 

In [4]:
#Create SparkSession object
spark = SparkSession.builder \
            .master('local[*]') \
            .appName('multi_regression') \
            .getOrCreate()

<h1> Create DataFrame </h1>

In [5]:
#To create dataframe form external datasets
df = spark.read.option("header","true").csv(r"C:\Users\User\Downloads\partitions\Data\*")

In [6]:
df.show()

+----+-------+-----+----------+---------+----------+-------------+---------+-------+-------+---------+---------------+------------------+------------------+------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+---------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+--------------+-----------------+-------+-------+--------+-------------+------------+------------+--------+-------------+-----------------+------------+-------------+---------------+------------------+--------------+--------------------+-----------+-----------+-----------+-------------+----------------+------------+--------------+----------------+-------------+-----------+-----------+-------------+--------------

<h1> Data Preparation </h1>

In [7]:
#create new dataframe as per required columns for prediction
AirlineDF = df.select("Origin", "Dest", "AirTime", "Distance", "DepDelayMinutes", "ArrDelayMinutes")

In [8]:
#cache data in-memory
AirlineDF.cache()

DataFrame[Origin: string, Dest: string, AirTime: string, Distance: string, DepDelayMinutes: string, ArrDelayMinutes: string]

In [9]:
AirlineDF.show()

+------+----+-------+--------+---------------+---------------+
|Origin|Dest|AirTime|Distance|DepDelayMinutes|ArrDelayMinutes|
+------+----+-------+--------+---------------+---------------+
|   JFK| LAX| 338.00| 2475.00|           1.00|           4.00|
|   JFK| LAX| 349.00| 2475.00|           0.00|          26.00|
|   JFK| LAX| 370.00| 2475.00|           1.00|          28.00|
|   JFK| LAX| 350.00| 2475.00|           0.00|          56.00|
|   JFK| LAX| 335.00| 2475.00|           0.00|           0.00|
|   JFK| LAX| 336.00| 2475.00|           0.00|           0.00|
|   JFK| LAX| 380.00| 2475.00|           0.00|          35.00|
|   JFK| LAX| 359.00| 2475.00|           0.00|          17.00|
|   JFK| LAX| 368.00| 2475.00|           0.00|          58.00|
|   JFK| LAX| 356.00| 2475.00|           0.00|           7.00|
|   JFK| LAX| 353.00| 2475.00|           0.00|           7.00|
|   JFK| LAX| 332.00| 2475.00|           0.00|           0.00|
|   JFK| LAX| 339.00| 2475.00|           0.00|         

In [10]:
#check data types of each columns 
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- DepDelayMinutes: string (nullable = true)
 |-- ArrDelayMinutes: string (nullable = true)



<h1> We need to change data types of columns </h1>

In [12]:
#import sql library for data types 
from pyspark.sql.types import IntegerType, DoubleType

In [13]:
AirlineDF = AirlineDF.withColumn("Distance", AirlineDF["Distance"].cast(IntegerType()))
AirlineDF = AirlineDF.withColumn("AirTime", AirlineDF["AirTime"].cast(IntegerType()))
AirlineDF = AirlineDF.withColumn("DepDelayMinutes", AirlineDF["DepDelayminutes"].cast(DoubleType()))
AirlineDF = AirlineDF.withColumn("ArrDelayMinutes", AirlineDF["ArrDelayMinutes"].cast(DoubleType()))

In [14]:
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)



<h1> Check is there any null values in DataFrame </h1>

In [15]:
#import library for sql function col
from pyspark.sql.functions import col 

In [16]:
#check null values in each columns 
print(AirlineDF.where(col("Origin").isNull()).count())
print(AirlineDF.where(col("Dest").isNull()).count())
print(AirlineDF.where(col("AirTime").isNull()).count())
print(AirlineDF.where(col("Distance").isNull()).count())
print(AirlineDF.where(col("DepDelayMinutes").isNull()).count())
print(AirlineDF.where(col("ArrDelayMinutes").isNull()).count())

0
0
184800
0
167689
184800


<h1> we need to handle null values </h1>

In [17]:
#now drop row as correspondace to null values 
AirlineDF = AirlineDF.dropna()

In [18]:
#check null values in each columns 
print(AirlineDF.where(col("Origin").isNull()).count())
print(AirlineDF.where(col("Dest").isNull()).count())
print(AirlineDF.where(col("AirTime").isNull()).count())
print(AirlineDF.where(col("Distance").isNull()).count())
print(AirlineDF.where(col("DepDelayMinutes").isNull()).count())
print(AirlineDF.where(col("ArrDelayMinutes").isNull()).count())

0
0
0
0
0
0


<h1> we need convert distance miles into kilometers </h1>

In [19]:
from pyspark.sql.functions import round

#Convert 'mile' to 'km'
AirlineDF = AirlineDF.withColumn('Distance', round(AirlineDF.Distance * 1.60934, 0))
AirlineDF.show()

+------+----+-------+--------+---------------+---------------+
|Origin|Dest|AirTime|Distance|DepDelayMinutes|ArrDelayMinutes|
+------+----+-------+--------+---------------+---------------+
|   JFK| LAX|    338|  3983.0|            1.0|            4.0|
|   JFK| LAX|    349|  3983.0|            0.0|           26.0|
|   JFK| LAX|    370|  3983.0|            1.0|           28.0|
|   JFK| LAX|    350|  3983.0|            0.0|           56.0|
|   JFK| LAX|    335|  3983.0|            0.0|            0.0|
|   JFK| LAX|    336|  3983.0|            0.0|            0.0|
|   JFK| LAX|    380|  3983.0|            0.0|           35.0|
|   JFK| LAX|    359|  3983.0|            0.0|           17.0|
|   JFK| LAX|    368|  3983.0|            0.0|           58.0|
|   JFK| LAX|    356|  3983.0|            0.0|            7.0|
|   JFK| LAX|    353|  3983.0|            0.0|            7.0|
|   JFK| LAX|    332|  3983.0|            0.0|            0.0|
|   JFK| LAX|    339|  3983.0|            0.0|         

In [20]:
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Distance: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)



<h1> Vectorize the features </h1>

In [21]:
from pyspark.ml.feature import *

In [22]:
from pyspark.ml.feature import VectorAssembler

In [23]:
vectorizer = VectorAssembler()
vectorizer.setInputCols(['Distance','DepDelayMinutes','ArrDelayMinutes'])
vectorizer.setOutputCol('features')

df_vect = vectorizer.setHandleInvalid("keep").transform(AirlineDF)

In [24]:
df_vect.show()

+------+----+-------+--------+---------------+---------------+-----------------+
|Origin|Dest|AirTime|Distance|DepDelayMinutes|ArrDelayMinutes|         features|
+------+----+-------+--------+---------------+---------------+-----------------+
|   JFK| LAX|    338|  3983.0|            1.0|            4.0| [3983.0,1.0,4.0]|
|   JFK| LAX|    349|  3983.0|            0.0|           26.0|[3983.0,0.0,26.0]|
|   JFK| LAX|    370|  3983.0|            1.0|           28.0|[3983.0,1.0,28.0]|
|   JFK| LAX|    350|  3983.0|            0.0|           56.0|[3983.0,0.0,56.0]|
|   JFK| LAX|    335|  3983.0|            0.0|            0.0| [3983.0,0.0,0.0]|
|   JFK| LAX|    336|  3983.0|            0.0|            0.0| [3983.0,0.0,0.0]|
|   JFK| LAX|    380|  3983.0|            0.0|           35.0|[3983.0,0.0,35.0]|
|   JFK| LAX|    359|  3983.0|            0.0|           17.0|[3983.0,0.0,17.0]|
|   JFK| LAX|    368|  3983.0|            0.0|           58.0|[3983.0,0.0,58.0]|
|   JFK| LAX|    356|  3983.

In [25]:
print(vectorizer.explainParams())

handleInvalid: How to handle invalid data (NULL and NaN values). Options are 'skip' (filter out rows with invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the output). Column lengths are taken from the size of ML Attribute Group, which can be set using `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'). (default: error, current: keep)
inputCols: input column names. (current: ['Distance', 'DepDelayMinutes', 'ArrDelayMinutes'])
outputCol: output column name. (default: VectorAssembler_23387f29bac7__output, current: features)


<h1> Train Test data Splitting </h1>

In [28]:
flights_train, flights_test = df_vect.randomSplit([0.8, 0.2])

In [29]:
flights_train.show()

+------+----+-------+--------+---------------+---------------+-----------------+
|Origin|Dest|AirTime|Distance|DepDelayMinutes|ArrDelayMinutes|         features|
+------+----+-------+--------+---------------+---------------+-----------------+
|   ABE| CLT|     70|   774.0|            3.0|           10.0| [774.0,3.0,10.0]|
|   ABE| CLT|     77|   774.0|            0.0|            6.0|  [774.0,0.0,6.0]|
|   ABE| CLT|     77|   774.0|           50.0|           31.0|[774.0,50.0,31.0]|
|   ABE| CLT|     78|   774.0|            5.0|            2.0|  [774.0,5.0,2.0]|
|   ABE| CLT|     79|   774.0|            0.0|            0.0|  [774.0,0.0,0.0]|
|   ABE| CLT|     79|   774.0|            0.0|            0.0|  [774.0,0.0,0.0]|
|   ABE| CLT|     82|   774.0|            0.0|            0.0|  [774.0,0.0,0.0]|
|   ABE| CLT|     82|   774.0|            0.0|            0.0|  [774.0,0.0,0.0]|
|   ABE| CLT|     83|   774.0|            0.0|            0.0|  [774.0,0.0,0.0]|
|   ABE| CLT|     83|   774.

<h1> Regression Model Training </h1>

In [27]:
from pyspark.ml.regression import LinearRegression 

In [30]:
lr = LinearRegression()
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. (default: 0.0)
maxIter: max number of iterations (>= 0). (default: 100)
predic

In [31]:
lr.setLabelCol('AirTime')
lr.setFeaturesCol('features')
model = lr.fit(flights_train)

In [32]:
type(model)

pyspark.ml.regression.LinearRegressionModel

<h1> View model summary </h1>

In [33]:
print("R2:", model.summary.r2)
print("Intercept:", model.intercept, "Coefficients", model.coefficients)

R2: 0.9325119607396892
Intercept: 20.057105688201478 Coefficients [0.07210055443067806,-0.34339514778644586,0.3496147604470657]


<h1> Model Testing </h1>

In [34]:
df_pred = model.transform(flights_test)
df_pred.show()

+------+----+-------+--------+---------------+---------------+------------------+------------------+
|Origin|Dest|AirTime|Distance|DepDelayMinutes|ArrDelayMinutes|          features|        prediction|
+------+----+-------+--------+---------------+---------------+------------------+------------------+
|   ABE| CLT|     72|   774.0|           19.0|            4.0|  [774.0,19.0,4.0]| 70.73688605139209|
|   ABE| CLT|     74|   774.0|            0.0|            0.0|   [774.0,0.0,0.0]| 75.86293481754629|
|   ABE| CLT|     76|   774.0|            0.0|            0.0|   [774.0,0.0,0.0]| 75.86293481754629|
|   ABE| CLT|     77|   774.0|            0.0|            0.0|   [774.0,0.0,0.0]| 75.86293481754629|
|   ABE| CLT|     77|   774.0|            0.0|            0.0|   [774.0,0.0,0.0]| 75.86293481754629|
|   ABE| CLT|     89|   774.0|            0.0|           12.0|  [774.0,0.0,12.0]| 80.05831194291108|
|   ABE| CLT|     90|   774.0|            3.0|            0.0|   [774.0,3.0,0.0]| 74.832749