In [6]:
from google.colab import drive
drive.mount('/content/d')

Mounted at /content/d


In [1]:
# build & inititate the environment
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget http://mirror.klaus-uwe.me/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

!pip install sparkdl
!pip install tensorframes
!pip install kafka-python
!pip install tensorflowonspark

import os
import findspark
import itertools

import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.utils import resample
from sklearn.ensemble import RandomForestClassifier

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = 'spark-2.4.7-bin-hadoop2.7'

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoderEstimator, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, when, lit

spark = SparkSession.builder.master('local[*]').getOrCreate()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Waiting for headers] [2 InRelease 0 B/3,626 B 0%] [Wa0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
                                                                               Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [3 InRelease 14.2

In [7]:
# downlaod kaggle api & data 
!pip install -q kaggle
!mkdir -p ~/.kaggle
!cp 'd/MyDrive/kaggle/kaggle.json' ~/.kaggle/
!kaggle datasets download -d becksddf/churn-in-telecoms-dataset
!unzip churn-in-telecoms-dataset.zip
!ls

Downloading churn-in-telecoms-dataset.zip to /content
  0% 0.00/116k [00:00<?, ?B/s]
100% 116k/116k [00:00<00:00, 41.1MB/s]
Archive:  churn-in-telecoms-dataset.zip
  inflating: bigml_59c28831336c6604c800002a.csv  
bigml_59c28831336c6604c800002a.csv  d		 spark-2.4.7-bin-hadoop2.7
churn-in-telecoms-dataset.zip	    sample_data  spark-2.4.7-bin-hadoop2.7.tgz


In [8]:
# read data to spark dataframe
df = spark.read.csv(
    'bigml_59c28831336c6604c800002a.csv',
        inferSchema=True,
    header=True
)
df.show()

+-----+--------------+---------+------------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|state|account length|area code|phone number|international plan|voice mail plan|number vmail messages|total day minutes|total day calls|total day charge|total eve minutes|total eve calls|total eve charge|total night minutes|total night calls|total night charge|total intl minutes|total intl calls|total intl charge|customer service calls|churn|
+-----+--------------+---------+------------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-------------

In [9]:
# drop unused columns
df = df.drop('phone number')
df.show()

+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|state|account length|area code|international plan|voice mail plan|number vmail messages|total day minutes|total day calls|total day charge|total eve minutes|total eve calls|total eve charge|total night minutes|total night calls|total night charge|total intl minutes|total intl calls|total intl charge|customer service calls|churn|
+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|   

In [10]:
# encoding binary features
df = df.withColumn(
    'international plan', when(df['international plan'] == 'yes', lit(1)).otherwise(lit(0))
)
df = df.withColumn(
    'voice mail plan', when(df['voice mail plan'] == 'yes', lit(1)).otherwise(lit(0))
)
df = df.withColumn(
    'churn', when(df['churn'] == 'true', lit(1)).otherwise(lit(0))
)
df.show()

+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|state|account length|area code|international plan|voice mail plan|number vmail messages|total day minutes|total day calls|total day charge|total eve minutes|total eve calls|total eve charge|total night minutes|total night calls|total night charge|total intl minutes|total intl calls|total intl charge|customer service calls|churn|
+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+
|   

In [11]:
# encoding categorical features
print(df.groupBy('state').count().count())

indexers = [StringIndexer(inputCol=column, outputCol=column+"_NUMERIC").fit(df) for column in ['state']]

pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)
df = df.drop('state')
df.show()

51
+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+-------------+
|account length|area code|international plan|voice mail plan|number vmail messages|total day minutes|total day calls|total day charge|total eve minutes|total eve calls|total eve charge|total night minutes|total night calls|total night charge|total intl minutes|total intl calls|total intl charge|customer service calls|churn|state_NUMERIC|
+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+---------------------

In [12]:
# scaling features
columnsToScale = [clm for clm in df.columns if clm != 'churn']
# first convert to vectors
assemblers = [VectorAssembler(
    inputCols=[col], outputCol=col + '_vec'
    ) for col in columnsToScale
]
# second scale
scalers = [MinMaxScaler(
    inputCol=col + '_vec', outputCol = col + '_scaled'
    ) for col in columnsToScale
]
# perform through pipeline
pipeline = Pipeline(stages = assemblers + scalers)
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
## drop initital columns and vectors
scaledData = scaledData.select(
    [clm for clm in scaledData.columns if clm not in columnsToScale]
)
scaledData = scaledData.select(
    [clm for clm in scaledData.columns if clm not in [x + '_vec' for x in columnsToScale]]
)

scaledData.show()

+-----+---------------------+--------------------+-------------------------+----------------------+----------------------------+------------------------+----------------------+-----------------------+------------------------+----------------------+-----------------------+--------------------------+------------------------+-------------------------+-------------------------+-----------------------+------------------------+-----------------------------+--------------------+
|churn|account length_scaled|    area code_scaled|international plan_scaled|voice mail plan_scaled|number vmail messages_scaled|total day minutes_scaled|total day calls_scaled|total day charge_scaled|total eve minutes_scaled|total eve calls_scaled|total eve charge_scaled|total night minutes_scaled|total night calls_scaled|total night charge_scaled|total intl minutes_scaled|total intl calls_scaled|total intl charge_scaled|customer service calls_scaled|state_NUMERIC_scaled|
+-----+---------------------+-----------------

In [13]:
# join all variables in a single column
assembler = VectorAssembler(
    inputCols = [c for c in scaledData.columns if c != 'churn'],
    outputCol = 'attributes'
)
output = assembler.transform(scaledData)

# collecting final dataframe for the model fit
finalData = output.select('attributes', 'churn')
finalData.show()

+--------------------+-----+
|          attributes|churn|
+--------------------+-----+
|[0.52479338842975...|    0|
|[0.43801652892561...|    0|
|[0.56198347107438...|    0|
|[0.34297520661157...|    0|
|[0.30578512396694...|    0|
|[0.48347107438016...|    0|
|[0.49586776859504...|    0|
|[0.60330578512396...|    0|
|[0.47933884297520...|    0|
|[0.57851239669421...|    0|
|[0.26446280991735...|    1|
|[0.30165289256198...|    0|
|[0.69008264462809...|    0|
|[0.38842975206611...|    0|
|[0.25206611570247...|    0|
|[0.66115702479338...|    1|
|[0.34710743801652...|    0|
|[0.38016528925619...|    0|
|[0.30991735537190...|    0|
|[0.29752066115702...|    0|
+--------------------+-----+
only showing top 20 rows



In [14]:
trainData, testData = finalData.randomSplit([.8, .2]) # train test split
trainData.show()

+--------------------+-----+
|          attributes|churn|
+--------------------+-----+
|(19,[0,5,6,7,8,9,...|    0|
|[0.0,0.0,0.0,0.0,...|    0|
|[0.0,0.0,0.0,1.0,...|    0|
|[0.0,0.0686274509...|    0|
|[0.0,0.0686274509...|    1|
|[0.0,0.0686274509...|    0|
|[0.0,0.0686274509...|    0|
|[0.0,0.0686274509...|    0|
|[0.00826446280991...|    0|
|[0.00826446280991...|    0|
|[0.00826446280991...|    0|
|[0.00826446280991...|    0|
|[0.00826446280991...|    0|
|[0.01239669421487...|    0|
|[0.01652892561983...|    0|
|[0.02066115702479...|    0|
|[0.02479338842975...|    0|
|[0.02479338842975...|    0|
|[0.02892561983471...|    0|
|[0.03305785123966...|    0|
+--------------------+-----+
only showing top 20 rows



In [15]:
# random forest classifier
rf = RandomForestClassifier(featuresCol='attributes', labelCol ='churn', numTrees=10)
modelRf = rf.fit(trainData)
predRf = modelRf.transform(testData)

accuracyRf = predRf.withColumn(
    'accuracyRf', (predRf['churn'] - predRf['prediction'])
).groupBy().sum().collect()[0][0] / predRf.count()

print(accuracyRf)

0.14367816091954022


In [16]:
# gradient boosted tree classifier
gbt = GBTClassifier(featuresCol='attributes', labelCol ='churn', maxIter=10)
modelGbt = gbt.fit(trainData)
predGbt = modelGbt.transform(testData) 

accuracyGbt = predGbt.withColumn(
    'accuracyGbt', (predGbt['churn'] - predGbt['prediction'])
).groupBy().sum().collect()[0][0] / predGbt.count()

print(accuracyGbt)

0.14367816091954022


In [17]:
# reindexing columns for the MPC model
trainData = trainData.selectExpr('attributes as features', 'churn as label')
testData = testData.selectExpr('attributes as features', 'churn as label')
print(trainData.columns, testData.columns)

# multilayer perceptron classifier
mpc = MultilayerPerceptronClassifier(
    maxIter=100,
    layers=[19, 5, 4, 2],
    blockSize=128,
    seed=42
)
modelMpc = mpc.fit(trainData)
predMpc = modelMpc.transform(testData)

accuracyMpc = predMpc.withColumn(
    'accuracyGbt', (predMpc['label'] - predMpc['prediction'])
).groupBy().sum().collect()[0][0] / predMpc.count()

print(accuracyMpc)

['features', 'label'] ['features', 'label']
0.14367816091954022
