In [None]:
# Uncomment to install the `evaluate` metrics library into the kernel's environment:
# %pip install evaluate

In [None]:
import tensorflow as tf

In [None]:
#import keras
import math
import numpy as np
import tensorflow as tf
import transformers

from datasets import load_dataset
from evaluate import load
from transformers import DataCollatorWithPadding, TFAutoModelForSequenceClassification, create_optimizer
from transformers.keras_callbacks import KerasMetricCallback
from typing import Any, Dict, List

In [None]:
# Show which TensorFlow version this kernel is running.
print(tf.__version__)

In [None]:
import tensorflow as tf

# Count the GPU devices that TensorFlow can see from this kernel.
visible_gpus = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(visible_gpus))

# LoRA layer

In [None]:
class LoraLayer(tf.keras.layers.Layer):
    """Wrap a frozen dense layer with a trainable low-rank (LoRA) update.

    The forward pass returns
    ``original_layer(x) + (alpha / rank) * B(dropout(A(x)))`` where ``A``
    projects the input down to ``rank`` units and ``B`` projects back up to
    ``dim`` units. ``B`` is zero-initialized, so right after construction the
    wrapper's output equals the original layer's output.

    Args:
        original_layer: Dense layer to wrap. It is marked non-trainable and
            its name is reused as the name of this layer.
        rank: Number of units of the low-rank bottleneck (output size of ``A``).
        alpha: LoRA scaling numerator; the update is scaled by ``alpha / rank``.
        dim: Output size of ``B``. It has to match the output size of
            ``original_layer`` so that both outputs can be summed.
        dropout: Dropout rate applied between ``A`` and ``B``.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``. A ``name`` entry is
            discarded in favour of the original layer's name.
    """

    def __init__(
        self,
        original_layer,
        rank: int = 8,
        alpha: int = 32,
        dim: int = 768,
        dropout: float = 0.05,
        **kwargs,
    ):
        # We want to keep the name of this layer the same as the original
        # dense layer.
        original_layer_config = original_layer.get_config()
        name = original_layer_config["name"]

        kwargs.pop("name", None)

        super().__init__(name=name, **kwargs)

        self.rank = rank
        self.alpha = alpha
        self._scale = alpha / rank
        self.dim = dim  # dim of DistilBert hidden states.
        self.dropout = dropout

        # Layers.

        # Original dense layer.
        self.original_layer = original_layer
        # No matter whether we are training the model or are in inference mode,
        # this layer should be frozen.
        self.original_layer.trainable = False

        # LoRA dense layers.
        self.A = tf.keras.layers.Dense(
            units=rank,
            use_bias=False,
            # Note: the original paper mentions that normal distribution was
            # used for initialization. However, the official LoRA implementation
            # uses "Kaiming/He Initialization" (`kaiming_uniform_` with
            # a=sqrt(5)), which samples from U(-sqrt(1/fan_in), sqrt(1/fan_in)).
            # VarianceScaling's uniform limit is sqrt(3 * scale / fan_in), so the
            # equivalent setting is scale = 1/3 (sqrt(5) is the leaky-ReLU slope
            # in the PyTorch call, not a variance scale).
            kernel_initializer=tf.keras.initializers.VarianceScaling(
                scale=1.0 / 3.0, mode="fan_in", distribution="uniform"
            ),
            name="lora_A",
        )

        self.B = tf.keras.layers.Dense(
            units=self.dim,
            use_bias=False,
            # Zero init makes the LoRA branch a no-op until training updates it.
            kernel_initializer="zeros",
            name="lora_B",
        )

        self.dropout_layer = tf.keras.layers.Dropout(self.dropout)

    def call(self, inputs: tf.Tensor) -> tf.Tensor:
        """Return the frozen layer's output plus the scaled low-rank update."""
        original_output = self.original_layer(inputs)

        # Low-rank branch: down-project, regularize, up-project, then scale.
        x = self.A(inputs)
        x = self.dropout_layer(x)
        lora_output = self.B(x) * self._scale

        return original_output + lora_output


# Applying LoRA layers before training

