# Churn Dataset Correlation Calculation in Spark

In [1]:
import sys
sys.path.append("..")
from helpers.data_prep_and_print import print_df
from helpers.path_translation import translate_to_file_string
from pyspark.ml.feature import IndexToString, Normalizer, StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml.stat import Correlation, ChiSquareTest

from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession


## Select the churn file 

In [2]:
# Location of the churn CSV relative to this notebook; the project helper
# turns it into the string that is later handed to the Spark CSV reader.
churnCsvRelativePath = "../data/churn.csv"
inputFile = translate_to_file_string(churnCsvRelativePath)

## Create the Spark Session 

In [3]:
# Build (or fetch) the SparkSession used by the rest of the notebook.
sessionBuilder = SparkSession.builder.appName("Churn Proprocessing")
spark = sessionBuilder.getOrCreate()

# Read the churn CSV into a DataFrame with an inferred schema: the first
# line is treated as the header and fields are separated by semicolons.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ";")
    .csv(inputFile)
)

## Data Preparation
### Transform labels into index

In [4]:
# Fit one StringIndexer per string-typed column of df.
# LEAVE is the target and is indexed into "label"; each categorical feature
# column gets a numeric companion column named "<COLUMN>_NUM".
labelIndexer = StringIndexer(inputCol="LEAVE", outputCol="label").fit(df)
collegeIndexer = StringIndexer(
    inputCol="COLLEGE",
    outputCol="COLLEGE_NUM",
).fit(df)
satIndexer = StringIndexer(
    inputCol="REPORTED_SATISFACTION",
    outputCol="REPORTED_SATISFACTION_NUM",
).fit(df)
usageIndexer = StringIndexer(
    inputCol="REPORTED_USAGE_LEVEL",
    outputCol="REPORTED_USAGE_LEVEL_NUM",
).fit(df)
changeIndexer = StringIndexer(
    inputCol="CONSIDERING_CHANGE_OF_PLAN",
    outputCol="CONSIDERING_CHANGE_OF_PLAN_NUM",
).fit(df)

### Build the list of feature columns

In [5]:
# Start from all columns of the raw frame, take out the target column and the
# raw string columns, then append the indexed (numeric) replacements.
# list.remove raises ValueError if one of the expected columns is missing.
featureCols = list(df.columns)
for rawColumn in (
    "LEAVE",
    "COLLEGE",
    "REPORTED_SATISFACTION",
    "REPORTED_USAGE_LEVEL",
    "CONSIDERING_CHANGE_OF_PLAN",
):
    featureCols.remove(rawColumn)
featureCols = featureCols + [
    "COLLEGE_NUM",
    "REPORTED_SATISFACTION_NUM",
    "REPORTED_USAGE_LEVEL_NUM",
    "CONSIDERING_CHANGE_OF_PLAN_NUM",
]

### Build the feature Vector Assembler

In [6]:
# Combine the selected feature columns into one vector column "features".
# A copy of featureCols is passed so the assembler keeps its own list.
assembler = VectorAssembler(
    inputCols=list(featureCols),
    outputCol="features",
)

## Do the Data Preparation

In [7]:
# Add the numeric "label" column derived from LEAVE.
labeledData = labelIndexer.transform(df)

# printSchema() writes the schema to stdout itself and returns None, so it
# must not be wrapped in print() (that would emit an extra "None" line).
labeledData.printSchema()

# Apply the categorical indexers one after another. The order is kept as
# change -> usage -> satisfaction -> college because it determines the order
# in which the *_NUM columns are appended to the frame.
indexedLabedData = labeledData
for fittedIndexer in (changeIndexer, usageIndexer, satIndexer, collegeIndexer):
    indexedLabedData = fittedIndexer.transform(indexedLabedData)

# Assemble the feature vector column and show a sample of the result.
labeledPointData = assembler.transform(indexedLabedData)
labeledPointData.show()

