# Módulo # 4 - Big Data

## Tarea # 3 
## Autor: Jose Martinez

# Datos de Entrada

## Abandono de Banco

Este conjunto de datos contiene detalles de los clientes de un banco y la variable objetivo es una variable binaria que refleja el hecho de si el cliente dejó el banco (cerró su cuenta) o si continúa siendo un cliente.

### Features

- `RowNumber`: Número de fila (Int)
- `CustomerId`: Identificador del cliente (Int)
- `Surname`: apellido del cliente (String)
- `CreditScore`: puntaje de crédito del cliente (Int)
- `Geography`: país de residencia del cliente (String)
- `Gender`: Sexo del cliente (String)
- `Age`: Edad del cliente (Int)
- `Tenure`: Número de años que el cliente ha estado en el banco (Int)
- `Balance`: saldo de la cuenta del cliente (Float)
- `NumOfProducts`: número de productos del cliente (Int)
- `HasCrCard`: tiene tarjeta de crédito (Bool)
- `IsActiveMember`: es un miembro activo (Bool)
- `EstimatedSalary`: salario estimado del cliente (Float)

### Objetivo Predictivo

- `Exited`: si el cliente abandonó el banco (Bool)

In [None]:
import findspark
findspark.init('/usr/lib/python3.7/site-packages/pyspark')

from pyspark.sql import SparkSession
from pyspark.sql.types import (StringType, IntegerType, FloatType,
                               StructField, StructType)

# Create (or reuse) the Spark session. The PostgreSQL JDBC driver jar is put
# on both the driver and the executor class paths because the notebook later
# writes to and reads from PostgreSQL through JDBC.
spark = (
    SparkSession.builder
    .appName("Bigdata: Tarea 3")
    .config("spark.driver.extraClassPath", "postgresql-42.2.14.jar")
    .config("spark.executor.extraClassPath", "postgresql-42.2.14.jar")
    .getOrCreate()
)

# Explicit schema for the churn CSV, one (column name, Spark type) pair per
# column, in file order. The boolean-like columns (HasCrCard, IsActiveMember,
# Exited) are read as integers.
churn_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in [
        ("RowNumber", IntegerType()),
        ("CustomerId", IntegerType()),
        ("Surname", StringType()),
        ("CreditScore", IntegerType()),
        ("Geography", StringType()),
        ("Gender", StringType()),
        ("Age", IntegerType()),
        ("Tenure", IntegerType()),
        ("Balance", FloatType()),
        ("NumOfProducts", IntegerType()),
        ("HasCrCard", IntegerType()),
        ("IsActiveMember", IntegerType()),
        ("EstimatedSalary", FloatType()),
        ("Exited", IntegerType()),
    ]
])

# Load the CSV (first line is a header) using the schema above.
churn_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(churn_schema)
    .load("churn_modelling.csv")
)

# Show the resulting schema and a sample of rows as a sanity check.
churn_df.printSchema()
churn_df.show()

# Preprocesamiento de datos

In [None]:
from pyspark.sql.functions import when

# First filtering step: RowNumber, CustomerId and Surname are dropped because
# they carry no useful information for the model. Only the feature columns
# and the target column (Exited) are kept.
columns_features = ['CreditScore', 'Gender', 'Age', 'Tenure', 'Geography',
                    'Balance', 'NumOfProducts', 'HasCrCard',
                    'IsActiveMember', 'EstimatedSalary']

columns_kept = columns_features + ['Exited']


def _encode_as_int(frame, column, codes):
    """Replace a string column by integer codes.

    Each key of ``codes`` found in ``column`` is replaced by its value; any
    other value is passed through unchanged, and the whole column is then
    cast to ``IntegerType``.

    Args:
        frame: DataFrame holding ``column``.
        column: Name of the column to encode.
        codes: Mapping from original string value to integer code.

    Returns:
        A new DataFrame with ``column`` replaced by the encoded column.
    """
    source = frame[column]
    encoded = None
    for label, code in codes.items():
        matches = source == label
        encoded = when(matches, code) if encoded is None else encoded.when(matches, code)
    return frame.withColumn(column, encoded.otherwise(source).cast(IntegerType()))


