## Santander Customer Satisfaction

Autor: **Matheus Jericó Palhares** <br>
**LinkedIn:** https://linkedin.com/in/matheusjerico <br>
**Github**: https://github.com/matheusjerico/

#### 1. Importando bibliotecas

In [1]:
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler, PCA, StringIndexer
from pyspark.sql.types import FloatType
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np

#### 2. Criando session

In [2]:
spSession = SparkSession.builder.master("local").appName("SparkMLlib").getOrCreate()

#### 3. Importando dados

In [3]:
trainFile = spSession.read.csv('santander-customer-satisfaction/train.csv', header= "true", sep=",")
testFile = spSession.read.csv('santander-customer-satisfaction/test.csv', header= "true", sep=",")

In [4]:
print("Formato dos dados de treino: {}\nFormato dos dados de teste: {}".format(type(trainFile), type(testFile)))

Formato dos dados de treino: <class 'pyspark.sql.dataframe.DataFrame'>
Formato dos dados de teste: <class 'pyspark.sql.dataframe.DataFrame'>


In [5]:
print("Quantidade de linhas no arquivo de Treino: {}\n\
Quantidade de linhas no arquivo de Teste: {}".format(trainFile.count(), testFile.count()))

Quantidade de linhas no arquivo de Treino: 76020
Quantidade de linhas no arquivo de Teste: 75818


#### 4. Visualizando Schema e conteúdo

In [6]:
trainFile.printSchema()

root
 |-- ID: string (nullable = true)
 |-- var3: string (nullable = true)
 |-- var15: string (nullable = true)
 |-- imp_ent_var16_ult1: string (nullable = true)
 |-- imp_op_var39_comer_ult1: string (nullable = true)
 |-- imp_op_var39_comer_ult3: string (nullable = true)
 |-- imp_op_var40_comer_ult1: string (nullable = true)
 |-- imp_op_var40_comer_ult3: string (nullable = true)
 |-- imp_op_var40_efect_ult1: string (nullable = true)
 |-- imp_op_var40_efect_ult3: string (nullable = true)
 |-- imp_op_var40_ult1: string (nullable = true)
 |-- imp_op_var41_comer_ult1: string (nullable = true)
 |-- imp_op_var41_comer_ult3: string (nullable = true)
 |-- imp_op_var41_efect_ult1: string (nullable = true)
 |-- imp_op_var41_efect_ult3: string (nullable = true)
 |-- imp_op_var41_ult1: string (nullable = true)
 |-- imp_op_var39_efect_ult1: string (nullable = true)
 |-- imp_op_var39_efect_ult3: string (nullable = true)
 |-- imp_op_var39_ult1: string (nullable = true)
 |-- imp_sal_var16_ult1: string

In [7]:
trainFile.toPandas().head()

Unnamed: 0,ID,var3,var15,imp_ent_var16_ult1,imp_op_var39_comer_ult1,imp_op_var39_comer_ult3,imp_op_var40_comer_ult1,imp_op_var40_comer_ult3,imp_op_var40_efect_ult1,imp_op_var40_efect_ult3,...,saldo_medio_var33_hace2,saldo_medio_var33_hace3,saldo_medio_var33_ult1,saldo_medio_var33_ult3,saldo_medio_var44_hace2,saldo_medio_var44_hace3,saldo_medio_var44_ult1,saldo_medio_var44_ult3,var38,TARGET
0,1,2,23,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,39205.17,0
1,3,2,34,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,49278.03,0
2,4,2,23,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,67333.77,0
3,8,2,37,0,195,195,0,0,0,0,...,0,0,0,0,0,0,0,0,64007.97,0
4,10,2,39,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,117310.979016494,0


#### 5. Transforma todas as colunas para float
- Todas as colunas estão em formato 'string' e são valores numéricos;
- Transaformando todas as colunas em valores em formato 'float'

In [8]:
for c in trainFile.columns:
    # add condition for the cols to be type cast
    trainFile=trainFile.withColumn(c, trainFile[c].cast(FloatType()))
    
for c in testFile.columns:
    # add condition for the cols to be type cast
    testFile=testFile.withColumn(c, testFile[c].cast(FloatType()))

In [9]:
trainFile.printSchema()

