In [65]:
# Mount Google Drive at /content/drive; the dataset loaded later in this notebook
# is read from a path under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [66]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [67]:
import os

# Record where the Java 8 JDK and the unpacked Spark distribution live so that
# the tooling started in the following cells can find them.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [68]:
# findspark.init() is called before pyspark is imported; keep this order.
# NOTE(review): presumably it relies on the SPARK_HOME value set in the previous cell - confirm.
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Reuse the active session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# Turn on eager evaluation so DataFrames render their rows when shown as a cell result.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [69]:
import sklearn
from sklearn.metrics import classification_report, confusion_matrix

In [131]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## WARNING: the file "delay_clean_SVM.txt" is > 1.2 GB and has been added to .gitignore.
## A reduced dataset is used instead.

In [71]:
# Read the LIBSVM-formatted text file from the mounted Drive into a DataFrame.
svm_path = '/content/drive/MyDrive/Colab_Notebooks/delay_clean_SVMCOPY.txt'
libsvm_reader = spark.read.format("libsvm")
clean = libsvm_reader.load(svm_path)

In [72]:
# Print the column names and types of the loaded DataFrame.
clean.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [73]:
# Number of rows in dataset
# (the value is kept in `number_rows` and echoed as the cell result)
number_rows = clean.count()
number_rows

3

In [74]:
# Class balance check: number of rows per distinct label value.
clean.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|    3|
+-----+-----+



In [75]:
# Build the label indexer ("label" -> "indexedLabel") and fit it on the complete
# dataset so that every label value present in the data is part of the index.
label_indexer_stage = StringIndexer(
    inputCol = "label",
    outputCol = "indexedLabel",
)
labelIndexer = label_indexer_stage.fit(clean)

In [76]:
# Detect categorical features inside the "features" vector and index them into
# "indexedFeatures". With maxCategories = 4, any feature having more than
# 4 distinct values is treated as continuous.
feature_indexer_stage = VectorIndexer(
    inputCol = "features",
    outputCol = "indexedFeatures",
    maxCategories = 4,
)
featureIndexer = feature_indexer_stage.fit(clean)

In [77]:
from pyspark.ml.feature import Normalizer

In [78]:
# Normalise each "features" vector with the p = 1.0 norm into a new "normFeatures" column.
# NOTE(review): neither `NormOutput` nor "normFeatures" is referenced by any other
# visible cell - confirm whether this step is still needed.
normalizer = Normalizer(inputCol = "features", outputCol = "normFeatures", p = 1.0)
NormOutput = normalizer.transform(clean)

In [79]:
# Split the data into training and test sets
# (trainingData, testData) = clean.randomSplit([0.7, 0.3])

In [80]:
# trainingData.show(5)

In [81]:
# testData.show(5)

## Oversampling applied to the dataset
### Reference: https://medium.com/@junwan01/oversampling-and-undersampling-with-pyspark-5dbc25cdf253

In [82]:
from pyspark.sql.functions import col, explode, array, lit

In [83]:
# major_df = clean.filter(col("label") == 0)
# minor_df = clean.filter(col("label") == 1)
# ratio = int(major_df.count()/minor_df.count())
# print("Ratio of original dataset: {}".format(ratio)+" to 1 (on time : delayed flights)")

In [84]:
# a = range(ratio)

In [85]:
# duplicate the minority rows
# oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

In [86]:
# combine both oversampled minority rows and previous majority rows
# combined_df = major_df.unionAll(oversampled_df)
# combined_df.show()

In [87]:
# combined_df.groupBy('label').count().show()

In [88]:
# Split the data into training and test sets
# (trainingData, testData) = combined_df.randomSplit([0.7, 0.3])

In [89]:
# trainingData.show(5)

In [90]:
# testData.show()

# Gradient-boosted tree classifier (GBT)

In [91]:
# Train a GBT model
# gbt = GBTClassifier(labelCol = "indexedLabel", featuresCol = "indexedFeatures", maxIter = 30, maxDepth = 10,
#                     stepSize = 1)

In [92]:
# Chain indexers and GBT in a Pipeline
# pipeline = Pipeline(stages = [labelIndexer, featureIndexer, gbt])

In [93]:
# Train model.  This also runs the indexers

# import time

# start_time = time.time()

