In [1]:
import torchaudio
import torch
import os, csv, argparse, wget
from AST.models import ASTModel
import numpy as np
from torch.cuda.amp import autocast
import IPython

 - 10-second audio waveform -> sequence of 128-dimensional long filterbank 
 - 1024(time) x 128(frequency) spectogram
 - split into 512(64 time) x 8(frequency)) square patches of shape 16x16 fed into AST

In [2]:

# Filterbank
def load_audio(audio_path):
    waveform, sample_rate = torchaudio.load(audio_path)
    audio_info = 'Original input audio length {:.2f} seconds, number of channels: {:d}, sampling rate: {:d}.'.format(waveform.shape[1]/sample_rate, waveform.shape[0], sample_rate)
    if waveform.shape[0] != 1:
        waveform = waveform[0].unsqueeze(0)
        audio_info += ' Only the first channel is used.'
    if sample_rate == 16000:
        pass
    else:
        waveform = torchaudio.functional.resample(waveform, orig_freq=sample_rate, new_freq=16000)
        sample_rate = 16000
        audio_info += ' Resample to 16000Hz.'
    waveform = waveform - waveform.mean()
    fbank = torchaudio.compliance.kaldi.fbank(waveform, htk_compat=True, sample_frequency=sample_rate,
                                              use_energy=False, window_type='hanning',
                                              num_mel_bins=128, dither=0.0, frame_shift=10)
    target_length = 1024
    n_frames = fbank.shape[0]
    p = target_length - n_frames
    if p > 0:
        m = torch.nn.ZeroPad2d((0, 0, 0, p))
        fbank = m(fbank)
    elif p < 0:
        fbank = fbank[0:target_length, :]
    # normalize the fbank
    fbank = (fbank + 5.081) / 4.4849
    return fbank, audio_info
    
def load_label(label_csv):
    with open(label_csv, 'r') as f:
        reader = csv.reader(f, delimiter=',')
        lines = list(reader)
    labels = []
    ids = []  # Each label has a unique id such as "/m/068hy"
    for i1 in range(1, len(lines)):
        id = lines[i1][1]
        label = lines[i1][2]
        ids.append(id)
        labels.append(label)
    return labels

In [3]:
import math
from typing import List, Optional, Tuple, Union

import torch
import torch.utils.checkpoint
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

import numpy as np

import os
import torch
import torch.nn as nn
import timm
from timm.models.layers import to_2tuple, trunc_normal_, DropPath
from timm.models.vision_transformer import Attention, Mlp, PatchEmbed, Block
from pos_embed import get_2d_sincos_pos_embed

### WORKING CODE START

In [4]:
class PatchEmbed(nn.Module):
    def __init__(self, img_size=224, patch_size=16, in_chans=3, embed_dim=768):
        super().__init__()

        img_size = to_2tuple(img_size)
        patch_size = to_2tuple(patch_size)
        num_patches = (img_size[1] // patch_size[1]) * (img_size[0] // patch_size[0])
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = num_patches

        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size)

    def forward(self, x):
        x = self.proj(x).flatten(2).transpose(1, 2)
        return x

class Block(nn.Module):
    def __init__(self, dim, num_heads, mlp_ratio=4., qkv_bias=False, qk_scale=None, drop=0., attn_drop=0.,
                 drop_path=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm):
        super().__init__()
        self.norm1 = norm_layer(dim)
        self.norm1_a = norm_layer(dim)
        self.norm1_v = norm_layer(dim)
        self.attn = Attention(
            dim, num_heads=num_heads, qkv_bias=qkv_bias, qk_scale=qk_scale, attn_drop=attn_drop, proj_drop=drop)
        # NOTE: drop path for stochastic depth, we shall see if this is better than dropout here
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
        self.norm2 = norm_layer(dim)
        self.norm2_a = norm_layer(dim)
        self.norm2_v = norm_layer(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop)

    def forward(self, x, modality=None):
        if modality == None:
            x = x + self.drop_path(self.attn(self.norm1(x)))
            x = x + self.drop_path(self.mlp(self.norm2(x)))
        elif modality == 'a':
            x = x + self.drop_path(self.attn(self.norm1_a(x)))
            x = x + self.drop_path(self.mlp(self.norm2_a(x)))
        elif modality == 'v':
            x = x + self.drop_path(self.attn(self.norm1_v(x)))
            x = x + self.drop_path(self.mlp(self.norm2_v(x)))

        # this is a workaround to avoid ddp complain
        x = x + 0.0 * (self.norm1(x) + self.norm2(x) + self.norm1_a(x) + self.norm2_a(x) + self.norm1_v(x) + self.norm2_v(x))
        return x

