In [None]:
import os
from glob import glob
import random

import pandas as pd
from tqdm.notebook import tqdm
import IPython.display as ipd
import matplotlib.pyplot as plt
import numpy as np

import librosa

import warnings
warnings.filterwarnings('ignore') # to silence librosa warnings

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Lambda, Dropout
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras import backend as K
from tensorflow.keras.metrics import Precision, Recall

tqdm.pandas()

import seaborn as sns

## Dataset

In [None]:
class Track:
    """An audio file decoded into memory, with helpers to turn parts of it into mel spectrograms.

    Attributes set by ``__init__``:
    ``filepath`` is the path given by the caller; ``audio`` and ``sr`` are the
    samples and sampling rate returned by ``librosa.load``.
    """

    # Number of samples between successive spectrogram frames
    HOP_LENGTH = 2048

    def __init__(self, filepath: str, sr: int = 22050) -> None:
        """Load `filepath` as mono audio, asking librosa for a sampling rate of `sr`."""
        self.filepath = filepath
        self.audio, self.sr = librosa.load(filepath, mono=True, sr=sr)

    @staticmethod
    def _normalize_mel_spectrogram(mel_spec: np.ndarray) -> np.ndarray:
        """
        Min-max scale `mel_spec` into the [0, 1] range.

        A constant input (e.g. a silent extract) has no dynamic range. It is mapped
        to all zeros instead of dividing by zero, which would fill the result with NaN.
        """
        min_val = np.min(mel_spec)
        value_range = np.max(mel_spec) - min_val

        if value_range == 0:
            return np.zeros_like(mel_spec, dtype=float)

        return (mel_spec - min_val) / value_range

    def audio_extract(self, from_sec: float, to_sec: float) -> np.ndarray:
        """
        Return the samples between `from_sec` and `to_sec` (in seconds).

        Bounds are converted to integer sample indices, so fractional seconds are
        accepted. The slice is simply shorter when it runs past the end of the audio.
        """
        from_idx = int(from_sec * self.sr)
        to_idx = int(to_sec * self.sr)

        return self.audio[from_idx:to_idx]

    def spectrogram(self, from_sec: int = 60, to_sec: int = 75) -> np.ndarray:
        """
        Normalized mel spectrogram (in dB relative to its maximum) of the
        `from_sec`..`to_sec` extract.

        If the extract is shorter than requested because the track ends early, it is
        right-padded with zeros so that the output width depends only on the requested
        duration, not on the length of the file.
        """
        extract = self.audio_extract(from_sec, to_sec)

        expected_len = int(to_sec * self.sr) - int(from_sec * self.sr)
        missing = expected_len - len(extract)
        if missing > 0:
            extract = np.pad(extract, (0, missing))

        spec = librosa.feature.melspectrogram(y=extract, sr=self.sr, hop_length=self.HOP_LENGTH)
        spec_db = librosa.power_to_db(S=spec, ref=np.max)

        return Track._normalize_mel_spectrogram(spec_db)

    def tri_spectrogram(self, offset: float = 1.0, segment_sec: int = 5) -> np.ndarray:
        """
        Concatenate (along axis 1) three spectrograms of `segment_sec` seconds each.

        Params
        ======
        `offset`: multiplicative factor applied to the three start positions.
        With the default of 1.0 the segments start at 25%, 50% and 75% of the track;
        a value of 0.5 moves them to 12.5%, 25% and 37.5%, and so on.
        Useful for data augmentation.

        `segment_sec`: duration in seconds of each of the three segments.
        """
        total_length_sec = len(self.audio) / self.sr

        specs = []
        for position in (0.25, 0.50, 0.75):
            start = int(position * offset * total_length_sec)
            specs.append(self.spectrogram(start, start + segment_sec))

        return np.concatenate(specs, axis=1)

class TrackPair:
    """Two loaded tracks (`left`, `right`) plus the `similar` label given by the caller."""

    def __init__(self, filepath_left: str, filepath_right: str, similar: bool) -> None:
        # Both files are decoded right away; left is loaded before right.
        self.left, self.right = Track(filepath_left), Track(filepath_right)
        self.similar = similar

