# Titanic with Spark MLlib

In [1]:
# Standard-library imports used across the notebook.
import warnings
from itertools import groupby

# Silence all Python warnings so the cell outputs stay readable.
warnings.filterwarnings('ignore')

# Notebook-wide settings consumed when the Spark session is built.
app_name = "Otus"
spark_ui_port = 4040

In [2]:
import pyspark

# Key/value settings applied to the session builder one by one.
session_settings = {
    "spark.executor.memory": "1g",
    "spark.driver.memory": "1g",
    "spark.ui.port": spark_ui_port,
}

# Local master limited to 4 cores.
session_builder = pyspark.sql.SparkSession.builder.appName(app_name).master("local[4]")
for setting_name, setting_value in session_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

# Eager evaluation lets Jupyter pretty-print a pyspark.DataFrame.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

# Optional: lower the Spark log verbosity.
# sc = spark.sparkContext
# sc.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/08 21:07:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read the training CSV: the first row is the header, column types are inferred.
train_csv_path = "./titanic/train.csv"
df = spark.read.csv(train_csv_path, header=True, inferSchema=True)
df.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [4]:
# Display the Python type of the object returned by the CSV reader.
type(df)

pyspark.sql.dataframe.DataFrame

In [5]:
# dataset shape as (number of columns, number of rows)
n_columns = len(df.columns)
n_rows = df.count()
(n_columns, n_rows)

(12, 891)

In [6]:
# Print the inferred schema: column name, type and nullability.
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



Так смотреть неудобно — отформатируем вывод

In [7]:
# List every column name (right-aligned to 25 characters) next to its type.
dt = df.dtypes
for column_name, column_type in dt:
    print(f"{column_name:>25}\t{column_type}")

              PassengerId	int
                 Survived	int
                   Pclass	int
                     Name	string
                      Sex	string
                      Age	double
                    SibSp	int
                    Parch	int
                   Ticket	string
                     Fare	double
                    Cabin	string
                 Embarked	string


Давайте отсортируем по типам

In [8]:
# Same listing, ordered by type name; sorted() is stable, so columns of one
# type keep their original relative order.
dtypes_sorted = sorted(df.dtypes, key=lambda pair: pair[1])
for column_name, column_type in dtypes_sorted:
    print(f"{column_name:>25}\t{column_type}")

                      Age	double
                     Fare	double
              PassengerId	int
                 Survived	int
                   Pclass	int
                    SibSp	int
                    Parch	int
                     Name	string
                      Sex	string
                   Ticket	string
                    Cabin	string
                 Embarked	string


Сгруппируем по типам и посчитаем количество столбцов каждого типа

In [9]:
# Count how many columns there are of each type.
# groupby() only merges adjacent items, so the (name, type) pairs must be
# ordered by type first. A sorted copy is built instead of calling dt.sort(),
# so the `dt` list created and displayed by the earlier cell is not reordered
# in place.
dtypes_by_type = sorted(df.dtypes, key=lambda pair: pair[1])

print('Data types:')
for type_name, columns_of_type in groupby(dtypes_by_type, key=lambda pair: pair[1]):
    print(f'{type_name:<6} - {len(list(columns_of_type))}')

Data types:
double - 2
int    - 5
string - 5


## Кодирование категориальных признаков

In [10]:
# Show the first rows of the raw frame again before encoding the categorical columns.
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [11]:
from pyspark.ml.feature import StringIndexer

### Sex

<div class="alert alert-warning">

<b>Warning! Частая ошибка</b>


    
StringIndexer может работать с пропущенными значениями только в формате <b>NaN</b>, но не <b>NULL</b>!

<b>Необходимо проверить на пропуски!</b>
</div>

In [12]:
# Number of rows where Sex is NULL.
sex_is_missing = df['Sex'].isNull()
df.filter(sex_is_missing).count()

0

In [13]:
# Index the string column Sex into numeric labels, configured through setters.
sex_indexer = StringIndexer()
sex_indexer.setInputCol('Sex')
sex_indexer.setOutputCol('SexIndexed')

df_prep = sex_indexer.fit(df).transform(df)
df_prep.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndexed|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|       0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|       1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|       1.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|       1.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|    

In [14]:
# The same indexer configured through constructor keywords; the fitted model
# is kept in its own variable so it can be applied again later.
sex_indexer = StringIndexer(
    inputCol="Sex",
    outputCol="SexIndex",
)
sex_indexer_model = sex_indexer.fit(df)
df_prep = sex_indexer_model.transform(df)
df_prep.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|     1.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|     1.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|     0.0|
+-------

### Pclass

