# LLM Training Pipeline

This notebook provides an interactive interface for training a 7B parameter language model through the complete pipeline:

1. **Data Preparation** - Download, clean, and tokenize training data
2. **Pretraining** - Train on large text corpora with curriculum learning
3. **SFT** - Supervised fine-tuning on instruction-response pairs
4. **DPO** - Direct preference optimization for alignment
5. **LoRA** - Optional domain-specific fine-tuning

## Requirements

- **GPU**: NVIDIA A100 80GB recommended; an H100 is required for FP8 training
- **Colab**: Pro/Pro+ recommended for longer training sessions
- **Storage**: Google Drive for persistent checkpoints (recommended)

---
## Step 0: Environment Setup

Run these cells first to set up the training environment. Choose your deployment method below.

In [None]:
#@title ### 0.1 Detect Environment & Mount Google Drive
#@markdown Run this cell to detect if running in Colab and mount Google Drive for persistent storage.

import os
import sys

# The environment check relies on the `google.colab` module already being
# loaded into the running kernel.
IN_COLAB = 'google.colab' in sys.modules

if not IN_COLAB:
    # No Drive outside Colab; later cells treat a None DRIVE_BASE as "use relative paths".
    print("Running locally (not in Colab)")
    DRIVE_BASE = None
else:
    print("Running in Google Colab")

    # Imported here because the package only exists inside Colab.
    from google.colab import drive
    drive.mount('/content/drive')

    # Everything that must outlive the session is kept below this Drive folder.
    DRIVE_BASE = "/content/drive/MyDrive/llm-training-pipeline"
    for persistent_dir in (DRIVE_BASE, f"{DRIVE_BASE}/checkpoints", f"{DRIVE_BASE}/data"):
        os.makedirs(persistent_dir, exist_ok=True)

    print(f"Google Drive mounted. Persistent storage at: {DRIVE_BASE}")

In [None]:
#@title ### 0.2 Clone Repository & Install Dependencies
#@markdown Choose your repository source and install dependencies.

#@markdown ---
#@markdown **Repository Settings:**
REPO_URL = "https://github.com/rmarnold/llm-training-pipeline.git"  #@param {type:"string"}
BRANCH = "main"  #@param {type:"string"}

import os
import subprocess

# Clone or update repository
REPO_DIR = "/content/llm-training-pipeline"

if IN_COLAB:
    if os.path.exists(REPO_DIR):
        print(f"Repository exists. Pulling latest changes...")
        %cd {REPO_DIR}
        !git pull origin {BRANCH}
    else:
        print(f"Cloning repository from {REPO_URL}...")
        !git clone -b {BRANCH} {REPO_URL} {REPO_DIR}
        %cd {REPO_DIR}
    
    # Install core dependencies
    print("\n" + "="*50)
    print("Installing dependencies...")
    print("="*50)
    
    # Install the package with colab extras (includes pyfastcopy, ipywidgets)
    !pip install -q -e ".[colab]"
    
    # Verify colab extras installed
    colab_extras_ok = True
    try:
        import pyfastcopy
        print("✓ pyfastcopy installed (faster file copying)")
    except ImportError:
        print("⚠ pyfastcopy not available")
        colab_extras_ok = False
    
    try:
        import ipywidgets
        print("✓ ipywidgets installed (better progress bars)")
    except ImportError:
        print("⚠ ipywidgets not available")
        colab_extras_ok = False
    
    # Install flash-attn separately (requires special handling)
    print("\nInstalling flash-attention (this may take a few minutes)...")
    try:
        # Try installing pre-built wheel first (faster)
        !pip install -q flash-attn --no-build-isolation 2>/dev/null || \
         pip install -q flash-attn --no-build-isolation --no-cache-dir
        print("✓ flash-attn installed successfully!")
    except:
        print("⚠ Warning: flash-attn installation failed. Training will use standard attention.")
    
    # Install kernel optimizations (Liger Kernel + Cut Cross-Entropy + 8-bit Adam)
    print("\nInstalling kernel optimizations...")
    
    # Liger Kernel (~20% speedup, ~60% memory reduction)
    !pip install -q liger-kernel
    try:
        import liger_kernel
        print("✓ liger-kernel installed (~20% speedup, ~60% memory reduction)")
    except ImportError:
        print("⚠ liger-kernel not available")
    
    # bitsandbytes for 8-bit Adam optimizer (~4x memory reduction)
    !pip install -q bitsandbytes
    try:
        import bitsandbytes
        print("✓ bitsandbytes installed (8-bit Adam, ~4x optimizer memory reduction)")
    except ImportError:
        print("⚠ bitsandbytes not available - using standard AdamW")
    
    # Cut Cross-Entropy (may need specific torch version)
    !pip install -q cut-cross-entropy
    try:
        import cut_cross_entropy
        print("✓ cut-cross-entropy installed (~95% memory reduction on loss)")
    except ImportError:
        print("⚠ cut-cross-entropy not available - trying alternative install...")
        !pip install -q git+https://github.com/apple/ml-cross-entropy.git
        try:
            import cut_cross_entropy
            print("✓ cut-cross-entropy installed from source")
        except ImportError:
            print("⚠ cut-cross-entropy could not be installed. Training will work without it.")
    
    PROJECT_ROOT = REPO_DIR
    print("\n" + "="*50)
    print("Installation complete!")
    print("="*50)
else:
    # Local development - assume we're in the repo
    PROJECT_ROOT = os.path.dirname(os.getcwd()) if 'notebooks' in os.getcwd() else os.getcwd()
    
print(f"\nProject root: {PROJECT_ROOT}")
os.chdir(PROJECT_ROOT)
sys.path.insert(0, os.path.join(PROJECT_ROOT, 'scripts'))

In [None]:
#@title ### 0.3 Set Up Persistent Storage (Google Drive)
#@markdown Link checkpoints and data directories to Google Drive for persistence across sessions.

#@markdown ---
USE_DRIVE_STORAGE = True  #@param {type:"boolean"}

import os
import shutil
import subprocess

# Directory layout used by both the Drive-backed and the local-only setup.
STORAGE_SUBDIRS = [
    'checkpoints',
    'data',
    'data/raw',        # Raw downloaded data
    'data/processed',  # Cleaned data
    'data/packed',     # Tokenized/packed data
    'data/sft',        # SFT data
    'data/dpo',        # DPO data
    'logs',
    'evals'
]

