In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('logregression').getOrCreate()

### Titanic DataSet
We will use it to attempt to predict what passengers survived the titanic crash based solely on passenger's features (age, cabin, children, etc)
<br>We will deal with missing data

In [3]:
data = spark.read.csv('titanic.csv', inferSchema=True, header=True)

In [4]:
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [6]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [15]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [14]:
data.head(1)[0]

Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S')

In [16]:
my_cols = data.select(['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'])

### Deal with missing Data
To keep simply we will drop the missing data

In [17]:
my_final_data = my_cols.na.drop()

In [21]:
my_final_data.show()

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|       S|
|       0|     3|female|14.0|    0|    0| 7.8542|       

### Work with categorial columns

In [18]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer

In [19]:
# StringIndexer allows us to convert every string into a number
gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')

# Now we will one hot encode them = This means it will transform the actual numbers for the categories into
# a one hot encoding where you have an array indicating kind of zeros and ones of the actual category it was

# A B C
# 0 1 2   indexer
# ONE HOT ENCODE
# KEY A B C
# Example A
# [1, 0, 0]

# What this does is indicates though a vector format of only ones and zeros what the category is.

gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')

In [20]:
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')

In [22]:
# Then we need to create an assembler
assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'EmbarkVec', 'Age', 'SibSp', 'Parch', 'Fare'],
                            outputCol='features')

In [23]:
# Use classification model
# Use pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

### Pipelines
What a pipeline does is it sets stages for different steps.
<br> If we have a very complex machine learning task you will often have to set up a bunch of stages and in our case this was a little more complex than what we've dealt with before because we are dealing with indexing and encoding, so we want to set the stages of how to do this

First we need to do the indexer and then the encoder before doing the assembling.

In [24]:
log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived')

In [25]:
# Create a pipeline
# it has a stages parameter, so it tells the order of what we want to do.

pipeline = Pipeline(stages = [gender_indexer, embark_indexer, gender_encoder, embark_encoder, assembler, log_reg_titanic])

In [26]:
# We can treat a pipeline the same way we treat a normal model

In [27]:
# Get train and test data
train_data, test_data = my_final_data.randomSplit([0.7, 0.3])

In [28]:
fit_model = pipeline.fit(train_data)

In [29]:
# Transform our test data
results = fit_model.transform(test_data)

In [30]:
# Evaluate
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [31]:
# When you tranform the test_data, then you need to send the prediction string to the BinaryClassification method.
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [32]:
results.show()

+--------+------+------+----+-----+-----+--------+--------+--------+-----------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|    Fare|Embarked|SexIndex|EmbarkIndex|       SexVec|    EmbarkVec|            features|       rawPrediction|         probability|prediction|
+--------+------+------+----+-----+-----+--------+--------+--------+-----------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|       0|     1|female| 2.0|    1|    2|  151.55|       S|     1.0|        0.0|    (1,[],[])|(2,[0],[1.0])|[1.0,0.0,1.0,0.0,...|[-3.7620323485052...|[0.02270879555774...|       1.0|
|       0|     1|  male|19.0|    3|    2|   263.0|       S|     0.0|        0.0|(1,[0],[1.0])|(2,[0],[1.0])|[1.0,1.0,1.0,0.0,...|[-0.4273108346641...|[0.39476866191152...|       1.0|
|       0|     1|  male|21.0|    0|    1| 77.2875|       S|     0.0|        0.0|(1,[0

In [34]:
results.select('Survived','prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [35]:
AUC = my_eval.evaluate(results)

In [36]:
AUC

0.7948379351740698