In [1]:
cd drive/MyDrive/學業/巨量/HW2/

/content/drive/MyDrive/學業/巨量/HW2


In [2]:
import os

# Install java
! apt-get update -qq
!apt-get -y install openjdk-8-jre-headless

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed -q pyspark==2.4.4
! pip install --ignore-installed -q spark-nlp==2.6.3

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Suggested packages:
  libnss-mdns fonts-dejavu-extra fonts-ipafont-gothic fonts-ipafont-mincho
  fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jre-headless
0 upgraded, 1 newly installed, 0 to remove and 16 not upgraded.
Need to get 28.2 MB of archives.
After this operation, 104 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic-updates/universe amd64 openjdk-8-jre-headless amd64 8u275-b01-0ubuntu1~18.04 [28.2 MB]
Fetched 28.2 MB in 3s (10.6 MB/s)
Selecting previously unselected package openjdk-8-jre-headless:amd64.
(Reading database ... 145480 files and directories currently installed.)
Preparing to unpack .../openjdk-8-jre-headless_8u275-b01-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u275-b01-0ubuntu1~18.04) ...
Setting up openjdk-8-jre-headless:amd64 (8u275-b01-0ubunt

In [3]:
from pyspark import SparkContext 
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StringIndexerModel, StandardScaler, Imputer, VectorAssembler, SQLTransformer
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.master("local").getOrCreate()
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [4]:
# List numerical features & categorical features
target_col = "Cancelled"
highest_cols = ["Month", "DayOfWeek", "UniqueCarrier", "FlightNum", "Origin", "Dest"]
mean_cols = ["Year", "DayofMonth", "CRSDepTime", "CRSArrTime", "CRSElapsedTime", "Distance", "TaxiIn", "TaxiOut"]

In [5]:
def load(path):
  # Load DataFrame
  df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load(path)

  # Select useful columns (drop columns that should not be known 
  # before the flight take place) 
  df = df.select([target_col] + mean_cols + highest_cols)

  # Impute numerical features
  for col in mean_cols:
      df = df.withColumn(col, df[col].cast('double'))
      mu = df.select(col).agg({col:'mean'}).collect()[0][0]
      df = df.withColumn(col, F.when(df[col].isNull(), mu).otherwise(df[col]))
  df = df.withColumn('label', df[target_col].cast('double'))
  df = df.filter(df['label'].isNotNull())

  # Impute categorical features
  for col in highest_cols:
      frq = df.select(col).groupby(col).count().orderBy('count', ascending=False).limit(1).collect()[0][0]
      df = df.withColumn(col, F.when((df[col].isNull() | (df[col] == '')), frq).otherwise(df[col]))

  # Assure there is no missing values
  for col in mean_cols + highest_cols + ['label']:
      assert df.filter(df[col].isNull()).count() == 0, "Column '{}' exists NULL value(s)".format(col)
      assert df.filter(df[col] == '').count() == 0, "Column '{}' exists empty string(s)".format(col)
  return df


In [6]:
def preprocess_train(df):
  # String Indexing for categorical features
  indexers = [StringIndexer(inputCol=col, outputCol="{}_idx".format(col)) for col in highest_cols]   
  # One-hot encoding for categorical features
  encoders = [OneHotEncoder(inputCol="{}_idx".format(col), outputCol="{}_oh".format(col)) for col in highest_cols]
  # Concat Feature Columns
  assembler = VectorAssembler(inputCols = mean_cols + ["{}_oh".format(col) for col in highest_cols], outputCol = "_features")
  
  # Standardize Features
  scaler = StandardScaler(inputCol='_features', outputCol='features', withStd=True, withMean=False)
  preprocess = Pipeline(stages = indexers + encoders + [assembler, scaler]).fit(df)
  return preprocess


In [7]:
def preprocess_test(df, preprocess):
  dic = {x._java_obj.getInputCol():
    [lab for lab in x._java_obj.labels()] for x in preprocess.stages if isinstance(x, StringIndexerModel)}

  # Filter out unseen labels 
  for col in highest_cols:
    df = df.filter(F.col(col).isin(dic[col]))
  
  # Assure there is no unseen values
  for col in highest_cols:
    assert df.filter(F.col(col).isin(dic[col]) == False).count() == 0, "Column '{}' exists unseen label(s)".format(col)
  df = preprocess.transform(df)
  return df


In [8]:
def evaluate(predictionAndLabels):
  log = {}
  # Show Validation Score (AUROC)
  evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
  #log['AUROC'] = "%f" % evaluator.evaluate(predictionAndLabels)
  print("Area Under ROC: " + str(evaluator.evaluate(predictionAndLabels)))

  # Show Validation Score (AUPR)
  evaluator = BinaryClassificationEvaluator(metricName='areaUnderPR')
  print("Area Under PR: " + str(evaluator.evaluate(predictionAndLabels)))

  # Metrics
  predictionRDD = predictionAndLabels.select(['label', 'prediction']).rdd.map(lambda line: (line[1], line[0]))
  metrics = MulticlassMetrics(predictionRDD)

  # Confusion Matrix
  print("Confusion Matrix:")
  print(metrics.confusionMatrix().toArray())

  # Overall statistics
  log['precision'] = "%s" % metrics.precision()
  log['recall'] = "%s" % metrics.recall()
  log['F1 Measure'] = "%s" % metrics.fMeasure()
  print("[Overall]\tprecision = %s | recall = %s | F1 Measure = %s" % (log['precision'], log['recall'], log['F1 Measure']))

  # Statistics by class
  labels = [0.0, 1.0]
  for label in sorted(labels):
    log[label] = {}
    log[label]['precision'] = "%s" % metrics.precision(label)
    log[label]['recall'] = "%s" % metrics.recall(label)
    log[label]['F1 Measure'] = "%s" % metrics.fMeasure(label, beta=1.0)
    print("[Class %s]\tprecision = %s | recall = %s | F1 Measure = %s" % (label, log[label]['precision'], log[label]['recall'], log[label]['F1 Measure']))
  return log


In [12]:
df = load("./Q2/*.csv")
df_test = load("./test/2005.csv")

KeyboardInterrupt: ignored

In [9]:
from pyspark import SparkConf
conf = SparkConf()
#conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [10]:
# Load DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load("./Q2/*.csv")

In [18]:
df.show(5)

+---------+------+----------+----------+----------+--------------+--------+------+-------+-----+---------+-------------+---------+------+----+
|Cancelled|  Year|DayofMonth|CRSDepTime|CRSArrTime|CRSElapsedTime|Distance|TaxiIn|TaxiOut|Month|DayOfWeek|UniqueCarrier|FlightNum|Origin|Dest|
+---------+------+----------+----------+----------+--------------+--------+------+-------+-----+---------+-------------+---------+------+----+
|        0|2004.0|        12|       630|       915|           105|     599|     7|     11|    1|        1|           UA|      462|   ORD| CLT|
|        0|2004.0|        13|       630|       915|           105|     599|    16|     16|    1|        2|           UA|      462|   ORD| CLT|
|        0|2004.0|        14|       630|       915|           105|     599|     4|     15|    1|        3|           UA|      462|   ORD| CLT|
|        0|2004.0|        15|       630|       915|           105|     599|     4|     10|    1|        4|           UA|      462|   ORD| CLT|

In [11]:
# Select useful columns (drop columns that should not be known 
# before the flight take place) 
df = df.select([target_col] + mean_cols + highest_cols)

# Impute numerical features
for col in mean_cols:
  df = df.withColumn(col, df[col].cast('double'))
  mu = df.select(col).agg({col:'mean'}).collect()[0][0]
  df = df.withColumn(col, F.when(df[col].isNull(), mu).otherwise(df[col]))
df = df.withColumn('label', df[target_col].cast('double'))
df = df.filter(df['label'].isNotNull())

# Impute categorical features
for col in highest_cols:
  frq = df.select(col).groupby(col).count().orderBy('count', ascending=False).limit(1).collect()[0][0]
  df = df.withColumn(col, F.when((df[col].isNull() | (df[col] == '')), frq).otherwise(df[col]))

# Assure there is no missing values
for col in mean_cols + highest_cols + ['label']:
  assert df.filter(df[col].isNull()).count() == 0, "Column '{}' exists NULL value(s)".format(col)
  assert df.filter(df[col] == '').count() == 0, "Column '{}' exists empty string(s)".format(col)

In [12]:
df = df.withColumn("Cancelled", F.col("Cancelled").cast(IntegerType()))

In [13]:
print("train preprocess")
#preprocess = preprocess_train(df)
preprocess = PipelineModel.load("./output/preprocess")  
df = preprocess.transform(df)
#print("save model")
#preprocess.write().overwrite().save("./output/preprocess")

train preprocess


In [None]:
df.show(5)

+---------+------+----------+----------+----------+--------------+--------+------+-------+-----+---------+-------------+---------+------+----+-----+
|Cancelled|  Year|DayofMonth|CRSDepTime|CRSArrTime|CRSElapsedTime|Distance|TaxiIn|TaxiOut|Month|DayOfWeek|UniqueCarrier|FlightNum|Origin|Dest|label|
+---------+------+----------+----------+----------+--------------+--------+------+-------+-----+---------+-------------+---------+------+----+-----+
|        0|2004.0|      12.0|     630.0|     915.0|         105.0|   599.0|   7.0|   11.0|    1|        1|           UA|      462|   ORD| CLT|  0.0|
|        0|2004.0|      13.0|     630.0|     915.0|         105.0|   599.0|  16.0|   16.0|    1|        2|           UA|      462|   ORD| CLT|  0.0|
|        0|2004.0|      14.0|     630.0|     915.0|         105.0|   599.0|   4.0|   15.0|    1|        3|           UA|      462|   ORD| CLT|  0.0|
|        0|2004.0|      15.0|     630.0|     915.0|         105.0|   599.0|   4.0|   10.0|    1|        4|

In [None]:
print((df.count(), len(df.columns)))

In [14]:
lr = LogisticRegression(maxIter=5)#10^5
#paramGrid = (ParamGridBuilder().addGrid(gbt.maxDepth, [2, 4, 6]).addGrid(gbt.maxBins, [20, 60]).addGrid(gbt.maxIter, [10, 20]).build())
# Train the 10-fold Cross Validator
print("start training")
cvModel = CrossValidator(estimator=Pipeline(stages = [lr]), estimatorParamMaps=ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build(), evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'), numFolds=3)

start training


In [15]:
#切割training data 分開跑
split = ['2000', '2001', '2002', '2003', '2004']
for i in split:
  print(i)
  df_train = df.filter(F.col('Year')==i)
  cvModel.fit(df_train)
  cvModel.bestModel.write().overwrite().save("./output/model")

2000
2001


AttributeError: ignored

In [None]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[Cancelled: int, Year: double, DayofMonth: double, CRSDepTime: double, CRSArrTime: double, CRSElapsedTime: double, Distance: double, TaxiIn: double, TaxiOut: double, Month: string, DayOfWeek: string, UniqueCarrier: string, FlightNum: string, Origin: string, Dest: string, label: double, Month_idx: double, DayOfWeek_idx: double, UniqueCarrier_idx: double, FlightNum_idx: double, Origin_idx: double, Dest_idx: double, Month_oh: vector, DayOfWeek_oh: vector, UniqueCarrier_oh: vector, FlightNum_oh: vector, Origin_oh: vector, Dest_oh: vector, _features: vector, features: vector]>

In [None]:
gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="Cancelled")#10^5
paramGrid = (ParamGridBuilder().addGrid(gbt.maxDepth, [2, 4, 6]).addGrid(gbt.maxBins, [20, 60]).addGrid(gbt.maxIter, [10, 20]).build())
print("start training")
cvModel = CrossValidator(estimator=Pipeline(stages = [gbt]), estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'), numFolds=5).fit(df)

start training


In [None]:
cvModel.bestModel.write().overwrite().save("./output/model")

In [16]:
predictionAndLabels = cvModel.transform(df)
log = evaluate(predictionAndLabels)
with open('./output/log/train.json'.format(), 'w') as f:
    json.dump(log, f)

Area Under ROC: 0.9968500387905483
Area Under PR: 0.935165250078613
Confusion Matrix:
[[2.9824591e+07 2.3480000e+03]
 [4.8983700e+05 2.2322000e+05]]
[Overall]	precision = 0.9838839206134801 | recall = 0.9838839206134801 | F1 Measure = 0.9838839206134801


KeyboardInterrupt: ignored

In [17]:
df_test = load("./test/2005.csv")
df_test = preprocess_test(df_test, preprocess)

In [19]:
predictionAndLabels = cvModel.transform(df_test)
#predictionAndLabels.write.format('json').write().overwrite().save("./HW4/output/result")

In [20]:
log = evaluate(predictionAndLabels)
with open('./log/test.json', 'w') as f:
    json.dump(log, f)

Area Under ROC: 0.9971801952575035
Area Under PR: 0.9278206546670063
Confusion Matrix:
[[6.943669e+06 6.200000e+01]
 [1.003530e+05 3.291400e+04]]
[Overall]	precision = 0.9858110741305848 | recall = 0.9858110741305848 | F1 Measure = 0.9858110741305848
[Class 0.0]	precision = 0.9857534516502078 | recall = 0.9999910710826787 | F1 Measure = 0.9928212201059026
[Class 1.0]	precision = 0.9981198447355653 | recall = 0.2469778714910668 | F1 Measure = 0.39597456735020425


FileNotFoundError: ignored