In [None]:
%pip install pyspark



In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('App_Name').getOrCreate()
sc = spark.sparkContext

In [3]:
df = spark.read.format('csv').\
option('header', 'true').\
option('inferSchema', 'true').\
load('hepatitis.csv')

In [4]:
df.show(5)

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|AGE|   SEX|STEROID|ANTIVIRALS|FATIGUE|MALAISE|ANOREXIA|LIVER_BIG|LIVER_FIRM|SPLEEN_PALPABLE|SPIDERS|ASCITES|VARICES|BILIRUBIN|ALK_PHOSPHATE|SGOT|ALBUMIN|PROTIME|HISTOLOGY|Class|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
| 30|  male|     no|        no|     no|     no|      no|       no|        no|             no|     no|     no|     no|        1|           85|  18|      4|      ?|       no| LIVE|
| 50|female|     no|        no|    yes|     no|      no|       no|        no|             no|     no|     no|     no|      0.9|          135|  42|    3.5|      ?|       no| LIVE|
| 78|female|    yes|        no|    yes|     no|      no|      yes|        no|             no|     no|    

In [5]:
df.columns

['AGE',
 'SEX',
 'STEROID',
 'ANTIVIRALS',
 'FATIGUE',
 'MALAISE',
 'ANOREXIA',
 'LIVER_BIG',
 'LIVER_FIRM',
 'SPLEEN_PALPABLE',
 'SPIDERS',
 'ASCITES',
 'VARICES',
 'BILIRUBIN',
 'ALK_PHOSPHATE',
 'SGOT',
 'ALBUMIN',
 'PROTIME',
 'HISTOLOGY',
 'Class']

In [6]:
import warnings
warnings.filterwarnings('ignore')

from pyspark import SQLContext
sqlContext = SQLContext(spark)

In [7]:
sql = df.createOrReplaceTempView('hpt')

In [8]:
sqlContext.sql('Select * from hpt where AGE >=65').show()

+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
|AGE|   SEX|STEROID|ANTIVIRALS|FATIGUE|MALAISE|ANOREXIA|LIVER_BIG|LIVER_FIRM|SPLEEN_PALPABLE|SPIDERS|ASCITES|VARICES|BILIRUBIN|ALK_PHOSPHATE|SGOT|ALBUMIN|PROTIME|HISTOLOGY|Class|
+---+------+-------+----------+-------+-------+--------+---------+----------+---------------+-------+-------+-------+---------+-------------+----+-------+-------+---------+-----+
| 78|female|    yes|        no|    yes|     no|      no|      yes|        no|             no|     no|     no|     no|      0.7|           96|  32|      4|      ?|       no| LIVE|
| 66|female|    yes|        no|    yes|     no|      no|      yes|        no|             no|     no|     no|     no|      1.2|          102|  53|    4.3|      ?|       no| LIVE|
| 65|female|    yes|        no|    yes|    yes|      no|      yes|       yes|            yes|    yes|    

In [10]:
query = """
SELECT sex, class, count(*) as count
FROM hpt
GROUP BY class, sex
"""
sqlContext.sql(query).toPandas()

Unnamed: 0,sex,class,count
0,male,LIVE,16
1,female,LIVE,107
2,female,DIE,32


In [12]:
query = """
SELECT sex, min(age) as min_age, max(age) as max_age
FROM hpt
WHERE sex = 'female'
GROUP BY sex
"""
sqlContext.sql(query).toPandas()

Unnamed: 0,sex,min_age,max_age
0,female,7,78


In [13]:
spark_2 = SparkSession.builder.appName('Churn_Analysis')\
.config('spark.sql.debug.maxToStringFields', 1)\
.getOrCreate()

In [14]:
churn = spark_2.read.format('csv').\
option('header', 'true').\
option('inferSchema', 'true').\
load('churn.csv')

In [24]:
churn.toPandas().head(5)

Unnamed: 0,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
1,2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
2,3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
3,4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
4,5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0


In [25]:
churn.columns

['RowNumber',
 'CustomerId',
 'Surname',
 'CreditScore',
 'Geography',
 'Gender',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary',
 'Exited']

In [26]:
churn.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [27]:
churn.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 14 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   RowNumber        10000 non-null  int32  
 1   CustomerId       10000 non-null  int32  
 2   Surname          10000 non-null  object 
 3   CreditScore      10000 non-null  int32  
 4   Geography        10000 non-null  object 
 5   Gender           10000 non-null  object 
 6   Age              10000 non-null  int32  
 7   Tenure           10000 non-null  int32  
 8   Balance          10000 non-null  float64
 9   NumOfProducts    10000 non-null  int32  
 10  HasCrCard        10000 non-null  int32  
 11  IsActiveMember   10000 non-null  int32  
 12  EstimatedSalary  10000 non-null  float64
 13  Exited           10000 non-null  int32  
