In [1]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import random

from pyspark import *
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, Bucketizer, VectorAssembler, StandardScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.stat import Statistics
from pyspark.mllib.evaluation import MulticlassMetrics

from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.utils import np_utils, generic_utils
from elephas.ml_model import ElephasEstimator
from elephas import optimizers as elephas_optimizers

#### Exploratory data analysis (EDA)

In [3]:
# Carrega dataset do desafio (basta descompactar todos na mesma pasta)
#display(dbutils.fs.ls("dbfs:/FileStore/tables/"))
df_desafio = spark.read.format("csv").options(header='true').load("/FileStore/tables/*.csv")

In [4]:
# Elimina na's e duplicados do df
df_desafio_v2 = df_desafio.dropna(how='any').dropDuplicates()

# Ajusta tipo de colunas
df_desafio_v2 = df_desafio_v2.selectExpr(
  'cast(time as timestamp) time',
  'ca',
  'unit',
  'scp',
  'station',
  'linename',
  'division',
  'desc',
  'cast(entries as int) entries',
  'cast(exits as int) exits'
)

# Features para visão temporal
df_desafio_v2 = df_desafio_v2.withColumn(
  "dt_year",
  year(col("time"))
).withColumn(
  "dt_month",
  month(col("time"))
).withColumn(
  "dt_day",
  dayofmonth(col("time"))
).withColumn(
  "dt_dayofy",
  dayofyear(col("time"))  
).withColumn(
  "dt_hour",
  hour(col("time"))
).withColumn(
  "dt_min",
  minute(col("time"))
).withColumn(
  "dt_week_no",
  weekofyear(col("time"))
).withColumn(
  "dt_int",
  unix_timestamp(col("time"))
).withColumn(
  "dt_month_year",
  date_format(col("time"), "Y-MM")
)

In [5]:
# Intervalos para buckets
splits = [-30000000, -20000000, -10000000, 0.0, 10000000, 20000000, 30000000]

# ===> Bucket: 'entries'
# dataFrame = df_desafio_v2.select(col('entries'))
# bucketizer = Bucketizer(splits=splits, inputCol="entries", outputCol="bucketedFeatures")
# bucketedData_entries = bucketizer.transform(dataFrame)
# sorted(bucketedData_entries.groupBy("bucketedFeatures").count().collect())
# [Row(bucketedFeatures=0.0, count=15721),
#  Row(bucketedFeatures=1.0, count=4028),
#  Row(bucketedFeatures=2.0, count=496),
#  Row(bucketedFeatures=4.0, count=71457425), ==> Begin
#  Row(bucketedFeatures=5.0, count=5052549),  <== End
#  Row(bucketedFeatures=6.0, count=278162),
#  Row(bucketedFeatures=7.0, count=2237294)]

# ===> Bucket: 'exits'
# dataFrame = df_desafio_v2.select(col('exits'))
# bucketizer = Bucketizer(splits=splits, inputCol="exits", outputCol="bucketedFeatures")
# bucketedData_exits = bucketizer.transform(dataFrame)
# sorted(bucketedData_exits.groupBy("bucketedFeatures").count().collect())
# [Row(bucketedFeatures=0.0, count=30576),
#  Row(bucketedFeatures=2.0, count=576),
#  Row(bucketedFeatures=3.0, count=1),
#  Row(bucketedFeatures=4.0, count=73751241), ==> Begin
#  Row(bucketedFeatures=5.0, count=3698911),  <== End
#  Row(bucketedFeatures=6.0, count=413740),
#  Row(bucketedFeatures=7.0, count=1150630)]

# Parâmetros para filtros de outliers
outlier_begin = 0
outlier_end = 20000000

df_desafio_v2 = df_desafio_v2.where((col('entries')>=outlier_begin) & (col('entries')<=outlier_end) & (col('exits')>=outlier_begin) & (col('exits')<=outlier_end))

In [6]:
# Check: 79.609.191 / 79.130.015 / 79.045.675 / 75.923.980
count_desafio = df_desafio.count()
count_desafio_na = df_desafio.dropna(how='any').count()
count_desafio_final = df_desafio.dropna(how='any').dropDuplicates().count()
count_desafio_outliers = df_desafio_v2.count()

df_amostras = sc.parallelize([
  ('antes',count_desafio,0,0,0,0),
  ('depois',0,count_desafio_final,count_desafio-count_desafio_na,count_desafio_na-count_desafio_final,count_desafio_final-count_desafio_outliers)
]).toDF(['AMOSTRAS','TOTAL','UNICO','NA','DUPLICADO','OUTLIERS'])

display(df_amostras)

In [7]:
df_amostras_v2 = sc.parallelize([
  ('',count_desafio-count_desafio_na,count_desafio_na-count_desafio_final,count_desafio_final-count_desafio_outliers)
]).toDF(['AMOSTRAS','NA','DUPLICADO','OUTLIERS'])

display(df_amostras_v2)

In [8]:
# Método para variáveis categóricas (dummys) ~14.59 minutes
lista_idx = ['ca', 'unit', 'scp', 'station', 'linename', 'division', 'desc']
indexers = [StringIndexer(inputCol=column,outputCol=column+"_idx").fit(df_desafio_v2) for column in lista_idx]
pipeline = Pipeline(stages=indexers)
df_desafio_v2 = pipeline.fit(df_desafio_v2).transform(df_desafio_v2)
#display(df_desafio_v2)

In [9]:
df_desafio_v2.write.parquet("/FileStore/tables/df_desafio.parquet")
df_desafio_num.write.parquet("/FileStore/tables/df_desafio_ml.parquet")

