In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pandas as pd
import sys
import os
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import to_timestamp
import time
from pyspark.sql import functions as fn
from pyspark.ml import feature, regression, Pipeline

import datetime
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second
from pyspark.mllib.stat import Statistics
import pandas as pd

import matplotlib.pyplot as plt
import numpy as np

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

# Build (or reuse) the notebook's SparkSession.
# The builder originally called appName() twice ('data-cleaning', then
# 'data_clean'); the later call overwrote the earlier one, so only the
# effective name is set here.
# NOTE(review): 'spark.cores.max' = 5 alongside 3 executors x 5 cores looks
# inconsistent on cluster managers that honour cores.max - confirm the
# intended sizing.
spark = (
    SparkSession.builder
    .appName('data_clean')
    .config("spark.executor.instances", '3')
    .config("spark.executor.memory", '40g')
    .config('spark.executor.cores', '5')
    .config('spark.cores.max', '5')
    .getOrCreate()
)

# Legacy SQLContext handle bound to the same SparkContext as `spark`.
sqlContext = SQLContext(spark.sparkContext)

from pyspark.ml import Pipeline
from pyspark.ml import feature
from pyspark.ml import classification
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator

In [None]:
# Load the dataset from CSV, treating the first line as the header.
# No schema is supplied and none is inferred here; columns are cast to
# numeric types in a later cell.
train_data = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('../data/spark_data.csv')
)

In [None]:
# Total number of rows loaded from the CSV (runs a Spark job).
train_data.count()

In [None]:
# Preview the first three rows as a pandas DataFrame for rich display.
train_data.limit(3).toPandas()

In [None]:
#train_data.select('town').distinct().collect()

In [None]:
# Split the rows roughly 70/20/10 into training / validation / test sets.
# An explicit seed keeps the split identical across kernel restarts, so the
# metrics computed in later cells are comparable between runs.
training, validation, test = train_data.randomSplit([0.7, 0.2, 0.1], seed=42)

In [None]:
# Column groups, named after the type each group is cast to below.
float_columns = ['device_id', 'app_id', 'label_id', 'event_id', 'longitude', 'latitude']
int_columns = ['is_active', 'age', 'is_installed', 'day', 'hour', 'minute', 'seconds']
string_columns = ['gender', 'phone_brand', 'device_model', 'town', 'country', 'category_mapped', 'time_of_day', 'age_group']

# One typed projection shared by all three splits: float columns first, then
# int columns, then the string columns passed through unchanged.
# NOTE(review): "float" is Spark's 32-bit type; if the id columns hold large
# integers they will lose precision here - confirm this is acceptable.
typed_columns = (
    [col(name).cast("float").alias(name) for name in float_columns]
    + [col(name).cast("int").alias(name) for name in int_columns]
    + [col(name).alias(name) for name in string_columns]
)

training = training.select(*typed_columns)

validation = validation.select(*typed_columns)

test = test.select(*typed_columns)

In [None]:
# Number of rows in the training split (runs a Spark job).
training.count()

In [None]:
# Number of rows in the test split (runs a Spark job).
test.count()

In [None]:
# #PCA for town

# gender_indexer = feature.StringIndexer(inputCol="gender", outputCol="gender_label",handleInvalid='skip')
# category_indexer = feature.StringIndexer(inputCol='category_mapped', outputCol='category_encoded',handleInvalid='skip')
# phone_brand_indexer = feature.StringIndexer(inputCol='phone_brand', outputCol='phone_brand_encoded',handleInvalid='skip')
# is_active_indexer = feature.StringIndexer(inputCol='is_active', outputCol='is_active_encoded',handleInvalid='skip')
# device_model_indexer = feature.StringIndexer(inputCol='device_model', outputCol='device_model_encoded',handleInvalid='skip')
# town_indexer = feature.StringIndexer(inputCol='town', outputCol='town_encoded',handleInvalid='skip')
# country_indexer = feature.StringIndexer(inputCol='country', outputCol='country_encoded',handleInvalid='skip')
# time_of_day_indexer = feature.StringIndexer(inputCol='time_of_day', outputCol='time_of_day_encoded',handleInvalid='skip')
# age_group_indexer = feature.StringIndexer(inputCol='age_group', outputCol='age_group_encoded',handleInvalid='skip')
# #area_cluster_id_indexer = feature.StringIndexer(inputCol='area_cluster_id', outputCol='area_cluster_id_encoded',handleInvalid='skip')

