# PySpark Data Transformation:

We will be using ML package to build models in PySpark. ML Package needs data be put in a (**label: Double, features: Vector**) DataFrame format with correspondingly named fields. We will be using:
- *StringIndexer*, 
- *OneHotEncoderEstimator* and 
- *VectorAssembler* in PySpark ML Package to do so.

----

**Import dependencies**

In [1]:
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

In [2]:
import numpy as np
import os

In [3]:
print(os.listdir("./"))

['.DS_Store', 'pyspark_data_transformation.ipynb', 'data_preprocessing.ipynb', '.ipynb_checkpoints']


**Load Data**

In [4]:
teleco_data = spark.read.csv('../data/Telco-Customer-Churn.csv', header = True, inferSchema = True)

In [5]:
# check dataframe schema 
teleco_data.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



**Replace/ Drop Missing Values**

In [6]:
# Replacing spaces with null values in total charges column
dfWithEmptyReplaced = teleco_data.withColumn('TotalCharges', when(col('TotalCharges') == ' ', None).otherwise(col('TotalCharges')).cast("float"))
dfWithEmptyReplaced = dfWithEmptyReplaced.na.drop()

In [7]:
# Replacing 'No internet service' to No for the following columns
replace_cols = [ 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection','TechSupport','StreamingTV', 'StreamingMovies']

In [8]:
# replace values
for col_name in replace_cols:
    dfwithNo = dfWithEmptyReplaced.withColumn(col_name, when(col(col_name)== "No internet service","No").otherwise(col(col_name)))

In [9]:
dfwithNo.createOrReplaceTempView("datawrangling")

In [10]:
# Using Spark SQL to create categories 
df_wrangling = spark.sql("""
select distinct 
         customerID
        ,gender
        ,SeniorCitizen
        ,Partner
        ,Dependents
        ,tenure
        ,case when (tenure<=12) then "Tenure_0-12"
              when (tenure>12 and tenure <=24) then "Tenure_12-24"
              when (tenure>24 and tenure <=48) then "Tenure_24-48"
              when (tenure>48 and tenure <=60) then "Tenure_48-60"
              when (tenure>60) then "Tenure_gt_60"
        end as tenure_group
        ,PhoneService
        ,MultipleLines
        ,InternetService
        ,OnlineSecurity
        ,OnlineBackup
        ,DeviceProtection
        ,TechSupport
        ,StreamingTV
        ,StreamingMovies
        ,Contract
        ,PaperlessBilling
        ,PaymentMethod
        ,MonthlyCharges
        ,TotalCharges
        ,Churn
    from datawrangling
""")


In [11]:
df_wrangling

DataFrame[customerID: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, tenure_group: string, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, DeviceProtection: string, TechSupport: string, StreamingTV: string, StreamingMovies: string, Contract: string, PaperlessBilling: string, PaymentMethod: string, MonthlyCharges: double, TotalCharges: float, Churn: string]

**Variables Correlation**

In [12]:
# convert to pandas dataframe to see correlations
df_wrangling_pandas = df_wrangling.toPandas()

In [13]:
# correlation
correlation = df_wrangling_pandas.corr()
correlation

Unnamed: 0,SeniorCitizen,tenure,MonthlyCharges,TotalCharges
SeniorCitizen,1.0,0.015683,0.219874,0.102411
tenure,0.015683,1.0,0.246862,0.82588
MonthlyCharges,0.219874,0.246862,1.0,0.651065
TotalCharges,0.102411,0.82588,0.651065,1.0


### Data Preparation:
[ML Package](https://spark.apache.org/docs/latest/ml-guide.html) in Spark wants data in this format: **label: Double, features: Vector**. We will use *StringIndexer*, *OneHotEncoderEstimator* and *VectorAssembler* in PySpark ML Package to do so.

[Reference](https://spark.apache.org/docs/latest/ml-features.html):
- *StringIndexer:* encodes a string column of labels to a column of label indices.
- *OneHotEncoderEstimator:* maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values.
- *VectorAssembler:* is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, 

In [14]:
# select on categorical Columns from dataset
categoricalColumns = ['gender','SeniorCitizen','Partner','Dependents','PhoneService','MultipleLines','InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract','PaperlessBilling','PaymentMethod']
stages = [] # stages in our Pipeline

In [15]:
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.
    stages += [stringIndexer, encoder]

In [16]:
# Convert label into label indices using the StringIndexer and store it in 'label' column
label_stringIdx = StringIndexer(inputCol="Churn", outputCol="label")
stages += [label_stringIdx]

**Transforming all features into a vector using VectorAssembler**

In [17]:
# Transform all features into a vector using VectorAssembler
numericCols = ['MonthlyCharges', 'TotalCharges']
# add numerical + categorical columns into assembler inputs
assemblerInputs = numericCols + [c + "classVec" for c in categoricalColumns]
# assemble all features into a single vector and store it in 'feature' column
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
IDcols = ['customerID']

**Create a pipeline to transform dataset**

In [18]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)

# Run the feature transformations.
#  fit() computes feature statistics as needed.
pipelineModel = pipeline.fit(df_wrangling)
#  transform() actually transforms the features.
dataset = pipelineModel.transform(df_wrangling)

# Keep relevant columns
selectedcols= ["label", "features"] + IDcols
transformed_dataset = dataset.select(selectedcols)

In [19]:
# peek into transformed dataset
transformed_dataset.show()

+-----+--------------------+----------+
|label|            features|customerID|
+-----+--------------------+----------+
|  0.0|(28,[0,1,3,6,7,10...|6497-TILVL|
|  1.0|(28,[0,1,3,5,6,7,...|0691-JVSYA|
|  0.0|(28,[0,1,3,4,5,6,...|8544-GOQSH|
|  0.0|(28,[0,1,2,3,4,5,...|5172-MIGPM|
|  0.0|(28,[0,1,2,3,5,6,...|4312-KFRXN|
|  1.0|[69.9,69.90000152...|5875-YPQFJ|
|  0.0|(28,[0,1,2,3,4,5,...|3916-NRPAP|
|  0.0|(28,[0,1,3,4,5,10...|4598-ZADCK|
|  0.0|(28,[0,1,2,3,4,5,...|4137-BTIKL|
|  0.0|(28,[0,1,3,6,8,10...|1032-MAELW|
|  0.0|(28,[0,1,2,3,4,5,...|5906-BFOZT|
|  0.0|(28,[0,1,2,3,4,5,...|1335-NTIUC|
|  0.0|(28,[0,1,3,4,5,6,...|8885-QSQBX|
|  0.0|(28,[0,1,2,3,4,5,...|7504-UWHNB|
|  1.0|(28,[0,1,2,3,5,6,...|8443-WVPSS|
|  1.0|(28,[0,1,2,5,10,1...|2778-OCLGR|
|  1.0|(28,[0,1,2,3,4,5,...|8627-EHGIP|
|  0.0|(28,[0,1,3,6,7,21...|4182-BGSIQ|
|  0.0|(28,[0,1,2,3,5,6,...|7929-SKFGK|
|  0.0|(28,[0,1,2,3,4,5,...|5063-IUOKK|
+-----+--------------------+----------+
only showing top 20 rows



In [20]:
# printSchema for transformed data
transformed_dataset.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- customerID: string (nullable = true)



In [21]:
# convert to pandas dataframe
dataset = transformed_dataset.toPandas()

In [22]:
# save transformed dataset
dataset.to_csv('../data/transformed_data.csv',index=False)