# Overview

Magenta's work is going to be the key to our drum generation. Specifically, their "GrooVAE" project uses neural networks to model the "groove" of human performance. Their models have different types of functionality, but we are interested in their "Drumify" or "Tap2Drum" functionality. This has the ability to go from a tapped rhythm to a full drum track. The "tapped rhythm" can be literally just taps, or it can be extracted from the track of another instrument. For us, we will be extracting a "tapped" rhythm from a guitar or bass track.

The following notebook demonstrates 2 key things:

1) Magenta's drumify functionality. The ability to use a PRETRAINED model to generate a drum track based on the groove / rhythm of an input track.

2) How to convert from audio (WAV file) to what I am calling "rhythm-MIDI". This is basically just MIDI data but the pitch information is irrelevant. Timing and rhythm information are all that matter. Magenta accomplishes this using a python library "Librosa". This library allows them to extract "onset", "offset", and "velocity" information for each note in WAV file. This is all the rhythmic information that is required for their models.

Disclaimer: most of this code is copied directly from Magenta's own notebook that demonstrates their GrooVAE model (https://colab.research.google.com/github/tensorflow/magenta-demos/blob/master/colab-notebooks/GrooVAE.ipynb). I have simply condensed it to purely the code we want. 

NOTE: Looks like Magenta wrote their notebook on "Google Colab" which extends the functionality of Jupyter notebook. Allows you to set up the entire environment in a web browser as opposed to Jupyter which expects the Python environment to be active on your local machine. I am going to stick with Jupyter for now, so our steps will be slightly more involved to get the environment running locally. 

## Still TODO 

1) Need to verify these Magenta models can be used with the USB Coral Accelerator. Some translation required from TF to TF-Lite? 

2) The pretrained models are good, but I think we can do better. We should train our own models using the Expanded Groove Dataset. Just the added data should be an increase in performance. Additionally, we can train models on specific genres and see how well they model characteristics of a single genre rather than generic drum performances.

## Environment Setup

As mentioned in a Note above, this notebook works with the standard Jupyter workflow: Python environment is acivated on the local machine to expose necessary libraries for this notebook. Google's notebook works in Colab, slightly different, where the entire environment is loaded in the browser.

For the following code to work in a Jupyter notebook, I had to do the following steps from an Anaconda Prompt:

1) Activate the BB Environment. `conda activate BB`

2) Install magenta `pip install magenta --user`. The user flag allows the `llvmlite` library to get uninstalled. Not quite sure why its necessary, but it is.

3) Try 'pip show tensorflow'. If nothing shows up, you need to install tensorflow: `pip install tensorflow`

4) We have the necessary library support, but now we need Magenta's models. If you have the latest repo, there should be a `ML/model_checkpoints/` directory with a `groovae_2bar_tap_fixed_velocity.tar` file. DO NOT UNZIP. Tensorflow handles unzipping internally. If the zip file is not present, you can download it from https://github.com/magenta/magenta/tree/master/magenta/models/music_vae

NOTE: When running the cells, I get lots of Tensorflow warnings. This is ok. We may want to investigate and fix the deprecated code, but the code in this notebook should still run.



In [2]:
#@title Setup Environment and Define all helper functionality
import tensorflow_datasets as tfds
import tensorflow as tf

import copy, warnings, librosa, numpy as np
warnings.filterwarnings("ignore", category=DeprecationWarning)


# Colab/Notebook specific stuff
import IPython.display
from IPython.display import Audio

import magenta
# Magenta specific stuff
from magenta.models.music_vae import configs
from magenta.models.music_vae.trained_model import TrainedModel
from magenta.models.music_vae import data
import note_seq
from note_seq import midi_synth
from note_seq.sequences_lib import concatenate_sequences
from note_seq.protobuf import music_pb2

# Define some functions

# If a sequence has notes at time before 0.0, scootch them up to 0
def start_notes_at_0(s):
  for n in s.notes:
    if n.start_time < 0:
      n.end_time -= n.start_time
      n.start_time = 0
  return s

