In [None]:
!sudo apt install -y fluidsynth
!pip install --upgrade pyfluidsynth
!pip install pretty_midi
!pip install midi2audio
!pip install pydub

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fluid-soundfont-gm libevdev2 libfluidsynth3 libgudev-1.0-0 libinput-bin
  libinput10 libinstpatch-1.0-2 libmd4c0 libmtdev1 libqt5core5a libqt5dbus5
  libqt5gui5 libqt5network5 libqt5svg5 libqt5widgets5 libwacom-bin
  libwacom-common libwacom9 libxcb-icccm4 libxcb-image0 libxcb-keysyms1
  libxcb-render-util0 libxcb-util1 libxcb-xinerama0 libxcb-xinput0 libxcb-xkb1
  libxkbcommon-x11-0 qsynth qt5-gtk-platformtheme qttranslations5-l10n
  timgm6mb-soundfont
Suggested packages:
  fluid-soundfont-gs qt5-image-formats-plugins qtwayland5 jackd
The following NEW packages will be installed:
  fluid-soundfont-gm fluidsynth libevdev2 libfluidsynth3 libgudev-1.0-0
  libinput-bin libinput10 libinstpatch-1.0-2 libmd4c0 libmtdev1 libqt5core5a
  libqt5dbus5 libqt5gui5 libqt5network5 libqt5svg5 libqt5widgets5 libwacom-bin
  libwacom-common libwacom9 libx

In [None]:
import torchaudio.transforms as transforms
import os
import re
import tensorflow as tf
import pathlib
import glob
import fluidsynth
from midi2audio import FluidSynth
import pretty_midi
from pydub import AudioSegment

In [None]:
def augment_pitch(pm, augment_dir, file_name, pitch_shift):
    """Transpose every note of ``pm`` and render the result to MIDI and WAV.

    The notes of ``pm`` are modified in place. The transposed MIDI file and
    its synthesised WAV rendering are both written into ``augment_dir``; the
    WAV file is then passed to ``clip_length``.

    Args:
        pm: Object exposing ``instruments`` (each with ``notes`` that carry a
            ``pitch``) and a ``write(path)`` method.
        augment_dir: Output directory prefix. It is concatenated directly with
            the file name, so it must end with a path separator.
        file_name: Base name (without extension) of the output files.
        pitch_shift: Number of semitones added to every note pitch.

    Returns:
        Tuple ``(midi_path, wav_path, pitch_shift)``.
    """
    print(f"Applying pitch shift of {pitch_shift} semitones.")
    for instrument in pm.instruments:
        for note in instrument.notes:
            # Clamp so the shifted pitch stays inside the MIDI range 0-127.
            note.pitch = max(0, min(127, note.pitch + pitch_shift))

    # Build both output paths from the ``augment_dir`` argument rather than
    # from the notebook-level ``pitch_dir`` global, so the function no longer
    # depends on a variable that is defined in a later cell.
    output_stem = f"{augment_dir}{file_name}_pitch_{pitch_shift}"
    augmented_file_path = output_stem + ".mid"
    wav_file_path = output_stem + ".wav"

    pm.write(augmented_file_path)
    FluidSynth().midi_to_audio(augmented_file_path, wav_file_path)
    clip_length(wav_file_path)

    return augmented_file_path, wav_file_path, pitch_shift

def augment_vol(pm, augment_dir, file_name, vol_shift):
    """Shift the velocity of every note of ``pm`` and render to MIDI and WAV.

    The notes of ``pm`` are modified in place. The resulting MIDI file and its
    synthesised WAV rendering are both written into ``augment_dir``; the WAV
    file is then passed to ``clip_length``.

    Args:
        pm: Object exposing ``instruments`` (each with ``notes`` that carry a
            ``velocity``) and a ``write(path)`` method.
        augment_dir: Output directory prefix. It is concatenated directly with
            the file name, so it must end with a path separator.
        file_name: Base name (without extension) of the output files.
        vol_shift: Amount added to every note velocity.

    Returns:
        Tuple ``(midi_path, wav_path, vol_shift)``.
    """
    print(f"Applying volume shift of {vol_shift} velocity.")
    for instrument in pm.instruments:
        for note in instrument.notes:
            # Keep the shifted velocity inside 1-127 (the lower bound is 1, not 0).
            note.velocity = max(1, min(127, note.velocity + vol_shift))

    # Build both output paths from the ``augment_dir`` argument rather than
    # from the notebook-level ``vol_dir`` global, so the function no longer
    # depends on a variable that is defined in a later cell.
    output_stem = f"{augment_dir}{file_name}_vol_{vol_shift}"
    augmented_file_path = output_stem + ".mid"
    wav_file_path = output_stem + ".wav"

    pm.write(augmented_file_path)
    FluidSynth().midi_to_audio(augmented_file_path, wav_file_path)
    clip_length(wav_file_path)

    return augmented_file_path, wav_file_path, vol_shift