# the finetuned CAV-MAE model
class CAVMAEFTAudio(nn.Module):
    def __init__(self, img_size=224, audio_length=1024, patch_size=16, in_chans=3,
                 embed_dim=768, modality_specific_depth=11, num_heads=12, mlp_ratio=4., norm_layer=nn.LayerNorm, norm_pix_loss=False, tr_pos=True):
        super().__init__()
        timm.models.vision_transformer.Block = Block

        timm.models.vision_transformer.PatchEmbed = PatchEmbed
        timm.models.vision_transformer.Block = Block

        self.patch_embed_a = PatchEmbed(img_size, patch_size, 1, embed_dim)

        self.patch_embed_a.num_patches = int(audio_length * 128 / 256)

        self.modality_a = nn.Parameter(torch.zeros(1, 1, embed_dim))

        self.pos_embed_a = nn.Parameter(torch.zeros(1, self.patch_embed_a.num_patches, embed_dim), requires_grad=tr_pos)  # fixed sin-cos embedding

        self.blocks_a = nn.ModuleList([Block(embed_dim, num_heads, mlp_ratio, qkv_bias=True, qk_scale=None, norm_layer=norm_layer) for i in range(modality_specific_depth)])
        self.blocks_u = nn.ModuleList([Block(embed_dim, num_heads, mlp_ratio, qkv_bias=True, qk_scale=None, norm_layer=norm_layer) for i in range(12 - modality_specific_depth)])

        self.norm_a = norm_layer(embed_dim)

        self.initialize_weights()

    def get_patch_num(self, input_shape, stride):
        test_input = torch.zeros(1, 1, input_shape[0], input_shape[1])
        test_proj = torch.nn.Conv2d(1, 4, kernel_size=(16, 16), stride=(stride, stride))
        test_output = test_proj(test_input)
        return test_output.shape[2], test_output[3], test_output[2] * test_output[2]

    def initialize_weights(self):
        pos_embed_a = get_2d_sincos_pos_embed(self.pos_embed_a.shape[-1], 8, int(self.patch_embed_a.num_patches/8), cls_token=False)
        self.pos_embed_a.data.copy_(torch.from_numpy(pos_embed_a).float().unsqueeze(0))

        w = self.patch_embed_a.proj.weight.data
        torch.nn.init.xavier_uniform_(w.view([w.shape[0], -1]))

        torch.nn.init.normal_(self.modality_a, std=.02)

        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            # we use xavier_uniform following official JAX ViT:
            torch.nn.init.xavier_uniform_(m.weight)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def forward(self, a):
        # expect input [b, t, f]
        a = a.unsqueeze(1)
        a = a.transpose(2, 3)
        a = self.patch_embed_a(a)
        a = a + self.pos_embed_a
        a = a + self.modality_a

        for blk in self.blocks_a:
            a = blk(a)

        for blk in self.blocks_u:
            a = blk(a, 'a')

        a = self.norm_a(a)
        # output in shape [b, t, dim]
        return a

In [5]:
from datasets import load_dataset
from prompter import Prompter

In [6]:
prompt = "hello what are you? what can you do?"
response = "I am a helpful AI assistant"

In [7]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
"""
tokenizer = AutoTokenizer.from_pretrained("MBZUAI/LaMini-T5-738M")
inputs = tokenizer(prompt, return_tensors="pt")
"""
from transformers import T5Tokenizer, T5ForConditionalGeneration

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

tokenizer = T5Tokenizer.from_pretrained("google/flan-t5-large")
model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large", device_map="auto")
model.eval()
model.to(device)

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


