In [None]:
# PREGUNTES PER AL THOMAS
# 2. How to get Melodia working?


# FIND 2 PATTERNS AND TRAIN A MODEL TO PREDICT THOSE 2 PATTERNS


# Normalizing the cents

# Raga Ritigowla


# VISUALIZATION
# MatPlotLib to visualize the patterns (it's the easiest)
# Plotly dash (more complex)

## 0. Imports

In [None]:
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import librosa
from IPython.display import Audio

In [None]:
# %pip install gdown
# %pip install configobj

In [None]:
# import compiam

## 1. Load files

### 1.1 Annotations

In [None]:
annotations_kj_path = "../data/raw/annotations_koti_janmani.txt"
annotations_vnk_path = "../data/raw/annotations_vanajaksha_ninni_kore.txt"

In [None]:
def to_seconds(t):
    return (t.hour * 60 * 60) + (t.minute * 60) + t.second + (t.microsecond / 1000000)

def load_annotations_file(path: str) -> pd.DataFrame:
    """
    Load annotations from a file.

    :param path: Path to the file containing the annotations.
    :return: A pandas DataFrame containing the annotations.
    """
    # Read the annotations file
    annotations = pd.read_csv(path, sep='\t', header=None)

    # Add column names
    annotations.columns = ["level", "", "start", "end", "duration", "label"]
    del annotations[""]

    # Convert to seconds
    annotations["start"] = pd.to_datetime(annotations["start"])
    annotations["end"] = pd.to_datetime(annotations["end"])
    annotations["start"] = annotations["start"].apply(to_seconds)
    annotations["end"] = annotations["end"].apply(to_seconds)

    annotations.reset_index(inplace=True)

    return annotations

In [None]:
annotations_kj = load_annotations_file(annotations_kj_path)
annotations_vnk = load_annotations_file(annotations_vnk_path)

#annotations_vnk

In [None]:
annotations_kj_usancara = annotations_kj[annotations_kj["level"] == "underlying_sancara"]
annotations_vnk_usancara = annotations_vnk[annotations_vnk["level"] == "root_sancara"]

#annotations_vnk_usancara

### 1.2 Audio

In [None]:
audio_kj_path = "../data/raw/Koti Janmani/Koti Janmani.multitrack-vocal.mp3"
audio_vnk_path = "../data/raw/Vanajaksha Ninne Kori/Vanajaksha Ninne Kori_vocal.mp3"

In [None]:
def load_audio_file(path: str, sampling_rate: int) -> tuple:
    audio_time_series, sr = librosa.load(path, sr=sampling_rate)
    return audio_time_series, sr

In [None]:
audio_kj, sr_kj = load_audio_file(audio_kj_path, 44100)
audio_vnk, sr_vnk = load_audio_file(audio_vnk_path, 44100)

Audio(data=audio_vnk, rate=sr_vnk)

### 1.3 Extract pitch

In [None]:
# Passar la ref a una constant (per fer el canvi de Hz a cents)
tonic_path_kj = "../data/raw/Koti Janmani/Koti Janmani.ctonic.txt"
tonic_path_vnk = "../data/raw/Vanajaksha Ninne Kori/Vanajaksha Ninne Kori.ctonic.txt"

with open(tonic_path_kj, "r") as f:
    ctonic_ref_kj = float(f.readline().strip())

with open(tonic_path_vnk, "r") as f:
    ctonic_ref_vnk = float(f.readline().strip())

ctonic_ref_vnk

In [None]:
from scipy.signal import savgol_filter

def pitch_to_cents(pitch: float, ref: float):
    if pitch == 0:
        return None
    else:
        return 1200 * math.log(pitch/ref, 2)

def interpolate_and_smooth_pitch(pitch):
    pitch = pd.Series(pitch)
    pitch[pitch <= 0] = np.nan
    pitch_interpolated = pitch.interpolate(method="linear")
    pitch_smoothed = savgol_filter(pitch_interpolated, window_length=5, polyorder=2)
    return pitch_smoothed

#### Extract pitch from pitch file

In [None]:
pitch_path_kj = "../data/raw/Koti Janmani/Koti Janmani.melodia.pitch.txt"
pitch_path_vnk = "../data/raw/Vanajaksha Ninne Kori/Vanajaksha Ninne Kori.melodia.pitch.txt"

In [None]:
def load_pitch_file(path: str):
    """
    Load a pitch file from a given path.

    :param path: Path to the pitch file.
    :return: pitch_file, time, pitch, timestep
    """
    pitch_file = pd.read_csv(path, sep="\t", header=None)
    pitch_file.columns = ["time", "pitch"]

    time = pitch_file["time"].values
    pitch = pitch_file["pitch"].values
    timestep = time[1] - time[0]

    return pitch_file, time, pitch, timestep


