<a href="https://colab.research.google.com/github/saravananpsg/nlp/blob/master/ray_distributed_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Collecting ray
  Downloading ray-2.43.0-cp311-cp311-manylinux2014_x86_64.whl.metadata (19 kB)
Downloading ray-2.43.0-cp311-cp311-manylinux2014_x86_64.whl (67.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.7/67.7 MB[0m [31m33.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ray
Successfully installed ray-2.43.0


In [9]:
import wandb
wandb.Api()

[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter:

 ··········


[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33msarapsg[0m to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


<wandb.apis.public.api.Api at 0x7cc239c705d0>

In [28]:
import torch
import subprocess
import socket
import time
import ray
import wandb
from datasets import load_dataset, DatasetDict
from transformers import (AutoModelForCausalLM, AutoTokenizer, pipeline, TrainingArguments,
                          DataCollatorForLanguageModeling)
from peft import LoraConfig, get_peft_model, PeftModel
from trl import SFTTrainer

# Model and Paths
MODEL_NAME = "meta-llama/Llama-2-7b-hf"  # Replace with your model
LORA_SAVE_PATH = "./lora_adapter"
MERGED_MODEL_PATH = "./merged_llm"

# ==============================
# 1️⃣ Start Ray Cluster
# ==============================

def get_local_ip():
    """Get the local IP address of the machine."""
    return socket.gethostbyname(socket.gethostname())

def start_ray_cluster():
    """Start Ray head node or connect worker nodes dynamically."""
    local_ip = get_local_ip()
    try:
        # Try starting Ray as head node
        print("Starting Ray Head Node...")
        subprocess.run(["ray", "start", "--head"], check=True)
        print("Ray Head Node started successfully!")
    except subprocess.CalledProcessError:
        print("Ray Head Node is already running, assuming this is a worker node.")

        # Connect worker node to head node
        head_node_ip = local_ip  # You can pass this as an environment variable
        print(f"Connecting to Ray Head Node at {head_node_ip}...")
        subprocess.run(["ray", "start", f"--address={head_node_ip}:6379"], check=True)

# Start Ray Cluster
start_ray_cluster()

# Initialize Ray
ray.init(ignore_reinit_error=True)

# ==============================
# 2️⃣ Load IMDb Dataset
# ==============================

def load_and_preprocess_data():
    """Load and preprocess IMDb dataset for causal LM training."""
    dataset = load_dataset("imdb")

    # Combine text and label into a single format
    def format_text(example):
        return {"text": f"Review: {example['text']}\nSentiment: {'positive' if example['label'] == 1 else 'negative'}"}

    dataset = dataset.map(format_text, batched=False)

    # Train/Test split (keeping only text column)
    dataset = DatasetDict({
        "train": dataset["train"].remove_columns(["label"]),
        "test": dataset["test"].remove_columns(["label"]),
    })

    return dataset

# ==============================
# 3️⃣ LoRA Training Function
# ==============================

@ray.remote(num_gpus=1)
def train_lora():
    wandb.init(project="IMDb_LoRA", entity="sarapsg")  # Replace with your WandB details

    # Load dataset
    dataset = load_and_preprocess_data()

    # Load tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, load_in_8bit=True, device_map="auto")

    # LoRA Configuration
    lora_config = LoraConfig(
        r=16, lora_alpha=32,
        lora_dropout=0.1, bias="none"
    )
    model = get_peft_model(model, lora_config)

    # Define training arguments
    training_args = TrainingArguments(
        output_dir="./results",
        per_device_train_batch_size=4,
        gradient_accumulation_steps=2,
        num_train_epochs=1,
        learning_rate=2e-4,
        fp16=True,
        logging_dir="./logs",
        remove_unused_columns=False,
    )

    # Tokenize the dataset
    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=256)

    tokenized_datasets = dataset.map(tokenize_function, batched=True)

    # Data collator
    data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

    trainer = SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        tokenizer=tokenizer,
        data_collator=data_collator
    )

    # Train the model
    trainer.train()

    # Save LoRA adapters
    model.save_pretrained(LORA_SAVE_PATH)
    tokenizer.save_pretrained(LORA_SAVE_PATH)

    # Extract final loss
    final_loss = trainer.state.log_history[-1]['loss'] if trainer.state.log_history else None

    return final_loss

# Run LoRA Training on Multiple GPUs
losses = ray.get([train_lora.remote() for _ in range(2)])  # Adjust workers as needed
print("Final Training Losses from Parallel Runs:", losses)

# ==============================
# 4️⃣ Merge LoRA Weights
# ==============================

@ray.remote(num_gpus=1)
def merge_lora():
    # Load base model
    base_model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, device_map="auto")

    # Load trained LoRA adapter
    lora_model = PeftModel.from_pretrained(base_model, LORA_SAVE_PATH)

    # Merge LoRA weights into base model
    merged_model = lora_model.merge_and_unload()

    # Save merged model
    merged_model.save_pretrained(MERGED_MODEL_PATH)
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    tokenizer.save_pretrained(MERGED_MODEL_PATH)

    print(f"Merged model saved at {MERGED_MODEL_PATH}")

# Run Merging on Distributed GPUs
ray.get(merge_lora.remote())

# ==============================
# 5️⃣ Inference with Merged Model
# ==============================

def run_inference():
    print("Loading merged model for inference...")
    merged_pipeline = pipeline("text-generation", model=MERGED_MODEL_PATH)

    # Generate text
    output = merged_pipeline("The movie was", max_length=50)
    print("Generated Text:", output)

run_inference()


Starting Ray Head Node...


[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  12%|█▏        | 390/3125 [04:45<33:18,  1.

Ray Head Node is already running, assuming this is a worker node.
Connecting to Ray Head Node at 172.28.0.12...


[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.37it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 391/3125 [04:46<33:18,  1.

[36m(train_lora pid=17751)[0m {'train_runtime': 2.6691, 'train_samples_per_second': 1.124, 'train_steps_per_second': 1.124, 'train_loss': 4.042674700419108, 'mean_token_accuracy': 0.2777777810891469, 'epoch': 1.0}
[36m(merge_lora pid=17750)[0m Merged model saved at ./merged_llm


[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.20it/s]
[36m(train_lora pid=21158)[0m  13%|█▎        | 393/3125 [04:48<37:57,  1.

RayTaskError(ValueError): [36mray::train_lora()[39m (pid=22919, ip=172.28.0.12)
  File "<ipython-input-28-8fa0fd3224c3>", line 84, in train_lora
  File "/usr/local/lib/python3.11/dist-packages/transformers/models/auto/auto_factory.py", line 564, in from_pretrained
    return model_class.from_pretrained(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/transformers/modeling_utils.py", line 4188, in from_pretrained
    hf_quantizer.validate_environment(device_map=device_map)
  File "/usr/local/lib/python3.11/dist-packages/transformers/quantizers/quantizer_bnb_8bit.py", line 101, in validate_environment
    raise ValueError(
ValueError: Some modules are dispatched on the CPU or the disk. Make sure you have enough GPU RAM to fit the quantized model. If you want to dispatch the model on the CPU or the disk while keeping these modules in 32-bit, you need to set `llm_int8_enable_fp32_cpu_offload=True` and pass a custom `device_map` to `from_pretrained`. Check https://huggingface.co/docs/transformers/main/en/main_classes/quantization#offload-between-cpu-and-gpu for more details.

Map: 100%|█████████▉| 49849/50000 [00:03<00:00, 16471.84 examples/s]
Map: 100%|█████████▉| 49849/50000 [00:03<00:00, 16471.84 examples/s]
Map: 100%|█████████▉| 49849/50000 [00:03<00:00, 16471.84 examples/s]
Map: 100%|█████████▉| 49849/50000 [00:03<00:00, 16471.84 examples/s]
Map: 100%|█████████▉| 49849/50000 [00:03<00:00, 16471.84 examples/s]
Map: 100%|█████████▉| 49849/50000 [00:03<00:00, 16471.84 examples/s]