<a href="https://colab.research.google.com/github/ralsouza/predicting_santander_customer_satisfaction/blob/main/notebooks%5C01_predicting_santander_customer_satisfaction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Predicting Santander Customer Satisfaction

Customer satisfaction is a key measure of success. Unhappy customers don't stick around. What's more, unhappy customers rarely voice their dissatisfaction before leaving.

Santander Bank is asking to help them identify dissatisfied customers early in their relationship. Doing so would allow Santander to take proactive steps to improve a customer's happiness before it's too late.

In this project, we'll work with hundreds of anonymized features to predict if a customer is satisfied or dissatisfied with their banking experience.

Source: [Kaggle: Santander Customer Satisfaction](https://www.kaggle.com/c/santander-customer-satisfaction)

## Business Problem: 
Identify dissatisfied customers before leave the Santander services with 70% of accuracy.

# PySpark Setup

In [None]:
!apt-get update

In [55]:
# Install the dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [56]:
# Environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [57]:
# Make pyspark "importable"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [58]:
# Libraries and Context Setup
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [59]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)


# Instance Spark Session
spark = SparkSession.builder.master('local').appName('spark_ml_lib').getOrCreate()

# Create the SQL Context
sqlContext = pyspark.SQLContext(sc)

ValueError: ignored

In [None]:
# Create Spark Session
sp_session = SparkSession.builder.master('local').appName('app_santander_satisfaction').getOrCreate()

# Import libraries

In [128]:
from pyspark.sql import Row
from pyspark.sql.functions import col, regexp_replace

# Load data
You are provided with an anonymized dataset containing a large number of numeric variables. The `TARGET` column is the variable to predict. It equals `one` for unsatisfied customers and `0` for satisfied customers.

The task is to predict the probability that each customer in the test set is an unsatisfied customer.

**File descriptions:**
*   train.csv - the training set including the target
*   test.csv - the test set without the target




## Train data

In [None]:
# Load train data as rdd
rdd_cust_train = sc.textFile('/content/drive/My Drive/Colab Notebooks/08-apache-spark/projects/data/train.csv')

In [None]:
# Persist data in cache
rdd_cust_train.cache()

/content/drive/My Drive/Colab Notebooks/08-apache-spark/projects/data/train.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

# Transform RDD to Spark Dataframe


In [None]:
# Select header header
header = rdd_cust_train.first()

In [None]:
# First function converts to str
header

'ID,var3,var15,imp_ent_var16_ult1,imp_op_var39_comer_ult1,imp_op_var39_comer_ult3,imp_op_var40_comer_ult1,imp_op_var40_comer_ult3,imp_op_var40_efect_ult1,imp_op_var40_efect_ult3,imp_op_var40_ult1,imp_op_var41_comer_ult1,imp_op_var41_comer_ult3,imp_op_var41_efect_ult1,imp_op_var41_efect_ult3,imp_op_var41_ult1,imp_op_var39_efect_ult1,imp_op_var39_efect_ult3,imp_op_var39_ult1,imp_sal_var16_ult1,ind_var1_0,ind_var1,ind_var2_0,ind_var2,ind_var5_0,ind_var5,ind_var6_0,ind_var6,ind_var8_0,ind_var8,ind_var12_0,ind_var12,ind_var13_0,ind_var13_corto_0,ind_var13_corto,ind_var13_largo_0,ind_var13_largo,ind_var13_medio_0,ind_var13_medio,ind_var13,ind_var14_0,ind_var14,ind_var17_0,ind_var17,ind_var18_0,ind_var18,ind_var19,ind_var20_0,ind_var20,ind_var24_0,ind_var24,ind_var25_cte,ind_var26_0,ind_var26_cte,ind_var26,ind_var25_0,ind_var25,ind_var27_0,ind_var28_0,ind_var28,ind_var27,ind_var29_0,ind_var29,ind_var30_0,ind_var30,ind_var31_0,ind_var31,ind_var32_cte,ind_var32_0,ind_var32,ind_var33_0,ind_var33

In [None]:
# Remove header
rdd_cust_train2 = rdd_cust_train.filter(lambda x: x != header)

In [None]:
# Check new rdd
rdd_cust_train2.take(5)

