In [None]:
!pip install tqdm scipy scikit-learn pandas h5py matplotlib ksvd

In [None]:
%config InlineBackend.figure_formats = ['svg']
import h5py
import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy.fft import fft
from scipy.signal.windows import hamming
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from IPython.display import Audio

### Global (hyper)parameters

In [None]:
# datasets and cache
# NOTE(review): the two dataset paths below are absolute paths on one machine; edit them before running elsewhere.
dataset_path = '/Users/pjrule/Downloads/musicnet.h5'
dataset_meta_path = '/Users/pjrule/Downloads/musicnet_metadata.csv'
fingerprints_cache_path = './fingerprints.npz'  # WARNING: not currently refreshed across parameter changes
grad_model_cache_path = './grad_model_full.joblib'   # WARNING: not currently refreshed across parameter changes

# FFT parameters
fs = 44100  # 44.1 kHz sample rate
window_size = 1024  # samples per Hamming-windowed FFT frame; fingerprints keep the first window_size // 8 bins
sample_interval_sec = 0.05  # in seconds; converted to a sample count (sample_interval) in the next cell
chunk_size = 30  # in seconds; converted to a row count (intervals_per_chunk) in the next cell

# model training parameters
random_state = 0  # seed passed to train_test_split and GradientBoostingClassifier
test_size = 0.2  # train/test split

In [None]:
# Convert the time-based settings into sample / row counts.
# NOTE(review): int() truncates, so float round-off just below a whole number
# would lose one unit — confirm the results when changing the parameters.
sample_interval = int(sample_interval_sec * fs)  # audio samples per fingerprint interval
intervals_per_chunk = int(chunk_size / sample_interval_sec)  # fingerprint rows per chunk

In [None]:
# Open the HDF5 dataset read-only. The handle is kept open on purpose: later cells read recordings from it.
dataset = h5py.File(dataset_path, 'r')

In [None]:
# Load the per-recording metadata table, indexed by its 'id' column.
meta_df = pd.read_csv(dataset_meta_path).set_index('id')

## Fingerprinting
For each recording, we generate an _audio fingerprint matrix_ by capturing the (truncated) audio spectrogram at a fixed interval. 

In [None]:
def audio_fingerprint(audio: np.ndarray):
  """Takes a fingerprint of an audio signal by sampling the spectrogram at a fixed interval.

  One Hamming-windowed FFT frame of `window_size` samples is taken every
  `sample_interval` samples (both are notebook-level globals). Each frame is
  peak-normalized before windowing, and only the magnitudes of the first
  `window_size // 8` FFT bins are kept.

  Note: fingerprints cached from a version of this function that used a
  different frame stride are not comparable and must be regenerated.

  Args:
    audio: 1-D array of audio samples.

  Returns:
    Array of shape `(len(audio) // sample_interval, window_size // 8)` holding
    one row of spectral magnitudes per sampling interval.
  """
  n_samples = audio.shape[0] // sample_interval
  n_bins = window_size // 8
  fingerprint = np.empty((n_samples, n_bins))
  window = hamming(window_size)
  for sample_idx in range(n_samples):
    # Frames start every `sample_interval` samples. Striding by `window_size`
    # would only cover a prefix of the recording (or run past its end), even
    # though the number of rows is derived from `sample_interval`.
    start = sample_idx * sample_interval
    sample = audio[start:start + window_size]
    if sample.shape[0] < window_size:
      # The final frame(s) can extend past the end of the signal when
      # window_size > sample_interval; zero-pad them to a full window.
      sample = np.pad(sample, (0, window_size - sample.shape[0]), mode='constant')
    peak = np.abs(sample).max()
    # All-zero (silent) frames are left untouched to avoid dividing by zero.
    normalized_sample = sample / peak if peak > 0 else sample
    fingerprint[sample_idx] = np.abs(fft(window * normalized_sample))[:n_bins]
  return fingerprint

In [None]:
def midi_fingerprint(labels: np.ndarray) -> None:
  """Placeholder for a fingerprint derived from `labels`; not implemented yet (returns None).

  TODO – use interval tree
  """

In [None]:
# Load cached fingerprints when available; otherwise compute them for every
# recording in the dataset and write the cache.
try:
  # np.load returns a lazy NpzFile that keeps the archive open and re-reads an
  # array on every lookup. Copy everything into a plain dict (the same type the
  # fallback branch builds) and close the file.
  with np.load(fingerprints_cache_path) as cached_fingerprints:
    fingerprints_by_id = {key: cached_fingerprints[key] for key in cached_fingerprints.files}
except FileNotFoundError:
  fingerprints_by_id = {}
  for key in tqdm(dataset):
    # Dataset keys have the form 'id_<recording id>'; index fingerprints by the bare id string.
    fingerprints_by_id[key.split('id_')[1]] = audio_fingerprint(dataset[key]['data'][:])
  np.savez_compressed(fingerprints_cache_path, **fingerprints_by_id)

In [None]:
# Render an inline audio player for recording 2572 at the dataset sample rate.
Audio(dataset['id_2572/data'][:], rate=fs)

