<a href="https://colab.research.google.com/github/nielsrolf/ddsp/blob/master/ddsp/colab/experiments/timbre_interpolation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title #Install and Import

!pip install git+git://github.com/nielsrolf/ddsp &> /dev/null



#@markdown Install ddsp, define some helper functions, and download the model. This transfers a lot of data and _should take a minute or two_.
%tensorflow_version 2.x

# Ignore a bunch of deprecation warnings
import warnings
warnings.filterwarnings("ignore")

import copy
import os
import time

import crepe
import ddsp
import ddsp.training
from ddsp.colab import colab_utils
from ddsp.colab.colab_utils import (
    auto_tune, detect_notes, fit_quantile_transform, 
    get_tuning_factor, download, play, record, 
    specplot, upload, DEFAULT_SAMPLE_RATE)
import gin
from google.colab import files
import librosa
import matplotlib.pyplot as plt
import numpy as np
import pickle
import tensorflow.compat.v2 as tf
import tensorflow_datasets as tfds

# Sample rate used below when resampling loaded audio and when playing it back.
sample_rate = DEFAULT_SAMPLE_RATE  # 16000


print('Done!')

Done!


In [11]:
#@title #Mount drive or sync s3
import getpass
import os

sync_s3 = True


if sync_s3:
    results_dir = "s3"
    s3_bucket = "s3://niels-warncke-experiments"
    !pip install awscli &> /dev/null
    os.makedirs("/root/.aws", exist_ok=True)
    with open("/root/.aws/credentials", "w") as private_key:
        print("aws_access_key_id")
        private_key.write(f"[default]\naws_access_key_id = {getpass.getpass()}\n")
        print("aws_secret_access_key")
        private_key.write(f"aws_secret_access_key = {getpass.getpass()}\n")
    !aws s3 sync {s3_bucket} {results_dir} &> /dev/null && rm -r /root/.aws
else:
    from google.colab import drive
    drive.mount('/content/drive')
    #@markdown (ex. `/content/drive/MyDrive/...`) Leave blank to skip loading from Drive.
    DRIVE_DIR = 'drive/MyDrive/ddsp' #@param {type: "string"}
    assert os.path.exists(DRIVE_DIR)
    print('Drive Folder Exists:', DRIVE_DIR)
    results_dir = DRIVE_DIR

aws_access_key_id
··········
aws_secret_access_key
··········


In [None]:
#@title Record or Upload Source Audio
#@markdown **Source Audio - we will use this melody and loudness**
#@markdown * Either record audio from microphone or upload audio from file (.mp3 or .wav) 
#@markdown * Audio should be monophonic (single instrument / voice)
#@markdown * Extracts fundmanetal frequency (f0) and loudness features. 

source = "File System"  #@param ["Record", "Upload (.mp3 or .wav)", "Youtube", "File System"]

record_seconds =     5#@param {type:"number", min:1, max:10, step:1}

youtube_url = "https://www.youtube.com/watch?v=XvVmZmMLojc" #@param {type:"string"}

filename = "s3/samples/vozes.mp3" #@param {type:"string"}

if source == "Record":
    audio = record(seconds=record_seconds)
elif source == "Upload":
    # Load audio sample here (.mp3 or .wav3 file)
    # Just use the first file.
    filenames, audios = upload()
    audio = audios[0]
elif source == "Youtube":
    !pip install youtube-dl &> /dev/null
    from uuid import uuid4
    import time
    from glob import glob
    filename = f"{uuid4().hex[:5]}.mp3"
    files = set(glob('*'))
    !youtube-dl --extract-audio {youtube_url} --audio-format mp3
    time.sleep(10)
    filename = (set(glob('*')) - files).pop()
    source = "File System"
if source == "File System":
    !pip install pydub &> /dev/null
    from pydub import AudioSegment
    if filename.endswith(".mp3"):
        song = AudioSegment.from_mp3(filename)
    elif filename.endswith(".wav"):
        song = AudioSegment.from_wav(filename)
    audio = np.array(song.set_frame_rate(sample_rate).get_array_of_samples()).reshape(song.channels, -1, order='F')[0]
    audio = audio / np.max(np.absolute(audio))
