see also https://www.youtube.com/watch?v=hbTJvjfX1fA

In [1]:
import findspark
findspark.init()

In [25]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

In [167]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, QuantileDiscretizer, VectorAssembler

In [83]:
from itertools import chain

In [3]:
#from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("Telcom").setMaster("local[4]")
sc = SparkContext(conf=conf)

In [4]:
sc

In [20]:

sqlContext = SQLContext(sc)

#df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('./data/WA_Fn-UseC_-Telco-Customer-Churn.csv')
df = sqlContext.read.format('csv').options(header='true', inferschema='true').load('./data/WA_Fn-UseC_-Telco-Customer-Churn.csv')

In [21]:
df.show(5)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

In [22]:
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [48]:
df.select([F.count(F.when(F.col(c).isNull() | F.isnan(c), c)).alias(c) for c in df.columns]).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

# Process with PySpark

### PySpark -- Data Inspection

In [69]:
df.groupby('Churn').count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|   No| 5174|
|  Yes| 1869|
+-----+-----+



* imbalanced groups

In [70]:
df.select('tenure','TotalCharges','MonthlyCharges').describe().show()

+-------+------------------+------------------+------------------+
|summary|            tenure|      TotalCharges|    MonthlyCharges|
+-------+------------------+------------------+------------------+
|  count|              7043|              7043|              7043|
|   mean| 32.37114865824223|2283.3004408418697| 64.76169246059922|
| stddev|24.559481023094442| 2266.771361883145|30.090047097678482|
|    min|                 0|                  |             18.25|
|    max|                72|             999.9|            118.75|
+-------+------------------+------------------+------------------+



In [80]:
df.groupby('churn').pivot('gender').count().show()

+-----+------+----+
|churn|Female|Male|
+-----+------+----+
|   No|  2549|2625|
|  Yes|   939| 930|
+-----+------+----+



In [81]:
df.groupby('SeniorCitizen').pivot('churn').count().show()

+-------------+----+----+
|SeniorCitizen|  No| Yes|
+-------------+----+----+
|            1| 666| 476|
|            0|4508|1393|
+-------------+----+----+



In [91]:
def make_normalized_pivot(df, group_col, pivot_col, verbose=1):
    
    # https://stackoverflow.com/questions/40805808/percentage-count-per-group-and-pivot-with-pyspark
    levels = [x for x in chain(*df.select("churn").distinct().collect())]
    if verbose: print(f'levels of "{pivot_col}": {levels}')
    
    pivoted = df.groupby(group_col).pivot(pivot_col, levels).count()
    row_count = sum(F.coalesce(F.col(x), F.lit(0)) for x in levels)
    #row_count = sum(coalesce(col(x), lit(0)) for x in levels)
    adjusted = [(F.col(x) / row_count).alias(x) for x in levels]
    return pivoted.select(F.col(group_col), *adjusted)
                    
make_normalized_pivot(df=df, group_col='SeniorCitizen', pivot_col='churn').show()

levels of "churn": ['No', 'Yes']
+-------------+------------------+-------------------+
|SeniorCitizen|                No|                Yes|
+-------------+------------------+-------------------+
|            1|0.5831873905429071| 0.4168126094570928|
|            0| 0.763938315539739|0.23606168446026096|
+-------------+------------------+-------------------+



In [92]:
df.stat.crosstab("SeniorCitizen","InternetService").show()

+-----------------------------+----+-----------+----+
|SeniorCitizen_InternetService| DSL|Fiber optic|  No|
+-----------------------------+----+-----------+----+
|                            1| 259|        831|  52|
|                            0|2162|       2265|1474|
+-----------------------------+----+-----------+----+



In [96]:
df.select('SeniorCitizen','InternetService').sample(False,.002).show()

+-------------+---------------+
|SeniorCitizen|InternetService|
+-------------+---------------+
|            0|             No|
|            1|            DSL|
|            1|    Fiber optic|
|            0|            DSL|
|            0|            DSL|
|            0|             No|
|            0|            DSL|
|            0|            DSL|
|            0|             No|
|            0|    Fiber optic|
|            1|    Fiber optic|
|            0|             No|
|            1|    Fiber optic|
|            1|    Fiber optic|
|            0|    Fiber optic|
|            1|    Fiber optic|
|            0|             No|
+-------------+---------------+



### PySpark -- Mach Learn

In [126]:
# Will drop customerID -- but first save it to a vector or later reference
customerID_vec = df.select('customerID')
customerID_vec.show(2)

# DROP:
df = df.drop('customerID')

