In [None]:
%load_ext autoreload
%autoreload 2
import tensorflow as tf
import ddsp
import ddsp.training
import numpy as np
import IPython.display

import gin
gin.enter_interactive_mode()

from thesis.notebook_util import play_audio

In [None]:
# Data provider reading the TFRecord shards matched by `file_pattern`.
# NOTE(review): the pattern is an absolute path on one machine; it has to be
# adjusted before this notebook can run anywhere else.
data_provider = ddsp.training.data.TFRecordProvider(
    file_pattern="/Users/vaclav/prog/thesis/data/violin2/violin2.tfrecord*",
    frame_rate=50,
    centered=True,
)
def representative_data_gen(num_examples=10, provider=None):
    """Yield single-example audio inputs, e.g. for use as a representative dataset.

    Args:
        num_examples: Maximum number of examples to yield. Defaults to 10,
            the value that used to be hard-coded.
        provider: Object exposing `get_batch(batch_size, shuffle, repeats)`
            and returning an iterable of dict-like batches with an "audio"
            entry. Defaults to the notebook-level `data_provider`.

    Yields:
        A one-element list `[batch["audio"]]` per example.
    """
    if provider is None:
        # Resolved at call time so the function can still be invoked with no
        # arguments, as before.
        provider = data_provider
    dataset = provider.get_batch(batch_size=1, shuffle=True, repeats=1)
    # Putting `range` first makes zip stop after `num_examples` items without
    # pulling an extra batch from the dataset.
    for _, batch in zip(range(num_examples), dataset):
        # Model has only one input so each data point has one element.
        yield [batch["audio"]]

In [None]:
# Materialize the generator into a list so individual examples can be indexed
# and reused by later cells.
x = list(representative_data_gen())

In [None]:
# Listen to the first example; x[0] is the one-element list yielded by
# representative_data_gen.
play_audio(x[0])

In [None]:
# Load the exported TFLite model and obtain a callable signature runner.
# Alternative model location (not used):
# TFLITE_FILE_PATH = "/cluster/scratch/vvolhejn/models/0503-ddspae-vst-cnn-2/export/tflite/model.tflite"
# NOTE(review): absolute path to a mounted volume; adjust for your machine.
TFLITE_FILE_PATH = "/Volumes/euler/export/tflite/model_quantized.tflite"
interpreter = tf.lite.Interpreter(TFLITE_FILE_PATH)
# Called without a signature key, so the model's default signature is used
# (presumably the model has exactly one signature; confirm).
my_signature = interpreter.get_signature_runner()

In [None]:
input_details = interpreter.get_input_details()

# get_input_details() returns a list with one dict per input tensor, so each
# entry has to be inspected individually; indexing the list with a string key
# raises a TypeError.
for detail in input_details:
    if detail["dtype"] == np.int8:
        input_scale, input_zero_point = detail["quantization"]
        # A float value would have to be mapped to this input's int8 domain
        # as `value / input_scale + input_zero_point` before being fed in.
        print(detail["name"], input_scale, input_zero_point)


In [None]:
# Display the input tensor details fetched from the interpreter.
input_details

In [None]:
# Display the details of every tensor in the loaded TFLite model.
interpreter.get_tensor_details()

In [None]:

# my_signature is callable with the model inputs passed as keyword arguments.
n_samples = 64000

# Synthetic alternative input: a sine whose phase has a linear and a quadratic
# term. It is kept under its own name so it is no longer silently overwritten;
# to feed it to the model, assign `audio = sweep_audio` below.
sweep_phase = tf.linspace(0, 2000, n_samples) + (tf.linspace(0, 1, n_samples) ** 2) * 2000
sweep_audio = tf.cast(tf.reshape(tf.sin(sweep_phase), [n_samples]), tf.float32)

# Input actually used: the first representative example, flattened to 1-D.
audio = tf.reshape(x[0], [n_samples])

output = my_signature(
    audio=tf.constant(audio, shape=(n_samples,), dtype=tf.float32),
)