# model = pipeline.fit(trainingData)

# print("Training Time: %s seconds" % (str(time.time() - start_time)))

In [94]:
# Make predictions
# predictions = model.transform(testData)

In [95]:
# Select example rows to display
# predictions.select("prediction", "indexedLabel", "features").show(5)

In [96]:
# Select (prediction, true label) and compute test error
# evaluator = MulticlassClassificationEvaluator(
#     labelCol = "indexedLabel", predictionCol = "prediction", metricName = "accuracy")
# accuracy = evaluator.evaluate(predictions)
# print("Accuracy = %g" % accuracy)
# print("Test Error = %g" % (1.0 - accuracy))

In [97]:
# gbtModel = model.stages[2]
# print(gbtModel)  # summary only

In [98]:
# y_true = predictions.select(['indexedLabel']).collect()
# y_pred = predictions.select(['prediction']).collect()

In [99]:
# print(confusion_matrix(y_true, y_pred))

In [100]:
# print(classification_report(y_true, y_pred))

In [101]:
# importanceSummary = gbtModel.featureImportances
# importanceSummary

In [102]:
# from matplotlib import pyplot as plt
# plt. bar(x = range (len (importanceSummary)), height = importanceSummary)
# plt.show()

In [151]:
# Saving trained model
# Location of the persisted GBT model, relative to the current working directory.
SAVED_MODELS_DIR = "./SavedModels/"
GBT_MODEL_NAME = "GBTmodel"
model_path = SAVED_MODELS_DIR + GBT_MODEL_NAME
# Alternative locations (disabled):
# model_path = "./drive/" + "./MyDrive/" + "./SavedModels/" + "GBTmodel"
# model_path = "./drive/MyDrive/SavedModels/GBTmodel"
# model_path = "./SavedModels/" + "GBTmodel"

# Disabled save step:
# model.write().overwrite().save(model_path)

In [152]:
# Loading saved model
#
# The save step in this notebook writes `model`, which is the result of
# `pipeline.fit(...)` - a fitted pipeline (label indexer + feature indexer + GBT),
# not a bare GBT estimator. It therefore has to be read back with
# PipelineModel.load; GBTClassifier.load expects an unfitted GBTClassifier
# and fails on this artifact.
# NOTE(review): the load also fails if nothing has been saved at `model_path`
# in the current working directory - confirm the directory exists.
from pyspark.ml import PipelineModel

loaded_model = PipelineModel.load(model_path)

Py4JJavaError: ignored

In [None]:
# Make predictions with loaded model
# The model is applied to the whole `clean` DataFrame (no held-out split is used here).

predictions = loaded_model.transform(clean)

In [None]:
# Select example rows to display from prediction
# Shows the first 5 rows of the predicted value, the indexed label and the input features.

predictions.select("prediction", "indexedLabel", "features").show(5)

In [None]:
# NOTE(review): `stop` is not defined in any visible cell, so evaluating it raises
# NameError - presumably a deliberate way to halt "Run All" at this point; confirm.
stop

In [None]:
import shutil  # was used below without being imported

import pyspark
from pyspark.ml import PipelineModel

# getOrCreate() hands back the already-running session when there is one.
spark = pyspark.sql.SparkSession.builder.appName("pyspark_runtime").getOrCreate()

# shutil.unpack_archive(filename, extract_dir) takes the archive first and the
# destination directory second. The previous call passed the model directory as
# the archive and the ".zip" path as the destination.
# NOTE(review): assumes the zipped model lives at `model_archive` and that the
# archive's top level is the contents of the saved model directory - confirm.
model_archive = "./SavedModels/hdfsData/GBTmodel.zip"
model_unpacked = "./SavedModels/hdfsData/GBTmodel"
shutil.unpack_archive(model_archive, model_unpacked)

# Load the fitted pipeline from the extracted directory.
trainedModel = PipelineModel.load(model_unpacked)

In [None]:
# NOTE(review): no active cell above this one assigns `testData` (the earlier
# randomSplit cells are commented out), so this raises NameError on a fresh kernel.
# The name `input` also shadows Python's builtin input().
input = testData

