In [None]:
# Google Drive can only be mounted from inside Colab. When the notebook runs
# locally the `google.colab` package is unavailable, so skip the mount instead
# of failing the very first cell (parse_for_local rewrites Drive paths for
# local runs).
try:
    from google.colab import drive
except ImportError:
    drive = None

if drive is not None:
    drive.mount('/content/drive')

# Tempo evaluation

## Training

### Import requirements

In [None]:
import pandas as pd
import librosa
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from IPython.display import clear_output
from functools import reduce
import os
from numpy import float32
from IPython.display import Audio
import torchaudio
import IPython
# Run on the GPU when one is available; otherwise fall back to the CPU so the
# notebook still executes on machines without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Sample rate (Hz) that audio is resampled to when it is loaded for training.
sr=48000
# Length, in seconds, of each audio slice used as one example.
slice_audio_sec = 30

### Utils

In [None]:
def slice_by_second(audio, segment_length_sec, sr=sr):
    """Split ``audio`` into consecutive, non-overlapping fixed-length segments.

    Args:
        audio: Sliceable sequence of samples that supports ``len()``.
        segment_length_sec: Length of every segment, in seconds.
        sr: Sample rate used to convert seconds into a sample count.

    Returns:
        A list of segments, each ``int(segment_length_sec * sr)`` samples long.
        Trailing samples that do not fill a whole segment are dropped.
    """
    samples_per_segment = int(segment_length_sec * sr)
    segment_count = len(audio) // samples_per_segment
    return [
        audio[index * samples_per_segment:(index + 1) * samples_per_segment]
        for index in range(segment_count)
    ]

def parse_for_local(path:str):
    """Map a Colab Google Drive path to its location on the local machine.

    When the Colab Drive mount point exists the path is returned untouched.
    Otherwise every occurrence of the Colab Drive prefix in ``path`` is
    replaced by the local Google Drive folder.

    Args:
        path: A file path, normally starting with the Colab Drive prefix.

    Returns:
        The path to use in the current environment.
    """
    running_on_colab = os.path.isdir('/content/drive/MyDrive/')
    if not running_on_colab:
        return path.replace("/content/drive/MyDrive/", "/Users/yoonsookim/Library/CloudStorage/GoogleDrive-ml.laptise@gmail.com/マイドライブ/")
    return path

def extract_melspecto(audio_segment):
    """Compute the mel spectrogram of one audio segment with librosa.

    Uses a 1024-sample FFT window, a hop of a quarter window, and the
    notebook-level sample rate ``sr``.

    Args:
        audio_segment: Audio samples passed to librosa as ``y``.

    Returns:
        Whatever ``librosa.feature.melspectrogram`` returns for the segment.
    """
    fft_size = 1024
    return librosa.feature.melspectrogram(
        y=audio_segment,
        sr=sr,
        n_fft=fft_size,
        hop_length=int(fft_size / 4),
    )

### Load audios

In [None]:
# Track catalogue, read from a Google Sheets CSV export
# (presumably one row per track — confirm against the sheet).
sheet = pd.read_csv("https://docs.google.com/spreadsheets/d/1UDQxW1s2D6kUJuYOqQOC5WPmU6UlGb-GTCwoe52_3qw/export?format=csv")
# Keep only the rows that have both a "BPM" and an "AR" value.
rows = sheet.dropna(subset=["BPM", "AR"])
# List to store loaded audio data
# NOTE(review): nothing in this cell appends to it — possibly a leftover.
loaded_data = []

# One entry per fixed-length audio slice: features, target BPM, track metadata
# and the raw samples of the slice.
dset = []
# Load audio files
for _,row in rows.iterrows():
    path = row["dir"]
    bpm = row["BPM"]
    # Each track directory is expected to contain an "ar.wav" file.
    ar_path = parse_for_local(f"{path}/ar.wav")
    # Resample to the notebook-wide rate and mix down to mono.
    audio, _ = librosa.load(ar_path, sr=sr, mono=True)
    # Shared by every slice of this track (the same dict object is reused).
    metadata = {
        "bpm": bpm,
        "ar_path": ar_path,
        "title": row["Title"],
        'artist': row['Artist'],
    }
    # Cut the track into slices of slice_audio_sec seconds; a trailing
    # remainder shorter than one slice is discarded by slice_by_second.
    segments = slice_by_second(audio, slice_audio_sec)
    for segment in segments:
        features = extract_melspecto(segment)
        # The feature tensor is moved to `device` as soon as it is created,
        # so the features of every slice live on that device.
        feature_torch = torch.tensor(features).to(device)
        dset.append({
            "features": feature_torch,
            "bpm": bpm,
            "metadata": metadata,
            "segment": segment
        })
