## Dev Notebook for optimizing code

In [1]:
print("Hello, World!")

Hello, World!


In [2]:
import os
import sys
import torch
import time

# Add the parent directory ('pretrain') to the Python path
# This allows importing modules like config, utils, models
notebook_dir = os.getcwd()
pretrain_dir = os.path.dirname(notebook_dir)
if pretrain_dir not in sys.path:
    sys.path.append(pretrain_dir)
print(f"Added '{pretrain_dir}' to sys.path")

# Import your modules
from config import Config
from utils import dataloader, distributed, optimization, evaluation, hellaswag # Ensure hellaswag is importable if needed directly, or rely on evaluation
from models import gpt2 # Import the specific model module
import pretrain.train

print("Imports successful.")
# Verify GPU availability
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"Number of GPUs: {torch.cuda.device_count()}")
    for i in range(torch.cuda.device_count()):
        print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")

Added '/root/MicroSwag' to sys.path


  from .autonotebook import tqdm as notebook_tqdm


Imports successful.
PyTorch version: 2.6.0+cu124
CUDA available: True
Number of GPUs: 2
  GPU 0: NVIDIA GeForce RTX 3090
  GPU 1: NVIDIA GeForce RTX 3090


In [3]:
# Load the default configuration
config = Config()
# You can override specific settings here if needed for testing, e.g.:
# config.model_training.micro_batch_size = 4
# config.system.use_compile = False # Disable compile for easier debugging if needed

print("Configuration loaded:")
print(f"  Model Type: {config.model.model_type}")
print(f"  Total Batch Size: {config.model_training.total_batch_size}")
print(f"  Micro Batch Size: {config.model_training.micro_batch_size}")
print(f"  Sequence Length: {config.model_training.sequence_length}")
print(f"  Data Root: {config.data.data_root}") # Make sure this matches your structure
print(f"  Log Dir: {config.system.log_dir}")
print(f"  Use Compile: {config.system.use_compile}")

# Ensure the data root exists relative to the pretrain directory
config.data.data_root = os.path.join(pretrain_dir, config.data.data_root)
print(f"  Absolute Data Root: {config.data.data_root}")
assert os.path.isdir(config.data.data_root), f"Data root directory not found: {config.data.data_root}"

# Ensure log dir exists relative to the pretrain directory
config.system.log_dir = os.path.join(pretrain_dir, config.system.log_dir)
print(f"  Absolute Log Dir: {config.system.log_dir}")
os.makedirs(config.system.log_dir, exist_ok=True)

Configuration loaded:
  Model Type: gpt2
  Total Batch Size: 524288
  Micro Batch Size: 64
  Sequence Length: 1024
  Data Root: edu_fineweb10B
  Log Dir: log
  Use Compile: True
  Absolute Data Root: /root/MicroSwag/edu_fineweb10B
  Absolute Log Dir: /root/MicroSwag/log


In [4]:
# In a notebook, RANK env vars aren't set, so this will default to non-DDP
dist_config = distributed.setup_distributed()

print("Distributed config:")
for key, value in dist_config.items():
    print(f"  {key}: {value}")

device = dist_config["device"]
device_type = dist_config["device_type"]
master_process = dist_config["master_process"]

# Set precision (as done in train.py)
torch.set_float32_matmul_precision(config.system.float32_matmul_precision)
print(f"\nSet float32 matmul precision to: {config.system.float32_matmul_precision}")

Using device: cuda
Distributed config:
  ddp: False
  ddp_rank: 0
  ddp_local_rank: 0
  ddp_world_size: 1
  device: cuda
  master_process: True
  device_type: cuda

Set float32 matmul precision to: high


In [5]:
print("Setting up data loaders...")

# Use process_rank 0 and num_processes 1 for non-DDP test
train_loader = dataloader.DataLoader(
    B=4,
    T=512,
    process_rank=dist_config["ddp_rank"], # Should be 0
    num_processes=dist_config["ddp_world_size"], # Should be 1
    split="train",
    data_root=config.data.data_root,
    master_process=master_process
)

val_loader = dataloader.DataLoader(
    B=config.model_training.micro_batch_size,
    T=config.model_training.sequence_length,
    process_rank=dist_config["ddp_rank"], # Should be 0
    num_processes=dist_config["ddp_world_size"], # Should be 1
    split="val",
    data_root=config.data.data_root,
    master_process=master_process
)

print("Data loaders initialized.")

# Fetch one batch from each loader
print("Fetching one train batch...")
x_train, y_train = train_loader.next_batch()
print(f"  Train batch shapes: x={x_train.shape}, y={y_train.shape}")
print(f"  Train batch types: x={x_train.dtype}, y={y_train.dtype}")


print("Fetching one validation batch...")
x_val, y_val = val_loader.next_batch()
print(f"  Val batch shapes: x={x_val.shape}, y={y_val.shape}")
print(f"  Val batch types: x={x_val.dtype}, y={y_val.dtype}")