# vector_assembler = feature.VectorAssembler(inputCols=['device_id', 'app_id', 'label_id', 'event_id', 'is_active',\
#                                                       'device_model_encoded', 'phone_brand_encoded', 'gender_label', 'country_encoded',\
#                                                       'time_of_day_encoded', 'age_group_encoded', 'category_encoded'],
#                                         outputCol='features')
# sc = feature.StandardScaler(inputCol='features',outputCol='sfeatures')

# evaluator = BinaryClassificationEvaluator(labelCol='gender_label')

# pipe_prep_location=Pipeline(stages=[gender_indexer, category_indexer,phone_brand_indexer, is_active_indexer, device_model_indexer,\
#                            town_indexer, country_indexer, time_of_day_indexer, age_group_indexer, \
#                            vector_assembler, sc])

In [None]:
# from pyspark.ml.clustering import KMeans

# # df_kmeans.show()
# df_kmeans = pipe_prep_location.fit(training).transform(training)
# cost = np.zeros(30)
# for k in range(2,30):
#     kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("sfeatures")
#     model = kmeans.fit(df_kmeans.sample(False,0.1, seed=42))
#     cost[k] = model.computeCost(df_kmeans) # requires Spark 2.0 or later
    
# fig, ax = plt.subplots(1,1, figsize =(8,6))
# ax.plot(range(2,30),cost[2:30])
# ax.set_xlabel('k')
# ax.set_ylabel('cost')

In [None]:
# from pyspark.ml.clustering import KMeans
# k = 30
# df_kmeans = pipe_prep_location.fit(train_data).transform(train_data)
# kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("sfeatures")
# model = kmeans.fit(df_kmeans)
# centers = model.clusterCenters()

# print("Cluster Centers: ")
# for center in centers:
#     print(center)

# new_data = model.transform(df_kmeans)

In [None]:
# Gender Pipeline
def _indexer(input_col, output_col):
    """Return a StringIndexer from `input_col` to `output_col` with handleInvalid='skip'."""
    return feature.StringIndexer(inputCol=input_col, outputCol=output_col, handleInvalid='skip')


# One indexer per categorical column; 'gender_label' is the label column used
# by the classifiers and evaluators in later cells.
gender_indexer = _indexer("gender", "gender_label")
category_indexer = _indexer('category_mapped', 'category_encoded')
phone_brand_indexer = _indexer('phone_brand', 'phone_brand_encoded')
is_active_indexer = _indexer('is_active', 'is_active_encoded')
device_model_indexer = _indexer('device_model', 'device_model_encoded')
town_indexer = _indexer('town', 'town_encoded')
country_indexer = _indexer('country', 'country_encoded')
time_of_day_indexer = _indexer('time_of_day', 'time_of_day_encoded')
age_group_indexer = _indexer('age_group', 'age_group_encoded')
#area_cluster_id_indexer = feature.StringIndexer(inputCol='area_cluster_id', outputCol='area_cluster_id_encoded',handleInvalid='skip')

# Columns assembled into the feature vector. The raw 'is_active' column is
# used here; 'is_active_encoded' produced above is not part of the vector.
assembler_inputs = [
    'device_id', 'app_id', 'label_id', 'event_id', 'is_active',
    'device_model_encoded', 'phone_brand_encoded', 'town_encoded', 'country_encoded',
    'time_of_day_encoded', 'age_group_encoded', 'category_encoded',
]
vector_assembler = feature.VectorAssembler(inputCols=assembler_inputs, outputCol='features')