dtypes: float64(2), int32(9), object(3)
memory usage: 742.3+ KB


In [33]:
churn.describe().toPandas()

Unnamed: 0,summary,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,count,10000.0,10000.0,10000,10000.0,10000,10000,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0
1,mean,5000.5,15690940.5694,,650.5288,,,38.9218,5.0128,76485.88928799961,1.5302,0.7055,0.5151,100090.2398809998,0.2037
2,stddev,2886.8956799071675,71936.18612274907,,96.65329873613037,,,10.487806451704587,2.892174377049684,62397.40520238599,0.5816543579989917,0.4558404644751332,0.4997969284589181,57510.49281769821,0.4027685839948606
3,min,1.0,15565701.0,Abazu,350.0,France,Female,18.0,0.0,0.0,1.0,0.0,0.0,11.58,0.0
4,max,10000.0,15815690.0,Zuyeva,850.0,Spain,Male,92.0,10.0,250898.09,4.0,1.0,1.0,199992.48,1.0


In [35]:
churn.select('exited').distinct().show()

+------+
|exited|
+------+
|     1|
|     0|
+------+



In [36]:
churn.dtypes

[('RowNumber', 'int'),
 ('CustomerId', 'int'),
 ('Surname', 'string'),
 ('CreditScore', 'int'),
 ('Geography', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Tenure', 'int'),
 ('Balance', 'double'),
 ('NumOfProducts', 'int'),
 ('HasCrCard', 'int'),
 ('IsActiveMember', 'int'),
 ('EstimatedSalary', 'double'),
 ('Exited', 'int')]

In [37]:
churn.groupby('exited').count().show()

+------+-----+
|exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



In [38]:
churn = churn.drop('rowNumber', 'customerID', 'surname')

In [40]:
churn.show(5)

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|     0|
|        850|    Spain|Female| 43|     2|125510.82|            1|        1|             1|        79084.1|     0|
+-----------+---------+------+---+------+---------+-------------+---------+-------------

In [45]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

In [44]:
numlist = [field.name
           for field in churn.schema.fields
              if (isinstance(field.dataType, IntegerType)
              or isinstance(field.dataType, DoubleType))
              and field.name != 'Exited']

In [46]:
numlist

['CreditScore',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary']

In [47]:
catlist = [field.name
           for field in churn.schema.fields
              if isinstance(field.dataType, StringType)]

In [48]:
catlist

['Geography', 'Gender']

In [51]:
churn.select('Geography').toPandas().value_counts()

Unnamed: 0_level_0,count
Geography,Unnamed: 1_level_1
France,5014
Germany,2509
Spain,2477


In [52]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Stringindexer is a function that concatenate string to each other in spark
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='stringindexed_' + c) for c in catlist]

In [53]:
stringindexer_stages

[StringIndexer_73a72dade9d6, StringIndexer_6a698af4170c]

In [54]:
stringindexer_stages += [StringIndexer(inputCol='Exited', outputCol='label')]

In [55]:
stringindexer_stages

[StringIndexer_73a72dade9d6,
 StringIndexer_6a698af4170c,
 StringIndexer_f92f6fc38ea9]

In [56]:
onehotencoder_stages = [OneHotEncoder(inputCol='stringindexed_' + c, outputCol='onehot_' + c) for c in catlist]

In [57]:
onehotencoder_stages

[OneHotEncoder_4e7153a54916, OneHotEncoder_09167f521a5c]

In [58]:
feature_columns = numlist + ['onehot_' + c for c in catlist]

In [59]:
feature_columns

['CreditScore',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary',
 'onehot_Geography',
 'onehot_Gender']

In [60]:
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [61]:
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]

In [62]:
all_stages

[StringIndexer_73a72dade9d6,
 StringIndexer_6a698af4170c,
 StringIndexer_f92f6fc38ea9,
 OneHotEncoder_4e7153a54916,
 OneHotEncoder_09167f521a5c,
 VectorAssembler_69a4bf4119f7]

In [63]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=all_stages)

In [64]:
pipeline_model = pipeline.fit(churn)

In [65]:
final_columns = feature_columns + ['features', 'label']

In [66]:
final_columns