In [15]:
# Number of rows where Pclass is NULL.
pclass_is_missing = df_prep['Pclass'].isNull()
df_prep.filter(pclass_is_missing).count()

0

In [16]:
from pyspark.ml.feature import OneHotEncoder

# Map each Pclass value to a numeric index; the index is one-hot encoded next.
pclass_indexer = StringIndexer(inputCol='Pclass', outputCol='PclassIndex')

pclass_indexer_model = pclass_indexer.fit(df_prep)
df_prep = pclass_indexer_model.transform(df_prep)
df_prep.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     1.0|        1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|     1.0|        0.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|     1.0|        1.0|
|          5|       0|     3|Allen, Mr. Willia..

In [17]:
# One-hot encode the Pclass index into a sparse vector column.
pclass_encoder = OneHotEncoder(inputCol='PclassIndex', outputCol='PclassEncoded')

pclass_encoder_model = pclass_encoder.fit(df_prep)
df_prep = pclass_encoder_model.transform(df_prep)
df_prep.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     1.0|        1.0|(2,[1],[1.0])|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|     1.0|        0.0|(2,[0],[1.0])|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| 

In [18]:
# Fetch the first five rows as Row objects to see the untruncated values.
df_prep.head(5)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S', SexIndex=0.0, PclassIndex=0.0, PclassEncoded=SparseVector(2, {0: 1.0})),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C', SexIndex=1.0, PclassIndex=1.0, PclassEncoded=SparseVector(2, {1: 1.0})),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S', SexIndex=1.0, PclassIndex=0.0, PclassEncoded=SparseVector(2, {0: 1.0})),
 Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S', SexIndex=1.0, PclassIndex=1.0, Pc

In [19]:
# Last field of the first row: at this point that is the PclassEncoded sparse vector.
df_prep.head()[-1]

SparseVector(2, {0: 1.0})

In [20]:
# Show ten rows of the frame with the Sex and Pclass encodings added.
df_prep.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     1.0|        1.0|(2,[1],[1.0])|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|     1.0|        0.0|(2,[0],[1.0])|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| 

### Embarked

In [21]:
# Number of rows where Embarked is NULL.
embarked_is_missing = df_prep['Embarked'].isNull()
df_prep.filter(embarked_is_missing).count()

2

<div class="alert alert-warning">

<b>Warning! Частая ошибка</b>



StringIndexer может работать с пропущенными значениями только в формате <b>NaN</b>, но не <b>NULL</b>!

Если мы закодируем значения `Embarked`, то ошибки сразу не увидим: Spark вычисляет преобразования лениво, поэтому мы получим ошибку только при обращении к строке с пропуском!
    
</div>

In [22]:
# Pitfall demonstration: Embarked still contains NULL values at this point.
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')

df_err = embark_indexer.fit(df_prep).transform(df_prep)
# Only the first five rows are shown here.
df_err.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|EmbarkedIndex|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|          0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     1.0|        1.0|(2,[1],[1.0])|          1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|     1.0|        0.0|(2,[0],[1.0])|          0.0|
|          4|   

In [23]:
# uncomment for error: showing 62 rows is meant to reach a row with a NULL
# Embarked (presumably row 62 is the first such row -- TODO confirm)

# df_err.show(62)

#### Заполним пропуски

In [24]:
# Replace missing Embarked values with the placeholder category 'X'.
embarked_fill = {'Embarked': 'X'}
df_prep = df_prep.fillna(embarked_fill)
df_prep.show(n=62)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+--------+-----------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|      Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+--------+-----------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|    7.25|       NULL|       S|     0.0|        0.0|(2,[0],[1.0])|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599| 71.2833|        C85|       C|     1.0|        1.0|(2,[1],[1.0])|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7.925|       NULL|       S|     1.0|        0.0|(2,[0],[1.0])|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|3

In [25]:
# Index Embarked again, now that the missing values have been filled.
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')