In [None]:
def apply_lora(
    model,
    rank: int,
    alpha: int,
    target_modules: List[str],
    dropout: float = 0.05
):
    """Swap the targeted dense sub-layers of every transformer block for LoRA wrappers.

    For each block in ``model.distilbert.transformer`` and each name in
    ``target_modules``, the existing sub-layer is wrapped in a ``LoraLayer``
    and the wrapper's ``A`` / ``B`` layers are built immediately. The parent
    attribute, input size and output size of each target are looked up in
    ``DISTILBERT_LINEAR_MODULES_DICT``.

    Afterwards the embeddings and every leaf layer under ``model.distilbert``
    are marked non-trainable, except leaves named ``lora_A`` or ``lora_B``.

    Args:
        model: Model exposing ``distilbert.transformer`` and
            ``distilbert.embeddings``; it is modified in place.
        rank: Bottleneck size passed to each ``LoraLayer``.
        alpha: Scaling numerator passed to each ``LoraLayer``.
        target_modules: Keys of ``DISTILBERT_LINEAR_MODULES_DICT`` to wrap.
        dropout: Dropout rate passed to each ``LoraLayer``.

    Returns:
        The same ``model`` object.
    """
    transformer = model.distilbert.transformer

    for block_index in range(transformer.n_layers):
        block = transformer.layer[block_index]

        for target_module in target_modules:
            module_spec = DISTILBERT_LINEAR_MODULES_DICT[target_module]
            parent_layer = getattr(block, module_spec["parent_layer"])

            wrapped = LoraLayer(
                original_layer=getattr(parent_layer, target_module),
                rank=rank,
                alpha=alpha,
                trainable=True,
                dim=module_spec["dim"],
                dropout=dropout,
            )
            setattr(parent_layer, target_module, wrapped)

            # Create the LoRA kernels right away instead of on the first call.
            wrapped.A.build(module_spec["input_dim"])
            wrapped.B.build(rank)

    # Set all distilbert linear layers to trainable=False except the LoRA layers
    model.distilbert.embeddings.trainable = False
    for layer in model.distilbert._flatten_layers():
        # A layer whose flattened view only contains itself is a leaf.
        is_leaf = len(list(layer._flatten_layers())) == 1
        if is_leaf:
            layer.trainable = layer.name in ["lora_A", "lora_B"]

    return model



# Merging LoRA layers' weights after training

In [None]:
def merge_lora_weights(
    model,
    rank: int,
    alpha: int,
    target_modules: List[str]
):
    """Fold each LoRA update into a plain dense layer and install it in the model.

    For every transformer block and every name in ``target_modules`` the
    wrapper found on the parent layer is replaced by a new
    ``tf.keras.layers.Dense`` whose kernel is
    ``original_kernel + (alpha / rank) * (A.kernel @ B.kernel)`` and whose bias
    is the original layer's bias.

    Args:
        model: Model exposing ``distilbert.transformer``; modified in place.
        rank: LoRA rank used to compute the ``alpha / rank`` scale.
        alpha: LoRA alpha used to compute the ``alpha / rank`` scale.
        target_modules: Keys of ``DISTILBERT_LINEAR_MODULES_DICT`` to merge.
            Each one is expected to currently expose ``A``, ``B`` and
            ``original_layer`` attributes.

    Returns:
        The same ``model`` object.
    """
    scale = alpha / rank
    transformer = model.distilbert.transformer

    for block_index in range(transformer.n_layers):
        block = transformer.layer[block_index]

        for target_module in target_modules:
            module_spec = DISTILBERT_LINEAR_MODULES_DICT[target_module]
            parent_layer = getattr(block, module_spec["parent_layer"])

            lora_wrapper = getattr(parent_layer, target_module)
            frozen_dense = lora_wrapper.original_layer

            # (input_dim, rank) @ (rank, dim) -> same shape as the frozen kernel.
            low_rank_update = tf.linalg.matmul(
                lora_wrapper.A.kernel, lora_wrapper.B.kernel
            )
            merged_kernel = frozen_dense.kernel + low_rank_update * scale
            merged_bias = frozen_dense.bias

            merged_layer = tf.keras.layers.Dense(
                units=frozen_dense.units,
                kernel_initializer=tf.constant_initializer(merged_kernel.numpy()),
                bias_initializer=tf.constant_initializer(merged_bias.numpy()),
                name=target_module,
            )
            # Building materializes the weights from the constant initializers.
            merged_layer.build(module_spec["input_dim"])

            setattr(parent_layer, target_module, merged_layer)

    return model

# Training loop on the IMDB sentiment analysis dataset

In [None]:
model_name = "distilbert-base-uncased" # "roberta-base"

# Class index <-> label name mappings handed to the model configuration.
id2label = {0: "NEGATIVE", 1: "POSITIVE"}
label2id = {label: idx for idx, label in id2label.items()}

tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)
model = TFAutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(id2label),
    id2label=id2label,
    label2id=label2id,
)

In [None]:
def preprocess_function(examples):
    """Tokenize the ``"text"`` entry of ``examples`` with the notebook-level tokenizer.

    Truncation is enabled; the tokenizer's output is returned unchanged.
    """
    texts = examples["text"]
    encoded = tokenizer(texts, truncation=True)
    return encoded