# Check token range (optional)
print(f"  Train x min/max: {x_train.min()}, {x_train.max()}")
print(f"  Train y min/max: {y_train.min()}, {y_train.max()}")

# Test loader checkpointing (optional)
print("\nTesting loader checkpointing...")
cp = train_loader.get_loader_checkpoint()
print(f"  Initial train loader checkpoint: {cp}")
# Advance position by fetching another batch
_, _ = train_loader.next_batch()
cp_after = train_loader.get_loader_checkpoint()
print(f"  Train loader checkpoint after 1 batch: {cp_after}")
# Reset and check again
train_loader.reset()
cp_reset = train_loader.get_loader_checkpoint()
print(f"  Train loader checkpoint after reset: {cp_reset}")
# Set back to original state
train_loader.set(cp)
cp_set = train_loader.get_loader_checkpoint()
print(f"  Train loader checkpoint after set: {cp_set}")
assert cp_set['current_shard'] == cp['current_shard']
assert cp_set['current_position'] == cp['current_position']
print("  Loader checkpointing seems functional.")

Setting up data loaders...
found 99 shards for split train
found 1 shards for split val
Data loaders initialized.
Fetching one train batch...
  Train batch shapes: x=torch.Size([4, 512]), y=torch.Size([4, 512])
  Train batch types: x=torch.int64, y=torch.int64
Fetching one validation batch...
  Val batch shapes: x=torch.Size([64, 1024]), y=torch.Size([64, 1024])
  Val batch types: x=torch.int64, y=torch.int64
  Train x min/max: 0, 50256
  Train y min/max: 0, 50256

Testing loader checkpointing...
  Initial train loader checkpoint: {'current_shard': 0, 'current_position': 2048}
  Train loader checkpoint after 1 batch: {'current_shard': 0, 'current_position': 4096}
  Train loader checkpoint after reset: {'current_shard': 0, 'current_position': 0}
  Train loader checkpoint after set: {'current_shard': 0, 'current_position': 2048}
  Loader checkpointing seems functional.


In [6]:
print("Creating the model...")
# Use the specific model's creation logic if available, or direct instantiation
if config.model.model_type == "gpt2":
     # Option 1: Use helper if defined correctly in gpt2.py
     # model_instance = gpt2.create_gpt_from_config(config) # Make sure this function exists and works
     # Option 2: Directly use the class
     model_instance = gpt2.GPT(config.model_specific)
else:
     raise ValueError(f"Unsupported model type for testing: {config.model.model_type}")

print(f"Model '{config.model.model_type}' created.")
# print(model_instance) # Optional: Print model structure

# Wrap model (moves to device, applies compile if enabled)
print(f"Wrapping model (device='{device}', use_compile={config.system.use_compile})...")
# This might take a minute if compilation is enabled
model, raw_model = distributed.wrap_model_for_distributed(
    model_instance,
    device,
    dist_config["ddp"],
    dist_config["ddp_local_rank"],
    use_compile=False
)
print("Model wrapped successfully.")

# Check model device
print(f"Model device: {next(model.parameters()).device}")

Creating the model...
GPT-2 model with 124.5M parameters
Model 'gpt2' created.
Wrapping model (device='cuda', use_compile=True)...
Model wrapped successfully.
Model device: cuda:0


In [7]:
print("Creating optimizer...")
optimizer = optimization.create_optimizer(
    raw_model, # Use the unwrapped model for optimizer configuration
    config,
    device_type,
    master_process=master_process
)
print("Optimizer created.")
# print(optimizer) # Optional: Print optimizer details

print("Creating LR scheduler function...")
get_lr = optimization.get_lr_scheduler(config, optimizer)
print("LR scheduler function created.")

# Test LR scheduler for a few steps
print(f"  LR at step 0: {get_lr(0):.4e}")
print(f"  LR at step {config.model_training.warmup_steps // 2}: {get_lr(config.model_training.warmup_steps // 2):.4e}")
print(f"  LR at step {config.model_training.warmup_steps}: {get_lr(config.model_training.warmup_steps):.4e}")
print(f"  LR at step {config.model_training.max_steps}: {get_lr(config.model_training.max_steps):.4e}")

Creating optimizer...
num decayed parameter tensors: 50, with 124,354,560 parameters
num non-decayed parameter tensors: 98, with 121,344 parameters
using fused AdamW: True
Optimizer created.
Creating LR scheduler function...
LR scheduler function created.
  LR at step 0: 8.3916e-07
  LR at step 357: 3.0042e-04
  LR at step 715: 6.0000e-04
  LR at step 19073: 6.0000e-05


In [8]:
print("Testing a single training step...")
model.train() # Set model to training mode
optimizer.zero_grad()

# Get a batch and move to device
x, y = train_loader.next_batch()
x, y = x.to(device), y.to(device)
print(f"  Input shapes on device '{device}': x={x.shape}, y={y.shape}")

