# Creating the dataset

In [70]:
import json
from pathlib import Path
import tempfile
import sys; sys.path.append("..")

import ray

from app.config import ROOT_DIR
from app.util import stratify_split

# Read the stored evaluation results for this experiment into `data`.
with (Path(ROOT_DIR) / "experiments/evaluations/gpt-4/llama-2-70b-gtebase.json").open() as f:
    data = json.load(f)

In [52]:
# One item per evaluated question; the score is binarized into `targets`
# (0 when score < 4, otherwise 1) and the split is stratified on that label.
ds = ray.data.from_items(
    [
        {"question": result["question"], "targets": 0 if result["score"] < 4 else 1}
        for result in data["results"]
    ]
)
train_ds, val_ds = stratify_split(ds, stratify="targets", test_size=0.2)

In [21]:
import numpy as np
import torch
import torch.nn as nn
from transformers import BertModel, BertTokenizer

In [22]:
# Pretrained encoder; return_dict=False makes the forward pass return a tuple.
llm = BertModel.from_pretrained(
    "allenai/scibert_scivocab_uncased",
    return_dict=False,
)
# Width of the encoder's hidden states, used to size the classifier head.
embedding_dim = llm.config.hidden_size

In [23]:
# Sample: push one sentence through the tokenizer and the encoder to inspect
# the shapes of the two outputs.
tokenizer = BertTokenizer.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
text = "Transfer learning with transformers for text classification."
batch = {
    k: torch.tensor(v)  # convert to torch tensors
    for k, v in tokenizer([text], return_tensors="np", padding="longest").items()
}
seq, pool = llm(input_ids=batch["input_ids"], attention_mask=batch["attention_mask"])
np.shape(seq), np.shape(pool)

(torch.Size([1, 10, 768]), torch.Size([1, 768]))

In [24]:
class FinetunedLLM(nn.Module):
    """Classification head on top of a pretrained encoder.

    The wrapped ``llm`` is called with ``input_ids`` and ``attention_mask`` and
    must return a ``(sequence_output, pooled_output)`` pair. Only the pooled
    output is used: it goes through dropout and one linear layer that maps
    ``embedding_dim`` features to ``num_classes`` logits.

    Args:
        llm: encoder module returning ``(sequence_output, pooled_output)``.
        dropout_p: dropout probability applied to the pooled output.
        embedding_dim: feature size of the pooled output.
        num_classes: number of output logits.
    """

    def __init__(self, llm, dropout_p, embedding_dim, num_classes):
        super().__init__()
        self.llm = llm
        self.dropout = torch.nn.Dropout(dropout_p)
        self.fc1 = torch.nn.Linear(embedding_dim, num_classes)

    def forward(self, batch):
        """Return the raw logits for a batch dict with "ids" and "masks" entries."""
        ids, masks = batch["ids"], batch["masks"]
        _, pool = self.llm(input_ids=ids, attention_mask=masks)  # sequence output is unused
        z = self.dropout(pool)
        z = self.fc1(z)
        return z

    @torch.inference_mode()
    def predict(self, batch):
        """Return the predicted class index per row as a NumPy array.

        Switches the module to eval mode (and leaves it there).
        """
        self.eval()
        z = self(batch)  # was `self(inputs)`: `inputs` is not defined anywhere
        y_pred = torch.argmax(z, dim=1).cpu().numpy()
        return y_pred

    @torch.inference_mode()
    def predict_proba(self, batch):
        """Return the per-class probabilities per row as a NumPy array.

        Switches the module to eval mode (and leaves it there).
        """
        self.eval()
        z = self(batch)
        # Explicit class dimension; `torch.softmax` also avoids depending on the
        # `F` alias, which is only imported in a later cell.
        y_probs = torch.softmax(z, dim=1).cpu().numpy()
        return y_probs

In [25]:
# Build the classifier around the encoder loaded above.
model = FinetunedLLM(
    llm=llm,
    dropout_p=0.5,
    embedding_dim=embedding_dim,
    num_classes=2,
)
# Printing the bound method (not calling it) shows its repr, which embeds the
# module's layer-by-layer repr.
print(model.named_parameters)

