In [2]:
# Optional environment setup (Java, Spark 3.0.0, findspark). Everything below sits inside a
# triple-quoted string literal, so none of these commands is executed when the cell runs.
'''# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark'''

In [1]:
# Initialise findspark first: the pyspark import in the following cell is meant to run
# only after findspark.init() has been called.
import findspark
findspark.init()

In [2]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession



# Build (or reuse) a local Spark session on all available cores, with the
# spark.driver.memory setting at "7g".
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "7g")
    .getOrCreate()
)

# Read the comma-separated file; the first row is the header and column types are inferred.
csv_reader = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .option("delimiter", ",")
)
df = csv_reader.csv("Complete_Data_BDT.csv")

# Print the schema Spark inferred for the loaded data.
df.printSchema()

root
 |-- MONTANT: double (nullable = true)
 |-- FREQUENCE_RECH: double (nullable = true)
 |-- ARPU_SEGMENT: double (nullable = true)
 |-- FREQUENCE: double (nullable = true)
 |-- DATA_VOLUME: double (nullable = true)
 |-- ON_NET: double (nullable = true)
 |-- ORANGE: double (nullable = true)
 |-- TIGO: double (nullable = true)
 |-- REGULARITY: double (nullable = true)
 |-- FREQ_TOP_PACK: double (nullable = true)
 |-- CHURN: integer (nullable = true)
 |-- REGION_DIOURBEL: integer (nullable = true)
 |-- REGION_FATICK: integer (nullable = true)
 |-- REGION_KAFFRINE: integer (nullable = true)
 |-- REGION_KAOLACK: integer (nullable = true)
 |-- REGION_KEDOUGOU: integer (nullable = true)
 |-- REGION_KOLDA: integer (nullable = true)
 |-- REGION_LOUGA: integer (nullable = true)
 |-- REGION_MATAM: integer (nullable = true)
 |-- REGION_Other: integer (nullable = true)
 |-- REGION_SAINT-LOUIS: integer (nullable = true)
 |-- REGION_SEDHIOU: integer (nullable = true)
 |-- REGION_TAMBACOUNDA: integ

In [3]:
import pandas as pd

# Collect the Spark DataFrame into pandas; this is done only so SMOTE can be applied later.
df = df.toPandas()

# Separate the CHURN target from the remaining feature columns.
y = df["CHURN"]
x = df.drop(columns="CHURN")

In [4]:
# Hold out 20% of the rows as the test set; random_state is fixed so the split is repeatable.
from sklearn.model_selection import train_test_split

x_train, x_test, y_train, y_test = train_test_split(
    x,
    y,
    test_size=0.2,
    random_state=1,
)

Upsampling is applied only to the training set; the test set must remain untouched.

In [5]:
from imblearn.over_sampling import SMOTE
# Oversample the training split with SMOTE so both CHURN classes are equally represented.
# The sampler is named `smote` rather than `os` so it cannot shadow the standard-library
# `os` module that the optional setup cell imports.
smote = SMOTE(random_state=0)
os_data_x, os_data_y = smote.fit_resample(x_train, y_train)

# Re-wrap the resampled data as pandas frames; the label frame has the single column CHURN.
x_train = pd.DataFrame(data=os_data_x)
y_train = pd.DataFrame(data=os_data_y, columns=['CHURN'])

# Report the size and class balance of the oversampled training data.
# NOTE(review): the messages say "subscription" although the rows counted are CHURN == 0
# and CHURN == 1 — confirm the intended wording before changing the strings.
print("length of oversampled data is ",len(x_train))
print("Number of no subscription in oversampled data",len(y_train[y_train['CHURN']==0]))
print("Number of subscription",len(y_train[y_train['CHURN']==1]))
print("Proportion of no subscription data in oversampled data is ",len(y_train[y_train['CHURN']==0])/len(x_train))
print("Proportion of subscription data in oversampled data is ",len(y_train[y_train['CHURN']==1])/len(x_train))



length of oversampled data is  837618
Number of no subscription in oversampled data 418809
Number of subscription 418809
Proportion of no subscription data in oversampled data is  0.5
Proportion of subscription data in oversampled data is  0.5