audio_src = audio[np.newaxis, :]

play(audio_src, sample_rate=sample_rate)

In [None]:
!cd s3 && unzip mono-instruments.zip

In [None]:
#@markdown **Target Audio - we will use this to extract the timbre**
#@markdown * Either record audio from microphone or upload audio from file (.mp3 or .wav) 
#@markdown * Audio should be monophonic (single instrument / voice)
#@markdown * Extracts fundmanetal frequency (f0) and loudness features. 

source = "File System"  #@param ["Upload (.mp3 or .wav)", "Youtube", "File System"]

youtube_url = "https://www.youtube.com/watch?v=XvVmZmMLojc" #@param {type:"string"}

filename = "s3/mono-instruments/AuSep_1_cl_19_Pavane.wav" #@param {type:"string"}

if source == "Record":
    audio = record(seconds=record_seconds)
elif source == "Upload":
    # Load audio sample here (.mp3 or .wav3 file)
    # Just use the first file.
    filenames, audios = upload()
    audio = audios[0]
elif source == "Youtube":
    !pip install youtube-dl &> /dev/null
    from uuid import uuid4
    import time
    from glob import glob
    filename = f"{uuid4().hex[:5]}.mp3"
    files = set(glob('*'))
    !youtube-dl --extract-audio {youtube_url} --audio-format mp3
    time.sleep(10)
    filename = (set(glob('*')) - files).pop()
    source = "File System"
if source == "File System":
    !pip install pydub &> /dev/null
    from pydub import AudioSegment
    if filename.endswith(".mp3"):
        song = AudioSegment.from_mp3(filename)
    elif filename.endswith(".wav"):
        song = AudioSegment.from_wav(filename)
    audio = np.array(song.set_frame_rate(sample_rate).get_array_of_samples()).reshape(song.channels, -1, order='F')[0]
    audio = audio / np.max(np.absolute(audio))
audio_target = audio[np.newaxis, :]

play(audio_target, sample_rate=sample_rate)

In [None]:
#@title Building the src model
# NOTE(review): `model` (the folder name under <results_dir>/models) is not assigned in any
# cell visible in this notebook; it has to exist before this cell runs — TODO confirm where
# it is meant to be set.
model_dir = os.path.join(results_dir, "models", model)