selected_columns_df = churn_df.select(columns_kept)

# Gender -> int
selected_columns_df = _encode_as_int(
    selected_columns_df, 'Gender', {'Male': 1, 'Female': 0})

# Geography -> int
selected_columns_df = _encode_as_int(
    selected_columns_df, 'Geography', {'Spain': 0, 'France': 1, 'Germany': 2})

selected_columns_df.show()

## Gráficos y Estadísticas Descriptivas

In [None]:
# Print summary statistics for every kept column to check that nothing looks
# wrong with the data. The columns are shown in three groups so that each
# printed table stays narrow.
for described_columns in (
    ['CreditScore', 'Gender', 'Age', 'Tenure'],
    ['Balance', 'NumOfProducts', 'HasCrCard', 'Geography'],
    ['IsActiveMember', 'EstimatedSalary', 'Exited'],
):
    selected_columns_df.describe(described_columns).show()

## Histogramas

In [None]:
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt

# 6 x 2 grid of axes; eleven of the twelve are used, the last one stays empty.
fig, ax = plt.subplots(nrows=6, ncols=2)
fig.set_size_inches(20, 20)

# Columns in plotting order. ``ax.flat`` walks the grid row by row, so the
# first column lands on ax[0, 0], the second on ax[0, 1], and so on.
histogram_columns = [
    'CreditScore', 'Gender',
    'Age', 'Tenure',
    'Balance', 'NumOfProducts',
    'HasCrCard', 'IsActiveMember',
    'EstimatedSalary', 'Geography',
    'Exited',
]

for axis, column in zip(ax.flat, histogram_columns):
    hist(axis, selected_columns_df.select(column))
    axis.set_title(column)

## Oversampling

Como se observa en el histograma de la variable objetivo, los datos no están balanceados: hay muchos más casos de no abandono (`Exited = 0`) que de abandono (`Exited = 1`). Por lo tanto, se aplica oversampling a la clase minoritaria para mejorar el balance de los datos.

In [None]:
from pyspark.sql.functions import explode, array, lit
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt

# Class counts before oversampling.
selected_columns_df.groupby("Exited").count().show()

# Exited == 0 is treated as the majority class, Exited == 1 as the minority.
major_df = selected_columns_df.filter(selected_columns_df.Exited == 0)
minor_df = selected_columns_df.filter(selected_columns_df.Exited == 1)

# Whole number of times the minority class fits into the majority class.
ratio = major_df.count() // minor_df.count()
print("ratio: {}".format(ratio))

# Replicate every minority row `ratio` times: attach an array with `ratio`
# literals, explode it into one row per element, then drop the helper column.
replica_ids = array([lit(replica) for replica in range(ratio)])
oversampled_df = (
    minor_df
    .withColumn("dummy", explode(replica_ids))
    .drop('dummy')
)

# Majority rows followed by the replicated minority rows.
combined_df = major_df.unionAll(oversampled_df)

# Class counts after oversampling.
combined_df.groupby("Exited").count().show()

## Matriz de Correlaciones

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import matplotlib.pyplot as plt
import seaborn as sns

# The ML utilities below operate on a single vector column, so the feature
# columns of each row are packed into one 'Features' vector.
assembler = VectorAssembler(inputCols=columns_features, outputCol='Features')

vector_df = assembler.transform(combined_df).select(['Features', 'Exited'])
vector_df.show()

# Correlation matrix of the feature vector (Correlation.corr is called with
# its default method). The result is a one-row DataFrame whose single cell
# holds the matrix.
correlation_row = Correlation.corr(vector_df, 'Features').collect()[0]
pearson_matrix = correlation_row[0]
sns.heatmap(pearson_matrix.toArray(), annot=True, fmt=".2f", cmap='RdYlGn')

