# Speech Translation (Qwen-Audio)
* [Qwen/Qwen2-Audio-7B-Instruct](https://huggingface.co/Qwen/Qwen2-Audio-7B-Instruct)

# Prepare Environment

In [1]:
!nvidia-smi

Thu Apr 24 22:09:55 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.144.03             Driver Version: 550.144.03     CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA H200                    On  |   00000000:AA:00.0 Off |                    0 |
| N/A   27C    P0             74W /  700W |       1MiB / 143771MiB |      0%      Default |
|                                         |                        |             Disabled |
+-----------------------------------------+------------------------+----------------------+
                                                

In [None]:
# !pip3 install --upgrade transformers accelerate datasets hf_transfer -q
# !pip3 install librosa tensorboardX -q

In [None]:
# For using wandb

# !pip3 install wandb -q
# !wandb login $WB_TOKEN

# wandb settings
# os.environ["WANDB_PROJECT"] = "Qwen-German"
# os.environ["WANDB_LOG_MODEL"] = "end" # or "checkpoint"

In [None]:
import os

# Switch on the `hf_transfer` backend for Hugging Face Hub transfers
# (the package itself is installed by the pip cell above).
os.environ.update(HF_HUB_ENABLE_HF_TRANSFER="1")

# Load Dataset

In [None]:
tgt_lang_code = "de"
# tgt_lang_code = "zh"
# tgt_lang_code = "ar"

In [6]:
data_cache_dir = "/workspace/data/"
model_cache_dir = "/workspace/model/"

In [None]:
from datasets import load_dataset, Audio

# Local cache for the downloaded dataset files.
# NOTE(review): `data_cache_dir` from the configuration cell is never used in
# this notebook view -- confirm which of the two locations is intended.
cache_dir = "/workspace/cache"
dataset_name = "ymoslem/ACL-6060"

# Load the merged dev+eval split, cast the audio column to a 16 kHz `Audio`
# feature, and hold out 100 examples as the test split (fixed seed).
dataset = (
    load_dataset(dataset_name, split="dev+eval", cache_dir=cache_dir)
    .cast_column("audio", Audio(sampling_rate=16000))
    .train_test_split(test_size=100, seed=0)
)

dataset

DatasetDict({
    train: Dataset({
        features: ['index', 'audio', 'text_en', 'text_ar', 'text_de', 'text_fa', 'text_fr', 'text_ja', 'text_nl', 'text_pt', 'text_ru', 'text_tr', 'text_zh'],
        num_rows: 784
    })
    test: Dataset({
        features: ['index', 'audio', 'text_en', 'text_ar', 'text_de', 'text_fa', 'text_fr', 'text_ja', 'text_nl', 'text_pt', 'text_ru', 'text_tr', 'text_zh'],
        num_rows: 100
    })
})

In [None]:
# First held-out example: English text, then the target-language text.
print(
    dataset["test"]["text_en"][0],
    dataset["test"][f"text_{tgt_lang_code}"][0],
    sep="\n",
)

In [None]:
# Shuffle the training split with a fixed seed and store it back on the
# DatasetDict (the assignment target must be `dataset["train"]`; a bare
# `["train"] = ...` is a syntax error).
dataset["train"] = dataset["train"].shuffle(seed=0)

# First training example after shuffling: English text, then the target text.
print(dataset["train"]["text_en"][0])
print(dataset["train"][f"text_{tgt_lang_code}"][0])

A classifier using BERT or CodeBERT and a generator using BART.


In [None]:
# Keep only the audio and the chosen target-language text, then expose that
# text under the generic column name "text".
target_column = f"text_{tgt_lang_code}"

dataset = dataset.select_columns(["audio", target_column])
print("Current column:", dataset.column_names)

dataset = dataset.rename_column(target_column, "text")

dataset

Current column: {'train': ['audio', 'text_de'], 'test': ['audio', 'text_de']}


DatasetDict({
    train: Dataset({
        features: ['audio', 'text'],
        num_rows: 784
    })
    test: Dataset({
        features: ['audio', 'text'],
        num_rows: 100
    })
})

In [None]:
dataset["train"].features

{'audio': Audio(sampling_rate=16000, mono=True, decode=True, id=None),
 'text': Value(dtype='string', id=None)}

# Load Feature Extractor and Tokenizer

In [None]:
model_name = "Qwen/Qwen2-Audio-7B-Instruct"

print(model_name)

### Load WhisperFeatureExtractor

In [None]:
from transformers import AutoFeatureExtractor

# Stand-alone feature extractor for the checkpoint, cached under `model_cache_dir`.
# NOTE(review): later cells in this view only use `processor.feature_extractor`;
# confirm whether this separate object is still needed.
feature_extractor = AutoFeatureExtractor.from_pretrained(
    model_name,
    cache_dir=model_cache_dir,
)

In [None]:
feature_extractor

### Load the processor

In [None]:
from transformers import AutoProcessor

# Processor for the checkpoint; its `tokenizer` and `feature_extractor`
# attributes are used by the cells below.
processor = AutoProcessor.from_pretrained(
    model_name,
    cache_dir=model_cache_dir,
)

In [None]:
# Special tokens defined by the tokenizer (string and id), one value per line,
# followed by a blank line.
print(
    "\n".join(
        str(value)
        for value in (
            processor.tokenizer.bos_token,
            processor.tokenizer.bos_token_id,
            processor.tokenizer.eos_token,
            processor.tokenizer.eos_token_id,
            processor.tokenizer.pad_token_id,
        )
    ),
    end="\n\n",
)

# Feature-extractor window size in samples, then its sampling rate.
print(
    processor.feature_extractor.n_samples,  # 30 seconds
    processor.feature_extractor.sampling_rate,
    sep="\n",
)

In [20]:
# chat template

import numpy as np

# print(processor.tokenizer.chat_template)

# Minimal conversation (system prompt plus a user turn carrying 16,000 zero
# samples of audio) used only to see how the chat template renders it.
test_conversation = [
    {"role": "system", "content": "You are a professional translator."},
    {
        "role": "user",
        "content": [{"type": "audio", "audio": np.zeros(16000)}],
    },
]
print(processor.apply_chat_template(test_conversation, tokenize=False))


<|im_start|>system
You are a professional translator.<|im_end|>
<|im_start|>user
Audio 1: <|audio_bos|><|AUDIO|><|audio_eos|>
<|im_end|>



# Prepare Data

### Define a Data Collator

In [None]:
from torch.utils.data import DataLoader
from typing import List, Dict, Any, Optional, Union
import numpy as np
import librosa
import logging
import traceback
import torch


# Set up proper logging instead of print statements
logger = logging.getLogger(__name__)


class AudioDataCollator:
    """
    Collate raw ``{"audio", "text"}`` examples into a training batch.

    For every example the collator builds a three-turn chat (system prompt,
    user turn with the audio placeholder and the task prompt, assistant turn
    with the reference text), renders it with the processor's chat template,
    and passes the rendered texts together with the audio arrays through the
    processor. Labels are a copy of ``input_ids`` in which every padding
    position and everything up to and including the assistant marker is
    replaced by ``-100``.
    """

    # Text that opens the assistant turn in the rendered conversation.
    ASSISTANT_MARKER = "<|im_start|>assistant"

    def __init__(
        self,
        processor,
        task_prompt: str,
        system_prompt: str = "You are a professional translator.",
        max_length: Optional[int] = None,
    ):
        """
        Initialize the AudioDataCollator.

        Args:
            processor: The processor for audio and text inputs
            task_prompt: The instruction to include with each audio input
            system_prompt: System prompt to include in each conversation
            max_length: Maximum audio length (samples). Uses
                ``processor.feature_extractor.n_samples`` if None (or 0)
        """
        self.processor = processor
        self.task_prompt = task_prompt
        self.system_prompt = system_prompt
        self.max_length = max_length or processor.feature_extractor.n_samples
        self.sampling_rate = processor.feature_extractor.sampling_rate

    def process_audio(self, audio: np.ndarray) -> np.ndarray:
        """
        Truncate a single audio array to ``max_length`` and cast it to float32.

        Shorter audio is returned unpadded.

        Args:
            audio: Audio samples (array, or any sequence ``np.asarray`` accepts)

        Returns:
            A float32 array with at most ``max_length`` entries along axis 0
        """
        # Slicing is a no-op when the audio is already short enough.
        return np.asarray(audio)[: self.max_length].astype(np.float32)

    def _assistant_marker_ids(self) -> List[int]:
        """Token ids of the assistant marker, without special tokens added."""
        return self.processor.tokenizer.encode(
            self.ASSISTANT_MARKER,
            add_special_tokens=False,
        )

    @staticmethod
    def _find_subsequence(sequence: torch.Tensor, pattern: List[int]) -> int:
        """Index of the first occurrence of ``pattern`` in 1-D ``sequence``, or -1."""
        width = len(pattern)
        if width == 0 or sequence.shape[0] < width:
            # Nothing to match, or the sequence is shorter than the pattern.
            return -1

        needle = torch.as_tensor(pattern, dtype=sequence.dtype, device=sequence.device)
        # All sliding windows of `width` tokens: shape (len - width + 1, width).
        windows = sequence.unfold(0, width, 1)
        hits = (windows == needle).all(dim=1).nonzero()
        return int(hits[0, 0].item()) if hits.numel() else -1

    def find_assistant_token_position(self, input_ids: torch.Tensor) -> int:
        """
        Find the position where the assistant's response begins.

        Args:
            input_ids: Tokenized input sequence (1-D tensor)

        Returns:
            Index of the first token of the first assistant marker,
            or -1 if the marker is not found
        """
        return self._find_subsequence(input_ids, self._assistant_marker_ids())

    def __call__(self, examples: List[Dict[str, Any]]) -> Dict[str, torch.Tensor]:
        """
        Process a batch of examples.

        Args:
            examples: List of examples; each provides ``example["audio"]["array"]``,
                ``example["audio"]["path"]`` and ``example["text"]``

        Returns:
            Dictionary with ``input_ids``, ``attention_mask``, ``input_features``,
            ``feature_attention_mask`` and ``labels``

        Raises:
            ValueError: If no valid examples can be processed
            Exception: Any error raised by the processor call is logged and re-raised
        """
        texts = []
        audios = []

        # Process each example
        for i, example in enumerate(examples):
            try:
                processed_audio = self.process_audio(example["audio"]["array"])

                conversation = [
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": [
                        {"type": "audio", "audio_url": example["audio"]["path"]},  # for formatting
                        {"type": "text", "text": self.task_prompt},
                    ]},
                    {"role": "assistant", "content": [
                        {"type": "text", "text": example["text"]},
                    ]},
                ]

                # Apply chat template
                text = self.processor.apply_chat_template(
                    conversation,
                    add_generation_prompt=False,
                    tokenize=False,
                )
            except Exception as e:
                logger.error(f"Failed to process example {i}: {str(e)}")
                logger.debug(traceback.format_exc())
                continue

            # Append audio and text together, only after both succeeded, so a
            # failure in either step can never leave the two lists misaligned.
            audios.append(processed_audio)
            texts.append(text)

        logger.info(f"Successfully processed {len(texts)} examples out of {len(examples)}")

        if not texts:
            raise ValueError("No valid examples found in the batch.")

        try:
            # Process batch through processor
            inputs = self.processor(
                text=texts,
                audio=audios,
                sampling_rate=self.sampling_rate,
                return_tensors="pt",
                padding=True,
            )
        except Exception as e:
            logger.error(f"Failed to process batch: {str(e)}")
            logger.debug(traceback.format_exc())
            raise

        # Create labels by cloning input_ids
        labels = inputs["input_ids"].clone()

        # Padding positions carry no target; mark them with the ignore index so
        # the labels are correct regardless of how the loss treats the mask.
        labels[inputs["attention_mask"] == 0] = -100

        # Encode the marker once per batch rather than once per row.
        marker_ids = self._assistant_marker_ids()

        # Mask the prompt portion (everything up to and including the marker)
        for i in range(labels.shape[0]):
            try:
                assistant_start_idx = self._find_subsequence(inputs["input_ids"][i], marker_ids)

                if assistant_start_idx != -1:
                    labels[i, :assistant_start_idx + len(marker_ids)] = -100
                else:
                    logger.warning(f"'<|im_start|>assistant' not found in input for example {i}")
                    labels[i, :] = -100  # Mask everything if token not found

            except Exception as e:
                logger.error(f"Failed to mask labels for example {i}: {str(e)}")
                logger.debug(traceback.format_exc())
                # Same fallback as "marker not found": never train on an
                # unmasked prompt.
                labels[i, :] = -100

        # Return formatted batch
        return {
            "input_ids": inputs["input_ids"],
            "attention_mask": inputs["attention_mask"],
            "input_features": inputs["input_features"],
            "feature_attention_mask": inputs["feature_attention_mask"],
            "labels": labels
        }