### Chunking
The recordings in the MusicNet dataset are of variable length. For our purposes, we can devise a more interesting dataset (with more unique examples) by breaking the MusicNet recordings into fixed-length chunks. Because no musical passage should repeat _exactly_, overfitting shouldn't be an enormous concern (at least for first-order exploratory work); if we see evidence of overfitting, we can split into training/test sets at the recording level instead of the chunk level.

In [None]:
def fingerprint_chunks_by_column(col: str):
  """Cuts every recording's fingerprint into fixed-length chunks labelled by a metadata column.

  Recordings are grouped by their value in `meta_df[col]`, and each distinct
  value is assigned an integer label id in group iteration order. Each
  recording's fingerprint (looked up in `fingerprints_by_id` by the string form
  of its id) is split into consecutive, non-overlapping runs of
  `intervals_per_chunk` rows; a trailing partial run is discarded.

  Args:
    col: name of the `meta_df` column to use as the label.

  Returns:
    A pair `(chunks, chunk_label_ids)` of NumPy arrays with one entry per chunk.
  """
  # Map each label value to the set of recording ids carrying it.
  recording_ids_by_label = {}
  for label, row_positions in meta_df.groupby(col).indices.items():
    recording_ids = set()
    for row_pos in row_positions:
      recording_ids.add(meta_df.iloc[row_pos].name)
    recording_ids_by_label[label] = recording_ids

  chunks = []
  chunk_label_ids = []
  # The enumeration index doubles as the integer id of the label.
  for label_id, recording_ids in enumerate(recording_ids_by_label.values()):
    for recording_id in recording_ids:
      fingerprint = fingerprints_by_id[str(recording_id)]
      for start in range(0, len(fingerprint), intervals_per_chunk):
        candidate = fingerprint[start:start + intervals_per_chunk]
        if candidate.shape[0] != intervals_per_chunk:
          continue  # partial chunk at the end of the recording
        chunks.append(candidate)
        chunk_label_ids.append(label_id)
  return np.array(chunks), np.array(chunk_label_ids)

In [None]:
# Build the chunked dataset, labelling each chunk by the metadata 'ensemble' column.
chunks, chunk_labels = fingerprint_chunks_by_column('ensemble')

In [None]:
# Dataset dimensions; expected to be (number of chunks, intervals_per_chunk, window_size // 8).
chunks.shape

In [None]:
# Show one chunk's log-magnitude fingerprint, transposed so that fingerprint rows run along the x-axis.
# NOTE(review): np.log gives -inf for zero magnitudes (e.g. all-zero frames), and the
# hard-coded index assumes the dataset has at least 1401 chunks.
plt.imshow(np.log(chunks[1400]).T)
plt.show()

In [None]:
# Split at the chunk level (not the recording level), so chunks from one
# recording can land in both the training and the test set.
chunks_train, chunks_test, chunk_labels_train, chunk_labels_test = train_test_split(
  chunks,
  chunk_labels,
  test_size=test_size,
  random_state=random_state
)

In [None]:
def chunks_to_samples(chunks, chunk_labels):
  """Flattens chunks into individual samples, each inheriting its chunk's label.

  The first two axes of `chunks` (chunk, position within the chunk) are merged
  into a single sample axis, and every entry of `chunk_labels` is repeated once
  per position so that the labels stay aligned with the flattened samples.

  Returns:
    A pair `(samples, sample_labels)`.
  """
  n_chunks = chunks.shape[0]
  n_per_chunk = chunks.shape[1]
  flat_samples = chunks.reshape(n_chunks * n_per_chunk, -1)
  flat_labels = chunk_labels.repeat(n_per_chunk)
  return flat_samples, flat_labels

In [None]:
# Flatten the training chunks into per-interval samples (one row per fingerprint interval).
samples_train, sample_labels_train = chunks_to_samples(chunks_train, chunk_labels_train)

## Baseline models
* Logistic regression
* Gradient boosting
* KSVD

### Gradient boosting

In [None]:
# Reuse a previously trained model when one is cached; otherwise train from
# scratch and write the cache so later runs can skip the fit.
# NOTE: joblib.load unpickles the file, so only load caches you created yourself.
try:
  grad_model = joblib.load(grad_model_cache_path)
except FileNotFoundError:
  grad_model = GradientBoostingClassifier(random_state=random_state, verbose=True)
  grad_model.fit(samples_train, sample_labels_train)
  joblib.dump(grad_model, grad_model_cache_path)

## Experiment matrix
Labels: instrumentation/ensemble, key, composer (Eric says: most interesting!), composer region, composer era

Classifier: dictionary learning via randomized LU (two variants—say, tuning $k$ and $\ell$; RRLU?), linear classifier, MLP

Representation: audio fingerprints (FFT), MIDI

Benchmark: accuracy, speed (wall clock), (if feasible) approximate # of FLOPs

Breakdown:
* Parker will draft a training/test pipeline and fine-tune our current randomized LU implementation
* Eric will implement the dictionary classifier (inputs: randomized LU parameters & FFT/MIDI dictionary vectors; uses: randomized LU implementation; outputs: compressed dictionaries)
* Zoe will port linear model/MLP model from MusicNet tutorials

TODO: can we fit in rank-deficient least squares?

Other random things:
* Parker will subset the data to make it easier to play with locally