In [10]:
#df_graficos = df_desafio_v2.filter(col('dt_year')=='2017')
df_graficos = df_desafio_v2.groupBy(
  'dt_month_year',
  'dt_year',
  'dt_month',
  'dt_day',
  'dt_hour'
).agg(
  sum('entries'),
  sum('exits')
).orderBy(
  "dt_month_year"
).toPandas()

#df_desafio_num.describe().toPandas().transpose()
#df_desafio_num.dtypes


#### Carga de Parquet para modelos

In [12]:
# Carrega Parquet
df_desafio_pqt = spark.read.parquet("/FileStore/tables/df_desafio.parquet")
df_desafio_num = df_desafio_pqt.select(
  'entries','exits','dt_year','dt_month','dt_day','dt_dayofy',
  'dt_hour','dt_min','dt_week_no','dt_int','ca_idx','unit_idx',
  'scp_idx','station_idx','linename_idx','division_idx','desc_idx'
)

#### Correlação entre features

In [14]:
col_names = df_desafio.columns
features = df_desafio.rdd.map(lambda row: row[0:])
corr_mat = Statistics.corr(features, method="pearson")
corr_df = pd.DataFrame(corr_mat)
corr_df.index, corr_df.columns = col_names, col_names
spark.createDataFrame(corr_df).write.parquet("/FileStore/tables/corr_df.parquet")

#### Modelo de Machine Learning - Regressão Linear

In [16]:
# Verorização de features
colunas = ['exits', 'dt_year', 'dt_month', 'dt_day', 'dt_dayofy', 'dt_hour', 'dt_min', 'dt_week_no', 'dt_int', 'ca_idx', 'unit_idx', 'scp_idx', 'station_idx', 'linename_idx', 'division_idx', 'desc_idx']
vectorAssembler = VectorAssembler(inputCols = colunas, outputCol = 'features')
vdf_desafio_num = vectorAssembler.transform(df_desafio_num)
vdf_desafio_num.take(1)

# Seleciona features de interesse
vdf_desafio_num = vdf_desafio_num.select(['features', 'entries'])
vdf_desafio_num.show(3)

# Split para dados de treino/teste
splits = vdf_desafio_num.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [17]:
# Treinando os dados de treino para predizer os dados de teste
lr = LinearRegression(featuresCol = 'features', labelCol='entries', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

#Salva modelo treinado
lr_model.save("/FileStore/tables/lr_model")

In [18]:
#Resumo do treinamento
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

# Resumo do modelo treinado
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [19]:
# Descritiva da variável de interesse treinada
train_df.describe().show()

In [20]:
# Predição dos dados de teste vs Real
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","entries","features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="entries",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

# Resultado RMSE do modelo de teste
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

In [21]:
# Histograma para residuals
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
#display(trainingSummary.residuals)

#### Modelo de Deep Learning - Elephas

In [23]:
# Etapa 01 - Transformações: VectorAssembler

# Variáveis para modelo
features = ['entries', 'exits', 'dt_year', 'dt_month', 'dt_day', 'dt_dayofy', 'dt_hour', 'dt_min', 'dt_week_no', 'dt_int', 'ca_idx', 'unit_idx', 'scp_idx', 'station_idx', 'linename_idx', 'division_idx', 'desc_idx']
var_predict = 'entries'
features.remove(var_predict)

# Vetorização de features
vectorAssembler = VectorAssembler(inputCols = features, outputCol = 'features')
df_desafio = vectorAssembler.transform(df_desafio_num)

# Seleciona features + interesse (y)
df_desafio = df_desafio.select(['features', var_predict])

# Divisão de dados para treino e teste
splits = df_desafio.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [24]:
# Etapa 02 - Transformações: StringIndexer & StandardScaler
string_indexer = StringIndexer(inputCol=var_predict, outputCol="index_category")
fitted_indexer = string_indexer.fit(train_df)
indexed_df = fitted_indexer.transform(train_df)

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
fitted_scaler = scaler.fit(indexed_df)
scaled_df = fitted_scaler.transform(indexed_df)

In [25]:
# Keras Deep Learning model
nb_classes = train_df.select(var_predict).distinct().count()
input_dim = len(train_df.select("features").first()[0])

model = Sequential()
model.add(Dense(512, input_shape=(input_dim,)))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam')

In [26]:
# Define elephas optimizer (which tells the model how to aggregate updates on the Spark master)
Adadelta = elephas_optimizers.Adadelta()

# Initialize SparkML Estimator and set all relevant properties
estimator = ElephasEstimator()
estimator.setFeaturesCol("scaled_features")             # These two come directly from pyspark,
estimator.setLabelCol("index_category")                 # hence the camel case. Sorry :)
estimator.set_keras_model_config(model.to_yaml())       # Provide serialized Keras model
estimator.set_optimizer_config(Adadelta.get_config())   # Provide serialized Elephas optimizer
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(3)  # We just use one worker here. Feel free to adapt it.
estimator.set_epochs(20) 
estimator.set_batch_size(128)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
estimator.set_loss('squaredError')

In [27]:
# SparkML Pipelines
pipeline = Pipeline(stages=[string_indexer, scaler, estimator])

In [28]:
# Fitting and evaluating the pipeline
fitted_pipeline = pipeline.fit(train_df) # Fit model to data

In [29]:
prediction = fitted_pipeline.transform(train_df) # Evaluate on train data.
# prediction = fitted_pipeline.transform(test_df) # <-- The same code evaluates test data.
pnl = prediction.select("index_category", "prediction")
pnl.show(100)

prediction_and_label = pnl.map(lambda row: (row.index_category, row.prediction))
metrics = MulticlassMetrics(prediction_and_label)
print(metrics.precision())