def load_model_for_audio_size(model_dir, audio):
    """Restore the autoencoder stored in `model_dir`, sized to fit `audio`.

    Computes the audio features of `audio`, parses the model's gin config,
    overrides the sample and time-step counts so they match the clip, restores
    the checkpoint with the highest step number and runs the features through
    the model once.

    Args:
        model_dir: Folder containing `operative_config-0.gin` and the
            `ckpt-[iter]` checkpoint files.
        audio: Audio array; `audio.shape[1]` is read as the number of samples.

    Returns:
        Tuple `(model, audio_features, out)`: the restored
        `ddsp.training.models.Autoencoder`, the feature dict trimmed to the
        model's time steps / sample count, and the result of
        `model(audio_features, training=False)`.

    Raises:
        ValueError: If `model_dir` holds no checkpoint files, or if `audio` is
            shorter than one model frame.
    """
    start_time = time.time()
    audio_features = ddsp.training.metrics.compute_audio_features(audio)
    audio_features['loudness_db'] = audio_features['loudness_db'].astype(np.float32)
    print('Audio features took %.1f seconds' % (time.time() - start_time))

    # Parse the gin config that sits next to the checkpoints.
    gin_file = os.path.join(model_dir, 'operative_config-0.gin')
    with gin.unlock_config():
        gin.parse_config_file(gin_file, skip_unknown=True)

    def step_of(ckpt_file):
        """Training step encoded in a `ckpt-[iter].*` file name."""
        return int(ckpt_file.split('.')[0].split('-')[1])

    # Use latest checkpoint in the folder, 'ckpt-[iter]`.
    ckpt_files = [f for f in tf.io.gfile.listdir(model_dir) if 'ckpt' in f]
    if not ckpt_files:
        raise ValueError('No checkpoint files found in %s' % model_dir)
    ckpt_name = max(ckpt_files, key=step_of).split('.')[0]
    ckpt = os.path.join(model_dir, ckpt_name)

    # Ensure dimensions and sampling rates are equal
    time_steps_train = gin.query_parameter('F0LoudnessPreprocessor.time_steps')
    n_samples_train = gin.query_parameter('Harmonic.n_samples')
    hop_size = int(n_samples_train / time_steps_train)

    time_steps = int(audio.shape[1] / hop_size)
    if time_steps == 0:
        raise ValueError(
            'Audio has %d samples, fewer than one model frame (%d samples).'
            % (audio.shape[1], hop_size))
    n_samples = time_steps * hop_size

    # -----------  Load Model for decoding ----------------
    gin_params = [
        'Harmonic.n_samples = {}'.format(n_samples),
        'FilteredNoise.n_samples = {}'.format(n_samples),
        'F0LoudnessPreprocessor.time_steps = {}'.format(time_steps),
        'oscillator_bank.use_angular_cumsum = True',  # Avoids cumsum accumulation errors.
    ]

    with gin.unlock_config():
        gin.parse_config(gin_params)

    # Trim all input vectors to correct lengths
    for key in ['f0_hz', 'f0_confidence', 'loudness_db']:
        audio_features[key] = audio_features[key][:time_steps]
    audio_features['audio'] = audio_features['audio'][:, :n_samples]

    # Set up the model just to predict audio given new conditioning
    start_time = time.time()
    model = ddsp.training.models.Autoencoder()
    model.restore(ckpt)

    # Build model by running a batch through it.
    out = model(audio_features, training=False)
    print('Restoring model took %.1f seconds' % (time.time() - start_time))
    return model, audio_features, out

# Build the model for the source clip and play the 'audio_synth' entry of its output.
model_src, src_features, src_out = load_model_for_audio_size(model_dir, audio_src)
play(src_out['audio_synth'])

In [None]:
#@title Target Timbre Audio

# Same model folder as for the source, but built for the length of the target clip;
# plays the 'audio_synth' entry of its output.
model_target, target_features, target_out = load_model_for_audio_size(model_dir, audio_target)
play(target_out['audio_synth'])


In [34]:
#@title Get dataset statistics for auto adjustment
from ddsp.colab.colab_utils import fit_quantile_transform, detect_notes

na = None


def get_dataset_statistics(audio_features, trim_end=20):
    """Compute the pitch and loudness statistics of one clip for auto adjustment.

    Args:
        audio_features: Dict with 'f0_hz', 'loudness_db' and 'f0_confidence'
            entries.
        trim_end: Number of trailing frames dropped before the statistics are
            computed. `0` keeps every frame.

    Returns:
        Dict with 'mean_pitch' (mean MIDI pitch over the frames where a note
        is detected), 'mean_loudness', 'mean_max_loudness' and
        'quantile_transform' (as returned by `fit_quantile_transform`).
    """
    # `[:, :-0]` would select nothing, so only use a negative stop when trimming.
    keep = slice(None, -trim_end) if trim_end > 0 else slice(None)

    # Prepend a length-1 axis to each feature, then drop the trailing frames.
    f0_trimmed = audio_features['f0_hz'][np.newaxis][:, keep]
    l_trimmed = audio_features['loudness_db'][np.newaxis][:, keep]
    f0_conf_trimmed = audio_features['f0_confidence'][np.newaxis][:, keep]

    mask_on, _ = detect_notes(l_trimmed, f0_conf_trimmed)
    quantile_transform = fit_quantile_transform(l_trimmed, mask_on)

    # Average values.
    mean_pitch = np.mean(ddsp.core.hz_to_midi(f0_trimmed[mask_on]))
    mean_loudness = np.mean(l_trimmed)
    mean_max_loudness = np.mean(np.max(l_trimmed, axis=0))

    # Object to pickle all the statistics together.
    ds = {'mean_pitch': mean_pitch,
          'mean_loudness': mean_loudness,
          'mean_max_loudness': mean_max_loudness,
          'quantile_transform': quantile_transform}
    return ds


