In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from IPython.display import display
from ipywidgets import interact
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from operator import add
import sys
import numpy as np
import pandas as pd
import time
import datetime
from pyspark.mllib.tree import RandomForest, RandomForestModel

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext("local",'app')
spark = SparkSession.builder.appName('name').config('spark.sql.shuffle.partitions',10).getOrCreate()

# 1. load data
## 1.1 big trainset

In [2]:
data=spark.read.csv('data/train_flight.csv',header=True,inferSchema=True)
data.printSchema()

root
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (nullable = true)
 |-- schedule_departure: integer (nullable = true)
 |-- NEW_DAY: integer (nullable = true)



## 1.2 small_trainset

In [2]:
data_small=spark.read.csv('data/small.csv',header=True,inferSchema=True)
data_small.count()

45616

## 1.3 choose one dataset

In [89]:
 # can be updated
#dataset=data_small
dataset=data
print(dataset.count())
dataset.printSchema()
#dataset.select("SCHEDULED_TIME",'schedule_departure').show()

4571729
root
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (nullable = true)
 |-- schedule_departure: integer (nullable = true)
 |-- NEW_DAY: integer (nullable = true)

+--------------+------------------+
|SCHEDULED_TIME|schedule_departure|
+--------------+------------------+
|           110|               375|
|           110|               375|
|           111|               480|
|           111|               480|
|           111|               480|
|           109|               636|
|           109|      

In [90]:
# just try airport clustering
dataset=dataset.filter(dataset['AIRLINE']=='AA')
dataset.count()

570501

# 0.1 change label to classification

# 0.2 change label to doubletype


In [91]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType,IntegerType
name = 'DEPARTURE_DELAY'

udf = UserDefinedFunction(lambda x: x*1.0, DoubleType())
new_data=dataset.select('*',udf(dataset['DEPARTURE_DELAY']).alias('double_labels'))
dataset=new_data.drop('DEPARTURE_DELAY')
dataset=dataset.withColumnRenamed('double_labels','DEPARTURE_DELAY')
dataset.printSchema()
# create a new feature
udf = UserDefinedFunction(lambda x: x*x, IntegerType())
new_data=dataset.select('*',udf(dataset['Schedule_departure']).alias('square_Schedule'))
dataset=new_data
dataset.printSchema()

root
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (nullable = true)
 |-- schedule_departure: integer (nullable = true)
 |-- NEW_DAY: integer (nullable = true)
 |-- DEPARTURE_DELAY: double (nullable = true)

root
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullabl

# 2 feature transformation pipeline
## 2.1 feature selection (can be updated)

In [92]:
# dataset=data
categoricalColumns = ['ORIGIN_AIRPORT']  # to add
numericCols = ['schedule_departure','NEW_DAY','square_Schedule']  # to add
# all_features=categoricalColumns+numericCols

## 2.2 transform and onehot

In [93]:

from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
cols=dataset.columns

stages = [] 
feature_names=[]
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, 
        outputCol=categoricalCol+"Index")
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", 
        outputCol=categoricalCol+"classVec")
    stages += [stringIndexer, encoder]

assemblerInputs = map(lambda c: c + "classVec", categoricalColumns) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(dataset)
dataset_transformed = pipelineModel.transform(dataset)
selectedcols = ['DEPARTURE_DELAY', "features"] 
dataset_transformed = dataset_transformed.select(selectedcols)
dataset_transformed=dataset_transformed.select('*').withColumnRenamed('DEPARTURE_DELAY','label')
dataset_transformed.printSchema()
dataset_transformed.select('features').show()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

