## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/Telco_customer_churn.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter).option('nanValue', ' ').option('nullValue', ' ').load(file_location)


In [0]:
# Removing extra columns 
df = df.drop('Churn Reason', 'CLTV', 'Count','Lat Long' ,'Country' 'State','City', 'Zip', 'Code', 'Lat', 'Long', 'Latitude', 'Longitude', 'Churn Value',
       'Churn Score', 'Zip Code')

In [0]:
df.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Senior Citizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Tenure Months: integer (nullable = true)
 |-- Phone Service: string (nullable = true)
 |-- Multiple Lines: string (nullable = true)
 |-- Internet Service: string (nullable = true)
 |-- Online Security: string (nullable = true)
 |-- Online Backup: string (nullable = true)
 |-- Device Protection: string (nullable = true)
 |-- Tech Support: string (nullable = true)
 |-- Streaming TV: string (nullable = true)
 |-- Streaming Movies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- Paperless Billing: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Monthly Charges: double (nullable = true)
 |-- Total Charges: double (nullable = true)
 |-- Churn Label: strin

In [0]:
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----------+-------+-----+------+--------------+-------+----------+-------------+-------------+--------------+----------------+---------------+-------------+-----------------+------------+------------+----------------+--------+-----------------+--------------+---------------+-------------+-----------+
|CustomerID|Country|State|Gender|Senior Citizen|Partner|Dependents|Tenure Months|Phone Service|Multiple Lines|Internet Service|Online Security|Online Backup|Device Protection|Tech Support|Streaming TV|Streaming Movies|Contract|Paperless Billing|Payment Method|Monthly Charges|Total Charges|Churn Label|
+----------+-------+-----+------+--------------+-------+----------+-------------+-------------+--------------+----------------+---------------+-------------+-----------------+------------+------------+----------------+--------+-----------------+--------------+---------------+-------------+-----------+
|         0|      0|    0|     0|             0|      0|         0|            0|          

In [0]:
## create or replace temp table
temp_table = "churn_analysis"
df.createOrReplaceTempView(temp_table)

In [0]:
%sql
select * from churn_analysis
limit 5

CustomerID,Country,State,Gender,Senior Citizen,Partner,Dependents,Tenure Months,Phone Service,Multiple Lines,Internet Service,Online Security,Online Backup,Device Protection,Tech Support,Streaming TV,Streaming Movies,Contract,Paperless Billing,Payment Method,Monthly Charges,Total Charges,Churn Label
3668-QPYBK,United States,California,Male,No,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
9237-HQITU,United States,California,Female,No,No,Yes,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes
9305-CDSKC,United States,California,Female,No,No,Yes,8,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,Yes
7892-POOKP,United States,California,Female,No,Yes,Yes,28,Yes,Yes,Fiber optic,No,No,Yes,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,104.8,3046.05,Yes
0280-XJGEX,United States,California,Male,No,No,Yes,49,Yes,Yes,Fiber optic,No,Yes,Yes,No,Yes,Yes,Month-to-month,Yes,Bank transfer (automatic),103.7,5036.3,Yes


In [0]:
df.groupBy('Churn Label').count().show() ## here we can say that data is not balanced properly

+-----------+-----+
|Churn Label|count|
+-----------+-----+
|         No| 5174|
|        Yes| 1869|
+-----------+-----+



In [0]:
df.select('Tenure Months', 'Total Charges', 'Monthly Charges').describe().show()

+-------+------------------+------------------+------------------+
|summary|     Tenure Months|     Total Charges|   Monthly Charges|
+-------+------------------+------------------+------------------+
|  count|              7043|              7032|              7043|
|   mean| 32.37114865824223|2283.3004408418697|  64.7616924605991|
| stddev|24.559481023094488| 2266.771361883143|30.090047097678507|
|    min|                 0|              18.8|             18.25|
|    max|                72|            8684.8|            118.75|
+-------+------------------+------------------+------------------+