def augment_tempo(pm, augment_dir, file_name, tempo_shift):
    """Rescale the timing of every note of ``pm`` and render to MIDI and WAV.

    Each note's start and end time is multiplied by ``tempo_shift``, so a
    factor above 1 moves notes later and makes them longer, while a factor
    below 1 compresses them. Only note times are rescaled here; any other
    timed data ``pm`` may carry is left untouched. ``pm`` is modified in
    place. The MIDI file and its WAV rendering are both written into
    ``augment_dir``; the WAV file is then passed to ``clip_length``.

    Args:
        pm: Object exposing ``instruments`` (each with ``notes`` that carry
            ``start`` and ``end``) and a ``write(path)`` method.
        augment_dir: Output directory prefix. It is concatenated directly with
            the file name, so it must end with a path separator.
        file_name: Base name (without extension) of the output files.
        tempo_shift: Multiplicative factor applied to note start/end times.

    Returns:
        Tuple ``(midi_path, wav_path, tempo_shift)``.
    """
    print(f"Applying tempo shift of {tempo_shift} tempo.")
    for instrument in pm.instruments:
        for note in instrument.notes:
            note.start *= tempo_shift
            note.end *= tempo_shift

    # Build both output paths from the ``augment_dir`` argument rather than
    # from the notebook-level ``tempo_dir`` global, so the function no longer
    # depends on a variable that is defined in a later cell.
    output_stem = f"{augment_dir}{file_name}_tempo_{tempo_shift}"
    augmented_file_path = output_stem + ".mid"
    wav_file_path = output_stem + ".wav"

    pm.write(augmented_file_path)
    FluidSynth().midi_to_audio(augmented_file_path, wav_file_path)
    clip_length(wav_file_path)

    return augmented_file_path, wav_file_path, tempo_shift



In [None]:
# ALTERNATIVE DATASET: Lakh MIDI data layout
# Each download is stored as ./data/<file name> (cache_dir='.', cache_subdir='data').
# NOTE(review): the two archive guards below test for 'data/LakhMIDI', a path
# that nothing in this cell visibly creates, so they probably never skip the
# get_file call. Confirm where the archives are extracted and point the guards
# at that location.
data_matched_dir = pathlib.Path('data/LakhMIDI')
if not data_matched_dir.exists():
  tf.keras.utils.get_file(
      'lmd_matched.tar.gz',
      origin='http://hog.ee.columbia.edu/craffel/lmd/lmd_matched.tar.gz',
      extract=True,
      cache_dir='.', cache_subdir='data',
  )

data_allgined_dir = pathlib.Path('data/LakhMIDI')
if not data_allgined_dir.exists():
  tf.keras.utils.get_file(
      'lmd_aligned.tar.gz',
      origin='http://hog.ee.columbia.edu/craffel/lmd/lmd_aligned.tar.gz',
      extract=True,
      cache_dir='.', cache_subdir='data',
  )

# Kept with its original name and value in case another cell still reads it.
data_matchscores_dir = pathlib.Path('data/LakhMIDI')
# The match scores are one plain JSON file, so guard on the file itself
# (previously the guard re-tested the aligned-archive variable) and do not ask
# get_file to extract it: there is no archive to unpack.
match_scores_file = pathlib.Path('data') / 'match_scores.json'
if not match_scores_file.exists():
  tf.keras.utils.get_file(
      'match_scores.json',
      origin='http://hog.ee.columbia.edu/craffel/lmd/match_scores.json',
      cache_dir='.', cache_subdir='data',
  )

