<a href="https://colab.research.google.com/github/hamzafarooq/multi-agent-course/blob/main/Module_2/Quantization/TextStreamer_Meta_Llama_3_1_8B_Instruct.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Efficient Text Streaming with Hugging Face Transformers and Llama 3.1 8B

This notebook demonstrates how to use the `TextStreamer` and `TextIteratorStreamer` utilities from the Hugging Face `transformers` library for generating text token by token with the `unsloth/Meta-Llama-3.1-8B-Instruct` model. Streaming is crucial for applications requiring real-time output, such as chatbots, as it allows users to see the response as it's being generated, rather than waiting for the entire sequence.

We will explore two main scenarios:
1.  **Text Streaming without Quantization:** Demonstrates basic streaming with the model in its default precision.
2.  **Text Streaming with Quantization (4-bit):** Shows how to stream text while using a memory-efficient quantized version of the model.

Additionally, this notebook includes helper functions to monitor GPU memory usage and provides an example of how to calculate common Large Language Model (LLM) performance metrics such as:
- Time To First Token (TTFT)
- Inter-Token Latency (ITL)
- End-to-end Latency
- Throughput

## Setup and Dependencies

This section imports the necessary libraries for model loading, tokenization, text streaming, and GPU monitoring.

In [None]:
%%capture
# %%capture must be the first line of the cell; it suppresses the cell's output
# Install necessary libraries (accelerate is required for device_map="auto")
!pip install transformers accelerate
!pip install bitsandbytes

In [None]:
# Import core libraries from Hugging Face and PyTorch, plus the stdlib helpers used later for timing and threading
import time
from threading import Thread
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, TextStreamer, TextIteratorStreamer

### GPU Utility Functions

The following functions are defined to help monitor GPU memory usage before and after loading the model. This is useful for understanding the memory footprint of different model configurations (e.g., full precision vs. quantized).
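
The arithmetic behind these helpers can be tried without a GPU. The following is a minimal sketch, assuming raw byte counts as input; `summarize_memory` and the byte values are hypothetical and not part of the notebook's helpers, which read the real numbers from `torch.cuda`.

```python
BYTES_PER_GB = 1024 ** 3  # the notebook's helpers divide by 1024 three times


def summarize_memory(initial_bytes, peak_bytes, total_bytes):
    """Return the figures the GPU helpers print, computed from raw byte counts."""
    initial_gb = round(initial_bytes / BYTES_PER_GB, 3)
    peak_gb = round(peak_bytes / BYTES_PER_GB, 3)
    total_gb = round(total_bytes / BYTES_PER_GB, 3)
    diff_gb = round(peak_gb - initial_gb, 3)
    return {
        "peak_gb": peak_gb,
        "diff_gb": diff_gb,
        # Both percentages are relative to the device's total memory.
        "peak_pct": round(peak_gb / total_gb * 100, 3),
        "diff_pct": round(diff_gb / total_gb * 100, 3),
    }


# Hypothetical byte counts: a 16 GB card, 1 GB reserved before loading, 5 GB at peak.
summary = summarize_memory(1 * BYTES_PER_GB, 5 * BYTES_PER_GB, 16 * BYTES_PER_GB)
print(summary)
```

The "difference" figure is the one to compare between the full-precision and quantized runs, since it excludes memory that was already reserved before the model was loaded.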

In [None]:
# Define helper functions to display GPU memory statistics
# This helps in understanding the memory footprint of the models.

def start_gpu_stat():
    """Records and prints initial GPU memory stats before model loading.

    This function captures the peak GPU memory reserved so far (as tracked by
    the PyTorch caching allocator) and the total GPU memory of device 0,
    printing both values in GB. It's intended to be called before a
    memory-intensive operation like loading a large model to establish a
    baseline.

    Returns:
        tuple: A tuple containing:
            - initial_gpu_memory (float): The peak reserved GPU memory so far, in GB.
            - max_memory (float): The total GPU memory of the device, in GB.
    """
    #@title Show current memory stats
    # torch.cuda.get_device_properties(0) gets properties of the default CUDA device.
    gpu_stats = torch.cuda.get_device_properties(0)
    # torch.cuda.max_memory_reserved() returns the maximum GPU memory managed by the caching allocator in bytes.
    initial_gpu_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3) # Convert to GB
    max_memory = round(gpu_stats.total_memory / 1024 / 1024 / 1024, 3) # Convert to GB
    print(f"Initial Max memory reserved: {initial_gpu_memory} GB / Max memory available: {max_memory} GB")
    return initial_gpu_memory, max_memory


def final_gpu_stat(_initial_gpu_memory, _max_memory):
    """Calculates and prints GPU memory usage after an operation, like model loading.

    This function determines the peak reserved GPU memory after an operation,
    the difference in memory usage from the initial state, and presents these
    as absolute values (GB) and percentages of total available memory.

    Args:
        _initial_gpu_memory (float): The initially reserved GPU memory in GB,
                                     as returned by start_gpu_stat().
        _max_memory (float): The total available GPU memory in GB,
                             as returned by start_gpu_stat().
    """
    #@title Show final memory and time stats
    # Calculates and prints the peak GPU memory usage and the difference from the initial state.
    used_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3)
    used_memory_for_diff = round(used_memory - _initial_gpu_memory, 3)
    used_percentage = round(used_memory         /_max_memory*100, 3)
    diff_percentage = round(used_memory_for_diff/_max_memory*100, 3)

    print(f"Max memory = {_max_memory} GB.")
    print(f"{_initial_gpu_memory} GB of INITIAL memory reserved.")
    print(f"Peak reserved FINAL memory = {used_memory} GB.")
    print(f"Peak reserved memory DIFFERENCE = {used_memory_for_diff} GB.")
    print(f"Peak reserved memory as % of max memory = {used_percentage} %.")
    print(f"Peak reserved memory DIFFERENCE as % of max memory = {diff_percentage} %.")

# 1. Text Streaming Without Quantization

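This section demonstrates text streaming using the `unsloth/Meta-Llama-3.1-8B-Instruct` model loaded without quantization. Because no `torch_dtype` is passed, the precision is whatever `from_pretrained` defaults to in your `transformers` version (typically float32 or the checkpoint's own 16-bit dtype). This preserves the model's original accuracy but consumes far more GPU memory than a quantized model; how the speed compares can be checked with the metrics calculated later in this section.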

In [None]:
# Define the model ID for Hugging Face Hub
model_id = "unsloth/Meta-Llama-3.1-8B-Instruct" # Replace with your model

# Load tokenizer and full precision model
# The tokenizer converts text into a format (tokens) that the model can understand.
tokenizer = AutoTokenizer.from_pretrained(model_id)

# Record GPU memory before loading the model
initial_gpu_memory, max_memory = start_gpu_stat()

# Load the Causal Language Model
# AutoModelForCausalLM is used for models that predict the next token in a sequence (e.g., GPT-like models).
# device_map="auto" automatically distributes the model across available GPUs/CPU based on resources.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto"
)

# Record GPU memory after loading the model
final_gpu_stat(initial_gpu_memory, max_memory)

### 1.1 Using `TextStreamer`

The `TextStreamer` class provides a simple way to print the generated tokens to the console as they are produced. It's useful for immediate visual feedback during interactive sessions.
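
To make the mechanics concrete, here is a minimal sketch of the `put`/`end` hand-off that a streamer relies on, assuming that generation passes the prompt ids to the streamer first, then each new token, and finally signals the end. `ToyPrintStreamer`, `fake_generate`, and `toy_vocab` are hypothetical stand-ins written for illustration; they are not the `transformers` implementation and need no model.

```python
class ToyPrintStreamer:
    """Hypothetical, simplified stand-in for a printing streamer."""

    def __init__(self, vocab, skip_prompt=False):
        self.vocab = vocab              # toy mapping: token id -> text piece
        self.skip_prompt = skip_prompt
        self._next_is_prompt = True     # the first put() call carries the prompt
        self.printed = []               # kept so the behaviour can be inspected

    def put(self, token_ids):
        if self._next_is_prompt:
            self._next_is_prompt = False
            if self.skip_prompt:
                return                  # drop the prompt instead of echoing it
        text = "".join(self.vocab[t] for t in token_ids)
        self.printed.append(text)
        print(text, end="", flush=True)

    def end(self):
        print()                         # finish the line
        self._next_is_prompt = True     # ready for the next generation call


def fake_generate(prompt_ids, new_ids, streamer):
    """Pretend generation loop: prompt first, then one token at a time."""
    streamer.put(prompt_ids)
    for token_id in new_ids:
        streamer.put([token_id])
    streamer.end()


toy_vocab = {0: "Q: hi\n", 1: "Hello", 2: " world", 3: "!"}

echoing = ToyPrintStreamer(toy_vocab, skip_prompt=False)
fake_generate([0], [1, 2, 3], echoing)   # prompt is echoed before the answer

quiet = ToyPrintStreamer(toy_vocab, skip_prompt=True)
fake_generate([0], [1, 2, 3], quiet)     # only the generated pieces are printed
```

The toy version decodes with a dictionary lookup; the real class decodes with the tokenizer, which is why it takes the tokenizer as its first argument.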

In [None]:
# Prepare the prompt using an Alpaca-style format
# This common instruction-following format helps the model understand the task.
alpaca_prompt = """Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
{}

### Response:
"""

# Prepare input text
prompt_text = alpaca_prompt.format("What is the importance of using renewable energy?")  # instruction

# Tokenize the input prompt and move it to the model's device (e.g., GPU)
inputs = tokenizer([prompt_text], return_tensors="pt").to(model.device)  # Move inputs to model's device

# Initialize text streamer
# skip_prompt=False: The input prompt will also be printed by the streamer.
# skip_special_tokens=False: Special tokens (like EOS, BOS) will be printed.
text_streamer = TextStreamer(tokenizer, skip_prompt=False, skip_special_tokens=False)

# Generate response with streamer
# The model.generate() method will call the streamer for each new token.
# max_new_tokens limits the length of the generated response.
_ = model.generate(**inputs, streamer=text_streamer, max_new_tokens=100)


### 1.2 Understanding Performance Metrics: TTFT, ITL, Latency, Throughput

When evaluating streaming performance, several metrics are important:

-   **Time To First Token (TTFT):** The time elapsed from sending the request until the first token of the response is received. A lower TTFT means a quicker initial response.
-   **Inter-Token Latency (ITL):** The average time taken to generate each subsequent token after the first one. Lower ITL means faster streaming of the rest of the response.
-   **End-to-end Latency:** The total time taken from sending the request to receiving the complete response.
    -   *Approximation: (average output length in tokens) × (inter-token latency) + TTFT*
-   **Throughput:** The number of output tokens generated per second. Higher throughput indicates better overall generation speed.
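
The definitions above can be sketched as a small function over timestamps. This is a minimal illustration, assuming arrival times are already recorded in seconds; `streaming_metrics` and the sample timestamps are hypothetical and not measurements from any model.

```python
def streaming_metrics(start_time, token_times, num_output_tokens):
    """Compute TTFT, ITL, end-to-end latency, and throughput from timestamps.

    start_time: when the request was sent (seconds).
    token_times: arrival time of each streamed token, in ascending order.
    num_output_tokens: number of tokens that were generated.
    """
    if not token_times:
        return {"ttft": 0.0, "itl": 0.0, "latency": 0.0, "throughput": 0.0}

    ttft = token_times[0] - start_time
    latency = token_times[-1] - start_time
    # Gaps between consecutive arrivals; their mean is the inter-token latency.
    gaps = [later - earlier for earlier, later in zip(token_times, token_times[1:])]
    itl = sum(gaps) / len(gaps) if gaps else 0.0
    throughput = num_output_tokens / latency if latency > 0 else 0.0
    return {"ttft": ttft, "itl": itl, "latency": latency, "throughput": throughput}


# Hypothetical timestamps: request sent at t=0.0 s, four tokens arriving afterwards.
metrics = streaming_metrics(0.0, [0.5, 0.75, 1.0, 1.25], num_output_tokens=4)
print(metrics)
```

With these sample values, TTFT plus three inter-token gaps adds up to the end-to-end latency, which is the approximation given in the list when the first token is counted under TTFT.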

### 1.3 Using `TextIteratorStreamer` and Calculating Performance Metrics

The `TextIteratorStreamer` allows for more programmatic control over the streamed tokens. It makes the generation process iterable, so you can process each token (or chunk of tokens) as it's generated. This is useful for applications where you need to handle the output in a custom way (e.g., sending it over a network, updating a UI).

This section also demonstrates how to calculate TTFT, ITL, end-to-end latency, and throughput.
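
The producer/consumer pattern behind an iterator-style streamer can be sketched with the standard library alone. The example assumes a queue-based hand-off between a worker thread and the main thread; `ToyIteratorStreamer` and `fake_generate` are hypothetical stand-ins, not the `transformers` classes, and the short sleep only imitates generation time.

```python
import time
from queue import Queue
from threading import Thread


class ToyIteratorStreamer:
    """Hypothetical stand-in for an iterator-style streamer."""

    _STOP = object()  # sentinel that marks the end of the stream

    def __init__(self):
        self._queue = Queue()

    def put(self, text):
        self._queue.put(text)

    def end(self):
        self._queue.put(self._STOP)

    def __iter__(self):
        return self

    def __next__(self):
        item = self._queue.get()  # blocks until the producer supplies a chunk
        if item is self._STOP:
            raise StopIteration
        return item


def fake_generate(streamer, chunks, delay=0.01):
    """Pretend to be a blocking generate call: emit chunks, then signal the end."""
    for chunk in chunks:
        time.sleep(delay)
        streamer.put(chunk)
    streamer.end()


toy_streamer = ToyIteratorStreamer()
worker = Thread(target=fake_generate, args=(toy_streamer, ["Renewable ", "energy ", "matters."]))
worker.start()

# The main thread consumes chunks as soon as the worker produces them.
received = [chunk for chunk in toy_streamer]
worker.join()
print("".join(received))
```

Because the consumer blocks on the queue, the producer has to run in another thread; calling the blocking producer first on the same thread would deliver everything only after it had finished.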

In [None]:
# Prepare the prompt using an Alpaca-style format
# This common instruction-following format helps the model understand the task.
alpaca_prompt = """Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
{}

### Response:
"""

# Prepare input text
prompt_text = alpaca_prompt.format("What is the importance of using renewable energy?")
inputs = tokenizer(prompt_text, return_tensors="pt").to(model.device)

# Initialize variables for time measurements
start_time = time.time()
token_times = []

# Initialize TextIteratorStreamer
# skip_prompt=True: The input prompt will not be part of the streamed output.
# skip_special_tokens=False: Special tokens will be included in the stream.
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False)

# Start generation in a separate thread
# This is crucial because model.generate() is a blocking call that returns only when generation is complete.
# Running it in a thread allows the main thread to iterate over the streamer while tokens are still being produced.
thread = Thread(target=model.generate, kwargs={
    'input_ids': inputs['input_ids'],
    'attention_mask': inputs['attention_mask'],
    'streamer': streamer,
    'max_new_tokens': 100
})
thread.start()

# Initialize a variable to store the model output
model_output = ""
first_token_time = None

# Iterate over the streamer to get generated text token by token (or small chunks)
for i, new_text in enumerate(streamer):
    model_output += new_text
    print(new_text, end='')

    # Measure time for the first token
    if i == 0:
        first_token_time = time.time()
    # Measure time for each token
    token_times.append(time.time())

# Calculate end-to-end latency
end_time = time.time()
end_to_end_latency = end_time - start_time
thread.join()  # The stream is exhausted, so generation is done; join to clean up the worker thread

# Calculate Time To First Token (TTFT)
ttft = first_token_time - start_time if first_token_time else 0

# Calculate Inter-Token Latency (ITL)
# Note: the streamer yields decoded text chunks that can span more than one token, so this is the average gap between chunks.
itl = sum(x - y for x, y in zip(token_times[1:], token_times[:-1])) / (len(token_times) - 1) if len(token_times) > 1 else 0

# Calculate Throughput (tokens per second)
# Note: tokenizer.encode(model_output) re-tokenizes the output string, so the count is approximate.
# add_special_tokens=False keeps the tokenizer from prepending a BOS token that the model never generated.
throughput = len(tokenizer.encode(model_output, add_special_tokens=False)) / end_to_end_latency if model_output else 0

print("\nTime To First Token (TTFT):", ttft)
print("Inter-token latency (ITL):", itl)
print("End-to-end Latency:", end_to_end_latency)
print("Throughput:", throughput)

# 2. Text Streaming With Quantization (4-bit)

Quantization is a technique to reduce the memory footprint and potentially speed up the inference of large language models. Here, we use 4-bit quantization (`BitsAndBytesConfig`) to load the model. This significantly reduces the GPU VRAM required, making it possible to run larger models on consumer hardware. We will repeat the streaming exercises with this quantized model.
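
A back-of-the-envelope estimate shows why 4-bit loading matters. The sketch below counts weight storage only and assumes a rounded parameter count of 8 billion; `weight_memory_gib` is a hypothetical helper. Real usage will be higher, since activations, the KV cache, quantization constants, and any layers kept in higher precision are ignored here.

```python
def weight_memory_gib(num_params, bits_per_param):
    """Approximate memory needed for the weights alone, in GiB."""
    return num_params * bits_per_param / 8 / 1024 ** 3


NUM_PARAMS = 8_000_000_000  # assumption: rounded parameter count for an "8B" model

for label, bits in [("float32", 32), ("float16", 16), ("4-bit", 4)]:
    print(f"{label:>8}: {weight_memory_gib(NUM_PARAMS, bits):.1f} GiB")
```

The figures reported by the GPU utility functions after loading are the ones to trust; this estimate only indicates the order of magnitude to expect.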

**Important:** Shut down and restart the kernel before running the cells below if you have already run the non-quantized model. This ensures that GPU memory is cleared, providing accurate memory usage statistics for the quantized model.

### 2.1 Setup for Quantized Model

Re-import libraries and redefine GPU utility functions if the kernel was restarted. Then, configure and load the model with 4-bit quantization.

In [None]:
# Re-import necessary libraries (if kernel was restarted)
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, TextStreamer
import torch
from transformers import BitsAndBytesConfig

In [None]:
# Define helper functions to display GPU memory statistics
# This helps in understanding the memory footprint of the models.


def start_gpu_stat():
    """Records and prints initial GPU memory stats before model loading.

    This function captures the peak GPU memory reserved so far (as tracked by
    the PyTorch caching allocator) and the total GPU memory of device 0,
    printing both values in GB. It's intended to be called before a
    memory-intensive operation like loading a large model to establish a
    baseline.

    Returns:
        tuple: A tuple containing:
            - initial_gpu_memory (float): The peak reserved GPU memory so far, in GB.
            - max_memory (float): The total GPU memory of the device, in GB.
    """
    #@title Show current memory stats
    # torch.cuda.get_device_properties(0) gets properties of the default CUDA device.
    gpu_stats = torch.cuda.get_device_properties(0)
    # torch.cuda.max_memory_reserved() returns the maximum GPU memory managed by the caching allocator in bytes.
    initial_gpu_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3) # Convert to GB
    max_memory = round(gpu_stats.total_memory / 1024 / 1024 / 1024, 3) # Convert to GB
    print(f"Initial Max memory reserved: {initial_gpu_memory} GB / Max memory available: {max_memory} GB")
    return initial_gpu_memory, max_memory


def final_gpu_stat(_initial_gpu_memory, _max_memory):
    """Calculates and prints GPU memory usage after an operation, like model loading.

    This function determines the peak reserved GPU memory after an operation,
    the difference in memory usage from the initial state, and presents these
    as absolute values (GB) and percentages of total available memory.

    Args:
        _initial_gpu_memory (float): The initially reserved GPU memory in GB,
                                     as returned by start_gpu_stat().
        _max_memory (float): The total available GPU memory in GB,
                             as returned by start_gpu_stat().
    """
    #@title Show final memory and time stats
    # Calculates and prints the peak GPU memory usage and the difference from the initial state.
    used_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3)
    used_memory_for_diff = round(used_memory - _initial_gpu_memory, 3)
    used_percentage = round(used_memory         /_max_memory*100, 3)
    diff_percentage = round(used_memory_for_diff/_max_memory*100, 3)

    print(f"Max memory = {_max_memory} GB.")
    print(f"{_initial_gpu_memory} GB of INITIAL memory reserved.")
    print(f"Peak reserved FINAL memory = {used_memory} GB.")
    print(f"Peak reserved memory DIFFERENCE = {used_memory_for_diff} GB.")
    print(f"Peak reserved memory as % of max memory = {used_percentage} %.")
    print(f"Peak reserved memory DIFFERENCE as % of max memory = {diff_percentage} %.")

In [None]:
# Define the model ID
model_id = "unsloth/Meta-Llama-3.1-8B-Instruct" # Replace with your model

# Configure 4-bit quantization using BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4"
)

# Load tokenizer and model in 4-bit
tokenizer = AutoTokenizer.from_pretrained(model_id)

# Record GPU memory before loading the quantized model
initial_gpu_memory, max_memory = start_gpu_stat()

# Load the model with quantization configuration
# device_map="auto" will handle placing the quantized model layers.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=quantization_config,
    device_map="auto"
)

# Record GPU memory after loading the quantized model
final_gpu_stat(initial_gpu_memory, max_memory)

### 2.2 Using `TextStreamer` with Quantized Model

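Now, we use `TextStreamer` with the 4-bit quantized model. The process is the same as for the non-quantized version: the quantized model is still bound to the variable `model`. The only difference is `skip_prompt=True`, so the prompt is not echoed in the output.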

In [None]:
# Define Alpaca-style prompt format
alpaca_prompt = """Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
{}

### Response:
"""

# Prepare input text
prompt_text = alpaca_prompt.format("What is the importance of using renewable energy?")  # instruction

# Tokenize inputs
inputs = tokenizer([prompt_text], return_tensors="pt").to(model.device)  # Move inputs to model's device

# Initialize text streamer
text_streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False)

# Generate response with streamer
_ = model.generate(**inputs, streamer=text_streamer, max_new_tokens=100)


### 2.3 Using `TextIteratorStreamer` and Metrics with Quantized Model

Finally, we repeat the performance metric calculation using `TextIteratorStreamer` with the 4-bit quantized model. This will help compare its TTFT, ITL, latency, and throughput against the non-quantized version.

In [None]:
from transformers import TextIteratorStreamer
from threading import Thread
import time

# Define Alpaca-style prompt format
alpaca_prompt = """Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
{}

### Response:
"""

# Prepare input text
prompt_text = alpaca_prompt.format("What is the importance of using renewable energy?")
inputs = tokenizer(prompt_text, return_tensors="pt").to(model.device)

# Initialize variables for time measurements
start_time = time.time()
token_times = []

# Initialize TextIteratorStreamer
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False)

# Start generation in a separate thread
thread = Thread(target=model.generate, kwargs={
    'input_ids': inputs['input_ids'],
    'attention_mask': inputs['attention_mask'],
    'streamer': streamer,
    'max_new_tokens': 100
})
thread.start()

# Initialize a variable to store the model output
model_output = ""
first_token_time = None

# Iterate over the streamer to get the generated text in chunks
for i, new_text in enumerate(streamer):
    model_output += new_text
    print(new_text, end='')

    # Measure time for the first token
    if i == 0:
        first_token_time = time.time()
    # Measure time for each token
    token_times.append(time.time())

# Calculate end-to-end latency
end_time = time.time()
end_to_end_latency = end_time - start_time
thread.join()  # The stream is exhausted, so generation is done; join to clean up the worker thread

# Calculate Time To First Token (TTFT)
ttft = first_token_time - start_time if first_token_time else 0

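# Calculate Inter-Token Latency (ITL)
# Note: the streamer yields decoded text chunks that can span more than one token, so this is the average gap between chunks.
itl = sum(x - y for x, y in zip(token_times[1:], token_times[:-1])) / (len(token_times) - 1) if len(token_times) > 1 else 0

# Calculate throughput (tokens per second)
# add_special_tokens=False keeps the tokenizer from prepending a BOS token that the model never generated.
throughput = len(tokenizer.encode(model_output, add_special_tokens=False)) / end_to_end_latency if model_output else 0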

print("\nTime To First Token (TTFT):", ttft)
print("Inter-token latency (ITL):", itl)
print("End-to-end Latency:", end_to_end_latency)
print("Throughput:", throughput)

# 3. Conclusion

This notebook demonstrated text streaming with `TextStreamer` and `TextIteratorStreamer` using both a full-precision and a 4-bit quantized version of the `unsloth/Meta-Llama-3.1-8B-Instruct` model.

Key takeaways:
- **TextStreamer** is straightforward for displaying streamed output directly.
- **TextIteratorStreamer** offers more control for programmatic handling of streamed tokens and is suitable for calculating performance metrics.
- **Quantization** significantly reduces GPU memory usage, which can be observed using the provided utility functions. This often comes with a trade-off in inference speed (TTFT, ITL, throughput) and potentially a minor impact on output quality, which can be compared by examining the metrics from both sections.

By comparing the performance metrics and memory usage, users can make informed decisions about whether to use quantization for their specific application based on available hardware and performance requirements.