# Scale the assembled vector; downstream models read 'sfeatures'.
sc = feature.StandardScaler(inputCol='features', outputCol='sfeatures')

evaluator = BinaryClassificationEvaluator(labelCol='gender_label')

# Preprocessing stages in execution order: indexers, assembler, scaler.
prep_stages = [
    gender_indexer, category_indexer, phone_brand_indexer, is_active_indexer,
    device_model_indexer, town_indexer, country_indexer, time_of_day_indexer,
    age_group_indexer, vector_assembler, sc,
]
pipe_prep = Pipeline(stages=prep_stages)

In [None]:
# Two-component PCA on the scaled feature vector produced by `pipe_prep`.
# The estimator gets its own name so that `pca` is bound to only one kind of
# object in this cell (the loadings array, as before).
pca_estimator = feature.PCA(k=2, inputCol='sfeatures', outputCol='pfeat')

pipe_pca = Pipeline(stages=[pipe_prep, pca_estimator]).fit(training)

pca_mod = pipe_pca.transform(training)

feat = train_data.columns
# Feature names in the order they enter the assembled vector. Reading them
# from the assembler keeps the loading labels aligned with the pipeline.
actfeat = list(vector_assembler.getInputCols())

# feat

# actfeat=pca_mod.columns

# Principal-component matrix of the fitted PCA stage as a NumPy array; its
# columns are indexed per component below.
pca = pipe_pca.stages[1].pc.toArray()

# One row per feature. The name column was previously labelled
# 'abs_loadings' although it holds feature names, and the frame was built by
# transposing a mixed list, which left the loadings with object dtype.
pc1_df = pd.DataFrame({'pc1': pca[:, 0], 'feature': actfeat})
pc2_df = pd.DataFrame({'pc2': pca[:, 1], 'feature': actfeat})

In [None]:
# Replace the first-component loadings with their absolute values, then show
# the rows ordered from largest to smallest magnitude.
pc1_df['pc1'] = pc1_df['pc1'].abs()

pc1_df.sort_values(by='pc1', ascending=False)

In [None]:
# Replace the second-component loadings with their absolute values, then show
# the rows ordered from largest to smallest magnitude.
pc2_df['pc2'] = pc2_df['pc2'].abs()

pc2_df.sort_values(by='pc2', ascending=False)

In [None]:
# Baseline: logistic regression with no hyper-parameters set explicitly.
logistic = classification.LogisticRegression(featuresCol='sfeatures', labelCol='gender_label')

# Fit preprocessing + classifier on the training split.
lr_stages = [pipe_prep, logistic]
lr_pipe = Pipeline(stages=lr_stages).fit(training)

# Score the test split with the `evaluator` currently in scope.
lr_test_predictions = lr_pipe.transform(test)
result1 = evaluator.evaluate(lr_test_predictions)

result1

In [None]:
# Report both ranking metrics for the fitted logistic-regression pipeline on
# the test split. The scored DataFrame is built once and passed to both calls.
lr_scored = lr_pipe.transform(test)
roc_override = {evaluator.metricName: "areaUnderROC"}
pr_override = {evaluator.metricName: "areaUnderPR"}

auroc = evaluator.evaluate(lr_scored, roc_override)
auprc = evaluator.evaluate(lr_scored, pr_override)
print("Area under ROC Curve: {:.4f}".format(auroc))
print("Area under PR Curve: {:.4f}".format(auprc))

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Unfitted pipeline for the grid search. This rebinds `lr_pipe`, which
# previously held the fitted baseline model.
lr_pipe = Pipeline(stages=[pipe_prep, logistic])

