# Predicting Santander Customer Satisfaction

In [None]:
# Python Language Version
from platform import python_version
print('Python Language Version used in this Jupyter Notebook:', python_version())

In [None]:
# Import findspark and initialize
import findspark
findspark.init()

In [None]:
# Imports
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
from sklearn.metrics import confusion_matrix
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import *
from pyspark.ml.stat import Correlation
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder# Versions of packages used in this notebook jupyter

In [None]:
%reload_ext watermark
%watermark -a "Matheus Francelino Barbosa" --iversions

## Preparing the Spark Environment

In [None]:
# Create Spark Context
sc = SparkContext(appName = "Mini-Projeto5")

In [None]:
sc.setLogLevel("ERROR")

In [None]:
# Create the session
spark = SparkSession.builder.getOrCreate()

In [None]:
spark

## Loading the Dataset

In [None]:
# Loading the Dataset
dados = spark.read.csv('dados/train.csv', inferSchema = True, header = True)

In [None]:
type(dados)

In [None]:
dados.count()

In [None]:
dados.show(10)

In [None]:
# View the data in the Pandas format
dados.limit(10).toPandas()

In [None]:
print((dados.count(), len(dados.columns)))

In [None]:
# Schema
dados.printSchema()

In [None]:
#Remove the column ID
dados = dados.drop('ID')

In [None]:
# Schema
dados.printSchema()

In [None]:
colunas_dados = dados.columns[:-1]

In [None]:
colunas_test = ['var3','var15']

In [None]:
colunas_test

In [None]:
#Testing to change the data type from Integer to Double
for colunas in colunas_test:
    if str(dados.schema[colunas].dataType) == 'IntegerType()':
        dados_new = dados.withColumn(colunas,dados[colunas].cast(DoubleType()))
        print((colunas,dados_new.schema[colunas].dataType))


In [None]:
#Applying the change from Integer to Double
for colunas in colunas_dados:
    
    #if str(dados.schema[colunas].dataType) == 'DoubleType()':
    dados = dados.withColumn(colunas,dados[colunas].cast(FloatType()))
    #print((colunas,dados.schema[colunas].dataType))


In [None]:
# Schema
dados.printSchema()

In [None]:
# View the data in the Pandas format
dados.limit(10).toPandas()

# Preparing the data

### Checking for Null Data

In [None]:
# Separate the missing data (if any) and remove it (if any)
dados_com_linhas_removidas = dados.na.drop()
print('Number of rows before removing missing values:', dados.count())
print('Number of rows after removing missing values:', dados_com_linhas_removidas.count())

