In [1]:
import findspark
findspark.init('/home/karan/spark-2.1.0-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
Spark = SparkSession.builder.appName('ReadingeFlightCSV').getOrCreate()
from pyspark.sql.types import *

In [2]:
df = Spark.read.format("csv").option("header","true").load("/home/karan/Downloads/619927769_T_ONTIME_REPORTING.csv")

In [3]:
df.printSchema()

root
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- ORIGIN_AIRPORT_ID: string (nullable = true)
 |-- ORIGIN_AIRPORT_SEQ_ID: string (nullable = true)
 |-- ORIGIN_CITY_MARKET_ID: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST_AIRPORT_ID: string (nullable = true)
 |-- DEST_AIRPORT_SEQ_ID: string (nullable = true)
 |-- DEST_CITY_MARKET_ID: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_DELAY: string (nullable = true)
 |-- CRS_ELAPSED_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- _c16: string (nullable = true)



In [4]:
df.createOrReplaceTempView('flights')

In [5]:
sqlDF = Spark.sql("SELECT CONCAT(ORIGIN,'_',DEST) AS ORG_DEST, COUNT(DEP_DELAY) AS DEP_DELAY_COUNT FROM flights WHERE DEP_DELAY>40 GROUP BY CONCAT(ORIGIN,'_',DEST) ORDER BY COUNT(DEP_DELAY) DESC LIMIT 5 ")

In [6]:
sqlDF.show()

+--------+---------------+
|ORG_DEST|DEP_DELAY_COUNT|
+--------+---------------+
| ORD_LGA|            286|
| LAX_SFO|            245|
| LGA_ORD|            234|
| SFO_LAX|            193|
| BOS_LGA|            178|
+--------+---------------+



In [7]:
from pyspark.sql.functions import concat, col, lit
from pyspark.sql.types import DoubleType
df1 = df.select(concat(col("ORIGIN"), lit("_"), col("DEST")).alias("ORG_DEST"), 'DEP_DELAY')
df3= df1.withColumn("DEP_DELAY_Conv", col("DEP_DELAY").cast(DoubleType()))
df3.printSchema()

root
 |-- ORG_DEST: string (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- DEP_DELAY_Conv: double (nullable = true)



In [8]:
from pyspark.ml.feature import Bucketizer

In [9]:
splits = [-float("inf"), 0.0, 40.0, float("inf")]

In [10]:
bucketizer = Bucketizer(splits=splits, inputCol="DEP_DELAY_Conv", outputCol="delayed")

In [11]:
df2 = bucketizer.transform(df3)

In [22]:
df_temp = df2.filter('DEP_DELAY_Conv IS NOT NULL')
df_temp.groupBy('delayed').count().show()

+-------+------+
|delayed| count|
+-------+------+
|    0.0|354929|
|    1.0|161465|
|    2.0| 51236|
+-------+------+



In [23]:
df2.createOrReplaceTempView('flights')

In [26]:
sqlDF = Spark.sql("SELECT Dep_DELAY_Conv,delayed, COUNT(delayed) as delayed FROM flights WHERE delayed<2.0 GROUP BY Dep_DELAY_Conv, delayed")

In [27]:
sqlDF.show()

+--------------+-------+-------+
|Dep_DELAY_Conv|delayed|delayed|
+--------------+-------+-------+
|          38.0|    1.0|   1104|
|         -15.0|    0.0|   2433|
|         -36.0|    0.0|      3|
|          10.0|    1.0|   4655|
|          -6.0|    0.0|  36491|
|          24.0|    1.0|   1985|
|           6.0|    1.0|   6131|
|          39.0|    1.0|   1115|
|          36.0|    1.0|   1241|
|         -23.0|    0.0|    133|
|          23.0|    1.0|   2173|
|          11.0|    1.0|   4231|
|         -47.0|    0.0|      2|
|          22.0|    1.0|   2198|
|         -38.0|    0.0|      1|
|           5.0|    1.0|   6906|
|          35.0|    1.0|   1304|
|          -4.0|    0.0|  43253|
|           1.0|    1.0|  12188|
|         -20.0|    0.0|    360|
+--------------+-------+-------+
only showing top 20 rows



In [30]:
fractions = {0.0: 0.13, 1.0: 0.13, 2.0: 1.0}
strain = df2.sampleBy("delayed", fractions)
(trainingdata, testdata) = strain.randomSplit([0.7, 0.3])
strain.groupBy('delayed').count().show()

+-------+-----+
|delayed|count|
+-------+-----+
|    0.0|45787|
|    1.0|20772|
|    2.0|51236|
+-------+-----+



In [31]:
from pyspark.mllib.stat import Statistics

In [32]:
df.printSchema()

root
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- ORIGIN_AIRPORT_ID: string (nullable = true)
 |-- ORIGIN_AIRPORT_SEQ_ID: string (nullable = true)
 |-- ORIGIN_CITY_MARKET_ID: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST_AIRPORT_ID: string (nullable = true)
 |-- DEST_AIRPORT_SEQ_ID: string (nullable = true)
 |-- DEST_CITY_MARKET_ID: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_DELAY: string (nullable = true)
 |-- CRS_ELAPSED_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- _c16: string (nullable = true)



In [40]:
catergoricalColumns = df.select('OP_UNIQUE_CARRIER', 'ORIGIN', 'DEST', 'DAY_OF_WEEK', 'DISTANCE', concat(col("ORIGIN"), lit("_"), col("DEST")).alias("ORG_DEST"))

In [41]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [42]:
stringIndexer = [StringIndexer(inputCol=column, outputCol=column+'Indexed').fit(catergoricalColumns) for column in list(set(catergoricalColumns.columns))]

In [43]:
pipeline = Pipeline(stages=stringIndexer)

In [44]:
indexed = pipeline.fit(catergoricalColumns).transform(catergoricalColumns)

In [45]:
indexed.head(1)

[Row(OP_UNIQUE_CARRIER='9E', ORIGIN='MSP', DEST='CVG', DAY_OF_WEEK='1', DISTANCE='596.00', ORG_DEST='MSP_CVG', DAY_OF_WEEKIndexed=4.0, ORIGINIndexed=11.0, DISTANCEIndexed=521.0, ORG_DESTIndexed=1364.0, DESTIndexed=45.0, OP_UNIQUE_CARRIERIndexed=10.0)]

In [46]:
labeler = Bucketizer(splits=[-float("inf"), 0.0, 40.0, float("inf")], inputCol="DEP_DELAY_Conv", outputCol="label")

In [47]:
featureCols = indexed.select('OP_UNIQUE_CARRIERIndexed', 'DESTIndexed', 'ORIGINIndexed', 'DAY_OF_WEEKIndexed', 'ORG_DESTIndexed', 'DEST')

In [48]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

In [49]:
assembler = VectorAssembler(inputCols=['OP_UNIQUE_CARRIERIndexed', 'DESTIndexed', 'ORIGINIndexed', 'DAY_OF_WEEKIndexed', 'ORG_DESTIndexed'], outputCol='features')

In [50]:
output = assembler.transform(indexed)

In [51]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [52]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

In [57]:
pipeline = Pipeline(stages=[stringIndexer, labeler, assembler, rf])

In [58]:
model = pipeline.fit(trainingdata)

TypeError: Cannot recognize a pipeline stage of type <class 'list'>.