In [36]:
 script_idea = """
SCRIPT FOR PREPROCESSING THE DATA:
Given annotations .txt and .wavs of a couple of songs we will:
For each song:
  For each label (drumtype, ie: KD , HH, SD...):
    1) Binarize the data (ie KD / OTHER)
    2) Drop close observations
    3) For each onset in the song:
      3.1) Take +- a few miliseconds (ie : dMS = +-50) of the signal
      3.2) Pad the signal
      3.3) Perform STFT over the padded signal.

"""
# Print the preprocessing plan so it is visible in the notebook output.
print(script_idea)


SCRIPT FOR PREPROCESSING THE DATA:
Given annotations .txt and .wavs of a couple of songs we will:
For each song:
 For each label (drumtype, ie: KD , HH, SD...):
   1) Binarize the data (ie KD / OTHER)
   2) Drop close observations
   3) For each onset in the song:
     3.1) Take +- a few miliseconds (ie : dMS = +-50) of the signal
     3.2) Pad the signal
     3.3) Perform STFT over the padded signal.




In [None]:
import librosa
import sympy
import warnings
from librosa import display
import matplotlib.pyplot as plt
from google.colab import drive
import os,sys,re,pandas as pd,numpy as np
import seaborn as sns
from scipy.io import wavfile
import math
import glob
from sympy import Interval
import logging
import warnings


# ROOT_DIR must exist before mounting: it used to be assigned only in the configuration
# cell further down, so this cell raised NameError after "Restart kernel -> Run all".
# The value is the same one the configuration cell assigns.
ROOT_DIR = "/content/drive"
drive.mount(ROOT_DIR)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Don't forget to include "My Drive" in the path: the dataset lives under ROOT_DIR/My Drive/...
ROOT_DIR = "/content/drive"
MUSIC_DIR = os.path.join(ROOT_DIR,'My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums')
AUDIO_DIR = os.path.join(MUSIC_DIR,'audio','drum_only')
# Collect every .wav located exactly one sub-folder below AUDIO_DIR (AUDIO_DIR/<subfolder>/<file>.wav)
wavs_traintest_allpaths = glob.glob(AUDIO_DIR+"/*/*.wav")
# Show the first match as a quick sanity check (raises IndexError if nothing was found)
wavs_traintest_allpaths[0]


'/content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/audio/drum_only/train/MusicDelta_80sRock_Drum.wav'

In [None]:
# Folder holding the class annotations, laid out as ANNOTATIONS_DIR/<subfolder>/<file>.txt
ANNOTATIONS_DIR = os.path.join(MUSIC_DIR,'annotations','class')
# Every annotation file exactly one sub-folder below ANNOTATIONS_DIR; run_fullpipeline iterates over this list
annotations_traintest_allpaths = glob.glob(ANNOTATIONS_DIR+"/*/*.txt")
# Show the first match as a quick sanity check (raises IndexError if nothing was found)
annotations_traintest_allpaths[0]

'/content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/annotations/class/test/MusicDelta_Hendrix_class.txt'

In [None]:
def annotation_file2df(annotation_path):
  """
  Parse an MDB Drums class-annotation file into a DataFrame.

  Every non-blank line of the file is expected to hold two tab-separated
  fields: the onset time and the class label.

  Params:
    annotation_path: path to a .txt annotation file from MDB Drums (class)
  Returns:
    DataFrame with the columns "time" (float) and "class" (str with surrounding
    whitespace removed), one row per annotated onset, in file order.
  """
  with open(annotation_path,"r") as f:
    raw_text = f.read()
  # splitlines() plus a blank-line filter replaces split("\n")[:-1]: the old slice
  # silently discarded the last onset whenever the file lacked a trailing newline,
  # and blank lines produced rows that could not be cast to float.
  my_annotations = [line.split("\t") for line in raw_text.splitlines() if line.strip()]
  df_annotations = pd.DataFrame(my_annotations,columns=["time","class"])
  df_annotations['class'] =  df_annotations['class'].str.strip()
  df_annotations['time'] = df_annotations['time'].astype('float')
  return df_annotations

