## Yandex DataSphere Delivery example

**Задача**: доставить Training Job на DataSphere и запустить там.

#### План

* Создать локально TrainingJob со списком task'ов и всей необходимой информацией о датасете

* Запустить локально тем же способом, каким будем запускать в DataSphere

* Запустить в DataSphere с помощью Docker, как описано в training_grounds/Delivery.md

### Загрузим бандл в Yandex Object Storage

In [1]:
from sklearn import datasets
import pandas as pd
from tg.common.ml import batched_training as bt


In [2]:
# Names shared by the cells below: project and dataset names form the storage
# path of the bundle, and the bucket is where the bundle is uploaded.
project_name = 'testirisproject'
dataset_name = 'irisdataset'
bucket = 'testirisbucket'


In [3]:
import os
from pathlib import Path
from tg.grammar_ru.common.loc import Loc
from dotenv import load_dotenv
# Read environment variables from the `environment.env` file under Loc.root_path
# (presumably the storage credentials used below -- TODO confirm).
load_dotenv(Loc.root_path / 'environment.env')


True

#### Создадим бандл

In [4]:
from tg.grammar_ru.common.loc import Loc


def get_multilabel_classification_bundle():
    """Build a data bundle from the scikit-learn iris dataset.

    The index frame holds the species name of every sample in ``label`` and
    the value returned by ``bt.train_display_test_split`` in ``split``. The
    features frame holds the measurements under their original column names.

    Returns:
        A ``bt.DataBundle`` with ``index`` and ``features`` frames.
    """
    iris = datasets.load_iris()
    feature_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])

    # Map the numeric targets to their species names.
    species = iris['target_names'][iris['target']]
    index_frame = pd.DataFrame(species, columns=['label'])
    index_frame['split'] = bt.train_display_test_split(
        index_frame, 0.2, 0.2, 'label')

    return bt.DataBundle(index=index_frame, features=feature_frame)


# Build the bundle and save it into a temporary folder; this folder is what
# gets uploaded to the bucket further down.
bundle_temp_folder = Loc.temp_path / 'temp_bundle'
bundle = get_multilabel_classification_bundle()
bundle.save(bundle_temp_folder)


##### Создадим бакет

In [5]:
from tg.grammar_ru.ml.components.yandex_storage.s3_yandex_helpers import S3YandexHandler

try:
    S3YandexHandler.create_bucket(bucket)
except Exception as error:
    # Best-effort: a failure here (presumably because the bucket already
    # exists -- TODO confirm) must not stop the notebook. Catching Exception
    # instead of using a bare `except` keeps KeyboardInterrupt and SystemExit
    # working, and the reason is reported instead of being silently dropped.
    # TODO: delete the bucket before creating it?
    print(f'Bucket {bucket!r} was not created: {error!r}')


##### Загрузим бандл в бакет

In [6]:
# Path inside the bucket under which the dataset files are stored.
s3path = f'datasphere/{project_name}/datasets/{dataset_name}'
# Upload the folder with the saved bundle to that path.
S3YandexHandler.upload_folder(bucket, s3path, bundle_temp_folder)


### Создадим task - TaskFactory


copy-paste from TrainingTaskDemo

In [7]:
from tg.grammar_ru.ml.components.training_task_factory import TaskFactory, Conventions
from tg.common.ml import dft


def get_feature_extractor():
    """Return the extractor named ``features``.

    It is indexed on the ``features`` frame and applies the transformer
    produced by ``DataFrameTransformerFactory.default_factory()``.
    """
    indexed = bt.PlainExtractor.build('features').index('features')
    transformer = dft.DataFrameTransformerFactory.default_factory()
    return indexed.apply(transformer=transformer)


def get_multilabel_extractor():
    """Return the label extractor named ``Conventions.LabelFrame``.

    It takes the ``label`` column and applies the transformer produced by
    ``DataFrameTransformerFactory.default_factory()``.
    """
    indexed = bt.PlainExtractor.build(Conventions.LabelFrame).index()
    transformer = dft.DataFrameTransformerFactory.default_factory()
    return indexed.apply(take_columns=['label'], transformer=transformer)


In [8]:
import torch


class ClassificationNetwork(torch.nn.Module):
    """Two linear layers, each followed by a sigmoid."""

    def __init__(self, hidden_size, sample):
        """Size the layers from a sample batch.

        Args:
            hidden_size: Width of the hidden layer.
            sample: Mapping with ``'features'`` and ``'label'`` entries. The
                second dimension of each fixes the input and output width.
        """
        super().__init__()
        feature_count = sample['features'].shape[1]
        label_count = sample['label'].shape[1]
        self.hidden = torch.nn.Linear(feature_count, hidden_size)
        self.output = torch.nn.Linear(hidden_size, label_count)

    def forward(self, input):
        """Return the network output for the ``'features'`` frame of ``input``.

        The frame is cast to float and converted to a float32 tensor before
        it is passed through the layers.
        """
        frame = input['features']
        batch = torch.tensor(frame.astype(float).values).float()
        hidden_activation = torch.sigmoid(self.hidden(batch))
        return torch.sigmoid(self.output(hidden_activation))


