### Step 1: Set Up the Ray Federated Learning Environment

In [None]:
import os
import ray
import torch
from ray import train
from ray.train.torch import TorchTrainer
from ray.train.torch import TorchConfig
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, PeftModel, get_peft_model

In [None]:
# Start a local Ray runtime that exposes every CUDA device torch can see, so
# that one training worker / evaluation task can be scheduled per GPU.
num_gpus = torch.cuda.device_count()
ray.init(
    num_cpus=20,
    num_gpus=num_gpus,
    object_store_memory=2 * 1024 ** 3,  # 2 GiB object store
)

# Settings shared by every federated worker and by the final model build.
fed_config = dict(
    base_model="meta-llama/Llama-3.1-8B-Instruct",
    num_epochs=0.5,  # fractional epoch count, forwarded to TrainingArguments
    batch_size=4,  # per-device training batch size
    learning_rate=2e-5,
    # LoRA adapter hyper-parameters
    lora_r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
)

### Step 2: Define Local Training Function

In [None]:
def train_on_local_documents(config):
    """Fine-tune a LoRA adapter on this worker's document partition (Ray Train loop).

    Runs inside every ``TorchTrainer`` worker. The worker's world rank selects
    the partition directory ``Documents_partition_{rank}`` created by
    ``partition_documents``. The 4-bit base model is loaded directly onto the
    worker's device, wrapped with LoRA, trained with ``SFTTrainer``, and the
    adapter weights returned by ``extract_lora_weights`` are reported back.

    Args:
        config: The ``train_loop_config`` dict. Reads ``base_model``,
            ``lora_r``, ``lora_alpha``, ``lora_dropout``, ``target_modules``,
            ``batch_size``, ``learning_rate`` and ``num_epochs``. The optional
            ``partition_root`` (default ``""``: relative to the worker's
            working directory) is prepended to the partition directory name.

    Reports (via ``train.report``):
        * metrics ``{"adapter_weights": {...}}`` with CPU tensor copies, kept
          for compatibility. Ray Train only surfaces rank 0's metrics on the
          driver, so this alone cannot feed FedAvg.
        * a checkpoint holding ``adapter_weights_rank_{rank}.pt`` so the driver
          can collect the adapter of *every* worker.

    NOTE(review): ``SFTTrainer``, ``process_local_documents``,
    ``convert_to_dataset`` and ``extract_lora_weights`` are not defined in this
    notebook; they are assumed to come from the existing pipeline.
    NOTE(review): TorchTrainer initializes a torch.distributed process group,
    so the HF Trainer may switch to DDP and synchronize gradients across
    workers. Confirm that workers really train independently (true FedAvg).
    """
    import tempfile

    from ray.train.torch import get_device
    from transformers import BitsAndBytesConfig, TrainingArguments

    # Ray assigns each worker its own GPU. Place the quantized model on it at
    # load time instead of moving it afterwards with .to(), which is not
    # supported for bitsandbytes-quantized weights.
    device = get_device()
    base_model = AutoModelForCausalLM.from_pretrained(
        config["base_model"],
        device_map={"": device},
        quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    )

    # Configure and attach the LoRA adapter; only its weights are trained.
    lora_config = LoraConfig(
        r=config["lora_r"],
        lora_alpha=config["lora_alpha"],
        target_modules=config["target_modules"],
        lora_dropout=config["lora_dropout"],
        bias="none",
        task_type="CAUSAL_LM"
    )
    model = get_peft_model(base_model, lora_config)

    # NOTE(review): currently unused. Pass it to SFTTrainer (``tokenizer=`` or
    # ``processing_class=`` depending on the trl version) if needed.
    tokenizer = AutoTokenizer.from_pretrained(config["base_model"])

    # The world rank (0..num_workers-1) matches the partition index used by
    # partition_documents().
    rank = train.get_context().get_world_rank()

    data_dir = os.path.join(
        config.get("partition_root", ""), f"Documents_partition_{rank}"
    )
    local_data = process_local_documents(data_dir)
    dataset = convert_to_dataset(local_data)

    training_args = TrainingArguments(
        output_dir=f"./worker_{rank}_output",
        per_device_train_batch_size=config["batch_size"],
        learning_rate=config["learning_rate"],
        num_train_epochs=config["num_epochs"],
        gradient_accumulation_steps=8,
        optim="adamw_torch",
        report_to="none",
    )

    trainer = SFTTrainer(
        model=model,
        train_dataset=dataset,
        args=training_args,
    )
    trainer.train()

    # Only the LoRA adapter weights are shipped (much smaller than the full
    # model). Assumes a name -> tensor mapping, as fedavg_aggregate expects.
    # Copies are moved to CPU so they serialize and aggregate off-GPU.
    adapter_weights = {
        name: tensor.detach().cpu()
        for name, tensor in extract_lora_weights(model).items()
    }

    # Every worker writes a rank-specific file, so the files from all workers
    # can sit side by side in the reported checkpoint.
    with tempfile.TemporaryDirectory() as ckpt_dir:
        torch.save(
            adapter_weights,
            os.path.join(ckpt_dir, f"adapter_weights_rank_{rank}.pt"),
        )
        train.report(
            {"adapter_weights": adapter_weights},
            checkpoint=train.Checkpoint.from_directory(ckpt_dir),
        )

