# Qwen 2.5 VL Finetuning

Fine-tuning Qwen 2.5 VL 7B in bf16 on AWS SageMaker using p4d and p4de instances

**1-Library Installation and environment setting**

In [None]:
# Log in to the Hugging Face Hub. Replace YOUR_HUGGINGFACE_TOKEN with your own
# token and avoid committing it; `credential.helper store` keeps credentials
# unencrypted on disk.
!git config --global credential.helper store
!pip install huggingface_hub
!huggingface-cli login --token YOUR_HUGGINGFACE_TOKEN

In [None]:
# Install the AWS CLI if not already installed (used by the `aws` commands below).
!pip install awscli

In [None]:
# Configure the AWS CLI with your credentials.
# NOTE(review): `aws configure` prompts for input interactively; if this `!`
# cell does not accept keyboard input, run the command in a terminal instead.
!aws configure

In [None]:
# Sanity check: show which AWS account/identity the configured credentials resolve to.
!aws sts get-caller-identity


In [None]:
# Client-side libraries for preparing and launching the job from this notebook;
# the training container installs its own dependencies from requirements.txt.
!pip install --upgrade pip 
!pip install sagemaker transformers datasets peft trl bitsandbytes

In [None]:
# Keep the SageMaker Python SDK current (the estimator cell uses sagemaker.pytorch.PyTorch).
!pip install --upgrade sagemaker

**2-Define the training dataset locations in your S3 bucket**

In [75]:
import os
import sagemaker
import boto3

# S3 bucket and key prefix that hold the uploaded training data
# (must match the values used when the data was uploaded).
bucket_name = "vlm-training-s3"  # Same as used in upload
s3_prefix = "vlm-training-data"  # Same as used in upload
chosen_schema = "qwen"

# Object keys (relative to the bucket) of the TSV file for each split:
# <prefix>/<split>/<schema>_<split>_set.tsv
train_s3_path, validation_s3_path, test_s3_path = (
    f"{s3_prefix}/{split}/{chosen_schema}_{split}_set.tsv"
    for split in ("train", "validation", "test")
)


**3-Main Training File Creation**

Because training runs inside a Docker container, we cannot execute it cell by cell. Instead, the following cells write the required scripts to the current folder. SageMaker then runs them inside the AWS container, which drives the cluster and its 8 A100 GPUs.

**Important Note:** In this cell, the main thing you need to change is the model name. It must correspond to the model you used for data preprocessing; in this notebook it is **Qwen/Qwen2.5-VL-7B-Instruct**. For LG EXAONE, for instance, the Hugging Face name would be **LGAI-EXAONE/EXAONE-3.5-7.8B-Instruct**. Second, make sure the **.tsv** file name matches the one defined in the previous cell, e.g. **qwen_train_set.tsv** (or **exaone_train_set.tsv** for EXAONE). Note that the current version of the script trains on the **HuggingFaceM4/ChartQA** dataset loaded from the Hugging Face Hub, not on the TSV channels.

**Secondary Note:** You may wonder why the preprocessing code is included in this script. There is a good reason. Because the job runs inside a Docker container, we cannot interactively confirm that tokenization was done correctly. So we preprocess inside the container and check that tokenization works before passing the data to the model. This lets us see what is happening inside the container. Otherwise it would be very hard to debug, since we cannot run cell by cell as in a normal notebook.

In [76]:
# Windows only: open the current folder in File Explorer to inspect the files written by the %%writefile cells.
!explorer .

In [77]:
%%writefile train_deploy_huggingface.py
import os
import argparse
import subprocess
import json
import shutil
import torch
import deepspeed
import pandas as pd
from PIL import Image
from datasets import Dataset, load_dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
    DataCollatorForLanguageModeling,
    TrainerCallback,
    AutoModelForVision2Seq, 
    AutoProcessor,
    DefaultDataCollator
)
from qwen_vl_utils import process_vision_info
import boto3

# Module-level S3 client, created once when the script is imported; used by download_images_from_s3.
s3 = boto3.client('s3')