In [9]:
from sklearn.metrics import roc_auc_score
from tg.common import Logger
from yo_fluq_ds import *

Logger.disable()


class MulticlassMetrics(bt.Metric):
    """Accuracy and rating metrics over per-class prediction columns.

    ``measure`` expects a frame with one ``true_label_<class>`` column and
    one ``predicted_label_<class>`` column for every class.
    """

    def __init__(self, add_accuracy=True, add_rating=False):
        """Choose which values are reported.

        Args:
            add_accuracy: Report ``accuracy`` when true.
            add_rating: Report ``rating`` when true.
        """
        self.add_accuracy = add_accuracy
        self.add_rating = add_rating

    def get_names(self):
        """Return the names of the enabled metrics, in the order used by ``measure``."""
        result = []
        if self.add_accuracy:
            result.append('accuracy')
        if self.add_rating:
            result.append('rating')
        return result

    def measure(self, df, _):
        """Compute the enabled metrics for a frame of predictions.

        Args:
            df: Frame with ``true_label_*`` and ``predicted_label_*`` columns.
            _: Unused.

        Returns:
            A list of values in the same order as ``get_names``.
        """
        prefix = 'true_label_'
        # Cut the prefix only from the start of the column name: `str.replace`
        # would also delete occurrences inside the class name itself and the
        # column lookups below would then fail.
        labels = [c[len(prefix):] for c in df.columns if c.startswith(prefix)]

        def ustack(df, prefix, cols, name):
            # Select the prefixed columns, rename them to the bare class
            # names and turn the frame into one long column called `name`.
            df = df[[prefix + c for c in cols]]
            df.columns = list(cols)
            df = df.unstack().to_frame(name)
            return df

        predicted = ustack(df, 'predicted_label_', labels, 'predicted')
        true = ustack(df, prefix, labels, 'true')
        # One row per (class, sample) pair with both the predicted and the true value.
        df = predicted.merge(true, left_index=True,
                             right_index=True).reset_index()
        df.columns = ['label', 'sample', 'predicted', 'true']
        # NOTE(review): presumably ranks the classes of each sample by
        # descending `predicted`, with 0 for the top one -- confirm against
        # fluq.add_ordering_column.
        df = df.feed(fluq.add_ordering_column(
            'sample', ('predicted', False), 'predicted_rating'))

        # For the rank-0 row of every sample: is it the true class?
        match = (df.loc[df.predicted_rating ==
                 0].set_index('sample').true > 0.5)
        # Rank assigned to the rows whose true value is above 0.5.
        rating = df.loc[df.true > 0.5].set_index('sample').predicted_rating
        result = []
        if self.add_accuracy:
            result.append(match.mean())
        if self.add_rating:
            result.append(rating.mean())
        return result


def _inner(x, sample):
    """Create a ``ClassificationNetwork`` with 20 hidden units for ``sample``.

    ``x`` is accepted but not used.
    """
    hidden_size = 20
    network = ClassificationNetwork(hidden_size, sample)
    return network


class ClassificationTask(TaskFactory):
    """Task factory that trains ``ClassificationNetwork`` with ``MulticlassMetrics``."""

    def create_task(self, data, env):
        """Set up the default task, its batcher and its model for ``data``.

        ``env`` is accepted but not used here.
        """
        metric_pool = bt.MetricPool().add(MulticlassMetrics())
        self.instantiate_default_task(
            epoch_count=20,
            batch_size=10000,
            mini_batch_size=None,
            metric_pool=metric_pool,
        )

        extractors = [get_feature_extractor(), get_multilabel_extractor()]
        self.setup_batcher(data, extractors)

        self.setup_model(_inner, learning_rate=1)


# Create the task factory and record the dataset name and the task name in
# its info.
task = ClassificationTask()
task.info['dataset'] = dataset_name
task.info['name'] = 'classification_iris_task'


**Промежуточный результат**: создали task. Обернем его в TrainingJob (~ DeliverableJob).

Реализован класс TrainingJob.

Принимает список task'ов, название проекта и название бакета.

Для каждой таски

* Загружает бандл из ObjectStorage

* Запускает task

* Получает output task'а - модель. Архивирует её и отправляет в ObjectStorage.

* Exception'ы, возникшие при запуске task'а, записывает в ObjectStorage.

In [10]:
from tg.grammar_ru.ml.components.yandex_delivery.training_job import TrainingJob


# Wrap the task into a TrainingJob bound to the project and the bucket.
job = TrainingJob(tasks=[task],
                  project_name=project_name,
                  bucket=bucket)


In [11]:
# Run the job locally, the same way it will later be run in DataSphere.
job.run()


Exception in classification_iris_task 11:22:15.550172 loaded to DataSphere


#### Структура ObjectStorage после обучения

* datasphere/project_name
    * datasets/dataset_name
        * file1_of_bundle .parquet
        * file2_of_bundle .parquet
    * output/task_name
        * output/model.tar.gz
    * exceptions
        * task_name_time1.txt
        * task_name_time2.txt