# IN_COLAB / DRIVE_BASE come from cell 0.1, PROJECT_ROOT from cell 0.2.
if IN_COLAB and USE_DRIVE_STORAGE and DRIVE_BASE:
    print("Setting up persistent storage on Google Drive...")
    
    # Create ALL necessary directories on Drive FIRST (including subdirs)
    drive_subdirs = list(STORAGE_SUBDIRS)
    
    for subdir in drive_subdirs:
        drive_path = os.path.join(DRIVE_BASE, subdir)
        os.makedirs(drive_path, exist_ok=True)
    
    print(f"  Created directory structure on Google Drive")
    
    # Create symlinks for top-level directories only
    local_dirs = ['checkpoints', 'data', 'logs', 'evals']
    
    for dir_name in local_dirs:
        local_path = os.path.join(PROJECT_ROOT, dir_name)
        drive_path = os.path.join(DRIVE_BASE, dir_name)
        
        if os.path.islink(local_path):
            # Remove old symlink (islink is also true for a dangling link)
            os.unlink(local_path)
        elif os.path.exists(local_path):
            # A real local directory: move its contents to Drive before replacing it.
            if os.listdir(local_path):
                print(f"  Moving existing {dir_name} to Drive...")
                # Copying `<dir>/.` includes dot-files, which a `<dir>/*` glob would skip.
                copy_result = subprocess.run(
                    ['cp', '-r', os.path.join(local_path, '.'), drive_path],
                    capture_output=True, text=True,
                )
                if copy_result.returncode != 0:
                    # Never delete local data that did not reach Drive.
                    print(f"  ⚠ Could not copy {dir_name} to Drive; keeping the local directory.")
                    if copy_result.stderr.strip():
                        print(f"    {copy_result.stderr.strip()}")
                    continue
            shutil.rmtree(local_path)
        
        # Create symlink
        os.symlink(drive_path, local_path)
        print(f"  {dir_name} -> {drive_path}")
    
    print("\nPersistent storage configured!")
    print("Your checkpoints and data will survive Colab disconnections.")
else:
    print("Using local storage (not persistent in Colab)")
    # Create the same directory layout locally
    for dir_name in STORAGE_SUBDIRS:
        os.makedirs(os.path.join(PROJECT_ROOT, dir_name), exist_ok=True)

# Store paths for later use (always set these)
DRIVE_DATA_RAW = os.path.join(DRIVE_BASE, "data/raw") if DRIVE_BASE else "data/raw"
DRIVE_DATA_PROCESSED = os.path.join(DRIVE_BASE, "data/processed") if DRIVE_BASE else "data/processed"

# Default cleaning paths (will be overridden by cell 0.3.1 if using local SSD)
CLEANING_INPUT_DIR = "data/raw"
CLEANING_OUTPUT_DIR = "data/processed"

print(f"\nData paths:")
print(f"  Raw data: {DRIVE_DATA_RAW}")
print(f"  Processed data: {DRIVE_DATA_PROCESSED}")

In [None]:
#@title ### 0.3.1 Copy Data to Local SSD (Faster I/O) [Recommended]
#@markdown **Copies data from Google Drive to local NVMe SSD for 5-10x faster I/O.**
#@markdown
#@markdown The local SSD in Colab is much faster than Google Drive for random I/O.
#@markdown This significantly speeds up data cleaning and training.
#@markdown
#@markdown **When to use:**
#@markdown - Before data cleaning (copy raw data)
#@markdown - Before training (copy packed data)
#@markdown - Results are automatically backed up to Drive when done

# NOTE: this cell only copies Drive -> local SSD. The copy back to Drive is
# done by the data-preparation cells (Steps 1.1-1.3), not here.

#@markdown ---
USE_LOCAL_SSD = True  #@param {type:"boolean"}
#@markdown *Enable local SSD for faster I/O during processing*
COPY_THREADS = 8  #@param {type:"integer"}
#@markdown *Threads for parallel copy (8-16 recommended)*

import os
import shutil
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# Try to import pyfastcopy for 30-50% faster per-file copy
# FAST_COPY_AVAILABLE is only read to choose the status message printed by
# parallel_copy; the copy call itself is shutil.copy2 either way.
try:
    import pyfastcopy  # Patches shutil.copy2 to use sendfile syscall
    FAST_COPY_AVAILABLE = True
except ImportError:
    FAST_COPY_AVAILABLE = False

# Local SSD paths (fast storage, not persistent)
# Raw/processed/packed/cache all live under LOCAL_DATA; checkpoints get their
# own top-level directory.
LOCAL_DATA = "/content/local_data"
LOCAL_RAW = f"{LOCAL_DATA}/raw"
LOCAL_PROCESSED = f"{LOCAL_DATA}/processed"
LOCAL_PACKED = f"{LOCAL_DATA}/packed"
LOCAL_CACHE = f"{LOCAL_DATA}/.cache"
LOCAL_CHECKPOINTS = "/content/local_checkpoints"

# Drive paths (persistent)
# DRIVE_BASE is set by cell 0.1 (None outside Colab). When it is falsy these
# fall back to paths relative to the current working directory.
DRIVE_RAW = f"{DRIVE_BASE}/data/raw" if DRIVE_BASE else "data/raw"
DRIVE_PROCESSED = f"{DRIVE_BASE}/data/processed" if DRIVE_BASE else "data/processed"
DRIVE_PACKED = f"{DRIVE_BASE}/data/packed" if DRIVE_BASE else "data/packed"
DRIVE_CHECKPOINTS = f"{DRIVE_BASE}/checkpoints" if DRIVE_BASE else "checkpoints"

def parallel_copy(src_dir, dst_dir, pattern="*", max_workers=8, desc="Copying"):
    """Copy the regular files of ``src_dir`` matching ``pattern`` into ``dst_dir``.

    Only entries directly inside ``src_dir`` are considered, and directories
    are never copied. A file is skipped when the destination already holds a
    file of the same name and the same size, so re-running resumes an
    interrupted copy.

    Args:
        src_dir: Directory to copy from.
        dst_dir: Directory to copy into; created if missing.
        pattern: ``"*"`` for every file (including dot-files), otherwise a
            glob pattern relative to ``src_dir``.
        max_workers: Number of copy threads.
        desc: Label shown on the progress bar.

    Returns:
        The number of files copied (0 when everything was already in place).

    Raises:
        Whatever ``shutil.copy2`` raises for a failing file; the first such
        error is re-raised once it is collected from its worker.
    """
    import glob
    
    os.makedirs(dst_dir, exist_ok=True)
    
    # Collect candidate names. "*" goes through listdir so dot-files are kept;
    # any other pattern goes through glob.
    if pattern == "*":
        names = os.listdir(src_dir)
    else:
        names = [os.path.basename(p) for p in glob.glob(os.path.join(src_dir, pattern))]
    # Both branches keep regular files only: a directory that happens to match
    # the pattern must not be handed to shutil.copy2.
    files = [name for name in names if os.path.isfile(os.path.join(src_dir, name))]
    
    # Filter out already copied files (same name and same size at the destination)
    to_copy = []
    total_size = 0
    for f in files:
        src = os.path.join(src_dir, f)
        dst = os.path.join(dst_dir, f)
        size = os.path.getsize(src)
        if not os.path.exists(dst) or size != os.path.getsize(dst):
            to_copy.append((src, dst, f, size))
            total_size += size
    
    if not to_copy:
        print(f"  All files already copied")
        return 0
    
    print(f"  Copying {len(to_copy)} files ({total_size / (1024**3):.2f} GB)")
    if FAST_COPY_AVAILABLE:
        print(f"  Using: pyfastcopy + {max_workers} threads (fastest)")
    else:
        print(f"  Using: {max_workers} threads (install pyfastcopy for 30-50% faster)")
    
    def copy_file(args):
        """Copy one (src, dst, name, size) job and report its name and size."""
        src, dst, name, size = args
        shutil.copy2(src, dst)
        return name, size
    
    copied = 0
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(copy_file, args): args for args in to_copy}
        # The progress bar advances in bytes, one whole file at a time.
        with tqdm(total=total_size, unit='B', unit_scale=True, desc=f"  {desc}") as pbar:
            for future in as_completed(futures):
                name, size = future.result()
                copied += 1
                pbar.update(size)
    
    elapsed = time.time() - start_time
    speed = total_size / (1024**2) / elapsed if elapsed > 0 else 0
    print(f"  Copied {copied} files in {elapsed:.1f}s ({speed:.0f} MB/s)")
    
    return copied