T5ForConditionalGeneration(
  (shared): Embedding(32128, 1024)
  (encoder): T5Stack(
    (embed_tokens): Embedding(32128, 1024)
    (block): ModuleList(
      (0): T5Block(
        (layer): ModuleList(
          (0): T5LayerSelfAttention(
            (SelfAttention): T5Attention(
              (q): Linear(in_features=1024, out_features=1024, bias=False)
              (k): Linear(in_features=1024, out_features=1024, bias=False)
              (v): Linear(in_features=1024, out_features=1024, bias=False)
              (o): Linear(in_features=1024, out_features=1024, bias=False)
              (relative_attention_bias): Embedding(32, 16)
            )
            (layer_norm): T5LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (1): T5LayerFF(
            (DenseReluDense): T5DenseGatedActDense(
              (wi_0): Linear(in_features=1024, out_features=2816, bias=False)
              (wi_1): Linear(in_features=1024, out_features=2816, bias=False)
       

In [8]:
input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to("cuda")

In [9]:
file = "./data/sample/audio_samples/audio/5FTf2UXOjd8_000160.flac"
cur_audio_input, audio_info = load_audio(file)
cur_audio_input = cur_audio_input.unsqueeze(0)

In [10]:
audio_encoder = CAVMAEFTAudio()
# projecting to 1024 input embedding dimension for T5
audio_proj = nn.Sequential(nn.LayerNorm(768, elementwise_affine=False), nn.Linear(768, 1024))

In [11]:
audio_input = audio_encoder(cur_audio_input)  # [B, 512, 768]
audio_input = audio_input.reshape(audio_input.shape[0], 8, 64, audio_input.shape[-1])
audio_input = torch.mean(audio_input, dim=1)  # mean pool over the frequency dimension # [B, 64, 768]
audio_input = torch.nn.functional.avg_pool2d(audio_input, (2, 1)) #[B, 32, 768]
# hard norm to 50
audio_input = audio_input / 50
audio_input = audio_proj(audio_input) #[B, 32, 1024]
audio_input.shape

torch.Size([1, 32, 1024])

In [14]:

prompt_text = "what can be infered from this audio following right after this"
input_ids = tokenizer(prompt_text, return_tensors='pt').input_ids.to(device)


with torch.no_grad():
    prompt_embeddings = model.shared(input_ids)  # Shape: (1, sequence_length, 1024)
    prompt_embeddings = prompt_embeddings.to(device)


target_text = "what can be infered from this audio following right after this: The audio clip suggests that someone is playing a steel "\
"guitar or slide guitar. This can be inferred from the smooth and resonant sound of the instrument."

target_ids = tokenizer(target_text, return_tensors='pt').input_ids.to(device)

decoder_input_ids = model._shift_right(target_ids)

audio_embeddings = audio_input.to(device)  # Shape: (1, 32, 1024)

print(audio_embeddings.shape)
print(prompt_embeddings.shape)
# Concatenate prompt and audio embeddings
combined_embeddings = torch.cat((prompt_embeddings, audio_embeddings), dim=1)  # Shape: (1, sequence_length + 32, 1024)

max_length = 512


if combined_embeddings.size(1) > max_length:
    combined_embeddings = combined_embeddings[:, :max_length, :]

padding_length = max_length - combined_embeddings.size(1)
if padding_length > 0:
    padding_tensor = torch.zeros((combined_embeddings.size(0), padding_length, combined_embeddings.size(2))).to(device)
    combined_embeddings = torch.cat((combined_embeddings, padding_tensor), dim=1)


attention_mask = torch.ones(combined_embeddings.size(0), combined_embeddings.size(1)).to(device)
if padding_length > 0:
    attention_mask[:, -padding_length:] = 0


outputs = model(inputs_embeds=combined_embeddings, attention_mask=attention_mask,decoder_input_ids=decoder_input_ids)
print(outputs.loss)
logits = outputs.logits  

predicted_ids = torch.argmax(logits, dim=-1)

decoded_text = tokenizer.decode(predicted_ids[0], skip_special_tokens=True)
decoded_text

torch.Size([1, 32, 1024])
torch.Size([1, 14, 1024])
None


'can be inferered from this audio following right after thisn is is that the is  a guitar guitar,  guitar. is be inferred from the following sound ssonant sound of the guitar.'

# WORKING CODE END

## Miscellaneous


#### Example workings

In [56]:

input_ids = tokenizer(
    "Studies have been shown that owning a dog is good for you", return_tensors="pt"
).input_ids  # Batch size 1
decoder_input_ids = tokenizer("Studies show that", return_tensors="pt").input_ids  # Batch size 1

# preprocess: Prepend decoder_input_ids with start token which is pad token for T5Model.
# This is not needed for torch's T5ForConditionalGeneration as it does this internally using labels arg.
decoder_input_ids = model._shift_right(decoder_input_ids)

# forward pass
outputs = model(input_ids=input_ids, decoder_input_ids=decoder_input_ids)

In [22]:
# Load AST model with AudioSet pretrained weights

checkpoint_path = "./data/audioset_0.4593.pth"
ast = ASTModel(label_dim=527, input_tdim=1024, imagenet_pretrain=False, audioset_pretrain=False)
checkpoint = torch.load(checkpoint_path, map_location="cuda")
audio_model = torch.nn.DataParallel(ast, device_ids=[0])
audio_model.load_state_dict(checkpoint)
audio_model = audio_model.to(torch.device("cuda:0"))
audio_model.eval()

label_csv = "./data/class_labels_indices.csv"
labels = load_label(label_csv)

with torch.no_grad():
  with autocast():
    output = audio_model.forward(fb_data)
    output = torch.sigmoid(output)
result_output = output.data.cpu().numpy()[0]
sorted_indexes = np.argsort(result_output)[::-1]
# Print audio tagging top probabilities
print('Predice results:')
for k in range(10):
    print('- {}: {:.4f}'.format(np.array(labels)[sorted_indexes[k]], result_output[sorted_indexes[k]]))
print('Listen to this sample: ')
IPython.display.Audio('./data/sample/audio_samples/audio/5FTf2UXOjd8_000160.flac', rate=16000)

Predice results:
- Music: 0.9717
- Guitar: 0.6226
- Musical instrument: 0.5547
- Plucked string instrument: 0.5020
- Steel guitar, slide guitar: 0.0828
- Bass guitar: 0.0827
- Tapping (guitar technique): 0.0817
- Strum: 0.0570
- Acoustic guitar: 0.0519
- Electric guitar: 0.0502
Listen to this sample: 
(527,)


In [97]:
from datasets import load_dataset
from prompter import Prompter
data_path = "./openaqa_toy.json"
data = load_dataset("json", data_files=data_path)

tokenizer.padding_side = "left"  # Allow batched inference
tokenizer.pad_token_id = (
        0  # unk. we want this to be different from the eos token
    )
def tokenize(prompt, add_eos_token=True):
    result = tokenizer(
        prompt,
        truncation=True,
        max_length=cutoff_len,
        padding=False,
        return_tensors=None,
    )
    if (
        result["input_ids"][-1] != tokenizer.eos_token_id
        and len(result["input_ids"]) < cutoff_len
        and add_eos_token
    ):
        result["input_ids"].append(tokenizer.eos_token_id)
        result["attention_mask"].append(1)

    result["labels"] = result["input_ids"].copy()
    return result

def generate_and_tokenize_prompt(data_point):
    full_prompt = prompter.generate_prompt(
        data_point["instruction"],
        data_point["input"],
        data_point["output"]
    )
    tokenized_full_prompt = tokenize(full_prompt)
    if not train_on_inputs:
        user_prompt = prompter.generate_prompt(
            data_point["instruction"], data_point["input"]
        )
        tokenized_user_prompt = tokenize(
            user_prompt, add_eos_token=add_eos_token
        )
        user_prompt_len = len(tokenized_user_prompt["input_ids"])

        if add_eos_token:
            user_prompt_len -= 1

        tokenized_full_prompt["labels"] = [
            -100
        ] * user_prompt_len + tokenized_full_prompt["labels"][
            user_prompt_len:
        ]  # could be sped up, probably
    return tokenized_full_prompt



In [48]:
input_ids = tokenizer("The <extra_id_0> walks in <extra_id_1> park", return_tensors="pt").input_ids
labels = tokenizer("<extra_id_0> cute dog <extra_id_1> the <extra_id_2>", return_tensors="pt").input_ids

# the forward function automatically creates the correct decoder_input_ids
loss = model(input_ids=input_ids, labels=labels).loss
loss.item()

9.598347663879395

In [28]:
max_source_length = 768
max_target_length = 768
input_sequence_1 = "Welcome to NYC"
output_sequence_1 = "Bienvenue à NYC"

input_sequence_2 = "HuggingFace is a company"
output_sequence_2 = "HuggingFace est une entreprise"

# encode the inputs
task_prefix = "translate English to French: "
input_sequences = [input_sequence_1, input_sequence_2]

encoding = tokenizer(
    [task_prefix + sequence for sequence in input_sequences],
    padding="longest",
    max_length=max_source_length,
    truncation=True,
    return_tensors="pt",
)

In [29]:
input_ids, attention_mask = encoding.input_ids, encoding.attention_mask

In [30]:
target_encoding = tokenizer(
    [output_sequence_1, output_sequence_2],
    padding="longest",
    max_length=max_target_length,
    truncation=True,
    return_tensors="pt",
)
labels = target_encoding.input_ids

# replace padding token id's of the labels by -100 so it's ignored by the loss
labels[labels == tokenizer.pad_token_id] = -100

# forward pass
loss = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels).loss
loss.item()

1.2628182172775269