# 3 elastic-net mixes x 2 regularisation strengths = 6 candidates.
lr_param = ParamGridBuilder() \
    .addGrid(logistic.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(logistic.regParam, [0.1, 0.01]) \
    .build()

evaluator = BinaryClassificationEvaluator(labelCol="gender_label", metricName='areaUnderROC')
#evaluator = RegressionEvaluator(labelCol="town_encoded", predictionCol="prediction", metricName="r2")
#evaluator = MulticlassClassificationEvaluator(labelCol = 'town_encoded', predictionCol='prediction', metricName = 'accuracy')

# An explicit seed makes the fold assignment, and therefore the selected
# parameters, reproducible across kernel restarts.
crossval = CrossValidator(estimator=lr_pipe,
                         estimatorParamMaps=lr_param,
                         evaluator=evaluator,
                         numFolds=3,
                         seed=42)

# NOTE(review): the search is fitted on `validation`, not `training` -
# confirm this is intended.
cvmodel = crossval.fit(validation)
# Parameter map of the last stage (the classifier) of the best pipeline.
cvmodel.bestModel.stages[-1].extractParamMap()

In [None]:
# RF
# Baseline random forest with default hyper-parameters. The seed is set
# explicitly so the fitted forest is reproducible across kernel restarts.
rf=classification.RandomForestClassifier(labelCol='gender_label', featuresCol='sfeatures', seed=42)

rf_pipe=Pipeline(stages=[pipe_prep,rf]).fit(training)

evaluator = BinaryClassificationEvaluator(labelCol = 'gender_label', metricName ='areaUnderROC')

# This baseline is scored on the validation split (the logistic-regression
# and GBT baselines in this notebook are scored on `test`).
resultrf4=evaluator.evaluate(rf_pipe.transform(validation))

resultrf4

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Unfitted pipeline for the grid search. This rebinds `rf_pipe`, which
# previously held the fitted baseline model.
rf_pipe=Pipeline(stages=[pipe_prep,rf])

# 4 depths x 3 bin counts x 1 impurity = 12 candidates.
rfParam = ParamGridBuilder() \
.addGrid(rf.maxDepth, [4, 6, 8, 10]) \
.addGrid(rf.maxBins, [5, 10, 20]) \
.addGrid(rf.impurity, ["gini"]) \
.build()

# gbtParam = (ParamGridBuilder()
#              .addGrid(gbt.maxDepth, [4, 6, 8, 10])
#              .addGrid(gbt.maxBins, [5, 10, 20, 40])
#              .addGrid(gbt.maxIter, [5, 10, 15])
#              .build())

evaluator = BinaryClassificationEvaluator(labelCol="gender_label", metricName='areaUnderROC')
#evaluator = RegressionEvaluator(labelCol="town_encoded", predictionCol="prediction", metricName="r2")
#evaluator = MulticlassClassificationEvaluator(labelCol = 'town_encoded', predictionCol='prediction', metricName = 'accuracy')

# An explicit seed makes the fold assignment, and therefore the selected
# parameters, reproducible across kernel restarts.
crossval = CrossValidator(estimator=rf_pipe,
                         estimatorParamMaps=rfParam,
                         evaluator=evaluator,
                         numFolds=3,
                         seed=42)

#cv = CrossValidator(estimator=random_forest_pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# NOTE(review): the search is fitted on `validation`, not `training` -
# confirm this is intended.
cvmodel = crossval.fit(validation)
# Parameter map of the last stage (the classifier) of the best pipeline.
cvmodel.bestModel.stages[-1].extractParamMap()

In [None]:
# RF
# Random forest with fixed hyper-parameters.
# NOTE(review): maxDepth / maxBins are presumably the winners of the grid
# search in the previous cell - confirm against its output.
# The seed is set explicitly so the fitted forest is reproducible.
rf=classification.RandomForestClassifier(labelCol='gender_label', featuresCol='sfeatures', \
                                         maxDepth=10, maxBins=20, numTrees=20, seed=42)

rf_pipe=Pipeline(stages=[pipe_prep,rf]).fit(training)

evaluator = BinaryClassificationEvaluator(labelCol="gender_label", metricName='areaUnderROC')
resultrf4=evaluator.evaluate(rf_pipe.transform(test))

# Display the score, as the other model cells in this notebook do.
resultrf4

In [None]:
# Report both ranking metrics for the fitted random-forest pipeline on the
# test split. The scored DataFrame is built once and passed to both calls.
rf_scored = rf_pipe.transform(test)
roc_override = {evaluator.metricName: "areaUnderROC"}
pr_override = {evaluator.metricName: "areaUnderPR"}

auroc = evaluator.evaluate(rf_scored, roc_override)
auprc = evaluator.evaluate(rf_scored, pr_override)
print("Area under ROC Curve: {:.4f}".format(auroc))
print("Area under PR Curve: {:.4f}".format(auprc))

In [None]:
#GBT without Cross Validation

# Gradient-boosted trees with no hyper-parameters set explicitly.
gbt = classification.GBTClassifier(featuresCol='sfeatures', labelCol='gender_label')
gbt_stages = [pipe_prep, gbt]
gbt_pipe = Pipeline(stages=gbt_stages).fit(training)

evaluator = BinaryClassificationEvaluator(labelCol="gender_label", metricName='areaUnderROC')

# The variable keeps its 'rf' name although it now holds the GBT score on
# the test split.
gbt_test_predictions = gbt_pipe.transform(test)
resultrf4 = evaluator.evaluate(gbt_test_predictions)

resultrf4

In [None]:
# GBT Cross Validation

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Unfitted pipeline for the grid search. This rebinds `gbt_pipe`, which
# previously held the fitted baseline model.
gbt_pipe = Pipeline(stages=[pipe_prep, gbt])

# 4 depths x 3 bin counts x 1 iteration count = 12 candidates.
gbtParam = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [4, 6, 8, 10])
             .addGrid(gbt.maxBins, [5, 10, 20])
             .addGrid(gbt.maxIter, [2])
             .build())

