In [1]:
import os
from pathlib import Path

import ray
import pandas as pd
import numpy as np

from ray.train.sklearn import SklearnTrainer
from ray.data import Dataset
from ray.data.preprocessors import BatchMapper, Chain
from ray.data.preprocessor import Preprocessor
from ray.air.config import ScalingConfig

from pprint import pprint

# Configuration

In [2]:
# Location of the training Parquet data. The default is the original developer
# path; set the OFF_INPUT_DATA_PATH environment variable to run the notebook on
# another machine without editing this cell.
INPUT_DATA_PATH = os.environ.get(
    'OFF_INPUT_DATA_PATH',
    '/Users/rgareev/data/openfoodfacts/wrk/20220831-dev/train.parquet',
)
# Name of the target column passed to the trainer as `label_column`.
LABEL_COLUMN = 'nova_group'

In [3]:
# Directory the trained checkpoint is exported to at the end of the notebook.
# The default is the original developer path; override it with the
# OFF_OUTPUT_MODEL_PATH environment variable instead of editing this cell.
OUTPUT_MODEL_PATH = os.environ.get(
    'OFF_OUTPUT_MODEL_PATH',
    '/Users/rgareev/projects/mlops-openfoodfacts/wrk/trainings/20220831-dev/model',
)

In [4]:
# Start Ray for this session. `ignore_reinit_error=True` keeps the cell
# re-runnable: without it, executing the cell a second time in a live kernel
# raises RuntimeError because Ray is already initialised.
ray.init(ignore_reinit_error=True)

2022-09-11 17:36:25,871	INFO worker.py:1509 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.9.12
Ray version:,2.0.0
Dashboard:,http://127.0.0.1:8265


In [5]:
# Show the resources Ray currently reports as available (CPU, memory, object store).
ray.available_resources()

{'memory': 11824282010.0,
 'node:127.0.0.1': 1.0,
 'object_store_memory': 2147483648.0,
 'CPU': 8.0}

# Script
## Read data

In [6]:
# Load the training data from the Parquet location in INPUT_DATA_PATH.
ds = ray.data.read_parquet(INPUT_DATA_PATH)



In [7]:
# Inspect the column names and types of the loaded dataset.
ds.schema()

product_name: string
nova_group: int8
ingredients_list: list<item: string>
  child 0, item: string
code: string
-- schema metadata --
pandas: '{"index_columns": ["code"], "column_indexes": [{"name": null, "f' + 684

In [8]:
from ray.data.context import DatasetContext

# Switch off the tensor-extension casting flag on the current Dataset context
# before the dataset is transformed further down.
# NOTE(review): presumably needed so the list-typed `ingredients_list` column is
# not cast to a tensor extension type when batches become pandas DataFrames —
# confirm against the Ray Data documentation.
ctx = DatasetContext.get_current()
ctx.enable_tensor_extension_casting = False

In [9]:
# Redistribute the dataset into 5 blocks.
# NOTE(review): 5 equals the `cv` fold count and the trainer CPU count used when
# fitting below — confirm whether the values are meant to be tied together.
ds = ds.repartition(5)

Read: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  2.39it/s]
Repartition: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 17.94it/s]


## Convert to the input format accepted by preprocessors / feature generators

In [8]:
# Columns of the input frame that are handed to the model as features.
FEATURE_COLS = ['ingredients_list']


def to_model_input(df: pd.DataFrame) -> list:
    """Turn a batch of rows into one feature dict per row.

    Only the columns listed in ``FEATURE_COLS`` are kept. Each row of ``df``
    becomes a ``{column_name: value}`` dict, and the dicts are returned in
    row order.
    """
    feature_frame = df[FEATURE_COLS]
    return feature_frame.to_dict(orient='records')



## Define featurizers

In [19]:
# Convert every batch into a list of per-row feature dicts using the
# `to_model_input` helper defined earlier in the notebook. The previous inline
# lambda referenced `FEATURES_MULTIHOT`, which is not defined anywhere in the
# notebook and fails with NameError on a fresh kernel.
# batch_size=None is kept from the original call.
processed_ds = ds.map_batches(to_model_input, batch_size=None)

Map_Batches: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:01<00:00,  3.87it/s]


In [20]:
# Check the record type of the converted dataset.
processed_ds.schema()

dict

## Train and tune model

In [12]:
# Baseline model: feature dicts -> DictVectorizer -> Bernoulli naive Bayes.
from sklearn.naive_bayes import BernoulliNB
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction import DictVectorizer
from sklearn.preprocessing import FunctionTransformer

# Reuse the `to_model_input` helper (and its FEATURE_COLS constant) defined
# earlier in the notebook. This cell used to redefine FEATURE_COLS and repeat
# the same DataFrame -> records conversion in an anonymous lambda.
df_converter = FunctionTransformer(to_model_input)
ingredient_encoder = DictVectorizer()
# binarize=None: no thresholding is applied; the encoder output is used as-is.
nb_clf = BernoulliNB(binarize=None)
sk_pipe = Pipeline([
    ('df_converter', df_converter),
    ('encoder', ingredient_encoder),
    ('clf', nb_clf),
])