def search_correspondingpath_given_annotation(annotation_txt, audio_dir=None):
  """
  Map an annotation .txt path to the path of its matching .wav file.

  The wav file name is the annotation file name with ".txt" replaced by ".wav"
  and "_class" replaced by "_Drum". It is looked up in <audio_dir>/test or
  <audio_dir>/train, depending on which of the words "test" / "train" occurs in
  annotation_txt ("test" is checked first, anywhere in the path string).

  Params:
    annotation_txt: path of the annotation file
    audio_dir: folder containing the "train" and "test" audio sub-folders;
               defaults to the notebook-level AUDIO_DIR
  Returns:
    Path of the corresponding .wav file
  Raises:
    ValueError: the annotation path contains neither "test" nor "train"
    FileNotFoundError: the expected .wav file does not exist
  Example:
    annotation_txt = ./train/my_drums_class.txt
    output = search_correspondingpath_given_annotation(annotation_txt)
    >> <AUDIO_DIR>/train/my_drums_Drum.wav
  """
  if audio_dir is None:
    audio_dir = AUDIO_DIR
  # annotation "<song>_class.txt" -> audio "<song>_Drum.wav"
  wav_name = os.path.basename(annotation_txt).replace(".txt",".wav")
  wav_name = wav_name.replace("_class","_Drum")
  if "test" in annotation_txt: # search where the test set is
    split_name = "test"
  elif "train" in annotation_txt: # search where the train set is
    split_name = "train"
  else:
    raise ValueError("must be either in test or train , review the file")
  filepath_wav = os.path.join(audio_dir,split_name,wav_name)
  # This existence check used to be an assert placed after the raise above, inside
  # the else branch, so it could never execute. It now runs for every resolved path.
  if not os.path.exists(filepath_wav):
    raise FileNotFoundError(f"The file doesn't exist , review your annotation process: {filepath_wav}")
  return filepath_wav

def signal_zero_padding(original_signal,desired_signal_size,verbose=0):
  """
  Zero-pad original_signal on both sides up to desired_signal_size samples.

  The zeros are split evenly between the left and the right; when the amount of
  padding is odd, the right side receives the extra zero.

  Params:
    original_signal: 1-d sequence of samples
    desired_signal_size: length of the returned signal
    verbose: if > 0, print the length of the original signal
  Returns:
    float numpy array (np.zeros default dtype) of length desired_signal_size
    with original_signal placed in the middle
  Raises:
    ValueError: original_signal is longer than desired_signal_size
  """
  original_signal_size = len(original_signal)
  if verbose>0:
    print("Original Signal length:",original_signal_size)
  # Explicit check: an oversized signal used to produce a negative slice start and
  # fail with a hard-to-read numpy broadcasting error.
  if original_signal_size > desired_signal_size:
    raise ValueError(
      f"original_signal has {original_signal_size} samples, which does not fit in "
      f"desired_signal_size={desired_signal_size}"
    )
  left_padding = (desired_signal_size-original_signal_size)//2
  new_signal = np.zeros(desired_signal_size)
  new_signal[left_padding:left_padding+original_signal_size] = original_signal
  return new_signal

def signal2melspectrogram(signal: np.array,samping_rate:int,*args):
  """
  Compute a mel spectrogram of signal with librosa.feature.melspectrogram.

  Params:
    signal: audio samples, forwarded as y
    samping_rate: sampling rate of signal, forwarded as sr
    *args: extra positional arguments forwarded unchanged to librosa
  Returns:
    Whatever librosa.feature.melspectrogram returns for n_mels=4096 and fmax=8000
  """
  return librosa.feature.melspectrogram(*args, y=signal, sr=samping_rate, n_mels=4096, fmax=8000)