def play(note_sequence, sf2_path='Standard_Drum_Kit.sf2'):  
  if sf2_path:
    audio_seq = midi_synth.fluidsynth(start_notes_at_0(note_sequence), sample_rate=44100, sf2_path=sf2_path)
    IPython.display.display(IPython.display.Audio(audio_seq, rate=44100))
  else:
    note_seq.play_sequence(start_notes_at_0(note_sequence), synth=note_seq.fluidsynth)

# Some midi files come by default from different instrument channels
# Quick and dirty way to set midi files to be recognized as drums
def set_to_drums(ns):
  for n in ns.notes:
    n.instrument=9
    n.is_drum = True
    
def unset_to_drums(ns):
  for note in ns.notes:
    note.is_drum=False
    note.instrument=0
  return ns

# quickly change the tempo of a midi sequence and adjust all notes
def change_tempo(note_sequence, new_tempo):
  new_sequence = copy.deepcopy(note_sequence)
  ratio = note_sequence.tempos[0].qpm / new_tempo
  for note in new_sequence.notes:
    note.start_time = note.start_time * ratio
    note.end_time = note.end_time * ratio
  new_sequence.tempos[0].qpm = new_tempo
  return new_sequence

def download(note_sequence, filename):
  note_seq.sequence_proto_to_midi_file(note_sequence, filename)
  files.download(filename)
  
def download_audio(audio_sequence, filename, sr):
  librosa.output.write_wav(filename, audio_sequence, sr=sr, norm=True)
  files.download(filename)
 
# Load some configs to be used later
dc_quantize = configs.CONFIG_MAP['groovae_2bar_humanize'].data_converter
dc_tap = configs.CONFIG_MAP['groovae_2bar_tap_fixed_velocity'].data_converter
dc_hihat = configs.CONFIG_MAP['groovae_2bar_add_closed_hh'].data_converter
dc_4bar = configs.CONFIG_MAP['groovae_4bar'].data_converter

# quick method for removing microtiming and velocity from a sequence
def get_quantized_2bar(s, velocity=0):
  new_s = dc_quantize.from_tensors(dc_quantize.to_tensors(s).inputs)[0]
  new_s = change_tempo(new_s, s.tempos[0].qpm)
  if velocity != 0:
    for n in new_s.notes:
      n.velocity = velocity
  return new_s

# quick method for turning a drumbeat into a tapped rhythm
def get_tapped_2bar(s, velocity=85, ride=False):
  new_s = dc_tap.from_tensors(dc_tap.to_tensors(s).inputs)[0]
  new_s = change_tempo(new_s, s.tempos[0].qpm)
  if velocity != 0:
    for n in new_s.notes:
      n.velocity = velocity
  if ride:
    for n in new_s.notes:
      n.pitch = 42
  return new_s

# quick method for removing hi-hats from a sequence
def get_hh_2bar(s):
  new_s = dc_hihat.from_tensors(dc_hihat.to_tensors(s).inputs)[0]
  new_s = change_tempo(new_s, s.tempos[0].qpm)
  return new_s


# Calculate quantization steps but do not remove microtiming
def quantize(s, steps_per_quarter=4):
  return note_seq.sequences_lib.quantize_note_sequence(s,steps_per_quarter)

# Destructively quantize a midi sequence
def flatten_quantization(s):
  beat_length = 60. / s.tempos[0].qpm
  step_length = beat_length / 4#s.quantization_info.steps_per_quarter
  new_s = copy.deepcopy(s)
  for note in new_s.notes:
    note.start_time = step_length * note.quantized_start_step
    note.end_time = step_length * note.quantized_end_step
  return new_s

# Calculate how far off the beat a note is
def get_offset(s, note_index):
  q_s = flatten_quantization(quantize(s))
  true_onset = s.notes[note_index].start_time
  quantized_onset = q_s.notes[note_index].start_time
  diff = quantized_onset - true_onset
  beat_length = 60. / s.tempos[0].qpm
  step_length = beat_length / 4#q_s.quantization_info.steps_per_quarter
  offset = diff/step_length
  return offset

