### Logistic Regression using Python and Spark

This notebook gives a quick walkthrough of implementing logistic regression with PySpark (Spark's Python API).


A general way to use PySpark in a Jupyter notebook is the findspark package. Its init method adds the pyspark module to sys.path at run time, so that it can be imported like any other library.

In [1]:
import findspark

In [2]:
findspark.init('/home/jinudaniel74/spark-2.1.1-bin-hadoop2.7')
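To make the run-time path change concrete, here is a minimal sketch of roughly what happens when `init` is called. The helper name `add_spark_to_path` is hypothetical and this is not findspark's actual source; it assumes the usual Spark layout, with the Python sources under `python/` and a py4j zip under `python/lib/`.

```python
import glob
import os
import sys

def add_spark_to_path(spark_home):
    """Hypothetical helper: approximates the effect of findspark.init."""
    # Record the Spark installation directory for child processes.
    os.environ["SPARK_HOME"] = spark_home
    spark_python = os.path.join(spark_home, "python")
    # Spark ships py4j as a zip; the exact file name varies by version.
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    added = [spark_python] + py4j_zips
    for path in added:
        if path not in sys.path:
            sys.path.insert(0, path)
    return added
```

After a call like this, `import pyspark` resolves against the given Spark directory without a separate pip installation.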

Let's create a Spark session.

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('classification').getOrCreate()

#### Loading the Data Set

The data set we will use is the classic Titanic data set. We will use it to predict whether a given passenger survived the Titanic, given a set of features about that passenger.

In [5]:
data = spark.read.csv('titanic.csv', inferSchema=True, header=True)

In [6]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [8]:
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

#### Feature Engineering

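Let's select the columns that we think played a role in determining whether a passenger survived, and drop every row that has a missing value in any of those columns. Note that Cabin is selected here, so rows with a null Cabin are dropped too, even though Cabin is not used as a feature later.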

In [9]:
columns = ['Survived','Pclass','Sex','Age','SibSp','Parch',
 'Fare','Cabin','Embarked']

final_data = data.select(columns).na.drop()
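The effect of `select(...).na.drop()` can be sketched in plain Python on the first five rows of the `data.show()` output. The function `drop_incomplete` is a hypothetical stand-in, not a Spark API, and `None` plays the role of Spark's null.

```python
# First five rows of the data.show() output, restricted to the selected columns.
rows = [
    {"Survived": 0, "Pclass": 3, "Sex": "male", "Age": 22.0, "SibSp": 1,
     "Parch": 0, "Fare": 7.25, "Cabin": None, "Embarked": "S"},
    {"Survived": 1, "Pclass": 1, "Sex": "female", "Age": 38.0, "SibSp": 1,
     "Parch": 0, "Fare": 71.2833, "Cabin": "C85", "Embarked": "C"},
    {"Survived": 1, "Pclass": 3, "Sex": "female", "Age": 26.0, "SibSp": 0,
     "Parch": 0, "Fare": 7.925, "Cabin": None, "Embarked": "S"},
    {"Survived": 1, "Pclass": 1, "Sex": "female", "Age": 35.0, "SibSp": 1,
     "Parch": 0, "Fare": 53.1, "Cabin": "C123", "Embarked": "S"},
    {"Survived": 0, "Pclass": 3, "Sex": "male", "Age": 35.0, "SibSp": 0,
     "Parch": 0, "Fare": 8.05, "Cabin": None, "Embarked": "S"},
]

def drop_incomplete(rows, columns):
    """Keep only the given columns, then drop rows with a null in any of them."""
    selected = [{c: r[c] for c in columns} for r in rows]
    return [r for r in selected if all(v is not None for v in r.values())]

all_columns = ["Survived", "Pclass", "Sex", "Age", "SibSp", "Parch",
               "Fare", "Cabin", "Embarked"]
no_cabin = [c for c in all_columns if c != "Cabin"]

with_cabin = drop_incomplete(rows, all_columns)
without_cabin = drop_incomplete(rows, no_cabin)
print(len(with_cabin), len(without_cabin))  # 2 5
```

On this small sample, keeping Cabin in the selection leaves two of the five rows, while leaving it out keeps all five.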

Convert the string columns to one-hot encoded vectors: StringIndexer first maps each category to a numeric index, and OneHotEncoder then turns that index into a binary vector.

In [10]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder

In [11]:
gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

In [12]:
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedVec')
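The two-step encoding can be illustrated without Spark. The sketch below assumes the default behaviour of both stages: StringIndexer assigns index 0 to the most frequent label, and OneHotEncoder drops the last category, so three categories become a vector of length two. Plain lists stand in for Spark's sparse vectors, and the sample values are made up.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first (index 0)."""
    counts = Counter(values)
    # Sort by descending count, then alphabetically so ties are deterministic.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; the last category maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

embarked = ["S", "C", "S", "S", "Q", "C"]  # made-up sample values
mapping = fit_string_index(embarked)
print(mapping)  # {'S': 0, 'C': 1, 'Q': 2}
print([one_hot(mapping[v], len(mapping)) for v in ["S", "C", "Q"]])
# [[1.0, 0.0], [0.0, 1.0], [0.0, 0.0]]
```

Dropping the last category avoids a redundant column, since the all-zeros vector already identifies it.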

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models.

In [13]:
assembler = VectorAssembler(inputCols=['Pclass','SexVec','Age','SibSp','Parch','Fare', 'EmbarkedVec'], outputCol='features')
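Conceptually, the assembler flattens scalar and vector columns into one list per row, in the order given by `inputCols`. The function `assemble` below is a hypothetical plain-Python sketch of that idea. The encoded values for `SexVec` and `EmbarkedVec` are illustrative only, since the real ones depend on the indices learned from the data.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column
        else:
            features.append(float(value))  # scalar column
    return features

input_cols = ["Pclass", "SexVec", "Age", "SibSp", "Parch", "Fare", "EmbarkedVec"]
row = {"Pclass": 1, "SexVec": [0.0], "Age": 38.0, "SibSp": 1, "Parch": 0,
       "Fare": 71.2833, "EmbarkedVec": [0.0, 1.0]}  # illustrative encodings
print(assemble(row, input_cols))
# [1.0, 0.0, 38.0, 1.0, 0.0, 71.2833, 0.0, 1.0]
```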

#### Training the Model

Next we initialize an estimator, the logistic regression classifier, and chain it with the indexers, encoders, and assembler in a machine learning pipeline. When the pipeline is fit to the training data, its stages are applied to the DataFrame in the order defined in the pipeline.

In [14]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [15]:
log_model = LogisticRegression(labelCol='Survived', featuresCol='features')
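For intuition about what the fitted classifier computes, here is a minimal sketch of binary logistic regression prediction: a weighted sum of the features passed through the sigmoid function, then compared with a threshold of 0.5. The weights are made up for illustration and are not coefficients learned from the Titanic data.

```python
import math

def predict_proba(features, weights, intercept):
    """Probability of the positive class: sigmoid(intercept + w . x)."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))

def predict(features, weights, intercept, threshold=0.5):
    """Predict 1 when the probability exceeds the threshold, else 0."""
    return 1 if predict_proba(features, weights, intercept) > threshold else 0

weights = [-1.0, 2.0]  # made-up coefficients
p = predict_proba([1.0, 1.0], weights, intercept=0.0)
print(round(p, 3))  # 0.731
print(predict([1.0, 1.0], weights, intercept=0.0))  # 1
```

Training amounts to choosing the weights and intercept that make these probabilities match the observed labels as closely as possible.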

In [16]:
pipeline = Pipeline(stages=[gender_indexer, gender_encoder, embark_indexer, embark_encoder, assembler, log_model])
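The fit-then-transform flow of a pipeline can be sketched with toy classes. Everything here (`ToyPipeline`, `ToyPipelineModel`, `Scale`, `ScaleModel`) is hypothetical and far simpler than Spark's implementation. The point is only that each estimator is fit on the output of the previous stage, and that the fitted models are later reused unchanged on new data.

```python
class Scale:
    """Toy estimator: learns the maximum of a column from the data it is fit on."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        return ScaleModel(self.col, max(r[self.col] for r in rows))

class ScaleModel:
    """Toy fitted model: divides the column by the maximum learned during fit."""
    def __init__(self, col, max_value):
        self.col = col
        self.max_value = max_value

    def transform(self, rows):
        return [{**r, self.col + "_scaled": r[self.col] / self.max_value}
                for r in rows]

class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)  # estimator becomes a fitted model
            fitted.append(stage)
            rows = stage.transform(rows)  # output feeds the next stage
        return ToyPipelineModel(fitted)

class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

train = [{"Fare": 10.0}, {"Fare": 40.0}]
test = [{"Fare": 20.0}]
model = ToyPipeline([Scale("Fare")]).fit(train)
print(model.transform(test))  # [{'Fare': 20.0, 'Fare_scaled': 0.5}]
```

The test row is scaled by the maximum learned from the training rows (40.0), not by its own value, which is the behaviour wanted when a fitted pipeline is applied to held-out data.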

Split the data into training and test sets in a 70:30 ratio, then fit the pipeline on the training set.

In [17]:
train_data, test_data = final_data.randomSplit([0.7, 0.3])
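A random split assigns each row to a subset independently, so the resulting sizes are only approximately 70% and 30%, and they change from run to run unless a seed is fixed. The function `random_split` below is a hypothetical plain-Python sketch of that idea, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split at random, in proportion to the weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds of each split on the interval [0, 1).
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(100)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # sizes sum to 100 and are roughly 70 and 30
```

In PySpark, `randomSplit` accepts an optional `seed` argument, for example `final_data.randomSplit([0.7, 0.3], seed=42)` with an arbitrary seed value. Without one, the accuracy figures reported later will vary between runs.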

In [18]:
train_fit = pipeline.fit(train_data)

#### Validating the Model

How do we evaluate the model we just trained, and how do we know whether it is a good one?
Metrics such as accuracy, precision, and recall help here. Let's compute the accuracy of the model on the test set.

In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [20]:
predictions = train_fit.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol='Survived')
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

In [21]:
print(accuracy)

0.6716417910447762
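For reference, the metrics mentioned above can be computed directly from labels and predictions. This is a plain-Python sketch with made-up values, not output from the model. Precision and recall are computed here for the positive class (Survived = 1) only, which may differ from the weighted variants that a multiclass evaluator reports.

```python
def confusion_counts(labels, predictions):
    """Return (tp, fp, fn, tn), treating 1 as the positive class."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    return tp, fp, fn, tn

def accuracy(labels, predictions):
    tp, fp, fn, tn = confusion_counts(labels, predictions)
    return (tp + tn) / (tp + fp + fn + tn)

def precision(labels, predictions):
    tp, fp, _, _ = confusion_counts(labels, predictions)
    return tp / (tp + fp) if (tp + fp) else 0.0

def recall(labels, predictions):
    tp, _, fn, _ = confusion_counts(labels, predictions)
    return tp / (tp + fn) if (tp + fn) else 0.0

labels = [1, 0, 1, 1, 0, 0, 1, 0]       # made-up ground truth
predictions = [1, 0, 0, 1, 0, 1, 1, 0]  # made-up model output
print(accuracy(labels, predictions),
      precision(labels, predictions),
      recall(labels, predictions))  # 0.75 0.75 0.75
```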


Now let's try a different classifier, DecisionTreeClassifier, with the same feature stages and the same train/test split.

In [23]:
from pyspark.ml.classification import DecisionTreeClassifier

In [24]:
dt_model = DecisionTreeClassifier(labelCol='Survived', featuresCol='features')

In [25]:
pipeline = Pipeline(stages=[gender_indexer, gender_encoder, embark_indexer, embark_encoder, assembler, dt_model])

In [26]:
dt_fit = pipeline.fit(train_data)

In [27]:
predictions = dt_fit.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol='Survived')
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

In [28]:
print(accuracy)

0.7164179104477612