# ALTERNATIVE DATASET Lakh MIDI
import json
import os

# Local path constants
# Root directory that msd_id_to_mp3 prefixes to the mp3 paths it builds.
DATA_PATH = "data"
# Root directory that msd_id_to_h5 prefixes to the h5 paths it builds.
RESULTS_PATH = 'data'
# Path to the file match_scores.json distributed with the LMD
score_file = 'data/match_scores.json'

# Utility functions for retrieving paths
def msd_id_to_dirs(msd_id):
    """Build the nested relative path under which an MSD track is filed.

    The third, fourth and fifth characters of the ID each name one directory
    level and the full ID is the final component,
    e.g. ``TRABCD12345678`` -> ``A/B/C/TRABCD12345678``.
    """
    level_one, level_two, level_three = msd_id[2], msd_id[3], msd_id[4]
    return os.path.join(level_one, level_two, level_three, msd_id)

def msd_id_to_mp3(msd_id):
    """Return the mp3 path for an MSD ID: ``DATA_PATH/msd/mp3/<id dirs>.mp3``."""
    relative_mp3 = msd_id_to_dirs(msd_id) + '.mp3'
    return os.path.join(DATA_PATH, 'msd', 'mp3', relative_mp3)

def msd_id_to_h5(h5):
    """Given an MSD ID, return the path to the corresponding h5 file.

    Args:
        h5: The MSD ID. The parameter keeps its existing name so that keyword
            callers are unaffected.

    Returns:
        ``RESULTS_PATH/lmd_matched_h5/<id dirs>.h5``.
    """
    # The body used to reference ``msd_id``, which is not a parameter of this
    # function, so every call raised NameError. Bind it to the argument.
    msd_id = h5
    return os.path.join(RESULTS_PATH, 'lmd_matched_h5',
                        msd_id_to_dirs(msd_id) + '.h5')

def get_midi_path(msd_id, midi_md5, kind):
    """Return the path of one MIDI file belonging to an MSD track.

    ``kind`` selects the dataset folder and should be 'matched' or 'aligned';
    the result is ``data/lmd_<kind>/<id dirs>/<midi_md5>.mid``.
    """
    # NOTE(review): the author flagged this base directory as possibly
    # incorrect, to be revisited after some testing.
    dataset_dir = f'lmd_{kind}'
    track_dirs = msd_id_to_dirs(msd_id)
    midi_name = midi_md5 + '.mid'
    return os.path.join('data', dataset_dir, track_dirs, midi_name)

# Load the match scores to find all aligned MIDI files
def load_aligned_midi_paths(score_file, kind='aligned'):
    """Read the match-score JSON and list one MIDI path per (track, match).

    Args:
        score_file: Path of a JSON file mapping each MSD ID to a collection
            of MIDI MD5s.
        kind: Dataset flavour forwarded to ``get_midi_path``.

    Returns:
        List of paths, in the order the entries appear in the JSON file.
    """
    with open(score_file, 'r') as handle:
        match_scores = json.load(handle)

    return [
        get_midi_path(track_id, md5, kind)
        for track_id, md5_matches in match_scores.items()
        for md5 in md5_matches
    ]

aligned_midi_files = load_aligned_midi_paths(score_file, 'aligned')
# Show only a short preview: printing every path (more than 100k entries)
# exceeds the notebook's IOPub data-rate limit and the output is dropped.
print(f"{len(aligned_midi_files)} aligned MIDI paths; first 5:")
print(aligned_midi_files[:5])

Downloading data from http://hog.ee.columbia.edu/craffel/lmd/lmd_matched.tar.gz
Downloading data from http://hog.ee.columbia.edu/craffel/lmd/lmd_aligned.tar.gz
Downloading data from http://hog.ee.columbia.edu/craffel/lmd/match_scores.json


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [None]:
# Report how many MIDI paths were collected from the match-score file.
print(f'Number of files: {len(aligned_midi_files)}')

Number of files: 116189