## Imputación de valores faltantes

El dataset fue revisado previamente para verificar si existen valores faltantes. En este caso se cuenta con la fortuna de que no existen valores faltantes, por lo que no es necesario realizar ninguna acción.

## Normalización / Estandarización de Datos

In [None]:
from pyspark.ml.feature import StandardScaler, Normalizer

# Fit a StandardScaler (default settings) on the assembled feature vectors
# and use it to produce the 'scaledFeatures' column.
standard_scaler = StandardScaler(inputCol='Features', outputCol='scaledFeatures')
scale_model = standard_scaler.fit(vector_df)

# Keep only the scaled vector and the target column.
scaled_df = (
    scale_model
    .transform(vector_df)
    .select(['scaledFeatures', 'Exited'])
)
scaled_df.printSchema()
scaled_df.show()

## Escritura a base de datos.

In [None]:
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col


# Turn the scaled feature vector back into one plain column per feature:
# first convert the vector into an array column, then pick each array
# position and name it after the matching entry of columns_features.
array_df = scaled_df.withColumn("scaledFeatures", vector_to_array("scaledFeatures"))
feature_columns = [
    col("scaledFeatures")[position].alias(feature_name)
    for position, feature_name in enumerate(columns_features)
]
pre_df = array_df.select(["Exited"] + feature_columns)
pre_df.printSchema()
pre_df.show()

# Store the cleaned dataset in the database, replacing the table contents
# if the table already exists.
jdbc_options = {
    "url": "jdbc:postgresql://172.17.0.1:5433/postgres",
    "user": "postgres",
    "password": "testPassword",
    "dbtable": "tarea3",
}
(
    pre_df.write
    .format("jdbc")
    .mode('overwrite')
    .options(**jdbc_options)
    .save()
)

# Entrenamiento del modelo

In [None]:
# Load the cleaned dataset back from the database: one DataFrame holding
# every row of the 'tarea3' table.
df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://172.17.0.1:5433/postgres")
    .option("user", "postgres")
    .option("password", "testPassword")
    .option("dbtable", "tarea3")
    .load()
)

df.show()

# Pack the feature columns into a single 'Features' vector column.
assembler = VectorAssembler(
    inputCols=columns_features,
    outputCol='Features')

# Assemble the rows that were just read from the database. Previously this
# assembled `selected_columns_df` (the raw, pre-oversampling, unscaled data),
# which silently ignored the table loaded above.
vector_df = assembler.transform(df)
vector_df = vector_df.select(['Features', 'Exited'])
vector_df.show()

## Dividir dataset en conjunto de entrenamiento y prueba

In [None]:
# Split the data into training and test sets (70 % training, 30 % test).
# A fixed seed keeps the split, and therefore every metric computed from it,
# reproducible between runs of the notebook.
# NOTE(review): scaled_df is derived from the oversampled combined_df and the
# scaler was fitted on the whole dataset, so copies of the same minority row
# can end up on both sides of this split. That may inflate test metrics;
# consider splitting before oversampling/scaling.
training_df, test_df = scaled_df.randomSplit([0.7, 0.3], seed=42)

# Print the size of the full, training and test datasets.
print(scaled_df.count())
print(training_df.count())
print(test_df.count())

## Uso de protocolo K-fold cross validation

In [None]:
# Codigo tomado de https://stackoverflow.com/questions/53600615/cross-validation-metrics-with-pyspark
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.sql.functions import rand
import numpy as np
import collections

# Record pairing the hyper-parameters tried on one fold ("params") with the
# metrics object computed for them ("metrics").
TestResult = collections.namedtuple("TestResult", "params metrics")