In [None]:
# Data preparation function
def func_modulo_prep_dados(df,
                           variaveis_entrada,
                           variavel_saida,
                           tratar_outliers = True,
                           padronizar_dados = True):

    # Vamos gerar um novo dataframe, renomeando o argumento que representa a variável de saída.
    novo_df = df.withColumnRenamed(variavel_saida, 'label')
    
    # Convertemos a variável alvo para o tipo numérico como float (encoding)
    if str(novo_df.schema['label'].dataType) != 'IntegerType':
        novo_df = novo_df.withColumn("label", novo_df["label"].cast(FloatType()))
    
    # Listas de controle para as variáveis
    variaveis_numericas = []
    variaveis_categoricas = []
    
    # Se tiver variáveis de entrada do tipo string, convertemos para o tipo numérico
    for coluna in variaveis_entrada:
        
        # Verifica se a variável é do tipo string
        if str(novo_df.schema[coluna].dataType) == 'StringType':
            
            # Definimos a variável com um sufixo
            novo_nome_coluna = coluna + "_num"
            
            # Adicionamos à lista de variáveis categóricas
            variaveis_categoricas.append(novo_nome_coluna)          
            
        else:
            
            # Se não for variável do tipo string, então é numérica e adicionamos na lista correspondente
            variaveis_numericas.append(coluna)
            
            # Colocamos os dados no dataframe de variáveis indexadas
            df_indexed = novo_df
            
    # Se o dataframe tiver dados do tipo string, aplicamos a indexação
    # Verificamos se a lista de variáveis categóricas não está vazia
    if len(variaveis_categoricas) != 0: 
        
        # Loop pelas colunas
        for coluna in novo_df:
            
            # Se a variável é do tipo string, criamos, treinamos e aplicamos o indexador
            if str(novo_df.schema[coluna].dataType) == 'StringType':
                
                # Cria o indexador
                indexer = StringIndexer(inputCol = coluna, outputCol = coluna + "_num") 
                
                # Treina e aplica o indexador
                df_indexed = indexer.fit(novo_df).transform(novo_df)
    else:
        
        # Se não temos mais variáveis categóricas, então colocamos os dados no dataframe de variáveis indexadas
        df_indexed = novo_df
        
    # Se for necessário tratar outliers, faremos isso agora
    if tratar_outliers == True:
        print("\nAplicando o tratamento de outliers...")
        
        # Dicionário
        d = {}
        
        # Dicionário de quartis das variáveis do dataframe indexado (somente variáveis numéricas)
        for col in variaveis_numericas: 
            d[col] = df_indexed.approxQuantile(col,[0.01, 0.99], 0.25) 
        
        # Agora aplicamos transformação dependendo da distribuição de cada variável
        for col in variaveis_numericas:
            
            # Extraímos a assimetria dos dados e usamos isso para tratar os outliers
            skew = df_indexed.agg(skewness(df_indexed[col])).collect() 
            skew = skew[0][0]
            
            # Verificamos a assimetria e então aplicamos:
            
            # Transformação de log + 1 se a assimetria for positiva
            if skew > 1:
                indexed = df_indexed.withColumn(col, log(when(df[col] < d[col][0], d[col][0])\
                .when(df_indexed[col] > d[col][1], d[col][1])\
                .otherwise(df_indexed[col] ) + 1).alias(col))
                print("\nA variável " + col + " foi tratada para assimetria positiva (direita) com skew =", skew)
            
            # Transformação exponencial se a assimetria for negativa
            else:
            #elif skew < -1:
                indexed = df_indexed.withColumn(col, \
                exp(when(df[col] < d[col][0], d[col][0])\
                .when(df_indexed[col] > d[col][1], d[col][1])\
                .otherwise(df_indexed[col] )).alias(col))
                print("\nA variável " + col + " foi tratada para assimetria negativa (esquerda) com skew =", skew)
                
            # Assimetria entre -1 e 1 não precisamos aplicar transformação aos dados

    # Vetorização
    
    # Lista final de atributos
    lista_atributos = variaveis_numericas + variaveis_categoricas
    
    # Cria o vetorizador para os atributos
    vetorizador = VectorAssembler(inputCols = lista_atributos, outputCol = 'features')
    
    # Aplica o vetorizador ao conjunto de dados
    dados_vetorizados = vetorizador.transform(df_indexed).select('features', 'label')
    
    # Se a flag padronizar_dados está como True, então padronizamos os dados colocando-os na mesma escala
    if padronizar_dados == True:
        print("\nPadronizando o conjunto de dados para o intervalo de 0 a 1...")
        
        # Cria o scaler
        scaler = MinMaxScaler(inputCol = "features", outputCol = "scaledFeatures")

        # Calcula o sumário de estatísticas e gera o padronizador
        global scalerModel
        scalerModel = scaler.fit(dados_vetorizados)

        # Padroniza as variáveis para o intervalo [min, max]
        dados_padronizados = scalerModel.transform(dados_vetorizados)
        
        # Gera os dados finais
        dados_finais = dados_padronizados.select('label', 'scaledFeatures')
        
        # Renomeia as colunas (requerido pelo Spark)
        dados_finais = dados_finais.withColumnRenamed('scaledFeatures', 'features')
        
        print("\nProcesso Concluído!")

    # Se a flag está como False, então não padronizamos os dados
    else:
        print("\nOs dados não serão padronizados pois a flag padronizar_dados está com o valor False.")
        dados_finais = dados_vetorizados
    
    return dados_finais

# developed by Data Science Academy

In [None]:
# List of input variables (all but the last)
variaveis_entrada = dados.columns[:-1] 

In [None]:
# Target Variable
variavel_saida = dados.columns[-1] 

In [None]:
# Apply the function
dados_finais = func_modulo_prep_dados(dados, variaveis_entrada, variavel_saida, False, True)

In [None]:
# Visualize
dados_finais.show(10, truncate = False)

## Checking the Correlation

In [None]:
# Extract the correlation
coeficientes_corr = Correlation.corr(dados_finais, 'features', 'pearson').collect()[0][0]

In [None]:
# Convert the result into an array
array_corr = coeficientes_corr.toArray()

In [None]:
array_corr

In [None]:
# List the correlation between the attributes and the target variable
for item in array_corr:
    print(item[7])

In [None]:
len(array_corr)

In [None]:
array_test = array_corr

In [None]:
len(array_test)

In [None]:
for item in array_test:
    if 1 in array_test:
        print(item)

In [None]:
if 1 in array_test: print('existe')

## Preparing the data

In [None]:
dados_finais.toPandas()

In [None]:
dados_finais.printSchema()

## Split into Training and Test Data

In [None]:
# 70/30 ratio split
dados_treino, dados_teste = dados_finais.randomSplit([0.7,0.3])
print("Training Data Count:" + str(dados_treino.count()))
print("Test Data Count:" + str(dados_teste.count()))

In [None]:
dados_treino.toPandas()

## Balanced classes

In [None]:
dados_treino.filter(col("label") == 1).count()