In [None]:
# Display the outputs returned by the signature runner.
output

In [None]:
# Re-synthesize audio from the controls returned by the TFLite model.
# Harmonic part: driven by the model's "f0_hz", "amplitudes" and
# "harmonic_distribution" outputs, starting from a zero initial phase of
# shape (1, 1, 1). With n_samples=64000 at 16 kHz this is 4 seconds of audio.
harm_audio, final_phase = ddsp.core.streaming_harmonic_synthesis(
    frequencies=output["f0_hz"],
    amplitudes=output["amplitudes"],
    harmonic_distribution=output["harmonic_distribution"],
    initial_phase=tf.constant([0], shape=(1,1,1), dtype=tf.float32),
    n_samples=n_samples,
    sample_rate=16000,
    amp_resample_method="linear")

# Noise part: a filtered-noise synth producing the same number of samples.
filtered_noise = ddsp.synths.FilteredNoise(n_samples=n_samples, window_size=0)

# The noise magnitudes get a new leading axis before being passed to the synth
# (presumably a batch axis; TODO confirm).
noise_audio = filtered_noise.get_signal(tf.expand_dims(output["noise_magnitudes"], axis=0))
# Final signal is the sum of the harmonic and noise components.
audio_out = harm_audio + noise_audio

In [None]:
# Listen to the re-synthesized signal.
play_audio(audio_out)

In [None]:
# Display the input details as reported by the signature runner.
my_signature.get_input_details()

In [None]:
def test(layers_per_stack, kernel_size, stacks):
    """Print predicted vs. observed frame loss of a DilatedConvDecoder.

    Builds a decoder with the given geometry, runs it on 500 constant frames
    and compares how many frames are missing from the "amps" output with a
    closed-form prediction. The result is only printed; nothing is asserted
    or returned.

    Args:
        layers_per_stack: Number of layers in each stack.
        kernel_size: Kernel size of the convolutions.
        stacks: Number of stacks.
    """
    decoder = ddsp.training.decoders.DilatedConvDecoder(
        input_keys=("pw_scaled", "f0_scaled"),
        ch=128,
        stacks=stacks,
        layers_per_stack=layers_per_stack,
        kernel_size=kernel_size,
        norm_type="layer",
        # The latent is the only input, so conditioning is not handled
        # separately and no precondition stack is needed.
        conditioning_keys=None,
        precondition_stack=None,
        resample_after_convolve=False,
        output_splits=(
            ("amps", 1),
            ("harmonic_distribution", 60),
            ("noise_magnitudes", 65),
        ),
    )

    n_frames = 500
    # Both inputs are constant 0.5 tensors of shape (1, n_frames, 1).
    features = {
        key: tf.constant(0.5, shape=(1, n_frames, 1), dtype=tf.float32)
        for key in ("pw_scaled", "f0_scaled")
    }
    decoded = decoder(features)
    dropped_actual = n_frames - decoded["amps"].shape[1]

    # Prediction: (kernel_size - 1) frames per unit of summed dilation, minus
    # a correction of (kernel_size - 1) for every stack after the first.
    per_tap = kernel_size - 1
    dropped_predicted = per_tap * stacks * 2 ** layers_per_stack - per_tap * (stacks - 1)

    msg = (f"predicted {dropped_predicted} and got {dropped_actual} "
        f"({layers_per_stack} {kernel_size} {stacks}) -> {dropped_predicted - dropped_actual}")
    # Report-only on purpose: mismatches are printed, not raised.
    print(msg)

In [None]:
# Run the frame-loss check over a small grid of decoder geometries, in the
# same order as three nested loops would visit them.
geometries = [
    (layers_per_stack, kernel_size, stacks)
    for layers_per_stack in [1, 2, 3]
    for kernel_size in [2, 3]
    for stacks in [1, 2, 3, 4]
]
for layers_per_stack, kernel_size, stacks in geometries:
    test(layers_per_stack, kernel_size, stacks)