In [40]:
#@title Auto Adjust Settings
#@markdown Note detection threshold. You can leave this at 1.0 for most cases
threshold = 1 #@param {type:"slider", min: 0.0, max:2.0, step:0.01}


#@markdown ## Automatic

ADJUST = True #@param{type:"boolean"}

#@markdown Quiet parts without notes detected (dB)
quiet = 20 #@param {type:"slider", min: 0, max:60, step:1}

#@markdown Force pitch to nearest note (amount)
autotune = 0.4 #@param {type:"slider", min: 0.0, max:1.0, step:0.1}

#@markdown ## Manual


#@markdown Shift the pitch (octaves)
pitch_shift =  0 #@param {type:"slider", min:-2, max:2, step:1}

#@markdown Adjust the overall loudness (dB)
loudness_shift = 0 #@param {type:"slider", min:-20, max:20, step:1}



#@markdown Mixing factor used as the interpolation weight (0 = source only, 1 = target only)
mixing_factor = 1 #@param {type:"slider", min:0, max:1, step:0.1}

In [None]:
#@title Magic
# Auto Adjust

## Helper functions.
def shift_ld(audio_features, ld_shift=0.0):
    """Add `ld_shift` to the 'loudness_db' entry and return the same dict."""
    loudness_key = 'loudness_db'
    audio_features[loudness_key] += ld_shift
    return audio_features


def shift_f0(audio_features, pitch_shift=0.0):
    """Transpose 'f0_hz' by `pitch_shift` octaves and clip it.

    The result is clipped to the range from 0 Hz up to the frequency of MIDI
    note 110. The dict that was passed in is updated and returned.
    """
    upper_hz = librosa.midi_to_hz(110.0)
    audio_features['f0_hz'] *= 2.0 ** pitch_shift
    clipped_f0 = np.clip(audio_features['f0_hz'], 0.0, upper_hz)
    audio_features['f0_hz'] = clipped_f0
    return audio_features