def list_dir_contents(path):
    """
    Recursively print every directory and file under *path*, with file sizes.

    Each directory is printed with a trailing ``/`` and indented four spaces
    per nesting level; each file line shows its size in MB (1 MB = 1e6 bytes).
    Output goes to stdout; nothing is returned.

    Args:
        path: Directory to walk. A trailing path separator is tolerated.
    """
    print(f"\nContents of '{path}':")
    # Normalise once so a trailing separator neither changes the depth
    # computation nor makes the root print as an empty name.
    base = os.path.normpath(path)
    for root, _dirs, files in os.walk(base):
        # Depth relative to *path*; relpath avoids the pitfalls of substring
        # replacement (trailing separators, *path* occurring inside *root*).
        rel = os.path.relpath(root, base)
        level = 0 if rel == os.curdir else rel.count(os.sep) + 1
        indent = ' ' * 4 * level
        print(f"{indent}{os.path.basename(root)}/")
        sub_indent = ' ' * 4 * (level + 1)
        for f in files:
            file_path = os.path.join(root, f)
            size = os.path.getsize(file_path) / 1e6  # Size in MB
            print(f"{sub_indent}{f} - {size:.2f} MB")

def download_images_from_s3(bucket, prefix, local_dir, client=None):
    """
    Download every object under ``s3://<bucket>/<prefix>`` into *local_dir*.

    Objects are flattened into *local_dir* by base name, so keys that share a
    file name in different sub-folders overwrite each other. The listing is
    paginated, so prefixes holding more keys than a single
    ``list_objects_v2`` response returns are downloaded completely. Keys
    ending in ``/`` (folder placeholders) are skipped because they have no
    file name to download to.

    Args:
        bucket: S3 bucket name.
        prefix: Key prefix whose objects are downloaded.
        local_dir: Destination directory; created if it does not exist.
        client: Optional boto3 S3 client; defaults to the module-level ``s3``.
    """
    client = s3 if client is None else client
    os.makedirs(local_dir, exist_ok=True)
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            file_key = obj["Key"]
            if file_key.endswith("/"):
                continue
            local_file_path = os.path.join(local_dir, os.path.basename(file_key))
            client.download_file(bucket, file_key, local_file_path)
            print(f"Downloaded {file_key} to {local_file_path}")