# Total number of slices collected across all tracks.
print(len(dset))

### Build the dataset from the extracted features

In [None]:
def shape_to_value(shape):
    """Return the product of all dimensions in ``shape``.

    Args:
        shape: Non-empty iterable of dimension sizes.

    Returns:
        The dimensions multiplied together, i.e. the number of elements
        described by the shape.
    """
    def _multiply(accumulated, dimension):
        return accumulated * dimension

    return reduce(_multiply, shape)

class TempoDataset(Dataset):
    """Dataset of ``(feature, bpm, metadata, segment)`` examples.

    Built from a list of dicts with the keys ``"features"``, ``"bpm"``,
    ``"metadata"`` and ``"segment"``. Every feature must have the same shape;
    this is verified at construction time. An empty list is accepted and
    yields a dataset of length 0.
    """

    def __init__(self, dset: list[dict]):
        """Normalise the raw records and validate their feature shapes.

        Args:
            dset: Raw records, one dict per example.

        Raises:
            ValueError: If the records do not all share one feature shape.
        """
        self._dset = [self._parse_dict(d) for d in dset]
        self._check_feature_length()

    def get_input_size(self):
        """Return the number of values in one feature (product of its shape).

        Raises:
            ValueError: If the dataset is empty, so no feature shape exists.
        """
        if not self._dset:
            raise ValueError("Cannot compute the input size of an empty dataset.")
        shapes = list(self._dset[0]["feature"].shape)
        return reduce(lambda x, y: x * y, shapes, 1)

    def _parse_dict(self, d: dict):
        """Convert one raw record into the internal layout.

        The ``"features"`` entry is stored under ``"feature"`` and the BPM is
        cast to ``float32``; metadata and segment are kept as given.
        """
        return {
            "feature": d["features"],
            "bpm": float32(d["bpm"]),
            "metadata": d["metadata"],
            "segment": d["segment"]
        }

    def _check_feature_length(self):
        """Ensure every stored feature has the same shape as the first one.

        Raises:
            ValueError: On the first record whose feature shape differs; the
                message names the offending record and its metadata.
        """
        if not self._dset:
            # Nothing to compare; an empty dataset is trivially consistent.
            return
        first_shape = self._dset[0]["feature"].shape
        for index, d in enumerate(self._dset):
            shape = d["feature"].shape
            if shape != first_shape:
                raise ValueError(
                    "All features must have the same shape. "
                    f"Item {index} has shape {tuple(shape)} but the first item "
                    f"has shape {tuple(first_shape)} (metadata: {d['metadata']})."
                )

    def __len__(self):
        """Return the number of examples."""
        return len(self._dset)

    def __getitem__(self, idx):
        """Return ``(feature, bpm, metadata, segment)`` for example ``idx``."""
        target = self._dset[idx]
        return target["feature"], target["bpm"], target["metadata"], target["segment"]

# Wrap every slice collected in `dset`; construction raises ValueError if the
# feature shapes are not all identical.
dataset = TempoDataset(dset)


### Training

In [None]:
# Training bookkeeping: loss history plus the examples that trained poorly
class TrainReport:
    """Collects the loss history of a training run and flags "trouble" entries.

    An entry is a trouble when its loss is strictly greater than the average
    loss of all entries recorded so far (the new entry included).

    NOTE(review): every history entry keeps a reference to its ``segment``, so
    memory grows with the number of recorded entries.
    """

    def __init__(self):
        # One dict per recorded entry: epoch, loss, metadata, segment.
        self.histories = []
        # Subset of the entries whose loss exceeded the running average.
        self.troubles = []

    def add_history(self, epoch:int, loss:float, metadata: dict, segment):
        """Record one entry and register it as a trouble if it is above average.

        Args:
            epoch: Epoch number the entry belongs to.
            loss: Loss value of the entry.
            metadata: Metadata of the example the loss was measured on.
            segment: Audio segment of that example.
        """
        self.histories.append({"epoch": epoch, "loss":loss, "metadata": metadata, "segment": segment})
        # The average already includes the entry that was just appended, so
        # the very first entry can never be a trouble.
        if self.get_avg_loss() < loss:
            self.regist_troubles()

    def get_loss_list(self):
        """Return the recorded losses in insertion order."""
        return [history["loss"] for history in self.histories]

    def get_epoch_list(self):
        """Return the recorded epoch numbers in insertion order."""
        return [history["epoch"] for history in self.histories]

    def get_avg_loss(self):
        """Return the mean of all recorded losses, or NaN if none are recorded."""
        losses = self.get_loss_list()
        if not losses:
            return float("nan")
        return sum(losses) / len(losses)

    def regist_troubles(self):
        """Copy the most recent history entry into the troubles list.

        The current average loss is stored alongside it as ``"avg_loss"``.

        Raises:
            IndexError: If no history has been recorded yet.
        """
        last_history = self.histories[-1]
        self.troubles.append({
            "epoch": last_history["epoch"],
            "loss": last_history["loss"],
            "metadata": last_history["metadata"],
            "segment": last_history["segment"],
            "avg_loss": self.get_avg_loss()
        })
        print("New trouble added")

    def get_troubles(self):
        """Return the list of trouble entries (the live list, not a copy)."""
        return self.troubles