In [22]:
from typing import Dict, Any
import torch

def inspect_batch(batch: Dict[str, torch.Tensor], processor) -> None:
    """
    Print a detailed, human-readable summary of one collated batch.

    For every tensor the shape, dtype and device are printed; the first row is
    printed as well, except for the audio feature tensors. If present, the
    first row of ``input_ids`` and of ``labels`` is decoded with
    ``processor.tokenizer`` (label positions equal to -100 are dropped first).

    Args:
        batch: Mapping from field name to tensor, as returned by the collator
        processor: Object exposing ``tokenizer.decode``

    Returns:
        None; everything is written to stdout
    """
    print("\n=== Detailed Batch Inspection ===")
    for key, tensor in batch.items():
        print(f"\n{key}:")
        print(f"Shape: {tensor.shape}")
        print(f"Type: {tensor.dtype}")
        print(f"Device: {tensor.device}")
        # Audio feature tensors are summarised by shape only (presumably
        # because their rows are too large to print usefully).
        if key not in ['input_features', 'feature_attention_mask']:
            print(f" First row: {tensor[0].tolist()}")

    if "input_ids" in batch:
        # find maximum input length
        max_length = batch["input_ids"].shape[1]
        print("\n=== Max Input Length ===")
        print(f"Max input length: {max_length}")

        # Only report on the audio features when the batch carries them;
        # a text-only batch must not raise KeyError here.
        if "input_features" in batch:
            # NOTE(review): this reports dimension 1 of `input_features`; if the
            # layout is (batch, mel_bins, frames) that is the bin count, not a
            # time length -- confirm which dimension is intended.
            max_feature_length = batch["input_features"].shape[1]
            print("\n=== Max Feature Length ===")
            print(f"Max feature length: {max_feature_length}")

        # Decode input_ids and print the first one
        print("\n=== Decoded input_ids ===")
        first_input = batch["input_ids"][0].tolist()
        decoded_input = processor.tokenizer.decode(first_input, skip_special_tokens=False)
        print(f"Decoded Input (First Row) :\n{decoded_input}")

    if "labels" in batch:
        # Decode labels and print the first one
        print("\n=== Decoded Labels ===")
        first_label = batch["labels"][0].tolist()
        # Filter out the masked token (-100) values
        valid_tokens = [token for token in first_label if token != -100]
        decoded_label = processor.tokenizer.decode(valid_tokens, skip_special_tokens=False)
        print(f"Decoded Label (First Row) :\n{decoded_label}")

