# Made with ML

- https://github.com/GokuMohandas/Made-With-ML/blob/main/notebooks/madewithml.ipynb
- https://madewithml.com/

In [1]:
import os
import ray
import random
import torch
import torch.nn as nn
from ray.data.preprocessor import Preprocessor
from transformers import BertTokenizer
import json
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import re

In [2]:
import sys; sys.path.append("..")
import warnings; warnings.filterwarnings("ignore")
from dotenv import load_dotenv; load_dotenv()
%load_ext autoreload
%autoreload 2

In [3]:

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

In [4]:
# Initialize Ray: shut down any instance left over from an earlier run of this cell
# so that re-running it starts a fresh local Ray instance.
if ray.is_initialized():
    ray.shutdown()
ray.init()

2023-12-26 09:47:41,000	INFO worker.py:1633 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.11
Ray version:,2.7.0
Dashboard:,http://127.0.0.1:8265


In [5]:
# Show the resources Ray sees on this machine (CPU count, memory, ...); the worker count below is chosen relative to these
ray.cluster_resources()

{'memory': 49214788404.0,
 'object_store_memory': 2147483648.0,
 'CPU': 10.0,
 'node:__internal_head__': 1.0,
 'node:127.0.0.1': 1.0}

In [6]:
# Training workers: use a few less than the CPUs reported above (10 here),
# keeping 1 for the head node and 1 for background tasks.
num_workers = 6
resources_per_worker = dict(CPU=1, GPU=0)  # GPU > 0 turns on use_gpu in the ScalingConfig below

In [7]:
# Label to index: a tag's position in this hand-written list is its class index
tags = ['mlops', 'natural-language-processing', 'computer-vision', 'other']
num_classes = len(tags)
class_to_index = dict(zip(tags, range(num_classes)))
class_to_index

{'mlops': 0,
 'natural-language-processing': 1,
 'computer-vision': 2,
 'other': 3}

In [8]:
# Fetch the NLTK stopword corpus (skipped if already cached locally) and load the English list used by clean_text
nltk.download("stopwords")
STOPWORDS = stopwords.words("english")

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/sguys99/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [9]:
def clean_text(text, stopwords=STOPWORDS):
    """Clean a raw text string before tokenization.

    Steps: lowercase, remove links, remove stopwords, isolate punctuation,
    collapse every run of non-alphanumeric characters into one space and
    trim both ends.

    Args:
        text (str): raw input text.
        stopwords (list[str], optional): words to remove. Defaults to the NLTK
            English list loaded into ``STOPWORDS``.

    Returns:
        str: the cleaned text.
    """
    # Lower
    text = text.lower()

    # Remove links first: the punctuation filters below split a URL into
    # "https example com ..." and a later `http\S+` pass would only drop the first token.
    text = re.sub(r"http\S+", "", text)

    # Remove stopwords. Skip when the list is empty: an empty alternation `\b()\b\s*`
    # matches at every word boundary and would glue neighbouring words together.
    if stopwords:
        pattern = re.compile(r"\b(" + r"|".join(map(re.escape, stopwords)) + r")\b\s*")
        text = pattern.sub("", text)

    # Spacing and filters
    text = re.sub(r"([!\"'#$%&()*\+,-./:;<=>?@\\\[\]^_`{|}~])", r" \1 ", text)  # add spacing
    text = re.sub("[^A-Za-z0-9]+", " ", text)  # remove non alphanumeric chars
    text = re.sub(" +", " ", text)  # remove multiple spaces
    text = text.strip()  # strip white space at the ends

    return text

In [10]:
# Bert tokenizer (SciBERT vocabulary). Note: `tokenize` below loads its own instance and does not use this global.
tokenizer = BertTokenizer.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)

In [11]:
def tokenize(batch):
    """Encode ``batch["text"]`` with the SciBERT tokenizer.

    Args:
        batch: a pandas DataFrame (as built by ``preprocess``) with ``text`` and
            ``tag`` columns.

    Returns:
        dict: ``ids`` and ``masks`` (numpy arrays from the tokenizer, padded to the
        longest sequence in the batch) and ``targets`` (numpy array of ``tag``).
    """
    # NOTE(review): a fresh tokenizer is loaded on every call instead of reusing the
    # module-level `tokenizer`; presumably to keep this function self-contained when
    # Ray ships it to workers — confirm before caching it.
    bert_tokenizer = BertTokenizer.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
    texts = batch["text"].tolist()
    encoded = bert_tokenizer(texts, return_tensors="np", padding="longest")
    return {
        "ids": encoded["input_ids"],
        "masks": encoded["attention_mask"],
        "targets": np.array(batch["tag"]),
    }

In [12]:
def preprocess(df, class_to_index):
    """Turn a raw batch of projects into model inputs.

    Builds a ``text`` feature from ``title`` + ``description``, cleans it with
    ``clean_text``, label-encodes ``tag`` with ``class_to_index`` and tokenizes
    the result. The input frame is not modified.

    Args:
        df (pd.DataFrame): batch with at least ``title``, ``description`` and ``tag`` columns.
        class_to_index (dict): mapping from tag name to integer label.

    Returns:
        dict: the output of ``tokenize`` (``ids``, ``masks``, ``targets``).
    """
    # Feature engineering + cleaning
    text = (df["title"] + " " + df["description"]).apply(clean_text)
    # Build a new frame instead of writing columns into `df`: this neither mutates the
    # caller's batch nor assigns into a column-subset copy (SettingWithCopy pattern).
    features = pd.DataFrame({
        "text": text,
        "tag": df["tag"].map(class_to_index),  # label encoding; tags missing from the mapping become NaN
    })
    return tokenize(features)

In [13]:
from ray.data import Dataset
from typing import Dict, List, Tuple