if not (IN_COLAB and USE_LOCAL_SSD):
    # Fall back to the project-relative directories.
    CLEANING_INPUT_DIR, CLEANING_OUTPUT_DIR = "data/raw", "data/processed"
    TRAINING_DATA_DIR, TRAINING_CHECKPOINT_DIR = "data/packed", "checkpoints"
    print("Using Google Drive paths (local SSD disabled)")
else:
    print("Setting up local SSD for faster I/O...")
    print("=" * 50)
    
    # Make sure every local working directory exists before copying into it.
    for d in (LOCAL_DATA, LOCAL_RAW, LOCAL_PROCESSED, LOCAL_PACKED, LOCAL_CACHE, LOCAL_CHECKPOINTS):
        os.makedirs(d, exist_ok=True)
    
    # Pull raw parquet files from Drive, if any are there.
    if os.path.exists(DRIVE_RAW):
        raw_files = [entry for entry in os.listdir(DRIVE_RAW) if entry.endswith('.parquet')]
        if len(raw_files) > 0:
            print(f"\nFound {len(raw_files)} raw data files on Drive")
            parallel_copy(DRIVE_RAW, LOCAL_RAW, "*.parquet", COPY_THREADS, "Raw data")
    
    # Pull previously packed data from Drive, if any is there.
    if os.path.exists(DRIVE_PACKED):
        packed_files = os.listdir(DRIVE_PACKED)
        if len(packed_files) > 0:
            print(f"\nFound packed data on Drive")
            parallel_copy(DRIVE_PACKED, LOCAL_PACKED, "*", COPY_THREADS, "Packed data")
    
    # Publish the fast paths for the cleaning and training cells.
    CLEANING_INPUT_DIR, CLEANING_OUTPUT_DIR = LOCAL_RAW, LOCAL_PROCESSED
    TRAINING_DATA_DIR, TRAINING_CHECKPOINT_DIR = LOCAL_PACKED, LOCAL_CHECKPOINTS
    
    # Report free space: second line of `df -h /content` is the data row.
    import subprocess
    result = subprocess.run(['df', '-h', '/content'], capture_output=True, text=True)
    print(f"\nLocal SSD status:")
    print(result.stdout.split('\n')[1])
    
    print(f"\nLocal SSD paths configured:")
    for label, location in (
        ("Raw data", LOCAL_RAW),
        ("Processed", LOCAL_PROCESSED),
        ("Packed", LOCAL_PACKED),
        ("Checkpoints", LOCAL_CHECKPOINTS),
    ):
        # Labels are padded so the paths line up in one column.
        print(f"  {label + ':':<12} {location}")
    print("=" * 50)

### Alternative: Install from pip (if repo is published)

If the package is published to PyPI or you prefer pip installation:

```python
# From PyPI (when published)
!pip install llm-training-pipeline

# From GitHub directly
!pip install git+https://github.com/rmarnold/llm-training-pipeline.git

# With FP8 support (H100 only)
!pip install "llm-training-pipeline[fp8]"
```

In [None]:
#@title ### 0.4 Check GPU Availability
#@markdown Verify GPU is available and check for FP8 support.

import torch

print("=" * 50)
print("GPU INFORMATION")
print("=" * 50)

if not torch.cuda.is_available():
    print("WARNING: No GPU detected!")
    print("Training will be extremely slow on CPU.")
    RECOMMENDED_PRECISION = "fp32"
else:
    gpu_name = torch.cuda.get_device_name(0)
    gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)  # bytes -> GiB
    print(f"GPU: {gpu_name}")
    print(f"Memory: {gpu_memory:.1f} GB")
    
    # The major compute capability decides which precision is recommended.
    capability = torch.cuda.get_device_capability()
    major, minor = capability[0], capability[1]
    print(f"Compute Capability: {major}.{minor}")
    
    if major < 8:
        fp8_note, RECOMMENDED_PRECISION = "Not available", "fp16"
    elif major < 9:
        fp8_note, RECOMMENDED_PRECISION = "Not available (use BF16)", "bf16"
    else:
        fp8_note, RECOMMENDED_PRECISION = "AVAILABLE (H100)", "fp8"
    print(f"FP8 Support: {fp8_note}")
    
    # Memory recommendation, smallest tier first
    if gpu_memory < 40:
        print(f"\nWarning: GPU memory may be insufficient for 7B model")
        print("Consider using LoRA or a smaller model")
    elif gpu_memory < 80:
        print(f"\nRecommendation: Use gradient checkpointing, smaller batch size")
    else:
        print(f"\nRecommendation: Full 7B training supported")

print("=" * 50)

In [None]:
#@title ### 0.5 Start GPU Keepalive (Prevents Idle Timeout) [DISABLED]
#@markdown **Note:** GPU keepalive is currently disabled. Uncomment the code below to enable if needed.
#@markdown 
#@markdown This starts a background process that periodically pings the GPU to prevent Colab from timing out during long CPU-bound operations (like data cleaning).

# GPU Keepalive is DISABLED - uncomment below to enable
# import subprocess
# import os
# 
# if IN_COLAB:
#     # Kill any existing keepalive process
#     !pkill -f gpu_keepalive.py 2>/dev/null || true
#     
#     # Start GPU keepalive in background
#     keepalive_script = os.path.join(PROJECT_ROOT, 'scripts', 'gpu_keepalive.py')
#     
#     if os.path.exists(keepalive_script):
#         process = subprocess.Popen(
#             ['python', keepalive_script],
#             stdout=subprocess.DEVNULL,
#             stderr=subprocess.DEVNULL,
#             start_new_session=True
#         )
#         print(f"GPU Keepalive started (PID: {process.pid})")
#         print("  - Checks GPU every 60 seconds")
#         print("  - Sends keepalive spike if idle for 5+ minutes")
#         print("  - Prevents Colab idle timeout during CPU-bound tasks")
#         print(f"\nTo stop: !pkill -f gpu_keepalive.py")
#     else:
#         print("Warning: gpu_keepalive.py not found. Run 'git pull' to get latest code.")
# else:
#     print("GPU keepalive not needed outside of Colab")