class CrossValidatorVerbose(CrossValidator):
    """K-fold cross validator that reports progress and keeps per-fold metrics.

    Behaves like ``CrossValidator`` but prints the evaluator metric of every
    parameter map on every fold, and additionally returns a
    ``BinaryClassificationMetrics`` object per (fold, parameter map).
    """

    def _fit(self, dataset):
        """Run k-fold cross validation over ``dataset``.

        Args:
            dataset: DataFrame holding the feature column(s) expected by the
                estimator and the label column configured on the evaluator.

        Returns:
            A tuple ``(model, folds)``. ``model`` is a ``CrossValidatorModel``
            wrapping the best parameter map refitted on the full ``dataset``.
            ``folds`` is a list with one entry per fold; each entry is a list
            with one ``TestResult`` per parameter map.
        """
        folds = []
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)

        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()
        # Use the evaluator's label column instead of a hard-coded name so
        # the per-fold metrics always agree with the metric being optimised.
        labelCol = eva.getLabelCol()
        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds

        # Each row gets a uniform random number; fold i validates on the
        # rows whose number falls in [i*h, (i+1)*h) and trains on the rest.
        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        metrics = [0.0] * numModels

        for i in range(nFolds):
            foldResults = []
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)
            train = df.filter(~condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)
                # TODO: duplicate evaluator to take extra params from input
                prediction = model.transform(validation, paramMap)
                metric = eva.evaluate(prediction)
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum
                print("params: %s\t%s: %f\tavg: %f" % (
                    {param.name: val for (param, val) in paramMap.items()},
                    metricName, metric, avgSoFar))

                # BinaryClassificationMetrics expects an RDD of
                # (score, label) pairs of doubles. Cast both columns (the
                # label may be stored as an integer) and turn each Row into
                # a plain tuple before handing the RDD over.
                # NOTE(review): the score used here is the hard "prediction"
                # column, not a probability or raw score.
                predictionLabels = prediction.select(
                    prediction["prediction"].cast("double"),
                    prediction[labelCol].cast("double"))
                allMetrics = BinaryClassificationMetrics(
                    predictionLabels.rdd.map(tuple))
                foldResults.append(TestResult(paramMap.items(), allMetrics))

            folds.append(foldResults)

        # Pick the parameter map with the best accumulated metric.
        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        # Refit the winning parameter map on the complete dataset.
        bestParams = epm[bestIndex]
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % (
            {param.name: val for (param, val) in bestParams.items()},
            metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics)), folds

## Modelo 1: Regresión Logística 

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder


# Training data: scaled feature vector plus the target column.
df = training_df.select('scaledFeatures', 'Exited')
df.show()

# Logistic regression estimator and a parameter grid with a single
# candidate (maxIter = 10).
lr = LogisticRegression(featuresCol='scaledFeatures', labelCol='Exited', maxIter=10)
grid_builder = ParamGridBuilder()
grid_builder.addGrid(param=lr.maxIter, values=[10])
grid = grid_builder.build()

# 5-fold cross validation that prints information for every iteration.
lr_evaluator = BinaryClassificationEvaluator(labelCol='Exited')
cv = CrossValidatorVerbose(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=lr_evaluator,
    numFolds=5,
)

# fit() returns the fitted model together with the per-fold results.
cvlr_model, lr_folds = cv.fit(df)

## Modelo 2: Random Forest

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder

# Training data: scaled feature vector plus the target column.
df = training_df.select('scaledFeatures', 'Exited')
df.show()

# Random forest estimator and a parameter grid with a single candidate
# (maxDepth = 4).
rf = RandomForestClassifier(featuresCol='scaledFeatures', labelCol='Exited', maxDepth=4)
grid_builder = ParamGridBuilder()
grid_builder.addGrid(rf.maxDepth, values=[4])
grid = grid_builder.build()

# 5-fold cross validation that prints information for every iteration.
rf_evaluator = BinaryClassificationEvaluator(labelCol='Exited')
cv = CrossValidatorVerbose(
    estimator=rf,
    estimatorParamMaps=grid,
    evaluator=rf_evaluator,
    numFolds=5,
)