class Dataset:
    """Labelled track pairs loaded from a data folder, exportable as a table or as training arrays."""

    def __init__(self, data_path: str, frac: float = 1.0) -> None:
        """
        Build dataset of pairs given a path to the data folder. This folder should look like:

        ```

        data
            different
            |   pair_0
            |   |   track_a.mp3
            |   |   track_b.aif
            |   pair_1
            |   |   track_c.m4a
            |   |   track_b.aiff
            |   ...
            similar
                pair_0
                |   track_d.aif
                |   track_e.aiff
                pair_1
                |   track_z.mp3
                |   track_b.aiff
                ...
        ```

        Params
        ======
        `data_path`: folder containing the `similar` and `different` sub-folders.

        `frac`: fraction of the pair folders (taken in sorted order) to load from
        each of the two sub-folders.

        Raises
        ======
        `ValueError` if a pair folder contains fewer than two files.
        """

        self.trackpairs: list[TrackPair] = []

        # Similar pairs are loaded first, so they come first in `self.trackpairs`
        print("Loading similar tracks")
        self._load_pairs(f"{data_path}/similar", frac, 1)

        print("Loading different tracks")
        self._load_pairs(f"{data_path}/different", frac, 0)

    def _load_pairs(self, class_dir: str, frac: float, similar: int) -> None:
        """Append one `TrackPair` labelled `similar` per pair folder found in `class_dir`."""
        pair_dirs = sorted(glob(f"{class_dir}/*"))
        limit = round(frac * len(pair_dirs))

        for pair_dir in tqdm(pair_dirs[:limit]):
            file_pair = sorted(glob(f"{pair_dir}/*"))
            if len(file_pair) < 2:
                raise ValueError(
                    f"Expected two audio files in '{pair_dir}', found {len(file_pair)}"
                )
            # Only the first two files (in sorted order) are used
            self.trackpairs.append(TrackPair(file_pair[0], file_pair[1], similar))

    def as_dataframe(self) -> pd.DataFrame:
        """Return one row per pair with the `left` / `right` file paths and the integer `similar` label."""
        return pd.DataFrame({
            "left": [track_pair.left.filepath for track_pair in self.trackpairs],
            "right": [track_pair.right.filepath for track_pair in self.trackpairs],
            "similar": [int(track_pair.similar) for track_pair in self.trackpairs],
        })

    def as_training_data(self, n_augment: int = 10, seed: "int | None" = None) -> tuple[np.ndarray, np.ndarray]:
        """
        Build the arrays of spectrogram pairs and labels.

        Each track pair contributes `n_augment` consecutive samples. For each sample a
        random offset in [0, 1) is drawn and applied to both tracks of the pair.

        Params
        ======
        `n_augment`: number of augmented samples generated per track pair.

        `seed`: when given, offsets come from a dedicated `random.Random(seed)` so the
        result is reproducible. When `None`, the global `random` module state is used.

        Returns
        =======
        `(pairs, labels)`: `pairs[i, 0]` / `pairs[i, 1]` are the left / right
        spectrograms of sample `i`, and `labels[i]` is the pair's `similar` value.
        """
        rng = random if seed is None else random.Random(seed)

        pairs = []
        labels = []

        for track_pair in self.trackpairs:
            for _ in range(n_augment): # data augmentation. take random parts of the track
                offset = rng.random()
                pairs.append([track_pair.left.tri_spectrogram(offset), track_pair.right.tri_spectrogram(offset)])
                labels.append(track_pair.similar)

        return np.array(pairs).astype(float), np.array(labels)

In [None]:
# Load all pair folders (frac=1.00) found under ./data.
# Every track is decoded by librosa when its TrackPair is constructed.
dataset = Dataset("./data", 1.00)

In [None]:
# Preview 10 rows of the path/label table after a deterministic shuffle (random_state=0),
# so that the preview is not limited to the first-loaded pairs.
dataset.as_dataframe().sample(frac=1.0, random_state=0).head(10)

In [None]:
# Shape of the three-segment spectrogram of the first pair's left track (default offset).
dataset.trackpairs[0].left.tri_spectrogram().shape

In [None]:
# Same check for the right track; the model feeds both sides through one network,
# so this shape is expected to match the left one.
dataset.trackpairs[0].right.tri_spectrogram().shape

In [None]:
def plot_spec(track: Track):
    """
    Show the three-segment spectrogram of `track`, titled with its file name.

    The frequency axis is labelled on the mel scale, because `Track.spectrogram`
    produces mel bands (`librosa.feature.melspectrogram`), not linear STFT bins.
    """
    plt.figure(figsize=(10, 2))

    librosa.display.specshow(track.tri_spectrogram(), sr=track.sr, y_axis='mel')
    plt.colorbar()

    # Set the title before tight_layout so the layout accounts for it
    plt.title(os.path.basename(track.filepath))
    plt.tight_layout()
    plt.show()