<bound method Module.named_parameters of FinetunedLLM(
  (llm): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(31090, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): Layer

In [53]:
def preprocess(batch):
    """Tokenize the "question" column of a batch.

    Returns a dict with the token ids ("ids"), the attention masks ("masks")
    and the unchanged "targets" column. Sequences are padded to the longest
    question in this batch only.
    """
    tokenizer = BertTokenizer.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
    questions = batch["question"].tolist()
    encoded_inputs = tokenizer(questions, return_tensors="np", padding="longest")
    return dict(
        ids=encoded_inputs["input_ids"],
        masks=encoded_inputs["attention_mask"],
        targets=batch["targets"],
    )

# Attach tokenization to both splits. The names are rebound, so re-running this
# cell would apply `preprocess` to batches that no longer have a "question" column.
train_ds, val_ds = (split.map_batches(preprocess) for split in (train_ds, val_ds))

train_ds.count()

2023-08-22 13:46:11,234	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> TaskPoolMapOperator[MapBatches(preprocess)]
2023-08-22 13:46:11,235	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-08-22 13:46:11,235	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Sort 1:   0%|          | 0/189 [00:00<?, ?it/s]

Sort Sample 2:   0%|          | 0/189 [00:00<?, ?it/s]

Shuffle Map 3:   0%|          | 0/189 [00:00<?, ?it/s]

Shuffle Reduce 4:   0%|          | 0/189 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 5:   0%|          | 0/189 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/189 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/189 [00:00<?, ?it/s]

Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/189 [00:00<?, ?it/s]

150

In [54]:
from ray.train.torch import get_device

def pad_array(arr, dtype=np.int32):
    """Right-pad variable-length rows with zeros into a 2-D array.

    Args:
        arr: sized collection of rows (e.g. an object array of 1-D arrays or a
            list of lists); rows may have different lengths.
        dtype: dtype of the returned array.

    Returns:
        Array of shape ``(len(arr), longest_row_length)`` where each row is
        copied to the left and the remainder is zero. Empty input gives a
        ``(0, 0)`` array.
    """
    # default=0 keeps an empty batch from raising inside max().
    max_len = max((len(row) for row in arr), default=0)
    # len() instead of .shape[0] so plain Python sequences are accepted too.
    padded_arr = np.zeros((len(arr), max_len), dtype=dtype)
    for i, row in enumerate(arr):
        padded_arr[i, :len(row)] = row
    return padded_arr

def collate_fn(batch):
    """Convert a batch of arrays into tensors on the training device.

    The "ids" and "masks" entries are zero-padded with ``pad_array`` first.
    Every entry is then converted with the dtype registered for its key, so a
    key other than "ids", "masks" or "targets" raises ``KeyError``.

    The incoming ``batch`` dict is left unmodified; a new dict is returned.
    """
    dtypes = {"ids": torch.int32, "masks": torch.int32, "targets": torch.int64}
    padded_keys = ("ids", "masks")
    device = get_device()  # resolved once per batch rather than once per key
    tensor_batch = {}
    for key, array in batch.items():
        if key in padded_keys:
            array = pad_array(array)
        tensor_batch[key] = torch.as_tensor(array, dtype=dtypes[key], device=device)
    return tensor_batch

In [55]:
from ray import train
from ray.train import Checkpoint, CheckpointConfig, DataConfig, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer
import torch.nn.functional as F

In [56]:
def train_step(ds, batch_size, model, num_classes, loss_fn, optimizer):
    """Run one optimization pass over `ds` and return the mean batch loss."""
    model.train()
    running_loss = 0.0
    batches = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)
    for step, batch in enumerate(batches, start=1):
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()
        logits = model(batch)
        # The loss function is fed one-hot float targets.
        one_hot_targets = F.one_hot(batch["targets"], num_classes=num_classes).float()
        batch_loss = loss_fn(logits, one_hot_targets)
        batch_loss.backward()
        optimizer.step()
        # Incremental mean of the per-batch losses seen so far.
        running_loss += (batch_loss.detach().item() - running_loss) / step
    return running_loss

In [57]:
def eval_step(ds, batch_size, model, num_classes, loss_fn):
    """Evaluate `model` on `ds` without gradient tracking.

    Returns the mean batch loss together with the true and predicted labels,
    each stacked with `np.vstack`.
    """
    model.eval()
    running_loss = 0.0
    true_labels = []
    predicted_labels = []
    batches = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)
    with torch.inference_mode():
        for step, batch in enumerate(batches, start=1):
            logits = model(batch)
            # The loss function is fed one-hot float targets.
            one_hot_targets = F.one_hot(batch["targets"], num_classes=num_classes).float()
            batch_loss = loss_fn(logits, one_hot_targets).item()
            # Incremental mean of the per-batch losses seen so far.
            running_loss += (batch_loss - running_loss) / step
            true_labels.extend(batch["targets"].cpu().numpy())
            predicted_labels.extend(torch.argmax(logits, dim=1).cpu().numpy())
    return running_loss, np.vstack(true_labels), np.vstack(predicted_labels)

