In [8]:
import math
import numpy as np
import tensorflow as tf

class AttentionMatrix(tf.keras.layers.Layer):

    def __init__(self, *args, use_mask=False, **kwargs):
        super().__init__(*args, **kwargs)
        self.use_mask = use_mask

    def call(self, inputs):
        K, Q = inputs
        window_size_queries = Q.get_shape()[1]  # window size of queries
        window_size_keys    = K.get_shape()[1]  # window size of keys

        mask_vals = np.triu(np.ones((window_size_queries, window_size_keys)) * np.NINF, k=1)
        mask = tf.convert_to_tensor(value=mask_vals, dtype=tf.float32)
        atten_mask = tf.tile(tf.reshape(mask, [-1, window_size_queries, window_size_keys]), [tf.shape(input=K)[0], 1, 1])

        attention = tf.matmul(Q, K, transpose_b=True) / tf.sqrt(tf.cast(window_size_keys, tf.float32))
        if self.use_mask:
            attention = atten_mask + attention
        attention = tf.nn.softmax(attention)
        self.scores = attention
        return attention


class AttentionHead(tf.keras.layers.Layer):
    def __init__(self, input_size, output_size, is_self_attention, **kwargs):
        super(AttentionHead, self).__init__(**kwargs)
        self.use_mask = is_self_attention

        self.K = self.add_weight("W_K", (input_size, output_size))
        self.Q = self.add_weight("W_Q", (input_size, output_size))
        self.V = self.add_weight("W_V", (input_size, output_size))

        self.attn_mtx = AttentionMatrix(use_mask = self.use_mask)

    @tf.function
    def call(self, inputs_for_keys, inputs_for_values, inputs_for_queries):
        K_val = tf.tensordot(inputs_for_keys, self.K, 1)
        V_val = tf.tensordot(inputs_for_values, self.V, 1)
        Q_val = tf.tensordot(inputs_for_queries, self.Q, 1)

        V_attention = self.attn_mtx((K_val, Q_val)) @ V_val

        return V_attention

class TransformerBlock(tf.keras.layers.Layer):
    def __init__(self, emb_sz, **kwargs):
        super(TransformerBlock, self).__init__(**kwargs)

        self.sequential = tf.keras.Sequential([
            tf.keras.layers.Dense(2048, activation='relu'), 
            tf.keras.layers.Dense(emb_sz)])
        
        self.self_atten         = AttentionHead(emb_sz, emb_sz, True)
        self.self_context_atten = AttentionHead(emb_sz, emb_sz, False)
        self.layer_norm = tf.keras.layers.LayerNormalization(axis = -1)

    @tf.function
    def call(self, inputs, context_sequence):
        z = self.self_atten(inputs, inputs, inputs)
        z_norm = self.layer_norm(inputs + z)

        context = self.self_context_atten(context_sequence, context_sequence, z_norm)
        context_norm = self.layer_norm(z_norm + context)

        output = self.sequential(context_norm)
        output = self.layer_norm(context_norm + output)
        output = tf.nn.relu(output)
        #print(output.shape)
        return output


def positional_encoding(length, depth):
    depth = depth/2
    positions = np.arange(length)[:, np.newaxis]    # (seq, 1)
    depths = np.arange(depth)[np.newaxis, :]/depth  # (1, depth)
    angle_rates = 1 / (10000**depths)               # (1, depth)
    angle_rads = positions * angle_rates            # (pos, depth)
    pos_encoding = np.concatenate([np.sin(angle_rads), np.cos(angle_rads)], axis=-1) 
    return tf.cast(pos_encoding, dtype=tf.float32)


class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embed_size, window_size):
        super().__init__()
        self.embed_size = embed_size

        self.embedding = tf.keras.layers.Embedding(vocab_size, embed_size)

        self.pos_encoding = positional_encoding(length=window_size, depth=embed_size)

    def call(self, x):
        length = tf.shape(x)[1]
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.embed_size, tf.float32))
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        return x

class TransformerDecoder(tf.keras.Model):

    def __init__(self, vocab_size, hidden_size, window_size, **kwargs):

        super().__init__(**kwargs)
        self.vocab_size  = vocab_size
        self.hidden_size = hidden_size
        self.window_size = window_size

        # TODO: Define image and positional encoding, transformer decoder, and classification layers

        # Define feed forward layer to embed image features into a vector 
        self.image_embedding = tf.keras.Sequential([
            tf.keras.layers.Dense(1024, activation='relu'), 
            tf.keras.layers.Dense(self.hidden_size)])

        # Define positional encoding to embed and offset layer for language:
        self.encoding = PositionalEncoding(self.vocab_size, self.hidden_size, self.window_size)

        # Define transformer decoder layer:
        self.decoder = TransformerBlock(self.hidden_size)

        # Define classification layer (logits)
        self.classifier = tf.keras.layers.Dense(self.vocab_size)

    def call(self, encoded_images, captions):
        # TODO:
        # 1) Embed the encoded images into a vector (HINT IN NOTEBOOK)
        # 2) Pass the captions through your positional encoding layer
        # 3) Pass the english embeddings and the image sequences to the decoder
        # 4) Apply dense layer(s) to the decoder out to generate logits
        image_embedding = self.image_embedding(tf.expand_dims(encoded_images, axis = 1))
        sentence_encoding = self.encoding(captions)
        logits = self.decoder(sentence_encoding, image_embedding)
        probs = self.classifier(logits)
        return probs

In [16]:
import numpy as np
import tensorflow as tf