root
 |-- COLLEGE: string (nullable = true)
 |-- INCOME: integer (nullable = true)
 |-- OVERAGE: integer (nullable = true)
 |-- LEFTOVER: integer (nullable = true)
 |-- HOUSE: integer (nullable = true)
 |-- HANDSET_PRICE: integer (nullable = true)
 |-- OVER_15MINS_CALLS_PER_MONTH: integer (nullable = true)
 |-- AVERAGE_CALL_DURATION: integer (nullable = true)
 |-- REPORTED_SATISFACTION: string (nullable = true)
 |-- REPORTED_USAGE_LEVEL: string (nullable = true)
 |-- CONSIDERING_CHANGE_OF_PLAN: string (nullable = true)
 |-- LEAVE: string (nullable = true)
 |-- label: double (nullable = false)

None
+-------+------+-------+--------+------+-------------+---------------------------+---------------------+---------------------+--------------------+--------------------------+-----+-----+------------------------------+------------------------+-------------------------+-----------+--------------------+
|COLLEGE|INCOME|OVERAGE|LEFTOVER| HOUSE|HANDSET_PRICE|OVER_15MINS_CALLS_PER_MONTH|AVERAGE_CA

### As formatted output

In [8]:
# Hand only the first ten prepared rows to the project's print_df helper.
previewRows = labeledPointData.limit(10)
print_df(previewRows)

Unnamed: 0,COLLEGE,INCOME,OVERAGE,LEFTOVER,HOUSE,HANDSET_PRICE,OVER_15MINS_CALLS_PER_MONTH,AVERAGE_CALL_DURATION,REPORTED_SATISFACTION,REPORTED_USAGE_LEVEL,CONSIDERING_CHANGE_OF_PLAN,LEAVE,label,CONSIDERING_CHANGE_OF_PLAN_NUM,REPORTED_USAGE_LEVEL_NUM,REPORTED_SATISFACTION_NUM,COLLEGE_NUM,features
0,zero,31953,0,6,313378,161,0,4,unsat,little,no,STAY,0.0,2.0,0.0,2.0,1.0,"[31953.0, 0.0, 6.0, 313378.0, 161.0, 0.0, 4.0, 1.0, 2.0, 0.0, 2.0]"
1,one,36147,0,13,800586,244,0,6,unsat,little,considering,STAY,0.0,0.0,0.0,2.0,0.0,"(36147.0, 0.0, 13.0, 800586.0, 244.0, 0.0, 6.0, 0.0, 2.0, 0.0, 0.0)"
2,one,27273,230,0,305049,201,16,15,unsat,very_little,perhaps,STAY,0.0,4.0,2.0,2.0,0.0,"[27273.0, 230.0, 0.0, 305049.0, 201.0, 16.0, 15.0, 0.0, 2.0, 2.0, 4.0]"
3,zero,120070,38,33,788235,780,3,2,unsat,very_high,considering,LEAVE,1.0,0.0,1.0,2.0,1.0,"[120070.0, 38.0, 33.0, 788235.0, 780.0, 3.0, 2.0, 1.0, 2.0, 1.0, 0.0]"
4,one,29215,208,85,224784,241,21,1,very_unsat,little,never_thought,STAY,0.0,3.0,0.0,0.0,0.0,"[29215.0, 208.0, 85.0, 224784.0, 241.0, 21.0, 1.0, 0.0, 0.0, 0.0, 3.0]"
5,zero,133728,64,48,632969,626,3,2,unsat,high,no,STAY,0.0,2.0,3.0,2.0,1.0,"[133728.0, 64.0, 48.0, 632969.0, 626.0, 3.0, 2.0, 1.0, 2.0, 3.0, 2.0]"
6,zero,42052,224,0,697949,191,10,5,very_unsat,little,actively_looking_into_it,STAY,0.0,1.0,0.0,0.0,1.0,"[42052.0, 224.0, 0.0, 697949.0, 191.0, 10.0, 5.0, 1.0, 0.0, 0.0, 1.0]"
7,one,84744,0,20,688098,357,0,5,very_unsat,little,considering,STAY,0.0,0.0,0.0,0.0,0.0,"(84744.0, 0.0, 20.0, 688098.0, 357.0, 0.0, 5.0, 0.0, 0.0, 0.0, 0.0)"
8,zero,38171,0,7,274218,190,0,5,very_sat,little,actively_looking_into_it,STAY,0.0,1.0,0.0,1.0,1.0,"[38171.0, 0.0, 7.0, 274218.0, 190.0, 0.0, 5.0, 1.0, 1.0, 0.0, 1.0]"
9,zero,105824,174,18,153560,687,25,4,very_sat,little,never_thought,LEAVE,1.0,3.0,0.0,1.0,1.0,"[105824.0, 174.0, 18.0, 153560.0, 687.0, 25.0, 4.0, 1.0, 1.0, 0.0, 3.0]"


