<a href="https://colab.research.google.com/github/vloneonme/trew/blob/main/mobd_lr_sparkml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h>Spark ML <h>

In [1]:
! pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer # Выполнение энкодинга
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator





In [3]:
spark = SparkSession.builder.appName("PySparkTitanikJob").getOrCreate()

#Создаём Spark сессию

In [4]:
spark

In [6]:

#С помощью метода spark.read.parquet прочитайте датасет в переменную titanic_df
titanic_df = spark.read.parquet('train.parquet')

In [7]:

#Выведите значения датафрейма с помощью метода show()
titanic_df.show()

# Мы будем классифицировать, погибнет или выживет пассажир Титаника
# Целевой признак - Survived


+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|          1|    0|
|       1|     1|female|38.0|    1|    0|71.2833|       C|          1|    0|
|       1|     3|female|26.0|    0|    0|  7.925|       S|          0|    1|
|       1|     1|female|35.0|    1|    0|   53.1|       S|          1|    0|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|          0|    1|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|          0|    1|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|          0|    1|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|          4|    0|
|       1|     3|female|27.0|    0|    2|11.1333|       S|          2|    0|
|       1|     2|female|14.0|    1|    0|30.0708|       C|          1|    0|

In [8]:
# Поработаем с категориальными признаками.

#Функция StringIndexer индексирует (энкодит) строки
#Концепция, лежащая в основе индексирования строк, очень интуитивно понятна.
#Мы просто заменяем каждую категорию номером.

sex_index = StringIndexer(inputCol='Sex', outputCol="Sex_index")  #StringIndexer - это Estimator, который нам формирует Transformer для преобразования данных
embarked_index = StringIndexer(inputCol='Embarked', outputCol="Embarked_index")

In [9]:
# теперь применим новые столбцы к нашему датафрейму. Сначала обращаемся ним и выполняем операцию fit на нашем датафрейме.
#После чего мы получим функцию трансформер и сможем вызвать ф-ию transform для того, чтобы выполнить преобразование наших данных

titanic_df = sex_index.fit(titanic_df).transform(titanic_df)

In [10]:
# Сделайте преобразование для embarked_index
titanic_df = embarked_index.fit(titanic_df).transform(titanic_df)


In [11]:
titanic_df.show()

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+--------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|Sex_index|Embarked_index|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+--------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|          1|    0|      0.0|           0.0|
|       1|     1|female|38.0|    1|    0|71.2833|       C|          1|    0|      1.0|           1.0|
|       1|     3|female|26.0|    0|    0|  7.925|       S|          0|    1|      1.0|           0.0|
|       1|     1|female|35.0|    1|    0|   53.1|       S|          1|    0|      1.0|           0.0|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|          0|    1|      0.0|           0.0|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|          0|    1|      0.0|           2.0|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|          0|    1|      

In [12]:
# Наш dataframe готов к тому, чтобы применить к нему модель. Но мы хотим сначала выполнить векторизацию, чтобы данные можно было применять в моделе.
# Формируем список признаков, которые мы будем использовать, как фичи.
features = ['Pclass', 'Age', 'SibSp', 'SibSp', 'Parch', 'Fare', 'Alone', 'Sex_index', 'Embarked_index']

In [13]:
feature = VectorAssembler(inputCols=features, outputCol="features")
#VectorAssembler объединяет заданный список столбцов в один векторный столбец.
# Является, пожалуй, самым важным векторным преобразователем в PySpark,
# поскольку модели машинного обучения требуют на вход векторы.

# VectorAssembler - трансформер, соответственно, вызываем метод transform  и применяем его к нашему датафрейму.
feature_vector= feature.transform(titanic_df)