def main():
    """
    Fine-tune Qwen2.5-VL-7B-Instruct on a ChartQA subset with DeepSpeed.

    The ``deepspeed`` CLI (started by ds_launcher.py) runs this once per GPU
    process. The function:

    1. parses hyperparameters and SageMaker directory arguments;
    2. builds ``TrainingArguments`` from ``./deepspeed_config.json`` *before*
       loading the model, so that DeepSpeed ZeRO-3 can partition the weights
       while they are loaded;
    3. loads the processor and model, formats 10% slices of ChartQA's
       train/val splits as chat conversations, and trains with a custom
       multimodal collator;
    4. saves the final model and processor to ``--model_dir``; the rank-0
       process also writes a model card and deletes intermediate checkpoints.

    A Hugging Face token, if needed, is read from the ``HF_TOKEN`` environment
    variable (``None`` falls back to a cached login or anonymous access).
    """
    parser = argparse.ArgumentParser()

    # Hyperparameters
    parser.add_argument("--batch_size", type=int, default=1, help="Per-device training batch size")
    parser.add_argument("--epochs", type=int, default=4, help="Number of training epochs")
    parser.add_argument("--learning_rate", type=float, default=1e-5, help="Learning rate")
    parser.add_argument("--gradient_accumulation_steps", type=int, default=16, help="Gradient accumulation steps")
    parser.add_argument("--max_length", type=int, default=4000, help="Maximum sequence length")

    # Environment variables set by SageMaker
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--output_data_dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR"))
    parser.add_argument("--train_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation_dir", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))

    # The deepspeed launcher passes --local_rank to every process it starts.
    parser.add_argument("--local_rank", type=int, default=-1, help="Local rank for distributed training")
    parser.add_argument("--images_path", type=str, default="/opt/ml/input/data/images", help="Local path for images")

    args = parser.parse_args()

    # NOTE: --train_dir, --validation_dir and --images_path are accepted so the
    # SageMaker channels parse cleanly, but this version trains on ChartQA
    # loaded from the Hugging Face Hub and does not read them.

    model_id = "Qwen/Qwen2.5-VL-7B-Instruct"
    # Never hard-code the Hub token in the script; read it from the environment.
    hf_token = os.environ.get("HF_TOKEN")

    with open('./deepspeed_config.json', 'r') as config_file:
        deepspeed_config = json.load(config_file)

    # Separate directory for intermediate checkpoints (removed after the final save)
    checkpoints_dir = os.path.join(args.model_dir, "checkpoints")
    os.makedirs(checkpoints_dir, exist_ok=True)

    # TrainingArguments is created BEFORE from_pretrained(): with a ZeRO-3
    # DeepSpeed config this lets transformers partition the weights across
    # GPUs while loading. Its explicit values must match deepspeed_config.json.
    training_args = TrainingArguments(
        output_dir=checkpoints_dir,  # Checkpoints saved here
        num_train_epochs=args.epochs,
        per_device_train_batch_size=args.batch_size,
        per_device_eval_batch_size=args.batch_size,
        gradient_accumulation_steps=args.gradient_accumulation_steps,
        learning_rate=args.learning_rate,
        weight_decay=3e-7,
        bf16=True,
        logging_dir=os.path.join(args.output_data_dir, "logs"),
        logging_steps=10,
        log_level='debug',
        # `evaluation_strategy` was renamed to `eval_strategy`; the old keyword
        # is no longer accepted by transformers releases that support Qwen2.5-VL.
        eval_strategy="epoch",
        save_strategy="epoch",
        save_total_limit=2,
        metric_for_best_model="eval_loss",
        dataloader_num_workers=4,
        deepspeed=deepspeed_config,
        remove_unused_columns=False,  # Essential for multimodal training
    )

    # Initialize processor and model
    processor = AutoProcessor.from_pretrained(model_id, token=hf_token)
    model = AutoModelForVision2Seq.from_pretrained(
        model_id,
        torch_dtype=torch.bfloat16,
        trust_remote_code=True,
        token=hf_token,
    )

    model.gradient_checkpointing_enable()

    print(f"Model device: {next(model.parameters()).device}")

    # ----------- Load and Format the ChartQA Dataset from Hugging Face -----------
    # Only the first 10% of each split is used; the test slice is not used below.
    dataset_id = "HuggingFaceM4/ChartQA"
    raw_train_dataset, raw_eval_dataset, raw_test_dataset = load_dataset(dataset_id, split=["train[:10%]", "val[:10%]", "test[:10%]"])

    # System prompt for the chatbot-style interaction.
    system_message = (
        "You are a Vision Language Model specialized in interpreting visual data from chart images.\n"
        "Your task is to analyze the provided chart image and respond to queries with concise answers, "
        "usually a single word, number, or short phrase.\n"
        "The charts include a variety of types (e.g., line charts, bar charts) and contain colors, labels, and text.\n"
        "Focus on delivering accurate, succinct answers based on the visual information. Avoid additional explanation unless absolutely necessary."
    )

    def format_data(sample):
        """Turn one ChartQA row into a system/user(image+query)/assistant conversation."""
        # The label may be a list; use its first element as the target answer.
        label_text = sample["label"][0] if isinstance(sample["label"], list) else sample["label"]
        return [
            {
                "role": "system",
                "content": [{"type": "text", "text": system_message}],
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "image": sample["image"],
                    },
                    {
                        "type": "text",
                        "text": sample["query"],
                    },
                ],
            },
            {
                "role": "assistant",
                "content": [{"type": "text", "text": label_text}],
            },
        ]

    # Format the train and evaluation datasets using the chatbot structure
    train_dataset = [format_data(sample) for sample in raw_train_dataset]
    eval_dataset = [format_data(sample) for sample in raw_eval_dataset]

    def custom_collate_fn(examples):
        """Tokenize a batch of conversations with their images and build masked labels."""
        texts = [processor.apply_chat_template(example, tokenize=False) for example in examples]
        image_inputs = [process_vision_info(example)[0] for example in examples]
        # NOTE(review): truncating to max_length could cut image placeholder
        # tokens and desynchronise text and image features; 4000 is presumably
        # large enough for ChartQA images — confirm for larger inputs.
        batch = processor(
            text=texts,
            images=image_inputs,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=args.max_length
        )
        # Labels are the input ids with padding excluded from the loss.
        # Prompt (system/user) tokens are not masked, so they also contribute.
        labels = batch["input_ids"].clone()
        labels[labels == processor.tokenizer.pad_token_id] = -100

        # Exclude image-related special tokens from the loss; these ids are
        # presumably <|vision_start|>, <|vision_end|> and <|image_pad|> —
        # verify with processor.tokenizer.convert_ids_to_tokens.
        image_tokens = [151652, 151653, 151655]
        for token in image_tokens:
            labels[labels == token] = -100

        batch["labels"] = labels
        return batch

    data_collator = custom_collate_fn

    class GPUStatsCallback(TrainerCallback):
        """Print utilisation and memory of this process's GPU (LOCAL_RANK) after every step."""

        def on_step_end(self, args, state, control, **kwargs):
            gpu = int(os.environ.get("LOCAL_RANK", -1))
            if gpu == -1:
                return  # Skip if not using GPU
            torch.cuda.synchronize(gpu)
            allocated = torch.cuda.memory_allocated(gpu) / 1e9  # GB
            reserved = torch.cuda.memory_reserved(gpu) / 1e9  # GB

            try:
                result = subprocess.check_output(
                    ['nvidia-smi', '--id={}'.format(gpu), '--query-gpu=utilization.gpu,memory.used,memory.total', '--format=csv,nounits,noheader'],
                    encoding='utf-8'
                )
                gpu_util, mem_used, mem_total = result.strip().split(',')
                gpu_util = int(gpu_util)
                mem_used = float(mem_used) / 1e3  # MB -> GB
                mem_total = float(mem_total) / 1e3  # MB -> GB
            except Exception as e:
                # Fall back to torch's allocated/reserved counters if nvidia-smi fails.
                gpu_util = 'N/A'
                mem_used = allocated
                mem_total = reserved
                print(f"Error getting GPU utilization: {e}")

            print(f"After step {state.global_step}: GPU {gpu}, Utilization: {gpu_util}%, Memory Used: {mem_used:.2f} GB / {mem_total:.2f} GB")

    # Pass the whole processor (not only its tokenizer) so that saved
    # checkpoints and the final model dir also contain the image-processor
    # configuration and can be reloaded with AutoProcessor.
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        processing_class=processor,
        data_collator=data_collator,
        callbacks=[GPUStatsCallback()],
    )

    trainer.train()

    # Save the model - all processes must call this (ZeRO-3 gathers the shards)
    print("Model trained successfully, proceeding to save...")
    trainer.save_model(args.model_dir)

    # Only the main process saves the processor, writes the model card and cleans up
    if trainer.is_world_process_zero():
        print("Saving processor and creating model card...")
        processor.save_pretrained(args.model_dir)
        trainer.create_model_card()

        print("Saving completed. Verifying saved files...")
        list_dir_contents(args.model_dir)

        try:
            shutil.rmtree(checkpoints_dir)
            print(f"Removed checkpoints directory: {checkpoints_dir}")
        except Exception as e:
            print(f"Error removing checkpoints directory: {e}")

        list_dir_contents(args.model_dir)
        print("Done!")