def compute_metrics(eval_pred):
    """Score predictions with the notebook-level ``accuracy`` metric.

    Args:
        eval_pred: A ``(predictions, labels)`` pair. ``predictions`` holds one
            row of per-class scores per example; the arg-max over axis 1 is
            taken as the predicted class.

    Returns:
        Whatever ``accuracy.compute`` returns for the predicted classes and
        the reference labels.
    """
    scores, references = eval_pred
    predicted_classes = np.argmax(scores, axis=1)
    return accuracy.compute(predictions=predicted_classes, references=references)

def prepare_datasets(dataset_name, tokenizer, batch_size):
    """Load a dataset, tokenize it and return batched train/validation sets.

    The ``tokenizer`` argument is used both to tokenize the ``"text"`` column
    (with truncation) and to pad batches in the collator, so the two steps can
    never disagree.

    Note: the conversion to ``tf.data`` goes through ``model.prepare_tf_dataset``
    where ``model`` is the notebook-level model, not a parameter of this
    function.

    Args:
        dataset_name: Name passed to ``load_dataset``. The loaded dataset must
            provide ``"train"`` and ``"test"`` splits with a ``"text"`` column.
        tokenizer: Tokenizer used for tokenization and for padding.
        batch_size: Batch size of both returned datasets.

    Returns:
        ``(tf_train_set, tf_validation_set)``: the shuffled ``"train"`` split
        and the unshuffled ``"test"`` split.
    """

    def _tokenize(examples):
        # Use the tokenizer that was passed in rather than a notebook global.
        return tokenizer(examples["text"], truncation=True)

    dataset = load_dataset(dataset_name)
    tokenized_dataset = dataset.map(_tokenize, batched=True)
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer, return_tensors="tf")

    def _to_tf_dataset(split, shuffle):
        # Both splits share the batch size and the padding collator.
        return model.prepare_tf_dataset(
            tokenized_dataset[split],
            shuffle=shuffle,
            batch_size=batch_size,
            collate_fn=data_collator,
        )

    tf_train_set = _to_tf_dataset("train", shuffle=True)
    tf_validation_set = _to_tf_dataset("test", shuffle=False)

    return tf_train_set, tf_validation_set

def train(train_dataset, validation_dataset, model, training_parameters, lora_parameters) -> "tf.keras.Model":
    """Fine-tune ``model``, optionally through LoRA adapters, and return it.

    When ``lora_parameters`` is truthy, LoRA layers are applied before
    training (``apply_lora(model, **lora_parameters)``) and merged back into
    plain dense layers afterwards, followed by an evaluation of the merged
    model on ``validation_dataset``.

    Args:
        train_dataset: Dataset passed to ``model.fit`` as ``x``.
        validation_dataset: Dataset used for validation, for the metric
            callback and for the post-merge evaluation.
        model: Model to train; it is modified in place.
        training_parameters: Mapping with the keys ``"learning_rate"``,
            ``"num_warmup_steps"``, ``"total_train_steps"`` and ``"num_epochs"``.
        lora_parameters: ``None``/empty for regular fine-tuning, otherwise a
            mapping of keyword arguments for ``apply_lora`` that contains at
            least ``"rank"``, ``"alpha"`` and ``"target_modules"``.

    Returns:
        The trained model (with LoRA weights merged when LoRA was used).
    """

    if lora_parameters:
        # `model.summary()` prints by itself and returns None, so it is not
        # wrapped in print().
        print("Model summary before applying LoRA layers:")
        model.summary()
        print()
        model = apply_lora(model, **lora_parameters)
        print("Model summary after applying LoRA layers:")
        model.summary()
        print()

    optimizer, schedule = create_optimizer(
        init_lr=training_parameters["learning_rate"],
        num_warmup_steps=training_parameters["num_warmup_steps"],
        num_train_steps=training_parameters["total_train_steps"]
    )
    model.compile(optimizer=optimizer)

    metric_callback = KerasMetricCallback(metric_fn=compute_metrics, eval_dataset=validation_dataset)

    model.fit(
        x=train_dataset,
        validation_data=validation_dataset,
        epochs=training_parameters["num_epochs"],
        callbacks=[metric_callback]
    )

    if lora_parameters:
        model = merge_lora_weights(
            model,
            rank=lora_parameters["rank"],
            alpha=lora_parameters["alpha"],
            target_modules=lora_parameters["target_modules"]
        )
        # The layers were swapped after fit() already ran validation, so Keras
        # may still hold an evaluation function traced through the old LoRA
        # layers. Recompiling drops the cached functions so that evaluate()
        # really exercises the merged dense layers.
        model.compile(optimizer=optimizer)
        print()
        print("Model summary after merging LoRA weights:")
        model.summary()
        print()
        evaluation = model.evaluate(validation_dataset, return_dict=True)
        print("Evaluation after merging weights:", evaluation)

    return model