['CreditScore',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary',
 'onehot_Geography',
 'onehot_Gender',
 'features',
 'label']

In [67]:
churn_df = pipeline_model.transform(churn).select(final_columns)

In [71]:
churn.show(5)

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|     0|
|        850|    Spain|Female| 43|     2|125510.82|            1|        1|             1|        79084.1|     0|
+-----------+---------+------+---+------+---------+-------------+---------+-------------

In [70]:
churn_df.show(5)

+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|onehot_Geography|onehot_Gender|            features|label|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|        619| 42|     2|      0.0|            1|        1|             1|      101348.88|   (2,[0],[1.0])|    (1,[],[])|[619.0,42.0,2.0,0...|  1.0|
|        608| 41|     1| 83807.86|            1|        0|             1|      112542.58|       (2,[],[])|    (1,[],[])|[608.0,41.0,1.0,8...|  0.0|
|        502| 42|     8| 159660.8|            3|        1|             0|      113931.57|   (2,[0],[1.0])|    (1,[],[])|[502.0,42.0,8.0,1...|  1.0|
|        699| 39|     1|      0.0|            2|        0|             0|       93826.63|   (2,[0],[1.0])|    (1

In [72]:
train, test = churn_df.randomSplit([0.8, 0.2], seed=42)

In [73]:
train.show(3)

+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|onehot_Geography|onehot_Gender|            features|label|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|        350| 39|     0| 109733.2|            2|        0|             0|      123602.11|   (2,[1],[1.0])|(1,[0],[1.0])|[350.0,39.0,0.0,1...|  1.0|
|        350| 40|     0|111098.85|            1|        1|             1|      172321.21|   (2,[0],[1.0])|    (1,[],[])|[350.0,40.0,0.0,1...|  1.0|
|        350| 54|     1|152677.48|            1|        1|             1|      191973.49|       (2,[],[])|(1,[0],[1.0])|[350.0,54.0,1.0,1...|  1.0|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+------

In [74]:
test.show(3)

+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|onehot_Geography|onehot_Gender|            features|label|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+
|        350| 51|    10|      0.0|            1|        1|             1|      125823.79|   (2,[0],[1.0])|(1,[0],[1.0])|[350.0,51.0,10.0,...|  1.0|
|        358| 52|     8|143542.36|            3|        1|             0|      141959.11|       (2,[],[])|    (1,[],[])|[358.0,52.0,8.0,1...|  1.0|
|        363| 28|     6|146098.43|            3|        1|             0|      100615.14|       (2,[],[])|    (1,[],[])|[363.0,28.0,6.0,1...|  1.0|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+------

In [76]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

log_model = LogisticRegression(featuresCol='features', labelCol='label').fit(train)

In [77]:
y_pred = log_model.transform(test)

In [78]:
y_pred.show(5)

+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+--------------------+--------------------+----------+
|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|onehot_Geography|onehot_Gender|            features|label|       rawPrediction|         probability|prediction|
+-----------+---+------+---------+-------------+---------+--------------+---------------+----------------+-------------+--------------------+-----+--------------------+--------------------+----------+
|        350| 51|    10|      0.0|            1|        1|             1|      125823.79|   (2,[0],[1.0])|(1,[0],[1.0])|[350.0,51.0,10.0,...|  1.0|[1.70440748558649...|[0.84610950159848...|       0.0|
|        358| 52|     8|143542.36|            3|        1|             0|      141959.11|       (2,[],[])|    (1,[],[])|[358.0,52.0,8.0,1...|  1.0|[-0.0963633845269...|[0.47592777867642...|       

In [81]:
y_pred.select('label', 'prediction').toPandas()

Unnamed: 0,label,prediction
0,1.0,0.0
1,1.0,1.0
2,1.0,0.0
3,1.0,0.0
4,1.0,1.0
...,...,...
1916,1.0,0.0
1917,1.0,0.0
1918,1.0,1.0
1919,0.0,0.0


In [82]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

evaluatorMulti = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')

In [83]:
acc = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: 'accuracy'})

In [84]:
acc

0.8110359187922956

In [85]:
w_precision = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: 'weightedPrecision'})
w_recall = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: 'weightedRecall'})
f1 = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: 'f1'})

print('Accuracy: ', acc)
print('Weighted Precision: ', w_precision)
print('Weighted Recall: ', w_recall)
print('F1 Score: ', f1)

Accuracy:  0.8110359187922956
Weighted Precision:  0.7749640981476236
Weighted Recall:  0.8110359187922956
F1 Score:  0.7790803182942297
