In [1]:
!pip install findspark

Defaulting to user installation because normal site-packages is not writeable


## Пререквизиты

Код в этом ноутбуке будет работать с MLFlow Tracking Server, который запущен на удаленной виртуальной машине. Кроме того, артефакты моделирования сохранятся в S3-бакет в YC. Поэтому **прежде чем выполнять ячейки с кодом**, убедитесь, что установлены следующие переменные окружения:

```bash
MLFLOW_S3_ENDPOINT_URL=https://storage.yandexcloud.net/
MLFLOW_TRACKING_URI=http://<ip вашей виртуалки с MLFlow>:8000
AWS_ACCESS_KEY_ID=<id вашего ключа>
AWS_SECRET_ACCESS_KEY=<ваш секретный ключ>
```

Установить переменные я рекомендую так:

1. Создаете файл (в той же директории, откуда запускаете `jupyter`) `.env`.
2. Записываете в файл `.env` следующее содержание:
   
```bash
MLFLOW_S3_ENDPOINT_URL=https://storage.yandexcloud.net/
MLFLOW_TRACKING_URI=http://<ip вашей виртуалки с MLFlow>:8000
AWS_ACCESS_KEY_ID=<id вашего ключа>
AWS_SECRET_ACCESS_KEY=<ваш секретный ключ>
```
3. Устанавливаете пакет `python-dotenv`:

```bash
pip install python-dotenv
```

4. Выполняете следующую ячейку с кодом:

In [1]:
# Load the IPython extension shipped with python-dotenv, then invoke its
# %dotenv magic so the variables from the `.env` file become available via
# os.environ (the Spark session below reads the AWS_* keys from there).
%load_ext dotenv
%dotenv

## Инициализируем spark

In [2]:
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession

In [3]:
# Interpreter path for PySpark; './venv' matches the '#venv' fragment on the
# archive URI passed to spark.yarn.dist.archives below.
os.environ['PYSPARK_PYTHON'] = './venv/bin/python'

# Build (or reuse) a local Spark session. The S3 credentials are read from the
# environment, so a missing variable fails here with a KeyError.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Spark ML Research')
    .config('spark.sql.repl.eagerEval.enabled', True)
    .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    .config('spark.hadoop.fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    .config('spark.yarn.dist.archives', 's3a://pyspark-venvs/mlflow-hyperopt-dataproc-2.1.18.tar.gz#venv')
    .getOrCreate()
)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/12/11 15:39:20 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to dataproc/hadoop/var/log/spark/apps/local-1733931557916.inprogress. This is unsupported


## Считываем данные 

In [4]:
# Read the Titanic training CSV (first row is the header, column types are
# inferred), keep only the label and the columns used as features, and drop
# every row that has a null in any of them.
df = (
    spark.read
    .csv(
        's3a://mlops204-dataproc-bucket/data/titanic/train.csv',
        header=True,
        inferSchema=True,
    )
    .select([
        'Survived',
        'Pclass',
        'Sex',
        'Age',
        'SibSp',
        'Parch',
        'Fare',
        'Embarked',
    ])
    .na.drop()
)

# Preview the first rows.
df.limit(10)

                                                                                

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Embarked
0,3,male,22.0,1,0,7.25,S
1,1,female,38.0,1,0,71.2833,C
1,3,female,26.0,0,0,7.925,S
1,1,female,35.0,1,0,53.1,S
0,3,male,35.0,0,0,8.05,S
0,1,male,54.0,0,0,51.8625,S
0,3,male,2.0,3,1,21.075,S
1,3,female,27.0,0,2,11.1333,S
1,2,female,14.0,1,0,30.0708,C
1,3,female,4.0,1,1,16.7,S


## Конструируем пайплайн обработки данных

In [5]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

In [6]:
# Categorical columns are handled in two steps: string labels -> numeric
# index, then numeric index -> one-hot vector.
gender_index = StringIndexer(inputCol='Sex', outputCol='SexIndex')
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')

gender_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVector')
embark_encoder = OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkVector')

# Concatenate the numeric columns and the one-hot vectors into the single
# 'Features' vector column consumed by the classifier.
assembler = VectorAssembler(
    inputCols=['Pclass', 'SexVector', 'Age', 'SibSp', 'Parch', 'Fare', 'EmbarkVector'],
    outputCol='Features',
)

# Stage order matters: each encoder consumes the column produced by its
# indexer, and the assembler consumes the encoder outputs.
dataproc = Pipeline(
    stages=[gender_index, embark_indexer, gender_encoder, embark_encoder, assembler],
)

In [7]:
# Fit the preprocessing stages on df, then apply the fitted pipeline to the
# same frame to obtain the 'Features' column.
dataproc_model = dataproc.fit(df)
ready_data = dataproc_model.transform(df)