def is_4_4(s):
  ts = s.time_signatures[0]
  return (ts.numerator == 4 and ts.denominator ==4)

def preprocess_4bar(s):
  return dc_4bar.from_tensors(dc_4bar.to_tensors(s).outputs)[0]

def preprocess_2bar(s):
  return dc_quantize.from_tensors(dc_quantize.to_tensors(s).outputs)[0]

def _slerp(p0, p1, t):
  """Spherical linear interpolation."""
  omega = np.arccos(np.dot(np.squeeze(p0/np.linalg.norm(p0)),
    np.squeeze(p1/np.linalg.norm(p1))))
  so = np.sin(omega)
  return np.sin((1.0-t)*omega) / so * p0 + np.sin(t*omega)/so * p1

def mix_tracks(y1, y2, stereo = False):
  l = max(len(y1),len(y2))
  y1 = librosa.util.fix_length(y1, l)
  y2 = librosa.util.fix_length(y2, l)
  
  if stereo:
    return np.vstack([y1, y2])  
  else:
    return y1+y2

def make_click_track(s):
  last_note_time = max([n.start_time for n in s.notes])
  beat_length = 60. / s.tempos[0].qpm 
  i = 0
  times = []
  while i*beat_length < last_note_time:
    times.append(i*beat_length)
    i += 1
  return librosa.clicks(times)

def drumify(s, model, temperature=1.0): 
  encoding, mu, sigma = model.encode([s])
  decoded = model.decode(encoding, length=32, temperature=temperature)
  return decoded[0]

def combine_sequences(seqs):
  # assumes a list of 2 bar seqs with constant tempo
  for i, seq in enumerate(seqs):
    shift_amount = i*(60 / seqs[0].tempos[0].qpm * 4 * 2)
    if shift_amount > 0:
      seqs[i] = note_seq.sequences_lib.shift_sequence_times(seq, shift_amount)
  return note_seq.sequences_lib.concatenate_sequences(seqs)

def combine_sequences_with_lengths(sequences, lengths):
  seqs = copy.deepcopy(sequences)
  total_shift_amount = 0
  for i, seq in enumerate(seqs):
    if i == 0:
      shift_amount = 0
    else:
      shift_amount = lengths[i-1]
    total_shift_amount += shift_amount
    if total_shift_amount > 0:
      seqs[i] = note_seq.sequences_lib.shift_sequence_times(seq, total_shift_amount)
  combined_seq = music_pb2.NoteSequence()
  for i in range(len(seqs)):
    tempo = combined_seq.tempos.add()
    tempo.qpm = seqs[i].tempos[0].qpm
    tempo.time = sum(lengths[0:i-1])
    for note in seqs[i].notes:
      combined_seq.notes.extend([copy.deepcopy(note)])
  return combined_seq

def get_audio_start_time(y, sr):
  tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr)
  beat_times = librosa.frames_to_time(beat_frames, sr=sr)
  onset_times = librosa.onset.onset_detect(y, sr, units='time')
  start_time = onset_times[0] 
  return start_time

def audio_tap_to_note_sequence(f, velocity_threshold=30):
  y, sr = librosa.load(f)
  # pad the beginning to avoid errors with onsets right at the start
  y = np.concatenate([np.zeros(1000),y])
  tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr)
  # try to guess reasonable tempo
  beat_times = librosa.frames_to_time(beat_frames, sr=sr)
  onset_frames = librosa.onset.onset_detect(y, sr, units='frames')
  onset_times = librosa.onset.onset_detect(y, sr, units='time')
  start_time = onset_times[0]
  onset_strengths = librosa.onset.onset_strength(y, sr)[onset_frames]
  normalized_onset_strengths = onset_strengths / np.max(onset_strengths)
  onset_velocities = np.int32(normalized_onset_strengths * 127)
  note_sequence = music_pb2.NoteSequence()
  note_sequence.tempos.add(qpm=tempo)
  for onset_vel, onset_time in zip(onset_velocities, onset_times):
    if onset_vel > velocity_threshold and onset_time >= start_time:  # filter quietest notes
      note_sequence.notes.add(
        instrument=9, pitch=42, is_drum=True,
        velocity=onset_vel,  # use fixed velocity here to avoid overfitting
        start_time=onset_time - start_time,
        end_time=onset_time - start_time)

  return note_sequence