In [None]:
model_path = "./SavedModels/" + "./hdfsData/" + "GBTmodel"
# `model_path` is only a string; calling .transform on it raises AttributeError.
# Score the data with the pipeline model loaded into `trainedModel` instead.
predictions = trainedModel.transform(input)

In [None]:
import json  # needed for json.dumps below; was not imported anywhere

# Bring the prediction rows to the driver first: iterating a Spark DataFrame
# directly does not yield rows. The original list comprehension was also
# missing its closing bracket.
preds = [row['prediction'] for row in predictions.collect()]
print('[INFO] Results was ' + json.dumps(preds))

In [None]:
# import pickle
# with open('model.pkl', 'wb') as f:
#   pickle.dump(model, f)

In [None]:
# import joblib
# joblib.dump(model, "model_joblib.pkl")

In [None]:
# import sklearn
# scikit_ver = sklearn.__version__
# print(scikit_ver)
# joblib.dump(model, "model_{version}.pkl".format(version = scikit_ver))

In [None]:
# import pickle
# pickle_out = open("features.pickle","wb")
# pickle.dump(features, pickle_out)
# pickle_out.close()

In [None]:
# NOTE(review): `stop` is not defined in any visible cell, so evaluating it raises
# NameError - presumably a deliberate way to halt "Run All" before the sections below; confirm.
stop

# Random forest classifier (RFC)

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString

In [None]:
# Random forest estimator: 40 trees, reading the indexed label and indexed
# feature columns. Fitting happens later, as part of the pipeline.
rf = RandomForestClassifier(
    numTrees = 40,
    labelCol = "indexedLabel",
    featuresCol = "indexedFeatures",
)

In [None]:
# Convert indexed labels back to original labels
# Maps the numeric "prediction" column to "predictedLabel" using the label list
# held by the fitted labelIndexer.
labelConverter = IndexToString(inputCol = "prediction", outputCol = "predictedLabel",
                               labels = labelIndexer.labels)

In [None]:
# Chain indexers and forest in a Pipeline
# Stage order: label indexing -> feature indexing -> random forest -> label conversion.
pipeline = Pipeline(stages = [labelIndexer, featureIndexer, rf, labelConverter])

In [None]:
# Train model.  This also runs the indexers
# No active cell above assigns trainingData/testData (the earlier randomSplit
# cells are commented out), so on a fresh kernel the fit raised NameError.
# Create a seeded 70/30 split here only when one is not already in scope, so a
# split prepared by hand (e.g. from the oversampled data) is still honoured.
if "trainingData" not in globals() or "testData" not in globals():
    (trainingData, testData) = clean.randomSplit([0.7, 0.3], seed = 42)
model = pipeline.fit(trainingData)

In [None]:
# Make predictions
# Apply the fitted random-forest pipeline to the held-out test split.
predictions1 = model.transform(testData)

In [None]:
# Select example rows to display
# First 5 rows: converted predicted label, original label and input features.
predictions1.select("predictedLabel", "label", "features").show(5)

In [None]:
# Score the random-forest predictions: accuracy of "prediction" against
# "indexedLabel", and the complementary test error.
accuracy_settings = dict(
    labelCol = "indexedLabel",
    predictionCol = "prediction",
    metricName = "accuracy",
)
evaluator = MulticlassClassificationEvaluator(**accuracy_settings)
accuracy = evaluator.evaluate(predictions1)
test_error = 1.0 - accuracy
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % test_error)

In [None]:
# Index 2 is the random-forest stage: the pipeline stages are
# [labelIndexer, featureIndexer, rf, labelConverter].
rfModel = model.stages[2]
print(rfModel)  # summary only

In [None]:
# Per-feature importance values reported by the fitted random forest.
print(rfModel.featureImportances)

In [None]:
from matplotlib import pyplot as plt

# featureImportances is a Spark ML vector; convert it to a dense NumPy array so
# matplotlib receives plain numeric bar heights instead of a Spark object.
rf_importances = rfModel.featureImportances.toArray()

# Explicit figure/axes plus title and axis labels so the chart stands on its own.
fig, ax = plt.subplots()
ax.bar(x = range(len(rf_importances)), height = rf_importances)
ax.set(title = "Random forest feature importances",
       xlabel = "Feature index",
       ylabel = "Importance")
plt.show()