def onset2stft(onset_padded, sampling_rate=22050,stride=4,window_size = 128):
  """
  Magnitude STFT of an already padded onset, computed with librosa.stft.
  Adapted from https://colab.research.google.com/drive/1coMGL4gJhmSS93C1JK74uOUewL1lDZvU#scrollTo=XuNucqMgkGQU
  (see also https://stackoverflow.com/questions/56286595/librosas-fft-and-scipys-fft-are-different, option 2)

  Params:
    onset_padded: 1-d signal array (pad it before calling this function)
    sampling_rate: sampling rate of the signal (22050 in MDB Drums)
    stride: forwarded to librosa.stft as hop_length
    window_size: forwarded to librosa.stft as n_fft
  Returns:
    2-d array with the absolute value of the STFT, truncated to at most
    int(2*sampling_rate/stride) columns (the frames covering two seconds).
    librosa documents the number of rows as 1 + n_fft/2, i.e. only the
    non-negative frequencies; with the notebook's settings (signal of 4096
    samples, n_fft=1024, hop_length=256) the recorded output shape is (513, 17).
  """
  frames_per_second = sampling_rate/float(stride)
  max_frames = int(2*frames_per_second)  # never keep more than two seconds of frames
  magnitudes = np.abs(librosa.stft(onset_padded, n_fft=window_size, hop_length=stride))
  return magnitudes[:,:max_frames]

def song2taggedbars(annotation:pd.DataFrame,signal:np.array,sampling_rate:int,secondswindow=0.1,desired_signal_size=1024,stride=32,window_size = 128,verbose=1):
  """
  Cut the signal into one excerpt per annotated onset and return the STFT of each.

  For every row of annotation the samples from
  sampling_time - secondswindow*sampling_rate to sampling_time + secondswindow*sampling_rate
  (clipped to the limits of signal) are extracted, zero-padded to
  desired_signal_size with signal_zero_padding and transformed with onset2stft.

  Params:
    annotation: DataFrame with the columns "sampling_time" (integer sample index
                of the onset) and "class" (label of the onset)
    signal: 1-d audio signal
    sampling_rate: sampling rate of signal
    secondswindow: seconds taken on each side of the onset
    desired_signal_size: length every excerpt is padded to
    stride: hop length passed to onset2stft
    window_size: n_fft passed to onset2stft
    verbose: if > 0, print the window size and the limits of every excerpt
  Returns:
    (onsets_list, labels_list): the STFT of each onset and the matching labels,
    both in annotation row order
  """
  # Number of samples taken on each side of the onset (seconds * samples per second)
  half_window_samples = int(secondswindow*sampling_rate)
  if verbose>0:
    print("Mseconds taken:",half_window_samples)
  onsets_list,labels_list = list(),list()
  signal_length = len(signal)
  # Iterate over the rows positionally: the former annotation.loc[i, ...] lookup with
  # i in range(len(annotation)) raised KeyError on any frame without a 0..n-1 index.
  for onset_annotation,label in zip(annotation["sampling_time"],annotation["class"]):
    onset_start = max(0,onset_annotation-half_window_samples)
    onset_end = min(signal_length,onset_annotation+half_window_samples)

    if verbose>0:
      print("Onset annotation label:",onset_annotation)
      print("Onset start:",onset_start)
      print("Onset end:",onset_end)
    onset = signal[onset_start:onset_end]
    onset_padded = signal_zero_padding(original_signal=onset,desired_signal_size=desired_signal_size, verbose = verbose)
    onset_stft = onset2stft(onset_padded,sampling_rate=sampling_rate,stride=stride,window_size = window_size)
    onsets_list.append(onset_stft)
    labels_list.append(label)
  return onsets_list,labels_list