# Module-level report instance; SimpleModel._train records each epoch into it.
report = TrainReport()
class SimpleModel(nn.Module):
    """Two-layer fully connected regressor that outputs one BPM estimate.

    Each input example is flattened, passed through a hidden ReLU layer and
    mapped to a single unbounded scalar.
    """

    def __init__(self, input_size:int, hidden_size:int):
        """
        Args:
            input_size: Number of values in one flattened input example.
            hidden_size: Width of the hidden layer.
        """
        super(SimpleModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 1)  # single output unit: the BPM estimate

    def forward(self, x):
        """Return a ``(batch, 1)`` tensor of estimates for a batch of inputs."""
        x = self.flattern(x)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)  # output layer (no activation)
        return x

    def flattern(self, x):
        """Flatten everything except the leading batch dimension."""
        return x.view(x.size(0), -1)

    def predict(self, audio_path:str):
        """Estimate the BPM of an audio file from its first slice.

        The file is loaded at the notebook sample rate, its first
        ``slice_audio_sec``-second slice is converted to a mel spectrogram,
        and the model is evaluated on it without tracking gradients.

        Args:
            audio_path: Path of the audio file to analyse.

        Returns:
            The model output as a Python float.

        Raises:
            ValueError: If the audio is too short to yield one full slice.
        """
        # Follow the device the weights are on instead of a global setting.
        model_device = next(self.parameters()).device
        y, *_ = librosa.load(audio_path, sr=sr, mono=True)
        segments = slice_by_second(y, slice_audio_sec)
        if not segments:
            raise ValueError(
                f"Audio at {audio_path} is shorter than {slice_audio_sec} seconds; "
                "cannot extract a full slice."
            )
        melspecto = extract_melspecto(segments[0])
        # Add the batch dimension on the tensor; building a tensor from a
        # Python list of ndarrays is slow.
        batch = torch.as_tensor(melspecto).unsqueeze(0).to(model_device)
        with torch.no_grad():
            result = self(batch)
        return result.item()

    def _train(self, dataset: TempoDataset, num_epochs:int,optimizer, criterion=nn.MSELoss()):
        """Train the model one example at a time and report after every epoch.

        After each epoch the last processed example's loss is recorded in the
        module-level ``report``, the loss curve is re-plotted, the accumulated
        log is printed, and every 5th epoch the weights are saved.

        Args:
            dataset: Examples yielding ``(feature, bpm, metadata, segment)``.
            num_epochs: Number of passes over the dataset.
            optimizer: Optimizer already bound to this model's parameters.
            criterion: Loss function applied to (prediction, target).
        """
        model_device = next(self.parameters()).device
        # Accumulated log; re-printed in full after each clear_output().
        messages = ""

        dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
        # Training loop
        for epoch in range(num_epochs):
            # Each Step
            for feature, bpm, metadata, segment in dataloader:
                # Zero the gradients
                optimizer.zero_grad()

                # Forward pass
                outputs = self(feature.to(model_device))
                # The model emits (batch, 1) while the collated targets are
                # (batch,); align the shapes explicitly so the loss never
                # relies on broadcasting.
                target = bpm.to(model_device).reshape(outputs.shape)
                # Calculate loss
                loss = criterion(outputs, target)
                # Backward pass and optimization
                loss.backward()
                optimizer.step()

            clear_output()
            # Each Epoch
            new_epoch = epoch + 1
            # Only the last example of the epoch is reported.
            report.add_history(epoch=new_epoch, loss=loss.item(), metadata=metadata, segment=segment)
            messages += f'Epoch [{new_epoch}/{num_epochs}], Loss: {loss.item()}, Answer: {bpm.item()}, Predicted: {outputs.item()}\n'
            #plot
            plt.plot(report.get_epoch_list(), report.get_loss_list())
            plt.xlabel('Epoch')
            plt.ylabel('Loss')
            plt.show()
            #log
            print(messages)
            if new_epoch % 5 == 0:
                # Resolve the Drive path so checkpoints also save on local runs.
                torch.save(self.state_dict(), parse_for_local(f"/content/drive/MyDrive/audio/models/tempo-predictor/e_{new_epoch}.pth"))

