In [1]:
from transformers import AddedToken, AutoModel, AutoTokenizer, AutoProcessor, Qwen2ForCausalLM, AutoConfig
from transformers.models.whisper.modeling_whisper import WhisperEncoderLayer
from transformers import WhisperPreTrainedModel, WhisperConfig
from transformers.modeling_outputs import BaseModelOutput
from datasets import Audio
import math
import torch
from torch import nn



[2025-05-28 22:40:39,426] [INFO] [real_accelerator.py:239:get_accelerator] Setting ds_accelerator to cuda (auto detect)


/usr/bin/ld: cannot find -laio: No such file or directory
collect2: error: ld returned 1 exit status
/usr/bin/ld: cannot find -laio: No such file or directory
collect2: error: ld returned 1 exit status


In [2]:
class WhisperEncoder(WhisperPreTrainedModel):
    """Stand-alone Whisper-style audio encoder.

    Two 1-D convolutions turn log-mel features into embeddings (the second
    has stride 2), a learned-but-frozen position table is added, and the
    result is passed through ``config.encoder_layers`` ``WhisperEncoderLayer``
    blocks followed by a final ``LayerNorm``.
    """

    def __init__(self, config: WhisperConfig):
        super().__init__(config)
        self.dropout = config.dropout
        self.layerdrop = config.encoder_layerdrop

        embed_dim = config.d_model
        self.num_mel_bins = config.num_mel_bins
        self.padding_idx = config.pad_token_id
        self.max_source_positions = config.max_source_positions
        self.embed_scale = math.sqrt(embed_dim) if config.scale_embedding else 1.0

        self.conv1 = nn.Conv1d(self.num_mel_bins, embed_dim, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(embed_dim, embed_dim, kernel_size=3, stride=2, padding=1)

        # Retained only so existing code that reads this attribute keeps working.
        # It is a plain tensor (neither parameter nor buffer), so it does not
        # follow ``.to()`` / ``.cuda()``; ``forward`` therefore no longer uses it.
        self.range_max_source_positions = torch.arange(self.max_source_positions)

        # Position table is kept out of the optimiser.
        self.embed_positions = nn.Embedding(self.max_source_positions, embed_dim)
        self.embed_positions.requires_grad_(False)

        self.layers = nn.ModuleList([WhisperEncoderLayer(config) for _ in range(config.encoder_layers)])
        self.layer_norm = nn.LayerNorm(config.d_model)

        self.gradient_checkpointing = False
        self.post_init()

    def _freeze_parameters(self):
        """Disable gradients for every parameter of this encoder."""
        for param in self.parameters():
            param.requires_grad = False
        self._requires_grad = False

    def get_input_embeddings(self) -> nn.Module:
        """Return the first convolution, which acts as the input embedding."""
        return self.conv1

    def set_input_embeddings(self, value: nn.Module):
        """Replace the first convolution with ``value``."""
        self.conv1 = value

    def forward(
        self,
        input_features,
        attention_mask=None,
        head_mask=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        """Encode padded log-mel features.

        Args:
            input_features: tensor whose last dimension must equal
                ``max_source_positions * conv1.stride * conv2.stride``.
            attention_mask: accepted for interface compatibility only; it is
                NOT forwarded to the encoder layers (they always receive
                ``None`` as their mask).
            head_mask: optional per-layer head mask; its first dimension must
                equal the number of encoder layers.
            output_attentions: whether to collect per-layer attention outputs.
            output_hidden_states: whether to collect per-layer hidden states.
            return_dict: return a ``BaseModelOutput`` instead of a tuple.

        Returns:
            ``BaseModelOutput`` (or a tuple of its non-``None`` fields when
            ``return_dict`` is false).

        Raises:
            ValueError: if ``input_features`` has an unexpected length.
        """
        expected_seq_length = self.config.max_source_positions * self.conv1.stride[0] * self.conv2.stride[0]
        if input_features.shape[-1] != expected_seq_length:
            raise ValueError(
                f"Whisper expects the mel input features to be of length {expected_seq_length}, but found {input_features.shape[-1]}. Make sure to pad the input mel features to {expected_seq_length}."
            )

        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        inputs_embeds = nn.functional.gelu(self.conv1(input_features))
        inputs_embeds = nn.functional.gelu(self.conv2(inputs_embeds))

        # (batch, channels, frames) -> (batch, frames, channels)
        inputs_embeds = inputs_embeds.permute(0, 2, 1)

        # The table has exactly ``max_source_positions`` rows, so taking the
        # whole weight equals looking up ``arange(max_source_positions)``, but
        # it is guaranteed to live on the same device as the module.
        embed_pos = self.embed_positions.weight

        hidden_states = inputs_embeds + embed_pos
        hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)

        encoder_states = () if output_hidden_states else None
        all_attentions = () if output_attentions else None

        # check if head_mask has a correct number of layers specified if desired
        if head_mask is not None:
            assert head_mask.size()[0] == (len(self.layers)), (
                f"The head_mask should be specified for {len(self.layers)} layers, but it is for {head_mask.size()[0]}."
            )

        for idx, encoder_layer in enumerate(self.layers):
            if output_hidden_states:
                encoder_states = encoder_states + (hidden_states,)
            # add LayerDrop (see https://arxiv.org/abs/1909.11556 for description)
            to_drop = False
            if self.training:
                dropout_probability = torch.rand([])
                if dropout_probability < self.layerdrop:  # skip the layer
                    to_drop = True

            if to_drop:
                # Skipped layer: hidden_states pass through unchanged.
                layer_outputs = (None, None)
            else:
                if self.gradient_checkpointing and self.training:
                    layer_outputs = self._gradient_checkpointing_func(
                        encoder_layer.__call__,
                        hidden_states,
                        None,
                        (head_mask[idx] if head_mask is not None else None),
                        output_attentions,
                    )
                else:
                    layer_outputs = encoder_layer(
                        hidden_states,
                        None,
                        layer_head_mask=(head_mask[idx] if head_mask is not None else None),
                        output_attentions=output_attentions,
                    )

                hidden_states = layer_outputs[0]

            if output_attentions:
                all_attentions = all_attentions + (layer_outputs[1],)

        hidden_states = self.layer_norm(hidden_states)
        if output_hidden_states:
            encoder_states = encoder_states + (hidden_states,)

        if not return_dict:
            return tuple(v for v in [hidden_states, encoder_states, all_attentions] if v is not None)
        return BaseModelOutput(
            last_hidden_state=hidden_states, hidden_states=encoder_states, attentions=all_attentions
        )

In [3]:
class Model(Qwen2ForCausalLM):
    """Qwen2 causal LM extended with a Whisper audio encoder.

    Audio frames are encoded, linearly projected to the LM hidden size and
    written into the token embeddings at every position whose id equals
    ``config.audio_token_index``.
    """

    def __init__(self, config):
        super().__init__(config)
        self.encoder = WhisperEncoder(config.audio_encoder_config)
        self.projection = nn.Linear(self.encoder.config.d_model, self.config.hidden_size, bias=True)

    def forward(
        self,
        input_ids,
        attention_mask,
        input_features = None,
        feature_attention_mask = None,
        labels = None,
        **kwargs,
    ):
        """Run the decoder stack on text embeddings merged with audio features.

        Args:
            input_ids: token ids; audio placeholders use
                ``config.audio_token_index``.
            attention_mask: attention mask for the text sequence.
            input_features: optional padded mel features for the encoder.
            feature_attention_mask: frame-level mask for ``input_features``;
                required whenever ``input_features`` is given.
            labels: accepted but not used by this method.
            **kwargs: accepted and ignored.

        Returns:
            The output of ``self.model`` (the decoder without the LM head),
            with ``output_hidden_states=True``.

        Raises:
            ValueError: if ``input_features`` is given without
                ``feature_attention_mask``, or if the number of audio
                placeholder tokens differs from the number of valid audio
                frames.
        """
        inputs_embeds = self.get_input_embeddings()(input_ids)
        if input_features is not None:
            if feature_attention_mask is None:
                raise ValueError(
                    "`feature_attention_mask` must be provided together with `input_features`."
                )

            batch_size, _, max_mel_seq_len = input_features.shape
            max_seq_len = (max_mel_seq_len - 2) // 2 + 1
            audio_feat_lengths = self.encoder._get_feat_extract_output_lengths(feature_attention_mask.sum(-1))
            seq_range = (
                torch.arange(0, max_seq_len, dtype=audio_feat_lengths.dtype, device=audio_feat_lengths.device)
                .unsqueeze(0)
                .expand(batch_size, max_seq_len)
            )
            lengths_expand = audio_feat_lengths.unsqueeze(1).expand(batch_size, max_seq_len)
            padding_mask = seq_range >= lengths_expand

            # Additive mask: 0 for valid frames, -inf for padded ones.
            # NOTE: the WhisperEncoder defined in this file accepts but does
            # not apply ``attention_mask``; it is passed for interface parity.
            audio_attention_mask_ = padding_mask.view(batch_size, 1, 1, max_seq_len).expand(
                batch_size, 1, max_seq_len, max_seq_len
            )
            audio_attention_mask = audio_attention_mask_.to(
                dtype=self.encoder.conv1.weight.dtype, device=self.encoder.conv1.weight.device
            )
            audio_attention_mask[audio_attention_mask_] = float("-inf")
            audio_outputs = self.encoder(input_features, attention_mask=audio_attention_mask)
            selected_audio_feature = audio_outputs.last_hidden_state
            audio_features = self.projection(selected_audio_feature)

            # Keep only the frames that correspond to real (non-padded) audio.
            num_audio_tokens = audio_feat_lengths
            num_audios, max_audio_tokens, embed_dim = audio_features.shape
            audio_features_mask = torch.arange(
                max_audio_tokens, device=num_audio_tokens.device
            ).expand(num_audios, max_audio_tokens) < num_audio_tokens.unsqueeze(1)
            masked_audio_features = audio_features[audio_features_mask].view(-1, embed_dim)

            # Use this instance's config, not a notebook-level global.
            audio_positions = input_ids == self.config.audio_token_index
            num_placeholders = int(audio_positions.sum())
            if num_placeholders != masked_audio_features.shape[0]:
                raise ValueError(
                    f"Found {num_placeholders} audio placeholder tokens in `input_ids` but "
                    f"{masked_audio_features.shape[0]} audio feature frames."
                )
            # Indexed assignment needs matching dtype/device on both sides.
            inputs_embeds[audio_positions] = masked_audio_features.contiguous().to(
                dtype=inputs_embeds.dtype, device=inputs_embeds.device
            )

        # Call the module (not ``.forward``) so registered hooks are honoured.
        super_out = self.model(
            inputs_embeds = inputs_embeds,
            attention_mask = attention_mask,
            output_hidden_states = True,
        )
        return super_out

In [4]:
# Text tokenizer comes from the Qwen2.5 instruct checkpoint; the processor
# (used below for its `feature_extractor`) comes from Whisper large-v3.
tokenizer = AutoTokenizer.from_pretrained('Qwen/Qwen2.5-7B-Instruct')
processor = AutoProcessor.from_pretrained('openai/whisper-large-v3')

In [5]:
# ChatML-style template with audio support. Each audio content item is rendered
# as "Audio N: <|audio_bos|><|file_sep|><|audio_eos|>"; the single <|file_sep|>
# is a placeholder for the audio frames.
# An item counts as audio when it has an 'audio' or 'audio_url' key, or when its
# own 'type' is 'audio'. The older message-level 'type' test is kept so existing
# inputs render exactly as before.
chat_template = (
    "{% set audio_count = namespace(value=0) %}"
    "{% for message in messages %}"
    "{% if loop.first and message['role'] != 'system' %}"
    "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n"
    "{% endif %}"
    "<|im_start|>{{ message['role'] }}\n"
    "{% if message['content'] is string %}"
    "{{ message['content'] }}<|im_end|>\n"
    "{% else %}"
    "{% for content in message['content'] %}"
    "{% if 'audio' in content or 'audio_url' in content or message['type'] == 'audio' or content['type'] == 'audio' %}"
    "{% set audio_count.value = audio_count.value + 1 %}"
    "Audio {{ audio_count.value }}: <|audio_bos|><|file_sep|><|audio_eos|>\n"
    "{% elif 'text' in content %}"
    "{{ content['text'] }}"
    "{% endif %}"
    "{% endfor %}"
    "<|im_end|>\n"
    "{% endif %}"
    "{% endfor %}"
    "{% if add_generation_prompt %}"
    "<|im_start|>assistant\n"
    "{% endif %}"
)
tokenizer.chat_template = chat_template

In [6]:
# Base language-model config, plus the config of the audio encoder checkpoint.
config = AutoConfig.from_pretrained('Qwen/Qwen2.5-7B-Instruct')
audio_encoder_config = AutoConfig.from_pretrained('huseinzol05/whisper-large-v3-encoder')

In [7]:
# Model.__init__ reads `config.audio_encoder_config` to build its WhisperEncoder.
config.audio_encoder_config = audio_encoder_config

In [8]:
# Load the Qwen2 weights into the combined model. Per the load warning below,
# the `encoder.*` weights are not in this checkpoint and are newly initialised
# (presumably the projection as well — the warning text is truncated).
model = Model.from_pretrained('Qwen/Qwen2.5-7B-Instruct', config = config)

Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Some weights of Model were not initialized from the model checkpoint at Qwen/Qwen2.5-7B-Instruct and are newly initialized: ['encoder.conv1.bias', 'encoder.conv1.weight', 'encoder.conv2.bias', 'encoder.conv2.weight', 'encoder.embed_positions.weight', 'encoder.layer_norm.bias', 'encoder.layer_norm.weight', 'encoder.layers.0.fc1.bias', 'encoder.layers.0.fc1.weight', 'encoder.layers.0.fc2.bias', 'encoder.layers.0.fc2.weight', 'encoder.layers.0.final_layer_norm.bias', 'encoder.layers.0.final_layer_norm.weight', 'encoder.layers.0.self_attn.k_proj.weight', 'encoder.layers.0.self_attn.out_proj.bias', 'encoder.layers.0.self_attn.out_proj.weight', 'encoder.layers.0.self_attn.q_proj.bias', 'encoder.layers.0.self_attn.q_proj.weight', 'encoder.layers.0.self_attn.v_proj.bias', 'encoder.layers.0.self_attn.v_proj.weight', 'encoder.layers.0.self_attn_layer_norm.bias', 'encoder.layers.0.self_attn_layer_norm.weight', 'encoder.layers.1.fc1.bias', 'encoder.layers.1.fc1.weight', 'encoder.layers.1.fc2.bias'

In [9]:
# Swap the freshly initialised encoder for one loaded from the encoder checkpoint.
model.encoder = WhisperEncoder.from_pretrained("huseinzol05/whisper-large-v3-encoder")

In [10]:
# Optional: uncomment to move the model to the GPU before the forward pass.
# _ = model.cuda()

In [11]:
# <|file_sep|> is reused as the audio placeholder token (see the chat template);
# <|audio_bos|> / <|audio_eos|> are added to the tokenizer as new tokens.
audio_token = "<|file_sep|>"
audio_bos_token = "<|audio_bos|>"
audio_eos_token = "<|audio_eos|>"
# Public tokenizer API instead of the private `_convert_token_to_id_with_added_voc`.
audio_token_id = tokenizer.convert_tokens_to_ids(audio_token)
pad_token_id = tokenizer.pad_token_id
new_tokens = [AddedToken(audio_bos_token), AddedToken(audio_eos_token)]
# NOTE(review): the model's embedding matrix is not resized here — confirm the
# new token ids fit within the existing embedding table.
tokenizer.add_tokens(new_tokens)

2

In [12]:
# Model.forward replaces the embeddings at positions holding this token id
# with the projected audio features.
model.config.audio_token_index = audio_token_id

In [13]:
# Example conversation: one user turn (audio + text) and one assistant turn.
conversation = [
    {"role": "user", "content": [
        {"type": "audio", "audio_url": "audio.wav"},
        {"type": "text", "text": "What does the person say?"},
    ]},
    {"role": "assistant", "content": "Yes, the speaker is female and in her twenties."},
]
# NOTE(review): the conversation already ends with an assistant turn, so
# add_generation_prompt=True appends a second, empty assistant header
# (visible at the end of the rendered text) — confirm this is intended.
text = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, tokenize=False)
text

'<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\nAudio 1: <|audio_bos|><|file_sep|><|audio_eos|>\nWhat does the person say?<|im_end|>\n<|im_start|>assistant\nYes, the speaker is female and in her twenties.<|im_end|>\n<|im_start|>assistant\n'

In [14]:
# `datasets.Audio` helper configured with sampling_rate=16000, the same rate
# that is passed to the feature extractor further down.
audio_class = Audio(sampling_rate=16000)

In [15]:
# Path to the audio clip; a relative path, so it must exist in the working directory.
f = 'line-4.mp3'
# Round-trip through encode/decode and keep only the decoded waveform array.
audio_ = audio_class.decode_example(audio_class.encode_example(f))['array']

In [16]:
# Frame count per clip: one frame per `hop_length` samples, capped at 3000.
audio_lengths = [
    min(3000, math.ceil(len(clip) / processor.feature_extractor.hop_length))
    for clip in [audio_]
]
audio_length = audio_lengths.pop(0)

# Length after the encoder's stride-2 convolution;
# (n + 1) // 2 is the same integer as (n - 1) // 2 + 1.
input_length = (audio_length + 1) // 2

# One placeholder token per encoder output frame.
expanded_audio_token = audio_token * input_length

In [17]:
# Re-render the prompt from `conversation` before expanding, so re-running this
# cell does not expand placeholders that an earlier run already expanded.
text = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, tokenize=False)
# NOTE: every placeholder gets the same expansion, which is only correct for a
# single audio clip.
text = text.replace(audio_token, expanded_audio_token)
inputs = tokenizer(text, return_tensors = 'pt')
input_ids = inputs['input_ids']