def stratify_split(
    ds: Dataset,
    stratify: str,
    test_size: float,
    shuffle: bool = True,
    seed: int = 1234,
) -> Tuple[Dataset, Dataset]:
    """Split a dataset into train and test splits, stratified on a column.

    Each unique value of ``stratify`` is split independently with
    ``train_test_split``, so every class keeps (approximately) the same
    proportion in both splits. The splits are proportional per class, not
    equal-sized per class.

    Args:
        ds (Dataset): Input dataset to split.
        stratify (str): Name of column to split on.
        test_size (float): Proportion of each class to put in the test split.
        shuffle (bool, optional): whether to shuffle each class's rows before splitting. Defaults to True.
        seed (int, optional): seed for the per-class split and the final shuffles. Defaults to 1234.

    Returns:
        Tuple[Dataset, Dataset]: the stratified train and test datasets.

    Note:
        A class with too few rows for ``test_size`` (e.g. a single row) makes
        ``train_test_split`` raise ``ValueError``.
    """

    def _add_split(df: pd.DataFrame) -> pd.DataFrame:  # pragma: no cover, used in parent function
        """Naively split one group (class) into train and test rows and tag each
        row with a ``_split`` column ("train" or "test")."""
        train, test = train_test_split(df, test_size=test_size, shuffle=shuffle, random_state=seed)
        # `assign` returns new frames instead of writing into the slices returned by
        # train_test_split (avoids SettingWithCopyWarning).
        return pd.concat([train.assign(_split="train"), test.assign(_split="test")])

    def _filter_split(df: pd.DataFrame, split: str) -> pd.DataFrame:  # pragma: no cover, used in parent function
        """Keep only rows whose ``_split`` equals ``split`` and drop the helper column."""
        return df[df["_split"] == split].drop("_split", axis=1)

    # Train, test split with stratify: split each group (class) of `stratify` independently
    grouped = ds.groupby(stratify).map_groups(_add_split, batch_format="pandas")
    # NOTE: `grouped` is lazy, so each of the two selections below re-runs the grouping.
    train_ds = grouped.map_batches(_filter_split, fn_kwargs={"split": "train"}, batch_format="pandas")  # combine
    test_ds = grouped.map_batches(_filter_split, fn_kwargs={"split": "test"}, batch_format="pandas")  # combine

    # Shuffle each split (required: map_groups emits rows group by group, i.e. sorted by class)
    train_ds = train_ds.random_shuffle(seed=seed)
    test_ds = test_ds.random_shuffle(seed=seed)

    return train_ds, test_ds

In [14]:
# Keep row order deterministic across Ray Data operations so shuffles/splits are reproducible.
# `DataContext` is the current name; `DatasetContext` is a deprecated alias for it.
ray.data.DataContext.get_current().execution_options.preserve_order = True

In [15]:
# Raw project dataset (id, created_on, title, description, tag) from the Made-With-ML repo
DATASET_LOC = "https://raw.githubusercontent.com/GokuMohandas/Made-With-ML/main/datasets/dataset.csv"

In [16]:
# Data ingestion: read the CSV into a Ray Dataset and shuffle it with a fixed seed
ds = ray.data.read_csv(DATASET_LOC)
ds = ds.random_shuffle(seed=1234)
ds.take(1)  # peek at one record

2023-12-26 09:47:43,514	INFO read_api.py:406 -- To satisfy the requested parallelism of 20, each read task output is split into 20 smaller blocks.
2023-12-26 09:47:43,526	INFO dataset.py:2380 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2023-12-26 09:47:43,528	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> AllToAllOperator[RandomShuffle] -> LimitOperator[limit=1]
2023-12-26 09:47:43,529	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-12-26 09:47:43,530	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/400 [00:00<?, ?it/s]

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[{'id': 549,
  'created_on': datetime.datetime(2020, 4, 16, 3, 48, 35),
  'title': '15 Best Tools for Tracking Machine Learning Experiments',
  'description': 'A feature comparison of all the open-source and commercial options for experiment tracking.',
  'tag': 'mlops'}]

In [17]:
# Split dataset: 80/20 train/validation, stratified on the `tag` column
test_size = 0.2
train_ds, val_ds = stratify_split(ds, stratify="tag", test_size=test_size)

2023-12-26 09:47:44,570	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> AllToAllOperator[RandomShuffle] -> LimitOperator[limit=1]
2023-12-26 09:47:44,570	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-12-26 09:47:44,571	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/400 [00:00<?, ?it/s]

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

In [18]:
# Mapping re-derived from the training split. The order returned by `unique` may differ
# from the hand-written `tags` list above, so the indices can change.
tags = train_ds.unique(column="tag")
num_classes = len(tags)  # keep the class count consistent with the derived mapping
class_to_index = {tag: i for i, tag in enumerate(tags)}

2023-12-26 09:47:45,102	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> LimitOperator[limit=1]
2023-12-26 09:47:45,103	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-12-26 09:47:45,103	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/400 [00:00<?, ?it/s]

- Sort 4:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 5:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/400 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 8:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 9:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 10:   0%|          | 0/400 [00:00<?, ?it/s]

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/20 [00:00<?, ?it/s]

2023-12-26 09:47:46,127	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> AllToAllOperator[Aggregate] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2023-12-26 09:47:46,128	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-12-26 09:47:46,129	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/400 [00:00<?, ?it/s]

- Sort 4:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 5:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/400 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 8:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 9:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 10:   0%|          | 0/400 [00:00<?, ?it/s]

- Aggregate 11:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 12:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 13:   0%|          | 0/400 [00:00<?, ?it/s]

Running 0:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/20 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/20 [00:00<?, ?it/s]

In [19]:
import os
import random
import torch
from ray.data.preprocessor import Preprocessor

In [20]:
def set_seeds(seed=42):
    """Seed every RNG used in this notebook for reproducibility.

    Seeds NumPy's global RNG, Python's ``random`` and PyTorch (CPU and CUDA),
    makes cuDNN deterministic, and sets ``PYTHONHASHSEED``.

    Args:
        seed (int, optional): seed value. Defaults to 42.
    """
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)  # silently ignored when CUDA is unavailable
    # Plain attribute assignment; there is no need to route this through eval().
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # NOTE: only affects subprocesses started afterwards; the running interpreter's
    # hash seed is fixed at startup.
    os.environ["PYTHONHASHSEED"] = str(seed)