# Allow encoding of a sequence that has no extracted examples
# by adding a quiet note after the desired length of time
def add_silent_note(note_sequence, num_bars):
  tempo = note_sequence.tempos[0].qpm
  length = 60/tempo * 4 * num_bars
  note_sequence.notes.add(
    instrument=9, pitch=42, velocity=0, start_time=length-0.02, 
    end_time=length-0.01, is_drum=True)
  
def get_bar_length(note_sequence):
  tempo = note_sequence.tempos[0].qpm
  return 60/tempo * 4

def sequence_is_shorter_than_full(note_sequence):
  return note_sequence.notes[-1].start_time < get_bar_length(note_sequence)

def get_rhythm_elements(y, sr):
  onset_env = librosa.onset.onset_strength(y, sr=sr)
  tempo = librosa.beat.tempo(onset_envelope=onset_env, max_tempo=180)[0]
  onset_times = librosa.onset.onset_detect(y, sr, units='time')
  onset_frames = librosa.onset.onset_detect(y, sr, units='frames')
  onset_strengths = librosa.onset.onset_strength(y, sr)[onset_frames]
  normalized_onset_strengths = onset_strengths / np.max(onset_strengths)
  onset_velocities = np.int32(normalized_onset_strengths * 127)

  return tempo, onset_times, onset_frames, onset_velocities

def make_tap_sequence(tempo, onset_times, onset_frames, onset_velocities,
                     velocity_threshold, start_time, end_time):
  note_sequence = music_pb2.NoteSequence()
  note_sequence.tempos.add(qpm=tempo)
  for onset_vel, onset_time in zip(onset_velocities, onset_times):
    if onset_vel > velocity_threshold and onset_time >= start_time and onset_time < end_time:  # filter quietest notes
      note_sequence.notes.add(
        instrument=9, pitch=42, is_drum=True,
        velocity=onset_vel,  # model will use fixed velocity here
        start_time=onset_time - start_time,
        end_time=onset_time -start_time + 0.01
      )
  return note_sequence

