# Federated Fine-Tuning of a Hugging Face Model Using OpenFL


In this tutorial, we demonstrate how to fine-tune a Hugging Face Transformers model (in this case, BERT) in a federated learning workflow, training LoRA adapters with the PEFT library.

We fine-tune the model on [Math_10k](https://github.com/AGI-Edgerunners/LLM-Adapters/tree/main), an open-source dataset of mathematical question-answer pairs collected from several smaller math datasets.

## The Workflow Interface

The workflow interface is a flexible approach to designing federated learning experiments with OpenFL. It was developed in response to discussions with researchers and users whose use cases did not fit neatly into the traditional horizontal federated learning model. It lets you compose experiments more freely, which makes them easier to customize for complex, real-world scenarios.

## Installing OpenFL
To install OpenFL, follow the official documentation: 
[OpenFL Installation Guide](https://openfl.readthedocs.io/en/latest/installation.html)

After installation, activate experimental APIs using:   
`fx experimental activate`

In [7]:
# Install dependencies 
!pip install torch transformers peft datasets trl==0.12.2 -q

## Import libraries

In [None]:
import hashlib
import os

import numpy as np
import requests
import torch
import transformers
from datasets import load_dataset
from peft import LoraConfig, get_peft_model
from peft.utils import get_peft_model_state_dict, set_peft_model_state_dict
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, DataCollatorWithPadding 
from transformers.trainer_callback import PrinterCallback
from openfl.experimental.workflow.interface import Aggregator, Collaborator, FLSpec
from openfl.experimental.workflow.placement import aggregator, collaborator
from openfl.experimental.workflow.runtime import LocalRuntime

## Acquiring and preprocessing dataset

We download the dataset directly from the [LLM-Adapters repository](https://github.com/AGI-Edgerunners/LLM-Adapters) and verify its SHA-256 checksum before loading it.

In [None]:
def file_checksum(file_path, algorithm="sha256"):
    """
    Calculate the checksum of a file using the specified hashing algorithm.

    Parameters:
    file_path (str): The path to the file for which the checksum is to be calculated.
    algorithm (str): The hashing algorithm to use (default is 'sha256').

    Returns:
    str: The calculated checksum of the file.
    """
    hash_func = hashlib.new(algorithm)
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_func.update(chunk)
    return hash_func.hexdigest()


if not os.path.exists("math_10k.json"):
    r = requests.get(
        "https://raw.githubusercontent.com/AGI-Edgerunners/LLM-Adapters/main/ft-training_set/math_10k.json",
        timeout=60,
    )
    r.raise_for_status()  # Fail early on HTTP errors instead of saving an error page
    with open("math_10k.json", "wb") as f:
        f.write(r.content)

# Verify the checksum on every run, not only right after downloading
actual_checksum = file_checksum("math_10k.json")
if (
    actual_checksum
    != "0342d0d860ad8592b579329337c90e42eefd3d9f2898043140cbd120630418b8"
):
    raise ValueError(
        "Checksum verification failed. The file may have been altered."
    )

raw_dataset = load_dataset("json", data_files="math_10k.json")

## Initialize arguments and configurations

In [10]:
training_config = {
    "bf16": True,
    "use_cpu": True,
    "do_eval": False,
    "learning_rate": 5.0e-06,
    "log_level": "info",
    "logging_steps": 20,
    "lr_scheduler_type": "cosine",
    "num_train_epochs": 1,
    "output_dir": "./checkpoint_dir",
    "overwrite_output_dir": True,
    "per_device_eval_batch_size": 1,
    "per_device_train_batch_size": 1,
    "save_steps": 100,
    "save_total_limit": 1,
    "seed": 0,
    "gradient_checkpointing": True,
    "gradient_checkpointing_kwargs": {"use_reentrant": False},
    "warmup_ratio": 0.2,
}

peft_config = {
    "r": 8,
    "lora_alpha": 16,
    "lora_dropout": 0.1,
    "bias": "none",
    "task_type": "CAUSAL_LM",  # Causal language modeling task
    "target_modules": ["query", "value"],  # BERT self-attention projections adapted with LoRA
}
model_kwargs = dict(
    use_cache=False,
    trust_remote_code=True,
    torch_dtype=torch.bfloat16,
    device_map=None,
)
train_conf = TrainingArguments(**training_config)
peft_conf = LoraConfig(**peft_config)
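
The `lr_scheduler_type="cosine"` and `warmup_ratio=0.2` settings combine a linear warmup with a cosine decay. As a rough illustration, the plain-Python sketch below computes that shape for one round of local training, assuming the usual formula (warmup steps = `ceil(total_steps * warmup_ratio)`, then a half-cosine decay to zero). `lr_at_step` is a hypothetical helper, not a Transformers API, so the exact per-step values logged by `Trainer` may differ slightly.

```python
import math


def lr_at_step(step, total_steps, base_lr=5.0e-06, warmup_ratio=0.2):
    """Learning rate under linear warmup followed by cosine decay (sketch)."""
    warmup_steps = math.ceil(total_steps * warmup_ratio)
    if step < warmup_steps:
        # Linear warmup from 0 up to base_lr
        return base_lr * step / max(1, warmup_steps)
    # Cosine decay from base_lr down to 0 over the remaining steps
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    return base_lr * 0.5 * (1.0 + math.cos(math.pi * progress))


# Each collaborator holds 5 of the 10 training examples; with a batch size of 1
# and one epoch, one round of local training is 5 optimizer steps.
total_steps = 5
schedule = [lr_at_step(s, total_steps) for s in range(total_steps + 1)]
for step, lr in enumerate(schedule):
    print(f"step {step}: lr = {lr:.3e}")
```

Because each call to `trainer.train()` in the flow builds a fresh optimizer and scheduler, this warmup-then-decay cycle restarts in every federated round.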

## Load and initialize model

In [None]:
checkpoint_path = "bert-base-uncased"
model = AutoModelForCausalLM.from_pretrained(
    checkpoint_path, return_dict=True, num_labels=2, **model_kwargs
)
model = get_peft_model(model, peft_conf)

tokenizer = AutoTokenizer.from_pretrained(checkpoint_path)
sequence_max_length = 512
val_set_size = 2000
tokenizer.pad_token_id = 0  # BERT's [PAD] token id (already 0 for bert-base-uncased); BERT has no EOS token
tokenizer.padding_side = "left"  # Allow batched inference
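
Because `peft_conf` adapts only the `query` and `value` projections with rank `r=8`, only a small fraction of BERT's weights are trained. The arithmetic below estimates that count, assuming the `bert-base-uncased` dimensions (hidden size 768, 12 encoder layers); `lora_params_per_linear` is a hypothetical helper. It should agree with what `model.print_trainable_parameters()` reports for the PEFT model, but treat it as an estimate.

```python
# bert-base-uncased dimensions (assumed from its published configuration)
hidden_size = 768
num_layers = 12

rank = 8                             # peft_config["r"]
lora_alpha = 16                      # peft_config["lora_alpha"]
target_modules = ["query", "value"]  # peft_config["target_modules"]


def lora_params_per_linear(in_features, out_features, r):
    """LoRA adds A (r x in_features) and B (out_features x r) to a linear layer."""
    return r * in_features + out_features * r


per_module = lora_params_per_linear(hidden_size, hidden_size, rank)
trainable = per_module * len(target_modules) * num_layers
scaling = lora_alpha / rank  # the low-rank update B @ A is multiplied by alpha / r

print(f"LoRA params per adapted layer: {per_module:,}")
print(f"Total trainable LoRA params: {trainable:,}")
print(f"LoRA scaling factor: {scaling}")
```

Since the update is scaled by `lora_alpha / r`, doubling `r` without changing `lora_alpha` halves the scale of the low-rank update.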

## Preprocess dataset

In [None]:
# Define data collator
data_collator = DataCollatorWithPadding(tokenizer)

def generate_prompt(data_point):
    """
    Generate a prompt based on the given data point.

    Parameters:
    data_point (dict): A dictionary containing the instruction, input, and output.

    Returns:
    str: The generated prompt as a string.
    """
    if data_point["input"]:
        return f"""Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{data_point["instruction"]}

### Input:
{data_point["input"]}

### Response:
{data_point["output"]}"""
    else:
        return f"""Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
{data_point["instruction"]}

### Response:
{data_point["output"]}"""


def tokenize(data_point):
    """
    Tokenize the given data point and build causal language modeling labels.

    Labels are a copy of the input ids, with padding positions set to -100 so
    that they are ignored by the loss.
    """
    full_prompt = generate_prompt(data_point)
    result = tokenizer(
        full_prompt,
        truncation=True,
        max_length=sequence_max_length,
        padding="max_length",
    )
    result["labels"] = [
        token_id if mask == 1 else -100
        for token_id, mask in zip(result["input_ids"], result["attention_mask"])
    ]
    return result

def generate_and_tokenize_prompt(data_point):
    """
    Generate and tokenize a prompt based on the given data point.
    """
    return tokenize(data_point)


# Split the dataset into training and validation sets (shuffled with a fixed seed)
train_val = raw_dataset["train"].train_test_split(
    test_size=val_set_size, shuffle=True, seed=42
)

# Keep only 10 examples per split so the demo runs quickly on CPU,
# and tokenize just those examples rather than the whole split
processed_train_dataset = train_val["train"].select(range(10)).map(generate_and_tokenize_prompt)
processed_test_dataset = train_val["test"].select(range(10)).map(generate_and_tokenize_prompt)
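
With `padding="max_length"` and `padding_side="left"`, every example becomes exactly `sequence_max_length` tokens long, with `[PAD]` tokens prepended. The plain-Python sketch below shows what that produces for a short sequence of made-up token ids, and how padding positions can be excluded from the loss by setting their labels to -100 (the default `ignore_index` of PyTorch's `CrossEntropyLoss`). `left_pad` is a hypothetical helper for illustration only; in the notebook, the tokenizer does this work.

```python
PAD_ID = 0           # BERT's [PAD] id, matching tokenizer.pad_token_id above
IGNORE_INDEX = -100  # label value ignored by PyTorch's cross-entropy loss by default


def left_pad(token_ids, max_length, pad_id=PAD_ID):
    """Left-pad a token sequence and build its attention mask and labels."""
    token_ids = token_ids[:max_length]  # truncate on the right, keeping the start
    n_pad = max_length - len(token_ids)
    input_ids = [pad_id] * n_pad + token_ids
    attention_mask = [0] * n_pad + [1] * len(token_ids)
    # Padding positions get IGNORE_INDEX so they do not contribute to the loss
    labels = [
        tok if m == 1 else IGNORE_INDEX
        for tok, m in zip(input_ids, attention_mask)
    ]
    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}


example = left_pad([101, 2054, 2003, 1016, 102], max_length=8)
print(example)
```

Masking the padding keeps the loss focused on the prompt and response tokens instead of on padding, which can make up most of a 512-token sequence for short examples.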

## Define Federated Averaging Method
The `FedAvg` function averages the LoRA adapter parameters sent by all collaborators after local training and loads the result into the aggregator's model.

In [21]:
def FedAvg(peft_params, model, weights=None):
    """
    Perform Federated Averaging (FedAvg) on the model parameters.

    Parameters:
    peft_params (list): A list of state dictionaries containing the model parameters from different clients.
    model (torch.nn.Module): The model to which the averaged parameters will be applied.
    weights (list, optional): A list of weights for averaging the parameters. If None, equal weights are used.

    Returns:
    torch.nn.Module: The model with the averaged parameters applied.
    """
    state_dicts = peft_params
    state_dict = get_peft_model_state_dict(model)
    for key in peft_params[0]:
        dtype = state_dicts[0][key].dtype
        state_dict[key] = torch.from_numpy(
            np.average(
                [state[key].to(torch.float).numpy() for state in state_dicts], axis=0, weights=weights
            )
        ).to(dtype)
    set_peft_model_state_dict(model, state_dict)
    return model
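
To make the averaging step concrete, the sketch below reimplements the same weighted mean in plain Python on small, made-up parameter lists instead of PyTorch tensors; `fedavg` and the parameter names are hypothetical. With `weights=None` every collaborator counts equally, which is what the notebook's `FedAvg` call does; the original FedAvg formulation instead weights each client by its number of local training examples.

```python
def fedavg(client_states, weights=None):
    """Weighted average of per-client parameter dicts (values are lists of floats)."""
    if weights is None:
        weights = [1.0] * len(client_states)
    total = sum(weights)
    averaged = {}
    for key in client_states[0]:
        n = len(client_states[0][key])
        averaged[key] = [
            sum(w * state[key][i] for w, state in zip(weights, client_states)) / total
            for i in range(n)
        ]
    return averaged


# Two hypothetical clients, each holding small "LoRA" tensors flattened to lists
portland = {"lora_A": [1.0, 2.0], "lora_B": [0.0, 4.0]}
seattle = {"lora_A": [3.0, 6.0], "lora_B": [2.0, 0.0]}

equal = fedavg([portland, seattle])                     # equal weights
weighted = fedavg([portland, seattle], weights=[3, 1])  # e.g. weight by sample count
print(equal)
print(weighted)
```

In this notebook both shards hold 5 examples, so sample-count weighting and equal weighting give the same result.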

Now we come to the flow definition. The OpenFL Workflow Interface adopts the conventions set by Metaflow: every workflow begins with the `start` task and concludes with the `end` task. In this notebook, the flow is initialized with a model (required) and an optional optimizer. The aggregator runs `start`, where it extracts the list of collaborators from the runtime (`self.collaborators = self.runtime.collaborators`) and uses it, via `foreach="collaborators"` in `self.next`, as the list of participants for the next task, `aggregated_model_validation`. The model, the optimizer, and any other attribute not explicitly excluded in the `self.next` call are passed from the `start` task on the aggregator to the `aggregated_model_validation` task on each collaborator. Where each task runs is determined by the placement decorator that precedes its definition (`@aggregator` or `@collaborator`). Once each collaborator defined in the runtime completes `aggregated_model_validation`, it passes its current state to `train`, then to `local_model_validation`, and finally to `join` on the aggregator. In `join`, the collaborators' LoRA adapter weights are averaged, and the next round begins if any rounds remain.

![Workflow Interface](../../../../docs/images/workflow_interface.png)
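
The step ordering described above can be traced with a tiny plain-Python simulation. `simulate_flow` is a hypothetical helper that only records step names; it does not use OpenFL, and the interleaving of collaborators within a round is up to the runtime backend (here they simply run one after another).

```python
def simulate_flow(collaborators, rounds):
    """Record the order in which FederatedFlow's steps would run (sketch)."""
    trace = ["start@aggregator"]
    for _ in range(rounds):
        for name in collaborators:
            # Each collaborator runs its three steps before the aggregator joins
            trace += [
                f"aggregated_model_validation@{name}",
                f"train@{name}",
                f"local_model_validation@{name}",
            ]
        # join averages the adapters, then either loops or moves on to end
        trace.append("join@aggregator")
    trace.append("end@aggregator")
    return trace


trace = simulate_flow(["Portland", "Seattle"], rounds=2)
for step in trace:
    print(step)
```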

In [22]:
class FederatedFlow(FLSpec):
    def __init__(self, model=None, optimizer=None, rounds=3, **kwargs):
        """
        Initialize the class with the given model, optimizer, and number of rounds.

        Parameters:
        model (torch.nn.Module, optional): The model to be used. If None, a ValueError is raised.
        optimizer (torch.optim.Optimizer, optional): The optimizer to be used.
        rounds (int, optional): The number of rounds for training or processing (default is 3).
        **kwargs: Additional keyword arguments to be passed to the superclass initializer.

        Raises:
        ValueError: If no model is provided.
        """
        super().__init__(**kwargs)
        if model is not None:
            self.model = model
            self.peft_params = get_peft_model_state_dict(self.model)
            self.optimizer = optimizer
        else:
            raise ValueError("No model inputted")

        self.rounds = rounds
        

    @aggregator
    def start(self):
        """
        Initialize the model and set up the collaborators for federated learning.

        This method performs the initial setup for the model, including setting the
        collaborators, initializing private variables, and starting the first round
        of the federated learning process.
        """
        print(f"Performing initialization for model")
        self.collaborators = self.runtime.collaborators
        self.current_round = 0
        self.next(
            self.aggregated_model_validation,
            foreach="collaborators",
        )

    
    @collaborator
    def aggregated_model_validation(self):
        """
        Perform aggregated model validation for a collaborator.

        This method loads the model, applies the PEFT configuration, and evaluates
        the model using the provided training and evaluation datasets. The validation
        score is then stored and the next step in the process is triggered.
        """
        print(f"Performing aggregated model validation for collaborator {self.input}")
        self.model = AutoModelForCausalLM.from_pretrained(
            checkpoint_path, return_dict=True, num_labels=2, **model_kwargs
        )
        self.model = get_peft_model(self.model, peft_conf)
        set_peft_model_state_dict(self.model, self.peft_params)
        trainer = Trainer(
            model=self.model,
            args=train_conf,
            train_dataset=self.train_dataset,
            eval_dataset=self.eval_dataset,
            tokenizer=tokenizer,
            data_collator=data_collator,
        )
        trainer.remove_callback(PrinterCallback)
        out = trainer.evaluate()
        self.agg_validation_score = out["eval_loss"]
        print(f"{self.input} value of {self.agg_validation_score}")
        self.next(self.train)

    @collaborator
    def train(self):
        """
        Train the model for a collaborator.

        This method trains the model using the provided training and evaluation datasets.
        The training loss is stored, the model is saved, and the next step in the process
        is triggered.
        """
        trainer = Trainer(
            model=self.model,
            args=train_conf,
            train_dataset=self.train_dataset,
            eval_dataset=self.eval_dataset,
            tokenizer=tokenizer,
            data_collator=data_collator,
        )

        out = trainer.train()
        self.loss = out.training_loss
        trainer.save_model()
        self.training_completed = True
        self.next(self.local_model_validation)

    @collaborator
    def local_model_validation(self):
        """
        Perform local model validation for a collaborator.

        This method evaluates the model using the provided training and evaluation datasets.
        The validation score is stored, the PEFT parameters are updated, and the next step
        in the process is triggered.
        """
        trainer = Trainer(
            model=self.model,
            args=train_conf,
            train_dataset=self.train_dataset,  # Use collaborator-specific dataset
            eval_dataset=self.eval_dataset, 
            tokenizer=tokenizer,
            data_collator=data_collator,
        )
        out = trainer.evaluate()
        self.local_validation_score = out["eval_loss"]
        self.peft_params = get_peft_model_state_dict(self.model)
        print(f"Doing local model validation for collaborator {self.input}")
        self.next(self.join, exclude=["training_completed", "model"])

    @aggregator
    def join(self, inputs):
        """
        Aggregate the results from all collaborators and update the model.

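        This method averages, across all collaborators, the training loss, the validation
        loss of the aggregated model, and the validation loss of the locally trained model
        (the `*_accuracy` attributes hold these averaged eval losses, so lower is better).
        The LoRA adapter parameters are then combined using Federated Averaging (FedAvg),
        and the next round is triggered if rounds remain.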
        """
        self.average_loss = sum(input.loss for input in inputs) / len(inputs)
        self.aggregated_model_accuracy = sum(
            input.agg_validation_score for input in inputs
        ) / len(inputs)
        self.local_model_accuracy = sum(
            input.local_validation_score for input in inputs
        ) / len(inputs)
        print(
            f"Average aggregated model validation values = {self.aggregated_model_accuracy}"
        )
        print(f"Average training loss = {self.average_loss}")
        print(f"Average local model validation values = {self.local_model_accuracy}")

        self.model = FedAvg([input.peft_params for input in inputs], self.model)
        self.peft_params = get_peft_model_state_dict(self.model)

        self.model.save_pretrained("./aggregated/model")
        tokenizer.save_pretrained("./aggregated/tokenizer")
        self.current_round += 1
        if self.current_round < self.rounds:
            self.next(
                self.aggregated_model_validation,
                foreach="collaborators",
                exclude=["model"],
            )
        else:
            self.next(self.end)

    @aggregator
    def end(self):
        """
        End the federated learning process.

        This method marks the end of the federated learning process and performs any
        necessary cleanup or finalization steps.
        """
        print(f"This is the end of the flow")


Aggregator step "start" registered
Collaborator step "aggregated_model_validation" registered
Collaborator step "train" registered
Collaborator step "local_model_validation" registered
Aggregator step "join" registered
Aggregator step "end" registered


You may have noticed that the `FederatedFlow` definition above uses attributes that the flow is never initialized with, namely `train_dataset` and `eval_dataset` on each collaborator. These are **private_attributes** that are exposed only through the runtime. Each participant has its own set of private attributes: a dictionary where the key is the attribute name and the value is the object made accessible through that participant's tasks.

Below, we split the processed Math_10k examples into shards for **two collaborators**: Portland and Seattle. Each has its own slice of the dataset, accessible via the `train_dataset` and `eval_dataset` attributes. Private attributes are flexible: you can pass a completely different type of object, under an arbitrary name, to any collaborator or to the aggregator. These private attributes are always filtered out of the current state when transferring from collaborator to aggregator, or vice versa.
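
Conceptually, each transition between the aggregator and the collaborators sends only the flow attributes that are neither private nor listed in `exclude`. The sketch below models that filtering with plain dictionaries and placeholder values; `transferable_state` is a hypothetical helper, not part of the OpenFL API, which tracks this state on the flow object itself.

```python
def transferable_state(state, private_attributes, exclude=()):
    """Drop private attributes and explicitly excluded names before a transition."""
    return {
        name: value
        for name, value in state.items()
        if name not in private_attributes and name not in exclude
    }


# Placeholder values standing in for one collaborator's state after training
collaborator_state = {
    "train_dataset": "<Portland train shard>",  # private attribute
    "eval_dataset": "<Portland eval shard>",    # private attribute
    "model": "<PeftModel>",
    "peft_params": {"lora_A": [0.1]},
    "loss": 0.42,
    "training_completed": True,
}

sent_to_join = transferable_state(
    collaborator_state,
    private_attributes={"train_dataset", "eval_dataset"},
    # Mirrors self.next(self.join, exclude=["training_completed", "model"])
    exclude=["training_completed", "model"],
)
print(sorted(sent_to_join))
```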

In [None]:
# Setup participants
_aggregator = Aggregator()
_aggregator.private_attributes = {}

# Setup collaborators with private attributes
collaborator_names = [
    "Portland",
    "Seattle",
]
_collaborators = [Collaborator(name=name) for name in collaborator_names]

for idx, current_collaborator in enumerate(_collaborators):
    # Give each collaborator its own shard of the training and evaluation datasets
    current_collaborator.private_attributes = {
        "train_dataset": processed_train_dataset.shard(
            num_shards=len(_collaborators), index=idx
        ),
        "eval_dataset": processed_test_dataset.shard(
            num_shards=len(_collaborators), index=idx
        ),
    }

local_runtime = LocalRuntime(
    aggregator=_aggregator, collaborators=_collaborators, backend="single_process"
)
print(f"Local runtime collaborators = {local_runtime.collaborators}")
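
`Dataset.shard(num_shards, index)` gives each collaborator a disjoint slice of the 10 processed examples. The sketch below illustrates the two common splitting strategies, contiguous blocks and strided (every n-th row); the Hugging Face method exposes this choice through its `contiguous` argument, but check the `datasets` documentation for the default in your installed version. `shard_indices` is a hypothetical helper, not the library's implementation.

```python
def shard_indices(num_rows, num_shards, index, contiguous=True):
    """Row indices assigned to shard `index` (hypothetical helper)."""
    if contiguous:
        # Split into consecutive blocks; the first `extra` shards get one more row
        base, extra = divmod(num_rows, num_shards)
        start = index * base + min(index, extra)
        end = start + base + (1 if index < extra else 0)
        return list(range(start, end))
    # Strided: every num_shards-th row, starting at `index`
    return list(range(index, num_rows, num_shards))


for idx, name in enumerate(["Portland", "Seattle"]):
    print(name, shard_indices(10, 2, idx), shard_indices(10, 2, idx, contiguous=False))
```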

## Run Experiment

In [None]:
flflow = FederatedFlow(model, rounds=2)
flflow.runtime = local_runtime
flflow.run()

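# Report the aggregated model's validation loss (eval_loss averaged over collaborators, lower is better).
# It is measured in aggregated_model_validation, i.e. at the start of the final round.
print(f'\nAggregated model validation loss at the start of round {flflow.rounds}: {flflow.aggregated_model_accuracy}')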

## 🎉 Congratulations! 🎉

Now that you've completed this notebook, check out our [other tutorials](https://github.com/securefederatedai/openfl/tree/develop/openfl-tutorials/experimental/):

- Using the LocalRuntime Ray Backend for dedicated GPU access
- Vertical Federated Learning
- Model Watermarking
- Differential Privacy
- And More!