In [13]:
# Cross-validate the baseline pipeline on the full training dataset.
# Five folds are evaluated in parallel, with five CPUs reserved for the trainer.
trainer_scaling = ScalingConfig(trainer_resources={'CPU': 5})

trainer = SklearnTrainer(
    estimator=sk_pipe,
    datasets={'train': ds},
    label_column=LABEL_COLUMN,
    cv=5,
    parallelize_cv=True,
    scaling_config=trainer_scaling,
)

train_result = trainer.fit()

Trial name,status,loc,iter,total time (s),fit_time
SklearnTrainer_4ae05_00000,TERMINATED,127.0.0.1:19217,1,23.1445,6.73823




Result for SklearnTrainer_4ae05_00000:
  cv:
    fit_time: [6.570423126220703, 6.808997869491577, 6.523216962814331, 6.6626482009887695,
      6.584656000137329]
    fit_time_mean: 6.629988431930542
    fit_time_std: 0.10011417570422458
    score_time: [1.5979688167572021, 1.466637134552002, 1.5029540061950684, 1.4252598285675049,
      1.4267301559448242]
    score_time_mean: 1.4839099884033202
    score_time_std: 0.0638432720151937
    test_score: [0.8784641284641285, 0.8788803788803788, 0.879009879009879, 0.8782976282976283,
      0.8792318792318792]
    test_score_mean: 0.8787767787767787
    test_score_std: 0.00034627172106322605
  date: 2022-09-11_17-39-24
  done: false
  experiment_id: 5791ef84cf65414ea38fd5c6d512b686
  fit_time: 6.738233804702759
  hostname: GRM-MacBook-Prov.local
  iterations_since_restore: 1
  node_ip: 127.0.0.1
  pid: 19217
  should_checkpoint: true
  time_since_restore: 23.144503116607666
  time_this_iter_s: 23.144503116607666
  time_total_s: 23.14450311660

2022-09-11 17:39:24,410	INFO tune.py:758 -- Total run time: 24.48 seconds (24.37 seconds for the tuning loop).


In [14]:
# Tabular view of the metrics reported by the training run.
train_result.metrics_dataframe

Unnamed: 0,fit_time,time_this_iter_s,should_checkpoint,done,timesteps_total,episodes_total,training_iteration,trial_id,experiment_id,date,...,warmup_time,cv/fit_time,cv/score_time,cv/test_score,cv/fit_time_mean,cv/fit_time_std,cv/score_time_mean,cv/score_time_std,cv/test_score_mean,cv/test_score_std
0,6.738234,23.144503,True,False,,,1,4ae05_00000,5791ef84cf65414ea38fd5c6d512b686,2022-09-11_17-39-24,...,0.002387,[6.57042313 6.80899787 6.52321696 6.6626482 6...,[1.59796882 1.46663713 1.50295401 1.42525983 1...,[0.87846413 0.87888038 0.87900988 0.87829763 0...,6.629988,0.100114,1.48391,0.063843,0.878777,0.000346


In [15]:
# Training metrics as a dictionary, including the per-fold `cv` timings and scores.
train_result.metrics

{'cv': {'fit_time': array([6.57042313, 6.80899787, 6.52321696, 6.6626482 , 6.584656  ]),
  'score_time': array([1.59796882, 1.46663713, 1.50295401, 1.42525983, 1.42673016]),
  'test_score': array([0.87846413, 0.87888038, 0.87900988, 0.87829763, 0.87923188]),
  'fit_time_mean': 6.629988431930542,
  'fit_time_std': 0.10011417570422458,
  'score_time_mean': 1.4839099884033202,
  'score_time_std': 0.0638432720151937,
  'test_score_mean': 0.8787767787767787,
  'test_score_std': 0.00034627172106322605},
 'fit_time': 6.738233804702759,
 'time_this_iter_s': 23.144503116607666,
 'should_checkpoint': True,
 'done': True,
 'timesteps_total': None,
 'episodes_total': None,
 'training_iteration': 1,
 'trial_id': '4ae05_00000',
 'experiment_id': '5791ef84cf65414ea38fd5c6d512b686',
 'date': '2022-09-11_17-39-24',
 'timestamp': 1662943164,
 'time_total_s': 23.144503116607666,
 'pid': 19217,
 'hostname': 'GRM-MacBook-Prov.local',
 'node_ip': '127.0.0.1',
 'config': {},
 'time_since_restore': 23.1445031

In [16]:
# Checkpoint produced by the training run.
train_result.checkpoint

Checkpoint(local_path=/Users/rgareev/ray_results/SklearnTrainer_2022-09-11_17-38-59/SklearnTrainer_4ae05_00000_0_2022-09-11_17-39-00/checkpoint_000001)

In [17]:
# Export the checkpoint contents to the directory given by OUTPUT_MODEL_PATH.
train_result.checkpoint.to_directory(OUTPUT_MODEL_PATH)

'/Users/rgareev/projects/mlops-openfoodfacts/wrk/trainings/20220831-dev/model'

In [41]:
# TODO register experiment metrics