#  Creating a spark session

In [85]:
from pyspark.sql import SparkSession

In [57]:
# importing data in spark in proper format of the dtypes
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("flights.csv")
flights=df

In [58]:
# converting dtypes from string to int
from pyspark.sql.types import IntegerType
df = df.withColumn("hour", df["hour"].cast(IntegerType()))
df = df.withColumn("minute", df["minute"].cast(IntegerType()))
df = df.withColumn("air_time", df["air_time"].cast(IntegerType()))
df = df.withColumn("arr_time", df["arr_time"].cast(IntegerType()))

In [4]:
# converting spark dataframe into pandas
d1=df.toPandas()

In [5]:
# creating temporary spark table
spark_temp=spark.createDataFrame(d1)

spark_temp=spark_temp.createOrReplaceTempView('temp')

In [9]:
# get all list of tables
print(spark.catalog.listTables())

AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

In [8]:
# creating columns in dataframe
    # SparkDataFrame.withcolumns(new_col , df.old_col with changes)
        
df=df.withColumn("duration_hrs",df.air_time/60)

                    # OR 
avg_speed=(df.distance/(df.air_time/60)).alias("avg_speed")

# This column can be accessed by using selecting
df.select("origin", "dest", "tailnum", avg_speed).show()

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
|   PDX| DEN| N646SW|491.40495867768595|
|   PDX| OAK| N422WN|             362.0|
|   SEA| SFO| N361VA| 415.7142857142857|
|   SEA| SAN| N309AS| 466.6666666666667|
|   SEA| ORD| N564AS| 521.5151515151515|
|   SEA| LAX| N323AS| 440.3076923076923|
|   SEA| PHX| N305AS|431.29870129870125|
|   SEA| LAS| N433AS| 409.6062992125984|
|   SEA| ANC| N765AS|474.75409836065575|
|   SEA| SFO| N713AS| 315.8139534883721|
|   PDX| SFO| N27205| 366.6666666666667|
|   SEA| SMF| N626AS|477.63157894736844|
|   SEA| MDW| N8634A|481.38888888888886|
|   SEA| BOS| N597AS| 516.4137931034483|
|   PDX| BUR| N215AG| 441.6216216216216|
+------+----+-------+------------------+
only showing top

In [7]:
# We can create with select expression
df.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed").show()

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
|   PDX| DEN| N646SW|491.40495867768595|
|   PDX| OAK| N422WN|             362.0|
|   SEA| SFO| N361VA| 415.7142857142857|
|   SEA| SAN| N309AS| 466.6666666666667|
|   SEA| ORD| N564AS| 521.5151515151515|
|   SEA| LAX| N323AS| 440.3076923076923|
|   SEA| PHX| N305AS|431.29870129870125|
|   SEA| LAS| N433AS| 409.6062992125984|
|   SEA| ANC| N765AS|474.75409836065575|
|   SEA| SFO| N713AS| 315.8139534883721|
|   PDX| SFO| N27205| 366.6666666666667|
|   SEA| SMF| N626AS|477.63157894736844|
|   SEA| MDW| N8634A|481.38888888888886|
|   SEA| BOS| N597AS| 516.4137931034483|
|   PDX| BUR| N215AG| 441.6216216216216|
+------+----+-------+------------------+
only showing top

In [10]:
# To get a view on a column
df.select("duration_hrs").show()

+------------------+
|      duration_hrs|
+------------------+
|               2.2|
|               6.0|
|              1.85|
|1.3833333333333333|
|2.1166666666666667|
|2.0166666666666666|
|               1.5|
|1.6333333333333333|
|              2.25|
|               3.3|
|2.1666666666666665|
| 2.566666666666667|
|2.1166666666666667|
|              3.05|
|              2.15|
|               1.5|
|1.2666666666666666|
|               3.6|
| 4.833333333333333|
|              1.85|
+------------------+
only showing top 20 rows



In [11]:
# Filtering data 
longest_flights=df.filter("distance>1000")
                        # OR
df.origin == "SEA"

Column<b'(origin = SEA)'>

In [12]:
# Filtering data basis of 2 factors( Origin of flight and destinantion of flight)
A = df.origin == "SEA"
B = df.dest == "PDX"
C=df.filter(A).filter(B)

In [13]:
# count total numbers of rows in dataframe
df.count()
longest_flights.count()

4883

In [14]:
# can select custom column from dataframes

selected1=df.select("air_time",'year','carrier')
selected2=df.select(df.air_time , df.year , df.carrier)

In [15]:
#df.filter(df.dest == "SEA").groupBy().min().show()
                 # OR
df.filter(df.dest == "SEA").groupBy().min().show()

+---------+----------+--------+-------------+-----------+-------------+-------------+---------+-----------+-------------------+
|min(year)|min(month)|min(day)|min(arr_time)|min(flight)|min(air_time)|min(distance)|min(hour)|min(minute)|  min(duration_hrs)|
+---------+----------+--------+-------------+-----------+-------------+-------------+---------+-----------+-------------------+
|     2014|         1|       1|          652|        413|           26|          129|        6|          0|0.43333333333333335|
+---------+----------+--------+-------------+-----------+-------------+-------------+---------+-----------+-------------------+



In [16]:
by_planes=df.groupBy('tailnum').count()