root
 |-- ID: float (nullable = true)
 |-- var3: float (nullable = true)
 |-- var15: float (nullable = true)
 |-- imp_ent_var16_ult1: float (nullable = true)
 |-- imp_op_var39_comer_ult1: float (nullable = true)
 |-- imp_op_var39_comer_ult3: float (nullable = true)
 |-- imp_op_var40_comer_ult1: float (nullable = true)
 |-- imp_op_var40_comer_ult3: float (nullable = true)
 |-- imp_op_var40_efect_ult1: float (nullable = true)
 |-- imp_op_var40_efect_ult3: float (nullable = true)
 |-- imp_op_var40_ult1: float (nullable = true)
 |-- imp_op_var41_comer_ult1: float (nullable = true)
 |-- imp_op_var41_comer_ult3: float (nullable = true)
 |-- imp_op_var41_efect_ult1: float (nullable = true)
 |-- imp_op_var41_efect_ult3: float (nullable = true)
 |-- imp_op_var41_ult1: float (nullable = true)
 |-- imp_op_var39_efect_ult1: float (nullable = true)
 |-- imp_op_var39_efect_ult3: float (nullable = true)
 |-- imp_op_var39_ult1: float (nullable = true)
 |-- imp_sal_var16_ult1: float (nullable = true)
 

#### 4. Dropando colunas com valores constantes

In [18]:
def dropConstantColumns(dataset):
    dataset_df = dataset.toPandas()
    remove = []
    for col in dataset_df.columns:
        if dataset_df[col].std() == 0:
            remove.append(col)
    return remove 

In [19]:
print("Quantidade de colunas no dataframe trainFile: {}".format(len(trainFile.columns)))
columnsConstantDrop = dropConstantColumns(trainFile)
print("Quantidade de colunas com valores constantes: {}".format(len(columnsConstantDrop)))
trainFile2 = trainFile.drop(*columnsConstantDrop)
print("Quantidade de colunas no dataframe: {}".format(len(trainFile2.columns)))

Quantidade de colunas no dataframe trainFile: 371
Quantidade de colunas com valores constantes: 34
Quantidade de colunas no dataframe: 337


In [20]:
print("Quantidade de colunas no dataframe testFile: {}".format(len(testFile.columns)))
columnsConstantDropTest = dropConstantColumns(testFile)
print("Quantidade de colunas com valores constantes: {}".format(len(columnsConstantDropTest)))
testFile2 = testFile.drop(*columnsConstantDropTest)
print("Quantidade de colunas no dataframe: {}".format(len(testFile2.columns)))

Quantidade de colunas no dataframe testFile: 370
Quantidade de colunas com valores constantes: 45
Quantidade de colunas no dataframe: 325


#### 5. Dropando colunas duplicadas

In [21]:
def dropDuplicatedColumns(dataset):
    remove = []
    dataset_df = dataset.toPandas()
    cols = dataset_df.columns
    for i in range(len(cols)-1):
        v = dataset_df[cols[i]].values
        for j in range(i+1,len(cols)):
            if np.array_equal(v,dataset_df[cols[j]].values):
                remove.append(cols[j])

    return remove

In [22]:
print("Quantidade de colunas no dataframe trainFile2: {}".format(len(trainFile2.columns)))
columnsDuplicatedDrop = dropDuplicatedColumns(trainFile2)
print("Quantidade de colunas com valores constantes: {}".format(len(columnsDuplicatedDrop)))
trainFile3 = trainFile2.drop(*columnsDuplicatedDrop)
print("Quantidade de colunas no dataframe: {}".format(len(trainFile3.columns)))

Quantidade de colunas no dataframe trainFile2: 337
Quantidade de colunas com valores constantes: 29
Quantidade de colunas no dataframe: 308


In [23]:
print("Quantidade de colunas no dataframe testFile2: {}".format(len(testFile2.columns)))
columnsDuplicatedDropTest = dropDuplicatedColumns(testFile2)
print("Quantidade de colunas com valores constantes: {}".format(len(columnsDuplicatedDropTest)))
testFile3 = testFile2.drop(*columnsDuplicatedDropTest)
print("Quantidade de colunas no dataframe: {}".format(len(testFile3.columns)))

Quantidade de colunas no dataframe testFile2: 325
Quantidade de colunas com valores constantes: 25
Quantidade de colunas no dataframe: 300


#### 6. Vetorizando colunas

In [24]:
ignore = ['ID','TARGET']

In [25]:
assemblerTrain = VectorAssembler(
    inputCols=[x for x in trainFile3.columns if x not in ignore],
    outputCol='features')

assemblerTest = VectorAssembler(
    inputCols=[x for x in testFile3.columns if x not in ignore],
    outputCol='features')

In [26]:
outputTrain = assemblerTrain.transform(trainFile3)
outputTest = assemblerTest.transform(testFile3)

In [28]:
outputTrain.select("TARGET").show(n=5, truncate=False)

+------+
|TARGET|
+------+
|0.0   |
|0.0   |
|0.0   |
|0.0   |
|0.0   |
+------+
only showing top 5 rows



In [29]:
outputTrain.count()

76020

In [30]:
outputTest.count()

75818

In [32]:
type(outputTrain)