In [0]:
%sql
select gender, `Churn Label`, count(*) from churn_analysis
group by `Churn Label`, gender

gender,Churn Label,count(1)
Female,No,2549
Male,No,2625
Male,Yes,930
Female,Yes,939


In [0]:
%sql
select  `Churn Label`,`Senior Citizen`,  count(*) from churn_analysis
group by  `senior citizen`, `churn label`

Churn Label,Senior Citizen,count(1)
Yes,Yes,476
No,No,4508
No,Yes,666
Yes,No,1393


In [0]:
%sql
select `Tenure Months`, `churn label`, count(*) from churn_analysis
group by `Tenure Months`, `churn label`
order by `Tenure Months`  
-- here we can say that newer customers usually churn

Tenure Months,churn label,count(1)
0,No,11
1,No,233
1,Yes,380
2,No,115
2,Yes,123
3,No,106
3,Yes,94
4,No,93
4,Yes,83
5,No,69


In [0]:
df.stat.crosstab("Senior Citizen", "Internet Service").show()

+-------------------------------+----+-----------+----+
|Senior Citizen_Internet Service| DSL|Fiber optic|  No|
+-------------------------------+----+-----------+----+
|                             No|2162|       2265|1474|
|                            Yes| 259|        831|  52|
+-------------------------------+----+-----------+----+



In [0]:
df.stat.freqItems(['Internet Service', 'Dependents','Phone Service', 'Multiple Lines', 
                   'Online Security','Online Backup', 'Device Protection', 'Paperless Billing', 
                   'Paperless Billing' ]).collect()

Out[74]: [Row(Internet Service_freqItems=['No', 'Fiber optic', 'DSL'], Dependents_freqItems=['No', 'Yes'], Phone Service_freqItems=['No', 'Yes'], Multiple Lines_freqItems=['No', 'Yes', 'No phone service'], Online Security_freqItems=['No', 'No internet service', 'Yes'], Online Backup_freqItems=['No', 'No internet service', 'Yes'], Device Protection_freqItems=['No', 'No internet service', 'Yes'], Paperless Billing_freqItems=['No', 'Yes'], Paperless Billing_freqItems=['No', 'Yes'])]

In [0]:
%sql
select * from churn_analysis
limit 2

CustomerID,Country,State,Gender,Senior Citizen,Partner,Dependents,Tenure Months,Phone Service,Multiple Lines,Internet Service,Online Security,Online Backup,Device Protection,Tech Support,Streaming TV,Streaming Movies,Contract,Paperless Billing,Payment Method,Monthly Charges,Total Charges,Churn Label
3668-QPYBK,United States,California,Male,No,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
9237-HQITU,United States,California,Female,No,No,Yes,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


In [0]:
%sql
select `Paperless Billing`, `Churn Label`, count(*) from churn_analysis
group by `Paperless Billing`, `Churn Label`   -- here we can analyze that paperless billing customers are mroe rone to churn

Paperless Billing,Churn Label,count(1)
Yes,Yes,1400
No,No,2403
Yes,No,2771
No,Yes,469


In [0]:
%sql 
select `Payment Method`, `Churn Label`, count(*) from churn_analysis
group by `Payment Method`, `Churn Label`

Payment Method,Churn Label,count(1)
Credit card (automatic),No,1290
Bank transfer (automatic),No,1286
Mailed check,Yes,308
Credit card (automatic),Yes,232
Electronic check,No,1294
Electronic check,Yes,1071
Bank transfer (automatic),Yes,258
Mailed check,No,1304


In [0]:
## Spliting the data into train test data sets
churn_df = df
(train_data, test_data) = churn_df.randomSplit([0.7, 0.3], 24)

print('No of records in train_data ', train_data.count())
print('No of records in test_data ', test_data.count())

No of records in train_data  4942
No of records in test_data  2101