In [None]:
# Human-readable names for the supported target-language codes.
tgt_lang_names = {"de": "German", "zh": "Chinese", "ar": "Arabic"}
if tgt_lang_code not in tgt_lang_names:
    raise ValueError(f"Unsupported target language code: {tgt_lang_code}")
tgt_lang = tgt_lang_names[tgt_lang_code]

# Collator that prompts the model to translate the spoken English input.
data_collator = AudioDataCollator(
    processor=processor,
    task_prompt=f"Translate the English speech into {tgt_lang}:",
)

# Throw-away loader used only to look at one collated batch.
dataloader = DataLoader(
    dataset["train"],
    batch_size=1,  # 1024
    shuffle=True,
    collate_fn=data_collator,
)

# Inspect the first batch, then stop.
for batch in dataloader:
    inspect_batch(batch, processor)
    break

# Load the model

### Load a Pre-Trained Checkpoint

In [None]:
from transformers import Qwen2AudioForConditionalGeneration, BitsAndBytesConfig

# Load the pre-trained checkpoint (dtype left at the library default; the
# bfloat16 option stays commented out), then move it to the GPU.
model = Qwen2AudioForConditionalGeneration.from_pretrained(
    model_name,
    # torch_dtype=torch.bfloat16,  # more efficient
    cache_dir=model_cache_dir,
)
model = model.to("cuda")