# While the block above stays commented out, the cell only reports its status.
for status_line in (
    "GPU Keepalive: DISABLED",
    "To enable, uncomment the code in this cell.",
):
    print(status_line)

In [None]:
#@title ### 0.5 Training Configuration {run: "auto"}
#@markdown Adjust these settings based on your GPU and requirements.

#@markdown ---
#@markdown **Model Size:**
model_size = "7b"  #@param ["125m", "350m", "1b", "3b", "7b"]
#@markdown *Available sizes: 125M (~124M params), 350M, 1B, 3B, 7B*
context_length = 4096  #@param {type:"integer"}
#@markdown *Max context length (tokens). Larger = more memory.*

#@markdown ---
#@markdown **General Settings:**
use_fp8 = "auto"  #@param ["auto", "true", "false"]
seed = 42  #@param {type:"integer"}
enable_oom_recovery = True  #@param {type:"boolean"}

#@markdown ---
#@markdown **Kernel Optimizations (Recommended):**
use_liger_kernel = True  #@param {type:"boolean"}
#@markdown *Liger Kernel: ~20% speedup, ~60% memory reduction*
use_cce = True  #@param {type:"boolean"}
#@markdown *Cut Cross-Entropy: ~95% memory reduction on loss computation*

#@markdown ---
#@markdown **Data Cleaning:**
pipeline_mode = "native"  #@param ["native", "legacy"]
#@markdown *Pipeline modes:*
#@markdown - **native**: Uses datatrove's optimized pipeline (3-5x faster, recommended)
#@markdown - **legacy**: Uses multiprocessing Pool (fallback if native fails)

use_full_clean = False  #@param {type:"boolean"}
#@markdown *Full clean uses plsfix (Rust) for Unicode/mojibake fixing. Slower but more thorough.*

quality_filter_mode = "default"  #@param ["default", "fast", "no-repetition", "no-fineweb"]
#@markdown *Quality filter modes:*
#@markdown - **default**: All filters (GopherQuality + FineWeb + GopherRepetition)
#@markdown - **fast**: Only GopherQuality (~3x faster, basic filtering)
#@markdown - **no-repetition**: Skip n-gram analysis (~2x faster, may miss spam)
#@markdown - **no-fineweb**: Skip line structure checks (~15% faster)

#@markdown ---
#@markdown **Pretraining:**
pretrain_max_steps = 100000  #@param {type:"integer"}
pretrain_save_steps = 1000  #@param {type:"integer"}
pretrain_eval_steps = 1000  #@param {type:"integer"}

#@markdown ---
#@markdown **SFT:**
sft_max_steps = 5000  #@param {type:"integer"}
sft_save_steps = 500  #@param {type:"integer"}

#@markdown ---
#@markdown **DPO:**
dpo_max_steps = 2000  #@param {type:"integer"}
dpo_save_steps = 200  #@param {type:"integer"}

# Per-size batch recommendations (A100 80GB): (size, batch_size, grad_accum)
BATCH_SIZE_MAP = {
    size: {"batch_size": per_device, "grad_accum": accum}
    for size, per_device, accum in (
        ("125m", 32, 1),
        ("350m", 16, 2),
        ("1b", 16, 2),
        ("3b", 8, 4),
        ("7b", 8, 4),
    )
}

# Sizes missing from the table get the same settings as the largest model.
batch_rec = BATCH_SIZE_MAP.get(model_size, {"batch_size": 8, "grad_accum": 4})

# Everything later cells need, gathered in one dict.
CONFIG = dict(
    model_size=model_size,
    context_length=context_length,
    # "auto" -> None (decide later), "true" -> True, anything else -> False
    use_fp8={"auto": None, "true": True}.get(use_fp8, False),
    seed=seed,
    enable_oom_recovery=enable_oom_recovery,
    use_liger_kernel=use_liger_kernel,
    use_cce=use_cce,
    pipeline_mode=pipeline_mode,
    use_full_clean=use_full_clean,
    quality_filter_mode=quality_filter_mode,
    pretrain_max_steps=pretrain_max_steps,
    pretrain_save_steps=pretrain_save_steps,
    pretrain_eval_steps=pretrain_eval_steps,
    sft_max_steps=sft_max_steps,
    sft_save_steps=sft_save_steps,
    dpo_max_steps=dpo_max_steps,
    dpo_save_steps=dpo_save_steps,
    batch_size=batch_rec['batch_size'],
    grad_accum=batch_rec['grad_accum'],
)

# One-line description per model size, shown in the summary below
SIZE_INFO = {
    "125m": "~124M params - Fast training, good for testing",
    "350m": "~350M params - Small but capable",
    "1b": "~1B params - Good balance of speed and capability",
    "3b": "~3B params - Strong performance, moderate memory",
    "7b": "~7B params - Full capability, requires A100 80GB",
}

print("=" * 60)
print("TRAINING CONFIGURATION")
print("=" * 60)
print(f"\nModel: {model_size.upper()} - {SIZE_INFO[model_size]}")
print(f"Context length: {context_length} tokens")
print(f"\nRecommended batch settings:")
print(f"  batch_size: {batch_rec['batch_size']}")
print(f"  gradient_accumulation: {batch_rec['grad_accum']}")
print(f"  effective_batch: {batch_rec['batch_size'] * batch_rec['grad_accum']}")

# List only the optimizations that are switched on; skip the heading if none are.
kernel_notes = []
if use_liger_kernel:
    kernel_notes.append("  - Liger Kernel: Fused Triton kernels (~20% speedup)")
if use_cce:
    kernel_notes.append("  - Cut Cross-Entropy: Memory-efficient loss (~95% reduction)")
if kernel_notes:
    print("\nKernel Optimizations:")
    for note in kernel_notes:
        print(note)

print(f"\nPipeline Mode: {'NATIVE DATATROVE' if pipeline_mode == 'native' else 'LEGACY'}")
print("=" * 60)

---
## Pre-flight Validation

Before starting training, validate that all prerequisites are in place.

In [None]:
# Run pre-flight checks for a specific stage
# Options: 'pretrain', 'sft', 'dpo', 'lora'

STAGE_TO_CHECK = 'pretrain'  # Change this to check different stages

# The script path is relative, so the working directory must be the project
# root (cell 0.2 ends with os.chdir(PROJECT_ROOT)). IPython substitutes
# {STAGE_TO_CHECK} with the Python value before running the shell command.
!python scripts/preflight_check.py {STAGE_TO_CHECK}

In [None]:
# Run all pre-flight checks
# Same script as the single-stage cell, invoked with --all instead of a stage
# name; the relative path again requires the project root as working directory.
!python scripts/preflight_check.py --all

---
## Stage 1: Data Preparation

Download, clean, and prepare training data. Skip this section if data is already prepared.

In [None]:
#@title ### Step 1.1: Download raw data
#@markdown Downloads pretraining data from configured sources (HuggingFace).
#@markdown Data is saved to local SSD for speed, then backed up to Drive.

import os

