In [49]:
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Load the data from CSV

In [50]:
from pyspark.sql import SparkSession

In [51]:
# Start a Spark session named 'titanic_lor', or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('titanic_lor')
    .getOrCreate()
)

In [52]:
# Read the Titanic CSV; the header row supplies column names and Spark infers the column types.
data = spark.read.csv(
    "titanic.csv",
    header=True,
    inferSchema=True,
)

In [53]:
# Preview the loaded rows to sanity-check that the CSV parsed as expected.
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [54]:
# Print the column names and the types that were inferred when the CSV was read.
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [55]:
# List every column name so the feature columns can be picked from it.
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

# Selecting the Columns to Use and Dropping Rows That Have Null Values



In [56]:
# Keep the label ('Survived') plus the columns that will be used as model inputs.
my_cols = data.select([
    'Survived',
    'Pclass',
    'Sex',
    'Age',
    'SibSp',
    'Parch',
    'Fare',
    'Embarked',
])

In [57]:
# Discard rows that have missing values in the selected columns.
my_final_data = (
    my_cols
    .na
    .drop()
)

# Adding the StringIndexer, OneHotEncoder, and VectorAssembler Stages That Turn the Raw Columns into a Feature Vector PySpark ML Can Consume

In [58]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

In [59]:
# Encode the string column 'Sex' as numeric category indices in a new 'SexIndex' column.
gender_indexer = StringIndexer(inputCol='Sex',outputCol='SexIndex')
# Illustration of the two steps with three categories A, B, C:
#   StringIndexer     -> 0, 1, 2
#   one-hot (concept) -> [1,0,0] [0,1,0] [0,0,1]
# NOTE: Spark's OneHotEncoder drops the last category by default (dropLast=True per the
# Spark docs), so the stored vector has one slot fewer than the number of categories.
# This matches the pipeline output, where SexVec takes a single slot in 'features'.
# NOTE(review): `gneder_encoder` is a misspelling of `gender_encoder`; the Pipeline stages
# reference it under this name, so any rename must change both places together.
gneder_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')

In [60]:
# Same two-step encoding for 'Embarked': string -> numeric index -> one-hot vector.
embarked_indexer = StringIndexer(
    inputCol='Embarked',
    outputCol='EmbarkedIndex',
)
embarked_encoder = OneHotEncoder(
    inputCol='EmbarkedIndex',
    outputCol='EmbarkedVec',
)

In [61]:
# Combine the numeric columns and the encoded vectors into a single 'features' column.
# The order of inputCols fixes the layout of the assembled vector.
assembler = VectorAssembler(
    inputCols=[
        'Pclass',
        'SexVec',
        'Age',
        'SibSp',
        'Parch',
        'Fare',
        'EmbarkedVec',
    ],
    outputCol='features',
)

# Creating the ML Model

In [62]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [63]:
# Logistic regression that learns 'Survived' from the assembled 'features' vector.
log_reg_titanic = LogisticRegression(
    labelCol='Survived',
    featuresCol='features',
)

# Adding All the Steps (StringIndexer, OneHotEncoder, VectorAssembler, and the ML Model) to a Pipeline to Automate the Process

A simple pipeline, which acts as an estimator. A Pipeline consists
of a sequence of stages, each of which is either an
Estimator or a Transformer. When
Pipeline.fit is called, the stages are executed in
order. If a stage is an Estimator, its
Estimator.fit method will be called on the input
dataset to fit a model. Then the model, which is a transformer,
will be used to transform the dataset as the input to the next
stage. If a stage is a Transformer, its
Transformer.transform method will be called to produce
the dataset for the next stage. The fitted model from a
Pipeline is a PipelineModel, which
consists of fitted models and transformers, corresponding to the
pipeline stages. If stages is an empty list, the pipeline acts as an
identity transformer.

In [64]:
# Chain every step into one Pipeline. Stages run in list order, so each indexer
# must come before the encoder that reads its output, and the assembler before the model.
pipeline = Pipeline(
    stages=[
        gender_indexer,
        embarked_indexer,
        gneder_encoder,
        embarked_encoder,
        assembler,
        log_reg_titanic,
    ]
)

# Splitting the Data into Train and Test Sets

In [65]:
# Hold out roughly 30% of the cleaned rows for testing and train on the rest.
# A fixed seed keeps the split repeatable; without it every run draws a different
# split, so the fitted model and the reported metric change from run to run.
train_data,test_data = my_final_data.randomSplit([0.7,0.3], seed=42)

# Starting the Pipeline by Calling pipeline.fit on train_data
(This runs every stage in the pipeline, from the indexers through to the ML model, on train_data.)

In [66]:
# Fit every pipeline stage on the training rows only; the result is the fitted pipeline
# that is reused below to transform the held-out data.
fitted_model = pipeline.fit(train_data)

