In [None]:
%%capture
%pip install accelerate peft bitsandbytes transformers trl

In [None]:
import csv
from typing import List, Dict
import os
import torch
from datasets import load_dataset, Dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    pipeline,
    logging,
)
from peft import LoraConfig
from trl import SFTTrainer

In [None]:
# Hugging Face Hub identifier of the pretrained checkpoint that is loaded and fine-tuned below.
base_model = "meta-llama/Llama-2-7b-chat-hf"
# Name given to the fine-tuned result; the same string is the directory the notebook saves to and reloads from.
new_model = "RickSanchez-7b-q"

In [None]:
class Message:
    """
    A single utterance in a conversation.

    Attributes:
    role (str): Who is speaking (e.g., 'user', 'assistant').
    content (str): The text that was said.
    """
    def __init__(self, role: str, content: str):
        """
        Store the speaker role and the text of one message.

        Parameters:
        role (str): Who is speaking.
        content (str): The text that was said.
        """
        self.role, self.content = role, content

    def to_dict(self) -> Dict:
        """
        Return the message as a plain dictionary.

        Returns:
        Dict: A mapping with the keys 'role' and 'content', in that order.
        """
        return dict(role=self.role, content=self.content)


def format_dialogue_for_chat(line1: str, line2: str) -> List[Message]:
    """
    Turn two consecutive script lines into a two-message chat exchange.

    The first line is tagged with the 'user' role and the second with the
    'assistant' role (the reply the model is meant to learn, i.e. Rick's line).

    Parameters:
    line1 (str): Text of the prompting line.
    line2 (str): Text of the reply.

    Returns:
    List[Message]: Exactly two Message objects, the 'user' one first.
    """
    # Roles are assigned purely by position: first line -> user, second -> assistant.
    speakers = ("user", "assistant")
    return [Message(role, text) for role, text in zip(speakers, (line1, line2))]


In [None]:
def process_csv(csv_file_path: str) -> List[List[Message]]:
    """
    Extract (previous line, Rick's line) pairs from a script CSV.

    The file is read row by row after skipping one header row. The speaker's
    name is expected in the fifth column and the spoken text in the sixth.
    Every row spoken by "Rick" is paired with the row immediately before it
    in the file. Rows that are blank or have fewer than six columns are
    ignored entirely.

    Parameters:
    csv_file_path (str): The file path to the CSV file containing the script data.

    Returns:
    List[List[Message]]: A list of dialogues, each a list of two Message objects.
                         The first Message is the line that directly precedes Rick's
                         line in the file (whoever spoke it, which can be Rick himself
                         when he has consecutive lines), and the second is Rick's line.
                         Empty when the file has no usable rows.
    """
    # Zero-based positions of the columns this function reads.
    speaker_col, text_col = 4, 5

    dialogs = []

    # newline='' is what the csv module requires so that it, not the file
    # object, interprets line endings (matters for newlines inside quoted fields).
    with open(csv_file_path, mode='r', encoding='utf-8', newline='') as file:
        reader = csv.reader(file)

        # Skip the header row; the default keeps an empty file from raising StopIteration.
        next(reader, None)

        # The most recent usable row, i.e. the candidate "user" line.
        prev_line = None

        for row in reader:
            # csv.reader yields [] for blank lines; such rows (and truncated
            # ones) have no speaker/text columns to read, so skip them.
            if len(row) <= text_col:
                continue

            # A Rick line with something before it forms one training pair.
            if prev_line is not None and row[speaker_col] == "Rick":
                dialogs.append(format_dialogue_for_chat(prev_line[text_col], row[text_col]))

            prev_line = row

    return dialogs

In [None]:
# Location of the Rick and Morty script CSV (relative to the working directory).
csv_file_path = 'RickAndMortyScripts.csv'

# Parse the script into dialogues: a list in which every element is a
# two-item list of Message objects (preceding line, Rick's reply).
dialogs = process_csv(csv_file_path=csv_file_path)