In [0]:
# designing a pipline
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
catColumns = ['Gender', 'Senior Citizen', 'Partner', 'Dependents','Phone Service', 'Multiple Lines', 'Internet Service', 'Online Security','Online Backup',
 'Device Protection', 'Tech Support', 'Streaming TV', 'Streaming Movies', 'Contract', 'Paperless Billing', 'Payment Method',]


In [0]:
## Creating pipeline satges for categorical variable encoding
stages = []
for catCol in catColumns:
    stringIndexer = StringIndexer(inputCol=catCol, outputCol=catCol+ "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[catCol + "catVec"])
    stages += [stringIndexer, encoder]

In [0]:
# Creating the stages for missing value imputation for our data pipeline
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols= ['Total Charges'], outputCols=["Out_Total Charges"])
stages += [imputer]


In [0]:
## Output label Indexer for our data pipline
label_Idx = StringIndexer(inputCol="Churn Label", outputCol="label")
stages += [label_Idx]

In [0]:
# lets verify our transfromer
temp = label_Idx.fit(train_data).transform(train_data)
temp.show(1)

+----------+-------------+----------+------+--------------+-------+----------+-------------+-------------+--------------+----------------+---------------+-------------+-----------------+------------+------------+----------------+--------+-----------------+--------------+---------------+-------------+-----------+-----+
|CustomerID|      Country|     State|Gender|Senior Citizen|Partner|Dependents|Tenure Months|Phone Service|Multiple Lines|Internet Service|Online Security|Online Backup|Device Protection|Tech Support|Streaming TV|Streaming Movies|Contract|Paperless Billing|Payment Method|Monthly Charges|Total Charges|Churn Label|label|
+----------+-------------+----------+------+--------------+-------+----------+-------------+-------------+--------------+----------------+---------------+-------------+-----------------+------------+------------+----------------+--------+-----------------+--------------+---------------+-------------+-----------+-----+
|0002-ORFBO|United States|California|Fem

In [0]:
df.stat.corr('Total Charges', 'Monthly Charges')

Out[81]: 0.6511738315787847

In [0]:
%sql
select `Tenure Months`, `churn label`, count(*) as churned from churn_analysis
where `churn label` = 'Yes'
group by `Tenure Months`,  `churn label` order by `Tenure Months` limit 5
-- here we can notice that as tenure get higher # churner gets lower lets bin the data into the form of three

Tenure Months,churn label,churned
1,Yes,380
2,Yes,123
3,Yes,94
4,Yes,83
5,Yes,64


In [0]:
## converting tenure month column to the tenure bins
from pyspark.ml.feature import QuantileDiscretizer
tenure_bin = QuantileDiscretizer(numBuckets=10, inputCol='Tenure Months', outputCol= 'tenure_bin')
stages += [tenure_bin]

In [0]:
numericCols = ["tenure_bin","Out_Total Charges", "Monthly Charges" ]
assembleInputs = assembleInputs = [c + 'catVec' for c in catColumns] + numericCols
assembler = VectorAssembler(inputCols=assembleInputs, outputCol="features")
stages += [assembler]

In [0]:
assembleInputs

Out[84]: ['GendercatVec',
 'Senior CitizencatVec',
 'PartnercatVec',
 'DependentscatVec',
 'Phone ServicecatVec',
 'Multiple LinescatVec',
 'Internet ServicecatVec',
 'Online SecuritycatVec',
 'Online BackupcatVec',
 'Device ProtectioncatVec',
 'Tech SupportcatVec',
 'Streaming TVcatVec',
 'Streaming MoviescatVec',
 'ContractcatVec',
 'Paperless BillingcatVec',
 'Payment MethodcatVec',
 'tenure_bin',
 'Out_Total Charges',
 'Monthly Charges']

In [0]:
# creating a pipeline object and fit all the stages we have defined so far
pipeline = Pipeline().setStages(stages)
pipelineModel = pipeline.fit(train_data)

In [0]:
## preparing the training and test data through pipline
trainprepDF = pipelineModel.transform(train_data)
testprepDF = pipelineModel.transform(test_data)

