##Установка Spark в GoogleColab

In [None]:
#Скачиваем архив и извлекаем содержимое
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz

In [None]:
#Определеяем системные переменные
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

In [None]:
#установим findspark, который сделает за нас остальную работу по инициализации
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
#Инициализируем PySpark
import findspark 
findspark.init('spark-2.4.6-bin-hadoop2.7')
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

##Pipeline

In [199]:
#Импорт необходимых библиотек
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import DecisionTreeClassifier,\
                                      RandomForestClassifier, \
                                      LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [None]:
#Загрузка датасета
df = spark.read.option("header",True).csv('train.csv')

#Приведение типов
df = df.withColumn('Pclass', df.Pclass.cast(IntegerType()))\
       .withColumn('Age', df.Age.cast(IntegerType()))\
       .withColumn('SibSp', df.SibSp.cast(IntegerType()))\
       .withColumn('Parch', df.Parch.cast(IntegerType()))\
       .withColumn('Fare', df.Fare.cast(IntegerType()))\
       .withColumn('Survived', df.Survived.cast(IntegerType()))

#Индексация номинативных переменных
indexer_sex = StringIndexer(inputCol='Sex', outputCol="sex_index").fit(df)
indexer_embarked = StringIndexer(inputCol='Embarked', outputCol="embarked_index").fit(df)

#Фичи
feature = VectorAssembler(
        inputCols=[ 'Pclass',
                    'sex_index',
                    'Age',
                    'SibSp',
                    'Parch',
                    'Fare',
                    'embarked_index'],
                  outputCol='features')

#В учебном проекте принебрегаем строками с пропущенными значениями
df = df.dropna()

#Разделение на обучающую и тестовую выборки
(trainingData, testData) = df.randomSplit([0.8, 0.2])

In [200]:
# Инициализация классификаторов
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")
lr = LogisticRegression(labelCol="Survived", featuresCol= "features")


#Инициализация пайплайнов 
pipeline_dt = Pipeline(stages=[indexer_sex, indexer_embarked, feature, dt])
pipeline_rf = Pipeline(stages=[indexer_sex, indexer_embarked, feature, rf])
pipeline_lr = Pipeline(stages=[indexer_sex, indexer_embarked, feature, lr])


#Инициализация сеток для кросс-валидации
#Дерево решений
dt_paramGrid = ParamGridBuilder() \
        .addGrid(dt.maxDepth, [3, 4]) \
        .addGrid(dt.maxBins, [6, 12]) \
        .addGrid(dt.minInfoGain, [0.05, 0.1]) \
        .build()
#Случайный лес
rf_paramGrid = ParamGridBuilder() \
        .addGrid(rf.maxDepth, [2, 3])\
        .addGrid(rf.maxBins, [4, 5])\
        .addGrid(rf.minInfoGain, [0.1, 0.15]) \
        .build()
#Полиномиальная регрессия       
lr_paramGrid = ParamGridBuilder()\
        .addGrid(lr.maxIter, [5,10,20])\
        .addGrid(lr.regParam, [0.2, 0.3, 0.4])\
        .addGrid(lr.elasticNetParam, [0.7, 0.8, 0.9])\
        .build()

#Оценщик
evaluator = MulticlassClassificationEvaluator(
        labelCol="Survived",
        predictionCol="prediction",
        metricName="f1")

#Инициализация объектов кросс-валидации
dt_cv = CrossValidator(
        estimator=pipeline_dt,
        estimatorParamMaps=dt_paramGrid,
        evaluator=evaluator, 
        parallelism=2)

rf_cv = CrossValidator(
        estimator=pipeline_rf,
        estimatorParamMaps=rf_paramGrid,
        evaluator=evaluator,
        parallelism=2)

lr_cv = CrossValidator(
        estimator=pipeline_lr,
        estimatorParamMaps=lr_paramGrid,
        evaluator=evaluator,
        parallelism=2)

In [None]:
#Обучение моделей
dt_model = dt_cv.fit(trainingData).bestModel
rf_model = rf_cv.fit(trainingData).bestModel
lr_model = lr_cv.fit(trainingData).bestModel

In [None]:
# Предсказания моделей.
dt_prediction = dt_model.transform(testData)
rf_prediction = rf_model.transform(testData)
lr_prediction = lr_model.transform(testData)

In [None]:
#Оценка моделей
f1_measure = {dt_model: evaluator.evaluate(dt_prediction),
              rf_model: evaluator.evaluate(rf_prediction),
              lr_model: evaluator.evaluate(lr_prediction)}

In [209]:
#Лучшая модель по метрике f1
best_model = max(f1_measure, key=f1_measure.get)

print(f'Пайплайн лучшей модели {best_model}, результат f1: {f1_measure[best_model]}')


#сохраняем модель
#best_model.write().overwrite().save('res')

Пайплайн лучшей модели PipelineModel_a850bbd0fa13, результат f1: 0.7057519057519058
