In [225]:
import pyspark
from pyspark.sql import SparkSession
import numpy as np

In [226]:
# Local Spark session using all available cores ('local[*]');
# sc is the underlying SparkContext for low-level RDD access.
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)
sc = spark.sparkContext

## PySpark Part 2

In [227]:
!wget https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv

--2023-02-16 01:28:02--  https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60302 (59K) [text/plain]
Saving to: ‘titanic.csv.2’


2023-02-16 01:28:03 (616 KB/s) - ‘titanic.csv.2’ saved [60302/60302]



In [228]:
# Load the downloaded CSV: the first row is the header and Spark infers
# the column types from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('./titanic.csv')
)

In [229]:
# Preview the first 10 raw rows.
df.show(n=10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

### Удалим ненужные колонки

In [230]:
# List every column of the raw frame to decide which ones to keep.
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [231]:
# Keep the label (Survived) and the candidate features; PassengerId, Name,
# SibSp, Parch, Ticket and Cabin are dropped.
df_filtered = df.select([
    'Survived',
    'Pclass',
    'Sex',
    'Age',
    'Fare',
    'Embarked',
])

In [232]:
# Preview the reduced frame.
df_filtered.show(n=5)

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|       0|     3|  male|22.0|   7.25|       S|
|       1|     1|female|38.0|71.2833|       C|
|       1|     3|female|26.0|  7.925|       S|
|       1|     1|female|35.0|   53.1|       S|
|       0|     3|  male|35.0|   8.05|       S|
+--------+------+------+----+-------+--------+
only showing top 5 rows



## Колонки Age и Embarked содержат пропуски (714 и 889 непустых значений из 891)

In [233]:
# Passengers per port of embarkation; the null group exposes the missing values.
(
    df_filtered
    .groupby('Embarked')
    .count()
    .show()
)

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [234]:
# Summary statistics per column. A count below 891 reveals the nulls in Age
# and Embarked; the Age mean guides the fill value chosen in the next cell.
df_filtered.describe().show()

+-------+-------------------+------------------+------+------------------+-----------------+--------+
|summary|           Survived|            Pclass|   Sex|               Age|             Fare|Embarked|
+-------+-------------------+------------------+------+------------------+-----------------+--------+
|  count|                891|               891|   891|               714|              891|     889|
|   mean| 0.3838383838383838| 2.308641975308642|  null| 29.69911764705882| 32.2042079685746|    null|
| stddev|0.48659245426485753|0.8360712409770491|  null|14.526497332334035|49.69342859718089|    null|
|    min|                  0|                 1|female|              0.42|              0.0|       C|
|    max|                  1|                 3|  male|              80.0|         512.3292|       S|
+-------+-------------------+------------------+------+------------------+-----------------+--------+



In [235]:
# Impute missing values:
#   Age      -> 29, roughly the mean age (~29.7 in describe() above);
#   Embarked -> 'S', the most frequent port (644 of the 889 known values).
df_filtered = df_filtered.fillna({
    'Age': 29,
    'Embarked': 'S',
})

## Преобразуем колонку Sex в числовой индекс, а колонку Embarked закодируем методом one-hot

In [236]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler

In [237]:
# Encode Sex as a numeric index (SexInd). StringIndexer orders labels by
# frequency by default, which yields labels ['male', 'female'] here,
# i.e. male -> 0.0 and female -> 1.0.
indexer = StringIndexer(outputCol='SexInd', inputCol='Sex')
indexer_train = indexer.fit(dataset=df_filtered)
df_features = indexer_train.transform(dataset=df_filtered)

In [238]:
# Label order learned by the fitted indexer: list position = numeric index.
indexer_train.labels

['male', 'female']

In [239]:
# Check the new SexInd column (20 rows, the show() default).
df_features.show(n=20)

+--------+------+------+----+-------+--------+------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|
+--------+------+------+----+-------+--------+------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|
|       1|     1|female|38.0|71.2833|       C|   1.0|
|       1|     3|female|26.0|  7.925|       S|   1.0|
|       1|     1|female|35.0|   53.1|       S|   1.0|
|       0|     3|  male|35.0|   8.05|       S|   0.0|
|       0|     3|  male|29.0| 8.4583|       Q|   0.0|
|       0|     1|  male|54.0|51.8625|       S|   0.0|
|       0|     3|  male| 2.0| 21.075|       S|   0.0|
|       1|     3|female|27.0|11.1333|       S|   1.0|
|       1|     2|female|14.0|30.0708|       C|   1.0|
|       1|     3|female| 4.0|   16.7|       S|   1.0|
|       1|     1|female|58.0|  26.55|       S|   1.0|
|       0|     3|  male|20.0|   8.05|       S|   0.0|
|       0|     3|  male|39.0| 31.275|       S|   0.0|
|       0|     3|female|14.0| 7.8542|       S|   1.0|
|       1|     2|female|55.0

In [240]:
# Encode the imputed Embarked port as a frequency-ordered numeric index
# (EmbarkedInd). Dedicated names keep `indexer` / `indexer_train` pointing at
# the Sex indexer, so re-running the labels cell still shows the Sex labels.
# Spark refuses to overwrite an existing output column, so EmbarkedInd is
# dropped first (a no-op on the first run); this makes the cell re-runnable.
embarked_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedInd')
df_features = df_features.drop('EmbarkedInd')
embarked_indexer_model = embarked_indexer.fit(df_features)
df_features = embarked_indexer_model.transform(df_features)

In [241]:
# Check the new EmbarkedInd column (20 rows, the show() default).
df_features.show(n=20)

+--------+------+------+----+-------+--------+------+-----------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|
+--------+------+------+----+-------+--------+------+-----------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|
|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|
|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|
|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|
|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|
|       0|     3|  male|29.0| 8.4583|       Q|   0.0|        2.0|
|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|
|       0|     3|  male| 2.0| 21.075|       S|   0.0|        0.0|
|       1|     3|female|27.0|11.1333|       S|   1.0|        0.0|
|       1|     2|female|14.0|30.0708|       C|   1.0|        1.0|
|       1|     3|female| 4.0|   16.7|       S|   1.0|        0.0|
|       1|     1|female|58.0|  26.55|       S|   1.0|        0.0|
|       0|

In [242]:
# One-hot encode the Embarked index. With OneHotEncoder's default
# dropLast=True the last index (2.0 = 'Q') maps to the all-zero vector,
# so the three ports fit into a 2-element vector.
OHE = OneHotEncoder(outputCol='EmbarkedOHE', inputCol='EmbarkedInd')
OHE_train = OHE.fit(dataset=df_features)

In [243]:
# Append the EmbarkedOHE vector column. Spark refuses to overwrite an existing
# output column, so drop it first (a no-op on the first run) to keep this
# self-referential cell re-runnable.
df_features = OHE_train.transform(df_features.drop('EmbarkedOHE'))

In [244]:
# Inspect the one-hot vectors: (size, [indices], [values]) sparse notation.
df_features.show(n=10)

+--------+------+------+----+-------+--------+------+-----------+-------------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOHE|
+--------+------+------+----+-------+--------+------+-----------+-------------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|(2,[0],[1.0])|
|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|(2,[1],[1.0])|
|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|(2,[0],[1.0])|
|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|(2,[0],[1.0])|
|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|(2,[0],[1.0])|
|       0|     3|  male|29.0| 8.4583|       Q|   0.0|        2.0|    (2,[],[])|
|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|(2,[0],[1.0])|
|       0|     3|  male| 2.0| 21.075|       S|   0.0|        0.0|(2,[0],[1.0])|
|       1|     3|female|27.0|11.1333|       S|   1.0|        0.0|(2,[0],[1.0])|
|       1|     2|female|14.0|30.0708|   

In [245]:
# Columns available for the feature vector after encoding.
df_features.columns

['Survived',
 'Pclass',
 'Sex',
 'Age',
 'Fare',
 'Embarked',
 'SexInd',
 'EmbarkedInd',
 'EmbarkedOHE']

In [246]:
# Feature columns combined into a single vector for Spark ML. Survived is the
# label and is deliberately left out; the list is reused by the Pipeline below.
vecAssembler_cols = [
    'Pclass',
    'Age',
    'Fare',
    'SexInd',
    'EmbarkedOHE',
]

vecAssembler = VectorAssembler(inputCols=vecAssembler_cols, outputCol='vecAssembler_col')
# Drop any previous output column (a no-op on the first run) so re-running the
# cell does not fail on an already-existing 'vecAssembler_col'.
df_features = vecAssembler.transform(df_features.drop('vecAssembler_col'))

In [247]:
from pyspark.ml import Pipeline

In [248]:
# The manual preprocessing above bundled as one Pipeline, stage by stage:
#   1. Sex       -> SexInd       (frequency-ordered index)
#   2. Embarked  -> EmbarkedInd  (frequency-ordered index)
#   3. EmbarkedInd -> EmbarkedOHE (one-hot vector)
#   4. vecAssembler_cols -> vecAssembler_col (model feature vector)
pipeline = Pipeline(
    stages=[
        StringIndexer(outputCol='SexInd', inputCol='Sex'),
        StringIndexer(outputCol='EmbarkedInd', inputCol='Embarked'),
        OneHotEncoder(outputCol='EmbarkedOHE', inputCol='EmbarkedInd'),
        VectorAssembler(outputCol='vecAssembler_col', inputCols=vecAssembler_cols),
    ]
)

In [250]:
# Fit every pipeline stage on the imputed data in a single call.
# NOTE(review): this fits on the full dataset (train and test rows together);
# for model evaluation fit the pipeline on the training split only.
pipeline_train = pipeline.fit(df_filtered)

In [251]:
# Apply the fitted pipeline; the columns should match the manually built df_features.
(
    pipeline_train
    .transform(df_filtered)
    .show()
)

+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOHE|    vecAssembler_col|
+--------+------+------+----+-------+--------+------+-----------+-------------+--------------------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,22.0,7.25,0....|
|       1|     1|female|38.0|71.2833|       C|   1.0|        1.0|(2,[1],[1.0])|[1.0,38.0,71.2833...|
|       1|     3|female|26.0|  7.925|       S|   1.0|        0.0|(2,[0],[1.0])|[3.0,26.0,7.925,1...|
|       1|     1|female|35.0|   53.1|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,35.0,53.1,1....|
|       0|     3|  male|35.0|   8.05|       S|   0.0|        0.0|(2,[0],[1.0])|[3.0,35.0,8.05,0....|
|       0|     3|  male|29.0| 8.4583|       Q|   0.0|        2.0|    (2,[],[])|[3.0,29.0,8.4583,...|
|       0|     1|  male|54.0|51.8625|       S|   0.0|        0.0|(2,[0],[1.0])|[1.0,54.0,51

## Train / test split

In [253]:
# Split the imputed data first, then fit the preprocessing pipeline on the
# training rows only, so the indexers/encoder never see test rows, and use the
# Pipeline defined above instead of the manually mutated df_features.
# The fixed seed keeps the split reproducible.
# NOTE(review): the Age/Embarked fill values were picked from full-data stats;
# StringIndexer errors on categories unseen in training (default
# handleInvalid='error'). That is unlikely here (2 Sex values, 3 ports), but
# worth confirming on other data.
train_raw, test_raw = df_filtered.randomSplit([0.8, 0.2], seed=42)
pipeline_model = pipeline.fit(train_raw)
train = pipeline_model.transform(train_raw)
test = pipeline_model.transform(test_raw)

In [254]:
# Preview the transformed training split.
train.show(n=5)

+--------+------+------+----+------+--------+------+-----------+-------------+--------------------+
|Survived|Pclass|   Sex| Age|  Fare|Embarked|SexInd|EmbarkedInd|  EmbarkedOHE|    vecAssembler_col|
+--------+------+------+----+------+--------+------+-----------+-------------+--------------------+
|       0|     1|female| 2.0|151.55|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,2.0,151.55,1...|
|       0|     1|female|25.0|151.55|       S|   1.0|        0.0|(2,[0],[1.0])|[1.0,25.0,151.55,...|
|       0|     1|  male|18.0| 108.9|       C|   0.0|        1.0|(2,[1],[1.0])|[1.0,18.0,108.9,0...|
|       0|     1|  male|19.0|  53.1|       S|   0.0|        0.0|(2,[0],[1.0])|[1.0,19.0,53.1,0....|
|       0|     1|  male|19.0| 263.0|       S|   0.0|        0.0|(2,[0],[1.0])|[1.0,19.0,263.0,0...|
+--------+------+------+----+------+--------+------+-----------+-------------+--------------------+
only showing top 5 rows



## Logistic regression

In [255]:
from pyspark.ml.classification import LogisticRegression

In [260]:
# Logistic regression predicting Survived from the assembled feature vector.
lr = LogisticRegression(
    labelCol='Survived',
    featuresCol='vecAssembler_col',
)

In [261]:
# Train the model on the training split.
# The NativeBLAS warnings printed below only report that no native BLAS
# library could be loaded; presumably Spark falls back to a JVM implementation.
lr_model = lr.fit(train)

23/02/16 01:37:02 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/02/16 01:37:02 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


In [263]:
# Score both splits with the fitted model. This adds the rawPrediction,
# probability and prediction columns; Spark evaluates lazily, on the next action.
test_result = lr_model.transform(test)
train_result = lr_model.transform(train)

In [268]:
# Model outputs on the training split: raw scores, class probabilities
# (index i = class i) and the predicted label.
train_result.select('rawPrediction', 'probability', 'prediction').show()

+--------------------+--------------------+----------+
|       rawPrediction|         probability|prediction|
+--------------------+--------------------+----------+
|[-3.2770503739718...|[0.03636694297313...|       1.0|
|[-2.4634326447022...|[0.07846177856536...|       1.0|
|[-0.8425146749117...|[0.30100542886346...|       1.0|
|[-0.2792317510571...|[0.43064213257868...|       1.0|
|[-0.0805599876783...|[0.47987088824941...|       1.0|
|[-0.6757126909257...|[0.33721885629452...|       1.0|
|[-0.4990610510095...|[0.37776135065493...|       1.0|
|[-0.4270309291146...|[0.39483554069659...|       1.0|
|[0.03346136347293...|[0.50836456042550...|       0.0|
|[-0.5140672038821...|[0.37424056304724...|       1.0|
|[0.02425558020251...|[0.50606359776890...|       0.0|
|[0.04879376727062...|[0.51219602219062...|       0.0|
|[0.04886475527564...|[0.51221375861754...|       0.0|
|[0.04938533397911...|[0.51234382480569...|       0.0|
|[-0.5302298116443...|[0.37046328971025...|       1.0|
|[0.052650

## Оценка качества (ROC AUC)

In [269]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [270]:
# Area under the ROC curve, computed from rawPrediction against the Survived
# label. rawPredictionCol and metricName are spelled out but are the defaults.
ev = BinaryClassificationEvaluator(
    labelCol='Survived',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)

In [271]:
# (train AUC, test AUC): close values indicate little overfitting.
tuple(map(ev.evaluate, (train_result, test_result)))

(0.8499827803926061, 0.8613387978142075)