In [14]:
feature_vector.show(5)

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+--------------+--------------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|Sex_index|Embarked_index|            features|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+--------------+--------------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|          1|    0|      0.0|           0.0|[3.0,22.0,1.0,1.0...|
|       1|     1|female|38.0|    1|    0|71.2833|       C|          1|    0|      1.0|           1.0|[1.0,38.0,1.0,1.0...|
|       1|     3|female|26.0|    0|    0|  7.925|       S|          0|    1|      1.0|           0.0|[3.0,26.0,0.0,0.0...|
|       1|     1|female|35.0|    1|    0|   53.1|       S|          1|    0|      1.0|           0.0|[1.0,35.0,1.0,1.0...|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|          0|    1|      0.0|           0.0|(9,[0,1,5,6],[3.0...|
+--------+------

In [15]:
(training_data, test_data) = feature_vector.randomSplit([0.8, 0.2],seed = 42)
#Формируем тренировочный и тестовый датасет. 80% данных в тренировочный, остальные - в тестовый. Seed - параметр случайности


In [16]:
training_data.show(5)

+--------+------+------+----+-----+-----+------+--------+-----------+-----+---------+--------------+--------------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|  Fare|Embarked|Family_Size|Alone|Sex_index|Embarked_index|            features|
+--------+------+------+----+-----+-----+------+--------+-----------+-----+---------+--------------+--------------------+
|       0|     1|female| 2.0|    1|    2|151.55|       S|          3|    0|      1.0|           0.0|[1.0,2.0,1.0,1.0,...|
|       0|     1|female|25.0|    1|    2|151.55|       S|          3|    0|      1.0|           0.0|[1.0,25.0,1.0,1.0...|
|       0|     1|  male|18.0|    1|    0| 108.9|       C|          1|    0|      0.0|           1.0|[1.0,18.0,1.0,1.0...|
|       0|     1|  male|19.0|    1|    0|  53.1|       S|          1|    0|      0.0|           0.0|[1.0,19.0,1.0,1.0...|
|       0|     1|  male|19.0|    3|    2| 263.0|       S|          5|    0|      0.0|           0.0|[1.0,19.0,3.0,3.0...|
+--------+------+------+

# ML models

# LogisticRegression

In [17]:
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

#Сначала создаем объект evaluator. Указываем целевую колонку обучения модели - labelCol. далее указываем
# имя колонки, где будет лежать предсказанное значение  - predictionCol. И указываем метрику для оценки качества модели.

In [18]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features") # labelCol="Survived" - целевая фича,  featuresCol="features" - фичи, которые
#используются для предсказания занчения целевой колонки

lrModel = lr.fit(training_data) # получаем обученную модель.

lr_prediction = lrModel.transform(test_data) # применяем модель на данных, получаем предсказание
lr_prediction.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       1.0|       0|(9,[0,1,4,5],[1.0...|
|       1.0|       0|[1.0,24.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows


In [19]:
# Применяем оценщик (evaluator), чтобы узнать точность модели.
lr_accuracy = evaluator.evaluate(lr_prediction)
print("LogisticRegression [Accuracy] = %g"% (lr_accuracy))
print("LogisticRegression [Error] = %g " % (1.0 - lr_accuracy))

LogisticRegression [Accuracy] = 0.813793
LogisticRegression [Error] = 0.186207 


# DecisionTreeClassifier

In [20]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(training_data)
dt_prediction = dt_model.transform(test_data)

dt_prediction.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,4,5],[1.0...|
|       0.0|       0|[1.0,24.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows


In [21]:
dt_accuracy = evaluator.evaluate(dt_prediction)
print("DecisionTreeClassifier [Accuracy] = %g"% (dt_accuracy))
print("DecisionTreeClassifier [Error] = %g " % (1.0 - dt_accuracy))

DecisionTreeClassifier [Accuracy] = 0.82069
DecisionTreeClassifier [Error] = 0.17931 


# RandomForestClassifier

Реализуйте самостоятельно и оцените точность, как в примерах выше

In [23]:
#Реализуйте самостоятельно и оцените точность, как в примерах выше
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(training_data)
rf_prediction = rf_model.transform(test_data)
rf_prediction.select("prediction", "Survived", "features").show(5)
#выведите accuracy модели
rf_accuracy = evaluator.evaluate(rf_prediction)
print("RandomForestClassifier [Accuracy] = %g"% (rf_accuracy))
print("RandomForestClassifier [Error] = %g " % (1.0 - rf_accuracy))

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,4,5],[1.0...|
|       0.0|       0|[1.0,24.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows
RandomForestClassifier [Accuracy] = 0.813793
RandomForestClassifier [Error] = 0.186207 


# Gradient-boosted tree classifier

Реализуйте самостоятельно и оцените точность, как в примерах выше

In [24]:
#Реализуйте самостоятельно и оцените точность, как в примерах выше
from pyspark.ml.classification import GBTClassifier
#выведите accuracy модели
qwe = GBTClassifier(labelCol="Survived", featuresCol="features")
qwe_model = qwe.fit(training_data)
qwe_prediction = qwe_model.transform(test_data)
qwe_prediction.select("prediction", "Survived", "features").show(5)
qwe_accuracy = evaluator.evaluate(qwe_prediction)
print("GBTClassifier [Accuracy] = %g"% (qwe_accuracy))
print("GBTClassifier [Error] = %g " % (1.0 - qwe_accuracy))

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,4,5],[1.0...|
|       0.0|       0|[1.0,24.0,0.0,0.0...|
|       1.0|       0|(9,[0,1,5,6],[1.0...|
|       1.0|       0|(9,[0,1,5,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows
GBTClassifier [Accuracy] = 0.862069
GBTClassifier [Error] = 0.137931 


# Save & Load Model

In [25]:
# обученную модель мы можем сохранять. Сохраним модель RandomForest.
rf_model.write().overwrite().save('rf_model')

In [26]:
# Теперь модель загрузим. Для этого сначала модключим нужный класс
from pyspark.ml.classification import RandomForestClassificationModel
type(RandomForestClassificationModel.load('rf_model'))

# Pipeline

In [27]:
from pyspark.ml.pipeline import PipelineModel

# pipeline упрощает процесс внедрения ваших моделей в нужное окружение.

# Перед тем, как модель обучить, мы уже выполнили какие-то преобразования данных.
# Когда мы модель внедряем, нам нужно внедрять и все преобразования над данными.
# Соответственно, появляется место для ошибок (человеческий фактор). Какую-то предобработку могут забыть сделать или сделать неправильно.
# Пайплайны позволяют собрать вашу модель вместе с трансформациями.

In [28]:
titanic_df = spark.read.parquet('train.parquet')

In [29]:
titanic_df.show()

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|          1|    0|
|       1|     1|female|38.0|    1|    0|71.2833|       C|          1|    0|
|       1|     3|female|26.0|    0|    0|  7.925|       S|          0|    1|
|       1|     1|female|35.0|    1|    0|   53.1|       S|          1|    0|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|          0|    1|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|          0|    1|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|          0|    1|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|          4|    0|
|       1|     3|female|27.0|    0|    2|11.1333|       S|          2|    0|
|       1|     2|female|14.0|    1|    0|30.0708|       C|          1|    0|

In [30]:
train, test = titanic_df.randomSplit([0.8, 0.2])

In [31]:
train.show(5)

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|       0|     1|female| 2.0|    1|    2| 151.55|       S|          3|    0|
|       0|     1|female|50.0|    0|    0|28.7125|       C|          0|    1|
|       0|     1|  male|18.0|    1|    0|  108.9|       C|          1|    0|
|       0|     1|  male|19.0|    1|    0|   53.1|       S|          1|    0|
|       0|     1|  male|21.0|    0|    1|77.2875|       S|          1|    0|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
only showing top 5 rows


In [32]:
indexer_sex = StringIndexer(inputCol="Sex", outputCol="Sex_index")

In [33]:
indexer_embarked = StringIndexer(inputCol="Embarked", outputCol="Embarked_index")

In [34]:
feature = VectorAssembler(
    inputCols=["Pclass","Age","SibSp","Parch","Fare","Family_Size","Embarked_index","Sex_index"],
    outputCol="features")


In [35]:
rf_classifier = RandomForestClassifier(labelCol="Survived", featuresCol="features")

In [36]:
pipeline = Pipeline(stages=[indexer_sex, indexer_embarked, feature, rf_classifier])
# Определились с моделью созддаём Pipeline. Указываем массив из трансформаций, которые необходимо выполнить.
# Собранный Pipeline является Estimator'ом, вызываем метод fit, из датафрейма (начального) в одно действие получается готовая модель.

In [37]:
p_model = pipeline.fit(train)

In [38]:
type(p_model)

In [39]:
p_model.write().overwrite().save('p_model')
# сохраним модель с перезаписью.

In [40]:
model = PipelineModel.load('p_model')
# Загрузим модель.

In [41]:
prediction = p_model.transform(test)
# применим модель, вызовем метод transrorm, применим его к выборке test

In [42]:
test.show(5)

+--------+------+------+----+-----+-----+------+--------+-----------+-----+
|Survived|Pclass|   Sex| Age|SibSp|Parch|  Fare|Embarked|Family_Size|Alone|
+--------+------+------+----+-----+-----+------+--------+-----------+-----+
|       0|     1|female|25.0|    1|    2|151.55|       S|          3|    0|
|       0|     1|  male|19.0|    3|    2| 263.0|       S|          5|    0|
|       0|     1|  male|29.0|    0|    0|  30.0|       S|          0|    1|
|       0|     1|  male|30.0|    0|    0|   0.0|       S|          0|    1|
|       0|     1|  male|30.0|    0|    0|  26.0|       S|          0|    1|
+--------+------+------+----+-----+-----+------+--------+-----------+-----+
only showing top 5 rows


In [43]:
prediction.select(["Pclass","Age","SibSp","Parch","Fare","Family_Size","Embarked_index","Sex_index","prediction"]).show(5)

+------+----+-----+-----+------+-----------+--------------+---------+----------+
|Pclass| Age|SibSp|Parch|  Fare|Family_Size|Embarked_index|Sex_index|prediction|
+------+----+-----+-----+------+-----------+--------------+---------+----------+
|     1|25.0|    1|    2|151.55|          3|           0.0|      1.0|       1.0|
|     1|19.0|    3|    2| 263.0|          5|           0.0|      0.0|       1.0|
|     1|29.0|    0|    0|  30.0|          0|           0.0|      0.0|       0.0|
|     1|30.0|    0|    0|   0.0|          0|           0.0|      0.0|       0.0|
|     1|30.0|    0|    0|  26.0|          0|           0.0|      0.0|       0.0|
+------+----+-----+-----+------+-----------+--------------+---------+----------+
only showing top 5 rows


In [44]:
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

In [45]:
p_accuracy = evaluator.evaluate(prediction)
print("Pipeline model [Accuracy] = %g"% (p_accuracy))
print("Pipeline model [Error] = %g " % (1.0 - p_accuracy))

Pipeline model [Accuracy] = 0.823529
Pipeline model [Error] = 0.176471 


Сделайте самостоятельно Pipeline для Градиентного Бустинга, проверьте точность на тестовой выборке.

In [46]:
#Сделайте самостоятельно Pipeline для Градиентного Бустинга qwe выше, проверьте точность на тестовой выборке
from pyspark.ml.classification import GBTClassifier

pipeline_qwe = Pipeline(stages=[indexer_sex, indexer_embarked, feature, qwe])

p_model_qwe = pipeline_qwe.fit(train)

prediction_qwe = p_model_qwe.transform(test)

qwe_accuracy = evaluator.evaluate(prediction_qwe)
print("Pipeline GBT model [Accuracy] = %g"% (qwe_accuracy))
print("Pipeline GBT model [Error] = %g " % (1.0 - qwe_accuracy))


Pipeline GBT model [Accuracy] = 0.841176
Pipeline GBT model [Error] = 0.158824 