### Step 3: Implement Data Partitioning

In [None]:
def partition_documents(source_dir, num_partitions):
    """Partition documents into separate directories for federated learning.

    Every file found (recursively) under ``source_dir`` is copied round-robin
    into ``Documents_partition_0`` ... ``Documents_partition_{num_partitions-1}``
    in the current working directory. Files are visited in sorted path order,
    so the assignment is reproducible across runs. Top-level files keep their
    name. Files in sub-directories are named after their path relative to
    ``source_dir``, with path separators replaced by ``"__"``, so equal
    basenames in different folders do not overwrite each other.

    Args:
        source_dir: Root directory to read documents from.
        num_partitions: Number of partitions (one per worker); must be >= 1.

    Raises:
        ValueError: If ``num_partitions`` is smaller than 1.
        FileNotFoundError: If ``source_dir`` is not a directory.

    NOTE(review): existing partition directories are reused, so files left
    over from an earlier run with different settings are not removed.
    """
    import shutil

    if num_partitions < 1:
        raise ValueError(f"num_partitions must be >= 1, got {num_partitions}")
    if not os.path.isdir(source_dir):
        raise FileNotFoundError(f"Source directory not found: {source_dir}")

    all_files = sorted(
        os.path.join(root, name)
        for root, _, names in os.walk(source_dir)
        for name in names
    )

    # Create partition directories
    partition_dirs = [f"Documents_partition_{i}" for i in range(num_partitions)]
    for partition_dir in partition_dirs:
        os.makedirs(partition_dir, exist_ok=True)

    # Distribute files across partitions, flattening the relative path so
    # same-named files from different sub-folders stay distinct.
    for i, file_path in enumerate(all_files):
        flat_name = os.path.relpath(file_path, source_dir).replace(os.sep, "__")
        dest_path = os.path.join(partition_dirs[i % num_partitions], flat_name)
        shutil.copy(file_path, dest_path)

    print(f"Partitioned {len(all_files)} documents into {num_partitions} partitions")

### Step 4: Create Aggregation Logic

