In [1]:
import ray

In [2]:
# Shut down any Ray session that is already initialized before starting a new
# one, so this cell can be re-executed without restarting the kernel.
if ray.is_initialized():
    ray.shutdown()
ray.init()

2025-09-12 12:16:58,052	INFO worker.py:1843 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.12
Ray version:,2.44.1
Dashboard:,http://127.0.0.1:8265


[36m(reduce pid=940684)[0m Error calculating size for column 'title': cannot call `vectorize` on size 0 inputs unless `otypes` is set
[36m(reduce pid=940684)[0m Error calculating size for column 'description': cannot call `vectorize` on size 0 inputs unless `otypes` is set
[36m(reduce pid=940684)[0m Error calculating size for column 'tag': cannot call `vectorize` on size 0 inputs unless `otypes` is set
[36m(reduce pid=940616)[0m Error calculating size for column 'title': cannot call `vectorize` on size 0 inputs unless `otypes` is set
[36m(reduce pid=940616)[0m Error calculating size for column 'description': cannot call `vectorize` on size 0 inputs unless `otypes` is set
[36m(reduce pid=940616)[0m Error calculating size for column 'tag': cannot call `vectorize` on size 0 inputs unless `otypes` is set
[36m(reduce pid=940660)[0m Error calculating size for column 'title': cannot call `vectorize` on size 0 inputs unless `otypes` is set
[36m(reduce pid=940660)[0m Error calcu

In [53]:
ray.cluster_resources()

{'node:__internal_head__': 1.0,
 'node:172.27.14.77': 1.0,
 'CPU': 96.0,
 'memory': 877115883520.0,
 'accelerator_type:RTX': 1.0,
 'object_store_memory': 200000000000.0,
 'GPU': 1.0}

## Fine-tuning our own LLM

In [54]:
import os
import random
import torch
import numpy as np
import ray
from ray.data.preprocessor import Preprocessor

In [55]:
def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Args:
        seed (int): Seed applied to NumPy, Python's `random` module and
            PyTorch (CPU and every CUDA device). Defaults to 42.

    Side effects:
        Sets the `PYTHONHASHSEED` environment variable to `str(seed)`.
    """
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    # Seed all GPUs rather than only the current one; this call is silently
    # ignored when CUDA is not available.
    torch.cuda.manual_seed_all(seed)
    # NOTE(review): cuDNN determinism flags are intentionally left untouched here.
    os.environ['PYTHONHASHSEED'] = str(seed)

In [56]:
set_seed()

In [57]:
DATASET_LOC = "https://raw.githubusercontent.com/GokuMohandas/Made-With-ML/main/datasets/dataset.csv"

In [58]:
def load_data(num_samples=None, loc=DATASET_LOC):
    """Read the CSV at `loc` into a shuffled Ray dataset.

    Args:
        num_samples (int | None): When truthy, only this many rows are kept
            (taken after shuffling). `None` keeps every row.
        loc (str): Location of the CSV file. Defaults to `DATASET_LOC`.

    Returns:
        The shuffled Ray dataset, truncated to `num_samples` rows if requested.
    """
    shuffled = ray.data.read_csv(loc).random_shuffle(seed=1234)
    if not num_samples:
        return shuffled
    # Pull the first rows out and wrap them in a new dataset.
    return ray.data.from_items(shuffled.take(num_samples))

<div style="border-left: 4px solid #00c896; background-color:rgb(23, 22, 22); padding: 1em; border-radius: 8px; margin: 1em 0;">
  <p style="margin: 0; font-weight: bold; color: #059669;">💡 Tip</p>
  <p style="margin: 0;">
    When working with very large datasets, it's a good idea to limit the number of samples in our dataset so that we can execute our code quickly and iterate on bugs, etc.  
    This is why we have a <code>num_samples</code> input argument in our <code>load_data</code> function (<code>None</code> = no limit, all samples).
  </p>
</div>


## Helpful functions

In [9]:
import numpy as np
from transformers import BertTokenizer
import re
import nltk
from nltk.corpus import stopwords

In [10]:
nltk.download("stopwords")
STOPWORDS = stopwords.words("english")

[nltk_data] Downloading package stopwords to
[nltk_data]     /work/ngkuissi/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [11]:
tokenizer = BertTokenizer.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)



In [12]:
def tokenize(batch, tokenizer=tokenizer):
    """Tokenize the `text` column of a batch and attach the targets.

    Args:
        batch: Batch exposing a `text` column (with `.tolist()`) and a `tag` column.
        tokenizer: Callable tokenizer; defaults to the notebook-level tokenizer.

    Returns:
        dict: `ids` (input ids), `mask` (attention mask) and `target`
        (the `tag` column as a NumPy array).
    """
    texts = batch['text'].tolist()
    encoded = tokenizer(texts, return_tensors="np", padding="longest")
    return {
        "ids": encoded['input_ids'],
        "mask": encoded['attention_mask'],
        "target": np.array(batch['tag']),
    }

In [13]:
def clean_text(text, stopwords=STOPWORDS):
    """Clean a raw text string.

    Steps, in order: lowercase, remove links, remove stopwords, pad
    punctuation with spaces, replace non-alphanumeric runs with a space,
    collapse repeated spaces and strip both ends.

    Args:
        text (str): Raw input text.
        stopwords (list[str]): Words to remove; defaults to `STOPWORDS`.

    Returns:
        str: The cleaned text.
    """
    text = text.lower()

    # Remove links first. Once punctuation has been stripped, a URL such as
    # "https://x.io/a" is already split into "https x io a", so the pattern
    # could only remove the scheme token and would leave the fragments behind.
    text = re.sub(r"http\S+", "", text)

    # Remove stopwords. Skipped for an empty list: an empty alternation matches
    # at every word boundary and would swallow the spaces between words.
    if stopwords:
        alternation = r"|".join(re.escape(word) for word in stopwords)
        pattern = re.compile(r'\b(' + alternation + r")\b\s*")
        text = pattern.sub("", text)

    text = re.sub(r"([!\"'#$%&()*\+,-./:;<=>?@\\\[\]^_`{|}~])", r" \1 ", text)  # add spacing
    text = re.sub("[^A-Za-z0-9]+", " ", text)  # remove non alphanumeric chars
    text = re.sub(" +", " ", text)  # remove multiple spaces
    text = text.strip()  # strip white space at the ends

    return text
    

In [14]:
def preprocess(df, class_to_index):
    """Turn a raw pandas batch into tokenized model inputs.

    Builds a `text` feature from `title` and `description`, cleans it with
    `clean_text`, label-encodes `tag` through `class_to_index` and tokenizes
    the result with `tokenize`.

    Args:
        df: pandas DataFrame with `title`, `description` and `tag` columns.
            It is not modified.
        class_to_index (dict): Mapping from tag to integer class index.

    Returns:
        The output of `tokenize` for the prepared `text`/`tag` frame.
    """
    # `assign` returns a new frame, so the caller's batch is left untouched and
    # no column is written into a subset of another frame.
    combined = (df.title + " " + df.description).apply(clean_text)  # feature engineering + cleaning
    features = df.assign(text=combined)[["text", "tag"]]  # keep only the model columns
    features = features.assign(tag=features["tag"].map(class_to_index))  # label encoding
    return tokenize(features)

In [15]:
class CustomPreprocessor(Preprocessor):
    """Preprocessor that label-encodes tags and runs `preprocess` on pandas batches."""

    def _fit(self, ds):
        """Build the tag <-> index lookups from the unique values of the `tag` column."""
        class_to_index = {}
        for position, tag in enumerate(ds.unique(column="tag")):
            class_to_index[tag] = position
        self.class_to_index = class_to_index
        # Reverse lookup, used to turn predicted indices back into tags.
        self.index_to_class = dict((index, tag) for tag, index in class_to_index.items())

    def _transform_pandas(self, batch):
        """Apply `preprocess` to one pandas batch using the fitted tag mapping."""
        return preprocess(batch, class_to_index=self.class_to_index)

In [193]:
def decode(indices, index_to_class):
    """Translate class indices back into class labels.

    Args:
        indices: Iterable of class indices.
        index_to_class: Mapping from index to label.

    Returns:
        list: The label for each index, in input order.
    """
    labels = []
    for idx in indices:
        labels.append(index_to_class[idx])
    return labels

## Model

In [16]:
import torch.nn as nn
import torch.nn.functional as F
from transformers import BertModel

In [17]:
llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
embedding_dim = llm.config.hidden_size

Some weights of the model checkpoint at allenai/scibert_scivocab_uncased were not used when initializing BertModel: ['cls.predictions.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.predictions.decoder.bias', 'cls.predictions.decoder.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [18]:
text = "Transfer learning with transformers for text classification."
batch = tokenizer([text], return_tensors="pt", padding="longest")
seq, pool = llm(input_ids=batch['input_ids'], attention_mask=batch['attention_mask'])

In [19]:
seq.shape

torch.Size([1, 10, 768])

In [20]:
pool.shape

torch.Size([1, 768])

In [21]:
class FinetunnedLLM(nn.Module):
    """Classification head on top of an encoder that returns a pooled output.

    Args:
        llm: Encoder module called as `llm(input_ids=..., attention_mask=...)`
            and returning a 2-tuple whose second item is the pooled embedding.
        dropout_p (float): Dropout probability applied to the pooled embedding.
        embedding_dim (int): Size of the pooled embedding.
        num_classes (int): Number of output classes.
    """

    def __init__(self, llm, dropout_p, embedding_dim, num_classes):
        super().__init__()
        self.llm = llm
        self.dropout = nn.Dropout(dropout_p)
        self.classifier = nn.Linear(embedding_dim, num_classes)

    def forward(self, batch):
        """Return class logits for a batch dict holding `ids` and `mask`."""
        ids, mask = batch['ids'], batch['mask']
        _, pool = self.llm(input_ids=ids, attention_mask=mask)

        return self.classifier(self.dropout(pool))

    # The decorator is *called*: the bare `@torch.inference_mode` form is only
    # accepted by newer PyTorch releases, while `inference_mode()` works on all
    # versions that provide it.
    @torch.inference_mode()
    def predict(self, batch):
        """Return the predicted class index per row as a NumPy array.

        Switches the module to eval mode (it stays in eval mode afterwards).
        """
        self.eval()
        z = self(batch)
        return torch.argmax(z, dim=-1).cpu().numpy()

    @torch.inference_mode()
    def predict_prob(self, batch):
        """Return the softmax class probabilities per row as a NumPy array.

        Switches the module to eval mode (it stays in eval mode afterwards).
        """
        self.eval()
        z = self(batch)
        y_probs = F.softmax(z, dim=-1).cpu().numpy()
        return y_probs

In [22]:
model = FinetunnedLLM(llm, 0.5, embedding_dim, 4)

## Data setup

In [23]:
# Turn on the `preserve_order` execution option of the current Ray Data context.
ray.data.DatasetContext.get_current().execution_options.preserve_order = True
# Read the CSV dataset and shuffle it with a fixed seed.
ds = ray.data.read_csv(DATASET_LOC)
ds = ds.random_shuffle(seed=1234)

In [24]:
import sys
sys.path.append("..")
from madewithml.data import stratify_split

  import pkg_resources


In [25]:
test_size = 0.2
train_ds, val_ds = stratify_split(ds, stratify="tag", test_size=test_size)

In [26]:
tags = train_ds.unique(column="tag")
class_to_index = {tag: i for i, tag in enumerate(tags)}

2025-09-12 12:31:37,145	INFO dataset.py:2796 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2025-09-12 12:31:37,151	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-09-12_12-16-55_178831_940183/logs/ray-data
2025-09-12 12:31:37,152	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=1]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(192) 1: 0.00 row [00:00, ? row/s]

- RandomShuffle 2: 0.00 row [00:00, ? row/s]

Shuffle Map 3:   0%|                                                                                          …

Shuffle Reduce 4:   0%|                                                                                       …

- Sort 5: 0.00 row [00:00, ? row/s]

Sort Sample 6:   0%|                                                                                          …

Shuffle Map 7:   0%|                                                                                          …

Shuffle Reduce 8:   0%|                                                                                       …

- MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle 9: 0.00 row [00:00, ? row/s]

Shuffle Map 10:   0%|                                                                                         …

Shuffle Reduce 11:   0%|                                                                                      …

- Aggregate 12: 0.00 row [00:00, ? row/s]

Sort Sample 13:   0%|                                                                                         …

Shuffle Map 14:   0%|                                                                                         …

Shuffle Reduce 15:   0%|                                                                                      …

- limit=1 16: 0.00 row [00:00, ? row/s]



In [27]:
sample_ds = train_ds.map_batches(
    preprocess,
    fn_kwargs={"class_to_index": class_to_index},
    batch_format="pandas"
)

## Batching

In [28]:
from ray.train.torch import get_device

Created a temporary directory at /tmp/tmpplyp5tun
Writing /tmp/tmpplyp5tun/_remote_module_non_scriptable.py


In [29]:
device = "cuda" if torch.cuda.is_available() else "cpu"

In [30]:
def pad_array(arr, dtype=np.int32):
    """Right-pad variable-length rows with zeros into a 2-D array.

    Args:
        arr: Sequence of rows (NumPy array, object array of arrays, or list
            of lists). Rows may have different lengths.
        dtype: dtype of the returned array. Defaults to `np.int32`.

    Returns:
        np.ndarray: Array of shape `(len(arr), longest_row)`; an empty input
        yields an array of shape `(0, 0)`.
    """
    # `len` works for lists as well as arrays, and `default=0` keeps an empty
    # batch from raising in `max`.
    num_rows = len(arr)
    max_len = max((len(row) for row in arr), default=0)
    padded_arr = np.zeros((num_rows, max_len), dtype=dtype)
    for i, row in enumerate(arr):
        padded_arr[i, :len(row)] = row
    return padded_arr

In [31]:
def collate_fn(batch, device=device):
    """Pad a NumPy batch and convert it to tensors on `device`.

    Args:
        batch (dict): Mapping with `ids`, `mask` and `target` entries. It is
            not modified.
        device: Target device for the tensors; defaults to the notebook-level
            `device`.

    Returns:
        dict: `ids` and `mask` as int32 tensors, `target` as an int64 tensor.

    Raises:
        KeyError: If `batch` contains a key other than `ids`, `mask`, `target`.
    """
    dtypes = {"ids": torch.int32, "mask": torch.int32, "target": torch.int64}
    # Work on a shallow copy so the caller's batch keeps its original arrays.
    arrays = dict(batch)
    arrays["ids"] = pad_array(batch["ids"])
    arrays["mask"] = pad_array(batch["mask"])
    tensor_batch = {}
    for key, arr in arrays.items():
        # Hand torch an ndarray rather than a Python list of rows: building a
        # tensor from a list of ndarrays is slow and emits a warning.
        values = np.asarray(arr)
        if values.dtype == object:
            # Boxed Python values: let NumPy infer a numeric dtype first.
            values = np.array(values.tolist())
        tensor_batch[key] = torch.as_tensor(values, dtype=dtypes[key], device=device)
    return tensor_batch
    

In [32]:
sample_batch = sample_ds.take_batch(128)

2025-09-12 12:31:52,448	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-09-12_12-16-55_178831_940183/logs/ray-data
2025-09-12 12:31:52,448	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle] -> TaskPoolMapOperator[MapBatches(preprocess)] -> LimitOperator[limit=128]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(192) 1: 0.00 row [00:00, ? row/s]

- RandomShuffle 2: 0.00 row [00:00, ? row/s]

Shuffle Map 3:   0%|                                                                                          …

Shuffle Reduce 4:   0%|                                                                                       …

- Sort 5: 0.00 row [00:00, ? row/s]

Sort Sample 6:   0%|                                                                                          …

Shuffle Map 7:   0%|                                                                                          …

Shuffle Reduce 8:   0%|                                                                                       …

- MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle 9: 0.00 row [00:00, ? row/s]

Shuffle Map 10:   0%|                                                                                         …

Shuffle Reduce 11:   0%|                                                                                      …

- MapBatches(preprocess) 12: 0.00 row [00:00, ? row/s]

- limit=128 13: 0.00 row [00:00, ? row/s]

In [33]:
collate_fn(sample_batch)

  tensor_batch[key] = torch.as_tensor(list(arr), dtype=dtypes[key], device=device)


{'ids': tensor([[  102,  1438,  4706,  ...,     0,     0,     0],
         [  102,  2389, 29477,  ...,     0,     0,     0],
         [  102, 24638, 15781,  ...,     0,     0,     0],
         ...,
         [  102,  1613, 15037,  ...,     0,     0,     0],
         [  102,  1572,  8039,  ...,     0,     0,     0],
         [  102,   856,  3997,  ...,     0,     0,     0]], device='cuda:0',
        dtype=torch.int32),
 'mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
         [1, 1, 1,  ..., 0, 0, 0],
         [1, 1, 1,  ..., 0, 0, 0],
         ...,
         [1, 1, 1,  ..., 0, 0, 0],
         [1, 1, 1,  ..., 0, 0, 0],
         [1, 1, 1,  ..., 0, 0, 0]], device='cuda:0', dtype=torch.int32),
 'target': tensor([2, 0, 3, 0, 1, 1, 1, 0, 0, 2, 1, 2, 0, 0, 2, 0, 3, 0, 1, 1, 3, 0, 0, 1,
         1, 0, 0, 0, 0, 3, 0, 0, 1, 1, 2, 1, 1, 3, 3, 0, 0, 1, 1, 0, 0, 1, 1, 2,
         1, 1, 3, 0, 0, 1, 1, 0, 0, 2, 3, 1, 1, 3, 1, 1, 0, 0, 1, 1, 2, 1, 1, 3,
         0, 0, 1, 1, 0, 0, 3, 0, 0, 1, 1, 0, 0, 2, 3, 0,

## Utilities

In [103]:
from ray.air import session
from ray.air.config import CheckpointConfig, DatasetConfig, RunConfig, ScalingConfig
from ray.train.torch import prepare_model
from ray.train.torch import TorchCheckpoint, TorchTrainer
import torch.nn.functional as F

ImportError: cannot import name 'get_model_state_dict' from 'ray.train.torch' (/mnt/hpc/work/ngkuissi/Made-With-ML/venv/lib/python3.10/site-packages/ray/train/torch/__init__.py)

In [35]:
def train_step(ds, model, batch_size, num_classes, loss_fn, optimizer):
    """Run one training pass over `ds` and return the mean batch loss.

    Args:
        ds: Dataset exposing `iter_torch_batches`.
        model: Model called on each collated batch.
        batch_size (int): Batch size requested from the dataset iterator.
        num_classes (int): Accepted for interface parity; not used here.
        loss_fn: Callable `(logits, targets) -> loss tensor`.
        optimizer: Optimizer stepped once per batch.

    Returns:
        float: Running mean of the per-batch losses.
    """
    model.train()
    running_loss = 0.0
    batches = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)
    for step, batch in enumerate(batches, start=1):
        optimizer.zero_grad()
        logits = model(batch)
        batch_loss = loss_fn(logits.view(-1, logits.shape[-1]), batch['target'])
        batch_loss.backward()
        optimizer.step()
        # Incremental mean: shift by the gap to the new loss, scaled by 1/step.
        running_loss += (batch_loss.detach().item() - running_loss) / step
    return running_loss

In [36]:
def eval_step(ds, batch_size, model, num_classes, loss_fn):
    """Evaluate `model` on `ds` without tracking gradients.

    Args:
        ds: Dataset exposing `iter_torch_batches`.
        batch_size (int): Batch size requested from the dataset iterator.
        model: Model called on each collated batch.
        num_classes (int): Accepted for interface parity; not used here.
        loss_fn: Callable `(logits, targets) -> loss tensor`.

    Returns:
        tuple: `(mean_loss, y_true, y_pred)` where the last two are the
        collected targets and argmax predictions stacked with `np.vstack`.
    """
    model.eval()
    mean_loss = 0.0
    y_trues = []
    y_preds = []
    batches = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)

    with torch.inference_mode():
        for step, batch in enumerate(batches, start=1):
            logits = model(batch)
            labels = batch['target']
            batch_loss = loss_fn(logits.view(-1, logits.shape[-1]), labels).item()
            # Incremental mean of the batch losses.
            mean_loss += (batch_loss - mean_loss) / step
            y_trues.extend(labels.cpu().numpy())
            y_preds.extend(torch.argmax(logits, dim=-1).cpu().numpy())

    return mean_loss, np.vstack(y_trues), np.vstack(y_preds)
            

In [None]:
def train_loop_per_worker(config):
    """Training loop executed on every Ray Train worker.

    Args:
        config (dict): Hyperparameters. Reads `dropout_p`, `lr`, `lr_factor`,
            `lr_patience`, `num_epochs`, `batch_size` (global, split across
            workers) and `num_classes`. A `device` entry may be present but is
            not needed: `prepare_model` places the model on the worker's device.

    Side effects:
        After every epoch, reports `epoch`, `lr`, `train_loss` and `val_loss`
        together with a checkpoint of the model weights.
    """
    dropout = config['dropout_p']
    lr = config['lr']
    lr_factor = config['lr_factor']
    lr_patience = config['lr_patience']
    num_epochs = config['num_epochs']
    batch_size = config['batch_size']
    num_classes = config['num_classes']

    #set_seed()
    train_ds = session.get_dataset_shard("train")
    val_ds = session.get_dataset_shard("val")

    # No hard-coded `.cuda()`: `prepare_model` moves the whole model (encoder
    # and classifier head) to the device assigned to this worker.
    llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
    model = FinetunnedLLM(llm, dropout, llm.config.hidden_size, num_classes)
    model = prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=lr_factor, patience=lr_patience)

    # `batch_size` is the global batch size; each worker handles its share.
    batch_size_per_worker = batch_size // session.get_world_size()
    for epoch in range(num_epochs):
        train_loss = train_step(train_ds, model, batch_size_per_worker, num_classes, loss_fn, optimizer)
        val_loss, _, _ = eval_step(val_ds, batch_size_per_worker, model, num_classes, loss_fn)
        scheduler.step(val_loss)

        metrics = dict(epoch=epoch, lr=optimizer.param_groups[0]["lr"], train_loss=train_loss, val_loss=val_loss)
        # `from_state_dict` expects the weights, not the module itself. Unwrap a
        # possible distributed wrapper (exposed as `.module`) so the saved keys
        # match a plain `FinetunnedLLM`.
        state_dict = getattr(model, "module", model).state_dict()
        checkpoint = TorchCheckpoint.from_state_dict(state_dict)
        session.report(metrics, checkpoint=checkpoint)
    

In [90]:
# Hyperparameters read by `train_loop_per_worker` from its `config` argument.
train_loop_config = {
    "dropout_p": 0.5,  # dropout probability passed to FinetunnedLLM
    "lr": 1e-4,  # learning rate of the Adam optimizer
    "lr_factor": 0.8,  # `factor` of ReduceLROnPlateau
    "lr_patience": 3,  # `patience` of ReduceLROnPlateau
    "num_epochs": 10,
    "batch_size": 128,  # global batch size, divided by the world size per worker
    "num_classes": 4,
    "device": device
}

In [91]:
num_workers = 1
resources_per_worker = {"CPU": 10, "GPU": 1}

In [92]:
from ray.train import ScalingConfig

# Worker layout for the trainer: `num_workers` workers, each reserving
# `resources_per_worker`; GPU use is enabled only when a non-zero "GPU" entry
# is requested there.
scaling_config = ScalingConfig(
    num_workers=num_workers,
    use_gpu=bool(resources_per_worker.get("GPU", 0)),
    resources_per_worker=resources_per_worker,
)

In [93]:
checkpoint_config = CheckpointConfig(num_to_keep=1, checkpoint_score_attribute="val_loss", checkpoint_score_order="min")

In [94]:
run_config = RunConfig(name="llm", storage_path="~/ray_results", checkpoint_config=checkpoint_config)

## Training

In [95]:
ds = load_data()
train_ds, val_ds = stratify_split(ds, stratify="tag", test_size=test_size)

In [96]:
preprocessor = CustomPreprocessor()
train_ds = preprocessor.fit_transform(train_ds)
val_ds = preprocessor.transform(val_ds)

2025-09-12 13:04:54,236	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-09-12_12-16-55_178831_940183/logs/ray-data
2025-09-12 13:04:54,236	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=1]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(192) 1: 0.00 row [00:00, ? row/s]

- RandomShuffle 2: 0.00 row [00:00, ? row/s]

Shuffle Map 3:   0%|                                                                                          …

Shuffle Reduce 4:   0%|                                                                                       …

- Sort 5: 0.00 row [00:00, ? row/s]

Sort Sample 6:   0%|                                                                                          …

Shuffle Map 7:   0%|                                                                                          …

Shuffle Reduce 8:   0%|                                                                                       …

- MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle 9: 0.00 row [00:00, ? row/s]

Shuffle Map 10:   0%|                                                                                         …

Shuffle Reduce 11:   0%|                                                                                      …

- Aggregate 12: 0.00 row [00:00, ? row/s]

Sort Sample 13:   0%|                                                                                         …

Shuffle Map 14:   0%|                                                                                         …

Shuffle Reduce 15:   0%|                                                                                      …

- limit=1 16: 0.00 row [00:00, ? row/s]

In [97]:
train_ds = train_ds.materialize()
val_ds = val_ds.materialize()

2025-09-12 13:05:06,920	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-09-12_12-16-55_178831_940183/logs/ray-data
2025-09-12 13:05:06,921	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle] -> TaskPoolMapOperator[CustomPreprocessor]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(192) 1: 0.00 row [00:00, ? row/s]

- RandomShuffle 2: 0.00 row [00:00, ? row/s]

Shuffle Map 3:   0%|                                                                                          …

Shuffle Reduce 4:   0%|                                                                                       …

- Sort 5: 0.00 row [00:00, ? row/s]

Sort Sample 6:   0%|                                                                                          …

Shuffle Map 7:   0%|                                                                                          …

Shuffle Reduce 8:   0%|                                                                                       …

- MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle 9: 0.00 row [00:00, ? row/s]

Shuffle Map 10:   0%|                                                                                         …

Shuffle Reduce 11:   0%|                                                                                      …

- CustomPreprocessor 12: 0.00 row [00:00, ? row/s]

2025-09-12 13:05:13,690	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-09-12_12-16-55_178831_940183/logs/ray-data
2025-09-12 13:05:13,691	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AllToAllOperator[RandomShuffle] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle] -> TaskPoolMapOperator[CustomPreprocessor]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(192) 1: 0.00 row [00:00, ? row/s]

- RandomShuffle 2: 0.00 row [00:00, ? row/s]

Shuffle Map 3:   0%|                                                                                          …

Shuffle Reduce 4:   0%|                                                                                       …

- Sort 5: 0.00 row [00:00, ? row/s]

Sort Sample 6:   0%|                                                                                          …

Shuffle Map 7:   0%|                                                                                          …

Shuffle Reduce 8:   0%|                                                                                       …

- MapBatches(_add_split)->MapBatches(_filter_split)->RandomShuffle 9: 0.00 row [00:00, ? row/s]

Shuffle Map 10:   0%|                                                                                         …

Shuffle Reduce 11:   0%|                                                                                      …

- CustomPreprocessor 12: 0.00 row [00:00, ? row/s]

In [98]:
from ray.train import DataConfig

In [99]:
dataset_config = DataConfig(["train", "val"])

In [100]:
# Assemble the trainer from the loop, its hyperparameters, and the scaling,
# run and dataset configurations defined in the cells above.
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=train_loop_config,
    scaling_config=scaling_config,
    run_config= run_config,
    datasets= {"train": train_ds, "val": val_ds},
    dataset_config=dataset_config,
    # The datasets were already transformed with `preprocessor` in an earlier
    # cell, so it is not passed to the trainer:
    #preprocessor=preprocessor   
)

In [101]:
results = trainer.fit()

2025-09-12 13:05:21,170	INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949


== Status ==
Current time: 2025-09-12 13:05:21 (running for 00:00:00.11)
Using FIFO scheduling algorithm.
Logical resource usage: 11.0/96 CPUs, 1.0/1 GPUs (0.0/1.0 accelerator_type:RTX)
Result logdir: /tmp/ray/session_2025-09-12_12-16-55_178831_940183/artifacts/2025-09-12_13-05-21/llm/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-09-12 13:05:26 (running for 00:00:05.12)
Using FIFO scheduling algorithm.
Logical resource usage: 11.0/96 CPUs, 1.0/1 GPUs (0.0/1.0 accelerator_type:RTX)
Result logdir: /tmp/ray/session_2025-09-12_12-16-55_178831_940183/artifacts/2025-09-12_13-05-21/llm/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-09-12 13:05:31 (running for 00:00:10.18)
Using FIFO scheduling algorithm.
Logical resource usage: 11.0/96 CPUs, 1.0/1 GPUs (0.0/1.0 accelerator_type:RTX)
Result logdir: /tmp/ray/session_2025-09-12_12-16-55_178831_940183/artifacts/2025-09-12_13-05-21/llm/driver_artifacts
Number of trials: 1

(pid=1006923) Running 0: 0.00 row [00:00, ? row/s]

(pid=1006923) - split(1, equal=True) 1: 0.00 row [00:00, ? row/s]

== Status ==
Current time: 2025-09-12 13:05:36 (running for 00:00:15.20)
Using FIFO scheduling algorithm.
Logical resource usage: 11.0/96 CPUs, 1.0/1 GPUs (0.0/1.0 accelerator_type:RTX)
Result logdir: /tmp/ray/session_2025-09-12_12-16-55_178831_940183/artifacts/2025-09-12_13-05-21/llm/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




2025-09-12 13:05:37,054	ERROR tune_controller.py:1331 -- Trial task failed for trial TorchTrainer_aa638_00000
Traceback (most recent call last):
  File "/mnt/hpc/work/ngkuissi/Made-With-ML/venv/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
  File "/mnt/hpc/work/ngkuissi/Made-With-ML/venv/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/mnt/hpc/work/ngkuissi/Made-With-ML/venv/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/mnt/hpc/work/ngkuissi/Made-With-ML/venv/lib/python3.10/site-packages/ray/_private/worker.py", line 2782, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/mnt/hpc/work/ngkuissi/Made-With-ML/venv/lib/python3.10/site-packages/ray/_private/worker.py", line 929, in get_objects
  

== Status ==
Current time: 2025-09-12 13:05:37 (running for 00:00:15.89)
Using FIFO scheduling algorithm.
Logical resource usage: 11.0/96 CPUs, 1.0/1 GPUs (0.0/1.0 accelerator_type:RTX)
Result logdir: /tmp/ray/session_2025-09-12_12-16-55_178831_940183/artifacts/2025-09-12_13-05-21/llm/driver_artifacts
Number of trials: 1/1 (1 ERROR)
Number of errored trials: 1
+--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Trial name               |   # failures | error file                                                                                                                                                     |
|--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| TorchTrainer_aa638_0

TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: `trainer = TorchTrainer.restore("/work/ngkuissi/ray_results/llm")`.
To start a new run that will retry on training failures, set `train.RunConfig(failure_config=train.FailureConfig(max_failures))` in the Trainer's `run_config` with `max_failures > 0`, or `max_failures = -1` for unlimited retries.

In [50]:
results.metrics_dataframe.columns

Index(['message', 'timestamp', 'checkpoint_dir_name', 'done',
       'training_iteration', 'trial_id', 'date', 'time_this_iter_s',
       'time_total_s', 'pid', 'hostname', 'node_ip', 'time_since_restore',
       'iterations_since_restore', 'config/train_loop_config/dropout_p',
       'config/train_loop_config/lr', 'config/train_loop_config/lr_factor',
       'config/train_loop_config/lr_patience',
       'config/train_loop_config/num_epochs',
       'config/train_loop_config/batch_size',
       'config/train_loop_config/num_classes',
       'config/train_loop_config/device', 'epoch', 'lr', 'train_loss',
       'val_loss', 'should_checkpoint'],
      dtype='object')

In [51]:
results.metrics_dataframe[['epoch', 'train_loss', 'val_loss']]

Unnamed: 0,epoch,train_loss,val_loss
0,,,
1,0.0,1.278447,1.118461
2,1.0,1.01407,0.667756
3,2.0,0.585977,0.613653
4,3.0,0.293521,0.533704
5,4.0,0.150316,0.473259
6,5.0,0.077945,0.524334
7,6.0,0.038293,0.579658
8,7.0,0.022495,0.582006
9,8.0,0.0062,0.551338


In [52]:
results.best_checkpoints

[(Checkpoint(filesystem=local, path=/work/ngkuissi/ray_results/llm/TorchTrainer_0e39d_00000_0_2025-09-12_12-32-21/checkpoint_000004),
  {'epoch': 4,
   'lr': 0.0001,
   'train_loss': 0.15031552463769912,
   'val_loss': 0.47325916588306427,
   'timestamp': 1757694769,
   'checkpoint_dir_name': 'checkpoint_000004',
   'should_checkpoint': True,
   'done': False,
   'training_iteration': 6,
   'trial_id': '0e39d_00000',
   'date': '2025-09-12_12-32-49',
   'time_this_iter_s': 2.6137330532073975,
   'time_total_s': 22.434314727783203,
   'pid': 964091,
   'hostname': 'gpu-pt1-04',
   'node_ip': '172.27.14.77',
   'config': {'train_loop_config': {'dropout_p': 0.5,
     'lr': 0.0001,
     'lr_factor': 0.8,
     'lr_patience': 3,
     'num_epochs': 10,
     'batch_size': 128,
     'num_classes': 4,
     'device': 'cuda'}},
   'time_since_restore': 22.434314727783203,
   'iterations_since_restore': 6})]

In [62]:
from ray.train.torch import TorchPredictor
from sklearn.metrics import precision_recall_fscore_support

In [65]:
# Rebuild the architecture with the hyperparameters recorded alongside the checkpoint
# (dropout_p and num_classes of the training config) instead of repeating them as magic
# numbers, so the rebuilt model cannot drift from the one that produced the weights.
best_train_config = results.best_checkpoints[0][1]["config"]["train_loop_config"]
llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False).cuda()
model = FinetunnedLLM(
    llm,
    best_train_config["dropout_p"],
    llm.config.hidden_size,
    best_train_config["num_classes"],
)

Some weights of the model checkpoint at allenai/scibert_scivocab_uncased were not used when initializing BertModel: ['cls.predictions.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.predictions.decoder.bias', 'cls.predictions.decoder.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [130]:
# Show the retained checkpoint again; its path is what the next cell loads the weights from.
results.best_checkpoints

[(Checkpoint(filesystem=local, path=/work/ngkuissi/ray_results/llm/TorchTrainer_0e39d_00000_0_2025-09-12_12-32-21/checkpoint_000004),
  {'epoch': 4,
   'lr': 0.0001,
   'train_loss': 0.15031552463769912,
   'val_loss': 0.47325916588306427,
   'timestamp': 1757694769,
   'checkpoint_dir_name': 'checkpoint_000004',
   'should_checkpoint': True,
   'done': False,
   'training_iteration': 6,
   'trial_id': '0e39d_00000',
   'date': '2025-09-12_12-32-49',
   'time_this_iter_s': 2.6137330532073975,
   'time_total_s': 22.434314727783203,
   'pid': 964091,
   'hostname': 'gpu-pt1-04',
   'node_ip': '172.27.14.77',
   'config': {'train_loop_config': {'dropout_p': 0.5,
     'lr': 0.0001,
     'lr_factor': 0.8,
     'lr_patience': 3,
     'num_epochs': 10,
     'batch_size': 128,
     'num_classes': 4,
     'device': 'cuda'}},
   'time_since_restore': 22.434314727783203,
   'iterations_since_restore': 6})]

In [None]:
# Restore the fine-tuned weights from the retained checkpoint directory.
# os.path.join avoids hand-built separators in the path.
best_checkpoint = os.path.join(results.best_checkpoints[0][0].path, "model.pt")
# map_location="cpu" lets the file load regardless of the device it was saved from;
# load_state_dict then copies the values into the model's existing parameters.
# weights_only=True restricts unpickling to tensors and plain containers.
state_dict = torch.load(best_checkpoint, map_location="cpu", weights_only=True)
model.load_state_dict(state_dict)

In [136]:
# Display the module tree of the classifier (the BERT encoder wrapped by FinetunnedLLM).
model

FinetunnedLLM(
  (llm): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(31090, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affi

In [138]:
# Hold-out CSV from the Made-With-ML repository, used below for evaluation.
HOLDOUT_LOC = "https://raw.githubusercontent.com/GokuMohandas/Made-With-ML/main/datasets/holdout.csv"
# Read the remote CSV as a Ray dataset.
test_ds = ray.data.read_csv(HOLDOUT_LOC)
# Apply the notebook's preprocessor (defined elsewhere) to the hold-out rows.
preprocessed_ds = preprocessor.transform(test_ds)

In [186]:
# NOTE(review): this repeats the transform already done in the previous cell.
preprocessed_ds = preprocessor.transform(test_ds)
# Materialize only the "target" column as a list of row dicts ...
values = preprocessed_ds.select_columns(cols=["target"]).take_all()
# ... and stack the per-row targets into one NumPy array of ground-truth labels.
y_true = np.stack([item["target"] for item in values])

2025-09-12 15:08:31,121	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-09-12_12-16-55_178831_940183/logs/ray-data
2025-09-12 15:08:31,122	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> TaskPoolMapOperator[CustomPreprocessor->Project]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(192) 1: 0.00 row [00:00, ? row/s]

- CustomPreprocessor->Project 2: 0.00 row [00:00, ? row/s]

In [187]:
# Inspect the ground-truth label array.
y_true

array([3, 3, 3, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 3, 0, 0, 1, 1, 3, 1, 3, 0,
       3, 1, 0, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 2, 2, 1, 0,
       0, 3, 2, 1, 0, 1, 1, 3, 3, 0, 1, 3, 1, 3, 3, 3, 3, 0, 0, 0, 0, 1,
       1, 0, 1, 2, 0, 1, 3, 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 2, 0, 0, 0,
       0, 3, 0, 0, 1, 0, 1, 1, 3, 1, 0, 1, 0, 1, 0, 3, 0, 0, 0, 0, 0, 1,
       0, 0, 1, 1, 1, 1, 3, 0, 1, 0, 1, 0, 1, 3, 3, 3, 1, 0, 1, 1, 1, 1,
       0, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 2, 1, 0, 3, 0, 1, 1, 2, 2, 1, 0,
       0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 2, 2, 1, 0, 0, 2, 1, 3, 1, 1, 1, 0,
       0, 1, 0, 1, 0, 3, 0, 1, 1, 0, 2, 1, 2, 1, 1])

In [161]:
# Move the model to the GPU before running inference.
model = model.to("cuda")

In [179]:
# Run the model over the preprocessed hold-out set and collect the predicted class
# (argmax over the last dimension of the model output) for every batch.
ds_generator = preprocessed_ds.iter_torch_batches(batch_size=128, collate_fn=collate_fn)
batch_preds = []
model.eval()  # switch off training-only behaviour such as dropout
with torch.inference_mode():  # no autograd bookkeeping needed for evaluation
    for batch in ds_generator:
        z = model(batch)
        batch_preds.append(torch.argmax(z, dim=-1))
# Concatenate once at the end instead of re-allocating a growing tensor on every batch.
# y_pred stays None when the dataset yields no batches, as before.
y_pred = torch.cat(batch_preds, dim=0) if batch_preds else None

2025-09-12 14:07:16,974	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-09-12_12-16-55_178831_940183/logs/ray-data
2025-09-12 14:07:16,974	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> TaskPoolMapOperator[CustomPreprocessor]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(192) 1: 0.00 row [00:00, ? row/s]

- CustomPreprocessor 2: 0.00 row [00:00, ? row/s]

In [180]:
# Convert the predictions to a host-side NumPy array for scikit-learn.
# Guarded so that re-running this cell does not fail once y_pred is already an ndarray.
if not isinstance(y_pred, np.ndarray):
    y_pred = y_pred.cpu().numpy()

In [182]:
# Precision, recall and F-score aggregated over classes with average="weighted".
metrics = precision_recall_fscore_support(y_true, y_pred, average="weighted")

In [184]:
# Package the first three entries of the scikit-learn result under readable keys.
dict(precision=metrics[0], recall=metrics[1], f1=metrics[2])

{'precision': 0.9132518906340897,
 'recall': 0.9057591623036649,
 'f1': 0.9054612316063202}

In [188]:
def evaluate(ds, model, preprocessor=preprocessor, device="cuda", batch_size=128):
    """Score `model` on `ds` and return weighted precision, recall and F1.

    Args:
        ds: Raw Ray dataset. It is passed through `preprocessor.transform`, whose
            output must contain a "target" column with the ground-truth labels.
        model: Torch module called with one collated batch at a time; the predicted
            class is the argmax over the last dimension of its output.
        preprocessor: Preprocessor applied to `ds`. Defaults to the notebook-level
            `preprocessor` as it exists when this cell is executed.
        device: Device the model and the collated batches are placed on.
        batch_size: Number of rows per inference batch.

    Returns:
        dict with the keys "precision", "recall" and "f1", computed with
        average="weighted".

    Raises:
        ValueError: If the preprocessed dataset contains no rows.
    """
    preprocessed_ds = preprocessor.transform(ds)

    # Ground truth: materialize only the label column.
    rows = preprocessed_ds.select_columns(cols=["target"]).take_all()
    if not rows:
        raise ValueError("Cannot evaluate on an empty dataset.")
    y_true = np.stack([row["target"] for row in rows])

    # Predictions.
    # NOTE(review): labels and predictions come from two separate executions of the
    # dataset; this assumes both yield rows in the same order - confirm.
    model = model.to(device)
    ds_generator = preprocessed_ds.iter_torch_batches(
        batch_size=batch_size,
        collate_fn=lambda x: collate_fn(x, device=device),
    )
    batch_preds = []
    model.eval()  # switch off training-only behaviour such as dropout
    with torch.inference_mode():
        for batch in ds_generator:
            z = model(batch)
            batch_preds.append(torch.argmax(z, dim=-1))

    # Concatenate once instead of re-allocating a growing tensor on every batch.
    y_pred = torch.cat(batch_preds, dim=0).cpu().numpy()
    metrics = precision_recall_fscore_support(y_true, y_pred, average="weighted")
    return {"precision": metrics[0], "recall": metrics[1], "f1": metrics[2]}

In [189]:
# Sanity check: the helper should reproduce the metrics computed step by step above.
evaluate(test_ds, model)

2025-09-12 15:11:04,379	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-09-12_12-16-55_178831_940183/logs/ray-data
2025-09-12 15:11:04,380	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> TaskPoolMapOperator[CustomPreprocessor->Project]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(192) 1: 0.00 row [00:00, ? row/s]

- CustomPreprocessor->Project 2: 0.00 row [00:00, ? row/s]

2025-09-12 15:11:06,580	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-09-12_12-16-55_178831_940183/logs/ray-data
2025-09-12 15:11:06,581	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> TaskPoolMapOperator[CustomPreprocessor]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(192) 1: 0.00 row [00:00, ? row/s]

- CustomPreprocessor 2: 0.00 row [00:00, ? row/s]

{'precision': 0.9132518906340897,
 'recall': 0.9057591623036649,
 'f1': 0.9054612316063202}

## Inference

In [195]:
import pandas as pd

In [190]:
# Mapping from class index to tag name, used below to label model outputs.
preprocessor.index_to_class

{0: 'computer-vision',
 1: 'natural-language-processing',
 2: 'mlops',
 3: 'other'}

In [192]:
def format_prob(prob, index_to_class):
    """Label each entry of `prob` with the class name stored at its position in `index_to_class`."""
    return {index_to_class[idx]: p for idx, p in enumerate(prob)}

In [212]:
def predict_prob(df, model, preprocessor=preprocessor, device="cuda"):
    """Predict a tag and the per-class probabilities for every row of `df`.

    Args:
        df: pandas DataFrame of raw samples, passed to `preprocessor._transform_pandas`.
        model: Torch module called with the collated batch; a softmax over dim 1 of its
            output gives the class probabilities.
        preprocessor: Preprocessor providing `_transform_pandas` and `index_to_class`.
            Defaults to the notebook-level `preprocessor` as it exists when this cell
            is executed.
        device: Device the model and the collated batch are placed on.

    Returns:
        One dict per row: {"prediction": <tag>, "probabilities": {<tag>: <probability>, ...}}.
    """
    processed = preprocessor._transform_pandas(df)
    processed = collate_fn(processed, device)
    model = model.to(device)

    # Inference must run in eval mode (otherwise dropout stays active) and needs no
    # autograd graph. The caller's train/eval mode is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.inference_mode():
            output = model(processed)
            y_prob = output.cpu().softmax(dim=1).numpy()
    finally:
        model.train(was_training)

    index_to_class = preprocessor.index_to_class
    res = []
    for prob in y_prob:
        tag = decode([prob.argmax()], index_to_class)[0]
        res.append({"prediction": tag, "probabilities": format_prob(prob, index_to_class)})
    return res
    

In [213]:
# One hand-written example used to try out the inference helper.
title = "Transfer learning with transformers"
description = "Using transformers for transfer learning on text classification tasks."
# NOTE(review): a "tag" value is supplied too; presumably the preprocessor expects that column - confirm.
sample_df = pd.DataFrame([{"title": title, "description": description, "tag": "natural-language-processing"}])

In [214]:
# Predict the tag and per-class probabilities for the sample row.
predict_prob(sample_df, model)

[{'prediction': 'natural-language-processing',
  'probabilities': {'computer-vision': 0.0018231588,
   'natural-language-processing': 0.9936919,
   'mlops': 0.0021260863,
   'other': 0.002358786}}]