evaluator = BinaryClassificationEvaluator(labelCol="gender_label", metricName='areaUnderROC')
#evaluator = RegressionEvaluator(labelCol="town_encoded", predictionCol="prediction", metricName="r2")
#evaluator = MulticlassClassificationEvaluator(labelCol = 'gender_label', predictionCol='prediction', metricName = 'accuracy')

# An explicit seed makes the fold assignment, and therefore the selected
# parameters, reproducible across kernel restarts.
crossval = CrossValidator(estimator=gbt_pipe,
                         estimatorParamMaps=gbtParam,
                         evaluator=evaluator,
                         numFolds=3,
                         seed=42)

#cv = CrossValidator(estimator=random_forest_pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)


# NOTE(review): the search is fitted on `validation`, not `training` -
# confirm this is intended.
cvmodel = crossval.fit(validation)
# Parameter map of the last stage (the classifier) of the best pipeline.
cvmodel.bestModel.stages[-1].extractParamMap()

In [None]:
# Gradient-boosted trees with fixed hyper-parameters.
# NOTE(review): maxDepth / maxBins are presumably the winners of the grid
# search in the previous cell - confirm against its output.
gbt = classification.GBTClassifier(
    labelCol='gender_label',
    featuresCol='sfeatures',
    maxDepth=10,
    maxBins=20,
    lossType='logistic',
    maxIter=2,
)
gbt_stages = [pipe_prep, gbt]
gbt_pipe = Pipeline(stages=gbt_stages).fit(training)

evaluator = BinaryClassificationEvaluator(labelCol="gender_label", metricName='areaUnderROC')

# The variable keeps its 'rf' name although it now holds the GBT score on
# the test split.
gbt_test_predictions = gbt_pipe.transform(test)
resultrf4 = evaluator.evaluate(gbt_test_predictions)

resultrf4

In [None]:
# Report both ranking metrics for the fitted GBT pipeline on the test split.
# The scored DataFrame is built once and passed to both calls.
gbt_scored = gbt_pipe.transform(test)
roc_override = {evaluator.metricName: "areaUnderROC"}
pr_override = {evaluator.metricName: "areaUnderPR"}

auroc = evaluator.evaluate(gbt_scored, roc_override)
auprc = evaluator.evaluate(gbt_scored, pr_override)
print("Area under ROC Curve: {:.4f}".format(auroc))
print("Area under PR Curve: {:.4f}".format(auprc))