In [None]:
import comet_ml
import os

# Never commit credentials: the Comet API key is read from the COMET_API_KEY
# environment variable (empty string when the variable is unset). The key
# that used to be hard-coded on this line has been exposed in the notebook
# and should be revoked and regenerated in the Comet account settings.
COMET_API_KEY = os.environ.get("COMET_API_KEY", "")

import tensorflow as tf

import mitdeeplearning as mdl

import numpy as np
import os
import time
import functools
from IPython import display as ipythondisplay
from tqdm import tqdm
from scipy.io.wavfile import write

# assert len(tf.config.list_physical_devices('GPU')) > 0
# assert COMET_API_KEY != "", "Please insert your Comet API Key"

In [None]:
# Load the training data through the course helper and keep its first entry
# around as a sample to inspect.
song = mdl.lab1.load_training_data()
example_song = song[0]

# Header and sample are emitted by one call; `sep` supplies the newline
# between them.
print("\nExample song:", example_song, sep="\n")

In [None]:
# Hand the sample to the course helper (presumably renders an audio player --
# confirm against mitdeeplearning). As the cell's last expression, whatever
# it returns is what the notebook displays.
mdl.lab1.play_song(example_song)

In [None]:
# Concatenate all songs into one long string, with "\n\n" between songs.
songs_joined = "\n\n".join(song)

# The vocabulary is the sorted list of distinct characters in that string.
vocab = list(set(songs_joined))
vocab.sort()

print(f"There are {len(vocab)} unique characters in the dataset")
print(vocab)

In [None]:
# Forward lookup: character -> integer id (its position in `vocab`).
char2idx = dict(zip(vocab, range(len(vocab))))
# Reverse lookup: indexing this array with an id gives the character back.
idx2char = np.asarray(vocab)

In [None]:
# Preview the mapping: print at most its first 20 entries in dict-like form.
print('{')
for char, index in list(char2idx.items())[:20]:
    print('  {:4s}: {:3d},'.format(repr(char), index))
print('  ...\n}')

In [None]:
def vectorize_string(string):
    """Translate `string` into a NumPy array of vocabulary indices.

    Every character is looked up in the module-level `char2idx` mapping, so a
    character that is missing from the mapping raises `KeyError`.
    """
    lookup = char2idx.__getitem__
    return np.array(list(map(lookup, string)))

# Encode the whole joined corpus once; the batching code slices its training
# sequences out of this array.
vectorized_songs = vectorize_string(songs_joined)

In [None]:
# Show the first ten characters of the corpus next to their integer encoding.
preview_chars = repr(songs_joined[:10])
preview_ids = vectorized_songs[:10]
print ('{} ---- characters mapped to int ----> {}'.format(preview_chars, preview_ids))

# The encoder has to hand back a NumPy array rather than a plain list.
assert isinstance(vectorized_songs, np.ndarray), "returned result should be a numpy array"

In [None]:
def get_batch(vectorized_songs, seq_length, batch_size):
    """Sample a random batch of (input, target) sequences.

    Args:
        vectorized_songs: 1-D NumPy array holding the encoded corpus.
        seq_length: number of time steps in every sampled sequence.
        batch_size: number of sequences to draw.

    Returns:
        A pair `(x_batch, y_batch)` of arrays shaped
        `(batch_size, seq_length)`; each target row is the corresponding
        input row shifted one position forward in the corpus.
    """
    # Index of the last element of the corpus.
    last_index = vectorized_songs.shape[0] - 1
    # Random start offsets (np.random.choice samples with replacement).
    starts = np.random.choice(last_index - seq_length, batch_size)

    # A column of start offsets broadcast against a row of step offsets gives
    # a (batch_size, seq_length) grid of corpus positions.
    positions = starts[:, np.newaxis] + np.arange(seq_length)
    x_batch = vectorized_songs[positions]
    y_batch = vectorized_songs[positions + 1]
    return x_batch, y_batch

# Run the course-provided checks against get_batch; evaluation stops at the
# first check that does not pass.
test_args = (vectorized_songs, 10, 2)
batch_checks = (
    mdl.lab1.test_batch_func_types,
    mdl.lab1.test_batch_func_shapes,
    mdl.lab1.test_batch_func_next_step,
)
if all(check(get_batch, test_args) for check in batch_checks):
   print("======\n[PASS] passed all tests!")
else:
   print("======\n[FAIL] could not pass tests")

In [None]:
# Draw a single short sequence so every time step can be printed by hand.
x_batch, y_batch = get_batch(vectorized_songs, seq_length=5, batch_size=1)