In [21]:
def load_data(num_samples=None):
    """Read the project CSV at ``DATASET_LOC`` as a shuffled Ray Dataset.

    Args:
        num_samples (int, optional): when truthy, keep only the first
            ``num_samples`` rows of the shuffled data (pulled with ``take`` and
            rebuilt with ``ray.data.from_items``). Defaults to None (all rows).

    Returns:
        ray.data.Dataset: the shuffled dataset.
    """
    shuffled = ray.data.read_csv(DATASET_LOC).random_shuffle(seed=1234)
    if not num_samples:
        return shuffled
    return ray.data.from_items(shuffled.take(num_samples))

In [22]:
class CustomPreprocessor:
    """Learn a tag -> index mapping and apply `preprocess` to Ray datasets."""

    def __init__(self, class_to_index=None):
        """
        Args:
            class_to_index (dict, optional): existing tag -> index mapping (e.g.
                restored from checkpoint metadata). Defaults to None, which means
                an empty mapping until `fit` is called.
        """
        # `None` instead of a `{}` default avoids a shared mutable default argument.
        self._set_mapping(class_to_index or {})

    def _set_mapping(self, class_to_index):
        """Store ``class_to_index`` and its inverse ``index_to_class``."""
        self.class_to_index = class_to_index
        self.index_to_class = {v: k for k, v in class_to_index.items()}

    def fit(self, ds):
        """Build the mapping from the unique values of ``ds``'s ``tag`` column.

        Returns:
            CustomPreprocessor: ``self``, so calls can be chained.
        """
        tags = ds.unique(column="tag")
        self._set_mapping({tag: i for i, tag in enumerate(tags)})
        return self

    def transform(self, ds):
        """Return a dataset that applies `preprocess` (with the current mapping) to each pandas batch of ``ds``."""
        return ds.map_batches(
            preprocess,
            fn_kwargs={"class_to_index": self.class_to_index},
            batch_format="pandas")

In [23]:
import torch.nn as nn
from transformers import BertModel

In [24]:

# Pretrained LLM: SciBERT encoder; return_dict=False makes its forward return a tuple (unpacked in FinetunedLLM.forward)
llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
embedding_dim = llm.config.hidden_size  # width of the pooled output fed to the classifier head

