<a href="https://colab.research.google.com/github/vivekvision/PySparkMLRepo/blob/main/PySparkLogisticRegTitanic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 44.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=efc71916dba6aee6e2d1f54328070e2ea080eda54ee462951b8af4bed4120435
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [31]:
from pyspark.sql import functions as sf


In [41]:
from pyspark.ml.classification import LogisticRegression

df = spark.read.csv("titanic.csv", header=True)

In [42]:
df.limit(10).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S
5,6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
6,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
7,8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.075,,S
8,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S
9,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C


In [43]:
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [53]:
my_cols = df.select('Survived','Pclass','Sex','Age','SibSp','Parch','Fare','Embarked')

# Drop the missing data 
my_final_data = my_cols.na.drop()

# convert to double
my_final_data = my_final_data.withColumn('Survived', sf.col('Survived').cast('double')) 
my_final_data = my_final_data.withColumn('Pclass', sf.col('Pclass').cast('double'))
my_final_data = my_final_data.withColumn('Age', sf.col('Age').cast('double'))
my_final_data = my_final_data.withColumn('SibSp', sf.col('SibSp').cast('double'))
my_final_data = my_final_data.withColumn('Parch', sf.col('Parch').cast('double'))
my_final_data = my_final_data.withColumn('Fare', sf.col('Fare').cast('double'))

In [54]:
from pyspark.ml.feature import(VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)

In [55]:
# string indexer convert string into number 

gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')

# one hot encode
# Key A B C
# A -> [1,0,0]

gender_encoder = OneHotEncoder(inputCol = 'SexIndex', outputCol='SexVec')

In [56]:
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex', outputCol='EmbarkVec')

In [57]:
assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'EmbarkVec', 'Age', 'SibSp', 'Parch', 'Fare'], outputCol='features')

In [58]:
from pyspark.ml.classification import LogisticRegression

# Pipeline stages for different steps, for complex machine learning task, often have to set up few stages 
# Stages like indexing encoding etc. 
from pyspark.ml import Pipeline

In [59]:
log_reg_titanic = LogisticRegression(featuresCol='features', labelCol='Survived')

In [60]:
pipeline = Pipeline(stages=[gender_indexer, embark_indexer, gender_encoder, embark_encoder, assembler, log_reg_titanic])

# treat pipeline object like a model object 

train_data, test_data = my_final_data.randomSplit([0.7, 0.3])

In [63]:
# call a pipeline object just like a machine learning model
fit_model = pipeline.fit(train_data)

In [64]:
results = fit_model.transform(test_data)

In [65]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [69]:
# by default predicted column is always 'prediction' when transform is called on model
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')



In [70]:
# display results on test set
results.select('Survived', 'prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|     0.0|       1.0|
|     0.0|       1.0|
|     0.0|       1.0|
|     0.0|       1.0|
|     0.0|       1.0|
|     0.0|       1.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
|     0.0|       0.0|
+--------+----------+
only showing top 20 rows



In [72]:
# BinaryClassificationEvaluator returns area under the curve on evaluate function call

AUC = my_eval.evaluate(results)

In [73]:
AUC

0.7711400205409106