<a href="https://colab.research.google.com/github/softmurata/colab_notebooks/blob/main/audio/MERT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install (or upgrade to) the newest yt-dlp release; its command-line tool is
# what the download cell below invokes.
!python3 -m pip install -U yt-dlp

In [None]:
# Download the five source tracks. The -o template names every file after the
# video title, with the extension chosen by yt-dlp.
# The first two commands pass -x (extract audio) together with --audio-format wav.
# NOTE(review): the last three commands omit -x. yt-dlp documents --audio-format
# as applying only when -x is given, so these presumably keep the downloaded
# container instead of producing .wav — the trimming cell does read them as
# .webm. Confirm before adding -x, because those input paths would have to
# change at the same time.
!yt-dlp -x --audio-format wav "https://youtu.be/V-gxqhWEbxI" -o "%(title)s.%(ext)s"  # vaundy
!yt-dlp -x --audio-format wav "https://youtu.be/6YZlFdTIdzM" -o "%(title)s.%(ext)s"  # one ok rock
!yt-dlp --audio-format wav "https://youtu.be/hN5MBlGv2Ac" -o "%(title)s.%(ext)s" # official
!yt-dlp --audio-format wav "https://youtu.be/oLrp9uTa9gw" -o "%(title)s.%(ext)s"  # official 2
!yt-dlp --audio-format wav "https://youtu.be/lD-GY7WiTd4" -o "%(title)s.%(ext)s"  # twice

In [None]:
start = 30
end = 60
!ffmpeg -i "/content/そんなbitterな話 ⧸ Vaundy：MUSIC VIDEO.wav" -ss $start -t $end /content/out_vaundy.wav
!ffmpeg -i "/content/ONE OK ROCK - Clock Strikes [Official Music Video].wav" -ss $start -t $end /content/out_oneok.wav
!ffmpeg -i "/content/Official髭男dism - Subtitle [Official Video].webm" -ss $start -t $end /content/out_official.wav
!ffmpeg -i "/content/Official髭男dism - TATTOO [Official Video].webm" -ss $start -t $end /content/out_official2.wav
!ffmpeg -i "/content/Bouquet.webm" -ss $start -t $end /content/out_twice.wav

Installation

In [None]:
!pip install transformers accelerate datasets
!pip install nnAudio

Load model

In [None]:
from transformers import Wav2Vec2FeatureExtractor
from transformers import AutoModel
import torch
from torch import nn
import torchaudio.transforms as T
from datasets import load_dataset


# Run on the GPU whenever torch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Fetch the MERT checkpoint and move it to the selected device.
# trust_remote_code=True allows code shipped with the model repository to run.
model = AutoModel.from_pretrained("m-a-p/MERT-v1-95M", trust_remote_code=True)
model = model.to(device)

# The matching feature extractor carries the preprocessing settings, including
# the sampling rate that later cells read from `processor.sampling_rate`.
processor = Wav2Vec2FeatureExtractor.from_pretrained(
    "m-a-p/MERT-v1-95M",
    trust_remote_code=True,
)

Demo inference

In [None]:
# Load the small LibriSpeech demo split and order it by id so that index 0 is
# always the same utterance.
dataset = load_dataset(
    "hf-internal-testing/librispeech_asr_demo", "clean", split="validation"
).sort("id")
sampling_rate = dataset.features["audio"].sampling_rate

# The model's feature extractor dictates the rate the waveform must have.
resample_rate = processor.sampling_rate
resampler = None
if resample_rate != sampling_rate:
    print(f'setting rate from {sampling_rate} to {resample_rate}')
    resampler = T.Resample(sampling_rate, resample_rate, dtype=torch.float64)

# Accessing the "audio" column decodes the file on the fly.
first_clip = dataset[0]["audio"]["array"]
if resampler is not None:
    input_audio = resampler(torch.from_numpy(first_clip))
else:
    input_audio = first_clip

In [None]:
# Convert the waveform into model-ready tensors, then move them to the device
# the model lives on.
encoded_audio = processor(input_audio, sampling_rate=resample_rate, return_tensors="pt")
inputs = encoded_audio.to(device)

# Inference only: no gradients are tracked. Hidden states of every layer are
# requested so the following cell can inspect them.
with torch.no_grad():
    outputs = model(output_hidden_states=True, **inputs)

In [None]:
# take a look at the output shape, there are 13 layers of representation
# each layer performs differently in different downstream tasks, you should choose empirically
all_layer_hidden_states = torch.stack(outputs.hidden_states).squeeze()
print(all_layer_hidden_states.shape) # [13 layer, Time steps, 768 feature_dim]

# for utterance level classification tasks, you can simply reduce the representation in time
time_reduced_hidden_states = all_layer_hidden_states.mean(-2)
print(time_reduced_hidden_states.shape) # [13, 768]

# you can even use a learnable weighted average representation
aggregator = nn.Conv1d(in_channels=13, out_channels=1, kernel_size=1)
weighted_avg_hidden_states = aggregator(time_reduced_hidden_states.unsqueeze(0).cpu()).squeeze()
print(weighted_avg_hidden_states.shape) # [768]

Music Analysis

