In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
#from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler,OneHotEncoder,StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
sc = SparkSession.builder.appName('hw4').getOrCreate()

In [3]:
customSchema = StructType([
  StructField("Year", IntegerType(), True),
  StructField("Month", IntegerType(), True),
  StructField("DayofMonth", IntegerType(), True),
  StructField("DayOfWeek", IntegerType(), True),
  StructField("DepTime", IntegerType(), True),
  StructField("CRSDepTime", IntegerType(), True),
  StructField("ArrTime", IntegerType(), True),
  StructField("CRSArrTime", IntegerType(), True),
  StructField("UniqueCarrier", StringType(), True),
  StructField("FlightNum", IntegerType(), True),
  StructField("TailNum", StringType(), True),
  StructField("ActualElapsedTime", IntegerType(), True),
  StructField("CRSElapsedTime", IntegerType(), True),
  StructField("AirTime", IntegerType(), True),
  StructField("ArrDelay", IntegerType(), True),
  StructField("DepDelay", IntegerType(), True),
  StructField("Origin", StringType(), True),
  StructField("Dest", StringType(), True),
  StructField("Distance", IntegerType(), True),
  StructField("TaxiIn", IntegerType(), True),
  StructField("TaxiOut", IntegerType(), True),
  StructField("Cancelled", IntegerType(), True),
  StructField("CancellationCode", StringType(), True),
  StructField("Diverted", IntegerType(), True),
  StructField("CarrierDelay", IntegerType(), True),
  StructField("WeatherDelay", IntegerType(), True),
  StructField("NASDelay", IntegerType(), True),
  StructField("SecurityDelay", IntegerType(), True),
  StructField("LateAircraftDelay", IntegerType(), True)]
)


# In[28]:


df0 = sc.read.csv('2000.csv', schema=customSchema, header=True)
df1 = sc.read.csv('2001.csv', schema=customSchema, header=True)
df2 = sc.read.csv('2002.csv', schema=customSchema, header=True)
df3 = sc.read.csv('2003.csv', schema=customSchema, header=True)
df4 = sc.read.csv('2004.csv', schema=customSchema, header=True)
df5 = sc.read.csv('2005.csv', schema=customSchema, header=True)


# In[29]:


df = df0.union(df1)
df = df.union(df2)
df = df.union(df3)
df = df.union(df4)
df = df.union(df5)

In [5]:
#df = df0.sample(withReplacement=False, fraction=0.001)

In [None]:
na_count = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])

In [None]:
na_count.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|   0|    0|         0|        0| 846787|         0| 921499|         0|            0|        0

In [None]:
df.count()*0.4

15072236.8

In [None]:
nalist = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).collect()

In [None]:
someNull_cols = [df.columns[c] for c in range(len(df.columns)) if (nalist[0][c] < df.count()*0.4) & (nalist[0][c] > 0)]
nonNull_cols = [df.columns[c] for c in range(len(df.columns)) if (nalist[0][c] == 0)]

In [None]:
someNull_cols

['DepTime',
 'ArrTime',
 'TailNum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay']

In [None]:
nonNull_cols