In [None]:
# Settings that run_fullpipeline forwards to song2taggedbars:
#   hop_size                        -> stride (librosa hop_length)
#   n_fft                           -> window_size (librosa n_fft)
#   desired_signal_size_for_padding -> length every onset excerpt is zero-padded to
#   seconds_window                  -> seconds taken on each side of the annotated onset
config_signal_params = {"hop_size":256,"n_fft":1024,"desired_signal_size_for_padding":4096,"seconds_window":0.05}
# Short explanation of the parameters, written as a note to myself.
# NOTE(review): the note below talks about 5 ms / 10 ms, but seconds_window = 0.05 s is 50 ms per side
# (100 ms in total), i.e. 2 * 0.05 * 22050 = 2205 samples at 22050 Hz, which is why 2048 is too small.
"""
513x17 is shape: 1024 / 2 is 512 (so 513?)  (config_signal_params["n_fft"]/2 +1 ) using the positive part of the ft
Signal is 4096 (config_signal_params["desired_signal_size_for_padding"])
seconds_window: has impact on how much your you go forward and backwards to get the signal LOL!
                so if seconds_window = 0.05 you will take a signal of 10ms (from your annotation point, 5 ms forward and 5ms backward)
                then you will pad that to 4096 samples.
                (since 10ms *22050 =2250 samples -> 2048 is not enough padding -> therefore 4096)
hop_size: 256 ; n_fft: length of the window 1024 moving by 256: how many times we get?:
4096/256 +1 = 17 ; config_signal_params["n_fft"]/config_signal_params["hop_size"] +1 ; so, 17 windows in the interval
"""

'\n513x17 is shape: 1024 / 2 is 512 (so 513?)  (config_signal_params["n_fft"]/2 +1 ) using the positive part of the ft\nSignal is 4096 (config_signal_params["desired_signal_size_for_padding"])\nseconds_window: has impact on how much your you go forward and backwards to get the signal LOL!\n                so if seconds_window = 0.05 you will take a signal of 10ms (from your annotation point, 5 ms forward and 5ms backward)\n                then you will pad that to 4096 samples.\n                (since 10ms *22050 =2250 samples -> 2048 is not enough padding -> therefore 4096)\nhop_size: 256 ; n_fft: length of the window 1024 moving by 256: how many times we get?:\n4096/256 +1 = 17 ; config_signal_params["n_fft"]/config_signal_params["hop_size"] +1 ; so, 17 windows in the interval\n'

In [None]:
# Number of STFT frames expected per padded onset: padded length / hop size + 1
# NOTE(review): the pipeline output reports arrays of shape (513, 17), so the value called "height" here
# is the second array axis and "width" the first; only their product is used below.
expected_height = config_signal_params["desired_signal_size_for_padding"]/config_signal_params["hop_size"] + 1 
expected_width = config_signal_params["n_fft"]/2 +1 # accounts only for the positive part of the FFT
total_nfeatures4model = expected_height*expected_width
current_sampling_rate = 22050 
total_samples_per_onset = (config_signal_params["seconds_window"]*current_sampling_rate)*2 # goes 0.05 seconds ahead and 0.05 seconds back


# Short explanation I made for myself about min_signal_size_for_padding
"""
2^x = total_samples_per_onset
log2(2^x) = log2(total_samples_per_onset)
x * log2(2) = log2(total_samples_per_onset)
x = log2(total_samples_per_onset) # need to be a rounded integer so
x ~ round(log2(total_samples_per_onset))
so min_singal_size is 2**x 
"""
# trunc(log2(n) + 1) is the exponent of the smallest power of two strictly greater than n
# (for n = 2205 samples this gives 2**12 = 4096)
min_signal_size_for_padding = int(2**(np.trunc(np.log2(total_samples_per_onset)+1)))

# The sampling rate is hard-coded above, hence the reminder emitted on every run
warnings.warn(f"\n ====== Current samlping rate for calculus is {current_sampling_rate} if you change the dataset, you SHOULD change this as well \n ======")
print(f"Expected height: {expected_height} ; Expected width: {expected_width} ; Total featuers for the model:  {total_nfeatures4model}; Min signal size4padding: {min_signal_size_for_padding}")