In [None]:
def convert_to_hf_chat_format(dialogs):
    """
    Flatten each dialogue into a single training string.

    Every message is rendered with fixed textual markers around its
    whitespace-stripped content:

    - 'user' messages become      ``INST <<USER>> BOS <content> EOS``
    - 'assistant' messages become ``<<SYS>> BOS <content> EOS``

    Messages with any other role are left out. The rendered pieces of one
    dialogue are concatenated in order (separated by a single space) and the
    result is stripped of surrounding whitespace.

    NOTE(review): the markers are written as plain words ('BOS', 'EOS', 'INST', ...),
    not as the tokenizer's own special tokens - confirm this is the intended format.

    Parameters:
    dialogs (List[List[Message]]): Dialogues, each a list of objects exposing
                                   ``role`` and ``content`` attributes.

    Returns:
    List[str]: One formatted string per input dialogue, in the same order.
    """

    def _render(message):
        # Text contributed by a single message; '' for roles that are not handled.
        speaker = message.role
        if speaker == 'user':
            return f'INST <<USER>> BOS {message.content.strip()} EOS '
        if speaker == 'assistant':
            return f'<<SYS>> BOS {message.content.strip()} EOS '
        return ''

    # Each rendered piece ends with a space; the final strip() drops the trailing one.
    return [
        ''.join(_render(message) for message in dialog).strip()
        for dialog in dialogs
    ]


In [None]:
# Render every dialogue as one training string.
hf_formatted_dialogs = convert_to_hf_chat_format(dialogs)

# Wrap the strings in a Hugging Face Dataset with a single text column named "dialog".
dataset = Dataset.from_dict(dict(dialog=hf_formatted_dialogs))

In [None]:
# Half-precision dtype used for computation on top of the 4-bit weights.
# float16 matches the fp16=True mixed-precision setting used for training below.
compute_dtype = torch.float16

# Quantization settings handed to bitsandbytes when the base model is loaded.
quant_config = BitsAndBytesConfig(
    load_in_4bit=True,  # Store the model weights in 4-bit precision to cut memory use.
    bnb_4bit_quant_type="nf4",  # Use the 4-bit NormalFloat ('nf4') quantization data type.
    bnb_4bit_use_double_quant=True,  # Also quantize the quantization constants for extra memory savings.
    bnb_4bit_compute_dtype=compute_dtype,  # Run computation in the dtype chosen above instead of a second, hard-coded one.
)

In [None]:
# Load the pretrained causal language model with 4-bit quantization applied.
model = AutoModelForCausalLM.from_pretrained(
    base_model,  # Name or path of the pretrained checkpoint.
    device_map={"": 0},  # Place the whole model (the "" root key) on device index 0.
    quantization_config=quant_config,  # BitsAndBytes settings defined above.
)

# Adjust the loaded model's configuration for training.
# pretraining_tp = 1: no tensor-parallel splitting is assumed.
model.config.pretraining_tp = 1
# use_cache = False: do not keep past key/value states; they are only useful for generation.
model.config.use_cache = False

In [None]:
# Load the tokenizer that belongs to the base model.
# NOTE: trust_remote_code=True allows code shipped with the checkpoint to be executed.
tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)

# Pad on the right-hand side of each sequence.
tokenizer.padding_side = "right"

# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

In [None]:
# LoRA (Low-Rank Adaptation) settings for parameter-efficient fine-tuning.
peft_params = LoraConfig(
    task_type="CAUSAL_LM",  # The adapters are attached for causal language modeling.
    r=8,  # Rank of the low-rank update matrices; higher means more trainable parameters.
    lora_alpha=32,  # Scaling factor applied to the low-rank update.
    lora_dropout=0.05,  # Dropout probability inside the LoRA layers.
    bias="none",  # Do not train any bias parameters.
)

In [None]:
# Hyperparameters and bookkeeping options for the training run.
training_params = TrainingArguments(
    # Where checkpoints and other outputs are written.
    output_dir="./results",
    # Run length and checkpoint/log cadence (all counted in optimizer steps).
    max_steps=150,
    save_steps=50,
    logging_steps=1,
    # Batching: 4 samples per device, gradients accumulated over 2 batches per update.
    per_device_train_batch_size=4,
    gradient_accumulation_steps=2,
    # Optimization: paged 8-bit AdamW with a short learning-rate warmup.
    optim="paged_adamw_8bit",
    learning_rate=2e-4,
    warmup_steps=2,
    # Train with 16-bit (mixed) precision.
    fp16=True,
    # Fixed random seed for reproducibility.
    seed=42,
)