In [56]:
import librosa
def calc_embedding_for_music(input_audio_path):
  """Embed one audio file with the loaded MERT model.

  The file is read with librosa, resampled to the rate the feature extractor
  expects when necessary, and passed through the model. The hidden states of
  every layer are averaged over time and concatenated into a single vector.

  Args:
    input_audio_path: Path of the audio file handed to `librosa.load`.

  Returns:
    A 1-D CPU tensor holding the time-averaged hidden states of all layers,
    flattened layer after layer (13 * 768 values if the model yields the
    [13, T, 768] layout noted in the demo cell — TODO confirm).
  """
  # `sr` is the rate librosa actually loaded the waveform at.
  y, sr = librosa.load(input_audio_path)

  resample_rate = processor.sampling_rate
  # Decide on resampling from this file's own rate. Comparing against the
  # notebook-level `sampling_rate` of the speech demo would tie the function to
  # an unrelated cell and give the wrong answer whenever the two rates differ.
  if resample_rate != sr:
    print(f'setting rate from {sr} to {resample_rate}')
    resampler = T.Resample(sr, resample_rate, dtype=torch.float64)
    input_audio = resampler(torch.Tensor(y).to(torch.float64))
  else:
    input_audio = y

  inputs = processor(input_audio, sampling_rate=resample_rate, return_tensors="pt").to(device)
  with torch.no_grad():
    outputs = model(**inputs, output_hidden_states=True)

  # A learnable weighted average over layers (nn.Conv1d with 13 input channels
  # and a single output channel) would need fine-tuning first, so the plain
  # per-layer time averages are returned instead.

  # Drop only the batch axis (dim 1 after stacking); a bare squeeze() would
  # also remove the time axis if a clip happened to yield a single frame.
  all_layer_hidden_states = torch.stack(outputs.hidden_states).squeeze(1)
  # Average over the time axis, leaving one vector per layer.
  time_reduced_hidden_states = all_layer_hidden_states.mean(-2)
  return time_reduced_hidden_states.reshape(-1).cpu()

In [29]:
from numpy import dot 
from numpy.linalg import norm 
def calc_cos_sim(a, b):
  """Return the cosine similarity of the vectors `a` and `b`.

  Computed as the dot product of the two vectors divided by the product of
  their norms.
  """
  inner_product = dot(a, b)
  magnitude_product = norm(a) * norm(b)
  return inner_product / magnitude_product

In [None]:
# Trimmed clips, listed in the same order as the names unpacked below.
_clip_paths = (
    "/content/out_vaundy.wav",
    "/content/out_oneok.wav",
    "/content/out_official.wav",
    "/content/out_official2.wav",
    "/content/out_twice.wav",
)

# One embedding per clip, converted to a NumPy array for the similarity helper.
embed_vaundy, embed_oneok, embed_official, embed_official2, embed_twice = [
    calc_embedding_for_music(clip_path).detach().numpy()
    for clip_path in _clip_paths
]

In [None]:
# Sanity check of the embedding size. calc_embedding_for_music flattens the
# per-layer time averages, so a [13, 768] result would show up here as
# (9984,) — TODO confirm on a real run.
print(embed_vaundy.shape)

In [65]:
# Compare the first Official髭男dism clip against every other clip.
_comparisons = (
    ("official vs vaundy: ", embed_vaundy),
    ("official vs oneok: ", embed_oneok),
    ("official vs official2: ", embed_official2),
    ("official vs twice: ", embed_twice),
)
for _label, _other_embedding in _comparisons:
    print(_label, calc_cos_sim(embed_official, _other_embedding))

official vs vaundy:  0.9231535
official vs oneok:  0.9128771
official vs official2:  0.9318339
official vs twice:  0.907945


Single Inference

In [None]:
# Load one trimmed clip with librosa; `sr` is the rate it was loaded at.
import librosa
y, sr = librosa.load("/content/out_oneok.wav")
print(type(y), sr)

resample_rate = processor.sampling_rate
# Compare the model's expected rate with this clip's own rate. The
# `sampling_rate` variable belongs to the LibriSpeech demo cell and says
# nothing about this file, so it must not drive the decision here.
if resample_rate != sr:
    print(f'setting rate from {sr} to {resample_rate}')
    resampler = T.Resample(sr, resample_rate, dtype=torch.float64)
else:
    resampler = None

# Resample only when the rates differ; otherwise use the waveform as loaded.
if resampler is None:
    input_audio = y
else:
    input_audio = resampler(torch.Tensor(y).to(torch.float64))

In [25]:
# Build the model inputs from the prepared waveform and move them to the
# model's device.
clip_features = processor(input_audio, sampling_rate=resample_rate, return_tensors="pt")
inputs = clip_features.to(device)

# Forward pass without gradient tracking, keeping every layer's hidden states.
with torch.no_grad():
    outputs = model(output_hidden_states=True, **inputs)

In [None]:
# take a look at the output shape, there are 13 layers of representation
# each layer performs differently in different downstream tasks, you should choose empirically
all_layer_hidden_states = torch.stack(outputs.hidden_states).squeeze()
print(all_layer_hidden_states.shape) # [13 layer, Time steps, 768 feature_dim]

# for utterance level classification tasks, you can simply reduce the representation in time
time_reduced_hidden_states = all_layer_hidden_states.mean(-2)
print(time_reduced_hidden_states.shape) # [13, 768]

# you can even use a learnable weighted average representation
aggregator = nn.Conv1d(in_channels=13, out_channels=1, kernel_size=1)
weighted_avg_hidden_states = aggregator(time_reduced_hidden_states.unsqueeze(0).cpu()).squeeze()
print(weighted_avg_hidden_states.shape) # [768]