# Drop the size-1 batch axis, then walk inputs and targets in lockstep.
step_pairs = zip(np.squeeze(x_batch), np.squeeze(y_batch))
for i, (input_idx, target_idx) in enumerate(step_pairs):
    input_char = repr(idx2char[input_idx])
    target_char = repr(idx2char[target_idx])
    print("Step {:3d}".format(i))
    print("  input: {} ({:s})".format(input_idx, input_char))
    print("  expected output: {} ({:s})".format(target_idx, target_char))

In [None]:
def LSTM(rnn_units):
    """Create the recurrent layer used by the model.

    Args:
        rnn_units: number of units, passed positionally to the Keras layer.

    Returns:
        A `tf.keras.layers.LSTM` configured with `return_sequences=True`,
        `stateful=True`, a `'glorot_uniform'` recurrent initializer and a
        `'sigmoid'` recurrent activation.
    """
    layer_options = dict(
        return_sequences=True,
        recurrent_initializer='glorot_uniform',
        recurrent_activation='sigmoid',
        stateful=True,
    )
    return tf.keras.layers.LSTM(rnn_units, **layer_options)

In [None]:
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    """Assemble the character-level model: Embedding -> LSTM -> Dense.

    Args:
        vocab_size: number of distinct characters; sizes both the embedding
            table and the final dense layer.
        embedding_dim: width of each embedding vector.
        rnn_units: number of units handed to `LSTM`.
        batch_size: accepted as part of the signature but not used when the
            layers are constructed.

    Returns:
        A `tf.keras.Sequential` model made of the three layers, in order.
    """
    keras_layers = tf.keras.layers
    embedding = keras_layers.Embedding(vocab_size, embedding_dim)
    recurrent = LSTM(rnn_units)
    projection = keras_layers.Dense(vocab_size)
    return tf.keras.Sequential([embedding, recurrent, projection])

# Sanity check: pin a small addition to the CPU device and print the result.
with tf.device('/CPU:0'):
    a = tf.constant([1, 2, 3])
    b = tf.constant([4, 5, 6])
    result = tf.add(a, b)
    print(result)
    
# Model instance used by the exploratory cells that follow; its sizes are
# literal values given here.
model = build_model(len(vocab), embedding_dim=256, rnn_units=1024, batch_size=32)
# Build the model for inputs of shape (32, 100), matching the batch of 32
# sequences of length 100 that is fed to it afterwards.
model.build(tf.TensorShape([32, 100]))

In [None]:
# Print the overview of the model that was just built.
model.summary()

In [None]:
# Sample one batch and push it through the (still untrained) model to check
# that input and output shapes line up.
x, y = get_batch(vectorized_songs, seq_length=100, batch_size=32)
pred = model(x)
print("Input shape:      ", x.shape, " # (batch_size, sequence_length)")
print("Prediction shape: ", pred.shape, "# (batch_size, sequence_length, vocab_size)")

In [None]:
# Treat the first sequence's outputs as logits and draw one sample per row.
sampled_indices = tf.random.categorical(pred[0], num_samples=1)
# Drop the trailing sample axis and convert to a NumPy array.
sampled_indices = tf.squeeze(sampled_indices,axis=-1).numpy()
# Last expression of the cell: the notebook displays the sampled indices.
sampled_indices

In [None]:
# Decode the first input sequence back to text via the id -> character array.
print("Input: \n", repr("".join(idx2char[x[0]])))
print()
# Decode the characters sampled from the untrained model for comparison.
print("Next Char Predictions: \n", repr("".join(idx2char[sampled_indices])))

In [None]:
### Defining the loss function ###

def compute_loss(labels, logits):
    """Sparse categorical cross-entropy between integer labels and logits.

    Args:
        labels: the true class ids.
        logits: the model outputs; passed with `from_logits=True`.

    Returns:
        The result of `tf.keras.losses.sparse_categorical_crossentropy` for
        these arguments; no further reduction is applied here.
    """
    cross_entropy = tf.keras.losses.sparse_categorical_crossentropy
    return cross_entropy(labels, logits, from_logits=True)

# Loss of the untrained model on the example batch sampled earlier.
example_batch_loss = compute_loss(y, pred)

print(f'Prediction shape: {pred.shape} # (batch_size, sequence_length, vocab_size)')
# The loss values are averaged into a single number for display.
print(f'scalar_loss:      {example_batch_loss.numpy().mean()}')

In [None]:
### Hyperparameter setting and optimization ###

# Number of distinct characters; passed to build_model as `vocab_size`.
vocab_size = len(vocab)