In [None]:
def dummy_representative_dataset():
    """Yield 100 random input pairs for the two-input decoder wrapper.

    Yields:
        A list `[pw_scaled, f0_scaled]` of float32 arrays of shape
        (1, 100, 1), drawn from numpy's global uniform [0, 1) generator.
    """
    feature_shape = (1, 100, 1)
    for _ in range(100):
        # Drawn in this order (pw first, then f0) so that the sequence of
        # random numbers consumed from the global generator is unchanged.
        pw_scaled, f0_scaled = (
            np.random.rand(*feature_shape).astype(np.float32) for _ in range(2)
        )
        yield [pw_scaled, f0_scaled]

In [None]:
class DecoderWrapper(tf.keras.models.Model):
    """Keras model exposing a DilatedConvDecoder through two tensor arguments.

    The wrapped decoder takes a feature dict; this wrapper accepts the two
    features as separate arguments and builds that dict itself.
    """

    def __init__(self):
        super().__init__()

        self.dec = ddsp.training.decoders.DilatedConvDecoder(
            input_keys=("pw_scaled", "f0_scaled"),
            ch=128,
            stacks=2,
            layers_per_stack=2,
            kernel_size=3,
            norm_type="layer",
            # The latent is the only input, so conditioning is not handled
            # separately and no precondition stack is needed.
            conditioning_keys=None,
            precondition_stack=None,
            resample_after_convolve=False,
            output_splits=(
                ("amps", 1),
                ("harmonic_distribution", 60),
                ("noise_magnitudes", 65),
            ),
        )

    @tf.function
    def call(self, pw_scaled, f0_scaled):
        """Run the decoder on the two features and return its output dict."""
        return self.dec({"pw_scaled": pw_scaled, "f0_scaled": f0_scaled})

# Instantiate the wrapper and call it once on random float32 inputs of shape
# (1, 100, 1); the keys of the returned output dict are displayed below.
model = DecoderWrapper()

# Create a model using high-level tf.keras.* APIs
#model.compile(optimizer='sgd', loss='mean_squared_error') # compile the model
outputs = model(pw_scaled=np.random.rand(1, 100, 1).astype(np.float32), f0_scaled=np.random.rand(1, 100, 1).astype(np.float32))
outputs.keys()
# train the model
# (to generate a SavedModel) tf.saved_model.save(model, "saved_model_keras_dir")

In [None]:
# Save the wrapper model to disk so the converter can load it from this path.
saved_model_path = "/tmp/wrapper"
model.save(saved_model_path)

In [None]:
# Convert the model.
# The converter is built from the saved model on disk rather than from the
# in-memory Keras object (alternative kept below for reference).
# converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_path)

# Enable the default optimizations and supply random sample inputs as the
# representative dataset (presumably used to calibrate quantization; see
# the TFLite post-training quantization guide).
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = dummy_representative_dataset
tflite_model = converter.convert()

In [None]:
# Save the model.
# `tflite_model` is written in binary mode to a fixed location under /tmp.
dummy_path = '/tmp/dummy_model.tflite'
with open(dummy_path, 'wb') as f:
  f.write(tflite_model)

In [None]:
# Load the converted dummy model and run it once on random inputs.
# NOTE: this rebinds `interpreter`, which earlier pointed at the model loaded
# from TFLITE_FILE_PATH; `my_signature` still refers to that earlier model.
interpreter = tf.lite.Interpreter(dummy_path)
dummy_signature = interpreter.get_signature_runner()
# The inputs are passed under the names args_0 / args_1.
dummy_signature(
    args_0=np.random.rand(1, 100, 1).astype(np.float32), args_1=np.random.rand(1, 100, 1).astype(np.float32)
)

In [None]:
# Display the details of every tensor in the converted dummy model.
interpreter.get_tensor_details()

In [None]:
# Display the input details as reported by the dummy model's signature runner.
dummy_signature.get_input_details()

In [None]:
# Display the interpreter's tensor details (same call as two cells earlier).
interpreter.get_tensor_details()