In [1]:
import ray

In [2]:
@ray.remote
class ActorRolloutRef:
    def __init__(self, name):
        self.name = name 
    def get_name(self):
        return self.name
    
class ActorRolloutRefOg:
    def __init__(self, name):
        self.name = name 
    def get_name(self):
        return self.name

In [3]:
ray_worker = ray.remote(ActorRolloutRefOg)
obj = ray_worker.remote('Inception')
promise = obj.get_name.remote()
ray.get(promise)

2025-08-31 00:46:47,794	INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-31 00:46:47,806	INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


'Inception'

In [4]:
ray_worker = ActorRolloutRef.remote('Inception')
promise = ray_worker.get_name.remote()
ray.get(promise)

'Inception'

In [5]:
import ray

ray.init()
ray.state.available_resources_per_node()

2025-08-31 01:34:14,685	INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-08-31 01:34:14,704	INFO worker.py:1918 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


{'d9327891687774ad3e99128b4afa02d344320dcd8646990dac07d1a2': {'CPU': 8.0,
  'memory': 9132425216.0,
  'node:127.0.0.1': 1.0,
  'object_store_memory': 2147483648.0,
  'node:__internal_head__': 1.0}}

In [3]:
import time

database = [
    "Learning", "Ray",
    "Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]


def retrieve(item):
    time.sleep(item / 10.)
    return item, database[item]

In [4]:
def print_runtime(input_data, start_time):
    print(f'Runtime: {time.time() - start_time:.2f} seconds, data:')
    print(*input_data, sep="\n")


start = time.time()
data = [retrieve(item) for item in range(8)]
print_runtime(data, start)

Runtime: 2.80 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


`https://docs.ray.io/en/latest/ray-core/examples/gentle_walkthrough.html`

In [5]:
import ray 

'''
With the decorator, the function retrieve_task becomes a :ref:ray-remote-functions<Ray task>_. A Ray task is a function that Ray executes on a different process from where it was called, and possibly on a different machine.
Ray is convenient to use because you can continue writing Python code, without having to significantly change your approach or programming style. Using the :func:ray.remote()<@ray.remote> decorator on the retrieve function is the intended use of decorators, and you did not modify the original code in this example.
To retrieve database entries and measure performance, you do not need to make many changes to the code. Here’s an overview of the process:
start = time.time()
'''
@ray.remote
def retrieve_task(item):
    return retrieve(item)

In [None]:
import ray
ray.init()

2025-09-02 15:01:45,514	INFO worker.py:1908 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.12.3
Ray version:,2.47.1
Dashboard:,http://127.0.0.1:8265


[33m(raylet)[0m [2025-09-02 19:58:06,706 E 161083 161083] (raylet) node_manager.cc:3193: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 3eb9113167fa495f433e967f7fb989d54914bf5af89f22841c556171, IP: 192.168.1.9) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 192.168.1.9`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.
[33m(raylet)[0m 
[33m(raylet)[0m [2025-09-02 19:59:06,707 E 161083 161083] (raylet) no

[33m(raylet)[0m The autoscaler failed with the following error:
Terminated with signal 15
  File "/home/tmittra/ai/lib/python3.12/site-packages/ray/autoscaler/_private/monitor.py", line 747, in <module>
    monitor.run()
  File "/home/tmittra/ai/lib/python3.12/site-packages/ray/autoscaler/_private/monitor.py", line 604, in run
    self._run()
  File "/home/tmittra/ai/lib/python3.12/site-packages/ray/autoscaler/_private/monitor.py", line 459, in _run
    time.sleep(AUTOSCALER_UPDATE_INTERVAL_S)



In [7]:
start = time.time()
object_references = [
    retrieve_task.remote(item) for item in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)

Runtime: 1.30 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


In [8]:
@ray.remote 
def retrieve_task(item, db):
    time.sleep(item / 10.)
    return item, db[item]

'''
 In this scenario the database is only defined on the driver, but the worker processes need access 
 to it to run the retrieve task. Ray’s solution for sharing objects between the driver and workers 
 or between workers is to use the ray.put function to place the data into Ray’s distributed object store. 
 In the retrieve_task definition, you can add a db argument to pass later as the db_object_ref object.
'''
# **db_object_ref** just a reference (a pointer/ID) to that object in the store, not the data itself.
# This allows any worker to fetch the object from the store if they are given the reference.
# Ray workers are isolated processes. They don’t automatically “see” the variables defined in the driver (your main script).
db_object_ref = ray.put(database)
start = time.time()
object_references = [
    retrieve_task.remote(item, db=db_object_ref) for item in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)


Runtime: 0.77 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


In [13]:
start = time.time()
object_references = [
    retrieve_task.remote(item, db=db_object_ref) for item in range(8)
]

all_data = []
while len(object_references) > 0:
    '''
    finished: list of object refs that completed within 0.1 seconds.
    object_references: list of the remaining (still pending) ones.
    '''
    finished, object_references = ray.wait(
        object_references,
        timeout=0.1, 
    )
    data = ray.get(finished)
    print_runtime(data, start)
    all_data.extend(data)

print_runtime(all_data, start)

Runtime: 0.01 seconds, data:
(0, 'Learning')
Runtime: 0.11 seconds, data:
(1, 'Ray')
Runtime: 0.21 seconds, data:

Runtime: 0.21 seconds, data:
(2, 'Flexible')
Runtime: 0.31 seconds, data:

Runtime: 0.31 seconds, data:
(3, 'Distributed')
Runtime: 0.41 seconds, data:
(4, 'Python')
Runtime: 0.51 seconds, data:

Runtime: 0.51 seconds, data:
(5, 'for')
Runtime: 0.61 seconds, data:
(6, 'Machine')
Runtime: 0.71 seconds, data:
(7, 'Learning')
Runtime: 0.71 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


In [16]:
@ray.remote
def followup_task(data):
    index, item = data
    followup = retrieve(index + 1)
    return data, followup

start = time.time()
object_references = [
    retrieve_task.remote(item, db=db_object_ref) for item in [0, 2, 4, 6]
]
followup_references = [
    followup_task.remote(ref) for ref in object_references
]
data = ray.get(followup_references)
print_runtime(data, start)

Runtime: 1.33 seconds, data:
((0, 'Learning'), (1, 'Ray'))
((2, 'Flexible'), (3, 'Distributed'))
((4, 'Python'), (5, 'for'))
((6, 'Machine'), (7, 'Learning'))


In [None]:
'''
The DataTracker class becomes an actor when you give it the ray.remote decorator. 
This actor is capable of tracking state, such as a count, 
and its methods are Ray actor tasks that you can invoke in the same way as functions using .remote(). 
Modify the retrieve_task to incorporate this actor.
'''
@ray.remote
class DataTracker:
    def __init__(self):
        self._count = 0
    def increment(self):
        self._count += 1
    def get_count(self):
        return self._count

@ray.remote 
def retrieve_task(item, db, tracker):
    time.sleep(item / 10.)
    result = retrieve(item)
    tracker.increment.remote()
    return result

tracker = DataTracker.remote()
start = time.time()
object_references = [
    retrieve_task.remote(item, db_object_ref, tracker) for item in range(8)
] 
data = ray.get(object_references)
print(ray.get(tracker.get_count.remote()))

8


[2025-09-02 19:59:49,980 E 160239 161195] core_worker.cc:925: :info_message: Attempting to recover 1 lost objects by resubmitting their tasks or setting a new primary location from existing copies. To disable object reconstruction, set @ray.remote(max_retries=0).


: 

## Map Reduce

## Verl APIs

## Tutorials

In [2]:
import string 
import random 

characters = string.ascii_letters + string.digits
"".join([random.choice(characters) for _ in range(10)])

'gLuIpXbqR7'

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import (
    CPUOffload,
    BackwardPrefetch,
)
from torch.distributed.fsdp.wrap import (
    size_based_auto_wrap_policy,
    enable_wrap,
    wrap,
)
from torch.utils.data import DataLoader, DistributedSampler
import os

# Simple transformer-like model for demonstration
class TransformerBlock(nn.Module):
    def __init__(self, d_model, n_heads, d_ff):
        super().__init__()
        self.attention = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        
    def forward(self, x):
        # Self-attention with residual connection
        attn_out, _ = self.attention(x, x, x)
        x = self.norm1(x + attn_out)
        
        # Feed-forward with residual connection
        ff_out = self.feed_forward(x)
        x = self.norm2(x + ff_out)
        
        return x

class LargeTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, d_ff, n_layers, seq_len):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.Parameter(torch.randn(1, seq_len, d_model))
        
        self.layers = nn.ModuleList([
            TransformerBlock(d_model, n_heads, d_ff) 
            for _ in range(n_layers)
        ])
        
        self.ln_f = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size)
        
    def forward(self, x):
        # Token + positional embeddings
        x = self.embedding(x) + self.pos_encoding[:, :x.size(1), :]
        
        # Pass through transformer blocks
        for layer in self.layers:
            x = layer(x)
            
        # Final layer norm and output projection
        x = self.ln_f(x)
        return self.head(x)

# Dummy dataset for demonstration
class DummyDataset(torch.utils.data.Dataset):
    def __init__(self, vocab_size, seq_len, num_samples):
        self.vocab_size = vocab_size
        self.seq_len = seq_len
        self.num_samples = num_samples
        
    def __len__(self):
        return self.num_samples
    
    def __getitem__(self, idx):
        # Generate random sequences
        input_ids = torch.randint(0, self.vocab_size, (self.seq_len,))
        # For language modeling, labels are input_ids shifted by 1
        labels = torch.roll(input_ids, -1)
        return input_ids, labels

def setup_distributed():
    """Initialize distributed training"""
    # Initialize the process group
    dist.init_process_group(backend="nccl")
    
    # Set the device for this process
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    
    return local_rank

def cleanup_distributed():
    """Clean up distributed training"""
    dist.destroy_process_group()

def create_fsdp_model(model, auto_wrap_policy=None):
    """Wrap model with FSDP"""
    
    # FSDP configuration
    fsdp_config = {
        # Offload parameters to CPU when not in use (saves GPU memory)
        "cpu_offload": CPUOffload(offload_params=True),
        
        # Overlap backward pass computation with communication
        "backward_prefetch": BackwardPrefetch.BACKWARD_PRE,
        
        # Mixed precision configuration
        "mixed_precision": None,  # Can add MixedPrecision policy here
        
        # Auto-wrap policy for automatic sharding of submodules
        "auto_wrap_policy": auto_wrap_policy,
        
        # Ignore modules that shouldn't be sharded
        "ignored_modules": [],
    }
    
    # Wrap the model with FSDP
    model = FSDP(model, **fsdp_config)
    
    return model

def train_step(model, optimizer, data_loader, device, epoch):
    """Single training step"""
    model.train()
    total_loss = 0.0
    num_batches = 0
    
    for batch_idx, (input_ids, labels) in enumerate(data_loader):
        input_ids = input_ids.to(device)
        labels = labels.to(device)
        
        # Zero gradients
        optimizer.zero_grad()
        
        # Forward pass
        logits = model(input_ids)
        
        # Compute loss (cross-entropy for language modeling)
        loss = nn.CrossEntropyLoss()(
            logits.view(-1, logits.size(-1)), 
            labels.view(-1)
        )
        
        # Backward pass
        loss.backward()
        
        # Optimizer step
        optimizer.step()
        
        total_loss += loss.item()
        num_batches += 1
        
        # Print progress
        if batch_idx % 10 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")
    
    return total_loss / num_batches

def main():
    # Setup distributed training
    local_rank = setup_distributed()
    device = torch.device(f"cuda:{local_rank}")
    
    # Model hyperparameters
    vocab_size = 50000
    d_model = 1024
    n_heads = 16
    d_ff = 4096
    n_layers = 24  # Large model with 24 layers
    seq_len = 512
    batch_size = 4
    learning_rate = 1e-4
    num_epochs = 5
    
    print(f"Creating model on rank {dist.get_rank()}")
    
    # Create model
    model = LargeTransformer(vocab_size, d_model, n_heads, d_ff, n_layers, seq_len)
    
    # Define auto-wrap policy (wrap modules with >100M parameters)
    auto_wrap_policy = size_based_auto_wrap_policy(min_num_params=100_000_000)
    
    # Alternative: manually wrap specific modules
    # with enable_wrap(wrapper_cls=FSDP):
    #     for layer in model.layers:
    #         layer = wrap(layer)
    
    # Wrap model with FSDP
    model = create_fsdp_model(model, auto_wrap_policy)
    model = model.to(device)
    
    # Create optimizer
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
    
    # Create dataset and dataloader
    dataset = DummyDataset(vocab_size, seq_len, num_samples=1000)
    sampler = DistributedSampler(dataset)
    data_loader = DataLoader(
        dataset, 
        batch_size=batch_size, 
        sampler=sampler,
        num_workers=2
    )
    
    print(f"Starting training on rank {dist.get_rank()}")
    
    # Training loop
    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # Important for proper shuffling
        
        avg_loss = train_step(model, optimizer, data_loader, device, epoch)
        
        # Print epoch results (only on rank 0)
        if dist.get_rank() == 0:
            print(f"Epoch {epoch + 1}/{num_epochs}, Average Loss: {avg_loss:.4f}")
    
    # Save model (only on rank 0)
    if dist.get_rank() == 0:
        # For FSDP, use state_dict_type context manager for proper saving
        from torch.distributed.fsdp import StateDictType, FullStateDictConfig
        
        save_policy = FullStateDictConfig(offload_to_cpu=True, rank0_only=True)
        with FSDP.state_dict_type(model, StateDictType.FULL_STATE_DICT, save_policy):
            state_dict = model.state_dict()
            torch.save({
                'model_state_dict': state_dict,
                'optimizer_state_dict': optimizer.state_dict(),
                'epoch': num_epochs,
            }, 'fsdp_checkpoint.pt')
        
        print("Model saved successfully!")
    
    # Clean up
    cleanup_distributed()

# Script for launching distributed training
def launch_script():
    """
    Launch this script with torchrun:
    
    torchrun --nproc_per_node=4 --nnodes=1 fsdp_example.py
    
    Or with multiple nodes:
    torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="192.168.1.1" --master_port=12355 fsdp_example.py
    """
    pass

if __name__ == "__main__":
    main()

# Additional utility functions for FSDP

def get_model_size(model):
    """Calculate model size in parameters"""
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    
    print(f"Total parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")
    print(f"Model size: {total_params * 4 / (1024**3):.2f} GB (float32)")
    
    return total_params

def print_fsdp_config(model):
    """Print FSDP configuration information"""
    if hasattr(model, '_fsdp_wrapped_module'):
        print("FSDP Configuration:")
        print(f"CPU Offload: {model.cpu_offload}")
        print(f"Backward Prefetch: {model.backward_prefetch}")
        print(f"Mixed Precision: {model.mixed_precision}")
        
def memory_summary():
    """Print GPU memory usage"""
    if torch.cuda.is_available():
        print(f"GPU Memory Allocated: {torch.cuda.memory_allocated() / (1024**3):.2f} GB")
        print(f"GPU Memory Cached: {torch.cuda.memory_reserved() / (1024**3):.2f} GB")
        print(f"GPU Memory Free: {(torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated()) / (1024**3):.2f} GB")

# Reward Function

**1. Creation of dummy response for the reward function.**

**2. Run this after preparing the dataset.**

In [28]:
import pandas as pd 
import numpy as np 
import random

input_path = '../data/grid_test.parquet'
output_path = 'grid_test_resp'

In [29]:
df = pd.read_parquet(input_path)
df.head()

Unnamed: 0,prompt,reward_model
0,[{'content': 'Several of Mrs. Albertson's U. ...,{'ground_truth': '5 minutes | Georgia | Jackso...
1,[{'content': 'The Makombo Reserve rescues and ...,{'ground_truth': '10 feet | Morutana | Botswan...
2,[{'content': 'The Abbeville K-9 Academy trains...,{'ground_truth': 'March | Hercules | Ramsey | ...
3,[{'content': 'Watson County Hospital released ...,{'ground_truth': '12:01am | Maxine | Vaughan 1...
4,[{'content': 'Help Ramon figure out his new se...,{'ground_truth': '1st | 114 | Morton | Biology...


In [30]:
def beautify(solution):
    half_solution = []
    for row in solution:
        half_solution.append(' | '.join(row))
    return '\n'.join(half_solution)

In [31]:
from copy import deepcopy 
NO_OF_RESPONSES = 5 
data_sources = 'prompt'
reward_model_key = 'reward_model'
responses = 'responses'

mega_final_solution = []
for idx, row in df.iterrows():
    parse_solution = []
    solution = row[reward_model_key]['ground_truth']
    solution_rows = solution.split('\n')
    for solution_row in solution_rows:
        element = solution_row.split('|')
        parse_solution.append([x.strip().lower() for x in element])
    rsz, csz = len(parse_solution), len(parse_solution[0])
    responses = []
    for index in range(NO_OF_RESPONSES):
        responses.append(deepcopy(parse_solution))
        idx, jdx = random.randint(0, rsz-1), random.randint(0, rsz-1)
        feature_idx = random.randint(0, csz-1)
        responses[-1][idx][feature_idx], responses[-1][jdx][feature_idx] = responses[-1][jdx][feature_idx], \
            responses[-1][idx][feature_idx]
    # print(f'ORIGINAL:')
    # print(beautify(parse_solution))
    # print()
    final_responses = []
    for index, response in enumerate(responses):
        # print(f'SOLUTION {index}')
        # print(beautify(response))
        # print()
        final_responses.append(beautify(response))
    # print(f'-x'*50)
    mega_final_solution.append(final_responses)
mega_final_solution[0]

['5 minutes | georgia | jackson | d\n8 minutes | chris | lincoln | c-\n11 minutes | peggy | adams | b-\n14 minutes | yvonne | nixon | b+',
 '5 minutes | chris | jackson | d\n8 minutes | georgia | lincoln | c-\n11 minutes | peggy | adams | b-\n14 minutes | yvonne | nixon | b+',
 '5 minutes | georgia | jackson | d\n8 minutes | chris | lincoln | c-\n11 minutes | peggy | adams | b-\n14 minutes | yvonne | nixon | b+',
 '14 minutes | georgia | jackson | d\n8 minutes | chris | lincoln | c-\n11 minutes | peggy | adams | b-\n5 minutes | yvonne | nixon | b+',
 '5 minutes | georgia | lincoln | d\n8 minutes | chris | jackson | c-\n11 minutes | peggy | adams | b-\n14 minutes | yvonne | nixon | b+']

In [32]:
df['responses'] = mega_final_solution
df.iloc[0]

prompt          [{'content': 'Several of Mrs.  Albertson's U. ...
reward_model    {'ground_truth': '5 minutes | Georgia | Jackso...
responses       [5 minutes | georgia | jackson | d\n8 minutes ...
Name: 0, dtype: object

In [33]:
df.to_parquet(output_path + '.parquet')
df.to_csv(output_path + '.csv')

In [34]:
!pwd

/Users/tmittra/verl_x/tm_tutorials


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


In [None]:
'''
1. from parquet generate responses using LLM.
2. Use GRIDRewardManger to get the reward. 
    - Implement compute_score(prompt, response_list, ground_truth)
'''
print('--')

--


### Implementing compute_score

In [58]:
import pandas as pd 
import os 

DATA_PATH = os.path.expanduser("~") + "/verl_x/data/aime-2024.parquet"
df_aime = pd.read_parquet(DATA_PATH)
df_aime.to_csv('aime-2024.csv')
df_aime.head()

Unnamed: 0,data_source,prompt,ability,reward_model,extra_info
7,math_dapo,[{'content': 'Solve the following math problem...,MATH,"{'ground_truth': '540', 'style': 'rule-lightev...","{'index': 2, 'raw_problem': 'Find the largest ..."
9,math_dapo,[{'content': 'Solve the following math problem...,MATH,"{'ground_truth': '204', 'style': 'rule-lightev...","{'index': 25, 'raw_problem': 'Every morning Ay..."
11,math_dapo,[{'content': 'Solve the following math problem...,MATH,"{'ground_truth': '721', 'style': 'rule-lightev...","{'index': 14, 'raw_problem': 'Let $\mathcal{B}..."
27,math_dapo,[{'content': 'Solve the following math problem...,MATH,"{'ground_truth': '236', 'style': 'rule-lightev...","{'index': 26, 'raw_problem': 'A list of positi..."
28,math_dapo,[{'content': 'Solve the following math problem...,MATH,"{'ground_truth': '809', 'style': 'rule-lightev...","{'index': 9, 'raw_problem': 'Alice and Bob pla..."


In [None]:
import os 
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer
)

MODEL_PATH = os.path.expanduser("~") + "/models/Qwen2-1.5B-Instruct"
'''
Feature for automatically initiating the model skeleton and loading and dispatching the 
model weights across all available devices, starting with the fastest device (GPU)
'''

'''
Offloading is a technique used to run models too large to fit entirely in GPU or RAM, 
by storing parts of the model (or all of it) on:

1. Disk (slowest)
2. CPU memory (medium)
3. GPU memory (fastest)

Hugging Face uses accelerate's offloading features to help with this.
'''
model = AutoModelForCausalLM.from_pretrained(f"{MODEL_PATH}", 
                                            device_map = "auto")
tokenizer = AutoTokenizer.from_pretrained(f'{MODEL_PATH}', 
                                        truncation = True, \
                                        padding = True, \
                                        padding_side="left")
print(model.config)

Some parameters are on the meta device because they were offloaded to the disk.


Qwen2Config {
  "architectures": [
    "Qwen2ForCausalLM"
  ],
  "attention_dropout": 0.0,
  "bos_token_id": 151643,
  "eos_token_id": 151645,
  "hidden_act": "silu",
  "hidden_size": 1536,
  "initializer_range": 0.02,
  "intermediate_size": 8960,
  "layer_types": [
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention",
    "full_attention"
  ],
  "max_position_embeddings": 32768,
  "max_window_layers": 28,
  "model_type": "qwen2",
  "num_attention_heads": 

In [2]:
import pandas as pd 
DATA_PATH = os.path.expanduser("~") + "/verl_x/data/grid_test.parquet"

df = pd.read_parquet(DATA_PATH)
df.head()

Unnamed: 0,prompt,reward_model
0,[{'content': 'Several of Mrs. Albertson's U. ...,{'ground_truth': '5 minutes | Georgia | Jackso...
1,[{'content': 'The Makombo Reserve rescues and ...,{'ground_truth': '10 feet | Morutana | Botswan...
2,[{'content': 'The Abbeville K-9 Academy trains...,{'ground_truth': 'March | Hercules | Ramsey | ...
3,[{'content': 'Watson County Hospital released ...,{'ground_truth': '12:01am | Maxine | Vaughan 1...
4,[{'content': 'Help Ramon figure out his new se...,{'ground_truth': '1st | 114 | Morton | Biology...


In [10]:
from langchain_core.prompts import ChatPromptTemplate
print(f'DataLen: {df.shape[0]}')
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", """You are an expert who solves grid puzzles and you should think before answering a question.
Finally print the answer in this format.
Stop immediately after generating the solution.
"""),
        ("human", "{query}")
    ]
)

'''
But ChatPromptTemplate is not a runnable by default. 
You should call .format_messages() or .format() to render it, not .invoke()
'''
for idx, row in df.iterrows():
    sentence = prompt.format(query=row['prompt'][0]['content'])
    model_inputs = tokenizer(sentence, return_tensors="pt")
    response = model.generate(**model_inputs)
    human_response = tokenizer.batch_decode(response, skip_special_tokens=True)[0]
    print(human_response)
    break

DataLen: 54




System: You are an expert who solves grid puzzles and you should think before answering a question.
Finally print the answer in this format.
Stop immediately after generating the solution.

Human: Several of Mrs.  Albertson's U. S.  History students each gave an oral presentation today on a different American President. Using only the clues below, match the lengths to the options from names, presidents, and grades. Remember, as with all grid-based logic puzzles, no option in any category will ever be used more than once.

lengths : 5 minutes, 8 minutes, 11 minutes, 14 minutes.
names : Chris, Georgia, Peggy, Yvonne.
presidents : Adams, Jackson, Lincoln, Nixon.
grades : B+, B-, C-, D.

Clues:
1. Yvonne got the B+.
2. Of Peggy and the presenter who spoke for 8 minutes, one got the C- and the other talked about President Adams.
3. Of the presenter who got the B+ and the student who gave the presentation on President Lincoln, one was Chris and the other spoke for 14 minutes.
4. The student 

## compute_fn

In [33]:
prompt = """[{'content': "Several of Mrs. Albertson's U. S.  History students each gave an oral presentation today on a different American President. Using only the clues below, match the lengths to the options from names, presidents, and grades. Remember, as with all grid-based logic puzzles, no option in any category will ever be used more than once.\n\nlengths : 5 minutes, 8 minutes, 11 minutes, 14 minutes.\nnames : Chris, Georgia, Peggy, Yvonne.\npresidents : Adams, Jackson, Lincoln, Nixon.\ngrades : B+, B-, C-, D.\n\nClues:\n1. Yvonne got the B+.\n2. Of Peggy and the presenter who spoke for 8 minutes, one got the C- and the other talked about President Adams.\n3. Of the presenter who got the B+ and the student who gave the presentation on President Lincoln, one was Chris and the other spoke for 14 minutes.\n4. The student who got the B- spoke for a somewhat longer time than the presenter who gave the presentation on President Jackson.\n5. The student who spoke for 8 minutes was either Chris or Yvonne.\n\nWhile answering use the following format:\nStep-by-step solution:\nYour steps showing how you are solving the puzzle\nFinal Answer:\nFill the following table to show your final answer.\n5 minutes | correct option from names | correct option from presidents | correct option from grades\n8 minutes | correct option from names | correct option from presidents | correct option from grades\n11 minutes | correct option from names | correct option from presidents | correct option from grades\n14 minutes | correct option from names | correct option from presidents | correct option from grades\n", 'role': 'user'}]"""
print(prompt)
print(f'-.' * 100)

prediction = 'Step-by-step solution:\n1. Chris spoke for 5 minutes.\n2. Peggy gave the presentation on President Adams.\n3. The student who spoke for 8 minutes is Yvonne, so the presenter who gave the presentation on President Jackson must have spoken for 8 minutes.\n4. Chris gave the presentation on President Lincoln.\n5. The student who spoke for 8 minutes is Yvonne, so the presenter who gave the presentation on President Adams must have spoken for 8 minutes.\n6. Peggy gave the presentation on President Adams.\n7. The student who spoke for 11 minutes is Georgia, so the presenter who gave the presentation on President Jackson must have spoken for 11 minutes.\n8. Yvonne gave the presentation on President Lincoln.\n9. The student who gave the presentation on President Nixon must have spoken for 14 minutes.\n10. The presenter who gave the presentation on President Adams must have spoken for 5 minutes.\n\nFinal Answer:\n| 5 minutes | Chris | Adams | B+ |\n| 8 minutes | Yvonne | Jackson | C- |\n| 11 minutes | Georgia | Lincoln | B- |\n| 14 minutes | Peggy | Nixon | D |'
print(prediction)
print(f'-.' * 100)

solution ='5 minutes | Georgia | Jackson | D\n8 minutes | Chris | Lincoln | C-\n11 minutes | Peggy | Adams | B-\n14 minutes | Yvonne | Nixon | B+'
print(solution)
print(f'-.' * 100)

[{'content': "Several of Mrs. Albertson's U. S.  History students each gave an oral presentation today on a different American President. Using only the clues below, match the lengths to the options from names, presidents, and grades. Remember, as with all grid-based logic puzzles, no option in any category will ever be used more than once.

lengths : 5 minutes, 8 minutes, 11 minutes, 14 minutes.
names : Chris, Georgia, Peggy, Yvonne.
presidents : Adams, Jackson, Lincoln, Nixon.
grades : B+, B-, C-, D.

Clues:
1. Yvonne got the B+.
2. Of Peggy and the presenter who spoke for 8 minutes, one got the C- and the other talked about President Adams.
3. Of the presenter who got the B+ and the student who gave the presentation on President Lincoln, one was Chris and the other spoke for 14 minutes.
4. The student who got the B- spoke for a somewhat longer time than the presenter who gave the presentation on President Jackson.
5. The student who spoke for 8 minutes was either Chris or Yvonne.

W

In [41]:
# Constants for normalization
SUBSTITUTIONS = [
    ("an ", ""),
    ("a ", ""),
    (".$", "$"),
    ("\\$", ""),
    (r"\ ", ""),
    (" ", ""),
    ("mbox", "text"),
    (",\\text{and}", ","),
    ("\\text{and}", ","),
    ("\\text{m}", "\\text{}"),
]

REMOVED_EXPRESSIONS = [
    "square",
    "ways",
    "integers",
    "dollars",
    "mph",
    "inches",
    "hours",
    "km",
    "units",
    "\\ldots",
    "sue",
    "points",
    "feet",
    "minutes",
    "digits",
    "cents",
    "degrees",
    "cm",
    "gm",
    "pounds",
    "meters",
    "meals",
    "edges",
    "students",
    "childrentickets",
    "multiples",
    "\\text{s}",
    "\\text{.}",
    "\\text{\ns}",
    "\\text{}^2",
    "\\text{}^3",
    "\\text{\n}",
    "\\text{}",
    r"\mathrm{th}",
    r"^\circ",
    r"^{\circ}",
    r"\;",
    r",\!",
    "{,}",
    '"',
    "\\dots",
]

In [53]:
def strict(pred_matrix, sol_matrix):
    """All rows should match exactly."""
    match = 0
    for a_sol in sol_matrix:
        for p_sol in pred_matrix:
            if len(a_sol) != len(p_sol): return 0 
            cnt = 0 
            for a, p in zip(a_sol, p_sol):
                if a == p: cnt += 1
            if len(a_sol) == cnt: 
                match += 1
    return 1.0 if len(sol_matrix) == match else 0.0 

def relax(pred_matrix, sol_matrix):
    """Partial row matching is allowed."""
    match = 0
    for a_sol in sol_matrix:
        for p_sol in pred_matrix:
            if len(a_sol) != len(p_sol): return 0 
            cnt = 0 
            for a, p in zip(a_sol, p_sol):
                if a == p: cnt += 1
            if len(a_sol) == cnt: 
                match += 1
    return match/len(sol_matrix)

In [50]:
import re 

def grid_map(arr):
    matrix = []
    for row in arr:
        row = row.split('|')
        temp_row = []
        for x in row:
            if x: temp_row.append(x)
        row = [x.strip() for x in temp_row]
        # Apply substitutions and removals
        for idx, element in enumerate(row):
            for before, after in SUBSTITUTIONS:
                element = element.replace(before, after)
            for expr in REMOVED_EXPRESSIONS:
                element = element.replace(expr, "")
            row[idx] = element
        matrix.append(row)
    return matrix

def beautify_print(arr):
    for x in arr:
        print(x)
    print(f'-.'*25)

solution_lower = solution.lower()
solution_arr, prediction_arr = [], []
for x in solution_lower.split('\n'):
    solution_arr.append(x)
# beautify_print(solution_arr)
solution_arr_x = grid_map(solution_arr)
print(grid_map(solution_arr))

prediction_lower = prediction.lower()
repeat = len(solution_arr)
match_prediction = re.search(r'final answer\s*:?' + r'\s*(.*)'*repeat, prediction_lower)
if match_prediction:
    for idx in range(repeat):
        prediction_arr.append(match_prediction.group(idx+1))
# beautify_print(prediction_arr)
prediction_arr_x = grid_map(prediction_arr)
print(grid_map(prediction_arr))

[['5', 'georgia', 'jackson', 'd'], ['8', 'chris', 'lincoln', 'c-'], ['11', 'peggy', 'adams', 'b-'], ['14', 'yvonne', 'nixon', 'b+']]
[['5', 'chris', 'adams', 'b+'], ['8', 'yvonne', 'jackson', 'c-'], ['11', 'georgia', 'lincoln', 'b-'], ['14', 'peggy', 'nixon', 'd']]


In [52]:
strict(solution_arr_x, solution_arr_x), strict(solution_arr_x, prediction_arr_x)

(1.0, 0.0)

In [55]:
from copy import deepcopy

hybrid_prediction = deepcopy(prediction_arr_x)
hybrid_prediction[0] = solution_arr_x[0].copy()
hybrid_prediction[-1] = solution_arr_x[-1].copy()

relax(solution_arr_x, solution_arr_x), \
    relax(solution_arr_x, prediction_arr_x), \
        relax(solution_arr_x, hybrid_prediction)


(1.0, 0.0, 0.5)

### validity

In [39]:
'''If prediction has less number of rows, group(x) will return nothing ~ OK'''
'''If prediction has more number of rows, group(x) will return something ~ OK'''

match_prediction = re.search(r'final answer\s*:?' + r'\s*(.*)'*(repeat+1), prediction_lower)
if match_prediction:
    print(match_prediction.group(1))
    row = match_prediction.group(1)
    row = row.split('|')
    temp_row = []
    for x in row:
        if x: temp_row.append(x)
    row = [x.strip() for x in temp_row]
    print(row)

| 5 minutes | chris | adams | b+ |
['5 minutes', 'chris', 'adams', 'b+']