In [None]:
# Folder whose entries are processed as the generated continuation files.
gen_folder_path = r'drive/MyDrive/413/generated'
# os.listdir returns entries in arbitrary order; sort them so that the logs
# and the processing order are the same on every run.
continuation_files = sorted(os.listdir(gen_folder_path))
# Output locations for the rendered prompts, one sub-folder per augmentation.
# Each ends with '/' because the helpers build paths by string concatenation.
orig_folder_path = r'drive/MyDrive/413/original/content/augmented/'
pitch_dir = r'drive/MyDrive/413/original/content/augmented/pitch/'
vol_dir = r'drive/MyDrive/413/original/content/augmented/volume/'
tempo_dir = r'drive/MyDrive/413/original/content/augmented/tempo/'
# Header for the per-file listing printed by the processing loop in this cell.
print("List of files in the folder:")

def clip_length(file, seconds=5):
    """Truncate the WAV at ``file`` in place to at most ``seconds`` seconds.

    The file is only rewritten when it is longer than the limit; shorter
    files are left untouched.
    """
    limit_ms = seconds * 1000  # the audio object is measured and sliced in milliseconds
    recording = AudioSegment.from_wav(file)
    if len(recording) <= limit_ms:
        return
    recording[:limit_ms].export(file, format='wav')


def get_number_of_file(filename):
    """Extract the numeric index embedded in a file name.

    Args:
        filename: Name containing a run of digits enclosed by underscores,
            e.g. ``file_12_pitch_3_output.wav``.

    Returns:
        Tuple ``(index, parts)`` where ``index`` is the first such run of
        digits as an int and ``parts`` is ``filename.split('_')``.

    Raises:
        AssertionError: If the name contains no ``_<digits>_`` component.
    """
    match = re.search(r'_(\d+)_', filename)
    if match is None:
        # Raise explicitly instead of ``assert(False)``: a bare assert is
        # removed under ``python -O`` and the function would then return None,
        # breaking the caller's tuple unpacking with an unrelated error.
        raise AssertionError(f"no '_<number>_' component in file name: {filename!r}")
    return int(match.group(1)), filename.split('_')

def save_as_wav(midi_file, file_name):
    """Render ``midi_file`` to ``<orig_folder_path>original/<file_name>.wav``.

    Returns:
        The path of the WAV file that was written.
    """
    wav_file_path = f"{orig_folder_path}original/{file_name}.wav"
    synthesiser = FluidSynth()
    synthesiser.midi_to_audio(midi_file, wav_file_path)
    return wav_file_path


def make_required_augmentation(filename, name_parts):
    """Recreate the prompt audio that a generated file was derived from.

    Args:
        filename: Name of the generated file. It is not used by the body and
            is kept only for interface compatibility.
        name_parts: ``filename`` split on ``'_'``. ``name_parts[1]`` is the
            index into ``aligned_midi_files``, ``name_parts[2]`` is the
            augmentation kind (``'pitch'``, ``'vol'``, ``'tempo'`` or
            ``'original'``) and ``name_parts[3]`` is the shift amount for the
            three augmenting kinds.

    Raises:
        AssertionError: If the augmentation kind is not recognised.
        ValueError: If the index or shift amount cannot be parsed as a number.
    """
    file_name = 'file_' + name_parts[1]
    the_file = aligned_midi_files[int(name_parts[1])]
    kind = name_parts[2]

    # The MIDI file is parsed before dispatching, including for 'original'.
    pm = pretty_midi.PrettyMIDI(the_file)
    if kind == 'pitch':
        augment_pitch(pm, pitch_dir, file_name, int(name_parts[3]))
    elif kind == 'original':
        # No augmentation: render the untouched source file.
        save_as_wav(the_file, file_name)
    elif kind == 'vol':
        augment_vol(pm, vol_dir, file_name, int(name_parts[3]))
    elif kind == 'tempo':
        augment_tempo(pm, tempo_dir, file_name, float(name_parts[3]))
    else:
        # Explicit raise so the check survives ``python -O`` and names the culprit.
        raise AssertionError(f"unknown augmentation kind: {kind!r}")