In [None]:
# Pick the examples by label instead of by position: a fixed slice such as [70:73]
# only points at "different" pairs when exactly 70 similar pairs were loaded.
similar_pairs = [trackpair for trackpair in dataset.trackpairs if trackpair.similar]
different_pairs = [trackpair for trackpair in dataset.trackpairs if not trackpair.similar]

# plot three similar pairs, then three different pairs
for trackpair in similar_pairs[:3] + different_pairs[:3]:
    print(f"Similar: {trackpair.similar}")
    plot_spec(trackpair.left)
    plot_spec(trackpair.right)

In [None]:
# as_training_data draws its augmentation offsets from the `random` module;
# seeding it makes the generated samples identical across Restart & Run All.
random.seed(0)

X, y = dataset.as_training_data()
# Add a trailing channel axis for Conv2D and store as float32 (half the memory of float64)
X = np.expand_dims(X, axis=-1).astype(np.float32)
y = np.float32(y)

print(f"X.shape: {X.shape}")
print(f"y.shape: {y.shape}")

## Build model

In [None]:
# Shape of a single network input: X without its first two axes
# (axis 0 indexes samples, axis 1 selects the left/right track of the pair).
input_shape = X.shape[2:]
input_shape

In [None]:
def create_base_network(input_shape):
    """
    Build the network applied to each side of a pair.

    Three Conv2D(3x3, relu) + MaxPooling2D(2x2) stages with 32, 64 and 128 filters,
    followed by Flatten, Dense(512), Dropout(0.1) and a final Dense(128) output.
    """
    inputs = Input(shape=input_shape)

    features = inputs
    for n_filters in (32, 64, 128):
        features = Conv2D(n_filters, (3, 3), activation='relu')(features)
        features = MaxPooling2D(pool_size=(2, 2))(features)

    embedding = Flatten()(features)
    embedding = Dense(512, activation='relu')(embedding)
    embedding = Dropout(0.1)(embedding)
    embedding = Dense(128, activation='relu')(embedding)

    return Model(inputs=inputs, outputs=embedding)

def euclidean_distance(vectors):
    """Euclidean distance between the two tensors in `vectors`, reduced over axis 1 (kept as size 1)."""
    left, right = vectors
    squared_diff = K.square(left - right)
    # Clamp below by epsilon before the square root
    # (presumably to keep the gradient finite when both inputs are equal)
    clamped = K.maximum(K.sum(squared_diff, axis=1, keepdims=True), K.epsilon())
    return K.sqrt(clamped)

def eucl_dist_output_shape(shapes):
    """Output shape of the distance layer: the first input's leading dimension and one column."""
    first_shape, _second_shape = shapes
    leading_dim = first_shape[0]
    return leading_dim, 1

def create_siamese_network(input_shape):
    """
    Build the two-input model that outputs the distance between both embeddings.

    Inputs are named "left_input" and "right_input". A single base network instance
    processes both of them, so the two branches use the same weights.
    """
    left_in = Input(shape=input_shape, name="left_input")
    right_in = Input(shape=input_shape, name="right_input")

    encoder = create_base_network(input_shape)
    embeddings = [encoder(left_in), encoder(right_in)]

    distance_layer = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)

    return Model([left_in, right_in], distance_layer(embeddings))

def contrastive_loss(y_true, y_pred, margin=1.0):
    """
    Contrastive loss on predicted distances.

    `y_true` is 1 for similar pairs and 0 for different pairs; `y_pred` is the
    predicted distance. Similar pairs are penalised by their squared distance,
    different pairs by the squared amount by which they fall short of `margin`.

    Both tensors are flattened first: a `(N,)` label vector against `(N, 1)`
    distances would otherwise broadcast to `(N, N)` and silently give a wrong mean
    when this function is called directly (outside `model.fit`).
    """
    y_pred = K.flatten(y_pred)
    y_true = K.cast(K.flatten(y_true), K.dtype(y_pred))

    similar_term = y_true * K.square(y_pred)
    different_term = (1 - y_true) * K.square(K.maximum(margin - y_pred, 0))

    return K.mean(similar_term + different_term)