In [None]:
# Collect label and prediction together in ONE action. Two separate collect()
# calls run the job twice and give no guarantee that the two lists come back in
# the same row order, which would silently misalign y_true and y_pred.
rf_label_pred_rows = predictions1.select("indexedLabel", "prediction").collect()
# Plain float lists (rather than lists of Row objects) for scikit-learn.
y_true = [row["indexedLabel"] for row in rf_label_pred_rows]
y_pred = [row["prediction"] for row in rf_label_pred_rows]

In [None]:
# scikit-learn confusion matrix of the collected true vs. predicted indexed labels.
print(confusion_matrix(y_true, y_pred))

In [None]:
# scikit-learn classification report for the collected true vs. predicted indexed labels.
print(classification_report(y_true, y_pred))

In [None]:
# Persist the fitted pipeline under ./hdfsData/RFmodel, overwriting any previous save.
# Note that this rebinds `model_path`, which earlier cells used for the GBT model.
model_path = "./hdfsData/" + "RFmodel"
model.write().overwrite().save(model_path)

# Factorization machines classifier

In [None]:
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler

In [None]:
# Label indexer ("label" -> "indexedLabel"), fitted on the complete dataset so
# that every label value present in the data is part of the index.
fm_label_stage = StringIndexer(inputCol = "label", outputCol = "indexedLabel")
labelIndexer = fm_label_stage.fit(clean)

# Min-max scaler ("features" -> "scaledFeatures"), also fitted on the complete dataset.
fm_scaler_stage = MinMaxScaler(inputCol = "features", outputCol = "scaledFeatures")
featureScaler = fm_scaler_stage.fit(clean)

In [None]:
# Split the data into training and test sets (30% held out for testing)
# A fixed seed makes the split - and therefore every metric computed from it -
# reproducible across kernel restarts.
(trainingData, testData) = clean.randomSplit([0.7, 0.3], seed = 42)

In [None]:
# Train a FM model
# Only the estimator is configured here (indexed labels, scaled features,
# step size 0.01); fitting happens when the pipeline is fitted.
fm = FMClassifier(labelCol = "indexedLabel", featuresCol = "scaledFeatures", stepSize = 0.01)

In [None]:
# Create a Pipeline
# Stage order: label indexing -> feature scaling -> factorization machine.
# Note that this rebinds `pipeline`, replacing the random-forest pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])

In [None]:
# Train model
# Fits the pipeline on the training split; this rebinds `model`.
model = pipeline.fit(trainingData)

In [None]:
# Make predictions
# Apply the fitted factorization-machine pipeline to the held-out test split.
predictions2 = model.transform(testData)

In [None]:
# Select example rows to display
# First 5 rows: predicted value, indexed label and the original (unscaled) features.
predictions2.select("prediction", "indexedLabel", "features").show(5)

In [None]:
# Score the factorization-machine predictions: accuracy of "prediction" against
# "indexedLabel", and the complementary test error.
fm_accuracy_settings = dict(
    labelCol = "indexedLabel",
    predictionCol = "prediction",
    metricName = "accuracy",
)
evaluator = MulticlassClassificationEvaluator(**fm_accuracy_settings)
accuracy = evaluator.evaluate(predictions2)
fm_test_error = 1.0 - accuracy
print("Test set accuracy = %g" % accuracy)
print("Test Error = %g" % fm_test_error)

In [None]:
# Collect label and prediction together in ONE action. Two separate collect()
# calls run the job twice and give no guarantee that the two lists come back in
# the same row order, which would silently misalign y_true and y_pred.
fm_label_pred_rows = predictions2.select("indexedLabel", "prediction").collect()
# Plain float lists (rather than lists of Row objects) for scikit-learn.
y_true = [row["indexedLabel"] for row in fm_label_pred_rows]
y_pred = [row["prediction"] for row in fm_label_pred_rows]

In [None]:
# scikit-learn confusion matrix of the collected true vs. predicted indexed labels.
print(confusion_matrix(y_true, y_pred))

In [None]:
# scikit-learn classification report for the collected true vs. predicted indexed labels.
print(classification_report(y_true, y_pred))

In [None]:
# Persist the fitted pipeline under ./hdfsData/FMmodel, overwriting any previous save.
model_path = "./hdfsData/" + "FMmodel"
model.write().overwrite().save(model_path)