Some weights of the model checkpoint at allenai/scibert_scivocab_uncased were not used when initializing BertModel: ['cls.predictions.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.decoder.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [25]:
class FinetunedLLM(nn.Module):
    """Text classifier: pretrained encoder -> dropout -> linear head.

    ``llm`` is called as ``llm(input_ids=..., attention_mask=...)`` and must return a
    ``(sequence_output, pooled_output)`` pair (e.g. a HuggingFace ``BertModel`` loaded
    with ``return_dict=False``); only the pooled output feeds the head.
    """

    def __init__(self, llm, dropout_p, embedding_dim, num_classes):
        """
        Args:
            llm (nn.Module): pretrained encoder.
            dropout_p (float): dropout probability applied to the pooled output.
            embedding_dim (int): width of the pooled output (``llm.config.hidden_size`` for BERT).
            num_classes (int): number of output classes.
        """
        super().__init__()
        self.llm = llm
        self.dropout_p = dropout_p
        self.embedding_dim = embedding_dim
        self.num_classes = num_classes
        self.dropout = torch.nn.Dropout(dropout_p)
        self.fc1 = torch.nn.Linear(embedding_dim, num_classes)

    def forward(self, batch):
        """Return raw logits (last dimension ``num_classes``) for a batch dict with ``ids`` and ``masks``."""
        ids, masks = batch["ids"], batch["masks"]
        _, pool = self.llm(input_ids=ids, attention_mask=masks)  # sequence output is unused
        z = self.dropout(pool)
        z = self.fc1(z)
        return z

    @torch.inference_mode()
    def predict(self, batch):
        """Return argmax class indices as a numpy array. Leaves the module in eval mode."""
        self.eval()
        z = self(batch)
        y_pred = torch.argmax(z, dim=1).cpu().numpy()
        return y_pred

    @torch.inference_mode()
    def predict_proba(self, batch):
        """Return softmax class probabilities as a numpy array. Leaves the module in eval mode."""
        self.eval()
        z = self(batch)
        # torch.softmax == F.softmax; avoids depending on `F`, which is only imported in a later cell
        y_probs = torch.softmax(z, dim=1).cpu().numpy()
        return y_probs

    def save(self, dp):
        """Write constructor args to ``dp/args.json`` and the full state dict to ``dp/model.pt``."""
        contents = {
            "dropout_p": self.dropout_p,
            "embedding_dim": self.embedding_dim,
            "num_classes": self.num_classes,
        }
        # os.path.join for both files (Path is only imported in a later cell)
        with open(os.path.join(dp, "args.json"), "w") as fp:
            json.dump(contents, fp, indent=4, sort_keys=False)
        torch.save(self.state_dict(), os.path.join(dp, "model.pt"))

    @classmethod
    def load(cls, args_fp, state_dict_fp):
        """Rebuild a model written by ``save``.

        Re-creates the SciBERT encoder with ``BertModel.from_pretrained`` and then
        restores all weights from ``state_dict_fp``, mapped onto the CPU.
        """
        with open(args_fp, "r") as fp:
            kwargs = json.load(fp=fp)
        llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
        model = cls(llm=llm, **kwargs)
        model.load_state_dict(torch.load(state_dict_fp, map_location=torch.device("cpu")))
        return model


In [26]:
# Initialize model
model = FinetunedLLM(llm=llm, dropout_p=0.5, embedding_dim=embedding_dim, num_classes=num_classes)
# Print the module tree. (`model.named_parameters` without parentheses is a bound method,
# not the parameters; call `model.named_parameters()` to iterate (name, parameter) pairs.)
print(model)

<bound method Module.named_parameters of FinetunedLLM(
  (llm): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(31090, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): Layer

In [27]:
from ray.train.torch import get_device

In [28]:
def pad_array(arr, dtype=np.int32):
    """Right-pad variable-length rows with zeros into a dense 2-D array.

    Args:
        arr: sequence of rows of possibly different lengths (e.g. a 1-D object
            array of token-id arrays, a 2-D array, or a list of lists).
        dtype (np.dtype, optional): dtype of the result. Defaults to ``np.int32``.

    Returns:
        np.ndarray: shape ``(len(arr), longest_row_length)``; shorter rows are
        zero-padded on the right. An empty input yields shape ``(0, 0)``.
    """
    max_len = max((len(row) for row in arr), default=0)  # `default` keeps empty input from raising
    padded_arr = np.zeros((len(arr), max_len), dtype=dtype)  # len() also works for plain lists
    for i, row in enumerate(arr):
        padded_arr[i, :len(row)] = row
    return padded_arr

In [29]:
def collate_fn(batch):
    """Convert a numpy batch into tensors on the Ray Train device.

    Pads the ``ids`` and ``masks`` columns with ``pad_array``. It then converts every
    column with ``torch.as_tensor`` onto ``get_device()``: ``ids``/``masks`` become
    int32 and ``targets`` int64. The input ``batch`` is not modified.

    Args:
        batch (dict): must contain the keys ``ids``, ``masks`` and ``targets``;
            any other key raises ``KeyError`` (no dtype is defined for it).

    Returns:
        dict[str, torch.Tensor]: the same keys, in the same order, as tensors.
    """
    dtypes = {"ids": torch.int32, "masks": torch.int32, "targets": torch.int64}
    # Build a new dict rather than overwriting the caller's `ids`/`masks` in place.
    padded = {**batch, "ids": pad_array(batch["ids"]), "masks": pad_array(batch["masks"])}
    device = get_device()  # resolve once per batch instead of once per column
    return {key: torch.as_tensor(array, dtype=dtypes[key], device=device) for key, array in padded.items()}

In [30]:
from pathlib import Path
import ray.train as train
from ray.train import Checkpoint, CheckpointConfig, DataConfig, RunConfig, ScalingConfig
from ray.train.torch import TorchCheckpoint, TorchTrainer
import tempfile
import torch.nn.functional as F
from torch.nn.parallel.distributed import DistributedDataParallel

In [31]:
def train_step(ds, batch_size, model, num_classes, loss_fn, optimizer):
    """Run one training epoch over ``ds`` and return the running mean of batch losses.

    Args:
        ds: Ray dataset (shard) iterated with ``iter_torch_batches`` and ``collate_fn``.
        batch_size (int): rows per batch.
        model (nn.Module): model to update (switched to train mode).
        num_classes (int): number of classes, used to one-hot encode the targets.
        loss_fn: loss called as ``loss_fn(logits, one_hot_targets)``.
        optimizer (torch.optim.Optimizer): optimizer over ``model``'s parameters.

    Returns:
        float: mean per-batch loss (0.0 if ``ds`` yields no batches).
    """
    model.train()
    mean_loss = 0.0
    batches = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)
    for step, batch in enumerate(batches, start=1):
        optimizer.zero_grad()  # clear gradients from the previous step
        logits = model(batch)  # forward pass
        one_hot = F.one_hot(batch["targets"], num_classes=num_classes).float()  # loss_fn expects one-hot targets
        batch_loss = loss_fn(logits, one_hot)
        batch_loss.backward()  # backward pass
        optimizer.step()  # update weights
        mean_loss += (batch_loss.detach().item() - mean_loss) / step  # incremental mean
    return mean_loss

In [32]:
def eval_step(ds, batch_size, model, num_classes, loss_fn):
    """Evaluate ``model`` on ``ds`` without tracking gradients.

    Returns:
        tuple: ``(mean_loss, y_true, y_pred)``. ``mean_loss`` is the running mean
        of per-batch losses. ``y_true`` / ``y_pred`` hold the target and argmax-predicted
        class indices, stacked with ``np.vstack`` into one row per example.
        ``np.vstack`` raises if ``ds`` yields no batches.
    """
    model.eval()
    mean_loss = 0.0
    targets_seen, preds_seen = [], []
    batches = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)
    with torch.inference_mode():
        for step, batch in enumerate(batches, start=1):
            logits = model(batch)
            one_hot = F.one_hot(batch["targets"], num_classes=num_classes).float()  # one-hot (for loss_fn)
            batch_loss = loss_fn(logits, one_hot).item()
            mean_loss += (batch_loss - mean_loss) / step
            targets_seen.extend(batch["targets"].cpu().numpy())
            preds_seen.extend(torch.argmax(logits, dim=1).cpu().numpy())
    return mean_loss, np.vstack(targets_seen), np.vstack(preds_seen)