# !!!!! TRANSLATE DIRECTLY FROM AUDIO TO DRUMS !!!!!
def audio_to_drum(f, velocity_threshold=30, temperature=1., force_sync=False, start_windows_on_downbeat=False):
  y, sr = librosa.load(f)
  # pad the beginning to avoid errors with onsets right at the start
  y = np.concatenate([np.zeros(1000),y])

  clip_length = float(len(y))/sr

  tap_sequences = []
  # Loop through the file, grabbing 2-bar sections at a time, estimating
  # tempos along the way to try to handle tempo variations

  tempo, onset_times, onset_frames, onset_velocities = get_rhythm_elements(y, sr)

  initial_start_time = onset_times[0]

  start_time = onset_times[0]
  beat_length = 60/tempo
  two_bar_length = beat_length * 8
  end_time = start_time + two_bar_length

  start_times = []
  lengths = []
  tempos = []

  start_times.append(start_time)
  lengths.append(end_time-start_time)
  tempos.append(tempo)

  tap_sequences.append(make_tap_sequence(tempo, onset_times, onset_frames, 
                       onset_velocities, velocity_threshold, start_time, end_time))

  start_time += two_bar_length; end_time += two_bar_length


  while start_time < clip_length:
    start_sample = int(librosa.core.time_to_samples(start_time, sr=sr))
    end_sample = int(librosa.core.time_to_samples(start_time + two_bar_length, sr=sr))
    current_section = y[start_sample:end_sample]
    tempo = librosa.beat.tempo(onset_envelope=librosa.onset.onset_strength(current_section, sr=sr), max_tempo=180)[0]

    beat_length = 60/tempo
    two_bar_length = beat_length * 8

    end_time = start_time + two_bar_length

    start_times.append(start_time)
    lengths.append(end_time-start_time)
    tempos.append(tempo)

    tap_sequences.append(make_tap_sequence(tempo, onset_times, onset_frames, 
                         onset_velocities, velocity_threshold, start_time, end_time))

    start_time += two_bar_length; end_time += two_bar_length
  
  # if there's a long gap before the first note, back it up close to 0
  def _shift_notes_to_beginning(s):
    start_time = s.notes[0].start_time
    if start_time > 0.1:
      for n in s.notes:
        n.start_time -= start_time
        n.end_time -=start_time
    return start_time
      
  def _shift_notes_later(s, start_time):
    for n in s.notes:
      n.start_time += start_time
      n.end_time +=start_time    
  
  def _sync_notes_with_onsets(s, onset_times):
    for n in s.notes:
      n_length = n.end_time - n.start_time
      closest_onset_index = np.argmin(np.abs(n.start_time - onset_times))
      n.start_time = onset_times[closest_onset_index]
      n.end_time = n.start_time + n_length
  
  drum_seqs = []
  for s in tap_sequences:
    try:
      if sequence_is_shorter_than_full(s):
        add_silent_note(s, 2)
        
      if start_windows_on_downbeat:
        note_start_time = _shift_notes_to_beginning(s)
      h = drumify(s, groovae_2bar_tap, temperature=temperature)
      h = change_tempo(h, s.tempos[0].qpm)
      
      if start_windows_on_downbeat and note_start_time > 0.1:
          _shift_notes_later(s, note_start_time)
        
      drum_seqs.append(h)
    except:
      continue  
      
  combined_tap_sequence = start_notes_at_0(combine_sequences_with_lengths(tap_sequences, lengths))
  combined_drum_sequence = start_notes_at_0(combine_sequences_with_lengths(drum_seqs, lengths))
  
  if force_sync:
    _sync_notes_with_onsets(combined_tap_sequence, onset_times)
    _sync_notes_with_onsets(combined_drum_sequence, onset_times)
  
  full_tap_audio = librosa.util.normalize(midi_synth.fluidsynth(combined_tap_sequence, sample_rate=sr))
  full_drum_audio = librosa.util.normalize(midi_synth.fluidsynth(combined_drum_sequence, sample_rate=sr))
  
  tap_and_onsets = mix_tracks(full_tap_audio, y[int(initial_start_time*sr):]/2, stereo=True)
  drums_and_original = mix_tracks(full_drum_audio, y[int(initial_start_time*sr):]/2, stereo=True)
  
  return full_drum_audio, full_tap_audio, tap_and_onsets, drums_and_original, combined_drum_sequence


In [3]:
#@title Load model checkpoint
GROOVAE_2BAR_TAP_FIXED_VELOCITY = "model_checkpoints/groovae_2bar_tap_fixed_velocity.tar"
config_2bar_tap = configs.CONFIG_MAP['groovae_2bar_tap_fixed_velocity']
groovae_2bar_tap = TrainedModel(config_2bar_tap, 1, checkpoint_dir_or_path=GROOVAE_2BAR_TAP_FIXED_VELOCITY)

INFO:tensorflow:Building MusicVAE model with BidirectionalLstmEncoder, GrooveLstmDecoder, and hparams:
{'max_seq_len': 32, 'z_size': 256, 'free_bits': 48, 'max_beta': 0.2, 'beta_rate': 0.0, 'batch_size': 1, 'grad_clip': 1.0, 'clip_mode': 'global_norm', 'grad_norm_clip_to_zero': 10000, 'learning_rate': 0.001, 'decay_rate': 0.9999, 'min_learning_rate': 1e-05, 'conditional': True, 'dec_rnn_size': [256, 256], 'enc_rnn_size': [512], 'dropout_keep_prob': 0.3, 'sampling_schedule': 'constant', 'sampling_rate': 0.0, 'use_cudnn': False, 'residual_encoder': False, 'residual_decoder': False, 'control_preprocessing_rnn_size': [256]}
INFO:tensorflow:
Encoder Cells (bidirectional):
  units: [512]