In [6]:
# Show how many rows are in the held-out test features (x_test).
print("length of test data is ",len(x_test))

length of test data is  107246


Now the pandas DataFrames are converted back to Spark DataFrames for further modelling.

In [7]:
# Put the training features and the CHURN label side by side, keeping only rows whose
# index appears in both frames, then summarise the result.
combined_train_dataset = pd.concat(
    objs=[x_train, y_train],
    axis="columns",
    join="inner",
)
combined_train_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 837618 entries, 0 to 837617
Columns: 138 entries, MONTANT to CHURN
dtypes: float64(10), int32(128)
memory usage: 472.9 MB


In [8]:
# Put the test features and the CHURN label side by side, keeping only rows whose
# index appears in both, then summarise the result.
combined_test_dataset = pd.concat(
    objs=[x_test, y_test],
    axis="columns",
    join="inner",
)
combined_test_dataset.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 107246 entries, 236859 to 110351
Columns: 138 entries, MONTANT to CHURN
dtypes: float64(10), int32(128)
memory usage: 61.4 MB


In [9]:
# Enable Arrow-based columnar data transfers for the pandas -> Spark conversion.
# Spark 3.x names this flag "spark.sql.execution.arrow.pyspark.enabled"; the older
# "spark.sql.execution.arrow.enabled" key is deprecated.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Convert the combined pandas frames (features + CHURN) into Spark DataFrames.
train_dataset = spark.createDataFrame(combined_train_dataset)
test_dataset = spark.createDataFrame(combined_test_dataset)

  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Now creating the feature vector using VectorAssembler

In [10]:
# VectorAssembler raised an error because one column header contains a '.'.
# Normalise every header first: trim surrounding whitespace and replace each '.' with 'point'.
clean_columns = [name.strip().replace('.', 'point') for name in train_dataset.columns]

# Apply the cleaned names (derived from the training frame) to both splits.
train_dataset = train_dataset.toDF(*clean_columns)
test_dataset = test_dataset.toDF(*clean_columns)

In [11]:
from pyspark.ml.feature import VectorAssembler
# Every column except the CHURN label goes into the assembled feature vector.
feature_frame = train_dataset.drop('CHURN')
inputCols = feature_frame.columns
outputCol = "features"
assembler = VectorAssembler(inputCols=inputCols, outputCol=outputCol)

# Add the "features" vector column to the training and test splits.
df_va_train, df_va_test = (
    assembler.transform(split) for split in (train_dataset, test_dataset)
)

Decision Tree Classifier

In [13]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
import time

# Measure the wall-clock time needed to build and fit the decision tree.
start_time = time.time()

dt = DecisionTreeClassifier(
    labelCol="CHURN",
    featuresCol="features",
    impurity="gini",
    maxDepth=25,
    minInstancesPerNode=30,
)

# The tree is the only stage of the pipeline; fitting the pipeline produces `model`.
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(df_va_train)

elapsed_seconds = time.time() - start_time
print("Time Taken for Decision Tree Classifier : " + str(elapsed_seconds))

Time Taken for Decision Tree Classifier : 46.843886852264404


In [51]:
# Apply the fitted decision-tree pipeline to the assembled test split.
y_pred = model.transform(df_va_test)

In [60]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# `evaluator` was used below without ever being defined in this notebook, so a fresh
# Restart & Run All stopped here with a NameError. Build it explicitly.
# NOTE(review): the original definition is not visible; area under ROC over the default
# "rawPrediction" column is assumed — confirm it matches the recorded roc_auc value.
evaluator = BinaryClassificationEvaluator(labelCol="CHURN", metricName="areaUnderROC")

# Label-based metrics for the decision-tree predictions (y_pred).
# NOTE(review): metricLabel is not set, so precisionByLabel / recallByLabel presumably
# describe the evaluator's default label rather than the churn class — confirm.
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="CHURN", predictionCol="prediction")
acc = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluator.evaluate(y_pred)


In [62]:
# Print the metrics most recently stored in acc / precision / recall / f1 / roc_auc.
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.906803, precision: 0.989045, recall: 0.914606, f1: 0.932991, roc_auc: 0.605211


Gradient Boosting classification

In [64]:
from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees with only the label and feature columns specified.
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="CHURN",
)