In [None]:
pitch_file_kj, time_kj, pitch_kj, timestep_kj = load_pitch_file(pitch_path_kj)
pitch_file_vnk, time_vnk, pitch_vnk, timestep_vnk = load_pitch_file(pitch_path_vnk)

# Replace non-positive values with NaN, interpolate and smooth
pitch_kj_smoothed = interpolate_and_smooth_pitch(pitch_kj)
pitch_vnk_smoothed = interpolate_and_smooth_pitch(pitch_vnk)

# Convert pitch to cents
pitch_cents_kj = np.array([pitch_to_cents(p, ctonic_ref_kj) for p in pitch_kj_smoothed])
pitch_cents_vnk = np.array([pitch_to_cents(p, ctonic_ref_vnk) for p in pitch_vnk_smoothed])

In [None]:
#pitch_cents_kj[10000:10020]
pitch_cents_vnk[10000:10020]

##### Pitch was extracted using melodia and saved into the files: "Koti Janmani.melodia.pitch.txt" , "Vanajaksha Ninne Kori.melodia.pitch.txt"

## 2. Feature extraction

### 2.1 Time Domain Features

In [None]:
from librosa.feature import rms
from librosa.feature import zero_crossing_rate as zcr

In [None]:
time_features_kj_df = annotations_kj_usancara.copy()
time_features_vnk_df = annotations_vnk_usancara.copy()

#### 2.1.1 Amplitude Envelope ¿¿??

In [None]:
# No se si serveix d'algo

#### 2.1.2 Root Mean Square Energy

In [None]:
def compute_rms(audio: np.ndarray, sample_start: float, sample_end: float, sr: int) -> float:
    sample = audio[round(sample_start * sr):round(sample_end * sr)]
    return np.mean(rms(y=sample)[0])

In [None]:
# NOTE: a stands for annotation
time_features_kj_df['rmse'] = time_features_kj_df.apply(
    lambda a: compute_rms(audio_kj, a['start'], a['end'], sr_kj), axis=1
)
time_features_vnk_df['rmse'] = time_features_vnk_df.apply(
    lambda a: compute_rms(audio_vnk, a['start'], a['end'], sr_vnk), axis=1
)

#### 2.1.3 Zero Crossing Rate

In [None]:
def compute_zcr(audio: np.ndarray, sample_start: float, sample_end: float, sr: int) -> float:
    sample = audio[round(sample_start * sr):round(sample_end * sr)]
    return np.mean(zcr(y=sample)[0])

In [None]:
time_features_kj_df['zcr'] = time_features_kj_df.apply(
    lambda a: compute_zcr(audio_kj, a['start'], a['end'], sr_kj), axis=1
)
time_features_vnk_df['zcr'] = time_features_vnk_df.apply(
    lambda a: compute_zcr(audio_vnk, a['start'], a['end'], sr_vnk), axis=1
)

#### Time Domain Features DataFrame

In [None]:
time_features_kj_df
# time_features_vnk_df

### 2.2 Frequency Domain Features

In [None]:
from librosa.feature import spectral_centroid as scentroid
from librosa.feature import spectral_bandwidth as sbandwidth
from librosa.feature import spectral_rolloff as srolloff
from librosa.feature import mfcc

In [None]:
frequency_features_kj_df = annotations_kj_usancara.copy()
frequency_features_vnk_df = annotations_vnk_usancara.copy()

#### 2.2.1 Band Energy Ratio (not necessary??)

In [None]:
# TODO: Passar el codi de l'altre notebook

#### 2.2.2 Spectral Centroid

Each frame of a magnitude spectrogram is normalized and treated as a distribution over frequency bins, from which the mean (centroid) is extracted per frame.

In [None]:
def compute_scentroid(audio: np.ndarray, sample_start: float, sample_end: float, sr: int) -> float:
    sample = audio[round(sample_start * sr):round(sample_end * sr)]
    return np.mean(scentroid(y=sample, sr=sr)[0])

In [None]:
frequency_features_kj_df['spectral_centroid'] = frequency_features_kj_df.apply(
    lambda a: compute_scentroid(audio_kj, a['start'], a['end'], sr_kj), axis=1
)
frequency_features_vnk_df['spectral_centroid'] = frequency_features_vnk_df.apply(
    lambda a: compute_scentroid(audio_vnk, a['start'], a['end'], sr_vnk), axis=1
)

#### 2.2.3 Spectral Bandwidth