Expected height: 17.0 ; Expected width: 513.0 ; Total featuers for the model:  8721.0; Min signal size4padding: 4096




In [None]:
# Echo the active signal settings so they are recorded in the notebook output
print(f"[INFO] Config signal params are: {config_signal_params}")

[INFO] Config signal params are: {'hop_size': 256, 'n_fft': 1024, 'desired_signal_size_for_padding': 4096, 'seconds_window': 0.05}


In [None]:
def retag_labels(label:str,target_label:str) -> str:
  """
  Binarize a label against target_label.

  Example: with target_label = 'KD', the label 'KD' stays 'KD' and every other
  label ('HH', 'SD', ...) becomes 'OTHER'.

  Params:
    label: label to convert, like 'KD' or 'HH'
    target_label: the only label that must not be turned into 'OTHER'
  Returns:
    target_label when label is equal to it, otherwise 'OTHER'
  """
  return target_label if label == target_label else 'OTHER'

def onsetsidx2drop(annotation:pd.DataFrame,target_label:str,seconds_window:float) -> list:
  """
  Find the onsets that overlap in time with an onset of target_label.

  For every row whose class equals target_label, the closed interval
  [time - seconds_window, time + seconds_window] is built; every OTHER row whose
  time falls inside that interval is marked to be dropped (to avoid feeding the
  model almost identical excerpts with different labels).

  Earlier versions only inspected the rows immediately before and after each
  target onset, so a third onset inside the same window (for instance three
  drums hit together) was never dropped. All rows are compared now.
  As before, the class of the neighbouring row is not inspected, so two target
  onsets that lie inside each other's window are both reported.

  Params:
    annotation: annotations dataset with the columns "time" and "class"
    target_label: target label in the dataset, for example 'KD'
    seconds_window: how close (in seconds, bounds included) two onsets must be
                    for the neighbouring one to be dropped
  Returns:
    Index labels (taken from annotation.index, without duplicates, in row order)
    of the onsets to drop.
    Example: a HiHat occurs at second 0.113 and a KickDrum at 0.115. Taking
            +- 0.005 s around each onset, the HiHat excerpt spans 0.108-0.118
            and the KickDrum excerpt 0.110-0.120, so both share most of the
            signal and the HiHat row is reported for deletion.
  """
  times = annotation['time'].to_numpy(dtype=float)
  is_target = (annotation['class'] == target_label).to_numpy()
  drop_mask = np.zeros(len(annotation),dtype=bool)
  for target_pos in np.flatnonzero(is_target):
    lowercap,uppercap = times[target_pos]-seconds_window,times[target_pos]+seconds_window
    # Plain float comparisons give the same closed-interval test as sympy.Interval.contains
    in_window = (times >= lowercap) & (times <= uppercap)
    in_window[target_pos] = False  # an onset never drops itself
    drop_mask |= in_window
  return annotation.index[drop_mask].tolist()

def drop_close_obs(annotation:pd.DataFrame,target_label:str,seconds_window:float) ->pd.DataFrame:
  """
  Binarize the labels against target_label and drop the onsets that are too close to a target onset.

  Example: if target_label is the kick drum and a kick drum and a hi-hat occur at
  (almost) the same time, the same piece of signal would be used once as kick drum
  and once as hi-hat (relabeled as OTHER). That would confuse the model, so the
  overlapping onset is removed.

  Params:
    annotation: annotations dataset with the columns "time" and "class"; it is not modified
    target_label: label kept as is; every other label is retagged as 'OTHER'
    seconds_window: closeness threshold in seconds, passed to onsetsidx2drop
  Returns:
    New annotation dataset, relabeled, without the onsets reported by
    onsetsidx2drop, in the original row order and with a fresh 0..n-1 index
  """
  # relabel data
  newannotation = annotation.copy()
  newannotation['class'] = [retag_labels(label,target_label) for label in newannotation['class']]
  indexes2drop = onsetsidx2drop(newannotation,target_label,seconds_window)
  # drop() replaces .loc[set(index) - set(indexes2drop)]: indexing with a set raises
  # TypeError on pandas >= 2.0 and never guaranteed the order of the surviving rows.
  # The labels are de-duplicated first because onsetsidx2drop may report one twice.
  newannotation = newannotation.drop(index=list(set(indexes2drop))).reset_index(drop=True)
  return newannotation