In [33]:
# Training loop
def train_loop_per_worker(config):
    """Ray Train per-worker loop: fine-tune `FinetunedLLM`, reporting metrics and a checkpoint every epoch.

    Args:
        config (dict): hyperparameters with the keys ``dropout_p``, ``lr``,
            ``lr_factor``, ``lr_patience``, ``num_epochs``, ``batch_size`` (global,
            divided across workers) and ``num_classes``.
    """
    # Hyperparameters
    dropout_p = config["dropout_p"]
    lr = config["lr"]
    lr_factor = config["lr_factor"]
    lr_patience = config["lr_patience"]
    num_epochs = config["num_epochs"]
    batch_size = config["batch_size"]
    num_classes = config["num_classes"]

    # Get datasets
    set_seeds()
    train_ds = train.get_dataset_shard("train")
    val_ds = train.get_dataset_shard("val")

    # Model
    llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
    model = FinetunedLLM(llm=llm, dropout_p=dropout_p, embedding_dim=llm.config.hidden_size, num_classes=num_classes)
    model = train.torch.prepare_model(model)
    # prepare_model may wrap the model in DistributedDataParallel (multi-worker runs);
    # the custom `save` lives on the wrapped module, so unwrap it once here.
    base_model = model.module if isinstance(model, DistributedDataParallel) else model

    # Training components: BCE on one-hot targets; LR decays when val_loss plateaus
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=lr_factor, patience=lr_patience)

    # Training: split the global batch size across workers, never letting it drop to 0
    # (which would happen when batch_size < number of workers).
    num_workers = train.get_context().get_world_size()
    batch_size_per_worker = max(1, batch_size // num_workers)
    for epoch in range(num_epochs):
        # Step
        train_loss = train_step(train_ds, batch_size_per_worker, model, num_classes, loss_fn, optimizer)
        val_loss, _, _ = eval_step(val_ds, batch_size_per_worker, model, num_classes, loss_fn)
        scheduler.step(val_loss)

        # Checkpoint: save args + weights to a temp dir and report it with this epoch's metrics
        with tempfile.TemporaryDirectory() as dp:
            base_model.save(dp=dp)
            metrics = dict(epoch=epoch, lr=optimizer.param_groups[0]["lr"], train_loss=train_loss, val_loss=val_loss)
            checkpoint = Checkpoint.from_directory(dp)
            train.report(metrics, checkpoint=checkpoint)

In [34]:
# Train loop config: hyperparameters passed to train_loop_per_worker as `config`
train_loop_config = dict(
    dropout_p=0.5,
    lr=1e-4,
    lr_factor=0.8,  # ReduceLROnPlateau multiplies the LR by this factor ...
    lr_patience=3,  # ... after this many epochs without val_loss improvement
    num_epochs=10,
    batch_size=256,  # global batch size, divided across workers
    num_classes=num_classes,
)

In [35]:
# Scaling config: `num_workers` training workers, each reserving `resources_per_worker`;
# GPUs are used only when a GPU is requested per worker
scaling_config = ScalingConfig(
    num_workers=num_workers,
    use_gpu=bool(resources_per_worker["GPU"]),
    resources_per_worker=resources_per_worker
)

In [36]:
# Run config: keep only the single best checkpoint, ranked by lowest val_loss
checkpoint_config = CheckpointConfig(num_to_keep=1, checkpoint_score_attribute="val_loss", checkpoint_score_order="min")
run_config = RunConfig(name="llm", checkpoint_config=checkpoint_config, storage_path="~/ray/ray_results")
# Note: the storage path had to be changed to "~/ray/ray_results" for this to work

In [37]:
# Dataset: reload the full shuffled data and split it again (same test_size as earlier)
ds = load_data()
train_ds, val_ds = stratify_split(ds, stratify="tag", test_size=test_size)

2023-12-26 09:47:49,041	INFO read_api.py:406 -- To satisfy the requested parallelism of 20, each read task output is split into 20 smaller blocks.
2023-12-26 09:47:49,044	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> AllToAllOperator[RandomShuffle] -> LimitOperator[limit=1]
2023-12-26 09:47:49,045	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-12-26 09:47:49,045	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/400 [00:00<?, ?it/s]

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

In [38]:
# Preprocess: fit the tag mapping on the training split only, apply it to both splits,
# and materialize the results so training reads precomputed blocks
preprocessor = CustomPreprocessor().fit(train_ds)
train_ds = preprocessor.transform(train_ds).materialize()
val_ds = preprocessor.transform(val_ds).materialize()

2023-12-26 09:47:49,401	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> LimitOperator[limit=1]
2023-12-26 09:47:49,402	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-12-26 09:47:49,402	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/400 [00:00<?, ?it/s]

- Sort 4:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 5:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/400 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 8:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 9:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 10:   0%|          | 0/400 [00:00<?, ?it/s]

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/20 [00:00<?, ?it/s]

2023-12-26 09:47:49,995	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> AllToAllOperator[Aggregate] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2023-12-26 09:47:49,996	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-12-26 09:47:49,996	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/400 [00:00<?, ?it/s]

- Sort 4:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 5:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/400 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 8:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 9:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 10:   0%|          | 0/400 [00:00<?, ?it/s]

- Aggregate 11:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 12:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 13:   0%|          | 0/400 [00:00<?, ?it/s]

Running 0:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/20 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/20 [00:00<?, ?it/s]

2023-12-26 09:47:50,549	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> TaskPoolMapOperator[MapBatches(preprocess)]
2023-12-26 09:47:50,550	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-12-26 09:47:50,550	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/400 [00:00<?, ?it/s]

- Sort 4:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 5:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/400 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 8:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 9:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 10:   0%|          | 0/400 [00:00<?, ?it/s]

Running 0:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/20 [00:00<?, ?it/s]

2023-12-26 09:47:52,395	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> TaskPoolMapOperator[MapBatches(preprocess)]
2023-12-26 09:47:52,395	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-12-26 09:47:52,396	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- RandomShuffle 1:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 2:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 3:   0%|          | 0/400 [00:00<?, ?it/s]

- Sort 4:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 5:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/400 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 8:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Map 9:   0%|          | 0/400 [00:00<?, ?it/s]

Shuffle Reduce 10:   0%|          | 0/400 [00:00<?, ?it/s]

Running 0:   0%|          | 0/400 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/20 [00:00<?, ?it/s]

In [39]:
# Dataset config: shard only "train" across workers (datasets not listed, i.e. "val",
# are not split), and keep row order deterministic during ingestion
options = ray.data.ExecutionOptions(preserve_order=True)
dataset_config = DataConfig(
    datasets_to_split=["train"],
    execution_options=options)

In [40]:
# Trainer: ties the per-worker loop, configs and datasets together.
# `class_to_index` is stored as metadata so TorchPredictor.from_checkpoint can rebuild the preprocessor.
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=train_loop_config,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets={"train": train_ds, "val": val_ds},
    dataset_config=dataset_config,
    metadata={"class_to_index": preprocessor.class_to_index}
)

In [41]:
%%time
# Train
# NOTE(review): `trainer.fit()` is commented out, so this cell times a no-op and never
# defines `results`; uncomment the line below to actually run training.
#results = trainer.fit()

CPU times: user 1 µs, sys: 1 µs, total: 2 µs
Wall time: 4.05 µs


In [42]:
from sklearn.metrics import precision_recall_fscore_support