In [71]:
# Training loop
def train_func(config):
    """Per-worker training loop run by the trainer.

    Reads the hyperparameters from ``config`` ("dropout_p", "lr", "lr_factor",
    "lr_patience", "num_epochs", "batch_size", "num_classes"), fetches the
    "train" and "val" dataset shards, and for every epoch runs a train step and
    an eval step, steps the LR scheduler on the validation loss, and reports
    the metrics together with a checkpoint of the model's state dict.
    """
    # Hyperparameters
    dropout_p = config["dropout_p"]
    lr = config["lr"]
    lr_factor = config["lr_factor"]
    lr_patience = config["lr_patience"]
    num_epochs = config["num_epochs"]
    batch_size = config["batch_size"]
    num_classes = config["num_classes"]

    # Get datasets
    # set_seeds()
    train_ds = train.get_dataset_shard("train")
    val_ds = train.get_dataset_shard("val")

    # Model
    llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
    model = FinetunedLLM(llm=llm, dropout_p=dropout_p, embedding_dim=llm.config.hidden_size, num_classes=num_classes)
    model = train.torch.prepare_model(model)

    # Training components
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=lr_factor, patience=lr_patience)

    # Training
    # The configured batch size is global; split it evenly across the workers.
    batch_size_per_worker = batch_size // train.get_context().get_world_size()
    for epoch in range(num_epochs):
        # Step
        train_loss = train_step(train_ds, batch_size_per_worker, model, num_classes, loss_fn, optimizer)
        val_loss, _, _ = eval_step(val_ds, batch_size_per_worker, model, num_classes, loss_fn)
        scheduler.step(val_loss)

        # Checkpoint
        metrics = dict(epoch=epoch, lr=optimizer.param_groups[0]["lr"], train_loss=train_loss, val_loss=val_loss)
        with tempfile.TemporaryDirectory() as tmpdir:
            # `Path` replaces `os.path.join`: `os` is never imported in this
            # notebook, so the original line raised NameError on a fresh kernel.
            torch.save(model.state_dict(), Path(tmpdir, "model.pt"))
            # Report inside the `with` block, while the temporary directory still exists.
            train.report(metrics, checkpoint=Checkpoint.from_directory(tmpdir))

[2m[1m[36m(autoscaler +17m29s)[0m [workspace snapshot] New snapshot created successfully (size: 8.04 MB).


In [65]:
# Hyperparameters passed to every training worker.
train_loop_config = dict(
    dropout_p=0.5,
    lr=1e-4,
    lr_factor=0.8,
    lr_patience=3,
    num_epochs=10,
    batch_size=256,
    num_classes=2,
)

In [66]:
# Compute layout: a single GPU worker that reserves 10 CPUs and 1 GPU.
scaling_config = ScalingConfig(
    num_workers=1,
    use_gpu=True,
    resources_per_worker=dict(CPU=10, GPU=1),
    _max_cpu_fraction_per_node=0.8,
)

In [67]:
# Run config: keep only the single checkpoint with the lowest "val_loss".
checkpoint_config = CheckpointConfig(
    num_to_keep=1,
    checkpoint_score_attribute="val_loss",
    checkpoint_score_order="min",
)
run_config = RunConfig(
    name="llm",
    checkpoint_config=checkpoint_config,
    local_dir="~/ray_results",
)



In [73]:
# Trainer: both splits are materialized here, before being handed to the trainer.
trainer = TorchTrainer(
    train_func,
    train_loop_config=train_loop_config,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets=dict(train=train_ds.materialize(), val=val_ds.materialize()),
)

2023-08-22 13:55:16,670	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Sort] -> AllToAllOperator[MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle] -> TaskPoolMapOperator[MapBatches(preprocess)]
2023-08-22 13:55:16,670	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-08-22 13:55:16,671	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Sort 1:   0%|          | 0/189 [00:00<?, ?it/s]

Sort Sample 2:   0%|          | 0/189 [00:00<?, ?it/s]

Shuffle Map 3:   0%|          | 0/189 [00:00<?, ?it/s]

Shuffle Reduce 4:   0%|          | 0/189 [00:00<?, ?it/s]

- MapBatches(group_fn)->MapBatches(_filter_split)->RandomShuffle 5:   0%|          | 0/189 [00:00<?, ?it/s]

Shuffle Map 6:   0%|          | 0/189 [00:00<?, ?it/s]

Shuffle Reduce 7:   0%|          | 0/189 [00:00<?, ?it/s]

Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

Sort Sample 0:   0%|          | 0/189 [00:00<?, ?it/s]

In [74]:
%%time
# Train: run the configured trainer and keep what fit() returns in `results`.
results = trainer.fit()

0,1
Current time:,2023-08-22 13:56:29
Running for:,00:00:46.39
Memory:,17.7/124.3 GiB

Trial name,status,loc,iter,total time (s),epoch,lr,train_loss
TorchTrainer_426bc_00000,TERMINATED,10.0.26.171:57173,10,40.5794,9,8e-05,0.395704


[2m[36m(TorchTrainer pid=57173)[0m Starting distributed worker processes: ['57260 (10.0.26.171)']
[2m[36m(RayTrainWorker pid=57260)[0m Setting up process group for: env:// [rank=0, world_size=1]
[2m[36m(SplitCoordinator pid=57364)[0m Auto configuring locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f']
[2m[36m(RayTrainWorker pid=57260)[0m Moving model to device: cuda:0
[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbo

(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=57364)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=57364)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['fb6c087a10c7e992b9aa1e1ebb6074934247092074bd578afd309a6f'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=57364)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=57364) Running 0:   0%|          | 0/189 [00:00<?, ?it/s]

2023-08-22 13:56:29,776	INFO tune.py:1146 -- Total run time: 46.47 seconds (46.39 seconds for the tuning loop).


CPU times: user 1.26 s, sys: 317 ms, total: 1.58 s
Wall time: 46.5 s