# Evaluating on the Test DataFrame

In [67]:
# Run the held-out rows through the fitted pipeline; the output keeps the input columns
# and adds the intermediate and prediction columns.
results = fitted_model.transform(test_data)

In [68]:
# Inspect which columns the fitted pipeline appended to the test rows.
results.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- SexIndex: double (nullable = false)
 |-- EmbarkedIndex: double (nullable = false)
 |-- SexVec: vector (nullable = true)
 |-- EmbarkedVec: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [69]:
# Show the true label next to the model's prediction for a quick visual comparison.
results.select(['features','Survived','prediction']).show()

+--------------------+--------+----------+
|            features|Survived|prediction|
+--------------------+--------+----------+
|[1.0,1.0,18.0,1.0...|       0|       1.0|
|[1.0,1.0,19.0,1.0...|       0|       1.0|
|[1.0,1.0,21.0,0.0...|       0|       1.0|
|[1.0,1.0,24.0,0.0...|       0|       1.0|
|[1.0,1.0,36.0,0.0...|       0|       1.0|
|[1.0,1.0,37.0,1.0...|       0|       0.0|
|[1.0,1.0,40.0,0.0...|       0|       1.0|
|[1.0,1.0,45.0,0.0...|       0|       0.0|
|[1.0,1.0,45.0,0.0...|       0|       0.0|
|[1.0,1.0,46.0,0.0...|       0|       1.0|
|[1.0,1.0,47.0,0.0...|       0|       0.0|
|[1.0,1.0,47.0,0.0...|       0|       0.0|
|[1.0,1.0,54.0,0.0...|       0|       0.0|
|[1.0,1.0,56.0,0.0...|       0|       0.0|
|[1.0,1.0,56.0,0.0...|       0|       0.0|
|[1.0,1.0,58.0,0.0...|       0|       0.0|
|[1.0,1.0,61.0,0.0...|       0|       0.0|
|[1.0,1.0,61.0,0.0...|       0|       0.0|
|[1.0,1.0,64.0,1.0...|       0|       0.0|
|[1.0,1.0,65.0,0.0...|       0|       0.0|
+----------

In [70]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [71]:
# Evaluator that scores the 'rawPrediction' column against the 'Survived' label.
# No metricName is passed, so the evaluator's default metric applies.
my_eval = BinaryClassificationEvaluator(
    labelCol='Survived',
    rawPredictionCol='rawPrediction',
)

In [72]:
# Score the test-set predictions with the evaluator. The result is a single number
# (presumably the area under the ROC curve, the evaluator's documented default — confirm),
# not a curve, despite the variable name.
ROC = my_eval.evaluate(results)

In [73]:
# Display the metric value computed on the test set.
ROC

0.8689589389534889

In [74]:
# List the test-set columns so the label column can be left out of the next selection.
test_data.columns

['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']

# Deploying the Model to Predict Unlabeled Data

In [75]:
# Simulate unlabeled input: the test rows with every column except the 'Survived' label.
unlabeled_data = test_data.select([
    'Pclass',
    'Sex',
    'Age',
    'SibSp',
    'Parch',
    'Fare',
    'Embarked',
])

In [76]:
# Apply the already-fitted pipeline to rows that carry no label column.
deployed_prediction = fitted_model.transform(unlabeled_data)

In [77]:
# Show the assembled feature vector alongside the predicted class for each row.
deployed_prediction.select(['features','prediction']).show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[1.0,1.0,18.0,1.0...|       1.0|
|[1.0,1.0,19.0,1.0...|       1.0|
|[1.0,1.0,21.0,0.0...|       1.0|
|[1.0,1.0,24.0,0.0...|       1.0|
|[1.0,1.0,36.0,0.0...|       1.0|
|[1.0,1.0,37.0,1.0...|       0.0|
|[1.0,1.0,40.0,0.0...|       1.0|
|[1.0,1.0,45.0,0.0...|       0.0|
|[1.0,1.0,45.0,0.0...|       0.0|
|[1.0,1.0,46.0,0.0...|       1.0|
|[1.0,1.0,47.0,0.0...|       0.0|
|[1.0,1.0,47.0,0.0...|       0.0|
|[1.0,1.0,54.0,0.0...|       0.0|
|[1.0,1.0,56.0,0.0...|       0.0|
|[1.0,1.0,56.0,0.0...|       0.0|
|[1.0,1.0,58.0,0.0...|       0.0|
|[1.0,1.0,61.0,0.0...|       0.0|
|[1.0,1.0,61.0,0.0...|       0.0|
|[1.0,1.0,64.0,1.0...|       0.0|
|[1.0,1.0,65.0,0.0...|       0.0|
+--------------------+----------+
only showing top 20 rows