class MusicModel(tf.keras.Model):

    def __init__(self, decoder, **kwargs):
        super().__init__(**kwargs)
        self.decoder = decoder

    @tf.function
    def call(self, encoded_images, captions):
        return self.decoder(encoded_images, captions)  

    def compile(self, optimizer, loss, metrics):
        self.optimizer = optimizer
        self.loss_function = loss 
        self.accuracy_function = metrics[0]

    def train(self, train_captions, train_image_features, padding_index, batch_size=30):
        indeces = tf.random.shuffle(range(len(train_captions)))
        train_captions = tf.gather(train_captions, indeces)
        train_image_features = tf.gather(train_image_features, indeces)
        avg_loss = 0
        avg_acc = 0
        avg_prp = 0    
        total_loss = 0  
        total_seen = 0
        total_correct = 0

        for end in range(batch_size, len(train_captions)+1, batch_size):
            start = end - batch_size
            batch_image_features = train_image_features[start:end]
            decoder_input = train_captions[start:end, :-1]
            decoder_labels = train_captions[start:end, 1:]
            with tf.GradientTape() as tape:
                ## Perform a training forward pass. Make sure to factor out irrelevant labels.
                
                probs = self(batch_image_features, decoder_input)
                
                mask = decoder_labels != padding_index
                num_predictions = tf.reduce_sum(tf.cast(mask, tf.float32))
                loss = self.loss_function(probs, decoder_labels, tf.cast(mask, tf.float32))

            gradient = tape.gradient(loss, self.trainable_variables)
            self.optimizer.apply_gradients(zip(gradient, self.trainable_variables))

            accuracy = self.accuracy_function(probs, decoder_labels, mask)
            ## Compute and report on aggregated statistics
            total_loss += loss
            total_seen += num_predictions
            total_correct += num_predictions * accuracy

        avg_loss = float(total_loss / total_seen)
        avg_acc = float(total_correct / total_seen)
        avg_prp = np.exp(avg_loss)
        print(f"\r training:\t loss={avg_loss:.3f}\t acc: {avg_acc:.3f}\t perp: {avg_prp:.3f}", end='')
        return avg_loss, avg_acc, avg_prp

    def test(self, test_captions, test_image_features, padding_index, batch_size=30):
        num_batches = int(len(test_captions) / batch_size)

        total_loss = total_seen = total_correct = 0
        for index, end in enumerate(range(batch_size, len(test_captions)+1, batch_size)):
            start = end - batch_size
            batch_image_features = test_image_features[start:end, :]
            decoder_input = test_captions[start:end, :-1]
            decoder_labels = test_captions[start:end, 1:]

            ## Perform a no-training forward pass. Make sure to factor out irrelevant labels.
            probs = self(batch_image_features, decoder_input)
            mask = decoder_labels != padding_index
            num_predictions = tf.reduce_sum(tf.cast(mask, tf.float32))
            loss = self.loss_function(probs, decoder_labels, mask)
            accuracy = self.accuracy_function(probs, decoder_labels, mask)

            ## Compute and report on aggregated statistics
            total_loss += loss
            total_seen += num_predictions
            total_correct += num_predictions * accuracy

            avg_loss = float(total_loss / total_seen)
            avg_acc = float(total_correct / total_seen)
            avg_prp = np.exp(avg_loss)
            print(f"\r[Valid {index+1}/{num_batches}]\t loss={avg_loss:.3f}\t acc: {avg_acc:.3f}\t perp: {avg_prp:.3f}", end='')

        print()        
        return avg_prp, avg_acc


def accuracy_function(prbs, labels, mask):
    correct_classes = tf.argmax(prbs, axis=-1) == labels
    accuracy = tf.reduce_mean(tf.boolean_mask(tf.cast(correct_classes, tf.float32), mask))
    return accuracy


def loss_function(prbs, labels, mask):
    masked_labs = tf.boolean_mask(labels, mask)
    masked_prbs = tf.boolean_mask(prbs, mask)
    scce = tf.keras.losses.sparse_categorical_crossentropy(masked_labs, masked_prbs, from_logits=True)
    loss = tf.reduce_sum(scce)
    return loss

In [9]:
import preprocess_midi as prep
import numpy as np
import tensorflow as tf

In [10]:
data, labels = prep.preprocessing('/Users/zyl/Desktop/CS1470/FinalProject/data/musicnet_midis')
print(np.array(data).shape)
print(np.array(labels).shape)

(67385, 60, 3)
(67385, 3)


In [13]:
data = data[:,:,1]
print(np.array(data).shape)

In [12]:
def train_model(model, vocab, initial_state, window_size, epochs, batch_size):
    '''Trains model and returns model statistics'''
    stats = []
    try:
        for epoch in range(epochs):
            stats += [model.train(vocab_size, hidden_size, window_size, batch_size=batch_size)]
    except KeyboardInterrupt as e:
        if epoch > 1:
            print("Key-value interruption. Trying to early-terminate. Interrupt again to not do that!")
        else: 
            raise e
        
    return stats


def test_model(model, captions, img_feats, pad_idx, args):
    '''Tests model and returns model statistics'''
    perplexity, accuracy = model.test(captions, img_feats, pad_idx, batch_size=args.batch_size)
    return perplexity, accuracy

In [17]:
transformer_model = TransformerDecoder(128, 128, 60)
music_model = MusicModel(transformer_model)
train_model(music_model, data, data[0], 60, 10, 64)

InvalidArgumentError: indices[50544] = 28370 is not in [0, 60) [Op:GatherV2]