## Imports

In [9]:
import sys
import os
import json
import glob
import subprocess
from collections import OrderedDict
import itertools
import numpy as np
import matplotlib.pyplot as plt
import tqdm

import librosa
from IPython.display import Audio as ipy_audio
from IPython.core.display import display

In [10]:
# NOTE: os.path.abspath("") resolves to the current working directory, so despite
# its name SCRIPT_DIR is the *parent* of the directory the notebook is run from.
SCRIPT_DIR = os.path.dirname(os.path.abspath(""))
# Guarded so that re-running this cell does not keep appending duplicates to sys.path.
if SCRIPT_DIR not in sys.path:
    sys.path.append(SCRIPT_DIR)

from quicktranscribe import tonic, pitch, wave, kde
from mogra import tonnetz
from mogra.datatypes import Swar, normalize_frequency, ratio_to_swar, SWAR_BOUNDARIES

In [None]:
# Syntonic comma (frequency ratio 81/80) on a 0-to-1 octave scale: the distance in
# MIDI note numbers between 220 Hz and 220 * 81/80 Hz, divided by the 12 semitones
# of an octave.
_comma_in_semitones = librosa.hz_to_midi(220*81/80) - librosa.hz_to_midi(220)
SYNTONIC_COMMA = _comma_in_semitones / 12

## Util Functions

In [3]:
def fetch_audio(ra, savedir, ytdlp="/opt/homebrew/bin/yt-dlp", ffmpeg="/opt/homebrew/bin/ffmpeg"):
    """Download the audio of every URL in `ra` as an mp3 using yt-dlp.

    Parameters
    ----------
    ra : dict
        Nested mapping ``{raag: {artist: url}}``.
    savedir : str
        Directory handed to yt-dlp's ``-P`` option; files are named
        ``{raag}-{artist}.mp3``.
    ytdlp, ffmpeg : str
        Paths of the yt-dlp and ffmpeg executables. The defaults are the
        Homebrew locations the notebook was originally written against.

    Prints yt-dlp's stdout for each download, and its stderr (prefixed with
    "Error:") when it is non-empty. Returns None.
    """
    for raag, vv in ra.items():
        for artist, url in vv.items():
            # Pass an argument list instead of a shell string: URLs routinely contain
            # characters such as '&' and '?' that the shell would interpret, and
            # interpolating them into a shell=True command is an injection risk.
            command = [
                ytdlp, url,
                "-f", "ba",
                "-x", "--audio-format", "mp3",
                "--ffmpeg-location", ffmpeg,
                "-P", f"{savedir}/",
                "-o", f"{raag}-{artist}.mp3",
            ]
            try:
                result = subprocess.run(command, capture_output=True)
            except FileNotFoundError as err:
                # Without a shell a missing executable raises instead of writing to
                # stderr; report it the same way and stop, since every further
                # download would fail identically.
                print("Error:", err)
                return
            print(result.stdout.decode(errors="replace"))
            if len(result.stderr) > 0:
                print("Error:", result.stderr.decode(errors="replace"))

In [4]:
def annotate_tonic(track_path, plot=False, start=7*60, end=8*60):
    """Interactively choose the tonic of a track and write it to `<track_path>.ctonic.txt`.

    An excerpt of `<track_path>.mp3` is loaded, a pitch KDE is extracted relative
    to a 220 Hz reference, and the KDE peaks are auditioned one at a time (most
    prominent KDE value first) as sine tones until the user answers "y".

    Parameters
    ----------
    track_path : str
        Track path without extension; ".mp3" and ".ctonic.txt" are appended.
    plot : bool
        When True, plot the KDE with its detected peaks.
    start, end : int
        Excerpt bounds passed to `wave.read_audio_section`
        (presumably seconds -- confirm against quicktranscribe.wave).

    Returns
    -------
    None. When no candidate is accepted, nothing is written.
    """
    DEFAULT_TONIC = 220
    np.set_printoptions(suppress=True)

    y_stereo, sr = wave.read_audio_section(track_path + ".mp3", start, end)
    y_sample = librosa.to_mono(y_stereo.T)

    kde_sample = kde.extract(y_sample, sr=sr, tonic=DEFAULT_TONIC)
    peaks, _ = kde.prominence_based_peak_finder(kde_sample, prominence=0.005)
    print(peaks)

    if plot:
        # x axis is in semitones above DEFAULT_TONIC: the KDE is spread over 0..12
        plt.plot(np.linspace(0, 12, len(kde_sample)), kde_sample, color="teal")
        plt.plot(np.array(peaks) * 12/len(kde_sample), kde_sample[peaks], "o", markersize="3", color="orange")

    display(ipy_audio(y_sample, rate=sr))
    input("hear the audio and press any key to continue")

    # audition the strongest peaks first
    peaks = sorted(peaks, key=lambda x: kde_sample[x], reverse=True)
    found_tonic = False
    for peak in peaks:
        # convert the KDE bin to a frequency: bin -> semitones above DEFAULT_TONIC -> Hz
        fpeak = librosa.midi_to_hz(librosa.hz_to_midi(DEFAULT_TONIC) + 12 * peak / len(kde_sample))
        # Synthesise at the playback rate. librosa.tone defaults to sr=22050, so a tone
        # generated without sr= and played back at the track's rate is heard at the
        # wrong pitch whenever the track is not 22050 Hz.
        ypeak = librosa.tone(fpeak, sr=sr, duration=3)
        display(ipy_audio(ypeak, rate=sr))
        ft = input("Is this the tonic? (y/n): ")
        # tolerate surrounding whitespace and upper-case answers
        if ft.strip().lower() == "y":
            found_tonic = True
            break

    if not found_tonic:
        print("No tonic found")
        return None

    # write tonic to file
    tonic.write_tonic(track_path + ".ctonic.txt", fpeak)

In [5]:
def read_sample_and_tonic(track_path, start_min=5, end_min=8):
    """Load a mono excerpt of a track together with its annotated tonic.

    Parameters
    ----------
    track_path : str
        Track path without extension; ".ctonic.txt" and ".mp3" are appended.
    start_min, end_min : int or float
        Excerpt bounds in minutes. They default to the 5-8 minute window this
        function previously hard-coded, so one-argument calls behave as before.
        The data-loading cell calls this with explicit (start_min, end_min) pairs.

    Returns
    -------
    (y_sample, sr, ctonic)
        Mono audio, the rate returned by `wave.read_audio_section`, and the
        value returned by `tonic.read_tonic`.

    Raises
    ------
    ValueError
        If `end_min` is not greater than `start_min`.
    """
    if end_min <= start_min:
        raise ValueError(f"end_min ({end_min}) must be greater than start_min ({start_min})")

    ctonic = tonic.read_tonic(track_path + ".ctonic.txt")

    # the reader is given the bounds multiplied by 60 (minutes -> seconds)
    start = start_min * 60
    end = end_min * 60
    y_stereo, sr = wave.read_audio_section(track_path + ".mp3", start, end)
    y_sample = librosa.to_mono(y_stereo.T)

    return y_sample, sr, ctonic

In [None]:
def preprocess(track):
    """Thin out a pitch track and quantise it to integers.

    Keeps every 16th value, discards the NaN entries among those, and rounds
    what is left to the nearest integer. Returns an int array.
    """
    decimated = track[::16]
    is_valid = np.logical_not(np.isnan(decimated))
    return np.rint(decimated[is_valid]).astype(int)

## Tonnetz

In [None]:
# define the net
# The genus is built from the factor list [3,3,3,3,5,5] (four 3s and two 5s) and
# the Tonnetz is built on top of it.
# NOTE(review): presumably this yields a lattice spanning powers of 3 and of 5 --
# confirm against mogra.tonnetz.
gs = tonnetz.EFGenus.from_list([3,3,3,3,5,5])
tn = tonnetz.Tonnetz(gs)

In [None]:
# get the adjacency and equivalence matrices
# Both are converted to torch tensors in the "Prepare Data" cell further down and
# passed to the model on every forward call.
# NOTE(review): equiv is later iterated column-wise as one mask per equivalence
# class, so it presumably has shape (num_nodes, num_classes) -- confirm.
adjac = tn.adjacency_matrix()
equiv = tn.equivalence_matrix()

## Raags & Tracks

Read Data

In [None]:
# Directory searched for the "*{raag}*{artist}.mp3" tracks. The trailing slash
# matters: the glob pattern is built by plain string concatenation onto DATA_DIR.
DATA_DIR = "concrete-demo/"

In [None]:
# Artist whose recordings are used; it forms the tail of the mp3 glob pattern.
artist = "AjoyChakrabarty"
# raag name -> list of (start_min, end_min) excerpt bounds. Each pair is handed to
# read_sample_and_tonic by the extraction loop in the next cell.
raags_and_times = {
    "Bhoop": [(3,21)],
    "Deshkar": [(3,21)],
}

In [None]:
# One list of preprocessed pitch sequences per raag, in the order the raags are declared.
seqs = OrderedDict((raag, []) for raag in raags_and_times)
for raag, sections in raags_and_times.items():
    # sorted() makes the pick deterministic when several files match the pattern
    matches = sorted(glob.glob(DATA_DIR + f"*{raag}*{artist}.mp3"))
    if not matches:
        raise FileNotFoundError(f"no mp3 matching '*{raag}*{artist}.mp3' found in {DATA_DIR!r}")
    track_mp3 = matches[0]
    # strip the ".mp3" extension; the helpers append their own suffixes
    track_path = os.path.splitext(track_mp3)[0]
    # `tqdm` is imported as a module at the top of the notebook, so the progress
    # bar class has to be referenced as tqdm.tqdm (calling the module raises TypeError).
    for start_min, end_min in tqdm.tqdm(sections, desc=raag):
        y_sample, sr, ctonic = read_sample_and_tonic(track_path, start_min, end_min)
        ftrack = pitch.track_pitch_pyin(y_sample, sr, ctonic)
        seqs[raag].append(preprocess(ftrack))

In [None]:
import pickle

# Cache the extracted sequences on disk so the pitch tracking above does not have
# to be repeated; the commented-out cell below reloads them from this file.
with open("seqs.pkl", "wb") as cache_file:
    pickle.dump(seqs, cache_file)

In [None]:
# import pickle
# with open("seqs.pkl", "rb") as f:
#     seqs = OrderedDict(pickle.load(f))

Define + Plot Ground Truth

In [None]:
# Ground-truth scale of each raag as just-intonation frequency ratios
# (presumably relative to the tonic, 1 being the tonic itself).
raag_gt_ratios = {
    "Bhoop": [1, 10/9, 5/4, 3/2, 5/3],
    "Deshkar": [1, 9/8, 81/64, 3/2,  27/16],
    "Yaman": [1, 9/8, 5/4, 45/32, 3/2, 27/16, 15/8],
}
# The same scales as Tonnetz node coordinates; these are matched against
# tn.node_coordinates when plotting and when building the training labels.
# Read as (power of 3, power of 5) -- the axis titles used in the plot below -- and
# reduced to one octave, each list yields the same set of ratios as above, though
# not in the same order.
raag_gt_nodes = {
    "Bhoop": [(0,0), (1,0), (0,1), (-1,1), (-2,1)],
    "Deshkar": [(0,0), (1,0), (2,0), (3,0), (4,0)],
    "Yaman": [(0,0), (1,0), (2,0), (1,1), (2,1), (0,1), (3,0)],
}

In [None]:
import plotly.graph_objects as go

# One figure per raag: every Tonnetz node is drawn, with the raag's ground-truth
# nodes highlighted in yellow and all other nodes in dark blue.
for raag, gt_nodes in raag_gt_nodes.items():
    node_colors = []
    for coord in tn.node_coordinates:
        node_colors.append("#e0b724" if coord in gt_nodes else "midnightblue")

    marker_style = dict(size=21, symbol="circle", color=node_colors)
    label_font = dict(family="Overpass", size=13, color="white")
    node_trace = go.Scatter(
        x=tn.coords3d[0],
        y=tn.coords3d[1],
        mode="text+markers",
        marker=marker_style,
        text=tn.node_names,
        textposition="middle center",
        textfont=label_font,
    )
    fig = go.Figure(data=[node_trace])

    fig.update_layout(title=f"Raag {raag}", xaxis_title="powers of 3", yaxis_title="powers of 5")
    # integer ticks only, one per lattice step
    fig.update_xaxes(tickvals=np.arange(-4, 5))
    fig.update_yaxes(tickvals=np.arange(-2, 3))
    fig.show()

Annotate Tonic

In [1]:
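# ONLY ONCE
# NOTE: `raags_and_artists` (a {raag: {artist: url}} mapping, as fetch_audio expects)
# is not defined anywhere in this notebook; define it before uncommenting.
# fetch_audio(raags_and_artists, DATA_DIR)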

In [17]:
# ONLY ONCE
# for raag, vv in raags_and_artists.items():
#     raag_samples = {}
#     for artist, _ in vv.items():
#         track_mp3 = glob.glob(f"{DATA_DIR}/{raag}-{artist}*.mp3")[0]
#         annotate_tonic(track_mp3[:-4])

---
The following is a Work In Progress

---

## TGNN

In [None]:
import torch
from mogra.tgnn import TemporalGNN, swarwise_loss

Parameters

In [None]:
# Hyper-parameters shared by the data preparation, dataset and model cells below.
# num_nodes is derived from the Tonnetz itself; the rest are chosen by hand.
num_nodes = len(tn.node_coordinates)
input_dim = 1  # Feature dimension
hidden_dim = 16  # Hidden layer size
num_classes = 12  # Number of equivalence classes
num_time_steps = 25  # Number of time steps
batch_size = 2  # Number of samples in each batch

Prepare Data

In [None]:
# Convert the Tonnetz matrices to integer tensors for the model.
# torch.as_tensor (rather than torch.tensor) keeps this cell re-runnable: when the
# names already hold tensors from a previous run it does not emit the
# copy-construct warning that torch.tensor(existing_tensor) raises.
adjac = torch.as_tensor(adjac, dtype=torch.long)
equiv = torch.as_tensor(equiv, dtype=torch.long)

In [None]:
# get an array from seqs
# Truncate every sequence to the shortest one so they stack into a rectangular
# array. The minimum is taken over every raag in `seqs` (not a hard-coded pair of
# names) so it always agrees with the sequences stacked on the next line.
min_length = min(len(seq) for values in seqs.values() for seq in values)
xx = np.array([seq[:min_length] for values in seqs.values() for seq in values])

# xx is num_samples x sample_length and holds pitch values; turn it into
# num_samples x num_nodes x sample_length x input_dim, marking with 1 every Tonnetz
# node that is a candidate for the pitch class sounding at each time step.
xseq = np.zeros((xx.shape[0], num_nodes, xx.shape[1], input_dim))
pitch_classes = xx % 12
for pitch_class in np.unique(pitch_classes):
    # Resolve the candidate nodes once per pitch class instead of once per time step.
    # NOTE(review): assumes tn.get_swar_options returns the same options every time
    # it is called with the same swar name -- confirm.
    tone_options, _ = tn.get_swar_options(Swar(int(pitch_class)).name)
    sample_idx, time_idx = np.nonzero(pitch_classes == pitch_class)
    for tone_option in tone_options:
        xseq[sample_idx, tn.node_coordinates.index(tone_option), time_idx, 0] = 1
xseq = torch.tensor(xseq, dtype=torch.float32)

In [None]:
def tn_indices(nodes):
    """Return a 0/1 int vector over all Tonnetz nodes with 1 at each node in `nodes`.

    `nodes` is an iterable of node coordinates; each one is located with
    `tn.node_coordinates.index`, so a coordinate that is not in the net raises
    ValueError.
    """
    tni = np.zeros(len(tn.node_coordinates), dtype=int)
    for node in nodes:
        tni[tn.node_coordinates.index(node)] = 1
    return tni
# one multi-hot node vector per sequence, taken from the ground truth of its raag
yy = np.array([tn_indices(raag_gt_nodes[raag]) for raag, values in seqs.items() for _ in values])

# we want to stratify yy labels into equivalence classes, with each class having one (or no) label:
# yraag[sample, cls] is the index of the first ground-truth node that belongs to
# class `cls`, or -1 when the raag has no node in that class.
yt = torch.tensor(yy, dtype=torch.long)
# sized from equiv (one column per equivalence class) instead of a hard-coded 12
yraag = torch.full((yt.shape[0], equiv.shape[1]), -1, dtype=torch.long)
for class_idx, mask in enumerate(equiv.T):
    hits = yt * mask  # num_samples x num_nodes; 1 where a ground-truth node lies in this class
    has_label = hits.sum(dim=1) > 0
    yraag[has_label, class_idx] = hits.argmax(dim=1)[has_label]

In [None]:
# Sanity check: xseq was allocated as (num_samples, num_nodes, sample_length, input_dim)
# and yraag as (num_samples, 12).
print(xseq.shape, yraag.shape)

In [None]:
# DataLoader:
# we have num_samples of length num_nodes x sample_length x input_dim
# we want to break down into num_time_steps from sample_length, and return in batches of batch_size
# so we want to return "x"s of batch_size x num_nodes x num_time_steps x input_dim and "y"s of batch_size x num_classes

class SeqDataset(torch.utils.data.Dataset):
    """Windowed view over a stack of node-activation sequences.

    Every sample in `xseq` is cut into consecutive, non-overlapping windows of
    `num_time_steps` along its time axis (dimension 2); a trailing remainder
    shorter than one window is dropped. Each window is paired with the label row
    of the sample it was cut from.

    The base class is referenced through `torch.utils.data` because `Dataset`
    itself is never imported in this notebook.
    """

    def __init__(self, xseq, yraag, num_time_steps):
        # xseq is indexed as [sample, :, time, :]; yraag is indexed as [sample]
        self.xseq = xseq
        self.yraag = yraag
        self.num_time_steps = num_time_steps
        # number of whole windows that fit into one sample
        self.seqs_per_sample = xseq.shape[2] // num_time_steps

    def __len__(self):
        """Total number of windows across all samples."""
        return self.xseq.shape[0] * self.seqs_per_sample

    def __getitem__(self, idx):
        """Return `(window, label)` for the flat window index `idx`."""
        # flat index -> (which sample, which window inside that sample)
        sample_idx, window_idx = divmod(idx, self.seqs_per_sample)
        t0 = window_idx * self.num_time_steps
        t1 = t0 + self.num_time_steps
        x = self.xseq[sample_idx, :, t0:t1, :]
        y = self.yraag[sample_idx]
        return x, y

dataset = SeqDataset(xseq, yraag, num_time_steps)
# split into train and val (80/20 over windows)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
# a seeded generator makes the split reproducible across kernel restarts
split_generator = torch.Generator().manual_seed(0)
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size], generator=split_generator
)

# DataLoader is referenced through torch.utils.data because it is never imported
# by name in this notebook.
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

Instantiate model and run

In [None]:
# Instantiate the model
model = TemporalGNN(input_dim, hidden_dim, num_classes, num_time_steps)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

In [None]:
def _run_epoch(dataloader, train):
    """Run one pass over `dataloader` and return the mean loss per processed batch.

    With `train=True` the model is put in training mode and the optimizer is
    stepped after every batch. With `train=False` the model is put in eval mode
    and the pass runs without building an autograd graph.
    """
    model.train(train)
    total_loss = 0.0
    n_processed = 0
    n_failed = 0
    last_error = None
    with torch.set_grad_enabled(train):
        for x, y in dataloader:
            # swarwise_loss is given batch_size explicitly, so incomplete batches are skipped
            if x.shape[0] != batch_size:
                continue
            if train:
                # Clear gradients for every batch. Clearing them only once per epoch
                # lets them accumulate across batches while step() is still called
                # after each one.
                optimizer.zero_grad()
            try:
                out = model(x, adjac, equiv)
            except Exception as err:
                # Still best-effort (a failing batch is skipped), but no longer silent
                # and no longer swallowing KeyboardInterrupt the way a bare except does.
                n_failed += 1
                last_error = err
                continue
            loss = swarwise_loss(out, y, batch_size)
            if train:
                loss.backward()
                optimizer.step()
            total_loss += loss.item()
            n_processed += 1
    if n_failed:
        print(f"skipped {n_failed} batch(es) whose forward pass raised; last error: {last_error!r}")
    # Average over the batches that actually contributed, not len(dataloader), which
    # also counts the skipped ones.
    return total_loss / max(n_processed, 1)


train_losses = []
val_losses = []
for epoch in range(10):
    train_losses.append(_run_epoch(train_dataloader, train=True))
    val_losses.append(_run_epoch(val_dataloader, train=False))
    

In [None]:
# Per-epoch training and validation loss on one set of axes.
for loss_curve, curve_name in ((train_losses, "train"), (val_losses, "val")):
    plt.plot(loss_curve, label=curve_name)
plt.legend()
plt.show()

In [None]:
# len(train_dataloader)

Confusion Matrix

In [None]:
def get_preds(out_scores, threshold):
    """Turn model scores into one predicted index per equivalence class.

    `out_scores` is viewed as (batch * 12, -1): for each of the 12 classes of
    each batch element, the position of the highest score is taken as the
    prediction, or -1 when that highest score is below `threshold`.
    (Presumably the trailing dimension runs over Tonnetz nodes -- confirm
    against TemporalGNN.)

    Returns an integer tensor of shape (batch, 12).
    """
    n_batch = out_scores.shape[0]
    flat_scores = out_scores.view(n_batch*12, -1)
    best_index = flat_scores.argmax(dim=1)
    best_score = flat_scores.max(dim=1).values
    # a class whose best score does not reach the threshold is reported as "none"
    too_weak = best_score < threshold
    return best_index.masked_fill(too_weak, -1).view(n_batch, 12)

In [None]:
# Collect flattened predictions and ground-truth labels over the validation set;
# a class counts as predicted only when its best score reaches 0.5.
preds, labels = [], []
for x, y in val_dataloader:
    out = model(x, adjac, equiv)
    batch_preds = get_preds(out, 0.5)
    preds.extend(batch_preds.flatten().detach().numpy())
    labels.extend(y.flatten().detach().numpy())

In [None]:
# Plot confusion matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Rows/columns are -1 ("no node in this class") followed by every node index.
# The previous np.arange(-1, 45, 2) kept only -1 and the odd indices, and
# confusion_matrix ignores any label or prediction that is not listed, so every
# even-indexed node was silently left out of the matrix.
cm_labels = np.arange(-1, num_nodes)
cm = confusion_matrix(labels, preds, labels=cm_labels)
plt.figure(figsize=(10,7))
sns.heatmap(cm, annot=True, fmt='d', vmax=15, xticklabels=False, yticklabels=False)
plt.xlabel('Predicted')
plt.ylabel('Truth')
plt.show()