ready_data.limit(10)

                                                                                

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Embarked,SexIndex,EmbarkedIndex,SexVector,EmbarkVector,Features
0,3,male,22.0,1,0,7.25,S,0.0,0.0,"(1,[0],[1.0])","(2,[0],[1.0])","[3.0,1.0,22.0,1.0..."
1,1,female,38.0,1,0,71.2833,C,1.0,1.0,"(1,[],[])","(2,[1],[1.0])","[1.0,0.0,38.0,1.0..."
1,3,female,26.0,0,0,7.925,S,1.0,0.0,"(1,[],[])","(2,[0],[1.0])","(8,[0,2,5,6],[3.0..."
1,1,female,35.0,1,0,53.1,S,1.0,0.0,"(1,[],[])","(2,[0],[1.0])","[1.0,0.0,35.0,1.0..."
0,3,male,35.0,0,0,8.05,S,0.0,0.0,"(1,[0],[1.0])","(2,[0],[1.0])","[3.0,1.0,35.0,0.0..."
0,1,male,54.0,0,0,51.8625,S,0.0,0.0,"(1,[0],[1.0])","(2,[0],[1.0])","[1.0,1.0,54.0,0.0..."
0,3,male,2.0,3,1,21.075,S,0.0,0.0,"(1,[0],[1.0])","(2,[0],[1.0])","[3.0,1.0,2.0,3.0,..."
1,3,female,27.0,0,2,11.1333,S,1.0,0.0,"(1,[],[])","(2,[0],[1.0])","[3.0,0.0,27.0,0.0..."
1,2,female,14.0,1,0,30.0708,C,1.0,1.0,"(1,[],[])","(2,[1],[1.0])","[2.0,0.0,14.0,1.0..."
1,3,female,4.0,1,1,16.7,S,1.0,0.0,"(1,[],[])","(2,[0],[1.0])","[3.0,0.0,4.0,1.0,..."


## Подбираем гиперпараметры и тренируем модель

In [8]:
from functools import partial

import mlflow
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials, SparkTrials, Trials

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import logging
logger = logging.getLogger("mlflow")
# Restrict the "mlflow" logger to WARN and above to keep cell output quiet.
logger.setLevel(logging.WARN)

In [9]:
# Hyperopt search space: a log-normal prior (mu=0, sigma=1.0) over the
# regularization strength and a two-way choice for the intercept flag.
search_space = dict(
    regParam=hp.lognormal('regParam', 0, 1.0),
    fitIntercept=hp.choice('fitIntercept', [False, True]),
)

def objective(params, train_data, test_data):
    """Train and score one logistic-regression candidate for hyperopt.

    Args:
        params: Dict sampled from the search space; must contain the keys
            'regParam' and 'fitIntercept'.
        train_data: Spark DataFrame with 'Features' and 'Survived' columns
            used to fit the model.
        test_data: Spark DataFrame with the same columns used for scoring.

    Returns:
        Dict in the format hyperopt expects: 'loss' is the negated evaluator
        metric (hyperopt minimizes, we want the metric maximized) and
        'status' is STATUS_OK.

    Side effects:
        Prints the sampled params and records them, together with the metric
        under the name 'auc', in a new MLflow run.
    """
    print(params)

    lr = (
        LogisticRegression()
        .setMaxIter(1000)
        .setRegParam(params['regParam'])
        # Previously this sampled value was logged but never applied, so the
        # search over 'fitIntercept' had no effect on the trained model.
        .setFitIntercept(params['fitIntercept'])
        .setFeaturesCol('Features')
        .setLabelCol('Survived')
    )

    # Metric name is left at the evaluator's default (areaUnderROC per the
    # PySpark docs — confirm for the installed version).
    evaluator = BinaryClassificationEvaluator().setLabelCol('Survived')

    lg_model = lr.fit(train_data)

    auc = evaluator.evaluate(lg_model.transform(test_data))

    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.log_metric('auc', auc)

    return {'loss': -auc, 'status': STATUS_OK}

In [10]:
# Hold out roughly 30% of the rows for evaluation. The explicit seed fixes the
# sampling so reruns of the notebook score candidates on the same split.
train_data, test_data = ready_data.randomSplit([.7, .3], seed=42)

# In-process trial store; every evaluation's params and loss end up here.
trials = Trials()

mlflow.set_experiment('classification')

# Run 10 TPE-guided evaluations of `objective`; the data frames are bound via
# partial() because hyperopt calls the function with the sampled params only.
best = fmin(
    fn=partial(
        objective,
        train_data=train_data,
        test_data=test_data
    ),
    space=search_space,
    algo=tpe.suggest,
    max_evals=10,
    trials=trials
)

{'fitIntercept': True, 'regParam': 1.255119385583293}                                                                                                                                                                                         
  0%|                                                                                                                                                                                                  | 0/10 [00:00<?, ?trial/s, best loss=?]

                                                                                

24/12/11 15:39:42 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/11 15:39:42 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
{'fitIntercept': True, 'regParam': 0.48254977467864435}                                                                                                                                                                                       
{'fitIntercept': False, 'regParam': 0.685966186664396}                                                                                                                                                                                        
{'fitIntercept': True, 'regParam': 0.19815190356438855}                                                                                                                                                                                       
{'fitIntercept': True, 'regParam': 0.