# M4L - Timbre Transfer 

"Have fun! And please feel free to hack this notebook to make your own creative interactions." [- Magenta](https://colab.research.google.com/github/magenta/ddsp/blob/master/ddsp/colab/demos/timbre_transfer.ipynb)

This is such a hacked notebook. It contains a loop that watches a Google Drive folder for new files to process, and it works together with an M4L device that handles the settings and the download of the processed files.

### Instructions for running:

* Make sure to use a GPU runtime, click:  __Runtime >> Change Runtime Type >> GPU__
* Press ▶️ on the left of each of the cells
* View the code: Double-click any of the cells
* Hide the code: Double-click the right side of the cell



Imports

In [0]:
#@title Install and Import

import sys
import subprocess
import pkg_resources

def install(package):
    """Install or upgrade ``package`` with pip under the running interpreter.

    Runs ``<sys.executable> -m pip install --upgrade <package>`` and raises
    ``subprocess.CalledProcessError`` if pip exits with a non-zero status.
    """
    pip_upgrade = (sys.executable, "-m", "pip", "install", "--upgrade")
    subprocess.check_call([*pip_upgrade, package])

print('Installing and upgrading packages...')

# Requirement specifiers handed to pip one at a time, in exactly this order.
# Entries containing `==` are pinned to that exact version; the remaining
# entries are unpinned, so pip chooses the release at install time.
packages = [
    "pip==23.3.1",
    "numpy==1.24.3",
    "scipy",
    "matplotlib",
    "librosa==0.9.2",
    "pydub",
    "google-auth-oauthlib==0.4.6",
    "pydrive",
    "protobuf==3.20.0",
    "tensorflow==2.12.0",
    "tensorflow-probability==0.19.0",
    "crepe",
    "ddsp"
]

# Each entry gets its own `pip install --upgrade` run through install();
# install() uses check_call, so the first failing install raises and stops
# the remaining ones.
for package in packages:
    install(package)

# Make the import system notice the packages pip just installed.
# invalidate_caches() clears the path finders' cached directory listings;
# reloading `sys` (what this cell did before) does not do that.
# Modules that were already imported before being upgraded keep their old
# code until the runtime is restarted.
import importlib
importlib.invalidate_caches()

# Silence all warnings (not only deprecation warnings) from here on.
import warnings
warnings.filterwarnings("ignore")

# Import required modules
import copy
import os
import time 
import json
import crepe
import ddsp
import ddsp.training
from ddsp.colab import colab_utils
from ddsp.colab.colab_utils import (
    auto_tune, get_tuning_factor, play, record, 
    specplot, upload, DEFAULT_SAMPLE_RATE
)
from ddsp.training import postprocessing
detect_notes = postprocessing.detect_notes
fit_quantile_transform = postprocessing.fit_quantile_transform

import gin
from google.colab import files
import librosa
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

# Helper Functions
sample_rate = DEFAULT_SAMPLE_RATE  # 16000

import base64
import io
import tempfile
import pickle

from IPython import display
from pydub import AudioSegment
from scipy.io import wavfile

from google.colab import output
from google.colab import files as colab_files
download = colab_files.download

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

print('Done!')

# Verify installations.
# `pkg_resources.working_set` is a snapshot built when pkg_resources was first
# imported, which happens before the pip installs earlier in this cell, so it
# would list the old package versions. A freshly constructed WorkingSet scans
# sys.path again and reports what is installed now.
installed_packages = pkg_resources.WorkingSet()
installed_packages_list = sorted(f"{i.key}=={i.version}" for i in installed_packages)
print("Installed packages:")
for package in installed_packages_list:
    print(package)

# Print DDSP version
print(f"DDSP version: {ddsp.__version__}")

## Log in to authenticate for Google Drive access

In [0]:
# Authenticate the Colab user, then give PyDrive the application-default
# credentials. `drive` is the client used by the later cells for their
# Drive queries (ListFile) and downloads (CreateFile).
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)


# Load functionality from DDSP 'Timbre Transfer Demo Notebook'

Essentially copied from the DDSP 'Timbre Transfer Demo Notebook' and adapted to work in this notebook, with added functions for file management.

In [0]:
#@title Whole lotta code
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()

# Look up the Drive folder that is used for exchanging files. The query only
# matches non-trashed items titled 'M4L-Timbre-Transfer-Folder' directly under
# the Drive root; if several match, the last one listed wins.
gdID = None
file_list = drive.ListFile({'q': "'root' in parents and trashed=false and title = 'M4L-Timbre-Transfer-Folder'"}).GetList()
for file1 in file_list:
  print('title: %s, id: %s' % (file1['title'], file1['id']))
  gdID = file1['id']

# Without a folder id, every later Drive query would be built around the
# literal text 'None'; report the problem here instead of failing obscurely.
if gdID is None:
  print("WARNING: no 'M4L-Timbre-Transfer-Folder' folder was found in the root of "
        "Google Drive; create it and re-run this cell before starting the loop.")

# Shared state, filled in by p4load().
audio = None
audio_features = None
audio_features_mod = None


def p4load(fileName):
  """Load ``fileName`` and compute the DDSP audio features for it.

  Nothing is returned; the results are stored in module-level globals:
  ``audio`` gets the loaded samples with a leading axis added,
  ``audio_features`` gets the output of ``compute_audio_features`` (with
  ``loudness_db`` converted to float32), and ``audio_features_mod`` is
  reset to ``None``. ``specplot`` and ``play`` are called on the audio,
  and the time spent computing the features is printed.
  """
  global audio, audio_features, audio_features_mod

  samples, _ = load_audio(fileName)
  print("anp", samples)
  audio = samples[np.newaxis, :]
  specplot(audio)
  play(audio)

  # Setup the session.
  ddsp.spectral_ops.reset_crepe()

  t0 = time.time()
  audio_features = ddsp.training.metrics.compute_audio_features(audio)
  loudness = audio_features['loudness_db']
  audio_features['loudness_db'] = loudness.numpy().astype(np.float32)
  audio_features_mod = None
  elapsed = time.time() - t0
  print('Audio features took %.1f seconds' % elapsed)

# NOTE(review): TRIM is not referenced anywhere in the visible part of this
# notebook; presumably it is used by code defined elsewhere — confirm.
TRIM = -15

def load_audio(path, sr=16000):
    """Load an audio file with librosa and return it as float32 samples.

    Args:
        path: Location of the audio file; passed to ``librosa.core.load``.
        sr: Target sample rate in Hz handed to librosa. Defaults to 16000,
            the value that used to be hard-coded here.

    Returns:
        A ``(samples, sample_rate)`` tuple: the samples as a float32 numpy
        array and the sample rate reported by librosa.
    """
    audio_np, loaded_sr = librosa.core.load(path, sr=sr)
    return audio_np.astype(np.float32), loaded_sr

# Model loading: fetch a pretrained checkpoint and build it for the loaded audio.
def p4model(m):
    global audio_features_mod, audio, audio_features
    global model, MODEL
    model = m
    MODEL = m
    GCS_CKPT_DIR = 'gs://ddsp/models/tf2'
    print("m", m, "model", model, "MODEL", MODEL)
    if model in ('Violin', 'Flute', 'Flute2', 'Trumpet', 'Tenor_Saxophone'):
        # Pretrained models.
        PRETRAINED_DIR = '/content/pretrained'
        # Copy over from gs:// for faster loading.
        !rm -r $PRETRAINED_DIR &> /dev/null
        !mkdir $PRETRAINED_DIR &> /dev/null
        GCS_CKPT_DIR = 'gs://ddsp/models/tf2'
        model_dir = os.path.join(GCS_CKPT_DIR, 'solo_%s_ckpt' % model.lower())
        
        !gsutil cp $model_dir/* $PRETRAINED_DIR &> /dev/null
        model_dir = PRETRAINED_DIR
        gin_file = os.path.join(model_dir, 'operative_config-0.gin')

    # Load the dataset statistics.
    DATASET_STATS = None
    dataset_stats_file = os.path.join(model_dir, 'dataset_statistics.pkl')
    print(f'Loading dataset statistics from {dataset_stats_file}')
    try:
      if tf.io.gfile.exists(dataset_stats_file):
        with tf.io.gfile.GFile(dataset_stats_file, 'rb') as f:
          DATASET_STATS = pickle.load(f)
    except Exception as err:
      print('Loading dataset statistics from pickle failed: {}.'.format(err))

    # Parse gin config,
    with gin.unlock_config():
      gin.parse_config_file(gin_file, skip_unknown=True)
    
    # Assumes only one checkpoint in the folder, 'ckpt-[iter]`.
    ckpt_files = [f for f in tf.io.gfile.listdir(model_dir) if 'ckpt' in f]
    ckpt_name = ckpt_files[0].split('.')[0]
    ckpt = os.path.join(model_dir, ckpt_name)
    
    # Ensure dimensions and sampling rates are equal
    time_steps_train = gin.query_parameter('DefaultPreprocessor.time_steps')
    n_samples_train = gin.query_parameter('Additive.n_samples')
    hop_size = int(n_samples_train / time_steps_train)
    
    time_steps = int(audio.shape[1] / hop_size)
    n_samples = time_steps * hop_size
    
    gin_params = [
        'RnnFcDecoder.input_keys = ("f0_scaled", "ld_scaled")',
        'Additive.n_samples = {}'.format(n_samples),
        'FilteredNoise.n_samples = {}'.format(n_samples),
        'DefaultPreprocessor.time_steps = {}'.format(time_steps),
    ]
    
    with gin.unlock_config():
      gin.parse_config(gin_params)
    
    # Trim all input vectors to correct lengths 
    for key in ['f0_hz', 'f0_confidence', 'loudness_db']:
      audio_features[key] = audio_features[key][:time_steps]
    audio_features['audio'] = audio_features['audio'][:, :n_samples]
    
    # Set up the model just to predict audio given new conditioning
    model = ddsp.training.models.Autoencoder()
    model.restore(ckpt)
    
    # Build model by running a batch through it.
    start_time = time.time()
    _ = model(audio_features, training=False)
    print('Restoring model took %.1f seconds' % (time.time() - start_time))

print('DONE')

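# NOTE: p4modify, create_audio_file and p4makeSave2 are not defined in this cell.
# The LOOP cell below calls p4modify and p4makeSave2, so they must be defined
# (for example in another cell) before the loop is started.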

This continuously checks for new files in Google Drive, processes them, and uploads the transferred audio.

In [0]:
#@title LOOP
import time
import traceback

# Every Drive query below is scoped to the folder id in `gdID`. Without it the
# loop could only fail over and over, so stop with a clear message instead.
if gdID is None:
    raise RuntimeError(
        "Google Drive folder 'M4L-Timbre-Transfer-Folder' was not found; "
        "create it in the Drive root and re-run the folder lookup cell.")

# Polling state:
#   count - index of the request being waited for; it is part of every file
#           name (settings{count}.json, sendAudio{count}.wav, ...).
#   found - True once settings{count}.json has been read into `jf`.
count = 0
found = False

while True:
    try:
        if not found:
            # FIND JSON
            file_list = drive.ListFile(
                {'q': f"'{gdID}' in parents and trashed=false and title='settings{count}.json'"}).GetList()

            if file_list:
                # Only the first match is used.
                file1 = file_list[0]
                print(f'title: {file1["title"]}, id: {file1["id"]}')
                downloaded = drive.CreateFile({'id': file1['id']})
                content = downloaded.GetContentString()
                print(f'Downloaded content "{content}"')
                jf = json.loads(content)
                print('time passed:', ((time.time()) % 86400) - jf["time"])
                found = True

        if found:
            # LOAD AUDIO
            file_list = drive.ListFile(
                {'q': f"'{gdID}' in parents and trashed=false and title='sendAudio{count}.wav'"}).GetList()

            if file_list:
                # Only the first match is used.
                file1 = file_list[0]
                print(f'title: {file1["title"]}, id: {file1["id"]}')
                aud = drive.CreateFile({'id': file1['id']})
                aud.GetContentFile(f"ddsp{count}.wav")

                p4load(f"ddsp{count}.wav")
                play(audio)
                print(f"model from json is {jf['model']}")
                p4model(jf["model"])

                p4modify(jf["octave"], jf["loudness"], jf["threshold"], jf["auto"], jf["autotune"], jf["quiet"])

                p4makeSave2(f"audio-transferred{count}.wav")
                # Request handled: move on to the next settings/audio pair.
                count += 1
                found = False

        # Sleep for a short time to avoid excessive API calls
        time.sleep(5)

    except Exception as e:
        # Keep the loop alive, but show where the failure happened. `found`
        # and `count` are left untouched, so the same step is retried.
        print(f"An error occurred: {e}")
        traceback.print_exc()
        time.sleep(60)  # Wait for a minute before trying again