In [1]:
###########
# 載入套件 #
###########
import findspark
findspark.init()
import pyspark
findspark.find()
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import  SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
conf = SparkConf()
conf.set('spark.executor.memory', '32g')
spark = SparkSession.builder \
        .config(conf=conf) \
        .appName('test') \
        .getOrCreate()

In [2]:
############################
# 建立Spark相關語法&讀取資料 #
############################
data1 = spark.read.csv('./2003.csv', header = True, sep = ',',inferSchema=True)
data2 = spark.read.csv("./2004.csv", header = True, sep = ",",inferSchema=True)
train_data = data2.union(data1)
valid_data = spark.read.csv('./2005.csv', header = True, sep = ',',inferSchema=True)

In [3]:
#######################
# 定義Target variable #
#######################
train_data = train_data.withColumn('Delay', 
when((train_data.ArrDelay <0) & (train_data.DepDelay<0), 1).otherwise(0))
valid_data = valid_data.withColumn('Delay', 
when((valid_data.ArrDelay <0) & (valid_data.DepDelay<0), 1).otherwise(0))

In [4]:
#####################################
# 找出本次要使用的欄位並進行缺失值填補 #
#####################################
target = ["Delay"]
numeric = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSDepTime', 'CRSArrTime', 'CRSElapsedTime', 'Distance']
category = ['UniqueCarrier', 'FlightNum', 'TailNum', 'Origin', 'Dest', 'Cancelled']
train_data = train_data.select(list(target+numeric+category))
valid_data = valid_data.select(list(target+numeric+category))

def fill_null(df):
    for col in numeric:
        df = df.withColumn(col, df[col].cast('double'))
        mean = df.select(col).agg({col:'mean'}).collect()[0][0]
        df = df.withColumn(col, when(df[col].isNull(), mean).otherwise(df[col]))
    for col in category:
        freq = df.select(col).groupby(col).count().orderBy('count', ascending=False).limit(1).collect()[0][0]
        df = df.withColumn(col, when((df[col].isNull() | (df[col] == '')), freq).otherwise(df[col]))
    return df

train_data = fill_null(train_data)
valid_data = fill_null(valid_data)

In [5]:
train_data.count()

13617810

In [6]:
valid_data.count()

7140596

In [7]:
for c in category:
    ls = [i.__getitem__(c) for i in train_data.select(c).distinct().collect()]
    valid_data = valid_data.filter(valid_data[c].isin(ls))

In [8]:
# ls = [i.__getitem__('UniqueCarrier') for i in train_data.select('UniqueCarrier').distinct().collect()]

In [9]:
valid_data.count()

6852952

In [10]:
train_data.groupby("Delay").count().show()

+-----+-------+
|Delay|  count|
+-----+-------+
|    1|4623786|
|    0|8994024|
+-----+-------+



In [11]:
################
# Oversampling #
################

# major_df = train_data.filter(col("Delay") == 0)
# minor_df = train_data.filter(col("Delay") == 1)
# ratio = major_df.count()/minor_df.count()-1
# oversampled_df = minor_df.sample(False, ratio,7)
# train_data2 = major_df.union(oversampled_df)
# train_data = train_data2.union(minor_df)

In [12]:
# train_data.groupby("Delay").count().show()

+-----+-------+
|Delay|  count|
+-----+-------+
|    1|8994575|
|    0|8994024|
+-----+-------+



In [13]:
#######################################
# 針對category欄位進行one_hot_encoding #
#######################################
def preprocessing(df):
    indexer = [StringIndexer(inputCol=col,outputCol="{}_index".format(col))for col in category]
    encoder = [OneHotEncoder(inputCol="{}_index".format(col),outputCol="{}_one_hot".format(col)) for col in category]
    assembler = VectorAssembler(inputCols = numeric +["{}_one_hot".format(col) for col in category],outputCol = "features")
    preprocessor = Pipeline(stages = indexer + encoder+[assembler]).fit(df)
    return preprocessor


In [14]:
preprocessor = preprocessing(train_data)
train_data = preprocessor.transform(train_data)
valid_data = preprocessor.transform(valid_data)

In [15]:
lr = LogisticRegression(maxIter=10^3,regParam=0.1,featuresCol="features",
labelCol="Delay")
lrmodel = lr.fit(train_data)
train_pred = lrmodel.transform(train_data)
valid_pred = lrmodel.transform(valid_data)

In [16]:
eval_precision = MulticlassClassificationEvaluator(labelCol="Delay", predictionCol="prediction", metricName="precisionByLabel")
eval_recall = MulticlassClassificationEvaluator(labelCol="Delay", predictionCol="prediction", metricName="recallByLabel")
eval_f1 = MulticlassClassificationEvaluator(labelCol="Delay", predictionCol="prediction", metricName="f1")
eval_accuracy = MulticlassClassificationEvaluator(labelCol="Delay", predictionCol="prediction", metricName="accuracy")

In [17]:
# ################################
# # training result OverSampling #
# ################################
# print("train_precision: ",eval_precision.evaluate(train_pred))
# print("train_recall: ",eval_recall.evaluate(train_pred))
# print("train_f1score: ",eval_f1.evaluate(train_pred))
# print("train_auc: ",eval_accuracy.evaluate(train_pred))
# ##############################
# # valid result  OverSampling #
# ##############################
# print("valid_precision: ",eval_precision.evaluate(valid_pred))
# print("valid_recall: ",eval_recall.evaluate(valid_pred))
# print("valid_f1score: ", eval_f1.evaluate(valid_pred))
# print("valid_accuracy: ",eval_accuracy.evaluate(valid_pred))

train_precision:  0.9279557072862297
train_recall:  0.3322259313517509
train_f1score:  0.6133910193234708
train_auc:  0.6532269689262626
valid_precision:  0.8619001015110084
valid_recall:  0.3108978472182377
valid_f1score:  0.4878977598721202
valid_accuracy:  0.507643129559349


In [None]:
###################
# training result #
###################
print("train_precision: ",eval_precision.evaluate(train_pred))
print("train_recall: ",eval_recall.evaluate(train_pred))
print("train_f1score: ",eval_f1.evaluate(train_pred))
print("train_auc: ",eval_accuracy.evaluate(train_pred))
################
# valid result #
################
print("valid_precision: ",eval_precision.evaluate(valid_pred))
print("valid_recall: ",eval_recall.evaluate(valid_pred))
print("valid_f1score: ", eval_f1.evaluate(valid_pred))
print("valid_accuracy: ",eval_accuracy.evaluate(valid_pred))


train_precision:  0.6789184943257511
train_recall:  0.9615665913277527
train_f1score:  0.5915148567029522
train_auc:  0.6742689169550757
valid_precision:  0.6792266763454395
valid_recall:  0.9630940122349099
valid_f1score:  0.583343397244667
valid_accuracy:  0.6723429552694955


In [None]:
df = valid_pred.select("Delay","prediction")

In [None]:
df = df.withColumn("confusion", when((df.Delay==1)&(df.prediction==1),"TN")
.when((df.Delay==0)&(df.prediction==1),"FN")
.when((df.Delay==0)&(df.prediction==0),"TP")
.when((df.Delay==1)&(df.prediction==0),"FP"))

In [None]:
df.groupBy("confusion").count().show()

+---------+-------+
|confusion|  count|
+---------+-------+
|       TP|4307252|
|       TN| 296022|
|       FN| 259026|
|       FP|1990652|
+---------+-------+