In [None]:
dados_treino.filter(col("label") == 0).count()

In [None]:
df_treino = dados_treino.toPandas()
df_treino['label'].value_counts()

In [None]:
minor_df = dados_treino.filter(col("label") == 1)
major_df = dados_treino.filter(col("label") == 0)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

In [None]:
a = range(ratio)

In [None]:
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

In [None]:
dados_treino = major_df.unionAll(oversampled_df)
dados_treino.show()

In [None]:
dados_treino.filter(col("label") == 1).count()

In [None]:
dados_treino.filter(col("label") == 0).count()

# Applying Logistic Regression

In [None]:
# Set the parameters
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=5)

In [None]:
# Train the model
modelo1 = lr.fit(dados_treino)

In [None]:
# Predictions
predictions = modelo1.transform(dados_teste)

In [None]:
predictions.select('label', 'features', 'rawPrediction', 'prediction', 'probability').toPandas().head(5)

In [None]:
evaluator = BinaryClassificationEvaluator()

In [None]:
print('Test Are Under ROC', evaluator.evaluate(predictions))

In [None]:
# Accuracy
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy : ",accuracy)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

beta = np.sort(modelo1.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

In [None]:
trainingSummary = modelo1.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

In [None]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

## Improving the previous model by setting the parameters

In [None]:
# Parameters
paramGrid = (ParamGridBuilder()
            .addGrid(lr.regParam, [0.01, 0.5, 2.0])
            .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
            .addGrid(lr.maxIter, [1,5,10])
            .build())

In [None]:
cv = CrossValidator(estimator=lr, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=evaluator,
                    numFolds=5)

In [None]:
# Train the model
cvModel = cv.fit(dados_treino)

In [None]:
#Predictions
predictions = cvModel.transform(dados_teste)

In [None]:
print('Test Are Under ROC', evaluator.evaluate(predictions))

In [None]:
# Accuracy
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy : ",accuracy)

In [None]:
# Preparing the confusion matrix
y_pred=predictions.select("prediction").collect()
y_orig=predictions.select("label").collect()

In [None]:
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm) 

## Checking the best parameters

In [None]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])
weightsDF.toPandas().head(10)

In [None]:
best_model = cvModel.bestModel

In [None]:
print('Best Param (regParam): ', best_model._java_obj.getRegParam())
print('Best Param (MaxIter): ', best_model._java_obj.getMaxIter())
print('Best Param (elasticNetParam): ', best_model._java_obj.getElasticNetParam())

## Random Forest Classifier

In [None]:
# Set the parameters
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

In [None]:
from pyspark.ml import Pipeline

In [None]:
# Train the model
modelo2 = rf.fit(dados_treino)

In [None]:
# Predictions
predictionsRF = modelo2.transform(dados_teste)

In [None]:
print('Test Are Under ROC', evaluator.evaluate(predictionsRF))

In [None]:
# Accuracy
accuracy = predictionsRF.filter(predictionsRF.label == predictionsRF.prediction).count() / float(predictionsRF.count())
print("Accuracy : ",accuracy)

In [None]:
# Preparing the confusion matrix
y_pred=predictionsRF.select("prediction").collect()
y_orig=predictionsRF.select("label").collect()

In [None]:
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm) 

## Naive Bayes Classifier

In [None]:
# Set the parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

In [None]:
# Train the model
modelo3 = nb.fit(dados_treino)

In [None]:
# Predictions
predictionsNB = modelo3.transform(dados_teste)

In [None]:
predictionsNB.show(5, True)

In [None]:
print('Test Are Under ROC', evaluator.evaluate(predictionsNB))

In [None]:
# Accuracy
accuracy = predictionsNB.filter(predictionsNB.label == predictionsNB.prediction).count() / float(predictionsNB.count())
print("Accuracy : ",accuracy)

In [None]:
# Preparing the confusion matrix
y_pred=predictionsNB.select("prediction").collect()
y_orig=predictionsNB.select("label").collect()

In [None]:
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm) 

## GBT Classifier

In [None]:
# Set the parameters
gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="label", seed=42,
    leafCol="leafId")

In [None]:
# Train the model
modelo4 = gbt.fit(dados_treino)

In [None]:
# Predictions
predictionsGBT = modelo4.transform(dados_teste)

In [None]:
predictionsGBT.show(3) 

In [None]:
evaluator=MulticlassClassificationEvaluator(metricName="accuracy")

In [None]:
print('Test Are Under ROC', evaluator.evaluate(predictionsGBT))

In [None]:
# Accuracy
acc = evaluator.evaluate(predictionsGBT)
print("Prediction Accuracy: ", acc)

In [None]:
# Preparing the confusion matrix
y_pred=predictionsGBT.select("prediction").collect()
y_orig=predictionsGBT.select("label").collect()

In [None]:
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm) 

In [None]:
sc.stop() 