In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [3]:
'''
In this example code along, we will introduce the concept of "Evaluators"
Evaluators behave similar to Machine Learning Algorithm Objects, but they 
are designed to take in evaluation dataframes ~ model.evaluate(test_data)

Evaluators are technically still "experimental" according to the documentation,
so use caution when using them for production code.

'''

from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName("logReg").getOrCreate()

In [5]:
from pyspark.ml.classification import LogisticRegression

myData = spark.read.format("libsvm").load("/Users/jaskiratsinghp/Desktop/Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Logistic_Regression/sample_libsvm_data.txt")
myData.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [6]:
## Lets create a Logistic Regression Model

logisticRegModel = LogisticRegression()

fitted_logisticReg = logisticRegModel.fit(myData)

logisticReg_summary = fitted_logisticReg.summary

In [7]:
logisticReg_summary.predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [8]:
## Lets see these columns in tabular format
logisticReg_summary.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[19.8534775947478...|[0.99999999761359...|       0.0|
|  1.0|(692,[158,159,160...|[-20.377398194908...|[1.41321555111056...|       1.0|
|  1.0|(692,[124,125,126...|[-27.401459284891...|[1.25804865126979...|       1.0|
|  1.0|(692,[152,153,154...|[-18.862741612668...|[6.42710509170303...|       1.0|
|  1.0|(692,[151,152,153...|[-20.483011833009...|[1.27157209200604...|       1.0|
|  0.0|(692,[129,130,131...|[19.8506078990277...|[0.99999999760673...|       0.0|
|  1.0|(692,[158,159,160...|[-20.337256674833...|[1.47109814695581...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.595579753418...|[3.08850168102631...|       1.0|
|  0.0|(692,[154,155,156...|[19.2708803215613...|[0.99999999572670...|       0.0|
|  0.0|(692,[127

In [9]:
'''
Now to do evaluation, we need to have test_data, but we didn't split the dataset
into train_data and test_data.
Let's do that now;

'''

lr_train , lr_test = myData.randomSplit([0.7 , 0.3])

finalModel = LogisticRegression()

In [10]:
fitFinal = finalModel.fit(lr_train)

In [11]:
predictions_and_labels = fitFinal.evaluate(lr_test)

In [12]:
predictions_and_labels.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[100,101,102...|[9.89104393135796...|[0.99994937649461...|       0.0|
|  0.0|(692,[122,123,148...|[20.7692889692019...|[0.99999999904498...|       0.0|
|  0.0|(692,[124,125,126...|[30.9988648094182...|[0.99999999999996...|       0.0|
|  0.0|(692,[126,127,128...|[22.1359065604703...|[0.99999999975650...|       0.0|
|  0.0|(692,[128,129,130...|[23.7173834661564...|[0.99999999994991...|       0.0|
|  0.0|(692,[152,153,154...|[13.8075060142325...|[0.99999899196435...|       0.0|
|  0.0|(692,[153,154,155...|[27.2634869920189...|[0.99999999999855...|       0.0|
|  0.0|(692,[154,155,156...|[19.4280376498931...|[0.99999999634817...|       0.0|
|  0.0|(692,[154,155,156...|[8.40423015093813...|[0.99977613201756...|       0.0|
|  0.0|(692,[234

In [13]:
## IMPORTANT:
## Its HIGHLY RECOMMENDED to check the Documentation for the same.

from pyspark.ml.evaluation import (BinaryClassificationEvaluator, 
                                   MulticlassClassificationEvaluator)

myEval = BinaryClassificationEvaluator()

myFinalROC = myEval.evaluate(predictions_and_labels.predictions)  

In [14]:
myFinalROC

1.0

# Now Lets Work on the Real Time Dataset:

## **We will be working on "classic" Titanic Dataset

In [None]:
'''
* We will use this dataset to attempt to predict what passengers survived the 
  titanic crash based solely on passengers's features (age , cabin , children etc)

* We will see some better ways to deal with categorical data through a two-step
  process.

* We will also use pipelines to set stages and build models that can be easily
  used again.
  
* This data also have lot of missing information, so we will need to deal with
  that as well.

'''

In [15]:
df = spark.read.csv("/Users/jaskiratsinghp/Desktop/PersonalStuff/Datasets/titanic.csv" , inferSchema = True , header = True)

df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [16]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [17]:
## Lets do Feature Selection

myCols = df.select([
     'Survived',
     'Pclass',
     'Sex',
     'Age',
     'SibSp',
     'Parch',
     'Fare',
     'Embarked'
])


In [18]:
## Lets deal with missing data:
## The simplest way to deal with them is to drop them.

myFinalData = myCols.na.drop()

In [20]:
## Now lets deal with the categorical data:

from pyspark.ml.feature import (VectorAssembler, 
                                VectorIndexer,
                                OneHotEncoder,
                                StringIndexer)

'''
The way we are going to operate working on categorical columns is:

First, we are going to create a String Indexer and then we are going to
One Hot Encode them.
This is because the String Indexer allows you to convert every string into
a number and then we can easily apply One Hot Encoder.

'''

## Lets do that for Sex column.
genderIndexer = StringIndexer(inputCol = "Sex" , outputCol = "SexIndex")

genderEncoder = OneHotEncoder(inputCol = "SexIndex" , outputCol = "SexVec")

In [22]:
## Now, lets format Embarked column.
embarkIndexer = StringIndexer(inputCol = "Embarked" , outputCol = "EmbarkIndex")

embarkEncoder = OneHotEncoder(inputCol = "EmbarkIndex" , outputCol = "EmbarkVec")


In [23]:
## Now lets format the dataset in a way that is accepted by MLlib library.

assembler = VectorAssembler(inputCols = ["Pclass", "SexVec", "EmbarkVec", "Age", "SibSp", "Parch", "Fare"] , 
                            outputCol = "features")

In [25]:
## Lets create our Pipeline.

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

'''
So what Pipeline does is, it sets stages for different steps.

If you have a very complex machine learning task you will often have
to set up a bunch of stages. 

'''

logisticReg_titanic = LogisticRegression(featuresCol = "features" , labelCol = "Survived")


In [26]:
## Now lets set out Pipeline.

pipeline = Pipeline(stages = [genderIndexer,
                              embarkIndexer,
                              genderEncoder,
                              embarkEncoder,
                              assembler,
                              logisticReg_titanic
                             ])

## Now this pipeline will do operations in this sequence one by one.

In [27]:
## Lets split our data in train_data and test_data

train_data , test_data = myFinalData.randomSplit([0.7 , 0.3])

fitModel = pipeline.fit(train_data)

In [28]:
## Now lets transform our test_data.

results = fitModel.transform(test_data)

In [32]:
results.select("Survived" , "prediction").show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
+--------+----------+
only showing top 20 rows



In [31]:
## Lets evaluate our model

from pyspark.ml.evaluation import BinaryClassificationEvaluator

myEval = BinaryClassificationEvaluator(rawPredictionCol = "prediction" , 
                                      labelCol = "Survived")

In [34]:
AUC = myEval.evaluate(results)
AUC

0.7614044540229885