embark_indexer_model = embark_indexer.fit(df_prep)
df_prep = embark_indexer_model.transform(df_prep)
df_prep.show(n=62)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+--------+-----------+-------------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|      Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|EmbarkedIndex|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+--------+-----------+-------------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|    7.25|       NULL|       S|     0.0|        0.0|(2,[0],[1.0])|          0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599| 71.2833|        C85|       C|     1.0|        1.0|(2,[1],[1.0])|          1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7.925|       NULL|       S|     1.0|        0.0|(2,

In [26]:
# One-hot encode the Embarked index into a sparse vector column.
embarked_encoder = OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedEncoded')

embarked_encoder_model = embarked_encoder.fit(df_prep)
df_prep = embarked_encoder_model.transform(df_prep)
df_prep.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|EmbarkedIndex|EmbarkedEncoded|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|          0.0|  (3,[0],[1.0])|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     1.0|        1.0|(2,[1],[1.0])|          1.0|  (3,[1],[1.0])|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| 

## Нормализация числовых признаков

### Age

In [27]:
from pyspark.ml.feature import Imputer

In [28]:
# Fill missing Age values with the column mean, written to a new AgeImputed column.
age_imputer = Imputer(
    strategy='mean',
    inputCols=['Age'],
    outputCols=['AgeImputed'],
)

age_imputer_model = age_imputer.fit(df_prep)
df_prep = age_imputer_model.transform(df_prep)
df_prep.show(n=62)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+--------+-----------+-------------+-------------+---------------+-----------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|      Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|EmbarkedIndex|EmbarkedEncoded|       AgeImputed|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+--------+-----------+-------------+-------------+---------------+-----------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|    7.25|       NULL|       S|     0.0|        0.0|(2,[0],[1.0])|          0.0|  (3,[0],[1.0])|             22.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599| 71.2833|        C85|       C|     1.0|        1.0|(2,[1],[1.0])|          1.0|  (3,[1],[

In [29]:
from pyspark.ml.feature import VectorAssembler

In [33]:
# Wrap the scalar AgeImputed into a one-element vector column for the scaler below.
age_assembler = VectorAssembler(inputCols=["AgeImputed"], outputCol="AgeVector")

df_prep = age_assembler.transform(df_prep)
df_prep.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|EmbarkedIndex|EmbarkedEncoded|AgeImputed|   AssembledVector|AgeVector|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|          0.0|  (3,[0],[1.0])|      22.0|[22.0,1.0,0.0,0.0]|   [22.0]|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     1

In [34]:
from pyspark.ml.feature import MinMaxScaler

In [35]:
# Min-max scale the age vector into the AgeScaled column.
age_scaler = MinMaxScaler(inputCol='AgeVector', outputCol='AgeScaled')

age_scaler_model = age_scaler.fit(df_prep)
df_prep = age_scaler_model.transform(df_prep)
df_prep.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|EmbarkedIndex|EmbarkedEncoded|AgeImputed|   AssembledVector|AgeVector|           AgeScaled|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+--------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|          0.0|  (3,[0],[1.0])|      22.0|[22.0,1.0,0.0,0.0]|   [22.0]|[0.2711736617240513]|
|          2|       1|     1|Cumings

In [36]:
# Number of rows where Fare is NULL.
fare_is_missing = df_prep['Fare'].isNull()
df_prep.filter(fare_is_missing).count()

0

In [37]:
# Single-column imputer for Fare, configured through setters.
fare_imputer = Imputer()
fare_imputer.setInputCol("Fare")
fare_imputer.setOutputCol("FareImputed")

fare_imputer_model = fare_imputer.fit(df_prep)
df_prep = fare_imputer_model.transform(df_prep)
df_prep.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+--------------------+-----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|EmbarkedIndex|EmbarkedEncoded|AgeImputed|   AssembledVector|AgeVector|           AgeScaled|FareImputed|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+--------------------+-----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|          0.0|  (3,[0],[1.0])|      22.0|[22.0,1.0,0.0,0.0]|   [22.0]|[0.2711736617240513]| 

In [38]:
# Wrap the scalar FareImputed into a one-element vector column for the scaler.
fare_assembler = VectorAssembler()
fare_assembler.setInputCols(["FareImputed"])
fare_assembler.setOutputCol("FareVector")

df_prep = fare_assembler.transform(df_prep)
df_prep.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+--------------------+-----------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|EmbarkedIndex|EmbarkedEncoded|AgeImputed|   AssembledVector|AgeVector|           AgeScaled|FareImputed|FareVector|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+--------------------+-----------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|          0.0|  (3,[0],[1.0])|      22.0|[22.0,1.0,0.0,0.0]

In [39]:
from pyspark.ml.feature import RobustScaler

# Scale the fare vector with RobustScaler into the FareScaled column.
fare_scaler = RobustScaler()
fare_scaler.setInputCol("FareVector")
fare_scaler.setOutputCol("FareScaled")

fare_scaler_model = fare_scaler.fit(df_prep)
df_prep = fare_scaler_model.transform(df_prep)
df_prep.show(n=5)


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+--------------------+-----------+----------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|EmbarkedIndex|EmbarkedEncoded|AgeImputed|   AssembledVector|AgeVector|           AgeScaled|FareImputed|FareVector|          FareScaled|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+-------------+---------------+----------+------------------+---------+--------------------+-----------+----------+--------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.

## Собираем вектор признаков

Алгоритмам МО из Spark MLlib на вход нужно подавать столбец с вектором признаков

In [40]:
from pyspark.ml.feature import VectorAssembler

# Final assembler: concatenates the prepared columns into one "Features" vector.
# Note that EmbarkedEncoded is not among the inputs.
features_assembler = VectorAssembler(
    inputCols=["SexIndex", "PclassEncoded", "AgeScaled", "FareScaled"],
    outputCol="Features",
)

# Replay the already-fitted transformers, in order, on the raw training frame.
fitted_stages = [
    sex_indexer_model,
    pclass_indexer_model,
    pclass_encoder_model,
    age_imputer_model,
    age_assembler,
    age_scaler_model,
    fare_imputer_model,
    fare_assembler,
    fare_scaler_model,
]
prep_df = df
for fitted_stage in fitted_stages:
    prep_df = fitted_stage.transform(prep_df)

feat_df = features_assembler.transform(prep_df)
feat_df.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+----------+---------+--------------------+-----------+----------+--------------------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|AgeImputed|AgeVector|           AgeScaled|FareImputed|FareVector|          FareScaled|            Features|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+----------+---------+--------------------+-----------+----------+--------------------+--------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|      22.0|   [22.0]|[0.2711736617240513]|       7.25|    [7.25]|[0.313795760078

## Конвейер

Объединим различные этапы подготовки признаков в единый конвейер

In [41]:
from pyspark.ml.pipeline import Pipeline

# Feature-extraction pipeline fitted on the raw training frame.
# Every learnable stage is passed as an unfitted estimator, so fit() learns all
# statistics from the DataFrame it is given. The list previously contained the
# already-fitted fare_imputer_model / fare_scaler_model: Pipeline.fit() uses
# fitted transformers as they are, without refitting, so those two stages would
# have kept their old statistics if the pipeline were fitted on other data.
feat_ext_pipe = Pipeline(stages=[
    sex_indexer,
    pclass_indexer,
    pclass_encoder,
    age_imputer,
    age_assembler,
    age_scaler,
    fare_imputer,
    fare_assembler,
    fare_scaler,
    features_assembler,
]).fit(df)


In [42]:
# Apply the fitted pipeline to the raw training frame and preview the result.
feat_df = feat_ext_pipe.transform(df)
feat_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+----------+---------+--------------------+-----------+----------+--------------------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|AgeImputed|AgeVector|           AgeScaled|FareImputed|FareVector|          FareScaled|            Features|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+-----------+-------------+----------+---------+--------------------+-----------+----------+--------------------+--------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|     0.0|        0.0|(2,[0],[1.0])|      22.0|   [22.0]|[0.2711736617240513]|       7.25|    [7.25]|[0.313795760078

## Сохранение

Сохраним конвейер на диск для последующего использования при подготовке других данных

In [43]:
# Persist the fitted pipeline, overwriting any previous copy at this path.
# NOTE(review): "exty" in the path looks like a typo, but the cell that loads the
# pipeline uses the same string, so the two must only be changed together.
feat_ext_pipe.write().overwrite().save(f"{app_name}_feat_exty_pipe")

                                                                                

## Обработка тестовых данных

In [44]:
from pyspark.ml.pipeline import PipelineModel

# Restore the saved pipeline from disk.
pipeline_path = f"{app_name}_feat_exty_pipe"
feat_ext_pipe_loaded = PipelineModel.load(pipeline_path)

# Read the test CSV the same way as the training one and run it through the pipeline.
test_df = spark.read.csv("./titanic/test.csv", header=True, inferSchema=True)

prep_test_df = feat_ext_pipe_loaded.transform(test_df)
prep_test_df.show(n=5)

+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+--------+-----------+-------------+----------+---------+--------------------+-----------+----------+--------------------+--------------------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|SexIndex|PclassIndex|PclassEncoded|AgeImputed|AgeVector|           AgeScaled|FareImputed|FareVector|          FareScaled|            Features|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+--------+-----------+-------------+----------+---------+--------------------+-----------+----------+--------------------+--------------------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| NULL|       Q|     0.0|        0.0|(2,[0],[1.0])|      34.5|   [34.5]|[0.4282483035938678]|     7.8292|  [7.8292]|[0.3388647951454714]|[0.0,1.0,0.0,0.42...|
|        893|     3|Wilkes, Mrs. Jame...|fem

In [46]:
# Last field of the first row of feat_df, i.e. its Features vector.
# NOTE(review): feat_df holds the training data; if the test result was meant,
# this should read prep_test_df -- confirm.
feat_df.head()[-1]

DenseVector([0.0, 1.0, 0.0, 0.2712, 0.3138])