In [9]:
# Correlation.corr returns a DataFrame whose first row / first cell holds the
# correlation matrix of the "features" vector column.
r1_matrix = Correlation.corr(labeledPointData, "features").head()[0]

# Convert the matrix into plain nested Python lists (one inner list per
# matrix row) so it can be turned back into a Spark DataFrame.
corr_matrix = r1_matrix.toArray().tolist()

# Column i of the matrix corresponds to featureCols[i], the same order the
# VectorAssembler used, so featureCols serves as the column schema.
df_corr_matrix = spark.createDataFrame(corr_matrix, schema=featureCols)

print_df(df_corr_matrix)

Unnamed: 0,INCOME,OVERAGE,LEFTOVER,HOUSE,HANDSET_PRICE,OVER_15MINS_CALLS_PER_MONTH,AVERAGE_CALL_DURATION,COLLEGE_NUM,REPORTED_SATISFACTION_NUM,REPORTED_USAGE_LEVEL_NUM,CONSIDERING_CHANGE_OF_PLAN_NUM
0,1.0,0.000458,0.006515,-0.010964,0.7272,0.002136,-0.007219,-0.011122,3e-05,0.007533,0.004521
1,0.000458,1.0,-0.003123,0.002412,0.000324,0.770557,0.000653,0.003091,-0.010458,-0.005948,-0.005362
2,0.006515,-0.003123,1.0,0.00653,0.004004,-0.010411,-0.660285,0.003925,0.00217,-0.001175,-0.009123
3,-0.010964,0.002412,0.00653,1.0,-0.007756,0.00741,-0.009359,0.000217,-0.011677,0.001771,0.003066
4,0.7272,0.000324,0.004004,-0.007756,1.0,0.00268,-0.00519,-0.00995,0.007122,-0.008672,0.005436
5,0.002136,0.770557,-0.010411,0.00741,0.00268,1.0,0.007769,0.007205,-0.013264,0.003033,-0.007777
6,-0.007219,0.000653,-0.660285,-0.009359,-0.00519,0.007769,1.0,0.00149,-0.001386,-0.005885,0.008342
7,-0.011122,0.003091,0.003925,0.000217,-0.00995,0.007205,0.00149,1.0,-0.002907,-0.001342,-0.003883
8,3e-05,-0.010458,0.00217,-0.011677,0.007122,-0.013264,-0.001386,-0.002907,1.0,0.000627,0.000421
9,0.007533,-0.005948,-0.001175,0.001771,-0.008672,0.003033,-0.005885,-0.001342,0.000627,1.0,0.007601


In [10]:
# Disabled: chi-square test of each entry of "features" against "label".
# NOTE(review): the reason for disabling is not recorded in this notebook.
# Several of the features are continuous numeric columns, which may be why
# the test was switched off - confirm before re-enabling or delete the cell.
#r = ChiSquareTest.test(labeledPointData, "features", "label").head()
#print("pValues: " + str(r.pValues))
#print("degreesOfFreedom: " + str(r.degreesOfFreedom))
#print("statistics: " + str(r.statistics))

In [11]:
# Shut down the Spark session created at the top of the notebook; cells that
# use `spark` or its DataFrames cannot be re-run until that cell is executed again.
spark.stop()