In [None]:
# Build the supervised fine-tuning trainer.
trainer = SFTTrainer(
    model=model,  # Quantized base model that receives the LoRA adapters.
    args=training_params,  # Training hyperparameters defined above.
    train_dataset=dataset,  # Dataset of formatted dialogue strings.
    tokenizer=tokenizer,  # Tokenizer used to turn the text into model inputs.
    peft_config=peft_params,  # LoRA configuration defined above.
    dataset_text_field="dialog",  # Dataset column that holds the training text.
    max_seq_length=None,  # No explicit limit given; the trainer applies its own default.
    packing=False,  # Keep one example per sequence instead of packing several together.
)

In [None]:
# Run the fine-tuning loop with the trainer configured above
# (it stops after the max_steps set in the training arguments).
trainer.train()

In [None]:
# Save the fine-tuned model into the directory named by `new_model`
# (the single source of truth for that name, instead of repeating the literal).
trainer.model.save_pretrained(new_model)

# Save the tokenizer into the same directory. The `tokenizer` object is the one
# that was handed to the trainer; saving it directly avoids the trainer's
# deprecated `.tokenizer` accessor.
tokenizer.save_pretrained(new_model)

In [None]:
# Reload the fine-tuned model from the directory it was saved to (`new_model`).
model = AutoModelForCausalLM.from_pretrained(new_model)

# Reload the tokenizer that was saved alongside it.
tokenizer = AutoTokenizer.from_pretrained(new_model)

In [None]:
# Build the text prompt that is fed to the fine-tuned model.

# Persona instructions: tell the model to answer as Rick Sanchez.
system_prompt = (
    "You are Rick Sanchez from Rick and Morty. "
    "You are an eccentric and cynical genius. "
    "Respond in complete sentences, and only as Rick."
)

# The line Rick is expected to answer; here Morty asks to go on an adventure.
user_input = "Can we go on an adventure Rick?"

# Final prompt, one part per line: persona, Morty's line, then an open "Rick:"
# cue for the model to continue.
full_prompt = "\n".join((system_prompt, f"Morty: {user_input}", "Rick:"))

In [None]:
def generate_response(full_prompt, max_length=100, num_beams=3):
    """
    Generate a reply to a prompt with beam search and return only the reply text.

    The prompt is tokenized and passed to the module-level ``model``; the tokens
    that belong to the prompt are then cut off the generated sequence, so the
    returned text never contains the prompt itself. The reply is shortened to
    its last complete sentence when it contains at least one full stop, and any
    literal 'EOS' marker text is removed.

    Args:
        full_prompt (str): The prompt to which the model will generate a response.
        max_length (int): The maximum length of the model's response, counted in
            newly generated tokens (the prompt does not count against it).
        num_beams (int): The number of beams explored by beam search.

    Returns:
        str: The cleaned-up reply. May be empty if the model generates nothing usable.
    """
    # Run on the GPU when one is available, otherwise on the CPU.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)

    # Tokenize the prompt; calling the tokenizer (rather than .encode) also
    # returns the attention mask, which generate() should be given explicitly
    # because the pad token is the same as the EOS token.
    inputs = tokenizer(full_prompt, return_tensors='pt').to(device)
    prompt_token_count = inputs["input_ids"].shape[-1]

    output = model.generate(
        **inputs,
        max_new_tokens=max_length,  # Budget for the reply only, independent of prompt length
        num_beams=num_beams,  # Number of beams for beam search
        no_repeat_ngram_size=2,  # Never generate the same 2-gram twice
        early_stopping=True,  # Stop as soon as num_beams finished candidates exist
        pad_token_id=tokenizer.eos_token_id,
    )

    # The output sequence starts with the prompt tokens; decode only what was
    # generated after them. This is more reliable than stripping the prompt
    # from the decoded text, which need not round-trip character for character.
    response_text = tokenizer.decode(
        output[0][prompt_token_count:], skip_special_tokens=True
    )

    # Cut at the last full stop, but only if there is one - otherwise keep the
    # whole reply rather than discarding it.
    complete_part, full_stop, _ = response_text.rpartition('.')
    if full_stop:
        response_text = complete_part + full_stop

    # Remove the literal 'EOS' marker used in the training data and tidy whitespace.
    return response_text.replace("EOS", "").strip()

# Example usage: generate a reply to the prompt built above and print it.
print(generate_response(full_prompt))