['Year',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'CRSDepTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'Diverted']

In [None]:
#someNull_cols = [c for c in na_count.columns if (na_count[[c]].first()[c] < 15072237) & (na_count[[c]].first()[c] > 0)]
#nonNull_cols = [c for c in na_count.columns if na_count[[c]].first()[c] == 0]

In [None]:
df.groupby('TailNum').count().show()

+-------+-----+
|TailNum|count|
+-------+-----+
|  N6700| 6922|
| N919UA| 8899|
| N499AA| 6509|
| N502US| 5876|
|   N656| 8272|
| N516UA| 5381|
| N513UA| 5505|
| N102UW| 6742|
| N912TW| 1655|
| N385US| 8785|
| N240AU| 1909|
| N411US| 2927|
| N902DE| 8879|
| N407AA| 6237|
| N33637| 7436|
| N607NW| 7429|
| N745AS| 8989|
| N567AA| 5676|
| N2CAAA| 3038|
| N201US| 1344|
+-------+-----+
only showing top 20 rows



In [6]:
someNull_cols.remove('TailNum')

In [7]:
for i in someNull_cols:
    na_mean = df.select(mean(df[i])).collect()
    df = df.na.fill(na_mean[0][0],subset=[i])

In [8]:
clean_cols = nonNull_cols+someNull_cols
df = df.select(*clean_cols)
df = df.withColumnRenamed("Cancelled","label")

stages = []
str_col = ['UniqueCarrier','Origin','Dest',"FlightNum"]
for i in str_col:
    stringIndexer = StringIndexer(inputCol = i,outputCol = i+'Index')
    encoder_str = OneHotEncoder(inputCols = [stringIndexer.getOutputCol()],outputCols = [i+"_str_cat"])
    stages += [stringIndexer, encoder_str]

cat_col = ["Month","DayofMonth","DayOfWeek","Diverted"]
encoder_cat = OneHotEncoder(inputCols = cat_col,outputCols = [i+"_cat" for i in cat_col])
stages += [encoder_cat]

conti_col = ["DepTime","ArrTime","ActualElapsedTime","CRSElapsedTime","AirTime","ArrDelay","DepDelay",\
             "CRSDepTime","CRSArrTime","Distance","TaxiIn","TaxiOut"]
numVect = VectorAssembler(inputCols = conti_col, outputCol="numFeatures")
scaler = StandardScaler(inputCol=numVect.getOutputCol(), outputCol="scaledFeatures")
stages += [numVect, scaler]

all_col = [i+"_str_cat" for i in str_col] + [i+"_cat" for i in cat_col]+["scaledFeatures"]
assembler = VectorAssembler(inputCols=all_col, outputCol="features")
stages += [assembler]

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df_clean = pipelineModel.transform(df)

In [9]:
train = df_clean.filter(df_clean.Year < 2005)
test = df_clean.filter(df_clean.Year == 2005)
test = test.withColumnRenamed("label","trueLabel")

lr = LogisticRegression(featuresCol = 'features', labelCol = 'label')

grid = ParamGridBuilder().addGrid(lr.regParam, [0.01,1]).build().addGrid(lr.maxIter, [10, 20]).build()
cv = CrossValidator(estimator = lr_pipeline,estimatorParamMaps = grid,evaluator = BinaryClassificationEvaluator(),numFolds = 3)
cvModel = cv.fit(train)

Py4JJavaError: An error occurred while calling o1235.fit.
: org.apache.spark.SparkException: Job 16 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:979)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:977)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:977)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2257)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2170)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:1973)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1973)
	at org.apache.spark.SparkContext.$anonfun$new$35(SparkContext.scala:631)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2194)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1220)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1196)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$1(LogisticRegression.scala:499)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:487)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:482)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:281)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:150)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)


In [None]:
prediction = cvModel.transform(test)
predicted = prediction.select("prediction", "trueLabel")

In [None]:
tp = float(prediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(prediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(prediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(prediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr)),
 ],["metric", "value"])
metrics.show()

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
auc = evaluator.evaluate(prediction)
print( "AUC = ", auc)

In [None]:
cvPrediction = cvModel.transform(test)


# In[ ]:


tp2 = float(cvPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(cvPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(cvPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(cvPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
acc2 = (tp2+tn2)/(tp2+fp2+tn2+fn2)
metrics2 = sc.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Accuracy", acc2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+--------------------+
|   metric|               value|
+---------+--------------------+
|       TP|                 2.0|
|       FP|                 8.0|
|       TN|              1621.0|
|       FN|                53.0|
| Accuracy|  0.9637767220902613|
|Precision|                 0.2|
|   Recall| 0.03636363636363636|
|       F1|0.061538461538461535|
+---------+--------------------+



In [None]:
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
auc2 = evaluator2.evaluate(cvPrediction)
print( "AUC = ", auc2)

AUC =  0.5157263240136168
