# Exercise: Understanding Pipeline Parallelism and Micro-Batch Sizing

The purpose of this exercise is to demonstrate how to correctly configure a model for pipeline parallelism and to explore the performance impact of the `micro_batch_size` parameter.

We will:
1.  **Define a model** and a script to run it, highlighting a common pitfall (`nn.ModuleList` vs. `nn.Sequential`) that prevents automatic partitioning.
2.  **Create multiple DeepSpeed configs** to test different `micro_batch_size` values.
3.  **Run experiments** using the `deepspeed` launcher.
4.  **Analyze the results** to understand the trade-offs of micro-batch sizing.

### Step 1: Install Dependencies (if needed)

In [None]:
# Uncomment and run this cell if you are running in a SageMaker notebook
# !pip install deepspeed transformers

### Step 2: Define the Model and Training Script

Next, we define our model and the main training script. 

#### Model Architecture

The `RealisticModel` class defines a simple transformer-like model. A key aspect for DeepSpeed's pipeline parallelism is how the model layers are structured. 

DeepSpeed's automatic partitioning works best with `nn.Sequential` containers, as it can easily divide the layers into stages. 

In this exercise, the student's task is to replace `nn.ModuleList` with `nn.Sequential` to enable efficient pipeline parallelism.

`nn.ModuleList` is a list-like container for `nn.Module` instances, but it does not define a `forward` method, so calling the container itself does not run its layers; you have to iterate over it and call each layer yourself. In contrast, `nn.Sequential` wraps an ordered sequence of layers and provides a `forward` method that applies each layer in turn, which is what DeepSpeed's pipeline parallelism leverages for automatic partitioning.
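
To make "dividing the layers into stages" concrete, here is a minimal plain-Python sketch of a uniform split over an ordered list of layers. `partition_uniform` is a hypothetical helper written for illustration; it is not DeepSpeed's implementation, and it only assumes that layers are assigned to stages in contiguous, near-equal chunks.

```python
def partition_uniform(num_layers: int, num_stages: int) -> list[range]:
    """Split layer indices 0..num_layers-1 into contiguous, near-equal stages."""
    if num_stages < 1 or num_layers < num_stages:
        raise ValueError("need at least one layer per stage")
    base, extra = divmod(num_layers, num_stages)
    stages = []
    start = 0
    for stage in range(num_stages):
        # The first `extra` stages each take one additional layer.
        size = base + (1 if stage < extra else 0)
        stages.append(range(start, start + size))
        start += size
    return stages


# 30 transformer blocks over 2 pipeline stages, as in this exercise.
for stage_id, layers in enumerate(partition_uniform(30, 2)):
    print(f"stage {stage_id}: layers {layers.start}..{layers.stop - 1}")
```

A split like this is only well defined when the layers have a single, known execution order, which is the property an ordered sequential container guarantees.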

#### Throughput Measurement

The `measure_throughput` function is a helper to benchmark the performance of our model. It runs several iterations of a forward pass with dummy data and calculates the number of samples processed per second.
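
The arithmetic behind the benchmark is simply samples processed divided by elapsed time. The sketch below shows that calculation without any GPU dependency; `samples_per_second` and its `clock` parameter are hypothetical names used for illustration, not part of the exercise script.

```python
import time
from typing import Callable


def samples_per_second(
    step: Callable[[], None],
    batch_size: int,
    iterations: int,
    warmup: int = 5,
    clock: Callable[[], float] = time.perf_counter,
) -> float:
    """Run `step` repeatedly and return batch_size * iterations / elapsed seconds."""
    # Warm-up iterations are executed but excluded from the timed window.
    for _ in range(warmup):
        step()
    start = clock()
    for _ in range(iterations):
        step()
    elapsed = clock() - start
    return batch_size * iterations / elapsed


# Stand-in workload; in the exercise this would be one forward pass on a batch of 64.
rate = samples_per_second(lambda: sum(range(10_000)), batch_size=64, iterations=20)
print(f"{rate:.2f} samples/sec")
```

On a GPU the clock must only be read after pending kernels have finished, which is why the exercise script calls `torch.cuda.synchronize()` before taking each timestamp.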

#### Main Execution Logic

The `main` function handles the overall execution. It first measures the baseline performance on a single GPU. 

Then, it initializes DeepSpeed, which automatically partitions the model across the available GPUs based on the provided configuration. 

Finally, it measures the throughput with pipeline parallelism enabled.

In [None]:
%%writefile exercise_solution.py

import torch
import torch.nn as nn
import deepspeed
import argparse
import time

# --- Model Definition ---
class MockTransformerBlock(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        self.layer1 = nn.Linear(hidden_size, hidden_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(hidden_size, hidden_size)
    def forward(self, x):
        return self.layer2(self.relu(self.layer1(x)))

class RealisticModel(nn.Module):
    def __init__(self, hidden_size=2048, num_layers=30, vocab_size=1000):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_size)

        # By switching to `nn.Sequential`, we create a single, sequential block that DeepSpeed's
        # "uniform" partitioner can easily divide across multiple GPUs.
        transformer_layer_list = [MockTransformerBlock(hidden_size) for _ in range(num_layers)]
        self.transformer_blocks = nn.Sequential(*transformer_layer_list)

        self.output_head = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        x = self.embedding(x)
        # If using nn.Sequential, you call it directly
        x = self.transformer_blocks(x)
        x = self.output_head(x)
        return x

# --- Helper Function for Performance Measurement ---
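def measure_throughput(model, dummy_input, iterations):
    # Inference-only benchmark: disable autograd for the warm-up and the timed runs alike,
    # so the warm-up does not build (and hold memory for) autograd graphs.
    with torch.no_grad():
        # Warm-up: let CUDA finish one-time initialization before timing starts.
        for _ in range(5):
            _ = model(dummy_input)
        torch.cuda.synchronize()

        # perf_counter is monotonic and better suited to interval timing than time.time().
        start_time = time.perf_counter()
        for _ in range(iterations):
            _ = model(dummy_input)
        # CUDA kernels run asynchronously; wait for them before stopping the clock.
        torch.cuda.synchronize()
        end_time = time.perf_counter()

    total_samples = dummy_input.size(0) * iterations
    duration = end_time - start_time
    throughput = total_samples / duration
    return throughput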

# --- Main Execution Logic ---
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=-1)
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()

    # --- Setup ---
    global_batch_size = 64
    hidden_size = 2048
    vocab_size = 1000
    iterations = 20
    is_rank_0 = args.local_rank <= 0 # Rank -1 for non-distributed, 0 for distributed

    # --- Baseline Measurement (Single GPU) ---
    if is_rank_0:
        print("\n--- Measuring Baseline Performance (Single GPU) ---", flush=True)
        try:
            baseline_model = RealisticModel().to('cuda:0')
            dummy_input = torch.randint(0, vocab_size, (global_batch_size, 128), device='cuda:0')
            baseline_throughput = measure_throughput(baseline_model, dummy_input, iterations)
            print(f"Baseline Throughput: {baseline_throughput:.2f} samples/sec", flush=True)
        except Exception as e:
            print(f"Could not run baseline on single GPU, likely out of memory: {e}", flush=True)
        print("--------------------------------------------------\n", flush=True)

    if torch.distributed.is_initialized():
        torch.distributed.barrier()

    # --- DeepSpeed Pipeline Parallelism ---
    print(f"\n--- Rank {args.local_rank}: Setting up DeepSpeed Pipeline ---", flush=True)

    ds_model = RealisticModel()
    model_engine, _, _, _ = deepspeed.initialize(args=args, model=ds_model, model_parameters=ds_model.parameters())

    # Input needs to be integer indices for the embedding layer
    dummy_input_ds = torch.randint(0, vocab_size, (global_batch_size, 128), device=model_engine.device)

    pipelined_throughput = measure_throughput(model_engine, dummy_input_ds, iterations)

    if model_engine.is_last_stage():
        print(f"\n--- Results on Last Stage (Rank {args.local_rank}) ---", flush=True)
        print(f"Pipelined Throughput: {pipelined_throughput:.2f} samples/sec", flush=True)
        print("------------------------------------------\n", flush=True)

if __name__ == "__main__":
    main()

Overwriting exercise_solution.py


### Step 3: Create DeepSpeed Configuration Files

We now create several JSON configuration files to experiment with different `micro_batch_size` values. 

The `micro_batch_size` is a crucial parameter in pipeline parallelism, as it determines the size of the smaller data chunks that are fed through the pipeline.

The configurations we will test are:
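*   **`ds_config_mbs_4.json`**: A very small micro-batch size. It yields the most micro-batches per step and therefore the smallest pipeline bubble, but each micro-batch does little work, so fixed per-micro-batch overhead (kernel launches, inter-stage communication) is highest.
*   **`ds_config_mbs_8.json`**: A medium micro-batch size.
*   **`ds_config_mbs_16.json`**: A potentially optimal micro-batch size.
*   **`ds_config_mbs_32.json`**: A large micro-batch size, equal to the batch each pipeline replica receives. There is only one micro-batch per step, so the stages cannot overlap and parallelism is reduced.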
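
The effect of each setting on pipeline idle time can be estimated with the usual idealized formula for a GPipe-style schedule, bubble = (p - 1) / (m + p - 1) for p stages and m micro-batches. The sketch below applies it to the four configurations. It assumes that the 4 GPUs form 2 data-parallel replicas of a 2-stage pipeline and that `micro_batch_size` splits each replica's share of the global batch; `bubble_fraction` is a hypothetical helper, and the model ignores communication and kernel-launch overhead.

```python
def bubble_fraction(num_stages: int, num_micro_batches: int) -> float:
    """Idle fraction of an idealized GPipe-style schedule: (p - 1) / (m + p - 1)."""
    return (num_stages - 1) / (num_micro_batches + num_stages - 1)


train_batch_size = 64
num_gpus = 4
num_stages = 2
replicas = num_gpus // num_stages                 # data-parallel pipeline replicas
per_replica_batch = train_batch_size // replicas  # samples each replica handles per step

for mbs in (4, 8, 16, 32):
    m = per_replica_batch // mbs  # micro-batches flowing through one pipeline per step
    print(f"mbs={mbs:>2}: {m} micro-batches, ideal bubble = {bubble_fraction(num_stages, m):.1%}")
```

In this model the bubble shrinks as the micro-batch size decreases, so any throughput loss at very small sizes has to come from costs the formula leaves out, such as per-micro-batch overhead and less efficient small kernels.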

In [None]:
import json

# We will create several config files to test the effect of `micro_batch_size`.
# The global batch size is 64. With 4 GPUs and 2 pipeline stages there are 2 data-parallel
# pipeline replicas, so each replica processes 64 / 2 = 32 samples per step.
configs = {
    # Case 1: Very small micro-batch size. Smallest bubble, but highest per-micro-batch overhead.
    "ds_config_mbs_4.json": { "micro_batch_size": 4, "stages": 2 },

    # Case 2: A medium micro-batch size.
    "ds_config_mbs_8.json": { "micro_batch_size": 8, "stages": 2 },

    # Case 3: Rule of thumb used in this exercise: global_batch_size / (2 * num_stages) = 64 / 4 = 16
    "ds_config_mbs_16.json": { "micro_batch_size": 16, "stages": 2 },

    # Case 4: Large micro-batch size. Less pipelining, closer to sequential execution.
    "ds_config_mbs_32.json": { "micro_batch_size": 32, "stages": 2 }
}

# Base config template
base_config = {
    "train_batch_size": 64, # This is the *global* batch size across all GPUs
    "optimizer": { "type": "SGD", "params": { "lr": 0.001 } },
    "fp16": { "enabled": True }
}

for filename, pipeline_config in configs.items():
    full_config = base_config.copy()
    full_config["pipeline"] = pipeline_config
    with open(filename, 'w') as f:
        json.dump(full_config, f, indent=2)
    print(f"Created config file: {filename}")

Created config file: ds_config_mbs_4.json
Created config file: ds_config_mbs_8.json
Created config file: ds_config_mbs_16.json
Created config file: ds_config_mbs_32.json


### Step 4: Run the Experiments

Now we run our training script with each of the configurations created in the previous step. 

The `deepspeed` launcher will execute the script on 4 GPUs, and we will collect the performance data for each `micro_batch_size`.
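
Because the launcher interleaves output from all ranks, it can help to pull the throughput figure out of the captured text programmatically. The following is a minimal sketch that matches the `Pipelined Throughput: ... samples/sec` line printed by `exercise_solution.py`; `extract_pipelined_throughput` is a hypothetical helper, and the sample text is copied from a run of this exercise.

```python
import re
from typing import Optional

# Matches the line printed by the last pipeline stage in exercise_solution.py.
THROUGHPUT_RE = re.compile(r"Pipelined Throughput:\s*([0-9]+(?:\.[0-9]+)?) samples/sec")


def extract_pipelined_throughput(log_text: str) -> Optional[float]:
    """Return the first pipelined throughput found in the log, or None if absent."""
    match = THROUGHPUT_RE.search(log_text)
    return float(match.group(1)) if match else None


sample_log = """--- Results on Last Stage (Rank 3) ---
Pipelined Throughput: 453.78 samples/sec
------------------------------------------"""

print(extract_pipelined_throughput(sample_log))
```

Returning `None` when no line matches makes a failed or out-of-memory run easy to tell apart from a valid measurement.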

In [None]:
# We will now run the exercise script for each configuration to collect performance data.
for config_file in configs.keys():
    print(f"\n\n{'='*60}")
    print(f"RUNNING EXPERIMENT WITH CONFIG: {config_file}")
    print(f"{'='*60}\n")
    !deepspeed --num_gpus 4 exercise_solution.py --deepspeed_config {config_file}
    print("\n\n")



RUNNING EXPERIMENT WITH CONFIG: ds_config_mbs_4.json

--- Measuring Baseline Performance (Single GPU) ---

--- Rank 3: Setting up DeepSpeed Pipeline ---

--- Rank 1: Setting up DeepSpeed Pipeline ---

--- Rank 2: Setting up DeepSpeed Pipeline ---
Baseline Throughput: 79.40 samples/sec
--------------------------------------------------


--- Rank 0: Setting up DeepSpeed Pipeline ---
--- Results on Last Stage (Rank 3) ---
Pipelined Throughput: 453.78 samples/sec
------------------------------------------






RUNNING EXPERIMENT WITH CONFIG: ds_config_mbs_8.json


--- Rank 3: Setting up DeepSpeed Pipeline ---

--- Rank 1: Setting up DeepSpeed Pipeline ---

--- Measuring Baseline Performance (Single GPU) ---

--- Rank 2: Setting up DeepSpeed Pipeline ---
Baseline Throughput: 78.89 samples/sec
--------------------------------------------------


--- Rank 0: Setting up DeepSpeed Pipeline ---
--- Results on Last Stage (Rank 3) ---
Pipelined Throughput: 460.76 samples/sec
------------------

### Step 5: Analyze the Results

The table below is filled in using the output from the previous cell. Your numbers may vary slightly but should show a similar trend.

| Micro Batch Size (mbs) | Pipelined Throughput (samples/sec) | Explanation of Performance                                                                                                                                                                 |
| :--------------------- | :--------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 4                      | 453.78                             | **Smallest bubble, highest overhead**: Each replica's 32 samples are split into 8 micro-batches, so the stages overlap the most. However, each micro-batch does little work, and the fixed cost per micro-batch (kernel launches, inter-stage communication) is paid most often, which likely explains the lowest throughput of the four. |
| 8                      | 460.76                             | **Overhead decreasing**: With 4 micro-batches per replica, each one does more work per launch and transfer. The bubble is somewhat larger than with mbs 4, but the lower overhead more than compensates. |
| 16                     | 464.51                             | **Best measured balance**: With 2 micro-batches per replica, the stages can still overlap while each micro-batch is large enough to use the GPU efficiently. |
| 32                     | 461.25                             | **Largest bubble**: With `mbs` equal to the per-replica batch (`train_batch_size` / data-parallel degree = 64 / 2), each replica processes a single micro-batch, so only one stage is busy at a time and execution resembles sequential processing. This can reduce throughput. |
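
To put the differences in the table in perspective, the short sketch below expresses each measured throughput relative to the best one. The numbers are the ones reported in the table; the variable names are chosen for illustration only.

```python
# Pipelined throughput (samples/sec) per micro-batch size, taken from the table above.
measured = {4: 453.78, 8: 460.76, 16: 464.51, 32: 461.25}

best_mbs = max(measured, key=measured.get)
best = measured[best_mbs]

for mbs, throughput in measured.items():
    gap = (best - throughput) / best  # fraction below the best configuration
    print(f"mbs={mbs:>2}: {throughput:.2f} samples/sec ({gap:.2%} below best)")
```

The largest gap is roughly 2%, so on this small model the trend is visible but modest, and repeated runs would be needed before reading much into the ordering of the middle settings.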