With this, we'll generate, for each target label, its onsets together with their STFT.

In [None]:
# We will generate one STFT dataset per label in this list: run_fullpipeline loops over it
# and, for each label, keeps that label and retags every other one as 'OTHER'.
target_labels_list = ['KD','SD','HH','TT','CY','OT']

In [None]:
# Root output folder of the preprocessing pipeline (inside the dataset folder)
EXP_PIPE_DATA = os.path.join(MUSIC_DIR,"pipe_example_multiplemodelsdata")

# The folder must exist before logging is configured, because the log file lives inside it
if not os.path.exists(EXP_PIPE_DATA):
  os.mkdir(EXP_PIPE_DATA)

# Record the signal settings used for this run in a log file next to the generated data
# NOTE(review): per the logging docs, basicConfig does nothing when the root logger already has
# handlers; if the log file stays empty in this environment, consider passing force=True.
log_file_path = os.path.join(EXP_PIPE_DATA,"trained_models_info.log")
logging.basicConfig(filename=log_file_path, level=logging.INFO,
format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
logging.info(config_signal_params)

# Output folders read by run_fullpipeline for dataset_type "train" and "test" respectively
EXP_PIPE_DATA_TRAIN = os.path.join(EXP_PIPE_DATA,"train")
EXP_PIPE_DATA_TEST = os.path.join(EXP_PIPE_DATA,"test")

# create folders if necessary
if not os.path.exists(EXP_PIPE_DATA_TRAIN):
  os.mkdir(EXP_PIPE_DATA_TRAIN)
  
if not os.path.exists(EXP_PIPE_DATA_TEST):
  os.mkdir(EXP_PIPE_DATA_TEST)

def run_fullpipeline(dataset_type:str) -> None:
  """
  Run the whole preprocessing pipeline for every annotation file of one split.

  For each annotation path in annotations_traintest_allpaths that contains
  dataset_type, and for each label in target_labels_list:
    1) binarize the labels and drop overlapping onsets (drop_close_obs)
    2) convert the onset times to sample indexes
    3) cut, pad and transform every onset (song2taggedbars)
    4) save the STFTs as <annotation name>__<label>.npy and the labels, one per
       line, as <annotation name>__<label>.txt

  Params:
    dataset_type: "train" to process all train files ;
                  "test" to process all test files
  Returns:
    Nothing
    Files are written to EXP_PIPE_DATA_TRAIN when dataset_type is "train"
    and to EXP_PIPE_DATA_TEST when dataset_type is "test"
  Raises:
    ValueError: dataset_type is neither "train" nor "test"
  """
  # Explicit lookup instead of eval("EXP_PIPE_DATA_"+dataset_type.upper()): no string
  # evaluation, and a misspelled dataset_type fails immediately with a clear message.
  output_dirs = {"train":EXP_PIPE_DATA_TRAIN,"test":EXP_PIPE_DATA_TEST}
  if dataset_type not in output_dirs:
    raise ValueError(f"dataset_type must be one of {sorted(output_dirs)}, got {dataset_type!r}")
  output_dir = output_dirs[dataset_type]

  for filepath_txt in annotations_traintest_allpaths:
    if dataset_type not in filepath_txt:
      continue
    # The parsed annotation and the audio are the same for every label, so they are
    # read once per song instead of once per (song, label) pair.
    raw_annotation = annotation_file2df(filepath_txt)
    wav_path = search_correspondingpath_given_annotation(filepath_txt)
    signal, sampling_rate = librosa.load(wav_path)

    for target_label in target_labels_list: # for each instrument
      print(f"[INFO] Generating data for the label {target_label} and with these filepath {filepath_txt}")
      # drop overlapping labels and transform to OBJECTIVE LABEL VS OTHER DATA
      # (drop_close_obs works on a copy, so raw_annotation can be reused for the next label)
      annotation = drop_close_obs(annotation = raw_annotation,
                                  target_label = target_label,
                                  seconds_window = config_signal_params["seconds_window"]
                                  )

      # sampling time means the index of the sample at which the onset occurs
      annotation["sampling_time"] = annotation["time"].astype("float64")*sampling_rate
      annotation["sampling_time"] = annotation["sampling_time"].astype("int64")
      songs_stfts,labels = song2taggedbars(annotation,
                                          signal,
                                          sampling_rate,
                                          secondswindow=config_signal_params["seconds_window"],
                                          desired_signal_size=config_signal_params["desired_signal_size_for_padding"],
                                          stride=config_signal_params["hop_size"],
                                          window_size = config_signal_params["n_fft"],
                                          verbose=0
                                        )
      # A song may end up without onsets; songs_stfts[0] would then raise IndexError
      stft_shape = songs_stfts[0].shape if songs_stfts else None
      print("[INFO] Song Signal length:",len(signal),"; Sampling rate:",sampling_rate, "; Sound shape:",stft_shape)
      # configure paths to save it with the proper names
      annotation_name = os.path.basename(filepath_txt)
      outfile_npy_path = os.path.join(output_dir, annotation_name.replace(".txt",f"__{target_label}.npy"))
      outfile_txt_path = os.path.join(output_dir, annotation_name.replace(".txt",f"__{target_label}.txt"))

      # save files
      with open(outfile_npy_path,"wb") as f:
        np.save(f,songs_stfts)
      with open(outfile_txt_path,"w") as f:
        f.write("\n".join(labels))


Run the whole pipeline for the train dataset and the test dataset, respectively.

In [None]:
# Process every annotation file whose path contains "train"; results are written to EXP_PIPE_DATA_TRAIN
run_fullpipeline(dataset_type = "train")

[INFO] Generating data for the label KD and with these filepath /content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/annotations/class/train/MusicDelta_Disco_class.txt
[INFO] Song Signal length: 2751398 ; Sampling rate: 22050 ; Sound shape: (513, 17)
[INFO] Generating data for the label SD and with these filepath /content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/annotations/class/train/MusicDelta_Disco_class.txt
[INFO] Song Signal length: 2751398 ; Sampling rate: 22050 ; Sound shape: (513, 17)
[INFO] Generating data for the label HH and with these filepath /content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/annotations/class/train/MusicDelta_Disco_class.txt
[INFO] Song Signal length: 2751398 ; Sampling rate: 22050 ; Sound shape: (513, 17)
[INFO] Generating data for the label TT and with these filepath /content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/annotation

In [None]:
# Process every annotation file whose path contains "test"; results are written to EXP_PIPE_DATA_TEST
run_fullpipeline(dataset_type = "test")

[INFO] Generating data for the label KD and with these filepath /content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/annotations/class/test/MusicDelta_Hendrix_class.txt
[INFO] Song Signal length: 437559 ; Sampling rate: 22050 ; Sound shape: (513, 17)
[INFO] Generating data for the label SD and with these filepath /content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/annotations/class/test/MusicDelta_Hendrix_class.txt
[INFO] Song Signal length: 437559 ; Sampling rate: 22050 ; Sound shape: (513, 17)
[INFO] Generating data for the label HH and with these filepath /content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/annotations/class/test/MusicDelta_Hendrix_class.txt
[INFO] Song Signal length: 437559 ; Sampling rate: 22050 ; Sound shape: (513, 17)
[INFO] Generating data for the label TT and with these filepath /content/drive/My Drive/Maestria DM y KDD/Especializacion tesis/MDBDrums/MDB Drums/annotation