In [None]:
# Print the model's number of parameters
num_parameters = sum(p.numel() for p in model.parameters())
print(f"Number of model parameters: {num_parameters:,}")

In [None]:
# Fail early, with a readable message, if the model did not end up on the GPU.
assert model.device.type == "cuda", "Model must be on GPU"

In [None]:
print(model.config)

In [None]:
print("Original config:", model.generation_config)

# Turn sampling off and clear the sampling-only parameters.
model.generation_config.do_sample = False
for sampling_option in ("temperature", "top_k", "top_p"):
    setattr(model.generation_config, sampling_option, None)

print("Modified config:", model.generation_config)

In [None]:
model

# Training

### Define the Training Configuration

In [None]:
# Training arguments

import time

# Hyperparameters consumed by the TrainingArguments cell below.
batch_size = 4
accumulation_steps = 1
learning_rate = 1e-5
scheduler = "cosine"
warmup_ratio = 0.0
epochs = 3

weight_decay = 0.001  # set to 0.0 for pruned models

# Run name: <model>-<hyperparameter tag>-<data>-new
data = "acl6060"
model_prefix = model_name.split("/")[-1]
# Derive the learning-rate tag from `learning_rate` so the run name cannot go
# stale when the value changes; 1e-5 still yields "1e5", the tag that used to
# be hard-coded.
lr_tag = f"{learning_rate:g}".replace("e-0", "e")
arguments = f"{batch_size}bs-{lr_tag}lr-{scheduler}-{epochs}epoch"
run_name = f"{model_prefix}-{arguments}-{data}-new"