pyspark.sql.dataframe.DataFrame

In [33]:
type(outputTest)

pyspark.sql.dataframe.DataFrame

#### 6. Aplicando PCA

In [36]:
pcaTrain = PCA (k = 30, inputCol = "features", outputCol="pcaFeatures")
modelPcaTrain = pcaTrain.fit(outputTrain)

In [38]:
pcaTest = PCA (k = 30, inputCol = "features", outputCol="pcaFeatures")
modelPcaTest = pcaTest.fit(outputTest)

In [44]:
pcaResultTrain = modelPcaTrain.transform(outputTrain).select("ID","TARGET","pcaFeatures")
pcaResultTest = modelPcaTest.transform(outputTest).select("ID","pcaFeatures")

In [45]:
pcaResultTrain.show(n=10, truncate = True)

+----+------+--------------------+
|  ID|TARGET|         pcaFeatures|
+----+------+--------------------+
| 1.0|   0.0|[-0.0422299797062...|
| 3.0|   0.0|[1.35173020001295...|
| 4.0|   0.0|[-0.0725658032555...|
| 8.0|   0.0|[-0.0701296192800...|
|10.0|   0.0|[-2.6084567907153...|
|13.0|   0.0|[-0.0947634097106...|
|14.0|   0.0|[-0.1023202671769...|
|18.0|   0.0|[-0.2710920414729...|
|20.0|   0.0|[-0.1098288071135...|
|23.0|   0.0|[-0.3840035874287...|
+----+------+--------------------+
only showing top 10 rows



In [46]:
pcaResultTest.show(n=10, truncate = True)

+----+--------------------+
|  ID|         pcaFeatures|
+----+--------------------+
| 2.0|[-0.0182111955226...|
| 5.0|[-0.0203952835288...|
| 6.0|[-0.0215552984755...|
| 7.0|[-0.0841040804868...|
| 9.0|[-0.0333051237527...|
|11.0|[-0.0238352788494...|
|12.0|[-0.1243250978108...|
|15.0|[-0.0209919138643...|
|16.0|[-0.0526590810499...|
|17.0|[-0.0309984712096...|
+----+--------------------+
only showing top 10 rows



#### 7. Indexando coluna target no dataset de treino, e id no dataset de teste

In [56]:
stringIndexexTrain = StringIndexer(inputCol="TARGET", outputCol="indexed")
si_model = stringIndexexTrain.fit(pcaResultTrain)
obj_final = si_model.transform(pcaResultTrain)
obj_final.collect()

[Row(ID=1.0, TARGET=0.0, pcaFeatures=DenseVector([-0.0422, 0.0741, -0.0238, -0.2254, -0.0501, -0.1707, 0.1251, 0.0687, 0.163, 0.9591, 0.0939, -0.6194, 0.4298, 0.2569, -39194.1454, 819.366, 389.6893, 69.3574, 3.3296, 83.1479, 22.438, 105.395, -126.1726, 7.7987, -24.9073, 10.0067, -16.0238, 18.1941, 3.5778, 15.5407]), indexed=0.0),
 Row(ID=3.0, TARGET=0.0, pcaFeatures=DenseVector([1.3517, 0.0967, -0.037, -0.2865, -0.0913, -0.211, 0.1587, 0.0784, 0.207, 1.2056, 0.118, -0.777, 0.539, 0.3143, -49276.0002, 797.0783, -233.2658, 84.4365, 0.0516, 194.3764, 55.9989, 274.0327, -59.2885, 4.6418, -37.0335, 12.6069, -6.314, 23.1281, 0.887, -3.9234]), indexed=0.0),
 Row(ID=4.0, TARGET=0.0, pcaFeatures=DenseVector([-0.0726, 0.1273, -0.041, -0.3872, -0.0861, -0.2933, 0.2149, 0.1179, 0.2801, 1.6472, 0.1612, -1.0639, 0.7381, 0.441, -67314.9292, 1404.4785, 668.5296, 118.9974, 4.2901, 142.0598, 37.059, 176.8498, -212.2485, 13.0751, -41.9753, 16.749, -27.3271, 31.2448, 6.197, 26.7596]), indexed=0.0),
 Row(I

#### 8. Separando o dataset de treino em dados de treino e validação

In [57]:
(dados_treino, dados_val) = obj_final.randomSplit([0.7, 0.3])

In [58]:
dados_treino.count()

52993

In [59]:
dados_val.count()

23027

In [61]:
dados_treino.show(5)

+----+------+--------------------+-------+
|  ID|TARGET|         pcaFeatures|indexed|
+----+------+--------------------+-------+
| 4.0|   0.0|[-0.0725658032555...|    0.0|
| 8.0|   0.0|[-0.0701296192800...|    0.0|
|13.0|   0.0|[-0.0947634097106...|    0.0|
|18.0|   0.0|[-0.2710920414729...|    0.0|
|23.0|   0.0|[-0.3840035874287...|    0.0|
+----+------+--------------------+-------+
only showing top 5 rows



#### 9. Criando classificador com GBT

In [62]:
gbt = GBTClassifier(labelCol="indexed", featuresCol="pcaFeatures", maxIter=10)

#### 10. Treinando o modelo e realizando predições

In [63]:
model = gbt.fit(dados_treino)

In [64]:
predictions = model.transform(dados_val)

In [65]:
predictions.select("prediction", "indexed", "pcaFeatures").show(20)

+----------+-------+--------------------+
|prediction|indexed|         pcaFeatures|
+----------+-------+--------------------+
|       0.0|    0.0|[-0.0422299797062...|
|       0.0|    0.0|[1.35173020001295...|
|       0.0|    0.0|[-2.6084567907153...|
|       0.0|    0.0|[-0.1023202671769...|
|       0.0|    0.0|[-0.1098288071135...|
|       0.0|    0.0|[-0.0811834868827...|
|       0.0|    0.0|[-0.1345145467024...|
|       0.0|    0.0|[-0.2060891708604...|
|       0.0|    0.0|[-0.3083268294698...|
|       0.0|    0.0|[-0.1267143155399...|
|       0.0|    0.0|[-0.3019682982154...|
|       0.0|    0.0|[-0.0376457533325...|
|       0.0|    0.0|[-0.1326121438445...|
|       0.0|    0.0|[-0.1268386785098...|
|       0.0|    0.0|[-0.0605327172824...|
|       0.0|    0.0|[-0.1341941943810...|
|       0.0|    0.0|[0.53413716864447...|
|       0.0|    0.0|[-0.1244692165481...|
|       0.0|    0.0|[-0.2552857995629...|
|       0.0|    0.0|[-3.5606017939094...|
+----------+-------+--------------

#### 11. Avaliando modelo com os dados de treino que separei para teste (não participou do treinamento)

In [66]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexed", predictionCol="prediction", metricName="accuracy")

In [67]:
accuracy = evaluator.evaluate(predictions)

In [68]:
print("Acurária: {}".format(accuracy))

Acurária: 0.9602206105875711


In [69]:
predictions.columns

['ID',
 'TARGET',
 'pcaFeatures',
 'indexed',
 'rawPrediction',
 'probability',
 'prediction']

In [70]:
predictions.select("ID","prediction").show(5)

+----+----------+
|  ID|prediction|
+----+----------+
| 1.0|       0.0|
| 3.0|       0.0|
|10.0|       0.0|
|14.0|       0.0|
|20.0|       0.0|
+----+----------+
only showing top 5 rows



#### 12. Fazendo predições com os verdadeiros dados de teste

In [71]:
predictionsTest = model.transform(pcaResultTest)

In [86]:
predictionsTest.columns

['ID', 'pcaFeatures', 'rawPrediction', 'probability', 'prediction']

In [87]:
predictionsTest.select("prediction","pcaFeatures").show(20)

+----------+--------------------+
|prediction|         pcaFeatures|
+----------+--------------------+
|       0.0|[-0.0182111955226...|
|       0.0|[-0.0203952835288...|
|       0.0|[-0.0215552984755...|
|       0.0|[-0.0841040804868...|
|       0.0|[-0.0333051237527...|
|       0.0|[-0.0238352788494...|
|       0.0|[-0.1243250978108...|
|       0.0|[-0.0209919138643...|
|       0.0|[-0.0526590810499...|
|       0.0|[-0.0309984712096...|
|       0.0|[-0.0503317630085...|
|       0.0|[-0.0368034536479...|
|       0.0|[-0.0408117475071...|
|       0.0|[-0.0303322284748...|
|       0.0|[-8.0059777594160...|
|       0.0|[-0.1237648474876...|
|       0.0|[-0.0228859545669...|
|       0.0|[-0.0525434236456...|
|       0.0|[-0.0183303583459...|
|       0.0|[-0.0640108456538...|
+----------+--------------------+
only showing top 20 rows



In [88]:
predictionsPandas = predictionsTest.select("ID", "prediction").toPandas()

In [97]:
predictionsPandas.rename(columns=
    {'prediction': 'TARGET'}, 
    inplace=True)

In [100]:
predictionsPandas['ID'] = predictionsPandas['ID'].astype('int')
predictionsPandas['TARGET'] = predictionsPandas['TARGET'].astype('int')

In [102]:
predictionsPandas.to_csv("santander-customer-satisfaction/sample_submission.csv", index = False)