In [None]:
def fedavg_aggregate(weights_list, sample_counts=None):
    """Implement Federated Averaging (FedAvg) over per-worker weight dicts.

    Args:
        weights_list: Non-empty sequence of ``{name: tensor}`` dicts, one per
            worker. Every dict must contain the keys of the first one.
        sample_counts: Optional per-worker weights (e.g. the number of local
            training samples), same length as ``weights_list``. When given,
            the result is the sample-weighted FedAvg mean. When omitted, every
            worker counts equally.

    Returns:
        A new dict with the (weighted) mean tensor for every key of the first
        dict. Each result has the dtype and device of the first worker's
        tensor. The input dicts and tensors are not modified.

    Raises:
        ValueError: If ``weights_list`` is empty, ``sample_counts`` has the
            wrong length, or ``sample_counts`` does not sum to a positive value.
    """
    if not weights_list:
        raise ValueError("fedavg_aggregate() needs at least one set of weights")

    if sample_counts is None:
        coefficients = [1.0] * len(weights_list)
    else:
        if len(sample_counts) != len(weights_list):
            raise ValueError(
                f"sample_counts has {len(sample_counts)} entries but "
                f"weights_list has {len(weights_list)}"
            )
        coefficients = [float(count) for count in sample_counts]

    total = sum(coefficients)
    if total <= 0:
        raise ValueError("sample_counts must sum to a positive number")

    aggregated_weights = {}
    for key, ref_tensor in weights_list[0].items():
        # Accumulate in at least float32. Summing many fp16/bf16 tensors loses
        # precision, and in-place division of integer tensors is not allowed.
        acc_dtype = torch.promote_types(ref_tensor.dtype, torch.float32)
        acc = torch.zeros_like(ref_tensor, dtype=acc_dtype)
        for coeff, weights in zip(coefficients, weights_list):
            acc += coeff * weights[key].to(device=acc.device, dtype=acc_dtype)
        aggregated_weights[key] = (acc / total).to(ref_tensor.dtype)

    return aggregated_weights

In [None]:
def apply_aggregated_weights(base_model, lora_config, aggregated_weights):
    """Apply aggregated weights to a fresh model.

    Wraps ``base_model`` with a new LoRA adapter built from ``lora_config``.
    Every tensor in ``aggregated_weights`` is then copied into the parameter
    whose name matches it. A key may be either the full ``named_parameters()``
    name or that name with the default adapter segment (``.default``) removed.
    The second form is the one PEFT's ``get_peft_model_state_dict`` produces.
    NOTE(review): confirm which form ``extract_lora_weights`` emits.

    Args:
        base_model: The (quantized) base causal LM to wrap.
        lora_config: ``LoraConfig`` matching the one used during training.
        aggregated_weights: ``{parameter name: tensor}`` mapping, e.g. the
            output of ``fedavg_aggregate``.

    Returns:
        The PEFT-wrapped model with the aggregated weights loaded.

    Raises:
        ValueError: If ``aggregated_weights`` is non-empty but none of its keys
            matches a model parameter. Returning the untrained adapter in that
            case would silently discard the federated result.
    """
    model = get_peft_model(base_model, lora_config)

    matched = 0
    with torch.no_grad():
        for name, param in model.named_parameters():
            key = name if name in aggregated_weights else name.replace(".default.", ".")
            if key in aggregated_weights:
                # copy_ also handles dtype/device differences (e.g. CPU -> GPU).
                param.copy_(aggregated_weights[key])
                matched += 1

    if aggregated_weights and matched == 0:
        raise ValueError(
            "None of the aggregated weight names match the model's parameters; "
            "check the key format produced by extract_lora_weights()."
        )

    return model

### Step 5: Orchestrate the Federated Training

In [None]:
def run_federated_training():
    """Partition the data, train one LoRA adapter per GPU, FedAvg them, and save.

    Steps:
        1. Split ``Documents_semi_structured`` into ``num_gpus`` partitions.
        2. Run ``train_on_local_documents`` on ``num_gpus`` Ray Train workers.
        3. Collect every worker's adapter weights and average them with
           ``fedavg_aggregate``.
        4. Load the result into a fresh 4-bit base model + LoRA adapter and
           save it.

    Returns:
        The directory the aggregated adapter was saved to
        (``"./federated_model"``).

    Raises:
        RuntimeError: If no GPU is visible, or if no worker adapter weights
            could be recovered from the training result.
    """
    from transformers import BitsAndBytesConfig

    if num_gpus < 1:
        raise RuntimeError(
            "Federated training needs at least one visible GPU (num_gpus == 0)."
        )

    # 1. Partition the data
    partition_documents("Documents_semi_structured", num_gpus)

    # Ray Train workers may run in a different working directory, so pass the
    # absolute location of the partition directories.
    # NOTE(review): this assumes workers share the driver's filesystem
    # (single node).
    loop_config = {**fed_config, "partition_root": os.path.abspath(".")}

    # 2. Set up the trainer: one worker per GPU.
    trainer = TorchTrainer(
        train_on_local_documents,
        train_loop_config=loop_config,
        scaling_config=train.ScalingConfig(
            num_workers=num_gpus,
            use_gpu=True,
        ),
        torch_config=TorchConfig(backend="nccl"),
    )

    # 3. Run federated training
    results = trainer.fit()

    # 4. Collect the adapter weights of all workers
    worker_weights = _collect_worker_weights(results)
    if not worker_weights:
        raise RuntimeError("No adapter weights were reported by the training workers.")

    # 5. Aggregate weights using FedAvg
    aggregated_weights = fedavg_aggregate(worker_weights)

    # 6. Create the final model
    base_model = AutoModelForCausalLM.from_pretrained(
        fed_config["base_model"],
        device_map="auto",
        quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    )

    lora_config = LoraConfig(
        r=fed_config["lora_r"],
        lora_alpha=fed_config["lora_alpha"],
        target_modules=fed_config["target_modules"],
        lora_dropout=fed_config["lora_dropout"],
        bias="none",
        task_type="CAUSAL_LM"
    )

    final_model = apply_aggregated_weights(base_model, lora_config, aggregated_weights)

    # 7. Save the final model
    save_path = "./federated_model"
    final_model.save_pretrained(save_path)

    return save_path