In [17]:
# import function package to solve standard deviation
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = df.groupBy("month","dest")

# Average departure delay by month and destination
by_month_dest.avg().show()

+-----+----+---------+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|month|dest|avg(year)|avg(month)|          avg(day)|     avg(arr_time)|       avg(flight)|     avg(air_time)|     avg(distance)|         avg(hour)|       avg(minute)|  avg(duration_hrs)|
+-----+----+---------+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|    4| PHX|   2014.0|       4.0|13.083333333333334|1538.0833333333333| 741.4166666666666|135.73333333333332|1074.3333333333333|12.516666666666667|28.783333333333335|  2.262222222222222|
|    1| RDM|   2014.0|       1.0|             11.25|           1845.75|            5384.0|            27.375|             116.0|              17.5|             31.75|0.45625000000000004|
|    5| ONT|   2014.0|       5.0| 16.22222222222222|1371.33333333

# airport dataset

In [18]:
airports = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("airports.csv")

In [19]:
          # airports.show()
# now rename the column
airports=airports.withColumnRenamed("faa","dest")
airports.show()

+----+--------------------+----------------+-----------------+----+---+---+
|dest|                name|             lat|              lon| alt| tz|dst|
+----+--------------------+----------------+-----------------+----+---+---+
| 04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
| 06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
| 06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
| 06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
| 09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
| 0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
| 0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
| 0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
| 0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
| 0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
| 0W3|Harfor

In [20]:
# now joining the tables(airports with flights)
flight_with_airports=flights.join(airports,on="dest",how="leftouter")

# Importing planes dataset

In [6]:
planes=sqlContext.read.format('com.databricks.spark.csv').options(header='True',inferschema='True').load('planes.csv')

In [23]:
flights_with_planes=flights.join(planes,on="tailnum",how="leftouter")

In [25]:
     # get all flights late in yes and no

    # Create is_late(Getting values in True and False)

flight1 = flights_with_planes.withColumn("is_late",flights_with_planes.arr_delay>0)

flight1.select("is_late").dtypes   # As this is in boolean type convert integer type

flight1=flight1.withColumn("label",flight1.is_late.cast("integer"))

flight1 = flight1.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL ")



+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----+--------------------+--------------+-----------+-------+-----+-----+---------+-------+-----+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|year|                type|  manufacturer|      model|engines|seats|speed|   engine|is_late|label|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----+--------------------+--------------+-----------+-------+-----+-----+---------+-------+-----+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|2011|Fixed wing multi ...|        AIRBUS|   A320-214|      2|  182|   NA|Turbo-fan|  false|    0|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL|     360|    2677|  10|    40|2006|Fixed wi

# Building Machine Learning pipeline

In [29]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [31]:
            # Create a StringIndexer
carr_indexer = StringIndexer()
        # Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index",outputCol="carrier_fact")
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")


In [32]:
# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

NameError: name 'dest_indexer' is not defined

In [10]:
a = [('g1', 2), ('g3', 4), ('g2', 3), ('g4', 8)]

In [11]:
a

[('g1', 2), ('g3', 4), ('g2', 3), ('g4', 8)]

In [12]:
rdd = sc.parallelize(a);
rdd.collect()

[('g1', 2), ('g3', 4), ('g2', 3), ('g4', 8)]

In [15]:
sorted = rdd.sortByKey()
sorted.collect()

[('g1', 2), ('g2', 3), ('g3', 4), ('g4', 8)]

In [18]:
sorted = rdd.sortByKey(False)
sorted.collect()

[('g4', 8), ('g3', 4), ('g2', 3), ('g1', 2)]

In [21]:
indices = sorted.zipWithIndex()
indices.collect()

[(('g4', 8), 0), (('g3', 4), 1), (('g2', 3), 2), (('g1', 2), 3)]

In [22]:
planes.columns

['tailnum',
 'year',
 'type',
 'manufacturer',
 'model',
 'engines',
 'seats',
 'speed',
 'engine']

In [59]:

planes = planes.withColumn("speed", planes["speed"].cast(IntegerType()))

In [62]:
planes.dtypes

[('tailnum', 'string'),
 ('year', 'string'),
 ('type', 'string'),
 ('manufacturer', 'string'),
 ('model', 'string'),
 ('engines', 'int'),
 ('seats', 'int'),
 ('speed', 'int'),
 ('engine', 'string')]

In [69]:
from pyspark.sql.functions import isnan, when, count, col

In [84]:
# count total null values 
planes.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in planes.columns]).show()
#(a/planes.count()).show()
planes.count()

+-------+----+----+------------+-----+-------+-----+-----+------+
|tailnum|year|type|manufacturer|model|engines|seats|speed|engine|
+-------+----+----+------------+-----+-------+-----+-----+------+
|      0|   0|   0|           0|    0|      0|    0| 2622|     0|
+-------+----+----+------------+-----+-------+-----+-----+------+



2628

In [17]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

In [34]:
x=planes.select(planes.columns[:-1])
y=planes.select(planes.columns[-1:])

In [35]:
x.show()

+-------+----+--------------------+----------------+--------+-------+-----+-----+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|
+-------+----+--------------------+----------------+--------+-------+-----+-----+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|
| N110UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|
| N111US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|
| N11206|2000|Fi