INFO:tensorflow:
Decoder Cells:
  units: [256, 256]





Instructions for updating:
Use `tf.cast` instead.
Instructions for updating:
Please use `keras.layers.Bidirectional(keras.layers.RNN(cell))`, which is equivalent to this API
Instructions for updating:
Please use `keras.layers.RNN(cell)`, which is equivalent to this API
Instructions for updating:
Do not pass `graph_parents`.  They will  no longer be used.
INFO:tensorflow:Unbundling checkpoint.
INFO:tensorflow:Restoring parameters from C:\Users\RYANHE~1\AppData\Local\Temp\tmpsr3yty4c\groovae_2bar_tap_fixed_velocity/model.ckpt-3668


In [20]:
#@title Audio Taps --> Beats
#paths = ['../Data/no_joe_tapped.wav', '../Data/ryan_is_no_joe.wav', '../Data/groove_dataset/drummer1/session1/1_funk_80_beat_4-4.wav']
paths = ['../Data/basic_plain.wav', '../Data/basic_compressed.wav', '../Data/basic_compressed_midscoop.wav']
# NOTE: PARAM TAGS DON'T WORK IN NOTEBOOK (ONLY COLAB)
temperature = 0.1 #@param {type:"slider", min:0.01, max:2.0, step:0.01}
velocity_threshold = 0.08 #@param {type:"slider", min:0, max:1, step:0.01}
stereo = False #@param {type:"boolean"}

new_beats = []
new_drum_audios = []
combined_audios = []

for i in range(len(paths)):
    f = paths[i]
    print("\n\n\nPlaying %s: " %(f))
    # LOAD WAV FILE WITH LIBROSA
    y,sr = librosa.load(paths[i])
    print("Sampling rate : ", sr)
    IPython.display.display(IPython.display.Audio(y, rate=sr))
    # "TRANSLATE" DIRECTLY FROM AUDIO TO NEW DRUM TRACK
    full_drum_audio, full_tap_audio, tap_and_onsets, drums_and_original, combined_drum_sequence = audio_to_drum(f, velocity_threshold=velocity_threshold, temperature=temperature)
    new_beats.append(combined_drum_sequence)
    new_drum_audios.append(full_drum_audio)
    combined_audios.append(drums_and_original)
    print("Playing the rhythm detected in %s: " %(f))
    IPython.display.display(IPython.display.Audio(full_tap_audio, rate=sr))
    print("Playing drums generated from %s: " %(f))
    IPython.display.display(IPython.display.Audio(full_drum_audio, rate=sr))
    print("Playing %s together with drums" %(f))
    IPython.display.display(IPython.display.Audio(drums_and_original, rate=sr))




Playing ../Data/basic_plain.wav: 
Sampling rate :  22050


Playing the rhythm detected in ../Data/basic_plain.wav: 


Playing drums generated from ../Data/basic_plain.wav: 


Playing ../Data/basic_plain.wav together with drums





Playing ../Data/basic_compressed.wav: 
Sampling rate :  22050


Playing the rhythm detected in ../Data/basic_compressed.wav: 


Playing drums generated from ../Data/basic_compressed.wav: 


Playing ../Data/basic_compressed.wav together with drums





Playing ../Data/basic_compressed_midscoop.wav: 
Sampling rate :  22050


Playing the rhythm detected in ../Data/basic_compressed_midscoop.wav: 


Playing drums generated from ../Data/basic_compressed_midscoop.wav: 


Playing ../Data/basic_compressed_midscoop.wav together with drums