In [0]:
trainprepDF.head(1)

Out[87]: [Row(CustomerID='0002-ORFBO', Country='United States', State='California', Gender='Female', Senior Citizen='No', Partner='Yes', Dependents='No', Tenure Months=9, Phone Service='Yes', Multiple Lines='No', Internet Service='DSL', Online Security='No', Online Backup='Yes', Device Protection='No', Tech Support='Yes', Streaming TV='Yes', Streaming Movies='No', Contract='One year', Paperless Billing='Yes', Payment Method='Mailed check', Monthly Charges=65.6, Total Charges=593.3, Churn Label='No', GenderIndex=1.0, GendercatVec=SparseVector(1, {}), Senior CitizenIndex=0.0, Senior CitizencatVec=SparseVector(1, {0: 1.0}), PartnerIndex=1.0, PartnercatVec=SparseVector(1, {}), DependentsIndex=0.0, DependentscatVec=SparseVector(1, {0: 1.0}), Phone ServiceIndex=0.0, Phone ServicecatVec=SparseVector(1, {0: 1.0}), Multiple LinesIndex=0.0, Multiple LinescatVec=SparseVector(2, {0: 1.0}), Internet ServiceIndex=1.0, Internet ServicecatVec=SparseVector(2, {1: 1.0}), Online SecurityIndex=0.0, Online

In [0]:
trainprepDF.select('tenure_bin').show(5)

+----------+
|tenure_bin|
+----------+
|       2.0|
|       2.0|
|       1.0|
|       3.0|
|       1.0|
+----------+
only showing top 5 rows



In [0]:
## Training the Logistic Regression model
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='label', featuresCol="features", maxIter=10)

# Train model with training data
lrModel = lr.fit(trainprepDF)

In [0]:
print('Coefficients' + str(lrModel.coefficients))
print('Intercept' + str(lrModel.intercept))

Coefficients[0.03564893846895171,-0.11946894826073622,-0.4030959936340864,1.63530504823171,-0.44974209031873336,-0.23175779092261348,0.07281621858123571,0.5141871351893668,-0.3026786611817885,0.2782366865117555,-0.054437225592115515,0.17178191887124916,0.0712053259609343,0.16075395517954785,0.08326581061346644,0.2714511776086176,-0.045373842548197915,-0.027966184463881487,0.275259011265917,0.003613356750256293,0.24144231927573376,0.7284676271797053,-0.5698608911378193,0.31912226729642484,0.41198221563794735,0.040633451701039075,0.08221931216400995,-0.29716338375587864,-5.27622879835916e-05,0.005619420364160385]
Intercept-2.498375484170447


In [0]:
summary = lrModel.summary

In [0]:
print("accuracy: ",summary.accuracy)
print("falsePositiveRate: ",summary.weightedFalsePositiveRate)
print("truePositiveRate: ",summary.weightedTruePositiveRate)
print("fMeasure: ", summary.weightedFMeasure())
print("precision: ",summary.weightedPrecision)
print("recall: ",summary.weightedRecall)
print('Area Under ROc', summary.areaUnderROC)

accuracy:  0.8174828004856334
falsePositiveRate:  0.33611980863877056
truePositiveRate:  0.8174828004856334
fMeasure:  0.8124473224831792
precision:  0.8105006339265874
recall:  0.8174828004856334
Area Under ROc 0.8681320791343837


In [0]:
display(lrModel, trainprepDF, "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.8698701428475674
0.0,0.0294117647058823,0.8698701428475674
0.0,0.0588235294117647,0.851632966081016
0.0,0.088235294117647,0.8390014800166914
0.0,0.1176470588235294,0.8310832004392166
0.0,0.1470588235294117,0.8135744889967783
0.0,0.1764705882352941,0.7917850320222334
0.0,0.2058823529411764,0.7914682049167419
0.0133333333333333,0.2058823529411764,0.7647569384065515
0.0133333333333333,0.2352941176470588,0.7508535837226671