# Output location is "<user>/<run name>"; it is also used as the Hub repo id
# when the processor is pushed.
user_id = "ymoslem"  # change to your user ID
output_dir = user_id + "/" + run_name

print(output_dir)

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    # Where results are written and how the run is named.
    output_dir=output_dir,
    run_name=run_name,

    # Important: keep the raw "audio"/"text" columns, the data collator reads
    # them directly from each example.
    remove_unused_columns=False,

    # Batching.
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps=accumulation_steps,
    eval_accumulation_steps=accumulation_steps,

    # Precision / memory.
    bf16=True,
    # gradient_checkpointing=True,  # saves memory, but can be slower
    # gradient_checkpointing_kwargs={'use_reentrant':False},

    # Optimisation schedule.
    num_train_epochs=epochs,
    learning_rate=learning_rate,
    lr_scheduler_type=scheduler,
    warmup_ratio=warmup_ratio,
    weight_decay=weight_decay,

    # Evaluate every 100 steps; save once per epoch, keeping at most two checkpoints.
    # eval_strategy="epoch",
    eval_strategy="steps",
    eval_steps=100,
    save_strategy="epoch",
    save_total_limit=2,
    # load_best_model_at_end=True,

    # Logging.
    logging_steps=1,
    report_to=["tensorboard"],  # or ["tensorboard", "wandb"]

    # Upload to a private Hub repository.
    push_to_hub=True,
    hub_private_repo=True,
)

In [32]:
training_args.learning_rate

1e-05

In [None]:
print(training_args)

In [35]:
from transformers import Trainer

# Wire model, data splits and the audio collator into the Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
)

In [None]:
# Write the processor files into the training output directory, alongside
# whatever the Trainer saves there.
processor.save_pretrained(training_args.output_dir)

### Training

In [None]:
# Start training
trainer.train()

  
  
  
***

## Push to Hub

In [38]:
# Model-card metadata passed to `trainer.push_to_hub(**kwargs)`.
kwargs = {
    "dataset_tags": [dataset_name],
    # Reuse `dataset_name` rather than a second hard-coded copy of the same id.
    "dataset": [dataset_name],
    "language": ["en", tgt_lang_code],
    "model_name": f"Qwen2-Audio-7B EN-{tgt_lang_code.upper()} Speech Translation",
    "finetuned_from": model_name,
    "tasks": "automatic-speech-recognition",
}

In [None]:
# Push via the Trainer, forwarding the metadata dictionary `kwargs`.
trainer.push_to_hub(**kwargs)

In [None]:
# Push the processor as well; `output_dir` ("<user>/<run name>") is passed as
# the repository id.
processor.push_to_hub(output_dir)