# Fit on the assembled training split ...
GBT = gbt.fit(df_va_train)

# ... and score the assembled test split.
y_pred_GBT = GBT.transform(df_va_test)

In [65]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# `evaluator` was used below without being defined anywhere in this notebook (NameError on
# a fresh kernel), so it is built explicitly in this cell.
# NOTE(review): the original definition is not visible; area under ROC over the default
# "rawPrediction" column is assumed — confirm it matches the recorded roc_auc value.
evaluator = BinaryClassificationEvaluator(labelCol="CHURN", metricName="areaUnderROC")

# Label-based metrics for the gradient-boosting predictions (y_pred_GBT).
# NOTE(review): metricLabel is not set, so precisionByLabel / recallByLabel presumably
# describe the evaluator's default label rather than the churn class — confirm.
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="CHURN", predictionCol="prediction")
acc = evaluatorMulti.evaluate(y_pred_GBT, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(y_pred_GBT, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred_GBT, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred_GBT, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluator.evaluate(y_pred_GBT)

In [66]:
# Print the metrics most recently stored in acc / precision / recall / f1 / roc_auc.
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.878858, precision: 0.995437, recall: 0.879866, f1: 0.917477, roc_auc: 0.930415


Random Forest Classification

In [68]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest with gini impurity, trees limited to depth 6, and a fixed seed of 90.
rf = RandomForestClassifier(
    labelCol="CHURN",
    featuresCol="features",
    impurity="gini",
    maxDepth=6,
    seed=90,
)

# Fit on the assembled training split, then score the assembled test split.
RFC = rf.fit(df_va_train)
y_pred_RF = RFC.transform(df_va_test)

In [70]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# `evaluator` was used below without being defined anywhere in this notebook (NameError on
# a fresh kernel), so it is built explicitly in this cell.
# NOTE(review): the original definition is not visible; area under ROC over the default
# "rawPrediction" column is assumed — confirm it matches the recorded roc_auc value.
evaluator = BinaryClassificationEvaluator(labelCol="CHURN", metricName="areaUnderROC")

# Label-based metrics for the random-forest predictions (y_pred_RF).
# NOTE(review): metricLabel is not set, so precisionByLabel / recallByLabel presumably
# describe the evaluator's default label rather than the churn class — confirm.
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="CHURN", predictionCol="prediction")
acc = evaluatorMulti.evaluate(y_pred_RF, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(y_pred_RF, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred_RF, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred_RF, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluator.evaluate(y_pred_RF)

In [71]:
# Print the metrics most recently stored in acc / precision / recall / f1 / roc_auc.
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.859575, precision: 0.996849, recall: 0.858783, f1: 0.905955, roc_auc: 0.929747


Logistic Regression

In [72]:
from pyspark.ml.classification import GBTClassifier, LogisticRegression

# Logistic regression on the assembled features, allowing up to 1000 iterations.
log_model = LogisticRegression(
    featuresCol="features",
    labelCol="CHURN",
    maxIter=1000,
)

# Fit on the assembled training split ...
LG = log_model.fit(df_va_train)

# ... and score the assembled test split.
y_pred_LR = LG.transform(df_va_test)

In [73]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# `evaluator` was used below without being defined anywhere in this notebook (NameError on
# a fresh kernel), so it is built explicitly in this cell.
# NOTE(review): the original definition is not visible; area under ROC over the default
# "rawPrediction" column is assumed — confirm it matches the recorded roc_auc value.
evaluator = BinaryClassificationEvaluator(labelCol="CHURN", metricName="areaUnderROC")

# Label-based metrics for the logistic-regression predictions (y_pred_LR).
# NOTE(review): metricLabel is not set, so precisionByLabel / recallByLabel presumably
# describe the evaluator's default label rather than the churn class — confirm.
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="CHURN", predictionCol="prediction")
acc = evaluatorMulti.evaluate(y_pred_LR, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(y_pred_LR, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred_LR, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred_LR, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluator.evaluate(y_pred_LR)

In [74]:
# Print the metrics most recently stored in acc / precision / recall / f1 / roc_auc.
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.871053, precision: 0.996403, recall: 0.870978, f1: 0.912881, roc_auc: 0.934114