In [43]:
class TorchPredictor:
    """Batch predictor that wraps a `FinetunedLLM` together with its `CustomPreprocessor`.

    Instances are callable on a batch dict (collated with ``collate_fn``), so they can
    be passed directly to ``Dataset.map_batches``.
    """

    def __init__(self, preprocessor, model):
        self.preprocessor = preprocessor
        self.model = model
        self.model.eval()  # inference only

    def __call__(self, batch):
        """Return ``{"output": predicted class indices}`` for ``batch``."""
        tensors = collate_fn(batch)
        return {"output": self.model.predict(tensors)}

    def predict_proba(self, batch):
        """Return ``{"output": class probabilities}`` for ``batch``."""
        tensors = collate_fn(batch)
        return {"output": self.model.predict_proba(tensors)}

    def get_preprocessor(self):
        """Return the preprocessor whose tag mapping matches the model's outputs."""
        return self.preprocessor

    @classmethod
    def from_checkpoint(cls, checkpoint):
        """Rebuild a predictor from a Ray Train checkpoint.

        Reads ``class_to_index`` from the checkpoint metadata and loads
        ``args.json`` / ``model.pt`` from ``checkpoint.path``.
        """
        class_to_index = checkpoint.get_metadata()["class_to_index"]
        preprocessor = CustomPreprocessor(class_to_index=class_to_index)
        ckpt_dir = checkpoint.path
        model = FinetunedLLM.load(Path(ckpt_dir, "args.json"), Path(ckpt_dir, "model.pt"))
        return cls(preprocessor=preprocessor, model=model)

In [44]:
def evaluate(ds, predictor):
    """Compute weighted precision / recall / F1 of ``predictor`` on the raw dataset ``ds``.

    ``ds`` is transformed with the predictor's own preprocessor. Its ``targets``
    column gives the ground truth, and running the predictor over the same
    transformed data gives the predictions.

    Returns:
        dict: ``{"precision": ..., "recall": ..., "f1": ...}``.
    """
    preprocessed_ds = predictor.get_preprocessor().transform(ds)

    # Ground-truth labels
    target_rows = preprocessed_ds.select_columns(cols=["targets"]).take_all()
    y_true = np.stack([row["targets"] for row in target_rows])

    # Predictions
    prediction_rows = preprocessed_ds.map_batches(predictor).take_all()
    y_pred = np.array([row["output"] for row in prediction_rows])

    # Evaluate (support, the 4th value, is unused)
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average="weighted")
    return {"precision": precision, "recall": recall, "f1": f1}

In [45]:
def format_prob(prob, index_to_class):
    """Label each probability with its class name.

    Args:
        prob: sequence of per-class probabilities, indexed by class index.
        index_to_class: mapping (or sequence) from class index to class name.

    Returns:
        dict ``{class_name: probability}`` in the order of ``prob``.
    """
    return {index_to_class[idx]: p for idx, p in enumerate(prob)}

In [46]:
def predict_proba(ds, predictor):
    """Predict a tag and per-class probabilities for every example in ``ds``.

    Args:
        ds: Ray dataset of raw examples.
        predictor: object exposing ``get_preprocessor()`` and a
            ``predict_proba(batch)`` returning ``{"output": ...}``.

    Returns:
        list of ``{"prediction": tag, "probabilities": {tag: prob, ...}}``,
        one per example, where ``prediction`` is the argmax class.
    """
    preprocessor = predictor.get_preprocessor()
    prob_ds = preprocessor.transform(ds).map_batches(predictor.predict_proba)
    y_prob = np.array([row["output"] for row in prob_ds.take_all()])
    return [
        {
            "prediction": preprocessor.index_to_class[prob.argmax()],
            "probabilities": format_prob(prob, preprocessor.index_to_class),
        }
        for prob in y_prob
    ]

# 3. Model
## 3.2 Experiment Tracking

### Intuition

지금까지는 다양한 베이스라인을 훈련하고 평가해왔지만 이러한 실험을 실제로 추적하지는 않았습니다.  
이 문제를 해결하면서 향후 모든 실험(하이퍼파라미터 최적화 포함)에 사용할 실험 추적에 대한 적절한 프로세스를 정의할 것입니다.  
실험 추적은 매개변수, 메트릭, 모델 및 기타 아티팩트와 같은 다양한 실험과 그 구성 요소를 모두 관리하는 프로세스이며, 이를 통해 다음을 수행할 수 있습니다:

- 특정 실험에 필요한 모든 구성 요소를 정리합니다. 모든 것을 한곳에 모아두고 나중에 사용할 수 있도록 위치를 파악하는 것이 중요합니다.
- 저장된 실험을 사용해 과거 결과를 쉽게 재현할 수 있습니다.
- 시간, 데이터, 아이디어, 팀 등에 걸쳐 반복적인 개선 사항을 기록합니다.

### Tools

실험 추적을 위한 많은 옵션이 있지만, 필요한 모든 기능을 갖춘 100% 무료 오픈 소스인 MLflow를 사용하겠습니다.  
자체 서버와 데이터베이스에서 MLflow를 실행할 수 있으므로 스토리지 비용이나 제한이 없어 가장 인기 있는 옵션 중 하나이며 Microsoft, Facebook, Databricks 등에서 사용하고 있습니다.  
또한 Comet ML(Google AI, HuggingFace 등에서 사용), Neptune(Roche, The New Yorker 등에서 사용), Weights & Biases(OpenAI, Toyota Research 등에서 사용) 등 몇 가지 인기 있는 옵션이 있습니다.  
이러한 솔루션은 대시보드, 보고서 등과 같은 기능을 제공하는 완전 관리형 솔루션입니다.

### Setup

In [47]:
import mlflow
from pathlib import Path
from ray.air.integrations.mlflow import MLflowLoggerCallback
import time