def _collect_worker_weights(results):
    """Return one adapter-weight dict per worker from a Ray Train ``Result``.

    Prefers the ``adapter_weights_rank_*.pt`` files stored in the result
    checkpoint, one per worker when the training loop writes them. Otherwise
    falls back to the last reported ``adapter_weights`` metric. Ray Train only
    tracks rank 0's metrics, so in that case FedAvg sees a single model.
    """
    import glob

    checkpoint = results.checkpoint
    if checkpoint is not None:
        with checkpoint.as_directory() as ckpt_dir:
            paths = sorted(
                glob.glob(os.path.join(ckpt_dir, "adapter_weights_rank_*.pt"))
            )
            weights = [torch.load(path, map_location="cpu") for path in paths]
        if weights:
            return weights

    metrics = results.metrics or {}
    if "adapter_weights" in metrics:
        return [metrics["adapter_weights"]]
    return []

### Step 6: Federated Evaluation

In [None]:
def federated_evaluation(model_path):
    """Evaluate the saved federated adapter on every evaluation partition in parallel.

    Launches one Ray task per partition index in ``range(num_gpus)``, and each
    task reserves one GPU. A task loads the base model named in ``fed_config``
    with the adapter at ``model_path``, then evaluates every dataset that
    ``load_evaluation_datasets`` returns for ``eval_partition_{i}``. The
    driver then averages each metric, unweighted, over the partitions that
    produced that dataset. Metric names are taken from the first such
    partition.

    Args:
        model_path: Directory containing the saved PEFT adapter.

    Returns:
        ``{dataset name: {metric name: mean value across partitions}}``.
    """

    @ray.remote(num_gpus=1)
    def evaluate_on_partition(partition_id, model_path):
        # Base model first, then the federated adapter on top of it.
        backbone = AutoModelForCausalLM.from_pretrained(
            fed_config["base_model"],
            device_map="auto",
            load_in_4bit=True
        )
        peft_model = PeftModel.from_pretrained(backbone, model_path)

        partition_sets = load_evaluation_datasets(f"eval_partition_{partition_id}")
        return {
            set_name: evaluate_dataset(peft_model, eval_set)
            for set_name, eval_set in partition_sets.items()
        }

    pending = [evaluate_on_partition.remote(pid, model_path) for pid in range(num_gpus)]
    per_partition = ray.get(pending)

    # Group each dataset's metric dicts across partitions.
    grouped = {}
    for partition_metrics in per_partition:
        for set_name, set_metrics in partition_metrics.items():
            grouped.setdefault(set_name, []).append(set_metrics)

    # Unweighted mean of every metric over the partitions that reported it.
    averaged = {}
    for set_name, runs in grouped.items():
        run_count = len(runs)
        averaged[set_name] = {
            metric_name: sum(run[metric_name] for run in runs) / run_count
            for metric_name in runs[0]
        }
    return averaged