# The model needs exactly one placeholder id per audio frame.
assert int((input_ids == audio_token_id).sum()) == input_length, (
    "number of audio placeholder tokens does not match the expected audio length"
)

In [18]:
# Extract features for the single clip, padded with padding="max_length",
# and also request the attention mask.
inputs_audio = processor.feature_extractor(
    [audio_],
    sampling_rate=16000,
    padding="max_length",
    return_attention_mask=True,
    return_tensors="pt",
)

# Pull out the two tensors that Model.forward consumes.
input_features = inputs_audio["input_features"]
feature_attention_mask = inputs_audio["attention_mask"]

In [19]:
# Smoke-test forward pass with text and audio inputs.
# NOTE(review): this runs with autograd enabled (the output carries a grad_fn)
# and displays the entire output object; consider wrapping it in
# `torch.no_grad()` and showing only shapes.
model(
    input_ids = input_ids, 
    attention_mask = inputs['attention_mask'],
    input_features = input_features,
    feature_attention_mask = feature_attention_mask,
)

BaseModelOutputWithPast(last_hidden_state=tensor([[[-0.1157,  0.1346,  1.1895,  ..., -0.8239,  0.3654,  1.1142],
         [ 0.3169,  0.7667, -0.2151,  ..., -4.4913,  3.2864,  4.9101],
         [-0.2525,  0.3457, -3.7474,  ...,  0.9931, -0.0381,  3.4437],
         ...,
         [ 3.7294,  3.0233, -0.6418,  ..., -7.0455,  3.4381,  5.4175],
         [ 1.6567,  1.7979, -6.4976,  ..., -5.2488,  2.7147, -0.0828],
         [ 3.4675,  7.1966,  3.6888,  ..., -0.3968, -0.7041, -0.8152]]],
       grad_fn=<MulBackward0>), past_key_values=<transformers.cache_utils.DynamicCache object at 0x7f1e081f01c0>, hidden_states=(tensor([[[ 1.2398e-04, -9.0599e-05,  1.2064e-04,  ..., -2.3842e-04,
          -2.8491e-05, -1.6785e-04],
         [-6.7520e-04, -8.3618e-03, -1.0376e-03,  ..., -3.0273e-02,
          -1.8433e-02, -7.6904e-03],
         [ 1.4572e-03, -4.5776e-03, -8.4229e-03,  ..., -2.0447e-03,
          -8.9264e-04, -6.8970e-03],
         ...,
         [ 1.2398e-04, -9.0599e-05,  1.2064e-04,  ..., -2.