In [48]:
# Config MLflow: use a local, file-based tracking store under MODEL_REGISTRY.
MODEL_REGISTRY = Path("/tmp/mlflow")
MODEL_REGISTRY.mkdir(parents=True, exist_ok=True)
# Path.as_uri() builds a well-formed file:// URI (correct for Windows drive
# paths and percent-encodes special characters); plain "file://" + str(path)
# does neither. On POSIX this still yields file:///tmp/mlflow.
MLFLOW_TRACKING_URI = MODEL_REGISTRY.absolute().as_uri()
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
print(mlflow.get_tracking_uri())

file:///tmp/mlflow


### Integration

MLflow를 직접 사용할 수도 있지만, 여기서는 Ray의 MLflow 통합 기능을 사용하겠습니다. 특히 실험에 필요한 모든 컴포넌트를 `MLFLOW_TRACKING_URI`에 지정된 위치에 자동으로 로깅하는 `MLflowLoggerCallback`을 사용합니다. 물론 콜백이 자동으로 기록하지 않는 항목은 MLflow를 직접 사용해 기록할 수 있습니다. 또한 다른 실험 트래커를 사용하는 경우에도 Ray는 해당 트래커를 위한 통합 기능을 제공합니다.

In [49]:
# MLflow callback: Ray logs each trial's params and metrics (and, because
# save_artifact=True, its artifacts) to MLFLOW_TRACKING_URI under a new,
# timestamped experiment name.
experiment_name = "llm-{}".format(int(time.time()))
mlflow_callback = MLflowLoggerCallback(
    experiment_name=experiment_name,
    tracking_uri=MLFLOW_TRACKING_URI,
    save_artifact=True,
)

콜백을 정의한 후에는 이를 포함하도록 RunConfig를 업데이트하기만 하면 됩니다.

In [50]:
# Run configuration: reuse the existing checkpoint_config and attach the
# MLflow logger callback.
run_config = RunConfig(
    checkpoint_config=checkpoint_config,
    callbacks=[mlflow_callback],
)

### Training

이제 MLflow 콜백이 포함된 RunConfig로 모델을 훈련하면 필요한 모든 컴포넌트가 MLflow에 기록됩니다. 훈련 절차는 지금까지 트레이닝 강의에서 사용했던 워크플로와 동일하지만, `TorchTrainer`는 생성 시점에 전달된 `run_config`를 사용하므로 새 RunConfig를 반영하려면 트레이너를 다시 생성해야 합니다.

In [51]:
# Dataset: load the raw data and split it, stratified on "tag".
ds = load_data()
train_ds, val_ds = stratify_split(ds, stratify="tag", test_size=test_size)

# Preprocess: fit on the training split only, transform both splits, then
# materialize both before training.
preprocessor = CustomPreprocessor().fit(train_ds)
train_ds, val_ds = preprocessor.transform(train_ds), preprocessor.transform(val_ds)
train_ds, val_ds = train_ds.materialize(), val_ds.materialize()

# Trainer: TorchTrainer receives run_config when it is constructed, so it must
# be rebuilt here for the MLflow-enabled run_config to take effect.
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=train_loop_config,
    datasets={"train": train_ds, "val": val_ds},
    dataset_config=dataset_config,
    scaling_config=scaling_config,
    run_config=run_config,  # includes the MLflow callback
    metadata={"class_to_index": preprocessor.class_to_index},
)

# Train (the MLflow callback records params, metrics and artifacts).
results = trainer.fit()

0,1
Current time:,2023-12-26 09:51:52
Running for:,00:03:52.52
Memory:,38.3/64.0 GiB

Trial name,status,loc,iter,total time (s),epoch,lr,train_loss
TorchTrainer_6aadb_00000,TERMINATED,127.0.0.1:62394,10,228.037,9,0.0001,0.0430745