# Walk over every generated file, echo what was parsed from its name, and
# rebuild the matching prompt audio for the files selected below.
# `file`, `num` and `parts` remain defined at module level after the loop;
# other cells may read them, so do not rename them without checking.
for file in continuation_files:
    print(file)
    num, parts = get_number_of_file(file)
    print(num)
    print(parts)
    # NOTE(review): the threshold 60 is unexplained; presumably it skips
    # files that were already processed in an earlier run. Confirm.
    if num > 60:
      make_required_augmentation(file, parts)



List of files in the folder:
file_0_pitch_{pitch_shift}_output.wav
0
['file', '0', 'pitch', '{pitch', 'shift}', 'output.wav']
file_35_tempo_{tempo_shift}_output.wav
35
['file', '35', 'tempo', '{tempo', 'shift}', 'output.wav']
file_26_tempo_{tempo_shift}_output.wav
26
['file', '26', 'tempo', '{tempo', 'shift}', 'output.wav']
file_39_pitch_{pitch_shift}_output.wav
39
['file', '39', 'pitch', '{pitch', 'shift}', 'output.wav']
file_1_tempo_{tempo_shift}_output.wav
1
['file', '1', 'tempo', '{tempo', 'shift}', 'output.wav']
file_34_pitch_{pitch_shift}_output.wav
34
['file', '34', 'pitch', '{pitch', 'shift}', 'output.wav']
file_47_pitch_{pitch_shift}_output.wav
47
['file', '47', 'pitch', '{pitch', 'shift}', 'output.wav']
file_13_vol_{vol_shift}_output.wav
13
['file', '13', 'vol', '{vol', 'shift}', 'output.wav']
file_14_vol_{vol_shift}_output.wav
14
['file', '14', 'vol', '{vol', 'shift}', 'output.wav']
file_7_tempo_{tempo_shift}_output.wav
7
['file', '7', 'tempo', '{tempo', 'shift}', 'output.wa

In [None]:
def get_num_special(filename):
    """Extract the numeric index from a file name, allowing a trailing index.

    A run of digits enclosed by underscores (``_12_``) is looked for first;
    if there is none, a run of digits between an underscore and a dot
    (``_12.``) is accepted instead.

    Args:
        filename: File name to parse.

    Returns:
        Tuple ``(index, parts)`` where ``parts`` is ``filename.split('_')``.

    Raises:
        AssertionError: If neither pattern occurs in ``filename``.
    """
    match = re.search(r'_(\d+)_', filename) or re.search(r'_(\d+)\.', filename)
    if match is None:
        # Raise explicitly instead of ``assert(False)``: a bare assert is
        # removed under ``python -O`` and the function would then return None.
        raise AssertionError(f"no numeric index found in file name: {filename!r}")
    return int(match.group(1)), filename.split('_')

def find_corresponding_prompt(the_file):
    """Locate the prompt file that a generated file was produced from.

    The augmentation kind in ``the_file``'s name selects the folder to search,
    and the first entry in that folder with the same numeric index whose name
    ends in ``'v'`` (which excludes the ``.mid`` files written beside the
    ``.wav`` ones) is returned.

    Args:
        the_file: Name of a generated file, e.g. ``file_12_pitch_3_output.wav``.

    Returns:
        Tuple ``(folder, prompt_file_name)``.

    Raises:
        AssertionError: If the augmentation kind is unknown or no matching
            prompt exists in the selected folder.
    """
    # Parse the argument itself. The body used to read the notebook-level
    # loop variable ``file`` and silently ignored ``the_file``.
    num, name_parts = get_number_of_file(the_file)
    kind = name_parts[2]
    if kind == 'pitch':
        look_in = pitch_dir
    elif kind == 'original':
        look_in = r'drive/MyDrive/413/original/content/augmented/original/'
    elif kind == 'vol':
        look_in = vol_dir
    elif kind == 'tempo':
        look_in = tempo_dir
    else:
        raise AssertionError(f"unknown augmentation kind: {kind!r}")

    for comparison_file in os.listdir(look_in):
        if get_num_special(comparison_file)[0] == num and comparison_file.endswith('v'):
            return look_in, comparison_file
    raise AssertionError(f"no prompt with index {num} found in {look_in!r}")


In [None]:
import torchaudio
import torch

def _mono_mfcc(waveform, sample_rate):
    """Average the channels of ``waveform`` and return the MFCC of the result."""
    mono = torch.mean(waveform, dim=0, keepdim=True)
    mfcc = transforms.MFCC(
        sample_rate=sample_rate,
        n_mfcc=13,
        melkwargs={"n_fft": 400, "hop_length": 160, "n_mels": 23, "center": False},
    )
    return mfcc(mono)


def compare(new, old, folder_of_old, generated_dir='drive/MyDrive/413/generated/'):
    """Return the Euclidean distance between the MFCCs of two audio files.

    Both files are down-mixed to a single channel first. The prompt files are
    stereo and the generated ones were observed to be mono; averaging a
    one-channel signal leaves it unchanged, but a stereo generated file is no
    longer silently broadcast against the mono prompt. The longer MFCC is then
    trimmed to its last frames so that both cover the same number of frames
    (the ends are aligned, not the beginnings).

    NOTE(review): each MFCC is computed at its own file's sample rate. If the
    two rates differ, the frames cover different durations. Confirm that both
    sets of files share one rate, or resample before comparing.

    Args:
        new: File name of the generated audio inside ``generated_dir``.
        old: File name of the prompt audio inside ``folder_of_old``.
        folder_of_old: Directory prefix of ``old``; concatenated directly, so
            it must end with a path separator.
        generated_dir: Directory prefix of ``new``; defaults to the folder
            that was previously hard-coded here.

    Returns:
        A 0-dimensional tensor holding the L2 norm of the MFCC difference.
    """
    waveform_old, sample_rate_old = torchaudio.load(folder_of_old + old, normalize=True)
    waveform_new, sample_rate_new = torchaudio.load(generated_dir + new, normalize=True)

    mfcc_old = _mono_mfcc(waveform_old, sample_rate_old)
    mfcc_new = _mono_mfcc(waveform_new, sample_rate_new)

    # The last axis of the MFCC output is the frame (time) axis.
    length_old = mfcc_old.shape[2]
    length_new = mfcc_new.shape[2]
    print(length_old)
    print(length_new)
    if length_old > length_new:
        mfcc_old = mfcc_old[:, :, -length_new:]
    else:
        mfcc_new = mfcc_new[:, :, -length_old:]
    euclid_distance = torch.norm(mfcc_new - mfcc_old, p=2)
    return euclid_distance


def print_well(ans):
    """Print the min, max and mean difference for each augmentation type.

    Args:
        ans: Mapping with the keys ``'pitch'``, ``'vol'``, ``'tempo'`` and
            ``'original'``, each holding a list of difference values.

    A category with no values is reported as such instead of raising
    ZeroDivisionError when the average is computed.
    """
    for guy in ['pitch', 'vol', 'tempo', 'original']:
        curr_list = ans[guy]
        if not curr_list:
            print('With augmentation type ' + guy + ' we have no differences to summarise')
            print(' ')
            print(' ')
            continue
        average = sum(curr_list) / len(curr_list)
        max_value = max(curr_list)
        min_value = min(curr_list)
        print('With augmentation type ' + guy + ' we see minimum difference of ' + str(min_value))
        print('and max difference of ' + str(max_value))
        print('and average value ' + str(average))
        print(' ')
        print(' ')

# Collect, per augmentation type, the MFCC distance between every generated
# file and the prompt it was produced from, then print summary statistics.
# `file` stays defined at module level while the loop runs; helper code in
# other cells may read it, so do not rename it without checking.
answer = {'pitch': [], 'vol': [], 'tempo': [], 'original': []}
for i, file in enumerate(continuation_files):
    # Integer percentage of files handled so far.
    a = 100*i // len(continuation_files)
    # Refresh the in-place progress display every tenth file.
    if i % 10 == 0:
      print(f"\r{a}%", end="", flush=True)
    filename_parts = file.split('_')
    # print(filename_parts)
    match_file_folder, match_file = find_corresponding_prompt(file)
    difference = compare(file, match_file, match_file_folder)
    # filename_parts[2] is the augmentation type and doubles as the dict key.
    answer[filename_parts[2]].append(difference)

# Return the cursor to the start of the progress line before printing the summary.
print(f"\r ", end="", flush=True)
print_well(answer)