In [None]:
def compute_sbandwidth(audio: np.ndarray, sample_start: float, sample_end: float, sr: int) -> float:
    sample = audio[round(sample_start * sr):round(sample_end * sr)]
    return np.mean(sbandwidth(y=sample, sr=sr)[0])

In [None]:
frequency_features_kj_df['spectral_bandwidth'] = frequency_features_kj_df.apply(
    lambda a: compute_sbandwidth(audio_kj, a['start'], a['end'], sr_kj), axis=1
)
frequency_features_vnk_df['spectral_bandwidth'] = frequency_features_vnk_df.apply(
    lambda a: compute_sbandwidth(audio_vnk, a['start'], a['end'], sr_vnk), axis=1
)

#### 2.2.4 Spectral Rolloff

The roll-off frequency is defined for each frame as the center frequency for a spectrogram bin such that at least roll_percent (0.85 by default) of the energy of the spectrum in this frame is contained in this bin and the bins below. This can be used to, e.g., approximate the maximum (or minimum) frequency by setting roll_percent to a value close to 1 (or 0).

In [None]:
def compute_srolloff(audio: np.ndarray, sample_start: float, sample_end: float, sr: int) -> float:
    sample = audio[round(sample_start * sr):round(sample_end * sr)]
    return np.mean(srolloff(y=sample, sr=sr)[0])

In [None]:
frequency_features_kj_df['spectral_rolloff'] = frequency_features_kj_df.apply(
    lambda a: compute_srolloff(audio_kj, a['start'], a['end'], sr_kj), axis=1
)
frequency_features_vnk_df['spectral_rolloff'] = frequency_features_vnk_df.apply(
    lambda a: compute_srolloff(audio_vnk, a['start'], a['end'], sr_vnk), axis=1
)

#### 2.2.5 Mel Frequency Cepstral Coefficients

In [None]:
mfcc_kj_df = annotations_kj_usancara.copy()
mfcc_vnk_df = annotations_vnk_usancara.copy()

In [None]:
def compute_mfcc(audio: np.ndarray, sample_start: float, sample_end: float, sr: int) -> np.ndarray:
    sample = audio[round(sample_start * sr):round(sample_end * sr)]
    return np.mean(mfcc(y=sample, sr=sr, n_mfcc=13), axis=1)

mfcc_cols = [f'mfcc_{i+1}' for i in range(13)]

In [None]:
mfcc_kj_df[mfcc_cols] = mfcc_kj_df.apply(
    lambda a: compute_mfcc(audio_kj, a['start'], a['end'], sr_kj), axis=1
).apply(pd.Series)
mfcc_vnk_df[mfcc_cols] = mfcc_vnk_df.apply(
    lambda a: compute_mfcc(audio_vnk, a['start'], a['end'], sr_vnk), axis=1
).apply(pd.Series)

#### Frequency Domain Features DataFrame

In [None]:
#frequency_features_kj_df
#frequency_features_vnk_df
#mfcc_kj_df
#mfcc_vnk_df

#### 2.2.4 Spectral Contrast ¿¿??

Each frame of a spectrogram S is divided into sub-bands. For each sub-band, the energy contrast is estimated by comparing the mean energy in the top quantile (peak energy) to that of the bottom quantile (valley energy). High contrast values generally correspond to clear, narrow-band signals, while low contrast values correspond to broad-band noise.

### 2.3 Pitch Curve Features

In [None]:
pitch_features_kj_df = annotations_kj_usancara.copy()
pitch_features_vnk_df = annotations_vnk_usancara.copy()

#### 2.3.1 Mean pitch, Min/Max and Range

In [None]:
# TODO: Not sure if the min_pitch and max_pitch correspond to each annotation

def get_mean_min_max_pitch(cents: np.ndarray, tstep: float, sample_start: float, sample_end: float):
    #sample_time = time[round(sample_start/tstep):round(sample_end/tstep)]
    sample_cents = cents[round(sample_start/tstep):round(sample_end/tstep)]
    """ sample_cents_clean = [x for x in sample_cents if x is not None]
    if not sample_cents_clean:
        return None """
    return np.mean(sample_cents), min(sample_cents), max(sample_cents)

In [None]:
pitch_features_kj_df[['mean_pitch', 'min_pitch', 'max_pitch']] = pitch_features_kj_df.apply(
    lambda a: get_mean_min_max_pitch(pitch_cents_kj, timestep_kj, a['start'], a['end']), axis=1
).apply(pd.Series)
pitch_features_vnk_df[['mean_pitch', 'min_pitch', 'max_pitch']] = pitch_features_vnk_df.apply(
    lambda a: get_mean_min_max_pitch(pitch_cents_vnk, timestep_vnk, a['start'], a['end']), axis=1
).apply(pd.Series)

