# Logistic regression in Spark

Sushant N. More

Ref: Udemy course -- Spark and Python for Big data

In [2]:
import findspark

In [3]:
findspark.init('/home/sushant/spark-2.1.0-bin-hadoop2.7')

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName('logReg').getOrCreate()

In [6]:
from pyspark.ml.classification import LogisticRegression

In [7]:
data = spark.read.format('libsvm').load('./Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Logistic_Regression/sample_libsvm_data.txt')

In [8]:
data.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
+-----+--------------------+
only showing top 5 rows



In [9]:
logRegModel = LogisticRegression()

In [10]:
fittedLogRegModel = logRegModel.fit(data)

In [11]:
logRegModelSummary = fittedLogRegModel.summary

In [12]:
logRegModelSummary.predictions.show(10)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[19.8534775947478...|[0.99999999761359...|       0.0|
|  1.0|(692,[158,159,160...|[-20.377398194908...|[1.41321555111056...|       1.0|
|  1.0|(692,[124,125,126...|[-27.401459284891...|[1.25804865126979...|       1.0|
|  1.0|(692,[152,153,154...|[-18.862741612668...|[6.42710509170303...|       1.0|
|  1.0|(692,[151,152,153...|[-20.483011833009...|[1.27157209200604...|       1.0|
|  0.0|(692,[129,130,131...|[19.8506078990277...|[0.99999999760673...|       0.0|
|  1.0|(692,[158,159,160...|[-20.337256674833...|[1.47109814695581...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.595579753418...|[3.08850168102631...|       1.0|
|  0.0|(692,[154,155,156...|[19.2708803215613...|[0.99999999572670...|       0.0|
|  0.0|(692,[127

In [13]:
logRegModelSummary.predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [14]:
train, test = data.randomSplit([0.7, 0.3])

In [15]:
model = LogisticRegression()

In [16]:
fitModel = model.fit(train)

In [17]:
predictionAndLabels = fitModel.evaluate(test)

In [18]:
predictionAndLabels.predictions.show(10)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[98,99,100,1...|[33.1480305474426...|[0.99999999999999...|       0.0|
|  0.0|(692,[122,123,124...|[18.6217989110058...|[0.99999999182184...|       0.0|
|  0.0|(692,[123,124,125...|[29.7373985331799...|[0.99999999999987...|       0.0|
|  0.0|(692,[126,127,128...|[27.6458685453425...|[0.99999999999901...|       0.0|
|  0.0|(692,[126,127,128...|[18.8427613184601...|[0.99999999344318...|       0.0|
|  0.0|(692,[126,127,128...|[32.9634297084732...|[0.99999999999999...|       0.0|
|  0.0|(692,[126,127,128...|[21.6523085107572...|[0.99999999960506...|       0.0|
|  0.0|(692,[127,128,129...|[18.9293884421683...|[0.99999999398727...|       0.0|
|  0.0|(692,[127,128,129...|[20.8747854480115...|[0.99999999914059...|       0.0|
|  0.0|(692,[150

### Introducing evaluators

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [20]:
testEval = BinaryClassificationEvaluator()

The metric BinaryClassificationEvaluator() ouputs is roc. The MulticlassClassificationEvaluator can output other things such as F1 score, weighted precission, weighted recall, accuracy etc.

In [21]:
testEval.evaluate(predictionAndLabels.predictions)
# perfect classification: not realistic!

1.0

### Titanic time

In [22]:
titanicData = spark.read.csv('./Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Logistic_Regression/titanic.csv', 
                            inferSchema=True, header=True)

In [23]:
titanicData.show(4)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 4 rows



In [24]:
titanicData.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [25]:
titanicData.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [26]:
titanicSelectCols = titanicData.select(['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare',
 'Embarked'])

In [27]:
titanicSelectCols.describe().show()

+-------+-------------------+------------------+------+------------------+------------------+-------------------+-----------------+--------+
|summary|           Survived|            Pclass|   Sex|               Age|             SibSp|              Parch|             Fare|Embarked|
+-------+-------------------+------------------+------+------------------+------------------+-------------------+-----------------+--------+
|  count|                891|               891|   891|               714|               891|                891|              891|     889|
|   mean| 0.3838383838383838| 2.308641975308642|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824| 32.2042079685746|    null|
| stddev|0.48659245426485753|0.8360712409770491|  null|14.526497332334035|1.1027434322934315| 0.8060572211299488|49.69342859718089|    null|
|    min|                  0|                 1|female|              0.42|                 0|                  0|              0.0|       C|
|    max|    

For now, let's drop the null values. 

In [28]:
titanicSelectnonNull = titanicSelectCols.na.drop()

In [29]:
from pyspark.ml.feature import (VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder)

In [30]:
genderIndexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')

After StringIndexer, we gotta One Hot Encode.

Say, we have three index: A, B, C.

Say using StringIndexer: A -> 0, B -> 1, C -> 2. 

One hot encoder converts each example into arrays of 0 and 1.

For instance if our example is 1, then one hot encoder makes it: [1, 0, 0]

In [31]:
genderEncoder = OneHotEncoder(inputCol = 'SexIndex', outputCol = 'SexVec')

In [32]:
embarkIndexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkIndex')
embarkEncoder = StringIndexer(inputCol='EmbarkIndex', outputCol='EmbarkVec')

In [33]:
assembler = VectorAssembler(inputCols=['SexVec', 'EmbarkVec', 'Pclass', 'Age', 'SibSp',
                                       'Parch', 'Fare'], 
                           outputCol='features')

Creating a pipeline

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [35]:
logReg = LogisticRegression(labelCol='Survived')

In [36]:
pipeline = Pipeline(stages = [genderIndexer, embarkIndexer, genderEncoder, embarkEncoder, assembler, logReg])

In [37]:
train_data, test_data = titanicSelectnonNull.randomSplit([0.7, 0.3])

In [38]:
fitModel = pipeline.fit(train_data)

In [39]:
results = fitModel.transform(test_data)

In [40]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [41]:
results.show(3)

+--------+------+----+----+-----+-----+-----+--------+--------+-----------+-------------+---------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass| Sex| Age|SibSp|Parch| Fare|Embarked|SexIndex|EmbarkIndex|       SexVec|EmbarkVec|            features|       rawPrediction|         probability|prediction|
+--------+------+----+----+-----+-----+-----+--------+--------+-----------+-------------+---------+--------------------+--------------------+--------------------+----------+
|       0|     1|male|18.0|    1|    0|108.9|       C|     0.0|        1.0|(1,[0],[1.0])|      1.0|[1.0,1.0,1.0,18.0...|[-0.5510121797359...|[0.36562960745303...|       1.0|
|       0|     1|male|27.0|    0|    2|211.5|       C|     0.0|        1.0|(1,[0],[1.0])|      1.0|[1.0,1.0,1.0,27.0...|[-0.1585491207939...|[0.46044554451789...|       1.0|
|       0|     1|male|31.0|    1|    0| 52.0|       S|     0.0|        0.0|(1,[0],[1.0])|      0.0|[1.0,0.0,1.0,31.0...|[0.1493141

In [42]:
myEval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [43]:
myEval.evaluate(results)

0.7284375000000001

In [44]:
results.select('Survived', 'prediction').show(10)

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 10 rows



## Project

## Binary Customer Churn

A marketing agency has many customers that use their service to produce ads for the client/customer websites. They've noticed that they have quite a bit of churn in clients. They basically randomly assign account managers right now, but want you to create a machine learning model that will help predict which customers will churn (stop buying their service) so that they can correctly assign the customers most at risk to churn an account manager. Luckily they have some historical data, can you help them out? Create a classification algorithm that will help classify whether or not a customer churned. Then the company can test this against incoming data for future customers to predict which customers will churn and assign them an account manager.

The data is saved as customer_churn.csv. Here are the fields and their definitions:

    Name : Name of the latest contact at Company
    Age: Customer Age
    Total_Purchase: Total Ads Purchased
    Account_Manager: Binary 0=No manager, 1= Account manager assigned
    Years: Totaly Years as a customer
    Num_sites: Number of websites that use the service.
    Onboard_date: Date that the name of the latest contact was onboarded
    Location: Client HQ Address
    Company: Name of Client Company
    
Once you've created the model and evaluated it, test out the model on some new data (you can think of this almost like a hold-out set) that your client has provided, saved under new_customers.csv. The client wants to know which customers are most likely to churn given this data (they don't have the label yet).

In [45]:
companyData = spark.read.csv('./Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Logistic_Regression/customer_churn.csv', 
                             inferSchema=True, header = True)

In [46]:
companyData.show(6)

+----------------+----+--------------+---------------+-----+---------+--------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|        Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+--------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:...|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:...|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:...|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:...|13120 Daniel Moun...|           Smith Inc|    1|
|  Cynthia Norton|37

In [47]:
companyData.head(2)

[Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1),
 Row(Names='Kevin Mueller', Age=41.0, Total_Purchase=11916.22, Account_Manager=0, Years=6.5, Num_Sites=11.0, Onboard_date=datetime.datetime(2013, 8, 13, 0, 38, 46), Location='6157 Frank Gardens Suite 019 Carloshaven, RI 17756', Company='Wilson PLC', Churn=1)]

In [48]:
companyData.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [49]:
companyData.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

Count is 900 across all the columns. So looks like we don't have any missing data.

In [50]:
companyData.groupby('Company').count().orderBy('count').show()

+--------------------+-----+
|             Company|count|
+--------------------+-----+
|            Todd LLC|    1|
|Wilson, Collins a...|    1|
|        White-Dennis|    1|
|     Campbell-Willis|    1|
|    Martinez-Roberts|    1|
|        Robinson PLC|    1|
|Jennings, Gates a...|    1|
|Hernandez, Middle...|    1|
|        Nguyen-Grant|    1|
|          Hayes-Hill|    1|
|      Jackson-Garcia|    1|
|Pittman, Watts an...|    1|
|        Nguyen-Pratt|    1|
|          Barton Inc|    1|
|Miller, Johnson a...|    1|
|          Obrien PLC|    1|
|Hunter, Reyes and...|    1|
|Smith, Marshall a...|    1|
|           Smith PLC|    1|
|          Hall Group|    1|
+--------------------+-----+
only showing top 20 rows



In [51]:
companyData.count()

900

In [52]:
from pyspark.sql.functions import countDistinct

In [53]:
companyData.select(countDistinct('Company')).show()

+-----------------------+
|count(DISTINCT Company)|
+-----------------------+
|                    873|
+-----------------------+



Since there are so many companies, it's probably not the best idea to one hot encode it. Let's convert a feature vector using the numerical values. 

In [54]:
from pyspark.ml.feature import VectorAssembler

In [55]:
assembler = VectorAssembler(inputCols=['Age', 'Total_Purchase', 'Years', 'Num_Sites'], outputCol='features')

In [56]:
assembledData = assembler.transform(companyData)

In [57]:
assembledData.show(3)

+----------------+----+--------------+---------------+-----+---------+--------------------+--------------------+--------------------+-----+--------------------+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|        Onboard_date|            Location|             Company|Churn|            features|
+----------------+----+--------------+---------------+-----+---------+--------------------+--------------------+--------------------+-----+--------------------+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:...|10265 Elizabeth M...|          Harvey LLC|    1|[42.0,11066.8,7.2...|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:...|6157 Frank Garden...|          Wilson PLC|    1|[41.0,11916.22,6....|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:...|1331 Keith Court ...|Miller, Johnson a...|    1|[38.0,12884.75,6....|
+----------------+----+-----------

In [58]:
dataToUse = assembledData.select('features', 'Churn')

In [59]:
dataToUse.show(2)

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[42.0,11066.8,7.2...|    1|
|[41.0,11916.22,6....|    1|
+--------------------+-----+
only showing top 2 rows



In [60]:
companyData.show(2)

+----------------+----+--------------+---------------+-----+---------+--------------------+--------------------+----------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|        Onboard_date|            Location|   Company|Churn|
+----------------+----+--------------+---------------+-----+---------+--------------------+--------------------+----------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:...|10265 Elizabeth M...|Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:...|6157 Frank Garden...|Wilson PLC|    1|
+----------------+----+--------------+---------------+-----+---------+--------------------+--------------------+----------+-----+
only showing top 2 rows



In [61]:
companyData.write.csv('test2.csv', header = True)

In [62]:
!pwd

/home/sushant/Documents/SparkUdemyCourse


In [63]:
dataToUse.head(3)

[Row(features=DenseVector([42.0, 11066.8, 7.22, 8.0]), Churn=1),
 Row(features=DenseVector([41.0, 11916.22, 6.5, 11.0]), Churn=1),
 Row(features=DenseVector([38.0, 12884.75, 6.67, 12.0]), Churn=1)]

In [64]:
train, test = dataToUse.randomSplit([0.7, 0.3])

In [65]:
from pyspark.ml.classification import LogisticRegression

In [77]:
churnLogReg = LogisticRegression(labelCol='Churn')

In [79]:
fittedChurn = churnLogReg.fit(train)

In [80]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [81]:
predictionAndLabels = fittedChurn.evaluate(test)

In [83]:
type(fittedChurn)

pyspark.ml.classification.LogisticRegressionModel

In [84]:
type(predictionAndLabels)

pyspark.ml.classification.BinaryLogisticRegressionSummary

In [85]:
type(predictionAndLabels.predictions)

pyspark.sql.dataframe.DataFrame

In [82]:
predictionAndLabels.predictions.show(10)

+--------------------+-----+--------------------+--------------------+----------+
|            features|Churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,11254.38,4....|    0|[4.32590065552678...|[0.98695089394580...|       0.0|
|[28.0,11204.23,3....|    0|[1.39674579035469...|[0.80166698678519...|       0.0|
|[29.0,11274.46,4....|    0|[4.30801091430924...|[0.98671847657765...|       0.0|
|[30.0,8403.78,4.1...|    0|[5.61883357502287...|[0.99638424964562...|       0.0|
|[30.0,10960.52,5....|    0|[2.33397132615089...|[0.91165172205624...|       0.0|
|[30.0,12788.37,4....|    0|[2.04932453176479...|[0.88587934861851...|       0.0|
|[31.0,8829.83,4.5...|    0|[4.23572329105926...|[0.98573703483756...|       0.0|
|[31.0,10182.6,3.7...|    0|[4.60260670020639...|[0.99007384855316...|       0.0|
|[31.0,11743.24,5....|    0|[5.98659497691818...|[0.99749409173725...|       0.0|
|[32.0,8011.38,5

In [86]:
churnEval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Churn')

In [91]:
auc = churnEval.evaluate(predictionAndLabels.predictions)

In [92]:
auc

0.7936023227873449