## DistilBERT (full fine-tuning)

In [None]:
batch_size = 16

# Metric object read by `compute_metrics` during validation.
accuracy = load("accuracy")

# Tokenized, batched IMDB train/validation datasets.
tf_train_set, tf_validation_set = prepare_datasets(
    dataset_name="imdb",
    tokenizer=tokenizer,
    batch_size=batch_size,
)

In [None]:
num_epochs = 3
# `tf_train_set` was batched by `prepare_datasets`, so its length already is
# the number of batches (optimizer steps) per epoch. Dividing by `batch_size`
# again would shrink the learning-rate schedule to a fraction of the training.
batches_per_epoch = len(tf_train_set)
total_train_steps = batches_per_epoch * num_epochs

print(f"Total num of train steps: {total_train_steps}")
print(f"Num batchs per epoch: {batches_per_epoch}")
print()

training_parameters = {
    "batch_size": batch_size,
    "num_epochs": num_epochs,
    "batches_per_epoch": batches_per_epoch,
    "total_train_steps": total_train_steps,
    "learning_rate": 2e-5,
    "num_warmup_steps": 0,
}

# Regular fine-tuning of all weights: no LoRA parameters are passed.
trained_model = train(
    train_dataset=tf_train_set,
    validation_dataset=tf_validation_set,
    model=model,
    training_parameters=training_parameters,
    lora_parameters=None
)

## DistilBERT with LoRA

In [None]:
# Per-module lookup used when applying and merging LoRA:
#   parent_layer - attribute of a transformer block that owns the dense layer
#   input_dim    - size of the dense layer's input
#   dim          - size of the dense layer's output
DISTILBERT_LINEAR_MODULES_DICT = {
    # The four attention projections share the same parent and sizes.
    **{
        projection: {"parent_layer": "attention", "input_dim": 768, "dim": 768}
        for projection in ("q_lin", "v_lin", "k_lin", "out_lin")
    },
    # Feed-forward layers: 768 -> 3072 -> 768.
    "lin1": {"parent_layer": "ffn", "input_dim": 768, "dim": 3072},
    "lin2": {"parent_layer": "ffn", "input_dim": 3072, "dim": 768},
}

# Keyword arguments forwarded to `apply_lora`.
LORA_PARAMETERS = dict(
    rank=8,
    alpha=8,
    target_modules=["q_lin", "v_lin", "k_lin", "out_lin", "lin1", "lin2"],
    dropout=0.05,
)

In [None]:
batch_size = 16

# Metric object read by `compute_metrics` during validation.
accuracy = load("accuracy")

# Same dataset preparation as in the full fine-tuning section, repeated here.
tf_train_set, tf_validation_set = prepare_datasets(
    dataset_name="imdb",
    tokenizer=tokenizer,
    batch_size=batch_size,
)

In [None]:
num_epochs = 3
# `tf_train_set` was batched by `prepare_datasets`, so its length already is
# the number of batches (optimizer steps) per epoch. Dividing by `batch_size`
# again would shrink the learning-rate schedule to a fraction of the training.
batches_per_epoch = len(tf_train_set)
total_train_steps = batches_per_epoch * num_epochs

print(f"Total num of train steps: {total_train_steps}")
print(f"Num batchs per epoch: {batches_per_epoch}")
print()

training_parameters = {
    "batch_size": batch_size,
    "num_epochs": num_epochs,
    "batches_per_epoch": batches_per_epoch,
    "total_train_steps": total_train_steps,
    "learning_rate": 7e-4,
    # Warm up over the first 10% of the optimizer steps.
    "num_warmup_steps": total_train_steps // 10,
}

# Start from freshly loaded pretrained weights. The notebook-level `model` is
# trained in place by the full fine-tuning section, so reusing it here would
# apply LoRA on top of an already fine-tuned model.
lora_base_model = TFAutoModelForSequenceClassification.from_pretrained(
    model_name, num_labels=2, id2label=id2label, label2id=label2id
)

trained_model = train(
    train_dataset=tf_train_set,
    validation_dataset=tf_validation_set,
    model=lora_base_model,
    training_parameters=training_parameters,
    lora_parameters=LORA_PARAMETERS
)