# Timbre transfer demo

In [None]:
# Project folder on the mounted Google Drive. `save_dir` (checkpoint folder)
# is built from it further down.
DRIVE_BASE_DIR = '/content/drive/MyDrive/SMC 10/DDSP-10/'

from google.colab import drive
# Mount Drive before anything else: the pip install that follows reads the
# DDSP package from a path under /content/drive.
drive.mount('/content/drive')
!pip install -qU /content/drive/MyDrive/SMC\ 10/DDSP-10/dist/ddsp-1.2.0.tar.gz

### Imports

In [None]:
from ddsp.training import train_util, nn, models, trainers, decoders, preprocessing
from ddsp import synths, processors, losses, core
from ddsp.colab import colab_utils

import tensorflow as tf
import numpy as np
import random
import sys
import copy
from IPython.display import display, Audio
from google.colab import output
from tqdm.notebook import tqdm

import seaborn as sns
from matplotlib import pyplot as plt
sns.set(style="whitegrid")
%config InlineBackend.figure_format='retina'

tfkl = tf.keras.layers

### Setup

In [None]:
# Audio sample rate, passed to the FM synth and to the notebook audio player.
SAMPLE_RATE = 48000
# Length of every rendered note, expressed in the time unit of SAMPLE_RATE
# (seconds if SAMPLE_RATE is in Hz).
DURATION = 2
# Learning rate passed to the trainer.
LR = 1e-3

# Number of audio samples in one rendered note.
N_SAMPLES = int(SAMPLE_RATE * DURATION)
# Folder used by trainer.save() / trainer.restore() for checkpoints.
save_dir= DRIVE_BASE_DIR + 'timbre_transfer'

In [None]:
# connections=[[43,32,21],[42,32,21],[42,31,21],[43,42,31,21],[41,31,21],
#              [42,31],[42,32],[42,32,31],[43,32],[43,32,31],
#              [43],[43,42],[43,42,41],
#              []]

# for f in connections:
#   mods=[]
#   mods+=[1] if 21 in f else [0]
#   mods+=[1] if 31 in f else [0]
#   mods+=[1] if 32 in f else [0]
#   mods+=[1] if 41 in f else [0]
#   mods+=[1] if 42 in f else [0]
#   mods+=[1] if 43 in f else [0]
#   print(mods)

In [None]:
# One row per FM algorithm (14 in total). Entry k of a row is used as the
# first value of operator k+1's control triple when the dataset is rendered;
# the plots label that value "Output levels".
# NOTE(review): presumably 1 marks an operator that is heard directly
# (carrier) and 0 one that only modulates -- confirm against the synth.
algorithms = [[1,0,0,0], [1,0,0,0], [1,0,0,0], [1,0,0,0], [1,0,0,0],
              [1,1,0,0], [1,1,0,0], [1,1,0,0], [1,1,0,0], [1,1,0,0], 
              [1,1,1,0], [1,1,1,0], [1,1,1,0],
              [1,1,1,1]] 

# Connection mask for each algorithm; row i belongs to algorithms[i].
# Column order is 21, 31, 32, 41, 42, 43, i.e. the order produced by the
# commented-out helper above and shown in the plot legend as
# M_{2,1}, M_{3,1}, M_{3,2}, M_{4,1}, M_{4,2}, M_{4,3}.
# The row is multiplied element-wise with randomly drawn modulation values,
# so a 0 switches that connection off.
modulators = [
              [1, 0, 1, 0, 0, 1],
              [1, 0, 1, 0, 1, 0],
              [1, 1, 0, 0, 1, 0],
              [1, 1, 0, 0, 1, 1],
              [1, 1, 0, 1, 0, 0],
              [0, 1, 0, 0, 1, 0],
              [0, 0, 1, 0, 1, 0],
              [0, 1, 1, 0, 1, 0],
              [0, 0, 1, 0, 0, 1],
              [0, 1, 1, 0, 0, 1],
              [0, 0, 0, 0, 0, 1],
              [0, 0, 0, 0, 1, 1],
              [0, 0, 0, 1, 1, 1],
              [0, 0, 0, 0, 0, 0],
              ]

### Synth
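The synth is created with `index_scale=True`, but the dataset cell below calls `get_signal` directly, so no scaling is applied to the ground-truth parameters used to render the training audio.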

In [None]:
# Single FM synth instance shared by the whole notebook: the dataset cell
# renders the target audio with it (via get_signal), and the processor group
# uses the same object to synthesize audio from the decoder outputs.
fm = synths.FrequencyModulation(n_samples=N_SAMPLES, 
                                sample_rate=SAMPLE_RATE,
                                index_scale = True,
                                name='fm')

### Network architecture

In [None]:
class AudioAE(nn.OutputSplitsLayer):
  """Fully connected decoder that maps input features to FM synth controls.

  The hidden stack is a `tf.keras.Sequential` of ReLU `Dense` layers whose
  widths first grow towards `max_units` and then shrink again. The named
  `output_splits` are handed to the parent `nn.OutputSplitsLayer`.
  NOTE(review): in upstream DDSP the parent applies the final dense
  projection and splits it into the named outputs -- confirm for this build.

  Args:
    steps: Number of points in the width ramps. The first and last point of
      each ramp are skipped, so `steps - 2` layers are created on each side
      of the central `max_units` layer.
    max_units: Width of the central (widest) layer.
    input_keys: Keys of the input dict that are concatenated and fed to the
      network. Defaults to `['f0_scaled']`.
    output_splits: `(name, size)` pairs forwarded to the parent layer.
    **kwargs: Forwarded to the parent layer.
  """

  def __init__(self,
               steps=4,
               max_units=64,
               input_keys=None,
               output_splits=(('op1', 3), ('op2', 3), ('op3', 3), ('op4', 3),
                              ('modulators', 6)),
               **kwargs):
    # A fresh list per instance: a list literal as the default value would be
    # one object shared by every AudioAE ever constructed.
    if input_keys is None:
      input_keys = ['f0_scaled']

    super().__init__(
        input_keys=input_keys, output_splits=output_splits, **kwargs)

    self.steps = steps
    self.max_units = max_units

    # Width ramps. 18 is the total size of the default output_splits
    # (4 operators * 3 values + 6 modulators).
    enc_ls = np.linspace(1, self.max_units, steps)
    dec_ls = np.linspace(self.max_units, 18, steps)

    # Only the interior points of each ramp become layers; the widest layer
    # is added explicitly in between. Layers are created in network order.
    layers = [tfkl.Dense(int(width), activation='relu')
              for width in enc_ls[1:-1]]
    layers.append(tfkl.Dense(max_units, activation='relu'))
    layers.extend(tfkl.Dense(int(width), activation='relu')
                  for width in dec_ls[1:-1])

    self.nn = tf.keras.Sequential(layers)

  def compute_output(self, *inputs):
    """Concatenates the inputs on the last axis and runs the hidden stack."""
    x = tf.concat(inputs, axis=-1)
    x = self.nn(x)
    return x

### Dataset

In [None]:
# Every example is a single note: batch size 1, one control frame.
n_batch = 1
n_frames = 1

inputs = []

# Draw one random FM patch; it is shared by every note of the dataset.
algorithm = np.random.randint(len(algorithms))

# Copy the table row: the in-place edit of a_c[0] below must not write back
# into the shared `algorithms` table.
a_c = list(algorithms[algorithm])
# Per-operator values drawn from 0.5, 1.0, ..., 8.5 (plotted as "Freq. factor").
i_c = 0.5 + np.random.randint(17, size=4)/2
# Per-operator values drawn uniformly from [0, 1) (plotted as "Envelopes").
e_c = np.random.rand(4)
# Modulation values drawn from 0, 0.5, ..., 9, masked by the connections the
# chosen algorithm actually uses.
m_c = np.random.randint(19, size=6)/2 * modulators[algorithm]

# Operator 1 is pinned to level 1 and factor 1.
a_c[0] = 1
i_c[0] = 1

# Manual overrides for reproducing a specific patch:
# a_c[0] = 1
# a_c[1] = 1
# a_c[2] = 1
# a_c[3] = 0

# i_c[0] = 1
# i_c[1] = 0.2124433530542842
# i_c[2] = 0.7860434189145742
# i_c[3] = 0.6474421104585822

# e_c[0] = 0.6355107266065505
# e_c[1] = 0.045662207156135226
# e_c[2] = 0.11993143823966657
# e_c[3] = 0.8059533690405574

# One example per MIDI note 45..92 (48 notes), all rendered with the patch
# drawn above.
for midinote in range(45,93):

  freq = core.midi_to_hz(midinote)

  # Render the target audio. get_signal is called directly, so the values
  # are passed to the synth as they are.
  audio = fm.get_signal([[[freq]]],
                    [[[a_c[0], i_c[0], e_c[0]]]],
                    [[[a_c[1], i_c[1], e_c[1]]]],
                    [[[a_c[2], i_c[2], e_c[2]]]],
                    [[[a_c[3], i_c[3], e_c[3]]]],
                    [[m_c]],
                  )

  f0 = np.ones([n_batch, n_frames, 1])*freq
  # Named `example` rather than `input` so the builtin input() is not
  # shadowed for the rest of the notebook session.
  example = {
    'f0': f0,
    'f0_scaled': [[[midinote / 127.0]]],
    'audio': audio,
  }
  inputs.append({k: core.tf_float32(v) for k, v in example.items()})

# Keep a pitch-ordered copy for inspection and plotting; training iterates
# over the shuffled list.
sorted_inputs = copy.deepcopy(inputs)
random.shuffle(inputs)

In [None]:
# Decoder hyper-parameters, read when the AudioAE decoder is constructed.
STEPS = 4
MAX_UNITS = 64

# Fixed preview example, indexed in the pitch-ordered copy of the dataset.
element = 36 #np.random.randint(len(inputs))
preview_audio = sorted_inputs[element]['audio']
display(Audio(preview_audio, rate=SAMPLE_RATE))

# Ground-truth patch: one line per operator, then the modulation values.
for op_params in zip(a_c, i_c, e_c):
  print(*op_params)
print(m_c)

colab_utils.specplot(preview_audio)


### Decoder, processor group, loss

In [None]:
# Decoder network: reads only the normalized pitch ('f0_scaled') and emits
# the AudioAE default splits (op1..op4 and modulators).
decoder = AudioAE(
    input_keys = ['f0_scaled'],
    steps=STEPS,
    max_units=MAX_UNITS,
)

# Single-node DAG: the FM synth is fed 'f0' from the input dict plus the five
# decoder outputs, in the same argument order used when rendering the dataset.
dag = [(fm, 
        ['f0', 'op1', 'op2', 'op3', 'op4', 'modulators']),
]

processor_group = processors.ProcessorGroup(dag=dag, 
                                            name='processor_group')

# Multi-resolution spectral loss over six FFT sizes. Only the L1 magnitude
# term is active; every other term has weight 0.
spectral_loss = losses.SpectralLoss(
    #fft_sizes=(4096, 64),
    fft_sizes=(2048, 1024, 512, 256, 128, 64),
    loss_type='L1',
    mag_weight=1.0,
    delta_time_weight=0.0,
    delta_freq_weight=0.0,
    cumsum_freq_weight=0.0,
    logmag_weight=0.0,
    loudness_weight=0.0,
    )

### Trainer

In [None]:
# Model and trainer are both created inside the strategy scope.
strategy = train_util.get_strategy()
with strategy.scope():
  # No preprocessor and no encoder: the decoder reads 'f0_scaled' straight
  # from the input dict and the processor group turns its outputs into audio.
  model = models.Autoencoder(preprocessor=None,
                             encoder=None,
                             decoder=decoder,
                             processor_group=processor_group, 
                             losses=[spectral_loss])
  # NOTE(review): checkpoints_to_keep=1 presumably retains only the most
  # recent checkpoint, which the training loop writes at each new best loss.
  trainer = trainers.Trainer(model, 
                             strategy, 
                             learning_rate=LR,
                             checkpoints_to_keep = 1)


### Generator

In [None]:
# def gen_inputs():
#   i = 0
#   while True:
#     yield inputs[i]['f0'], inputs[i]['f0_scaled'], inputs[i]['audio'] 
#     i = (i+1) % len(inputs)

# dataset = tf.data.Dataset.from_generator(
#     gen_inputs,
#     output_signature=(
#       tf.TensorSpec(shape=(n_frames, 1), dtype=tf.float32),
#       tf.TensorSpec(shape=(n_frames, 1), dtype=tf.float32),
#       tf.TensorSpec(shape=(N_SAMPLES), dtype=tf.float32),
#     )
# )

# dataset = dataset.repeat(-1)
# dataset = dataset.batch(8, drop_remainder=True)
# dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
# dataset = trainer.distribute_dataset(dataset)
# dataset_iter = iter(dataset)

# trainer.build(next(dataset_iter))
# The tf.data pipeline above is disabled, so the trainer is built from one
# in-memory example dict instead of a dataset batch.
trainer.build(inputs[0])

In [None]:
# next(dataset_iter)

### Train the model

In [None]:
# Lowest spectral loss seen so far. Starts at +inf so the first finite loss
# always counts as an improvement (a finite sentinel such as 9999 would
# silently suppress checkpointing if the loss started above it).
min_loss = float('inf')
# Per-step losses collected by the training loop for its avg_loss readout.
average = []

In [None]:
# Training stops early once the loss is <= this value; with 0. that only
# happens if the loss reaches exactly zero.
STOP_AT = 0.

# True right after the status line was rewritten; makes the next "new best"
# event start a fresh output line instead of overwriting the status.
new_line = True
values = range(1000000)
with tqdm(total=len(values), initial=trainer.step.numpy(), unit_scale=True, unit="epoch") as pbar:

  for i in values:

    # Step counter before this update; drives the every-50-steps status print.
    step = trainer.step.numpy()

    # One example per step, cycling through the shuffled dataset.
    train_losses = trainer.train_step(inputs[i%len(inputs)])
    v = train_losses['spectral_loss'].numpy()
    res_str = f'spectral_loss: {v:.3f}\t'

    # Checkpoint whenever the loss reaches a new minimum.
    if v < min_loss:
      min_loss = v
      trainer.save(save_dir)
      # Play a notification sound in the browser, except during the first
      # iterations of a run, where nearly every step is a new best.
      if i > 25:
        output.eval_js('new Audio("https://freesound.org/data/previews/253/253168_4404552-lq.ogg").play()')
      if new_line:
        print()
        new_line = False
    res_str += f' min_loss: {min_loss:.3f}\t'

    # Rolling average over the most recent 1000 losses. The slice must be
    # [-1000:]; [:-1000] would throw away the newest values and keep the
    # oldest ones instead.
    average.append(v)
    if len(average) > 1000:
      average = average[-1000:]
    res_str += f' avg_loss: {np.average(average):.3f}\t'
    res_str += f' step: {step}'

    if v <= STOP_AT:
      print(f'\n*** STOP *** spectral_loss: {v:.3f}')
      output.eval_js('new Audio("https://freesound.org/data/previews/80/80921_1022651-lq.ogg").play()')
      # summary_writer.flush()
      break

    # Rewrite the status line in place and advance the progress bar in
    # chunks of 50 steps.
    if step%50==0:
      sys.stdout.write("\r" + res_str)
      sys.stdout.flush()
      pbar.update(50)
      new_line = True


In [None]:
# Reload from save_dir, where the training loop saves each time the loss
# reaches a new minimum.
trainer.restore(save_dir)

In [None]:
# Pick an example from the pitch-ordered set. The random draw is immediately
# overridden with a fixed index.
i = np.random.randint(len(sorted_inputs))
i = 36

# Run the model on that example and synthesize audio from its outputs.
target = sorted_inputs[i]
controls = model(target)
audio_gen = model.get_audio_from_outputs(controls)

# Ground-truth patch: one line per operator, then the modulation values.
print("Original")
for op_params in zip(a_c, i_c, e_c):
  print(*op_params)
print(m_c)

# Controls reported by the model for the first batch item and first frame.
print("Reconstructed")
fm_controls = controls['fm']['controls']
for key in ('op1', 'op2', 'op3', 'op4', 'modulators'):
  print(fm_controls[key].numpy()[0,0])

# Player and spectrogram for the target, then for the reconstruction.
for caption, clip in (("Original audio", target['audio']),
                      ("Reconstructed audio", audio_gen)):
  print(caption)
  display(Audio(clip, rate=SAMPLE_RATE))
  colab_utils.specplot(clip)


In [None]:
# Matrix of model outputs: one row per example (in pitch order), 18 columns
# laid out as op1 (3), op2 (3), op3 (3), op4 (3), modulators (6).
predicted = np.empty((len(sorted_inputs),18))
for row, example in enumerate(sorted_inputs):
  controls = model(example)
  fm_controls = controls['fm']['controls']
  # First batch item, first frame of every control, joined in column order.
  predicted[row] = np.concatenate(
      [fm_controls[key].numpy()[0,0]
       for key in ('op1', 'op2', 'op3', 'op4', 'modulators')])


In [None]:
# 2x2 overview of the predicted controls, one curve per parameter, with the
# example index (pitch order) on the shared x axis.
# The figure handle gets its own name so the plotting loops cannot overwrite it.
fig, ax = plt.subplots(2, 2, figsize=(15, 5), sharex=True)

# (axes, title, columns of `predicted`, legend labels).
# Columns 0..11 hold the four operator triples, so a stride of 3 picks the
# same parameter from every operator; columns 12..17 hold the modulators.
panels = [
    (ax[0][0], 'Output levels', range(0, 12, 3),
     ['$A_1$','$A_2$','$A_3$','$A_4$']),
    (ax[0][1], 'Freq. factor', range(1, 12, 3),
     ['$I_1$','$I_2$','$I_3$','$I_4$']),
    (ax[1][0], 'Envelopes', range(2, 12, 3),
     ['$E_1$','$E_2$','$E_3$','$E_4$']),
    (ax[1][1], 'Modulators', range(12, 18),
     ['$M_{2,1}$', '$M_{3,1}$', '$M_{3,2}$', '$M_{4,1}$', '$M_{4,2}$', '$M_{4,3}$']),
]

for axis, title, columns, labels in panels:
  axis.set_title(title)
  for column in columns:
    axis.plot(predicted[:,column])
  axis.legend(labels)
plt.show()