[2m[36m(TorchTrainer pid=62394)[0m Starting distributed worker processes: ['62395 (127.0.0.1)', '62396 (127.0.0.1)', '62397 (127.0.0.1)', '62398 (127.0.0.1)', '62399 (127.0.0.1)', '62400 (127.0.0.1)']
[2m[36m(RayTrainWorker pid=62395)[0m Setting up process group for: env:// [rank=0, world_size=6]
[2m[36m(SplitCoordinator pid=62403)[0m Auto configuring locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b']
[2m[36m(RayTrainWorker pid=62398)[0m Some weights of the model checkpoint at allenai/scibert_scivocab_uncased were not used when initializing BertModel: ['cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predi

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62399)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62400)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62400)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62400)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62400)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62400)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62398)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62399)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62395)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

(pid=62403) Running 0:   0%|          | 0/20 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=62403)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(6, equal=True)]
[2m[36m(SplitCoordinator pid=62403)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=['6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b', '6c5777ec229c6145f1e345a1e93446f12463425c61752303979d149b'], preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=62403)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=62398)[0m Checkpoint successfully created at: Checkpoint(filesystem=loca

In [52]:
# Metrics reported during training, one row per training iteration (epoch).
results.metrics_dataframe

Unnamed: 0,epoch,lr,train_loss,val_loss,timestamp,should_checkpoint,done,training_iteration,trial_id,date,...,time_since_restore,iterations_since_restore,checkpoint_dir_name,config/train_loop_config/dropout_p,config/train_loop_config/lr,config/train_loop_config/lr_factor,config/train_loop_config/lr_patience,config/train_loop_config/num_epochs,config/train_loop_config/batch_size,config/train_loop_config/num_classes
0,0,0.0001,0.572482,0.495559,1703551716,True,False,1,6aadb_00000,2023-12-26_09-48-36,...,34.772266,1,checkpoint_000000,0.5,0.0001,0.8,3,10,256,4
1,1,0.0001,0.478798,0.447897,1703551738,True,False,2,6aadb_00000,2023-12-26_09-48-58,...,56.95267,2,checkpoint_000001,0.5,0.0001,0.8,3,10,256,4
2,2,0.0001,0.400449,0.349853,1703551759,True,False,3,6aadb_00000,2023-12-26_09-49-20,...,78.416745,3,checkpoint_000002,0.5,0.0001,0.8,3,10,256,4
3,3,0.0001,0.294455,0.276391,1703551781,True,False,4,6aadb_00000,2023-12-26_09-49-41,...,99.817919,4,checkpoint_000003,0.5,0.0001,0.8,3,10,256,4
4,4,0.0001,0.213965,0.235674,1703551804,True,False,5,6aadb_00000,2023-12-26_09-50-04,...,123.048597,5,checkpoint_000004,0.5,0.0001,0.8,3,10,256,4
5,5,0.0001,0.145549,0.227897,1703551827,True,False,6,6aadb_00000,2023-12-26_09-50-27,...,145.340903,6,checkpoint_000005,0.5,0.0001,0.8,3,10,256,4
6,6,0.0001,0.09769,0.185525,1703551846,True,False,7,6aadb_00000,2023-12-26_09-50-48,...,166.22351,7,checkpoint_000006,0.5,0.0001,0.8,3,10,256,4
7,7,0.0001,0.067569,0.177923,1703551867,True,False,8,6aadb_00000,2023-12-26_09-51-09,...,186.848497,8,checkpoint_000007,0.5,0.0001,0.8,3,10,256,4
8,8,0.0001,0.05402,0.201359,1703551888,True,False,9,6aadb_00000,2023-12-26_09-51-29,...,207.290346,9,checkpoint_000008,0.5,0.0001,0.8,3,10,256,4
9,9,0.0001,0.043075,0.186953,1703551909,True,False,10,6aadb_00000,2023-12-26_09-51-50,...,228.036804,10,checkpoint_000009,0.5,0.0001,0.8,3,10,256,4


In [53]:
# Runs of this experiment, lowest val_loss first.
# NOTE: MLflow's metrics.val_loss holds the last value logged for each run
# (the final epoch's val_loss), not the run's minimum.
sorted_runs = mlflow.search_runs(
    experiment_names=[experiment_name],
    order_by=["metrics.val_loss ASC"],
)
sorted_runs

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.config/train_loop_config/num_epochs,metrics.config/train_loop_config/lr_patience,metrics.config/train_loop_config/dropout_p,metrics.config/train_loop_config/num_classes,...,metrics.val_loss,params.train_loop_config/batch_size,params.train_loop_config/lr_patience,params.train_loop_config/lr,params.train_loop_config/num_epochs,params.train_loop_config/lr_factor,params.train_loop_config/num_classes,params.train_loop_config/dropout_p,tags.mlflow.runName,tags.trial_name
0,280cdc68854349b2ba4a8c772626cf73,943005253295196078,FINISHED,file:///tmp/mlflow/943005253295196078/280cdc68...,2023-12-26 00:48:01.530000+00:00,2023-12-26 00:51:52.139000+00:00,10.0,3.0,0.5,4.0,...,0.186953,256,3,0.0001,10,0.8,4,0.5,TorchTrainer_6aadb_00000,TorchTrainer_6aadb_00000


In [54]:
# Best run: first row of the runs sorted by val_loss ascending.
# NOTE(review): iloc[0] raises IndexError if the experiment has no runs.
best_run = sorted_runs.iloc[0]
best_run

run_id                                                           280cdc68854349b2ba4a8c772626cf73
experiment_id                                                                  943005253295196078
status                                                                                   FINISHED
artifact_uri                                    file:///tmp/mlflow/943005253295196078/280cdc68...
start_time                                                       2023-12-26 00:48:01.530000+00:00
end_time                                                         2023-12-26 00:51:52.139000+00:00
metrics.config/train_loop_config/num_epochs                                                  10.0
metrics.config/train_loop_config/lr_patience                                                  3.0
metrics.config/train_loop_config/dropout_p                                                    0.5
metrics.config/train_loop_config/num_classes                                                  4.0
metrics.epoch       

### Dashboard

 - 작동하지 않음 (TODO: MLflow 대시보드 실행 방법 확인 필요)

### Loading

In [55]:
from ray.air import Result
from urllib.parse import urlparse

In [56]:
def get_best_checkpoint(run_id, metric=None, mode="min"):
    """Restore the Ray Train result logged to an MLflow run and return a checkpoint.

    The run's artifact directory (written by MLflowLoggerCallback with
    save_artifact=True) is loaded with ``Result.from_path``.

    Args:
        run_id: MLflow run id whose artifacts hold the Ray Train result.
        metric: optional metric name (e.g. "val_loss") used to choose among the
            retained checkpoints. When None (default), the first entry of
            ``Result.best_checkpoints`` is returned, as before.
        mode: "min" or "max"; the direction used together with ``metric``.

    Returns:
        The selected ``Checkpoint``.

    Raises:
        ValueError: if the result has no checkpoints or ``mode`` is invalid.
    """
    from urllib.request import url2pathname

    artifact_uri = mlflow.get_run(run_id).info.artifact_uri
    # url2pathname decodes percent-escapes (e.g. "%20") and converts Windows
    # "/C:/..." URI paths into filesystem paths; urlparse(...).path alone does
    # neither. On plain POSIX paths the result is unchanged.
    artifact_dir = url2pathname(urlparse(artifact_uri).path)
    results = Result.from_path(artifact_dir)

    # best_checkpoints is a list of (Checkpoint, metrics_dict) tuples.
    candidates = results.best_checkpoints or []
    if not candidates:
        raise ValueError(f"No checkpoints found for MLflow run {run_id} at {artifact_dir}")

    if metric is None:
        # NOTE(review): Result.from_path may order best_checkpoints by checkpoint
        # index rather than by score, so this is only the best checkpoint when
        # the CheckpointConfig retains a single one. Pass `metric` otherwise.
        return candidates[0][0]

    if mode not in ("min", "max"):
        raise ValueError(f"mode must be 'min' or 'max', got {mode!r}")
    choose = min if mode == "min" else max
    checkpoint, _ = choose(candidates, key=lambda item: item[1][metric])
    return checkpoint

In [57]:
# # Evaluate on test split
# best_checkpoint = get_best_checkpoint(run_id=best_run.run_id)
# predictor = TorchPredictor.from_checkpoint(best_checkpoint)
# performance = evaluate(ds=test_ds, predictor=predictor)
# print (json.dumps(performance, indent=2))