# Determine output directory
if 'USE_LOCAL_SSD' in dir() and USE_LOCAL_SSD and IN_COLAB:
    download_dir = LOCAL_RAW
    backup_dir = DRIVE_RAW
    print(f"Downloading to local SSD: {download_dir}")
    print(f"Backup will be saved to: {backup_dir}")
else:
    download_dir = "data/raw"
    backup_dir = None
    print(f"Downloading to: {download_dir}")

print("=" * 50)

# Download
!python scripts/01_download_data.py --output-dir {download_dir}

# Backup to Drive
if backup_dir:
    print(f"\nBacking up to Google Drive...")
    os.makedirs(backup_dir, exist_ok=True)
    !cp -r {download_dir}/*.parquet {backup_dir}/ 2>/dev/null || true
    print("Backup complete!")

# Check what was downloaded
if os.path.exists(download_dir):
    files = [f for f in os.listdir(download_dir) if f.endswith('.parquet')]
    print(f"\nDownloaded {len(files)} files:")
    for f in files:
        size_mb = os.path.getsize(f"{download_dir}/{f}") / (1024*1024)
        print(f"  - {f} ({size_mb:.1f} MB)")
else:
    print("\nNo data downloaded. Check the script output above for errors.")

In [None]:
#@title ### Step 1.2: Clean and deduplicate data
#@markdown Removes duplicates, filters low-quality content.
#@markdown Uses local SSD for faster I/O, syncs to Google Drive after completion.

#@markdown ---
#@markdown **Auto-sync to Drive (for recovery):**
AUTO_SYNC_TO_DRIVE = True  #@param {type:"boolean"}
#@markdown *Automatically sync after cleaning for recovery if interrupted*

import os

# Build cleaning command
clean_cmd = "python scripts/02_gpu_clean_deduplicate.py"

# Use local SSD paths if configured
if 'USE_LOCAL_SSD' in dir() and USE_LOCAL_SSD and IN_COLAB:
    input_dir = LOCAL_RAW
    output_dir = LOCAL_PROCESSED
    cache_dir = LOCAL_CACHE
    backup_dir = DRIVE_PROCESSED
    print(f"Using LOCAL SSD for faster I/O (5-10x speedup)")
    print(f"  Input:  {input_dir}")
    print(f"  Output: {output_dir}")
    print(f"  Cache:  {cache_dir}")
    clean_cmd += f" --input {input_dir} --output {output_dir} --cache {cache_dir}"
else:
    input_dir = CLEANING_INPUT_DIR
    output_dir = CLEANING_OUTPUT_DIR
    backup_dir = None
    print(f"Using default paths")
    print(f"  Input:  {input_dir}")
    print(f"  Output: {output_dir}")
    if input_dir != "data/raw":
        clean_cmd += f" --input {input_dir} --output {output_dir}"

# Pipeline mode (native datatrove vs legacy multiprocessing)
pipeline_mode = CONFIG.get('pipeline_mode', 'native')
if pipeline_mode == 'native':
    clean_cmd += " --native-pipeline"

# Quality filter mode
filter_mode = CONFIG.get('quality_filter_mode', 'default')
if filter_mode == 'fast':
    clean_cmd += " --fast-quality --no-toxicity"
elif filter_mode == 'no-repetition':
    clean_cmd += " --no-repetition-filter"
elif filter_mode == 'no-fineweb':
    clean_cmd += " --no-fineweb-filter"

print(f"\nPipeline mode: {'GPU (RAPIDS + NeMo)' if 'gpu' in clean_cmd else 'NATIVE DATATROVE' if pipeline_mode == 'native' else 'LEGACY'}")
print(f"Quality filters: {filter_mode}")
print(f"\nCommand: {clean_cmd}")
print("=" * 50)

!{clean_cmd}

# Backup to Drive after cleaning
if backup_dir and AUTO_SYNC_TO_DRIVE:
    print(f"\nBacking up processed data to Google Drive...")
    os.makedirs(backup_dir, exist_ok=True)
    !cp -r {output_dir}/* {backup_dir}/ 2>/dev/null || true
    print("Backup complete!")

In [None]:
#@title ### Step 1.2.1: Verify/Manual Sync to Google Drive [Optional]
#@markdown **Verify sync status or force manual sync if auto-sync was disabled.**
#@markdown
#@markdown With `AUTO_SYNC_TO_DRIVE=True` in Step 1.2, this cell is usually not needed.
#@markdown Use this for:
#@markdown - Verifying data is synced to Drive
#@markdown - Manual sync if auto-sync was disabled
#@markdown - Re-syncing after resuming from an interrupted session

#@markdown ---
FORCE_SYNC = False  #@param {type:"boolean"}
#@markdown *Copy the parquet files that are missing on Drive (files already on Drive are not re-copied); when off, the cell only reports*
SYNC_THREADS = 10  #@param {type:"integer"}
#@markdown *Threads for parallel sync (10-20 for many files, 2-4 for large files)*

import os
import json
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# Try to import pyfastcopy for 30-50% faster per-file copy
# FAST_COPY_AVAILABLE only selects the status message in sync_files_parallel.
try:
    import pyfastcopy  # Patches shutil.copy2 to use sendfile syscall
    FAST_COPY_AVAILABLE = True
except ImportError:
    FAST_COPY_AVAILABLE = False

def check_stage_status(output_dir: str) -> dict:
    """Read the cleaning stage state stored in ``output_dir``.

    The state lives in ``<output_dir>/.stage_state.json``. The result always
    contains the keys ``completed_stages`` and ``last_stage``, so callers can
    index them without checking.

    Args:
        output_dir: Directory in which to look for the state file.

    Returns:
        The parsed state merged over the defaults
        ``{'completed_stages': [], 'last_stage': None}``. The defaults alone
        are returned when the file is missing, unreadable, not valid JSON
        (e.g. a partially written file), or not a JSON object.
    """
    status = {'completed_stages': [], 'last_stage': None}
    state_file = os.path.join(output_dir, ".stage_state.json")
    if not os.path.exists(state_file):
        return status

    try:
        with open(state_file) as f:
            loaded = json.load(f)
    except (OSError, ValueError) as exc:  # json.JSONDecodeError is a ValueError
        print(f"  Warning: could not read {state_file}: {exc}")
        return status

    if not isinstance(loaded, dict):
        print(f"  Warning: unexpected content in {state_file}; ignoring it")
        return status

    status.update(loaded)
    # A null value would break the `stage in completed_stages` checks.
    if status['completed_stages'] is None:
        status['completed_stages'] = []
    return status

def sync_files_parallel(src_dir: str, dst_dir: str, files: list, max_workers: int = 10):
    """Copy the named files from ``src_dir`` to ``dst_dir`` using a thread pool.

    Args:
        src_dir: Directory containing the files.
        dst_dir: Directory to copy into (must already exist).
        files: File names, relative to ``src_dir``.
        max_workers: Number of copy threads.

    Returns:
        The number of files copied; 0 when ``files`` is empty.
    """
    if not files:
        return 0

    # One job per file: (source path, destination path, name, size in bytes)
    jobs = [
        (
            os.path.join(src_dir, name),
            os.path.join(dst_dir, name),
            name,
            os.path.getsize(os.path.join(src_dir, name)),
        )
        for name in files
    ]
    total_bytes = sum(job[3] for job in jobs)

    method = f"pyfastcopy + {max_workers} threads" if FAST_COPY_AVAILABLE else f"{max_workers} threads"
    print(f"  Using: {method}")

    def _copy_one(job):
        """Copy a single job and hand back its name and size."""
        source, target, name, size = job
        shutil.copy2(source, target)  # pyfastcopy patches this if available
        return name, size

    synced = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(_copy_one, job) for job in jobs]
        # Progress is counted in bytes and advances as each file finishes.
        with tqdm(total=total_bytes, unit='B', unit_scale=True, desc="  Syncing") as pbar:
            for finished in as_completed(pending):
                _, size = finished.result()
                synced += 1
                pbar.update(size)

    return synced

# Check if using local SSD. Cell 0.3.1 keeps its local data under
# /content/local_data, so that is the prefix to test for.
try:
    using_local_ssd = CLEANING_OUTPUT_DIR.startswith("/content/local_data")
except NameError:
    using_local_ssd = False

# Cleaning stages in the order they are reported
_STAGE_ORDER = ['text_clean', 'quality_filter', 'toxicity_filter', 'dedup', 'final']


def _print_stage_status(status: dict, empty_message: str) -> None:
    """Print COMPLETE/pending for every known stage, or ``empty_message`` if none completed."""
    completed = status['completed_stages']
    if not completed:
        print(empty_message)
        return
    for stage_name in _STAGE_ORDER:
        print(f"  {stage_name}: {'COMPLETE' if stage_name in completed else 'pending'}")


print("=" * 50)
print("CLEANING STAGE STATUS")
print("=" * 50)

# Check local status
local_output = CLEANING_OUTPUT_DIR if 'CLEANING_OUTPUT_DIR' in dir() else "data/processed"
local_status = check_stage_status(local_output)

print(f"\nLocal ({local_output}):")
_print_stage_status(local_status, "  No stage state found (cleaning may not have started)")

# Check Drive status
if IN_COLAB and DRIVE_BASE:
    drive_processed = os.path.join(DRIVE_BASE, "data/processed")
    drive_status = check_stage_status(drive_processed)

    print(f"\nGoogle Drive ({drive_processed}):")
    _print_stage_status(drive_status, "  No stage state found")

    # Compare the parquet file names present on each side (names only, not contents)
    local_files = set(f for f in os.listdir(local_output) if f.endswith('.parquet')) if os.path.exists(local_output) else set()
    drive_files = set(f for f in os.listdir(drive_processed) if f.endswith('.parquet')) if os.path.exists(drive_processed) else set()

    print(f"\nFile comparison:")
    print(f"  Local files:  {len(local_files)}")
    print(f"  Drive files:  {len(drive_files)}")

    if local_files == drive_files:
        print("  Status: IN SYNC")
    else:
        missing_on_drive = local_files - drive_files
        extra_on_drive = drive_files - local_files
        if extra_on_drive:
            # Report this case too, so a mismatch is never silent.
            print(f"  Only on Drive: {len(extra_on_drive)} files")
        if missing_on_drive:
            print(f"  Missing on Drive: {len(missing_on_drive)} files")
            if FORCE_SYNC:
                print("\nSyncing missing files to Google Drive...")
                os.makedirs(drive_processed, exist_ok=True)
                synced = sync_files_parallel(
                    local_output, drive_processed,
                    list(missing_on_drive),
                    max_workers=SYNC_THREADS
                )
                print(f"  Synced {synced} files to Drive!")
            else:
                print("  Set FORCE_SYNC=True to sync missing files")

print("=" * 50)

In [None]:
#@title ### Step 1.3: Tokenize and pack sequences
#@markdown Creates packed sequences for efficient training.
#@markdown Uses local SSD for faster I/O, backs up to Drive when done.

import os

# Determine paths
if 'USE_LOCAL_SSD' in dir() and USE_LOCAL_SSD and IN_COLAB:
    input_dir = LOCAL_PROCESSED
    output_dir = LOCAL_PACKED
    backup_dir = DRIVE_PACKED
    print(f"Using LOCAL SSD for faster I/O")
    print(f"  Input:  {input_dir}")
    print(f"  Output: {output_dir}")
else:
    input_dir = "data/processed"
    output_dir = "data/packed"
    backup_dir = None
    print(f"Using default paths")

print("=" * 50)

!python scripts/03_tokenize_and_pack.py --input-dir {input_dir} --output-dir {output_dir}

# Backup to Drive
if backup_dir:
    print(f"\nBacking up packed data to Google Drive...")
    os.makedirs(backup_dir, exist_ok=True)
    !cp -r {output_dir}/* {backup_dir}/ 2>/dev/null || true
    print("Backup complete!")

In [None]:
# Step 1.4: Initialize model with selected size
# Creates the initial model checkpoint based on CONFIG['model_size']

model_size = CONFIG.get('model_size', '7b')
context_length = CONFIG.get('context_length', 4096)

print(f"Initializing {model_size.upper()} model with {context_length} context length...")

init_cmd = f"python scripts/04_init_model.py --size {model_size} --context-length {context_length}"
print(f"Command: {init_cmd}\n")

!{init_cmd}

In [None]:
# Verify data preparation
import os

# (label, path relative to the working directory) for every required artifact
paths_to_check = [
    ('Tokenizer', 'configs/tokenizer'),
    ('Initial model', 'checkpoints/init'),
    ('Training data', 'data/packed/train'),
    ('Validation data', 'data/packed/val'),
]

print("Data preparation status:")
print("=" * 50)

# Report each artifact and remember whether any of them is missing.
all_ready = True
for label, location in paths_to_check:
    found = os.path.exists(location)
    print(f"  {label}: {'OK' if found else 'MISSING'}")
    if not found:
        all_ready = False

print("=" * 50)
print(
    "All data preparation complete! Ready for pretraining."
    if all_ready
    else "Some data is missing. Run the preparation steps above."
)

---
## Stage 2: Pretraining

Train the base model on large text corpora. This is the longest stage.

**Estimated time:** 25-50 hours depending on GPU (H100 FP8 fastest)

In [None]:
# Build pretraining command with model-size-appropriate batch settings
# NOTE(review): batch size and gradient accumulation are only printed in the
# summary below; no flag for them is appended to the command, so the script
# presumably takes them from its own configuration — confirm.
pretrain_cmd = "python scripts/05_pretrain.py"

# use_fp8 is tri-state: True/False force the matching flag; any other value
# (e.g. None) appends nothing and leaves the choice to the script.
if CONFIG['use_fp8'] is True:
    pretrain_cmd += " --fp8"
elif CONFIG['use_fp8'] is False:
    pretrain_cmd += " --no-fp8"

pretrain_cmd += f" --max_steps {CONFIG['pretrain_max_steps']}"
pretrain_cmd += f" --save_steps {CONFIG['pretrain_save_steps']}"
pretrain_cmd += f" --eval_steps {CONFIG['pretrain_eval_steps']}"
pretrain_cmd += f" --seed {CONFIG['seed']}"

if CONFIG['enable_oom_recovery']:
    pretrain_cmd += " --enable-oom-recovery"

# Kernel optimizations (enabled by default).
# Each switch is resolved once, with the same default, so the flags placed on
# the command line and the summary message below can never disagree.
use_liger_kernel = CONFIG.get('use_liger_kernel', True)
use_cce = CONFIG.get('use_cce', True)

pretrain_cmd += " --use-liger-kernel" if use_liger_kernel else " --no-liger-kernel"
pretrain_cmd += " --use-cce" if use_cce else " --no-cce"

print("=" * 60)
print("PRETRAINING CONFIGURATION")
print("=" * 60)
print(f"Model size: {CONFIG.get('model_size', '7b').upper()}")
print(f"Batch size: {CONFIG['batch_size']} (recommended for this model size)")
print(f"Gradient accumulation: {CONFIG['grad_accum']}")
print(f"Effective batch: {CONFIG['batch_size'] * CONFIG['grad_accum']}")
print(f"Max steps: {CONFIG['pretrain_max_steps']}")
print(f"\nCommand: {pretrain_cmd}")

if use_liger_kernel or use_cce:
    print("\nKernel optimizations enabled for faster training!")
print("=" * 60)

In [None]:
#@title ### Start pretraining
#@markdown Runs pretraining with local SSD for faster data loading.
#@markdown Checkpoints are saved locally and backed up to Drive after training.

import os

# Build command with local SSD paths if configured
pretrain_cmd_local = pretrain_cmd

if 'USE_LOCAL_SSD' in dir() and USE_LOCAL_SSD and IN_COLAB:
    data_path = LOCAL_PACKED
    checkpoint_path = f"{LOCAL_CHECKPOINTS}/pretrain"
    backup_checkpoint_dir = f"{DRIVE_CHECKPOINTS}/pretrain"
    
    pretrain_cmd_local += f" --train_data_path {data_path}"
    pretrain_cmd_local += f" --output_dir {checkpoint_path}"
    
    print(f"Using LOCAL SSD for faster I/O:")
    print(f"  Data: {data_path}")
    print(f"  Checkpoints: {checkpoint_path}")
else:
    backup_checkpoint_dir = None
    print("Using default paths")

print(f"\nCommand: {pretrain_cmd_local}")
print("=" * 50)

# Run training
!{pretrain_cmd_local}

# Backup checkpoints to Drive after training
if backup_checkpoint_dir:
    print(f"\nBacking up checkpoints to Google Drive...")
    os.makedirs(backup_checkpoint_dir, exist_ok=True)
    !cp -r {checkpoint_path}/* {backup_checkpoint_dir}/ 2>/dev/null || true
    # Also backup final checkpoint
    if os.path.exists(f"{LOCAL_CHECKPOINTS}/pretrain_final"):
        !cp -r {LOCAL_CHECKPOINTS}/pretrain_final {DRIVE_CHECKPOINTS}/ 2>/dev/null || true
    print("Checkpoint backup complete!")

In [None]:
# Resume pretraining from checkpoint (if interrupted)
# Uncomment and modify the checkpoint path as needed
#
# NOTE(review): the command below passes only --resume_from_checkpoint. None of
# the flags assembled into pretrain_cmd (--max_steps, --save_steps, --seed,
# FP8/kernel switches) and neither of the local-SSD options
# (--train_data_path, --output_dir) are included, and the example path points
# at the default checkpoints/ directory rather than LOCAL_CHECKPOINTS —
# confirm these match the interrupted run before resuming.

# CHECKPOINT_PATH = "checkpoints/pretrain/checkpoint-5000"
# !python scripts/05_pretrain.py --resume_from_checkpoint {CHECKPOINT_PATH}

---
## Stage 3: Supervised Fine-Tuning (SFT)

Fine-tune on instruction-response pairs to create a helpful assistant.

**Estimated time:** 2-5 hours

In [None]:
# Prepare SFT data (if not already done)
# The script is invoked without arguments, so it runs with its own defaults.
# NOTE(review): unlike the tokenization step, no local-SSD paths are passed
# here — confirm the script's default locations are the intended ones.
!python scripts/06_prepare_sft_data.py

In [None]:
# Verify pretrained checkpoint exists
import os

# Report-only check: a missing checkpoint prints an error but raises nothing.
if not os.path.exists('checkpoints/pretrain_final'):
    print("ERROR: Pretrained checkpoint not found!\nComplete pretraining before starting SFT.")
else:
    print("Pretrained checkpoint found. Ready for SFT.")

In [None]:
# Build SFT command
# Collect the pieces in a list and join them once at the end.
sft_args = ["python scripts/07_sft.py"]

# use_fp8 is tri-state: only an explicit True/False adds a flag.
fp8_setting = CONFIG['use_fp8']
if fp8_setting is True:
    sft_args.append("--fp8")
elif fp8_setting is False:
    sft_args.append("--no-fp8")

sft_args += [
    f"--max_steps {CONFIG['sft_max_steps']}",
    f"--save_steps {CONFIG['sft_save_steps']}",
    f"--seed {CONFIG['seed']}",
]

if CONFIG['enable_oom_recovery']:
    sft_args.append("--enable-oom-recovery")

sft_cmd = " ".join(sft_args)

print("SFT command:", sft_cmd, sep="\n")

In [None]:
# Start SFT training
# Runs the command string held in `sft_cmd` as a shell command; the cell that
# builds `sft_cmd` must have been executed in this kernel first.
!{sft_cmd}

---
## Stage 4: Direct Preference Optimization (DPO)

Align the model with human preferences using chosen/rejected response pairs.

**Estimated time:** 1-3 hours

In [None]:
# Prepare DPO data
# The script is invoked without arguments, so it runs with its own defaults.
!python scripts/08_prepare_dpo_data.py

In [None]:
# Verify SFT checkpoint exists
import os

# Report-only check: a missing checkpoint prints an error but raises nothing.
if not os.path.exists('checkpoints/sft_final'):
    print("ERROR: SFT checkpoint not found!\nComplete SFT before starting DPO.")
else:
    print("SFT checkpoint found. Ready for DPO.")

In [None]:
# Build DPO command
# Collect the pieces in a list and join them once at the end.
dpo_args = ["python scripts/09_dpo.py"]

# use_fp8 is tri-state: only an explicit True/False adds a flag.
fp8_setting = CONFIG['use_fp8']
if fp8_setting is True:
    dpo_args.append("--fp8")
elif fp8_setting is False:
    dpo_args.append("--no-fp8")

dpo_args += [
    f"--max_steps {CONFIG['dpo_max_steps']}",
    f"--save_steps {CONFIG['dpo_save_steps']}",
    f"--seed {CONFIG['seed']}",
]

if CONFIG['enable_oom_recovery']:
    dpo_args.append("--enable-oom-recovery")

dpo_cmd = " ".join(dpo_args)

print("DPO command:", dpo_cmd, sep="\n")

In [None]:
# Start DPO training
# Runs the command string held in `dpo_cmd` as a shell command; the cell that
# builds `dpo_cmd` must have been executed in this kernel first.
!{dpo_cmd}

---
## Stage 5: LoRA Fine-Tuning (Optional)

Domain-specific adaptation using LoRA for efficient fine-tuning.

In [None]:
# LoRA fine-tuning (optional)
# Uncomment to run LoRA training

# !python scripts/10_lora_finetune.py

---
## Evaluation

Evaluate the trained model on various benchmarks.

In [None]:
# Run full evaluation suite
# CHECKPOINT_TO_EVAL is interpolated into the shell command below and passed
# to the evaluation script as its single positional argument.
CHECKPOINT_TO_EVAL = "checkpoints/dpo_final"  # Change as needed

!python scripts/11_evaluate.py {CHECKPOINT_TO_EVAL}

In [None]:
# Check promotion gates
# Verify model meets quality thresholds
# STAGE_TO_CHECK is interpolated into the shell command below and passed to
# the gate script as its single positional argument.

STAGE_TO_CHECK = "dpo"  # Options: pretrain, sft, dpo

!python scripts/12_check_gates.py {STAGE_TO_CHECK}

---
## Monitoring & Utilities

In [None]:
# Monitor GPU utilization
# Prints a single snapshot from nvidia-smi; re-run the cell to refresh it.
!nvidia-smi

In [None]:
# List all checkpoints
# Delegates to the repository's checkpoint_manager.sh with the `list` subcommand.
!bash scripts/checkpoint_manager.sh list

In [None]:
# Show disk usage
# Delegates to the repository's checkpoint_manager.sh with the `disk-usage` subcommand.
!bash scripts/checkpoint_manager.sh disk-usage

In [None]:
# Cleanup old checkpoints (keep latest 3)
# Uncomment to run

# !bash scripts/checkpoint_manager.sh cleanup pretrain 3

---
## Model Inference

Test the trained model with interactive generation.

In [None]:
# Load the trained model for inference
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

MODEL_PATH = "checkpoints/dpo_final"  # Change to your checkpoint

print(f"Loading model from {MODEL_PATH}...")

# The tokenizer is read from the shared configs/tokenizer directory rather
# than from MODEL_PATH.
tokenizer = AutoTokenizer.from_pretrained("configs/tokenizer")

# Weights are loaded as bfloat16; device_map="auto" leaves device placement
# to the loader.
load_options = dict(torch_dtype=torch.bfloat16, device_map="auto")
model = AutoModelForCausalLM.from_pretrained(MODEL_PATH, **load_options)

print("Model loaded!")

In [None]:
# Generate text
def generate(prompt, max_new_tokens=256, temperature=0.7, include_prompt=True):
    """Generate text for ``prompt`` with the notebook-level ``model`` and ``tokenizer``.

    Args:
        prompt: Text the model is conditioned on.
        max_new_tokens: Upper bound on the number of newly generated tokens.
        temperature: Sampling temperature. A positive value enables nucleus
            sampling (top_p=0.9). ``None`` or a value <= 0 switches to greedy
            decoding, because sampling requires a strictly positive temperature.
        include_prompt: When True (the default, matching the original
            behaviour) the whole returned sequence is decoded, so the result
            starts with the prompt. When False only the tokens that follow the
            prompt are decoded.

    Returns:
        The decoded text with special tokens removed.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    generation_kwargs = {
        "max_new_tokens": max_new_tokens,
        # Reuse EOS as the padding id, as the original call did.
        "pad_token_id": tokenizer.eos_token_id,
    }
    if temperature is not None and temperature > 0:
        generation_kwargs.update(do_sample=True, temperature=temperature, top_p=0.9)
    else:
        # Greedy decoding: sampling parameters are deliberately omitted.
        generation_kwargs["do_sample"] = False

    with torch.no_grad():
        outputs = model.generate(**inputs, **generation_kwargs)

    sequence = outputs[0]
    if not include_prompt:
        # The returned sequence begins with the prompt tokens; drop them.
        sequence = sequence[inputs["input_ids"].shape[-1]:]

    return tokenizer.decode(sequence, skip_special_tokens=True)

# Test generation
# NOTE(review): generate() decodes the entire returned sequence; for a causal
# LM that presumably includes the prompt tokens, so the printed response is
# expected to begin with the prompt text — confirm.
prompt = "Explain machine learning in simple terms:"
print(f"Prompt: {prompt}\n")
response = generate(prompt)
print(f"Response: {response}")

In [None]:
# Interactive generation cell
# Edit PROMPT and re-run the cell to try other inputs.

PROMPT = "Write a Python function to calculate fibonacci numbers:"

# Header is printed before generation starts so it appears immediately.
print(f"Prompt: {PROMPT}\n", "=" * 50, sep="\n")
completion = generate(PROMPT, max_new_tokens=512)
print(completion)

---
## Training Summary

After completing all stages, review the training summary.

In [None]:
# Generate training report
import os
import json


def checkpoint_size_bytes(path):
    """Return the combined size in bytes of every regular file under ``path``.

    The directory is walked recursively so files in nested sub-directories are
    counted too. A path that is not a directory yields 0.
    """
    total = 0
    for root, _dirs, files in os.walk(path):
        for filename in files:
            file_path = os.path.join(root, filename)
            # isfile() is False for broken symlinks, which getsize() would reject.
            if os.path.isfile(file_path):
                total += os.path.getsize(file_path)
    return total


print("=" * 60)
print("TRAINING PIPELINE SUMMARY")
print("=" * 60)

# (stage label, final checkpoint directory) in pipeline order.
stages = [
    ('Pretrain', 'checkpoints/pretrain_final'),
    ('SFT', 'checkpoints/sft_final'),
    ('DPO', 'checkpoints/dpo_final'),
    ('LoRA', 'checkpoints/lora_final'),
]

print("\nCheckpoint Status:")
for name, path in stages:
    if os.path.exists(path):
        size_gb = checkpoint_size_bytes(path) / (1024**3)
        print(f"  {name}: COMPLETE ({size_gb:.2f} GB)")
    else:
        print(f"  {name}: Not completed")

print("\nEvaluation Results:")
eval_path = "evals/"
# Sorted so the report order is stable between runs.
result_files = (
    sorted(f for f in os.listdir(eval_path) if f.endswith('.json'))
    if os.path.isdir(eval_path)
    else []
)
if not result_files:
    print("  No evaluation results found. Run evaluation first.")
for f in result_files:
    try:
        with open(os.path.join(eval_path, f), encoding="utf-8") as file:
            results = json.load(file)
    except (OSError, ValueError) as err:
        # One unreadable or malformed file must not abort the whole report.
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"  {f}: could not be read ({err})")
        continue
    print(f"  {f}: {results}")

print("\n" + "=" * 60)