AnalysisException: cannot resolve '`customerID`' given input columns: [Churn, Contract, Dependents, DeviceProtection, InternetService, MonthlyCharges, MultipleLines, OnlineBackup, OnlineSecurity, PaperlessBilling, Partner, PaymentMethod, PhoneService, SeniorCitizen, StreamingMovies, StreamingTV, TechSupport, TotalCharges, gender, tenure];
'Project ['customerID]
+- Project [gender#472, SeniorCitizen#473, Partner#474, Dependents#475, tenure#476, PhoneService#477, MultipleLines#478, InternetService#479, OnlineSecurity#480, OnlineBackup#481, DeviceProtection#482, TechSupport#483, StreamingTV#484, StreamingMovies#485, Contract#486, PaperlessBilling#487, PaymentMethod#488, MonthlyCharges#489, TotalCharges#490, Churn#491]
   +- Relation[customerID#471,gender#472,SeniorCitizen#473,Partner#474,Dependents#475,tenure#476,PhoneService#477,MultipleLines#478,InternetService#479,OnlineSecurity#480,OnlineBackup#481,DeviceProtection#482,TechSupport#483,StreamingTV#484,StreamingMovies#485,Contract#486,PaperlessBilling#487,PaymentMethod#488,MonthlyCharges#489,TotalCharges#490,Churn#491] csv


In [104]:
train_data, test_data = df.randomSplit([.7,.3], seed=0)
print(train_data.count(), test_data.count())

4900 2143


#### feature engineering

In [143]:
[c + '_Indexed' for c in cat_cols]

['gender_Indexed',
 'Partner_Indexed',
 'Dependents_Indexed',
 'PhoneService_Indexed',
 'MultipleLines_Indexed',
 'InternetService_Indexed',
 'OnlineSecurity_Indexed',
 'OnlineBackup_Indexed',
 'DeviceProtection_Indexed',
 'TechSupport_Indexed',
 'StreamingTV_Indexed',
 'StreamingMovies_Indexed',
 'Contract_Indexed',
 'PaperlessBilling_Indexed',
 'PaymentMethod_Indexed',
 'TotalCharges_Indexed',
 'Churn_Indexed',
 'SeniorCitizen_Indexed']

In [147]:
label_cols = ['Churn']

In [158]:
num_cols = [name for name, dtype in df.dtypes if dtype in ('int','float','double')]
num_cols

['SeniorCitizen', 'tenure', 'MonthlyCharges']

In [148]:
str_cols = [name for name, dtype in df.dtypes if dtype=='string' and name not in label_cols]
str_cols

['gender',
 'Partner',
 'Dependents',
 'PhoneService',
 'MultipleLines',
 'InternetService',
 'OnlineSecurity',
 'OnlineBackup',
 'DeviceProtection',
 'TechSupport',
 'StreamingTV',
 'StreamingMovies',
 'Contract',
 'PaperlessBilling',
 'PaymentMethod',
 'TotalCharges']

In [149]:
bin_cols = ['SeniorCitizen']

In [150]:
cat_cols = str_cols + bin_cols
cat_cols_inds = [c + '_Indexed' for c in cat_cols]
cat_cols_ohes = [c + '_OHE' for c in cat_cols]

In [152]:
SI = StringIndexer()
SI.setInputCols(cat_cols)
SI.setOutputCols(cat_cols_inds)

OHE = OneHotEncoder()
OHE.setInputCols(cat_cols_inds)
OHE.setOutputCols(cat_cols_ohes)
OHE

LE = StringIndexer(inputCols=label_cols, outputCols=[c + '_Indexed' for c in label_cols])

In [156]:
QD = QuantileDiscretizer(numBuckets=3, inputCols=['tenure'], outputCols=['tenure_bined'])
QD

QuantileDiscretizer_8ba6eb4b712c

#### transform features

In [159]:
VA = VectorAssembler(inputCols=cat_cols_ohes + num_cols, outputCol = 'features')

In [160]:
stages = [SI, OHE, LE, QD, VA]

In [176]:
# fit() and transform() are applied on different objects ....
feature_pipeline = Pipeline().setStages(stages)
feature_pipelineModel = feature_pipeline.fit(train_data)

train_data_transformed = feature_pipelineModel.transform(train_data)
test_data_transformed = feature_pipelineModel.transform(test_data)

In [None]:
feature_pipeline.

In [177]:
train_data_transformed.head(1)

[Row(customerID='0003-MKNFE', gender='Male', SeniorCitizen=0, Partner='No', Dependents='No', tenure=9, PhoneService='Yes', MultipleLines='Yes', InternetService='DSL', OnlineSecurity='No', OnlineBackup='No', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='Yes', Contract='Month-to-month', PaperlessBilling='No', PaymentMethod='Mailed check', MonthlyCharges=59.9, TotalCharges='542.4', Churn='No', Contract_Indexed=0.0, DeviceProtection_Indexed=0.0, OnlineBackup_Indexed=0.0, OnlineSecurity_Indexed=0.0, SeniorCitizen_Indexed=0.0, MultipleLines_Indexed=1.0, Partner_Indexed=0.0, gender_Indexed=0.0, PhoneService_Indexed=0.0, PaperlessBilling_Indexed=1.0, PaymentMethod_Indexed=1.0, Dependents_Indexed=0.0, InternetService_Indexed=1.0, TechSupport_Indexed=0.0, StreamingMovies_Indexed=1.0, StreamingTV_Indexed=0.0, TotalCharges_Indexed=3221.0, PhoneService_OHE=SparseVector(1, {0: 1.0}), OnlineSecurity_OHE=SparseVector(2, {0: 1.0}), Contract_OHE=SparseVector(2, {0: 1.0}), S

# Process as standard pandas df

In [62]:
pd_df = df.sample(False, .05, seed=1).toPandas()

In [63]:
pd_df.__len__()

347

# Process as SQL

In [66]:
tmp_table_name = "churn_analysis"
df.createOrReplaceTempView(tmp_table_name)

In [67]:
type(df)

pyspark.sql.dataframe.DataFrame

In [68]:
%sql
select * from churn_analysis

SyntaxError: invalid syntax (<ipython-input-68-a45a80323020>, line 2)