# Run training only when executed as a script (e.g. by the deepspeed launcher).
if __name__ == "__main__":
    main()


Overwriting train_deploy_huggingface.py


**4-Create the requirements file**

Because we are inside a Docker container with limited control, all required libraries must be installed with a single command. For that reason they are all listed in this requirements.txt.

In [78]:
%%writefile requirements.txt

transformers>=4.49.0  # first release with Qwen2.5-VL support
torch
datasets
accelerate
sentencepiece
bitsandbytes
peft
pyarrow
deepspeed==0.15.4
accelerate>=0.26.0
pillow
qwen_vl_utils

Overwriting requirements.txt


**4-Deepspeed Configuration**

Model parallelism and data parallelism alone are not enough to train a model of this size, so we use DeepSpeed, specifically DeepSpeed ZeRO Stage 3. It gives the most memory-efficient parallelization on p4d and p4de instances.

**Important Note:** A few things matter here. The DeepSpeed configuration is tied to the Hugging Face `TrainingArguments`, so make sure both definitions match (the one in step 3 and this one). On **p4d**, set `"device"` to `"cpu"` in both `offload_optimizer` and `offload_param`. Parameters and optimizer states must be offloaded to CPU, or the job will crash. On **p4de**, set it to `"none"`; training then takes roughly half the time (about 30 minutes instead of 1 hour for 7–8B models).

