## Importing Libraries

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
#from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Start (or reuse, if one already exists in this kernel) a local Spark session
# that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

23/09/21 14:14:24 WARN Utils: Your hostname, MacBook-Air-de-Mauricio-5.local resolves to a loopback address: 127.0.0.1; using 192.168.100.74 instead (on interface en0)
23/09/21 14:14:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/21 14:14:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import numpy as np
import pandas as pd

## Loading data

In [3]:
# The first CSV row holds the column names; inferSchema makes Spark scan the
# values to pick numeric column types instead of reading everything as strings.
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("train.csv")
)
data.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

## Preparing the data

> ### Splitting the data

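We randomly assign roughly 80% of the rows to the training set and the remaining ~20% to the test set (`randomSplit` weights are approximate, so the exact counts differ slightly from 80/20).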

In [4]:
# Fixed seed so the ~80/20 split -- and every metric computed downstream -- is
# identical on each Restart & Run All. Without a seed Spark draws a fresh random
# one per session. The weights are approximate, so row counts are not exactly
# 80/20.
SPLIT_SEED = 42
splits = data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
train, test = splits
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

23/09/21 14:14:37 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Training Rows: 720  Testing Rows: 171


> ### Defining our pipeline

In [18]:
# Feature pipeline feeding a logistic-regression classifier for "Survived".
# Only Pclass, Sex, SibSp, Parch and Fare are used; Age, Embarked, Cabin,
# Ticket and Name are not part of the feature vector.

# StringIndexer turns the Sex strings into numeric indices (most frequent value -> 0.0).
# handleInvalid="keep" gives an unseen or null value its own index instead of
# failing when the fitted model transforms the test split.
strIdx = StringIndexer(inputCol="Sex", outputCol="SexIdx", handleInvalid="keep")

# Group the small-cardinality columns into one vector so they can be indexed together.
catVect = VectorAssembler(inputCols=["Pclass", "SexIdx", "SibSp", "Parch"], outputCol="catFeatures")

# VectorIndexer remaps every vector slot with <= maxCategories (default 20) distinct
# values to 0..k-1 indices (e.g. Pclass 1/2/3 -> 0/1/2).
# handleInvalid="keep": a value never seen while fitting on `train` (e.g. a rare
# SibSp/Parch count that only lands in the random test split) gets an extra index.
# The default, "error", would make transform(test) raise.
# NOTE(review): LogisticRegression ignores the categorical metadata VectorIndexer
# attaches, so these indices reach the model as plain numbers; OneHotEncoder may
# be the better fit for a linear model.
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="idxCatFeatures", handleInvalid="keep")

# Fare is rescaled to [0, 1] using the min/max observed in the training data.
numVect = VectorAssembler(inputCols=["Fare"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol="normFeatures")

# Final feature vector: indexed categorical slots followed by the scaled Fare.
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")

# regParam=0.3 adds regularisation (L2 with the default elasticNetParam=0).
lr = LogisticRegression(labelCol="Survived", featuresCol="features", maxIter=10, regParam=0.3)

pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

### Training the model

In [19]:
# Fit every stage (indexers, assemblers, scaler, classifier) on the training split
# only, so none of the fitted statistics come from the test rows.
piplineModel = pipeline.fit(dataset=train)

23/09/21 14:31:32 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/09/21 14:31:32 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


### Generating predictions on the test set

In [23]:
# Run the fitted pipeline on the held-out rows. This appends the intermediate
# feature columns plus rawPrediction, probability and prediction.
prediction = piplineModel.transform(test)
print(prediction.columns)

# Show each row's final feature vector next to the predicted and true label.
comparison_cols = ["features", "prediction", "Survived"]
predicted = prediction.select(*comparison_cols)
predicted.show(n=100, truncate=False)

['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked', 'SexIdx', 'catFeatures', 'idxCatFeatures', 'numFeatures', 'normFeatures', 'features', 'rawPrediction', 'probability', 'prediction']
+--------------------------------------+----------+--------+
|features                              |prediction|Survived|
+--------------------------------------+----------+--------+
|[2.0,0.0,1.0,0.0,0.014151057562208049]|0.0       |0       |
|[0.0,1.0,1.0,0.0,0.13913573538264068] |1.0       |1       |
|[0.0,1.0,1.0,0.0,0.10364429745562033] |1.0       |1       |
|(5,[0,4],[2.0,0.015712553569072387])  |0.0       |0       |
|[2.0,1.0,1.0,1.0,0.03259622914329302] |0.0       |1       |
|[2.0,1.0,0.0,0.0,0.014102260811993537]|0.0       |1       |
|(5,[0,4],[1.0,0.050748620223090936])  |0.0       |0       |
|[0.0,1.0,1.0,0.0,0.2859895551532101]  |1.0       |1       |
|[2.0,1.0,0.0,0.0,0.01537917417160685] |0.0       |1       |
|(5,[0,4],[2.0,0.01