+--------------------+
|            features|
+--------------------+
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[141,186,187...|
|(189,[162,186,187...|
+--------------------+
only showing top 20 rows



## 2.3 sample and split dataset into trainingData and testData

In [96]:
dataset_used,left_behind=dataset_transformed.randomSplit((0.2,0.8),1)
trainingData,testData=dataset_used.randomSplit((0.8,0.2),1)

## 2.7 feature importances

In [63]:
from pyspark.ml.regression import RandomForestRegressor
rf= RandomForestRegressor(numTrees=10, maxDepth=3, seed=42)
model = rf.fit(dataset_transformed)
print(assemblerInputs)
model.featureImportances

['schedule_departure', 'NEW_DAY']


SparseVector(2, {0: 0.6792, 1: 0.3208})

# 4 machine learning model(by pyspark.ml.)

# 4.1 train model (RandomForest)

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
numFolds =2

rf = RandomForestRegressor(labelCol="label", featuresCol="features")   
paramGrid = ParamGridBuilder()\
    .addGrid(rf.numTrees,[50]) \
    .build()
crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(),
    numFolds=numFolds)
model = crossval.fit(trainingData)

## 4.2 linear family model

In [97]:
from pyspark.ml.regression import IsotonicRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GeneralizedLinearRegression
# Isotonic
model = IsotonicRegression(labelCol="label", featuresCol="features").fit(trainingData)

#linear regression
#lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,labelCol="label", featuresCol="features")
#model = lr.fit(trainingData)

# G LR, gaussian,
#glr = GeneralizedLinearRegression(family="Tweedie", maxIter=10, regParam=0.3,labelCol="label", featuresCol="features")
#model = glr.fit(trainingData)

## 4.2 performance

In [98]:
from pyspark.mllib.evaluation import RegressionMetrics
cvModel=model
trainPredictionsAndLabels = cvModel.transform(trainingData).select("label", "prediction").rdd
validPredictionsAndLabels = cvModel.transform(testData).select("label", "prediction").rdd
trainRegressionMetrics = RegressionMetrics(trainPredictionsAndLabels)
validRegressionMetrics = RegressionMetrics(validPredictionsAndLabels)

bestModel = cvModel
#featureImportances = bestModel.featureImportances.toArray()
#print (featureImportances)

output = str("\n=====================================================================\n" +
      "TrainingData count: {0}\n".format(trainingData.count()) +
      "TestData count: {0}\n".format(testData.count()) +
      "=====================================================================\n" +
      "Training data MSE = {}\n".format(trainRegressionMetrics.meanSquaredError) +
      "Training data RMSE = {}\n".format(trainRegressionMetrics.rootMeanSquaredError) +
      "Training data R-squared = {}\n".format(trainRegressionMetrics.r2) +
      "Training data MAE = {}\n".format(trainRegressionMetrics.meanAbsoluteError) +
      "Training data Explained variance = {}\n".format(trainRegressionMetrics.explainedVariance) +
      "=====================================================================\n" +
      "Validation data MSE = {0}\n".format(validRegressionMetrics.meanSquaredError) +
      "Validation data RMSE = {0}\n".format(validRegressionMetrics.rootMeanSquaredError) +
      "Validation data R-squared = {0}\n".format(validRegressionMetrics.r2) +
      "Validation data MAE = {0}\n".format(validRegressionMetrics.meanAbsoluteError) +
      "Validation data Explained variance = {0}\n".format(validRegressionMetrics.explainedVariance) +
     # "=====================================================================\n" +
      #"CV params explained: {}\n".format(cvModel.explainParams()) +
     # "RandomForest params explained: {}\n".format(bestModel.explainParams()) +
      #"RandomForest features importances:\n {0}\n".format("\n".join(map(lambda z: "{0} = {1}".format(str(z[0]),str(z[1])), zip(featureCols, featureImportances)))) +
"=====================================================================\n")
print(output)


TrainingData count: 91378
TestData count: 22915
Training data MSE = 3793.87410536
Training data RMSE = 61.5944324218
Training data R-squared = -0.975014555477
Training data MAE = 34.2013832651
Training data Explained variance = 1997.64767123
Validation data MSE = 3946.28448614
Validation data RMSE = 62.8194594544
Validation data R-squared = -1.08428008797
Validation data MAE = 34.4607898756
Validation data Explained variance = 2120.10973005



# 3. ML model (by MLlib)
## 3.1 generate RDD

In [99]:
# change into RDD
from pyspark.ml.linalg import Vector as MLVector, Vectors as MLVectors
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors
from pyspark.ml import linalg as ml_linalg

def as_mllib(v):
    if isinstance(v, ml_linalg.SparseVector):
        return MLLibVectors.sparse(v.size, v.indices, v.values)
    elif isinstance(v, ml_linalg.DenseVector):
        return MLLibVectors.dense(v.toArray())
    else:
        raise TypeError("Unsupported type: {0}".format(type(v)))
        
airlineRDD=dataset_transformed.rdd.map(lambda row: LabeledPoint(row['label'],as_mllib(row['features'])))

## 3.2 split trainset and testset 

In [100]:
#  Spliting dataset into train and test dtasets
airlineRDD.cache()
use_data,left_data=airlineRDD.randomSplit([0.2,0.8])
trainingData,testData=use_data.randomSplit([0.8,0.2])

## 3.3 use Random Forest classifier

In [101]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Note: Use larger numTrees in practice.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                        numTrees=100, featureSubsetStrategy="auto",
                                        impurity='variance', maxDepth=10, maxBins=32)

# Evaluate model on test instances and compute test error
#predictions = model.predict(testData.map(lambda x: x.features))
#labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
#testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
#       float(testData.count())
#print('Test Mean Squared Error = ' + str(testMSE))

In [102]:
from pyspark.mllib.evaluation import RegressionMetrics

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
# Instantiate metrics object
metrics = RegressionMetrics(labelsAndPredictions)
# Squared Error
print("MSE = %s" % metrics.meanSquaredError)
print("RMSE = %s" % metrics.rootMeanSquaredError)
 # R-squared
print("R-squared = %s" % metrics.r2)
# Mean absolute error
print("MAE = %s" % metrics.meanAbsoluteError)
# Explained variance
print("Explained variance = %s" % metrics.explainedVariance)
# exampleoff

MSE = 1508.56180569
RMSE = 38.8402086206
R-squared = -33.1146795254
MAE = 17.8823082125
Explained variance = 1527.0073945


## 3.4 use GBDT

In [82]:
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
model = GradientBoostedTrees.trainRegressor(trainingData,
                                             categoricalFeaturesInfo={}, numIterations=100)

## 3.5 logistic 

In [141]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

# Build the model
model = LogisticRegressionWithLBFGS.train(trainingData,numClasses=3)
print('finish training')
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))

Test Error = 0.38938509204


## 3.3 model training

In [45]:
# train models
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
model = LinearRegressionWithSGD.train(trainRDD, iterations=100, step=0.0000001)



## 3.4 model evaluation

In [None]:
# Evaluate the model on training data
valuesAndPreds = testRDD.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds \
    .map(lambda vp: (vp[0] - vp[1])**2) \
    .reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

##  3.5 save model

In [None]:
# Save and load model
model.save(sc, "model/pythonLinearRegressionWithSGDModel")

sameModel = LinearRegressionModel.load(sc, "model/pythonLinearRegressionWithSGDModel")