In [79]:
%%writefile deepspeed_config.json
{
  "train_micro_batch_size_per_gpu": 1,
  "gradient_accumulation_steps": 4,
  "steps_per_print": 100,
  "optimizer": {
    "type": "AdamW",
    "params": {
      "lr": 1e-5,
      "betas": [0.9, 0.999],
      "eps": 1e-8,
      "weight_decay": 3e-7
    }
  },
  "bf16": {
    "enabled": true
  },
  "zero_optimization": {
    "stage": 3,
    "stage3_gather_16bit_weights_on_model_save": true,
    "offload_optimizer": {
      "device": "none",
      "pin_memory": true
    },
    "offload_param": {
      "device": "none",
      "pin_memory": true
    },
    "allgather_partitions": true,
    "allgather_bucket_size": 50000000,
    "overlap_comm": true,
    "reduce_scatter": true,
    "reduce_bucket_size": 50000000,
    "contiguous_gradients": true
  },
  "activation_checkpointing": {
    "partition_activations": true,
    "contiguous_memory_optimization": true
  },
  "wall_clock_breakdown": false
}




Overwriting deepspeed_config.json


**5-Deepspeed Launcher**

Unfortunately, we cannot start DeepSpeed from inside the training script of step 3. Because of how the AWS container runs that script, the already-running process cannot see the other GPUs. Instead, a small DeepSpeed launcher starts all required processes from the command line (8 here, one per GPU), and DeepSpeed coordinates them.

In [80]:
%%writefile ds_launcher.py
import sys
import os
import subprocess
import json
import sys
import logging
from argparse import ArgumentParser

# Module-level logger for this launcher script.
logger = logging.getLogger(__name__)


def parse_args():
    """Return the command-line arguments meant for the training script.

    The launcher defines no options of its own, so every argument SageMaker
    passes to it (the estimator's hyperparameters) is returned unchanged as
    the list of "unknown" arguments, ready to be forwarded.
    """
    launcher_parser = ArgumentParser(
        description="SageMaker DeepSpeed Launch helper utility that will spawn deepspeed training scripts"
    )
    _, passthrough = launcher_parser.parse_known_args()
    return passthrough


def main():
    """Start ``deepspeed`` for train_deploy_huggingface.py on this host.

    Reads SageMaker's environment (``SM_NUM_GPUS``, ``SM_HOSTS``,
    ``SM_CURRENT_HOST``) and forwards all of this script's arguments to the
    training script. It then runs
    ``deepspeed --num_gpus=<N> train_deploy_huggingface.py <args...>``.

    Only a single-node launch is issued: ``rank`` is reported, but no hostfile
    is passed to ``deepspeed``.
    See https://github.com/microsoft/DeepSpeed/blob/master/deepspeed/launcher/launch.py
    """
    import shlex  # local import keeps the launcher's top-level imports unchanged

    num_gpus = int(os.environ.get("SM_NUM_GPUS", 0))
    # SM_HOSTS is a JSON list of host names; default to an empty list so the
    # script also runs outside SageMaker.
    hosts = json.loads(os.environ.get("SM_HOSTS", "[]"))
    num_nodes = len(hosts)
    current_host = os.environ.get("SM_CURRENT_HOST", "")
    rank = hosts.index(current_host) if current_host in hosts else 0
    print(f"num_gpus = {num_gpus}, num_nodes = {num_nodes}, current_host = {current_host}, rank = {rank}")

    # For verbose NCCL logs while debugging communication issues, set:
    # os.environ['NCCL_DEBUG'] = 'INFO'

    args = parse_args()
    # shlex.join quotes every argument so values with spaces or shell
    # metacharacters reach the training script intact under shell=True.
    command = shlex.join(["deepspeed", f"--num_gpus={num_gpus}", "train_deploy_huggingface.py", *args])
    print(f"command = {command}")
    # launch deepspeed training
    deepspeed_launch(command)