In [None]:
# Range
pitch_features_kj_df['pitch_range'] = pitch_features_kj_df['max_pitch'] - pitch_features_kj_df['min_pitch']
pitch_features_vnk_df['pitch_range'] = pitch_features_vnk_df['max_pitch'] - pitch_features_vnk_df['min_pitch']

#### 2.3.2 Number of Change Points

In [None]:
# Use PROMINENCE no get only significant change points (> 70 cents is significant)

In [None]:
from scipy.signal import find_peaks

def compute_number_of_change_points(cents: np.ndarray, tstep: float, sample_start: float, sample_end: float) -> int:
    #sample_time = time[round(sample_start/tstep):round(sample_end/tstep)]
    sample_cents = cents[round(sample_start/tstep):round(sample_end/tstep)]

    peaks, _ = find_peaks(sample_cents, prominence=70)
    valleys, _ = find_peaks(-sample_cents, prominence=70)

    num_change_points = len(peaks) + len(valleys)
    return num_change_points

In [None]:
pitch_features_kj_df['num_change_points'] = pitch_features_kj_df.apply(
    lambda a: compute_number_of_change_points(pitch_cents_kj, timestep_kj, a['start'], a['end']), axis=1
)
pitch_features_vnk_df['num_change_points'] = pitch_features_vnk_df.apply(
    lambda a: compute_number_of_change_points(pitch_cents_vnk, timestep_vnk, a['start'], a['end']), axis=1
)

#### 2.3.3 Number of Change Points per second

In [None]:
# TODO: 

#### Pitch Curve Features DataFrame

In [None]:
pitch_features_kj_df
# pitch_features_vnk_df

### 2.4 Create DataFrame with the Features

In [None]:
cols_to_drop = ["index", "level", "start", "end", "duration", "label"]
features_kj_df = pd.concat([annotations_kj_usancara, 
                        time_features_kj_df.drop(columns=cols_to_drop),
                        frequency_features_kj_df.drop(columns=cols_to_drop),
                        mfcc_kj_df.drop(columns=cols_to_drop),
                        pitch_features_kj_df.drop(columns=cols_to_drop)],
axis=1)
features_vnk_df = pd.concat([annotations_vnk_usancara,
                        time_features_vnk_df.drop(columns=cols_to_drop),
                        frequency_features_vnk_df.drop(columns=cols_to_drop),
                        mfcc_vnk_df.drop(columns=cols_to_drop),
                        pitch_features_vnk_df.drop(columns=cols_to_drop)],
axis=1)

# TODO: Normalize the data
# ...

In [None]:
features_df

In [None]:
# TODO: Evaluate if each feature is significant or not
""" df_ns = df[df['is_nns']==1]
df_no_ns = df[df['is_nns']!=1]
print(np.mean(df_ns[feature]))
print(np.mean(df_no_ns[feature])) """

## 3. Modelling

In [None]:
trial_df = features_df.copy()

In [None]:
# Patterns to use:
# trial_df['label'].value_counts()
# 1. 'sssnnpn'
# 2. 'mgmpmggrs'
# 3. 'gmn'

### 3.1 Get Training Data

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# Falten BER.
features = ['rmse', 'zcr', 
            'spectral_centroid', 'spectral_bandwidth', 'spectral_rolloff',
            'mfcc_1', 'mfcc_2', 'mfcc_3', 'mfcc_4', 'mfcc_5', 'mfcc_6', 'mfcc_7', 'mfcc_8', 'mfcc_9', 'mfcc_10', 'mfcc_11', 'mfcc_12', 'mfcc_13', 
            'mean_pitch', 'min_pitch', 'max_pitch', 'pitch_range', 'num_change_points']
target1 = ['is_{pattern}']
target2 = ['is_{pattern}']
target3 = ['is_{pattern}']

In [None]:
X = df[features].values
y = df[target].values

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
len(X_train)

### 3.2 Train a Model

In [None]:
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV

In [None]:
clf = GradientBoostingClassifier(...)

In [None]:
clf.fit(X_train, y_train)

In [None]:
params = {
    'n_estimators':[10,50,100],
    'learning_rate':[0.001,0.01,0.1,1],
    'max_depth':[2, 4, 8],
}

gs = GridSearchCV(clf, param_grid=params, scoring='f1', cv=2)
gs.fit(X_train, y_train)
best_clf = gs.best_estimator_
pd.DataFrame(gs.cv_results_)

## 4. Evaluation

In [None]:
from sklearn.metrics import f1_score

In [None]:
best_clf.predict(X_test)
y_pred = best_clf.predict(X_test)
f1_score(y_pred, y_test)