def auto_adjust(audio_features, src_statistics, target_statistics, mixing_factor):
    """Adjust pitch register and loudness towards a mix of source and target statistics.

    Reads the notebook-level settings `threshold`, `quiet` and `autotune`.

    Args:
        audio_features: Dict with at least 'loudness_db', 'f0_confidence' and
            'f0_hz'. It is not modified.
        src_statistics: Statistics dict of the source clip, with 'mean_pitch'
            and 'quantile_transform' entries.
        target_statistics: Statistics dict of the target clip, same keys.
        mixing_factor: Weight of the target statistics; the source statistics
            are weighted with `1 - mixing_factor`.

    Returns:
        A copy of `audio_features` (every value passed through `tf.identity`)
        with 'f0_hz' and 'loudness_db' adjusted. When no notes are detected
        the copy is returned without adjustments.
    """
    audio_features_mod = {k: tf.identity(v) for k, v in audio_features.items()}
    # Detect sections that are "on".
    mask_on, note_on_value = detect_notes(audio_features['loudness_db'],
                                          audio_features['f0_confidence'],
                                          threshold)

    if not np.any(mask_on):
        # Hand back the unadjusted copy instead of falling off the end and
        # returning None, which the calling cell cannot work with.
        print('Skipping auto-adjust (no notes detected).')
        return audio_features_mod

    # Shift the pitch register.
    target_mean_pitch = src_statistics['mean_pitch'] * (1 - mixing_factor) + \
                        target_statistics['mean_pitch'] * mixing_factor
    pitch = ddsp.core.hz_to_midi(audio_features['f0_hz'])
    mean_pitch = np.mean(pitch[mask_on])
    p_diff = target_mean_pitch - mean_pitch
    p_diff_octave = p_diff / 12.0
    # Whole octaves only: floor for shifts above 1.5 octaves, ceil otherwise.
    round_fn = np.floor if p_diff_octave > 1.5 else np.ceil
    p_diff_octave = round_fn(p_diff_octave)
    audio_features_mod = shift_f0(audio_features_mod, p_diff_octave)

    # Quantile shift the note_on parts.
    _, loudness_norm_src = colab_utils.fit_quantile_transform(
        audio_features['loudness_db'],
        mask_on,
        inv_quantile=src_statistics['quantile_transform'])

    _, loudness_norm_target = colab_utils.fit_quantile_transform(
        audio_features['loudness_db'],
        mask_on,
        inv_quantile=target_statistics['quantile_transform'])
    loudness_norm = loudness_norm_src * (1 - mixing_factor) + \
                    loudness_norm_target * mixing_factor

    # Turn down the note_off parts.
    mask_off = np.logical_not(mask_on)
    loudness_norm[mask_off] -=  quiet * (1.0 - note_on_value[mask_off][:, np.newaxis])
    loudness_norm = np.reshape(loudness_norm, audio_features['loudness_db'].shape)

    audio_features_mod['loudness_db'] = loudness_norm

    # Auto-tune.
    if autotune:
        f0_midi = np.array(ddsp.core.hz_to_midi(audio_features_mod['f0_hz']))
        tuning_factor = get_tuning_factor(f0_midi, audio_features_mod['f0_confidence'], mask_on)
        f0_midi_at = auto_tune(f0_midi, tuning_factor, mask_on, amount=autotune)
        audio_features_mod['f0_hz'] = ddsp.core.midi_to_hz(f0_midi_at)
    return audio_features_mod



# Features that are interpolated between source and target. The weight is the
# `mixing_factor` value from the "Auto Adjust Settings" form.
mixing_features=['z']

interpolation_latents = {k: v.copy() for k, v in src_features.items()}
# Manual Shifts.
interpolation_latents = shift_ld(interpolation_latents, loudness_shift)
interpolation_latents = shift_f0(interpolation_latents, pitch_shift)
# Feature interpolations
for feature in mixing_features:
    # The target feature is averaged over axis 1 (kept as a length-1 axis) before mixing.
    interpolation_latents[feature] = src_out[feature] * (1 - mixing_factor) + \
        tf.reduce_mean(target_out[feature], axis=1, keepdims=True) * mixing_factor
# Auto adjust (only when the ADJUST checkbox in the settings form is ticked)
if ADJUST:
    target_statistics = get_dataset_statistics(target_features)
    src_statistics = get_dataset_statistics(src_features)

    adjusted_latents = auto_adjust(interpolation_latents, src_statistics, target_statistics, mixing_factor)
    # Keep the unadjusted features if auto_adjust yields no result.
    if adjusted_latents is not None:
        interpolation_latents = adjusted_latents
if model_src.preprocessor is not None:
    interpolation_latents.update(model_src.preprocessor(interpolation_latents, training=False))

interpolation_latents.update(model_src.decoder(interpolation_latents))
pg_out = model_src.processor_group(interpolation_latents, return_outputs_dict=True)
interpolation_audio = pg_out['signal']

play(interpolation_audio)


In [None]:
#@title # Uploading to s3

os.makedirs("/root/.aws", exist_ok=True)
with open("/root/.aws/credentials", "w") as private_key:
    print("aws_access_key_id")
    private_key.write(f"[default]\naws_access_key_id = {getpass.getpass()}\n")
    print("aws_secret_access_key")
    private_key.write(f"aws_secret_access_key = {getpass.getpass()}\n")
!aws s3 sync {results_dir} {s3_bucket} && rm -r /root/.aws

aws_access_key_id
··········
aws_secret_access_key
··········
upload: s3/samples/vozes.mp3 to s3://niels-warncke-experiments/samples/vozes.mp3