# Model parameters:
params = {
    "num_training_iterations": 300,  # Increase this to train longer
    "batch_size": 8,                 # Experiment between 1 and 64
    "seq_length": 100,               # Experiment between 50 and 500
    "learning_rate": 5e-3,           # Experiment between 1e-5 and 1e-1
    # NOTE: this key is spelled "embeding_dim"; the code that builds the
    # training model reads it under exactly this name, so keep it as is.
    "embeding_dim": 256,
    "rnn_units": 1024,               # Experiment between 1 and 2048
}

# Checkpoint location:
# (`chechpoint_dir` keeps its existing spelling because other cells may
# refer to it by that name.)
chechpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(chechpoint_dir, "my_ckpt.weights.h5")

# `checkpoint_prefix` is the weights FILE handed to `model.save_weights`, so
# only its parent directory may be created. Creating a directory at the file
# path itself (as this cell used to do) makes the later save fail.
if os.path.isdir(checkpoint_prefix) and not os.listdir(checkpoint_prefix):
    # Remove the empty directory left behind by earlier runs of this cell.
    os.rmdir(checkpoint_prefix)
os.makedirs(chechpoint_dir, exist_ok=True)

In [None]:
### Create a Comet experiment to track our training run ###

def create_experiment():
    """Start a fresh Comet experiment and log the hyperparameters to it.

    If an earlier run left an experiment in the module-level name
    `experiment`, it is ended first so that runs do not overlap.

    Returns:
        The newly created `comet_ml.Experiment`, with every entry of the
        module-level `params` dict logged as a parameter and flushed.
    """
    # End any prior experiment. It lives in the notebook's global namespace
    # (the training cell assigns `experiment = create_experiment()`), so it
    # has to be looked up in globals(); locals() holds nothing at this point
    # of the function and would never find it.
    previous = globals().get('experiment')
    if previous is not None:
        previous.end()

    # initiate the comet experiment for tracking
    experiment = comet_ml.Experiment(
        api_key=COMET_API_KEY,
        project_name="6S191_Lab1_Part2"
    )
    # log our hyperparameters, defined above, to the experiment
    for param, value in params.items():
        experiment.log_parameter(param, value)
    experiment.flush()

    return experiment

In [None]:
### Define optimizer and training operation ###

# Fresh model for training, sized from the hyperparameters in `params`
# (this rebinds `model`, replacing any model created earlier).
model = build_model(vocab_size, params["embeding_dim"], params["rnn_units"], params["batch_size"])

# Adam optimizer constructed with the configured learning rate.
optimizer = tf.keras.optimizers.Adam(params["learning_rate"])

@tf.function
def train_step(x, y):
    """Run one optimisation step on a single batch and return its loss.

    Args:
        x: batch of input sequences fed to the module-level `model`.
        y: batch of target sequences, compared against the predictions by
            `compute_loss`.

    Returns:
        The loss produced by `compute_loss` for this batch.
    """
    # Only the forward pass and the loss need to be recorded on the tape.
    with tf.GradientTape() as tape:
        # feed the current input into the model and generate predictions
        y_hat = model(x)

        # compute the loss!
        loss = compute_loss(y, y_hat)

    # Differentiate after leaving the tape context so that the gradient
    # computation itself is not recorded on the tape.
    grads = tape.gradient(loss, model.trainable_variables)

    # Apply the gradients to the optimizer so it can update the model accordingly
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

##################
# Begin training!#
##################

history = []
plotter = mdl.util.PeriodicPlotter(sec=2, xlabel='Iterations', ylabel='Loss')
experiment = create_experiment()

if hasattr(tqdm, '_instances'): tqdm._instances.clear() # clear if it exists

# The loop counter is named `step` rather than `iter` so that the builtin
# `iter` is not shadowed for the rest of the notebook session.
for step in tqdm(range(params["num_training_iterations"])):

    # Grab a batch and propagate it through the network
    x_batch, y_batch = get_batch(vectorized_songs, params["seq_length"], params["batch_size"])
    loss = train_step(x_batch, y_batch)

    # Convert the loss to a single number once and reuse it below.
    mean_loss = loss.numpy().mean()

    # log the loss to the Comet interface! we will be able to track it there.
    experiment.log_metric("loss", mean_loss, step=step)
    # Record the loss and refresh the in-notebook plot
    history.append(mean_loss)
    plotter.plot(history)

    # Periodically checkpoint the current weights (every 100 iterations,
    # including iteration 0).
    if step % 100 == 0:
        model.save_weights(checkpoint_prefix)

# Save the final weights of the trained model
model.save_weights(checkpoint_prefix)
experiment.flush()