# fit() returns the fitted model together with the per-fold results.
cvrf_model, rf_folds = cv.fit(df)

## Modelo 3: Decision Tree

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder

# Training data: scaled feature vector plus the target column.
df = training_df.select('scaledFeatures', 'Exited')
df.show()

# Decision tree estimator and a parameter grid with a single candidate
# (maxDepth = 4).
dt = DecisionTreeClassifier(featuresCol='scaledFeatures', labelCol='Exited', maxDepth=4)
grid_builder = ParamGridBuilder()
grid_builder.addGrid(dt.maxDepth, values=[4])
grid = grid_builder.build()

# 5-fold cross validation that prints information for every iteration.
dt_evaluator = BinaryClassificationEvaluator(labelCol='Exited')
cv = CrossValidatorVerbose(
    estimator=dt,
    estimatorParamMaps=grid,
    evaluator=dt_evaluator,
    numFolds=5,
)

# fit() returns the fitted model together with the per-fold results.
cvdt_model, dt_folds = cv.fit(df)

# Evaluación del conjunto de prueba

## Evaluación y almacenado de modelo 1

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import matplotlib.pyplot as plt
import numpy as np

# cv = CrossValidator(estimator=lr,
#                            estimatorParamMaps=grid,
#                            evaluator=BinaryClassificationEvaluator(labelCol='Exited'),
#                            numFolds=5)

# print(cvlr_model.subModels)

# print(cvlr_model.avgMetrics[0])

# print(cvlr_model.extractParamMap())


# # Make prediction
# predictionAndTarget = predictions.select("Exited", "prediction")

# # Create both evaluators
# metrics_binary = BinaryClassificationMetrics(predictionAndTarget.rdd.map(tuple))
# acc = metrics_binary.accuracy
# f1 = metrics_binary.fMeasure(1.0)
# precision = metrics_binary.precision(1.0)
# recall = metrics_binary.recall(1.0)
# auc = metrics_binary.areaUnderROC

# print('Accuracy:', acc)
# print('F1:', f1)
# print('Precision:', precision)
# print('Recall:', recall)
# print('Area Under ROC:', auc)

# print('Pesos: {}\n b: {}'.format(lr_model.coefficients, lr_model.intercept))

# print('RMSE: {} r2: {}'.format(
#     lr_model.summary.rootMeanSquaredError,
#     lr_model.summary.r2))

# scaled_df.describe().show()
# predictions = lr_model.transform(test_df)


# from pyspark.ml.evaluation import BinaryClassificationEvaluator

# evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Exited')

# Coefficients 
# beta = np.sort(cvlr_model.coefficients)
# plt.plot(beta)
# plt.ylabel('Beta Coefficients')
# plt.show()

# # Area under ROC
# trainingSummary = cvlr_model.summary
# roc = trainingSummary.roc.toPandas()
# plt.plot(roc['FPR'],roc['TPR'])
# plt.ylabel('False Positive Rate')
# plt.xlabel('True Positive Rate')
# plt.title('ROC Curve')
# plt.show()
# print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

# # Precision and recall.
# pr = trainingSummary.pr.toPandas()
# plt.plot(pr['recall'],pr['precision'])
# plt.ylabel('Precision')
# plt.xlabel('Recall')
# plt.show()

# Score the held-out test set with the cross-validated logistic regression.
test_features_df = test_df.select('scaledFeatures', 'Exited')
predictions = cvlr_model.transform(test_features_df)
predictions.show()

# Evaluate the predictions against the 'Exited' label using the evaluator's
# default metric.
evaluator = BinaryClassificationEvaluator(labelCol='Exited')
test_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', test_auc)

## Evaluación y almacenado de modelo 2

In [None]:
# Guardar en base de datos

## Evaluación y almacenado de modelo 3

In [None]:
# Guardar en base de datos

## Análisis de resultados

In [None]:
# ¿POR QUÉ DIO BIEN?