def deepspeed_launch(command):
    """Run *command* through the shell and propagate a failure exit status.

    If the command exits non-zero, this launcher exits with the same status.
    That makes a failed training run visible as a failed job instead of being
    silently reported as success.

    Args:
        command: Shell command line to execute.

    Returns:
        The ``subprocess.CompletedProcess`` of a successful run.

    Raises:
        SystemExit: with the command's exit status when it is non-zero.
    """
    completed = subprocess.run(command, shell=True)
    if completed.returncode != 0:
        print(f"DeepSpeed command failed with exit code {completed.returncode}", file=sys.stderr)
        sys.exit(completed.returncode)
    return completed


# Entry point when run as a script (this file is the estimator's entry_point).
if __name__ == "__main__":
    main()

Overwriting ds_launcher.py


**6-Sagemaker Training Job Creation**

This step ties everything together: all the files are passed to SageMaker so the scripts can be launched inside the container.

**Important Note:** Set the instance type to the one you need (p4d or p4de), using the exact name AWS expects, such as **ml.p4de.24xlarge** or **ml.p4d.24xlarge**. Also make sure all hyperparameters match the ones defined earlier (training script and DeepSpeed config).

In [None]:
import json

from sagemaker.pytorch import PyTorch

# Replace with your actual Role ARN
role = "YOUR_AWS_ROLE"

# Read the DeepSpeed config written by the earlier cell, so the Trainer
# hyperparameters below cannot drift from it. The training script passes them
# to TrainingArguments, and explicit values in deepspeed_config.json must
# match those arguments.
with open("deepspeed_config.json", "r") as config_file:
    ds_config = json.load(config_file)

# NOTE(review): the actual DeepSpeed launch happens in ds_launcher.py; confirm
# whether this SDK version does anything with this `distribution` entry.
distribution = {
    "deepspeed": {
        "enabled": True,
        "config_path": "deepspeed_config.json"
    }
}

estimator = PyTorch(
    entry_point="ds_launcher.py",  # Starts `deepspeed` with one process per GPU
    role=role,
    source_dir=".",  # Uploads this folder, including deepspeed_config.json
    instance_count=1,  # ds_launcher.py only launches on a single node
    instance_type="ml.p4de.24xlarge",  # Instance with 8 GPUs
    framework_version="2.1.0",  # Ensure compatibility with DeepSpeed
    py_version="py310",
    dependencies=["requirements.txt"],
    hyperparameters={
        "epochs": 4,
        "batch_size": ds_config["train_micro_batch_size_per_gpu"],
        "learning_rate": ds_config["optimizer"]["params"]["lr"],
        "gradient_accumulation_steps": ds_config["gradient_accumulation_steps"],
        "max_length": 4000,  # Maximum tokenized sequence length
        # Images are provided through the "images" input channel in fit(), not as a hyperparameter.
    },
    distribution=distribution,
    output_path=f"s3://{bucket_name}/{s3_prefix}/model",
)

# Define S3 URIs for TSV data
train_s3_uri = f"s3://{bucket_name}/{train_s3_path}"
validation_s3_uri = f"s3://{bucket_name}/{validation_s3_path}"

# Fit the model
estimator.fit({"train": train_s3_uri, "validation": validation_s3_uri, "images": f"s3://{bucket_name}/{s3_prefix}/images"})


In [37]:
# Windows only: open the current folder in File Explorer to inspect the generated files.
!explorer .

In [42]:
import boto3
import sagemaker
import os

# Credentials are resolved by boto3's default provider chain (environment
# variables, the profile written by `aws configure`, or an attached role).
# Do not assign AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY here. Doing so
# overwrites any credentials already exported in the environment and leaves
# secrets in the saved notebook.
os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
sagemaker_session = sagemaker.Session(boto3.session.Session())
sagemaker_session

<sagemaker.session.Session at 0x17ff73891b0>

In [None]:
# Re-run the (interactive) AWS CLI configuration if credentials or the region need to change.
!aws configure