# Forward pass - CHANGE DTYPE HERE
t0 = time.time()
# Use float16 instead of bfloat16
with torch.autocast(device_type=device_type, dtype=torch.float16): # <--- CHANGE HERE
    logits, loss = model(x, y)
t1 = time.time()
print(f"  Forward pass successful. Loss: {loss.item():.4f} (Time: {t1-t0:.2f}s)")
print(f"  Logits shape: {logits.shape}")

# Backward pass
t0 = time.time()
# Loss needs to be float32 for backward typically, autocast handles the ops
# but the loss scaling might need float32 accumulation outside autocast
# However, PyTorch's backward usually handles this correctly with autocast.
loss.backward()
t1 = time.time()
print(f"  Backward pass successful. (Time: {t1-t0:.2f}s)")

# Gradient clipping (optional but good practice)
norm = torch.nn.utils.clip_grad_norm_(model.parameters(), config.model_training.grad_clip)
print(f"  Gradient norm before clip (if any): {norm:.4f}")

# Optimizer step
t0 = time.time()
optimizer.step()
t1 = time.time()
print(f"  Optimizer step successful. (Time: {t1-t0:.2f}s)")

print("Single training step test complete.")

Testing a single training step...
  Input shapes on device 'cuda': x=torch.Size([4, 512]), y=torch.Size([4, 512])
  Forward pass successful. Loss: 10.9247 (Time: 0.33s)
  Logits shape: torch.Size([4, 512, 50304])
  Backward pass successful. (Time: 0.05s)
  Gradient norm before clip (if any): 19.8210
  Optimizer step successful. (Time: 0.01s)
Single training step test complete.


In [9]:
print("Testing validation step logic...")
# --- This logic is adapted directly from the validate function in train.py ---
model.eval() # Set model to evaluation mode
val_loader.reset() # Reset the validation loader

val_steps = 5 # Use fewer steps for a quick test in the notebook
val_loss_accum = 0.0
ddp = dist_config["ddp"] # Get ddp flag from dist_config

t0 = time.time()
with torch.no_grad(): # Ensure no gradients are calculated
    for i in range(val_steps):
        x, y = val_loader.next_batch()
        x, y = x.to(device), y.to(device)
        # Use float16 as decided before for 3090 compatibility
        with torch.autocast(device_type=device_type, dtype=torch.float16): # <--- Use float16
            logits, loss = model(x, y)
        val_loss_accum += loss.detach() # Accumulate loss
        if master_process and i < 3: # Print first few losses
             print(f"  Validation step {i+1}/{val_steps}, Batch Loss: {loss.item():.4f}")

    # Average the accumulated loss
    val_loss_accum /= val_steps

# In a real DDP run, we would average across processes.
# Here, ddp is False, so all_reduce_mean is essentially a no-op, but we include it for completeness.
val_loss = distributed.all_reduce_mean(val_loss_accum, ddp)

t1 = time.time()
# --- End of adapted logic ---

print(f"\nValidation logic test complete (over {val_steps} steps).")
# val_loss is a tensor if not reduced, get item if it's a scalar tensor
final_val_loss = val_loss.item() if isinstance(val_loss, torch.Tensor) else val_loss
print(f"  Average Validation Loss: {final_val_loss:.4f}")
print(f"  Validation Time: {t1 - t0:.2f}s")

# Switch back to train mode potentially for subsequent cells if needed, though HellaSwag uses eval()
model.train() # Or keep as model.eval() if only doing HellaSwag next

Testing validation step logic...


OutOfMemoryError: CUDA out of memory. Tried to allocate 12.28 GiB. GPU 0 has a total capacity of 23.57 GiB of which 7.93 GiB is free. Process 2243306 has 15.63 GiB memory in use. Of the allocated memory 14.97 GiB is allocated by PyTorch, and 356.47 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
# Make sure you fixed the duplicate get_most_likely_row function!
print("Testing HellaSwag evaluation...")
model.eval() # Ensure model is in eval mode

t0 = time.time()
# Note: This iterates through the *entire* HellaSwag val set (10042 examples)
# In a non-DDP setup (like this notebook), this rank will process all examples.
# This might take a few minutes.
results = evaluation.evaluate_hellaswag(
    model,
    device,
    device_type,
    dist_config["ddp_rank"],
    dist_config["ddp_world_size"],
    distributed # Pass the distributed utils module
)
t1 = time.time()

print("HellaSwag evaluation complete.")
print(f"  Accuracy: {results['accuracy']:.4f} ({results['correct']}/{results['total']})")
print(f"  HellaSwag Eval Time: {t1 - t0:.2f}s")

In [None]:
print("Cleaning up distributed context (no-op in non-DDP)...")
distributed.cleanup_distributed(dist_config["ddp"])
print("Cleanup called.")