['1,2,23,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,0,0,0,0,0,0,3,0,0,3,0,0,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,99,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,39205.17,0',
 '3,2,34,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,1,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,1,3,0,0,0,0,0,0,0,3,3,3,0,0,0,0,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,

In [None]:
# Split rdd by columns
train2 = rdd_cust_train2.map(lambda x: x.split(','))

In [None]:
# Create Spark Dataframe with headers
# The headers are converted from string to list separated by ','
train = spark.createDataFrame(train2,header.split(','))

In [None]:
# Show dataframe
train.show()

+---+----+-----+------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------+-----------------------+-----------------------+-----------------+------------------+----------+--------+----------+--------+----------+--------+----------+--------+----------+--------+-----------+---------+-----------+-----------------+---------------+-----------------+---------------+-----------------+---------------+---------+-----------+---------+-----------+---------+-----------+---------+---------+-----------+---------+-----------+---------+-------------+-----------+-------------+---------+-----------+---------+-----------+-----------+---------+---------+-----------+---------+-----------+---------+-----------+---------+-------------+-----------+---------+-----------+--------

In [None]:
# Persist Spark Dataframe into RAM
train.cache()

DataFrame[ID: string, var3: string, var15: string, imp_ent_var16_ult1: string, imp_op_var39_comer_ult1: string, imp_op_var39_comer_ult3: string, imp_op_var40_comer_ult1: string, imp_op_var40_comer_ult3: string, imp_op_var40_efect_ult1: string, imp_op_var40_efect_ult3: string, imp_op_var40_ult1: string, imp_op_var41_comer_ult1: string, imp_op_var41_comer_ult3: string, imp_op_var41_efect_ult1: string, imp_op_var41_efect_ult3: string, imp_op_var41_ult1: string, imp_op_var39_efect_ult1: string, imp_op_var39_efect_ult3: string, imp_op_var39_ult1: string, imp_sal_var16_ult1: string, ind_var1_0: string, ind_var1: string, ind_var2_0: string, ind_var2: string, ind_var5_0: string, ind_var5: string, ind_var6_0: string, ind_var6: string, ind_var8_0: string, ind_var8: string, ind_var12_0: string, ind_var12: string, ind_var13_0: string, ind_var13_corto_0: string, ind_var13_corto: string, ind_var13_largo_0: string, ind_var13_largo: string, ind_var13_medio_0: string, ind_var13_medio: string, ind_var13

# Exploratory data analysis

In [None]:
# Shape
print(train.count(),'x',len(train.columns))

76020 x 371


In [86]:
# Target distribuition
# Happy customers have TARGET == 0, unhappy custormers have TARGET == 1

dist_target = train.groupBy('TARGET').count() \
  .withColumn('percentage', 100 * col('count') / train.count())

dist_target.show()

+------+-----+------------------+
|TARGET|count|        percentage|
+------+-----+------------------+
|     0|73012| 96.04314654038411|
|     1| 3008|3.9568534596158904|
+------+-----+------------------+



## var3: nationality of the customer

In [100]:
# Top 20 most common values

# 116 values in column var3 are -999999
# var3 is suspected to be the nationality of the customer
# -999999 would mean that the nationality of the customer is unknown

train.groupBy('var3').count().sort(col('count').desc()).show()

+-------+-----+
|   var3|count|
+-------+-----+
|      2|74165|
|      8|  138|
|-999999|  116|
|      9|  110|
|      3|  108|
|      1|  105|
|     13|   98|
|      7|   97|
|      4|   86|
|     12|   85|
|      6|   82|
|      0|   75|
|     10|   72|
|     11|   66|
|      5|   63|
|     14|   61|
|     15|   34|
|     18|   10|
|     16|    9|
|     23|    7|
+-------+-----+
only showing top 20 rows



In [105]:
# Count -999999 values
train.filter(train['var3'] == -999999).count()

116

In [152]:
# Replace -999999 in var3 column with most common value 2 
# See https://www.kaggle.com/cast42/santander-customer-satisfaction/debugging-var3-999999
# for details

train.withColumn("var3", \
                 when(train["var3"] == -999999, 2).otherwise(train["var3"])).show()

+---+----+-----+------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------+-----------------------+-----------------------+-----------------+------------------+----------+--------+----------+--------+----------+--------+----------+--------+----------+--------+-----------+---------+-----------+-----------------+---------------+-----------------+---------------+-----------------+---------------+---------+-----------+---------+-----------+---------+-----------+---------+---------+-----------+---------+-----------+---------+-------------+-----------+-------------+---------+-----------+---------+-----------+-----------+---------+---------+-----------+---------+-----------+---------+-----------+---------+-------------+-----------+---------+-----------+--------

In [153]:
# Count -999999 values again
train.filter(train['var3'] == -999999).count()

116

## Save Modified train dataset

In [119]:
# Save dataset
train.write.save('/content/drive/My Drive/Colab Notebooks/08-apache-spark/projects/data/train_output.parquet')