def f1_score(y_true, y_pred, threshold=0.5):
    """
    F1 score of the "similar" class, computed on the current batch.

    The model outputs a distance, not a probability: a small distance means
    "similar" (label 1). A pair is therefore predicted similar when its distance is
    below `threshold`. Rounding the distance would invert the prediction and map
    distances above 1.5 to values other than 0/1.

    `threshold` defaults to 0.5, i.e. half of the default contrastive-loss margin.
    """
    distances = K.flatten(y_pred)
    y_true = K.cast(K.flatten(y_true), K.dtype(distances))
    y_hat = K.cast(K.less(distances, threshold), K.dtype(distances))

    tp = K.sum(y_true * y_hat)
    fp = K.sum((1 - y_true) * y_hat)
    fn = K.sum(y_true * (1 - y_hat))

    # epsilon keeps the ratios defined when a batch has no positive label/prediction
    precision = tp / (tp + fp + K.epsilon())
    recall = tp / (tp + fn + K.epsilon())

    return 2 * (precision * recall) / (precision + recall + K.epsilon())

# Seed Python, NumPy and TensorFlow so weight initialisation and batch shuffling
# are repeatable across Restart & Run All (the split below already uses random_state=0).
tf.keras.utils.set_random_seed(0)

model = create_siamese_network(input_shape)
model.compile(loss=contrastive_loss, optimizer=Adam(learning_rate=0.0001), metrics=[f1_score])
model.summary()

## Train

In [None]:
# Split at the track-pair level, not at the sample level. as_training_data emits
# several augmented samples per pair in consecutive rows; a plain sample-level split
# would put crops of the same pair in both train and test and inflate the test score.
n_pairs = len(dataset.trackpairs)
samples_per_pair = len(X) // n_pairs
assert samples_per_pair * n_pairs == len(X), "X must hold the same number of samples for every pair"

pair_labels = y[::samples_per_pair]  # one label per pair (all samples of a pair share it)
train_pairs, test_pairs = train_test_split(
    np.arange(n_pairs), test_size=0.2, shuffle=True, stratify=pair_labels, random_state=0
)

# Row i of sample_index lists the rows of X that belong to pair i
sample_index = np.arange(len(X)).reshape(n_pairs, samples_per_pair)
train_idx = sample_index[train_pairs].ravel()
test_idx = sample_index[test_pairs].ravel()

X_train, X_test = X[train_idx], X[test_idx]
y_train, y_test = y[train_idx], y[test_idx]

In [None]:
# Train on the left/right halves of each training sample; the held-out split is
# passed as validation data, so the "test" curves below are per-epoch validation values.
history = model.fit(
    x=[X_train[:, 0], X_train[:, 1]],
    y=y_train,
    validation_data=([X_test[:, 0], X_test[:, 1]], y_test),
    batch_size=8,
    epochs=50,
)

# One figure per tracked quantity: first the f1_score metric, then the loss
for metric_name in ('f1_score', 'loss'):
    plt.plot(history.history[metric_name])
    plt.plot(history.history['val_' + metric_name])
    plt.title('model ' + metric_name)
    plt.ylabel(metric_name)
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

In [None]:
# Sanity check: compare a track with itself. Both inputs are identical and go through
# the same network, so the predicted distance should be close to 0.
# NOTE(review): absolute, machine-specific path; point it at a local audio file before running.
sanity_track_path = "/Users/leopnt/Music/TR4X/House/Sweely - Time for Freakness/02 - Sweely - Move for Me.mp3"
pair = TrackPair(sanity_track_path, sanity_track_path, 1)

plot_spec(pair.left)
plot_spec(pair.right)

# Shape each spectrogram as a batch of one with the full model input shape
# (including the trailing channel axis), matching how X was built for training.
X_new = [
    track.tri_spectrogram().reshape(1, *input_shape).astype(np.float32)
    for track in (pair.left, pair.right)
]
model.predict(X_new)

In [None]:
# Distribution of predicted distances on the held-out samples, coloured by true label.
# model.predict returns an (N, 1) array; flatten it so seaborn reads it as one variable
# rather than as a wide-form table.
test_distances = model.predict([X_test[:, 0], X_test[:, 1]]).ravel()

ax = sns.histplot(x=test_distances, hue=pd.Series(y_test.astype(int), name="similar"))
ax.set(xlabel="predicted distance", ylabel="count", title="Predicted distances on the test set")
plt.show()