# PySpark

## Installing the required PySpark library

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 30 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 13.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=c6461bb73a4383dfa89675153b7222c38ef721031e4f6db29493c98bcd58128c
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
#Initializing PySpark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Spark config
conf = SparkConf().setAppName("AirLine_app")

# Only one SparkContext may be active per process: calling SparkContext(conf=conf)
# a second time (e.g. re-running this cell) raises. getOrCreate() returns the
# running context when there is one and creates it from `conf` otherwise, so
# this cell can be executed repeatedly.
sc = SparkContext.getOrCreate(conf=conf)

# The session is built on top of the context obtained above.
spark = SparkSession.builder.appName('Air Line data Analysis').getOrCreate()

# NOTE(review): SQLContext is a legacy entry point (SparkSession supersedes it);
# it is still created here so that any code relying on `sqlContext` keeps working.
sqlContext = SQLContext(sc)



## Reading the CSV file into a DataFrame

In [None]:
# Load the airline CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
csv_reader = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
)
airline_df = csv_reader.load("/content/Airline_data.csv")


### 1. Show a sample of 5 records from dataset.

In [None]:
# Display the first five rows of the airline data frame
airline_df.show(n=5)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1552|           UA|      183

### 2. Read the data with data types.

In [None]:
# Print each column's name, inferred data type and nullability as a tree.
# NOTE(review): the recorded output lists AirTime, TaxiIn and TaxiOut as string
# while the other time/delay columns are integer — presumably the CSV holds
# non-numeric placeholders in those columns; confirm before doing arithmetic on them.
airline_df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |

### 3. Make a new column MonthStr, which has months in the form 01, 02, 03, ..., 12.

In [None]:
# importing the required libraries
# (`udf` is no longer used in this cell; the import is kept so that other
# cells relying on it being in scope continue to work)
from pyspark.sql.functions import udf, col, lpad
from pyspark.sql.types import StringType

# Changing the data type of 'Month' column to string
df1 = airline_df.withColumn("Month", airline_df["Month"].cast(StringType()))

# Left-pad the month to two characters with '0': '1' -> '01', '12' -> '12'.
# This replaces the previous collect()-built lookup dict and Python UDF, which
#   * prepended '0' unconditionally, so months 10-12 became '010', '011', '012';
#   * pulled every row of the data frame to the driver just to build the mapping;
#   * raised TypeError on a null Month ('0' + None).
# lpad runs inside Spark and yields null for a null Month.
df1 = df1.withColumn("MonthStr", lpad(col("Month"), 2, "0"))
df1.show(10)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|MonthStr|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1

**In the data frame above, the new 'MonthStr' column appears as the last column.**



### 4. Find the # of flights each airline made.

In [None]:
# One group per carrier code; the row count of each group is the number of
# flights recorded for that carrier.
flights_by_carrier = airline_df.groupBy('UniqueCarrier')
n = flights_by_carrier.count()
n.show()

+-------------+-----+
|UniqueCarrier|count|
+-------------+-----+
|           UA|  426|
+-------------+-----+



### 5. Find the mean Arrival Delay per origination airport.

In [None]:
# Mean of ArrDelay for every origin airport; the generated aggregate column
# 'avg(ArrDelay)' is renamed to a readable label before display.
arrivals_by_origin = airline_df.groupBy('Origin')
arrivalDelay_mean = (
    arrivals_by_origin
    .mean('ArrDelay')
    .withColumnRenamed('avg(ArrDelay)', 'Average of Arrival Delay')
)
arrivalDelay_mean.show()

+------+------------------------+
|Origin|Average of Arrival Delay|
+------+------------------------+
|   LIH|     0.16666666666666666|
|   HNL|       14.21774193548387|
|   EWR|                    9.25|
|   DEN|      20.166666666666668|
|   IAD|      12.966666666666667|
|   SFO|      11.215384615384615|
|   PHL|       6.827586206896552|
|   OGG|       16.24137931034483|
+------+------------------------+



The output above shows the average arrival delay for each origin airport.

### 6. What is the average departure delay from each airport?

In [None]:
# Mean of DepDelay for every origin airport; the generated aggregate column
# 'avg(DepDelay)' is renamed to a readable label before display.
departures_by_origin = airline_df.groupBy('Origin')
departureDelay_mean = (
    departures_by_origin
    .mean('DepDelay')
    .withColumnRenamed('avg(DepDelay)', 'Average of Departure Delay')
)
departureDelay_mean.show()

+------+--------------------------+
|Origin|Average of Departure Delay|
+------+--------------------------+
|   LIH|       -3.7666666666666666|
|   HNL|         3.217741935483871|
|   EWR|         4.958333333333333|
|   DEN|                      27.6|
|   IAD|                       8.9|
|   SFO|        19.646153846153847|
|   PHL|        16.137931034482758|
|   OGG|                       6.0|
+------+--------------------------+



The output above shows the average departure delay for each origin airport.