# Number of values in one flattened feature; this is the width of the model's
# first linear layer.
input_size = dataset.get_input_size()
print(f"Input size is {input_size}")
# Width of the hidden layer.
hidden_size = 1600

model = SimpleModel(input_size, hidden_size).to(device)

# NOTE(review): not read or updated anywhere in the visible notebook —
# possibly a placeholder for tracking the best checkpoint.
lowest_loss_model = {
    "model": None,
    "loss": 1000
}

# Train with mean-squared-error loss and Adam. Per-epoch results are recorded
# in `report`, and weights are saved every 5 epochs by SimpleModel._train.
model._train(
    dataset=dataset,
    num_epochs=1000,
    criterion=nn.MSELoss(),
    optimizer=optim.Adam(model.parameters(), lr=1e-7)
    )

In [None]:

# Play back every example that was flagged as a "trouble" during training
# (its loss was above the running average) and show its loss and epoch.
troubles = report.get_troubles()
for trouble in troubles:
    # The stored segment comes from the DataLoader (batch_size=1); take the
    # first element — presumably the single item along the batch dimension.
    segment = trouble["segment"][0]
    loss = trouble["loss"]
    epoch = trouble["epoch"]
    metadata = trouble["metadata"]
    # Convert the tensor to a NumPy array on the CPU for playback.
    x = segment.to('cpu').detach().numpy().copy()
    print(f"{loss}")
    print(epoch)

    # Explicit display() is required to render a widget from inside a loop.
    IPython.display.display(IPython.display.Audio(x, rate=sr))


## Predict

In [None]:
file_path="/content/drive/MyDrive/audio/batch/MrChu-unknown/ar.wav"

import numpy as np
import librosa
import matplotlib.pyplot as plt

# Length (seconds) of the excerpt analysed by the two signal-processing methods.
duration = 30
# Rate (Hz) the power envelope is resampled to for the sinusoid matching.
x_sr = 200
# BPM candidates tested by the matching method: bpm_min <= bpm < bpm_max.
bpm_min, bpm_max = 60, 240

# Resolve the Drive path once so every estimator reads the same file, both on
# Colab and locally.
audio_path = parse_for_local(file_path)

# Load the excerpt of the track.
# No `sr` argument is passed, so librosa picks its own default rate. Bind that
# rate to a separate name: assigning it to `sr` would overwrite the
# notebook-wide sample rate that SimpleModel.predict and extract_melspecto
# read, and the model would then see audio at a rate it was not trained on.
y, legacy_sr = librosa.load(audio_path, offset=38, duration=duration, mono=True)

# Build the beat-detection signal:
# resample to x_sr and take the power (squared magnitude) of the signal.
x = np.abs(librosa.resample(y=y, orig_sr=legacy_sr, target_sr=x_sr)) ** 2
x_len = len(x)

# Build one complex sinusoid per candidate BPM. Row index == BPM; rows below
# bpm_min stay zero and therefore can never win the argmax below.
M = np.zeros((bpm_max, x_len), dtype=complex)
for bpm in range(bpm_min, bpm_max):
    theta = 2 * np.pi * (bpm/60) * (np.arange(0, x_len) / x_sr)
    M[bpm] = np.exp(-1j * theta)

# Matching score per BPM: magnitude of the inner product between each complex
# sinusoid and the beat-detection signal.
x_bpm = np.abs(np.dot(M, x))

# The BPM estimate is the best-matching row index.
bpm = np.argmax(x_bpm)

# librosa's built-in tempo estimate, computed from the onset-strength envelope.
onset_env = librosa.onset.onset_strength(y=y, sr=legacy_sr)
tempo = librosa.feature.rhythm.tempo(onset_envelope=onset_env, sr=legacy_sr)


### Deep learning way
model = SimpleModel(input_size, hidden_size).to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
model.load_state_dict(torch.load(parse_for_local('/content/drive/MyDrive/audio/models/tempo-predictor/e_100.pth'), map_location=device))
model.eval()

# Perform prediction
with torch.no_grad():
    predicted = model.predict(audio_path)
    print(f"Deeplearning BPM: {predicted}")
    print(f"librosa BPM: {tempo}")
    print(f"legacy BPM: {bpm}")
