In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
import pickle
import math
import random
from numba import cuda

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

from nltk.translate.bleu_score import sentence_bleu, corpus_bleu, SmoothingFunction
from nltk.metrics.scores import (precision, recall)

from torchmetrics.text.rouge import ROUGEScore
from torchmetrics.text.bert import BERTScore
from nltk.sentiment.vader import SentimentIntensityAnalyzer

## Preprocessing, load tokenizers, set seed, initialize parameters

In [None]:
# Reset the current CUDA device through Numba before PyTorch touches it.
# NOTE(review): presumably this is meant to release GPU state left over from a
# previous kernel session — confirm; it requires a CUDA-capable machine.
device = cuda.get_current_device()
device.reset()

# Report whether PyTorch can see a GPU, then drop PyTorch's cached allocations.
print(torch.cuda.is_available())
torch.cuda.empty_cache()

# One seed shared by every random number generator used in this notebook.
seed = 9248
# Optional debugging / determinism switches, currently disabled:
# os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
# torch.backends.cudnn.benchmark = True
# torch.backends.cudnn.deterministic = True
# cuDNN is switched off entirely for the whole session.
torch.backends.cudnn.enabled = False
# Seed PyTorch, TensorFlow, NumPy and the stdlib `random` module with the same value.
torch.manual_seed(seed)
tf.random.set_seed(seed)
np.random.seed(seed)
random.seed(seed)

In [None]:
# Read the stored sequence array from disk and report its overall shape.
data_matrix = np.load("data/sequences.npy")
print(data_matrix.shape)

# The last axis holds the token streams; channel order is
# 0 = lyrics, 1 = notes, 2 = durations, 3 = rests.
lyrics, notes, durations, rests = (data_matrix[:, :, channel] for channel in range(4))

print(lyrics.shape, notes.shape, durations.shape, rests.shape)

In [None]:
def load_tokenizer(file):
    """
    Unpickle the payload stored at `file` and return the object kept under its 'tokenizer' key.

    :param file: path of the pickle file to read
    :return: whatever object the payload maps 'tokenizer' to

    NOTE(review): unpickling can execute arbitrary code, so only use this on trusted files.
    """
    with open(file, 'rb') as handle:
        payload = pickle.load(handle)
    return payload['tokenizer']

In [None]:
# Load the four pickled tokenizers (lyrics, notes, durations, rests), in that order.
tokenizer_lyr, tokenizer_note, tokenizer_duration, tokenizer_rest = map(
    load_tokenizer,
    (
        "tokenizers/tokenizer_lyr.pkl",
        "tokenizers/tokenizer_note.pkl",
        "tokenizers/tokenizer_duration.pkl",
        "tokenizers/tokenizer_rest.pkl",
    ),
)

In [None]:
# Random train/test split over the rows (songs) of the loaded arrays.
num_samples = 6848  # number of rows drawn for training
num_songs = len(lyrics)

# Training rows are drawn without replacement; every remaining row goes to the test set.
train_inds = np.random.choice(np.arange(num_songs), size=num_samples, replace=False)
test_inds = np.delete(np.arange(num_songs), train_inds)

# For each stream, build a Python list of row arrays following the index order above.
train_lyrics = list(map(lyrics.__getitem__, train_inds))
test_lyrics = list(map(lyrics.__getitem__, test_inds))

train_notes = list(map(notes.__getitem__, train_inds))
test_notes = list(map(notes.__getitem__, test_inds))

train_durations = list(map(durations.__getitem__, train_inds))
test_durations = list(map(durations.__getitem__, test_inds))

train_rests = list(map(rests.__getitem__, train_inds))
test_rests = list(map(rests.__getitem__, test_inds))

In [None]:
# General params
# Vocabulary sizes are "number of known tokens + 1"; the lyric vocabulary is capped at 10000.
# NOTE(review): the +1 presumably accounts for index 0 being reserved by the Keras
# tokenizer — confirm against how the tokenizers were fitted.
vocab_size = min(len(tokenizer_lyr.word_index) + 1, 10000)
notes_size = len(tokenizer_note.word_index) + 1
durations_size = len(tokenizer_duration.word_index) + 1
rests_size = len(tokenizer_rest.word_index) + 1

# For every stream the id of the "eos" token doubles as the padding id,
# and the id of the "bos" token is the start-of-sequence id.
pad_id_lyr = tokenizer_lyr.word_index["eos"]
start_id_lyr = tokenizer_lyr.word_index["bos"]

pad_id_note = tokenizer_note.word_index["eos"]
start_id_note = tokenizer_note.word_index["bos"]

pad_id_duration = tokenizer_duration.word_index["eos"]
start_id_duration = tokenizer_duration.word_index["bos"]

pad_id_rest = tokenizer_rest.word_index["eos"]
start_id_rest = tokenizer_rest.word_index["bos"]

# Sequence length is taken from the second axis of the lyrics array.
max_seq_len = lyrics.shape[1]
# NOTE: LSTMGenerator.init_hidden captures this value as its default argument when the
# class is defined, so this cell must run before the LSTMGenerator cell.
batch_size = 16

# NLTK BLEU smoothing helper (presumably used by the evaluation cells further down).
chencherry = SmoothingFunction()

# Generator params
g_dropout = 0.3
g_embed_dim = 32
g_hidden = 32
pretrain_epochs_gen = 120
g_lr_pretrain = 0.01
g_lr_adv = 1e-4

# Discriminator params
d_embed_dim = 64
# RelGAN_D reads dis_filter_sizes / dis_num_filters directly as module-level globals.
dis_filter_sizes = [2, 3, 4, 5]
dis_num_filters = [300, 300, 300, 300]
d_dropout_prob = 0.2
d_lr_pre = 1e-3
d_lr_adv = 1e-4
num_rep = 64

# relational memory params
# (with these values the memory size is num_heads * head_size = 512 per slot)
mem_slots = 1
num_heads = 2
head_size = 256

# NOTE(review): presumably the gradient-norm clipping threshold used in training — confirm.
clip_norm = 5.0

adversarial_epochs = 10000

## Initialize Model and training

In [None]:
def truncated_normal_(tensor, mean=0, std=1):
    """
    Fill `tensor` in place with draws from a normal distribution truncated to
    [mean - 2*std, mean + 2*std] and return the same tensor.

    The earlier hand-rolled version (by @ruotianluo, see
    https://discuss.pytorch.org/t/implementing-truncated-normal-initializer/4778/15)
    drew four candidates per element and kept the first one inside (-2, 2). When all
    four candidates fell outside that range it silently kept an out-of-range value.
    This version delegates to PyTorch's own truncated-normal initializer, which
    guarantees the bounds.

    :param tensor: floating-point tensor (or nn.Parameter) to overwrite
    :param mean: centre of the distribution
    :param std: standard deviation of the underlying (untruncated) normal
    :return: `tensor`, modified in place
    """
    # The distribution is symmetric around `mean`, so only the magnitude of std matters.
    spread = abs(std)
    with torch.no_grad():
        if spread == 0:
            # Degenerate case: every element equals the mean (the initializer would divide by zero).
            tensor.fill_(mean)
        else:
            torch.nn.init.trunc_normal_(
                tensor, mean=mean, std=spread, a=mean - 2 * spread, b=mean + 2 * spread
            )
    return tensor

In [None]:
class RelationalMemory(nn.Module):
    """
    Relational Memory Core (RMC): a recurrent cell whose state is a small matrix of memory
    slots that is updated at each step by self-attention over [memory; current input].

    This class is the same as the RMC from relational_rnn_models.py, but without
    language-modeling-specific variables.

    Args:
      mem_slots: The total number of memory slots to use.
      head_size: The size of an attention head.
      input_size: The size of input per step, i.e. the dimension of each input vector.
      num_heads: The number of attention heads to use. Defaults to 1.
      num_blocks: Number of times to compute attention per time step. Defaults to 1.
      forget_bias: Initial bias of the forget gate (stored as a trainable scalar). Defaults to 1.
      input_bias: Initial bias of the input gate (stored as a trainable scalar). Defaults to 0.
      gate_style: Whether to use per-element gating ('unit'), per-memory slot gating
        ('memory'), or no gating at all (None). Defaults to 'unit'.
      attention_mlp_layers: Number of layers in the post-attention MLP. Defaults to 2.
      key_size: Size of the key & query vectors in the attention computation.
        Defaults to None, in which case `head_size` is used.
      return_all_outputs: Whether `forward` returns the outputs of every step (like seq2seq)
        or only the output of the final step.

    Raises:
      ValueError: gate_style not one of [None, 'memory', 'unit'].
      ValueError: num_blocks is < 1.
      ValueError: attention_mlp_layers is < 1.
    """

    def __init__(self, mem_slots, head_size, input_size, num_heads=1, num_blocks=1, forget_bias=1., input_bias=0.,
                 gate_style='unit', attention_mlp_layers=2, key_size=None, return_all_outputs=False):
        super(RelationalMemory, self).__init__()

        ########## generic parameters for RMC ##########
        self.mem_slots = mem_slots
        self.head_size = head_size
        self.num_heads = num_heads
        self.mem_size = self.head_size * self.num_heads

        # Self-attention runs over the memory concatenated with ONE input row per step,
        # so the attended matrix has mem_slots + 1 rows (e.g. 2 rows when mem_slots == 1).
        self.mem_slots_plus_input = self.mem_slots + 1

        if num_blocks < 1:
            raise ValueError('num_blocks must be >=1. Got: {}.'.format(num_blocks))
        self.num_blocks = num_blocks

        if gate_style not in ['unit', 'memory', None]:
            raise ValueError(
                'gate_style must be one of [\'unit\', \'memory\', None]. got: '
                '{}.'.format(gate_style))
        self.gate_style = gate_style

        if attention_mlp_layers < 1:
            raise ValueError('attention_mlp_layers must be >= 1. Got: {}.'.format(
                attention_mlp_layers))
        self.attention_mlp_layers = attention_mlp_layers

        self.key_size = key_size if key_size else self.head_size

        ########## parameters for multihead attention ##########
        # value_size is same as head_size
        self.value_size = self.head_size
        # per-head width of the concatenated query, key and value vectors
        self.qkv_size = 2 * self.key_size + self.value_size
        self.total_qkv_size = self.qkv_size * self.num_heads  # denoted as F

        # A single projection produces q, k and v for all heads at once.
        self.qkv_projector = nn.Linear(self.mem_size, self.total_qkv_size)
        self.qkv_layernorm = nn.LayerNorm([self.mem_slots_plus_input, self.total_qkv_size])

        # Post-attention MLP used by attend_over_memory. Each layer must be its own
        # nn.Linear instance: `[nn.Linear(...)] * n` would repeat ONE module n times and
        # silently tie the weights of all MLP layers together.
        self.attention_mlp = nn.ModuleList(
            [nn.Linear(self.mem_size, self.mem_size) for _ in range(self.attention_mlp_layers)]
        )
        self.attended_memory_layernorm = nn.LayerNorm([self.mem_slots_plus_input, self.mem_size])
        self.attended_memory_layernorm2 = nn.LayerNorm([self.mem_slots_plus_input, self.mem_size])

        ########## parameters for initial embedded input projection ##########
        self.input_size = input_size
        self.input_projector = nn.Linear(self.input_size, self.mem_size)

        ########## parameters for gating ##########
        # Input and forget gates are produced together, hence twice the gate size.
        self.num_gates = 2 * self.calculate_gate_size()
        self.input_gate_projector = nn.Linear(self.mem_size, self.num_gates)
        self.memory_gate_projector = nn.Linear(self.mem_size, self.num_gates)
        # trainable scalar gate bias tensors
        self.forget_bias = nn.Parameter(torch.tensor(forget_bias, dtype=torch.float32))
        self.input_bias = nn.Parameter(torch.tensor(input_bias, dtype=torch.float32))

        ########## number of outputs returned #####
        self.return_all_outputs = return_all_outputs

    def repackage_hidden(self, h):
        """Detach a hidden state (a tensor or a nested iterable of tensors) from its autograd history."""
        # needed for truncated BPTT, called at every batch forward pass
        if isinstance(h, torch.Tensor):
            return h.detach()
        else:
            return tuple(self.repackage_hidden(v) for v in h)

    def initial_state(self, batch_size, trainable=False):
        """
        Create the initial memory.

        Each memory row is initialised to a distinct one-hot vector: the state is an identity
        matrix that is zero-padded or truncated along the feature axis so that every sample
        has shape (self.mem_slots, self.mem_size).

        Args:
          batch_size: The size of the batch.
          trainable: Accepted for interface compatibility; not used by this implementation.
        Returns:
          init_state: A CPU float tensor of size (batch_size, self.mem_slots, self.mem_size).
        """
        # torch.eye(rows, cols) already yields the padded / truncated identity.
        identity = torch.eye(self.mem_slots, self.mem_size)
        # repeat (not expand) so that every sample owns its own memory storage
        return identity.unsqueeze(0).repeat(batch_size, 1, 1)

    def multihead_attention(self, memory):
        """
        Perform multi-head attention from 'Attention is All You Need'
        (https://arxiv.org/abs/1706.03762).

        Args:
          memory: Tensor [B, N, mem_size] to attend over (memory rows plus the input row).
        Returns:
          new_memory: Tensor [B, N, num_heads * value_size].
        """
        # One linear projection builds queries, keys and values for all heads.
        qkv = self.qkv_projector(memory)
        # layernorm over every dim except the batch dim
        qkv = self.qkv_layernorm(qkv)

        # Number of attended rows N (memory slots plus the concatenated input).
        mem_slots = memory.shape[1]

        # split the qkv to multiple heads H
        # [B, N, F] => [B, N, H, F/H]
        qkv_reshape = qkv.view(qkv.shape[0], mem_slots, self.num_heads, self.qkv_size)

        # [B, N, H, F/H] => [B, H, N, F/H]
        qkv_transpose = qkv_reshape.permute(0, 2, 1, 3)

        # [B, H, N, key_size], [B, H, N, key_size], [B, H, N, value_size]
        q, k, v = torch.split(qkv_transpose, [self.key_size, self.key_size, self.value_size], -1)

        # Scale q with d_k, the dimensionality of the key vectors. This must be out-of-place:
        # q is a view returned by torch.split, and autograd rejects in-place edits of such views.
        q = q * (self.key_size ** -0.5)

        # [B, H, N, N]
        dot_product = torch.matmul(q, k.permute(0, 1, 3, 2))
        weights = F.softmax(dot_product, dim=-1)

        # [B, H, N, V]
        output = torch.matmul(weights, v)

        # [B, H, N, V] => [B, N, H, V] => [B, N, H*V]
        output_transpose = output.permute(0, 2, 1, 3).contiguous()
        new_memory = output_transpose.view((output_transpose.shape[0], output_transpose.shape[1], -1))

        return new_memory

    @property
    def state_size(self):
        """Shape of one sample's memory: [mem_slots, mem_size]."""
        return [self.mem_slots, self.mem_size]

    @property
    def output_size(self):
        """Width of the flattened per-step output: mem_slots * mem_size."""
        return self.mem_slots * self.mem_size

    def calculate_gate_size(self):
        """
        Calculate the gate size from the gate_style.

        Returns:
          mem_size for 'unit' gating, 1 for 'memory' gating, 0 when gating is disabled.
        """
        if self.gate_style == 'unit':
            return self.mem_size
        elif self.gate_style == 'memory':
            return 1
        else:  # self.gate_style == None
            return 0

    def create_gates(self, inputs, memory):
        """
        Create input and forget gates for this step using `inputs` and `memory`.

        Args:
          inputs: Tensor [B, 1, mem_size]; exactly one time step.
          memory: The current state of memory, [B, mem_slots, mem_size].
        Returns:
          input_gate: A LSTM-like insert gate (sigmoid-activated).
          forget_gate: A LSTM-like forget gate (sigmoid-activated).
        Raises:
          ValueError: `inputs` is not 3-D, or carries more than one time step.
        """
        # equation 8: since there is no output gate, h is just a tanh'ed m
        memory = torch.tanh(memory)

        if len(inputs.shape) != 3:
            raise ValueError("input shape of create_gate function is 2, expects 3")
        if inputs.shape[1] > 1:
            raise ValueError(
                "input seq length is larger than 1. create_gate function is meant to be called for each step, with input seq length of 1")

        inputs = inputs.view(inputs.shape[0], -1)
        # matmul for equation 4 and 5
        # there is no output gate, so equation 6 is not implemented
        gate_inputs = self.input_gate_projector(inputs)
        gate_inputs = gate_inputs.unsqueeze(dim=1)  # broadcast over memory slots
        gate_memory = self.memory_gate_projector(memory)

        # this completes the equation 4 and 5
        gates = gate_memory + gate_inputs
        # first half -> input gate, second half -> forget gate
        gates = torch.split(gates, split_size_or_sections=int(gates.shape[2] / 2), dim=2)
        input_gate, forget_gate = gates
        assert input_gate.shape[2] == forget_gate.shape[2]

        # to be used for equation 7
        input_gate = torch.sigmoid(input_gate + self.input_bias)
        forget_gate = torch.sigmoid(forget_gate + self.forget_bias)

        return input_gate, forget_gate

    def attend_over_memory(self, memory):
        """
        Run `num_blocks` rounds of (multi-head attention -> MLP), each with a residual
        connection followed by layer normalisation.

        Args:
          memory: Current relational memory concatenated with the input row.
        Returns:
          The attended-over memory, same shape as the input.
        """
        for _ in range(self.num_blocks):
            attended_memory = self.multihead_attention(memory)

            # Add a skip connection to the multiheaded attention's input.
            memory = self.attended_memory_layernorm(memory + attended_memory)

            # add a skip connection to the attention_mlp's input.
            attention_mlp = memory
            for layer in self.attention_mlp:
                attention_mlp = F.relu(layer(attention_mlp))
            memory = self.attended_memory_layernorm2(memory + attention_mlp)

        return memory

    def forward_step(self, inputs, memory, treat_input_as_matrix=False):
        """
        Forward step of the relational memory core.

        Args:
          inputs: Tensor input for this step.
          memory: Memory output from the previous time step.
          treat_input_as_matrix: Optional, whether to treat `input` as a sequence
            of matrices. Default to False, in which case the input is flattened
            into a vector.
        Returns:
          output: This time step's output, the next memory flattened to [B, mem_slots * mem_size].
          next_memory: The next version of memory to use.
        """
        if treat_input_as_matrix:
            # keep (Batch, Seq, ...) dim (0, 1), flatten starting from dim 2
            inputs = inputs.view(inputs.shape[0], inputs.shape[1], -1)
            # apply linear layer for dim 2
            inputs_reshape = self.input_projector(inputs)
        else:
            # keep (Batch, ...) dim (0), flatten starting from dim 1
            inputs = inputs.view(inputs.shape[0], -1)
            # apply linear layer for dim 1
            inputs = self.input_projector(inputs)
            # unsqueeze the time step to dim 1
            inputs_reshape = inputs.unsqueeze(dim=1)

        memory_plus_input = torch.cat([memory, inputs_reshape], dim=1)
        next_memory = self.attend_over_memory(memory_plus_input)

        # cut out the concatenated input vectors from the original memory slots
        n = inputs_reshape.shape[1]
        next_memory = next_memory[:, :-n, :]

        if self.gate_style == 'unit' or self.gate_style == 'memory':
            # these gates are sigmoid-applied ones for equation 7
            input_gate, forget_gate = self.create_gates(inputs_reshape, memory)
            # equation 7 calculation
            next_memory = input_gate * torch.tanh(next_memory) + forget_gate * memory

        output = next_memory.view(next_memory.shape[0], -1)

        return output, next_memory

    def forward(self, inputs, memory, treat_input_as_matrix=False):
        """
        Unroll the core over a batch-first sequence.

        Args:
          inputs: Tensor [B, T, input_size]; one `forward_step` is run per position along dim 1.
          memory: Initial memory, [B, mem_slots, mem_size].
          treat_input_as_matrix: Accepted for interface compatibility; it is not forwarded
            to `forward_step`, so every step flattens its input into a vector.
        Returns:
          (outputs, memory): outputs is [B, T, mem_slots * mem_size] when
          `return_all_outputs` is set, otherwise only the last step as [B, 1, ...];
          memory is the state after the final step.
        """
        # The memory is NOT detached here; callers that want truncated BPTT should call
        # repackage_hidden themselves before passing the state in.
        logit = 0
        logits = []
        # shape[1] is seq_length T
        for idx_step in range(inputs.shape[1]):
            logit, memory = self.forward_step(inputs[:, idx_step], memory)
            logits.append(logit.unsqueeze(1))
        logits = torch.cat(logits, dim=1)

        if self.return_all_outputs:
            return logits, memory
        else:
            return logit.unsqueeze(1), memory

In [None]:
class LSTMGenerator(nn.Module):
    """
    LSTM lyric generator conditioned on a melody.

    At every position the lyric token and the three melody attributes (note, duration, rest)
    are embedded separately, the four embeddings are concatenated, and the result is fed to
    a single-layer LSTM whose output is projected to log-probabilities over the lyric vocabulary.
    """

    def __init__(self, embedding_dim, hidden_dim, vocab_size, notes_size, durations_size, rests_size,
                 max_seq_len, pad_id_lyr, pad_id_note, pad_id_duration, pad_id_rest, gpu=True):
        """
        :param embedding_dim: width of EACH of the four embeddings (LSTM input is 4x this)
        :param hidden_dim: LSTM hidden size
        :param vocab_size: number of lyric token ids
        :param notes_size / durations_size / rests_size: number of ids of each melody attribute
        :param max_seq_len: number of tokens produced per sampled sequence
        :param pad_id_*: padding index handed to the matching nn.Embedding
        :param gpu: when True, tensors created by this class are moved to CUDA
        """
        super(LSTMGenerator, self).__init__()
        self.name = 'vanilla'

        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim
        self.max_seq_len = max_seq_len
        self.vocab_size = vocab_size
        self.pad_id_lyr = pad_id_lyr
        self.pad_id_note = pad_id_note
        self.pad_id_duration = pad_id_duration
        self.pad_id_rest = pad_id_rest
        self.gpu = gpu

        self.temperature = 1.0

        self.embeddings_lyr = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_id_lyr)
        self.embeddings_notes = nn.Embedding(notes_size, embedding_dim, padding_idx=pad_id_note)
        self.embeddings_durations = nn.Embedding(durations_size, embedding_dim, padding_idx=pad_id_duration)
        self.embeddings_rests = nn.Embedding(rests_size, embedding_dim, padding_idx=pad_id_rest)
        self.lstm = nn.LSTM(embedding_dim*4, hidden_dim, batch_first=True)
        self.lstm2out = nn.Linear(hidden_dim, vocab_size)
        self.softmax = nn.LogSoftmax(dim=-1)

        self.init_params()

    def forward(self, inp, mel, hidden, need_hidden=False):
        """
        Embeds input and applies LSTM
        :param inp: lyric token ids, batch_size * seq_len, or [batch_size] for a single step
        :param mel: melody ids with (note, duration, rest) on the last axis:
            batch_size * seq_len * 3, or batch_size * 3 for a single step
        :param hidden: (h, c)
        :param need_hidden: if return hidden, use for sampling
        :return: log-probabilities of shape (batch_size * seq_len) * vocab_size,
            plus the new hidden state when need_hidden is True
        """
        if len(mel.shape) == 2:
            notes = mel[:, 0]
            durations = mel[:, 1]
            rests = mel[:, 2]
        else:
            notes = mel[:, :, 0]
            durations = mel[:, :, 1]
            rests = mel[:, :, 2]

        emb_lyr = self.embeddings_lyr(inp)
        emb_note = self.embeddings_notes(notes)
        emb_dur = self.embeddings_durations(durations)
        emb_rest = self.embeddings_rests(rests)

        # Concatenate along the feature (last) axis. For sequence input this is dim 2; for
        # the single-step input used by `sample` the embeddings are only 2-D, where a
        # hard-coded dim=2 raises "dimension out of range".
        emb = torch.cat([emb_lyr, emb_note, emb_dur, emb_rest], dim=-1)

        if len(inp.size()) == 1:
            emb = emb.unsqueeze(1)  # batch_size * 1 * (4 * embedding_dim)

        out, hidden = self.lstm(emb, hidden)  # out: batch_size * seq_len * hidden_dim
        out = out.contiguous().view(-1, self.hidden_dim)  # out: (batch_size * len) * hidden_dim
        out = self.lstm2out(out)  # (batch_size * seq_len) * vocab_size
        # out = self.temperature * out  # temperature
        pred = self.softmax(out)

        if need_hidden:
            return pred, hidden
        else:
            return pred

    def sample(self, melody, num_samples, batch_size, start_letter):
        """
        Samples the network and returns num_samples samples of length max_seq_len.
        :param melody: melody ids; position i is read as melody[:, i] and the SAME tensor is
            reused for every batch (assumes batch_size * max_seq_len * 3 — TODO confirm with callers)
        :param num_samples: number of rows to return
        :param batch_size: number of sequences generated per pass
        :param start_letter: token id fed as the first input of every sequence
        :return samples: num_samples * max_seq_length (a sampled sequence in each row)
        """
        # NOTE(review): this runs one surplus batch whenever num_samples is a multiple of
        # batch_size other than batch_size itself; the extra rows are trimmed below.
        num_batch = num_samples // batch_size + 1 if num_samples != batch_size else 1
        samples = torch.zeros(num_batch * batch_size, self.max_seq_len).long()

        # Generate sentences with multinomial sampling strategy
        for b in range(num_batch):
            hidden = self.init_hidden(batch_size)
            inp = torch.LongTensor([start_letter] * batch_size)
            if self.gpu:
                inp = inp.cuda()

            for i in range(self.max_seq_len):
                y = melody[:, i]
                if self.gpu:
                    y = y.cuda()
                out, hidden = self.forward(inp, y, hidden, need_hidden=True)  # out: batch_size * vocab_size
                # `out` holds log-probabilities, so exponentiate before sampling
                next_token = torch.multinomial(torch.exp(out), 1)  # batch_size * 1 (sampling from each row)
                samples[b * batch_size:(b + 1) * batch_size, i] = next_token.view(-1)
                inp = next_token.view(-1)
        samples = samples[:num_samples]

        return samples

    def init_params(self):
        """Re-initialise every trainable non-scalar parameter with a truncated normal of std 1/sqrt(shape[0])."""
        for param in self.parameters():
            if param.requires_grad and len(param.shape) > 0:
                stddev = 1 / math.sqrt(param.shape[0])
                truncated_normal_(param, std=stddev)

    def init_hidden(self, batch_size=batch_size):
        """
        Return a zero (h, c) pair for the LSTM, each 1 * batch_size * hidden_dim.
        The default batch size is the value of the module-level `batch_size` at the time
        this class was defined.
        """
        h = torch.zeros(1, batch_size, self.hidden_dim)
        c = torch.zeros(1, batch_size, self.hidden_dim)

        if self.gpu:
            return h.cuda(), c.cuda()
        else:
            return h, c

In [None]:
class RelGAN_G(LSTMGenerator):
    """
    RelGAN generator: the melody-conditioned generator of LSTMGenerator with the recurrent
    core optionally replaced by a RelationalMemory (model_type other than 'LSTM'), sampled
    one token at a time with Gumbel noise.
    """

    def __init__(self, mem_slots, num_heads, head_size, embedding_dim, hidden_dim,
                 vocab_size, notes_size, durations_size, rests_size, max_seq_len,
                 pad_id_lyr, pad_id_note, pad_id_duration, pad_id_rest, gpu=True, model_type="RMC"):
        """
        :param mem_slots, num_heads, head_size: RelationalMemory configuration (RMC mode only)
        :param model_type: 'LSTM' for a plain LSTM core, anything else for the RMC core
        The remaining parameters are passed through to LSTMGenerator.
        """
        super(RelGAN_G, self).__init__(embedding_dim, hidden_dim, vocab_size, notes_size,
                                       durations_size, rests_size, max_seq_len, pad_id_lyr,
                                       pad_id_note, pad_id_duration, pad_id_rest, gpu)
        self.name = 'relgan'
        self.temperature = 1000.0  # max value
        self.model_type = model_type
        # NOTE(review): these four embeddings duplicate the ones the parent constructor
        # already built; they are kept so seeded initialisation stays unchanged.
        self.embeddings_lyr = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_id_lyr)
        self.embeddings_notes = nn.Embedding(notes_size, embedding_dim, padding_idx=pad_id_note)
        self.embeddings_durations = nn.Embedding(durations_size, embedding_dim, padding_idx=pad_id_duration)
        self.embeddings_rests = nn.Embedding(rests_size, embedding_dim, padding_idx=pad_id_rest)
        if model_type == 'LSTM':
            # LSTM
            self.hidden_dim = hidden_dim
            self.lstm = nn.LSTM(embedding_dim*4, self.hidden_dim, batch_first=True)
            self.lstm2out = nn.Linear(self.hidden_dim, vocab_size)
        else:
            # RMC: the per-step output is the flattened memory, mem_slots * (num_heads * head_size) wide
            self.hidden_dim = mem_slots * num_heads * head_size
            self.lstm = RelationalMemory(mem_slots=mem_slots, head_size=head_size, input_size=embedding_dim*4,
                                         num_heads=num_heads, return_all_outputs=True)
            self.lstm2out = nn.Linear(self.hidden_dim, vocab_size)

        # Re-initialise all parameters now that the final modules are in place.
        self.init_params()

    def init_hidden(self, batch_size=32):
        """
        Build the initial recurrent state for `batch_size` sequences: a zero (h, c) pair in
        LSTM mode, or the detached initial RMC memory otherwise. Moved to CUDA when self.gpu.
        """
        if self.model_type == 'LSTM':
            h = torch.zeros(1, batch_size, self.hidden_dim)
            c = torch.zeros(1, batch_size, self.hidden_dim)

            if self.gpu:
                return h.cuda(), c.cuda()
            else:
                return h, c
        else:
            # init RMC memory
            memory = self.lstm.initial_state(batch_size)
            memory = self.lstm.repackage_hidden(memory)  # detach memory at first
            return memory.cuda() if self.gpu else memory

    def step(self, lyr, mel, hidden):
        """
        RelGAN step forward
        :param lyr: [batch_size], current lyric token ids
        :param mel: batch_size * 3, (note, duration, rest) ids for this position
        :param hidden: recurrent state (RMC memory or LSTM (h, c))
        :return: pred, hidden, next_token, next_token_onehot, next_o
            - pred: batch_size * vocab_size, use for adversarial training backward
            - hidden: next hidden
            - next_token: [batch_size], next sentence token
            - next_token_onehot: always None (not used yet)
            - next_o: always None (not used yet)
        """
        notes = mel[:, 0]
        durations = mel[:, 1]
        rests = mel[:, 2]

        emb_lyr = self.embeddings_lyr(lyr).unsqueeze(1)  # batch_size * 1 * embedding_dim
        emb_note = self.embeddings_notes(notes).unsqueeze(1)
        emb_dur = self.embeddings_durations(durations).unsqueeze(1)
        emb_rest = self.embeddings_rests(rests).unsqueeze(1)

        emb = torch.cat([emb_lyr, emb_note, emb_dur, emb_rest], dim=2)

        out, hidden = self.lstm(emb, hidden)
        # Perturb the logits with Gumbel noise; the argmax of the result is the sampled token.
        gumbel_t = self.add_gumbel(self.lstm2out(out.squeeze(1)))
        next_token = torch.argmax(gumbel_t, dim=1).detach()
        # next_token_onehot = F.one_hot(next_token, cfg.vocab_size).float()  # not used yet
        next_token_onehot = None

        # Temperature-scaled softmax over the perturbed logits.
        pred = F.softmax(gumbel_t * self.temperature, dim=-1)  # batch_size * vocab_size
        # next_o = torch.sum(next_token_onehot * pred, dim=1)  # not used yet
        next_o = None

        return pred, hidden, next_token, next_token_onehot, next_o

    def sample(self, melody, num_samples, batch_size, one_hot=False, start_letter=1):
        """
        Sample from RelGAN Generator
        :param melody: melody ids; rows are consumed in consecutive slices of batch_size and
            position i of a slice is read as mel_batch[:, i]
            (assumes num_rows * max_seq_len * 3 — TODO confirm with callers)
        :param num_samples: number of rows to return
        :param batch_size: number of sequences generated per pass
        :param one_hot: if return pred of RelGAN, used for adversarial training
        :param start_letter: token id fed as the first input of every sequence
        :return:
            - all_preds (one_hot=True): batch_size * seq_len * vocab_size, holding only the
              most recently generated batch
            - samples (one_hot=False): num_samples * seq_len token ids

        Generation stops at the first melody slice with fewer than batch_size rows; sample
        rows belonging to that slice (and any later one) are left as zeros.
        """
        # NOTE(review): `all_preds` is published as a module-level global. This looks like a
        # leftover; drop the declaration once it is confirmed that no other cell reads it.
        global all_preds
        num_batch = num_samples // batch_size + 1 if num_samples != batch_size else 1
        samples = torch.zeros(num_batch * batch_size, self.max_seq_len).long()
        if one_hot:
            all_preds = torch.zeros(batch_size, self.max_seq_len, self.vocab_size)
            if self.gpu:
                all_preds = all_preds.cuda()

        for b in range(num_batch):
            hidden = self.init_hidden(batch_size)
            inp = torch.LongTensor([start_letter] * batch_size)
            mel_batch = melody[b * batch_size:(b+1) * batch_size, :]
            if len(mel_batch) < batch_size:
                break
            if self.gpu:
                inp = inp.cuda()
                mel_batch = mel_batch.cuda()

            for i in range(self.max_seq_len):
                y = mel_batch[:, i]
                pred, hidden, next_token, _, _ = self.step(inp, y, hidden)
                samples[b * batch_size:(b + 1) * batch_size, i] = next_token
                if one_hot:
                    all_preds[:, i] = pred
                inp = next_token
        samples = samples[:num_samples]  # num_samples * seq_len

        if one_hot:
            return all_preds  # batch_size * seq_len * vocab_size
        return samples

    @staticmethod
    def add_gumbel(o_t, eps=1e-10, gpu=True):
        """
        Add o_t by a vector sampled from Gumbel(0,1)
        :param o_t: logits tensor
        :param eps: small constant keeping both logarithms finite
        :param gpu: kept for backward compatibility and ignored; the noise is always created
            on the device of `o_t`, so the method also works for models running on the CPU
        :return: o_t + g, same shape and device as o_t
        """
        u = torch.rand_like(o_t)  # uniform samples in [0, 1)
        g_t = -torch.log(-torch.log(u + eps) + eps)
        gumbel_t = o_t + g_t
        return gumbel_t

In [None]:
class CNNDiscriminator(nn.Module):
    """
    Convolutional sentence classifier: token embedding, parallel convolutions of several
    heights with max-over-time pooling, a highway layer, dropout and a 2-way linear output.
    """

    def __init__(self, embed_dim, vocab_size, filter_sizes, num_filters, padding_idx, gpu=False,
                 dropout=0.2):
        """
        :param embed_dim: token embedding width
        :param vocab_size: number of token ids
        :param filter_sizes: convolution heights (in tokens), one per filter bank
        :param num_filters: number of output channels of each filter bank
        :param padding_idx: padding index handed to nn.Embedding
        :param gpu: stored on the instance; not used inside this class
        :param dropout: dropout probability applied before the output layer
        """
        super().__init__()
        self.gpu = gpu
        self.padding_idx = padding_idx
        self.vocab_size = vocab_size
        self.embedding_dim = embed_dim
        self.feature_dim = sum(num_filters)

        self.embeddings = nn.Embedding(vocab_size, embed_dim, padding_idx=padding_idx)
        # one filter bank per (channels, height) pair; each kernel spans the full embedding width
        self.convs = nn.ModuleList()
        for out_channels, height in zip(num_filters, filter_sizes):
            self.convs.append(nn.Conv2d(1, out_channels, (height, embed_dim)))
        self.highway = nn.Linear(self.feature_dim, self.feature_dim)
        self.feature2out = nn.Linear(self.feature_dim, 2)
        self.dropout = nn.Dropout(dropout)

        self.init_params()

    def forward(self, inp):
        """
        Get final predictions of discriminator
        :param inp: batch_size * seq_len
        :return: pred: batch_size * 2
        """
        return self.feature2out(self.dropout(self.get_feature(inp)))

    def get_feature(self, inp):
        """
        Get feature vector of given sentences
        :param inp: batch_size * max_seq_len
        :return: batch_size * feature_dim
        """
        embedded = self.embeddings(inp).unsqueeze(1)  # batch_size * 1 * max_seq_len * embed_dim

        pooled = []
        for conv in self.convs:
            activated = F.relu(conv(embedded)).squeeze(3)  # batch_size * num_filter * length
            # max over the whole time axis
            pooled.append(F.max_pool1d(activated, activated.size(2)).squeeze(2))  # batch_size * num_filter
        features = torch.cat(pooled, 1)  # batch_size * feature_dim

        # highway layer: gate * relu(transform) + (1 - gate) * carry
        transformed = self.highway(features)
        gate = torch.sigmoid(transformed)
        return gate * F.relu(transformed) + (1. - gate) * features

    def init_params(self):
        """Re-initialise every trainable non-scalar parameter with a truncated normal of std 1/sqrt(shape[0])."""
        for param in self.parameters():
            if not param.requires_grad or len(param.shape) == 0:
                continue
            truncated_normal_(param, std=1 / math.sqrt(param.shape[0]))

In [None]:
class RelGAN_D(CNNDiscriminator):
    """
    Discriminator that embeds one-hot (or soft) token vectors with a linear
    layer, splits the embedding into num_rep slices and scores each slice.

    Filter heights and counts come from the module-level dis_filter_sizes and
    dis_num_filters.
    """

    def __init__(self, embed_dim, max_seq_len, num_rep, vocab_size, padding_idx, gpu=True, dropout=0.25):
        """
        :param embed_dim: total embedding width across all representations
        :param max_seq_len: stored on the instance
        :param num_rep: number of slices the embedding is split into
        :param vocab_size: width of the last axis of the input
        :param padding_idx: forwarded to the parent constructor
        :param gpu: forwarded to the parent constructor
        :param dropout: dropout probability
        """
        super(RelGAN_D, self).__init__(embed_dim, vocab_size, dis_filter_sizes, dis_num_filters, padding_idx,
                                       gpu, dropout)

        slice_width = int(embed_dim / num_rep)
        self.embed_dim = embed_dim
        self.max_seq_len = max_seq_len
        self.feature_dim = sum(dis_num_filters)
        self.emb_dim_single = slice_width

        # The layers built by the parent are replaced below, in this order.
        self.embeddings = nn.Linear(vocab_size, embed_dim, bias=False)

        conv_layers = nn.ModuleList()
        for out_channels, kernel_height in zip(dis_num_filters, dis_filter_sizes):
            # Kernel width and horizontal stride both equal one slice, so each
            # slice of the embedding is convolved separately.
            conv_layers.append(
                nn.Conv2d(1, out_channels, (kernel_height, slice_width), stride=(1, slice_width)))
        self.convs = conv_layers

        self.highway = nn.Linear(self.feature_dim, self.feature_dim)
        self.feature2out = nn.Linear(self.feature_dim, 100)
        self.out2logits = nn.Linear(100, 1)
        self.dropout = nn.Dropout(dropout)

        self.init_params()

    def forward(self, inp):
        """
        Compute the discriminator logits.

        :param inp: batch_size * seq_len * vocab_size
        :return logits: [batch_size * num_rep] (1-D tensor)
        """
        # batch_size * 1 * max_seq_len * embed_dim
        emb = self.embeddings(inp).unsqueeze(1)

        pooled = []
        for conv in self.convs:
            # batch_size * num_filter * (seq_len-k_h+1) * num_rep
            activated = F.relu(conv(emb))
            # max over the sequence axis -> batch_size * num_filter * num_rep
            pooled.append(F.max_pool2d(activated, (activated.size(2), 1)).squeeze(2))

        stacked = torch.cat(pooled, 1)
        # (batch_size * num_rep) * feature_dim
        flat = stacked.permute(0, 2, 1).contiguous().view(-1, self.feature_dim)

        # Highway layer.
        transformed = self.highway(flat)
        gate = torch.sigmoid(transformed)
        mixed = gate * F.relu(transformed) + (1. - gate) * flat

        hidden = self.feature2out(self.dropout(mixed))
        return self.out2logits(hidden).squeeze(1)  # [batch_size * num_rep]

In [None]:
class GANDataset(Dataset):
    """Minimal torch Dataset that exposes an indexable, sized container."""

    def __init__(self, data):
        # Held by reference; no copy is made.
        self.data = data

    def __len__(self):
        """Number of stored items."""
        return len(self.data)

    def __getitem__(self, index):
        """Item stored at position ``index``."""
        return self.data[index]

In [None]:
class GenDataIter:
    """
    Wraps lyric/melody training tensors in a DataLoader for the generator.

    Every dataset item is a dict with keys 'lyric' (generator input, i.e. the
    target shifted right behind the start token), 'melody' and 'target'.
    The full tensors are also kept as ``input``, ``melody`` and ``target``.
    """

    def __init__(self, samples, melodies, batch_size, max_seq_len, start_letter=1, shuffle=True):
        """
        :param samples: 2-D tensor of target token ids, one row per sequence
        :param melodies: tensor of melody features, aligned with samples on axis 0
        :param batch_size: DataLoader batch size (incomplete last batch is dropped)
        :param max_seq_len: stored on the instance
        :param start_letter: token id written to the first input position
        :param shuffle: forwarded to the DataLoader
        """
        self.batch_size = batch_size
        self.max_seq_len = max_seq_len
        self.start_letter = start_letter

        self.loader = DataLoader(
            dataset=GANDataset(self.__read_data__(samples, melodies)),
            batch_size=self.batch_size,
            shuffle=shuffle,
            drop_last=True)

        self.input = self._all_data_('lyric')
        self.melody = self._all_data_('melody')
        self.target = self._all_data_('target')

    def __read_data__(self, samples, melodies):
        """
        Build the list of per-sequence dicts.

        input: same as target, but start with start_letter.
        """
        inp, target = self.prepare(samples, melodies, self.start_letter)
        lyr, mel = inp
        return [{'lyric': l, 'melody': m, 'target': t} for (l, m, t) in zip(lyr, mel, target)]

    def random_batch(self):
        """Return the batch at a randomly drawn position of one pass over the loader."""
        idx = random.randint(0, len(self.loader) - 1)
        # Stop as soon as the chosen batch is reached instead of collating
        # every batch of the dataset just to keep one of them.
        for position, batch in enumerate(self.loader):
            if position == idx:
                return batch

    def _all_data_(self, col):
        """Stack column ``col`` of every dataset item into one tensor along a new first axis."""
        return torch.cat([data[col].unsqueeze(0) for data in self.loader.dataset.data], 0)

    @staticmethod
    def prepare(samples, melodies, start_id, gpu=False):
        """
        Add start_letter to samples as inp, target same as samples.

        :param samples: 2-D tensor of token ids
        :param melodies: returned unchanged (moved to CUDA when gpu is True)
        :param start_id: token id placed in column 0 of the input
        :param gpu: when True, all returned tensors are moved to CUDA
        :return: ((inp, melodies), target)
        """
        inp = torch.zeros(samples.size()).long()
        target = samples
        inp[:, 0] = start_id
        # Shift right by one position. The slice is derived from the tensor
        # itself, so this static method does not depend on a global
        # max_seq_len matching the width of `samples`.
        inp[:, 1:] = target[:, :-1]

        if gpu:
            return (inp.cuda(), melodies.cuda()), target.cuda()
        return (inp, melodies), target

In [None]:
# Build the melody-conditioned generator and move it to the GPU.
# All sizes and padding ids are module-level values defined in earlier cells.
gen = RelGAN_G(mem_slots, num_heads, head_size, g_embed_dim, g_hidden,
               vocab_size, notes_size, durations_size, rests_size, max_seq_len,
               pad_id_lyr, pad_id_note, pad_id_duration, pad_id_rest, gpu=True).cuda()

# Two Adam optimizers over the same generator parameters: one with the
# pretraining learning rate, one with the adversarial learning rate.
gen_opt = optim.Adam(gen.parameters(), lr=g_lr_pretrain)
gen_adv_opt = optim.Adam(gen.parameters(), lr=g_lr_adv)
# Criterion for MLE pretraining. NLLLoss expects log-probabilities —
# presumably what the generator's forward returns; TODO confirm against RelGAN_G.
mle_criterion = nn.NLLLoss().cuda()

In [None]:
def optimize(opt, loss, model=None, retain_graph=False):
    """
    Run one optimisation step for ``loss``.

    :param opt: optimizer whose gradients are zeroed and which is then stepped
    :param loss: scalar tensor to back-propagate
    :param model: when given, its gradients are clipped to the module-level clip_norm
    :param retain_graph: forwarded to loss.backward
    """
    opt.zero_grad()
    loss.backward(retain_graph=retain_graph)

    if model is None:
        # No clipping requested.
        opt.step()
        return

    torch.nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
    opt.step()

In [None]:
def train_gen_epoch(model, data_loader, criterion, optimizer, cuda=True):
    """
    Train the generator for one pass over ``data_loader``.

    :param model: generator providing init_hidden(batch_size) and forward(lyr, mel, hidden)
    :param data_loader: yields dicts with keys 'lyric', 'melody' and 'target'
    :param criterion: loss called as criterion(pred, target.view(-1))
    :param optimizer: optimizer stepped once per batch (with gradient clipping)
    :param cuda: when True, each batch is moved to the GPU first
    :return: mean batch loss, or 0 when the loader yields no batches
    """
    total_loss = 0
    for data in data_loader:
        lyr, mel, target = data['lyric'], data['melody'], data['target']
        if cuda:
            lyr, mel, target = lyr.cuda(), mel.cuda(), target.cuda()

        # Size the hidden state from the batch actually received, so a loader
        # without drop_last (short final batch) still works.
        hidden = model.init_hidden(lyr.size(0))
        # Call the module rather than .forward so registered hooks still run.
        pred = model(lyr, mel, hidden)
        loss = criterion(pred, target.view(-1))
        optimize(optimizer, loss, model)
        total_loss += loss.item()

    num_batches = len(data_loader)
    # Same zero-step convention as the adversarial training helpers.
    return total_loss / num_batches if num_batches != 0 else 0

In [None]:
def pretrain_generator(gen, gen_opt, data_loader, criterion, epochs):
    """
    Maximum-likelihood pre-training of the generator.

    Runs train_gen_epoch ``epochs`` times, showing the latest loss in the
    progress bar.

    :return: list with the mean loss of every epoch
    """
    losses = []
    progress = tqdm(range(epochs))
    for epoch in progress:
        losses.append(train_gen_epoch(gen, data_loader, criterion, gen_opt))
        progress.set_description("EPOCH: {}, LOSS: {}".format(epoch, losses[-1]))
    return losses

In [None]:
# Lyric token ids, one row per training sequence.
sequences_lyrics_train = torch.LongTensor(train_lyrics)

# Each melody attribute gets a trailing axis of size 1 so the three can be
# joined along that axis.
sequences_notes_train = torch.LongTensor(train_notes).unsqueeze(2)
sequences_durations_train = torch.LongTensor(train_durations).unsqueeze(2)
sequences_rests_train = torch.LongTensor(train_rests).unsqueeze(2)

# Last axis holds (note, duration, rest).
train_melodies = torch.cat(
    (sequences_notes_train, sequences_durations_train, sequences_rests_train),
    dim=2,
)

gen_data_iter = GenDataIter(
    sequences_lyrics_train,
    train_melodies,
    batch_size,
    max_seq_len,
    start_letter=start_id_lyr,
)

train_melodies.shape, sequences_lyrics_train.shape

## Pretraining generator and discriminator

In [None]:
# MLE-pretrain the generator for pretrain_epochs_gen epochs; keeps one mean loss per epoch.
losses_pretrain = pretrain_generator(gen, gen_opt, gen_data_iter.loader, mle_criterion, pretrain_epochs_gen)

In [None]:
# Generator pretraining curve: one point per epoch.
plt.plot(losses_pretrain)
plt.xlabel("Epoch")
plt.ylabel("Loss")  # the loss belongs on the y-axis; a second xlabel would replace "Epoch"
plt.title("Pretraining losses generator")
plt.savefig("figures/relgan/pretraining_gen_full.png")
plt.show()

In [None]:
def remove_start_and_end(lyrics):
    """
    Strip sequence-boundary markers from decoded lyric strings.

    Each string is split on whitespace, the tokens "BOS", "EOS", "eos" and
    "bos" are dropped, and the rest is re-joined with single spaces.

    :param lyrics: iterable of strings
    :return: list of cleaned strings, in the same order
    """
    markers = ["BOS","EOS", "eos", "bos"]
    return [
        ' '.join(token for token in line.split() if token not in markers)
        for line in lyrics
    ]

In [None]:
class DisDataIter:
    """
    Wraps positive (real) and negative (generated) samples in a DataLoader
    for discriminator training.

    Every dataset item is a dict with keys 'input' (a sample) and 'target'
    (1 for a positive sample, 0 for a negative one).
    """

    def __init__(self, pos_samples, neg_samples):
        """
        :param pos_samples: tensor of real samples, labelled 1
        :param neg_samples: tensor of generated samples, labelled 0
        """
        self.batch_size = 32
        self.max_seq_len = 20
        self.start_letter = 0

        self.loader = DataLoader(
            dataset=GANDataset(self.__read_data__(pos_samples, neg_samples)),
            batch_size=self.batch_size,
            shuffle=True,
            drop_last=True)

    def __read_data__(self, pos_samples, neg_samples):
        """Build the list of {'input', 'target'} dicts from the prepared tensors."""
        inp, target = self.prepare(pos_samples, neg_samples)
        return [{'input': i, 'target': t} for (i, t) in zip(inp, target)]

    def random_batch(self):
        """Return the batch at a randomly drawn position of one pass over the loader."""
        idx = random.randint(0, len(self.loader) - 1)
        # Stop as soon as the chosen batch is reached instead of collating
        # every batch of the dataset just to keep one of them.
        for position, batch in enumerate(self.loader):
            if position == idx:
                return batch

    def prepare(self, pos_samples, neg_samples):
        """
        Build inp and target.

        :return: (inp, target), both shuffled with the same permutation and moved to CUDA
        """
        # detach() is required so the samples carry no autograd history.
        inp = torch.cat((pos_samples, neg_samples), dim=0).long().detach()
        target = torch.ones(inp.size(0)).long()
        # Positives come first in `inp`; everything after them is a negative.
        target[pos_samples.size(0):] = 0

        # shuffle
        perm = torch.randperm(inp.size(0))
        inp = inp[perm].cuda()
        target = target[perm].cuda()

        return inp, target

In [None]:
# Build the discriminator and move it to the GPU.
dis = RelGAN_D(d_embed_dim, max_seq_len, num_rep, vocab_size, pad_id_lyr, gpu=True).cuda()
# Two Adam optimizers over the same discriminator parameters: one with the
# pretraining learning rate, one with the adversarial learning rate.
# NOTE(review): both are bound to this particular `dis` object's parameters.
dis_opt = optim.Adam(dis.parameters(), lr=d_lr_pre)
dis_opt_adv = optim.Adam(dis.parameters(), lr=d_lr_adv)

In [None]:
def get_losses(d_out_real, d_out_fake, gen_samples=None, real_samples=None, loss_type='JS'):
    """
    Get different adversarial losses according to given loss_type.

    :param d_out_real: discriminator logits for real samples
    :param d_out_fake: discriminator logits for generated samples
    :param gen_samples: optional generated samples with the vocabulary on axis 2;
        when both sample tensors are given, text-overlap terms are added to g_loss
    :param real_samples: optional real samples with the vocabulary on axis 2
    :param loss_type: one of 'standard', 'JS', 'KL', 'hinge', 'tv', 'rsgan'
    :return: (g_loss, d_loss)
    :raises NotImplementedError: for an unknown loss_type

    Pass gen_samples / real_samples / loss_type by keyword: the third
    positional argument is gen_samples, not loss_type.
    """
    bce_loss = nn.BCEWithLogitsLoss()

    if loss_type in ('standard', 'JS', 'KL'):
        # These three share the discriminator loss and differ only in g_loss.
        d_loss_real = bce_loss(d_out_real, torch.ones_like(d_out_real))
        d_loss_fake = bce_loss(d_out_fake, torch.zeros_like(d_out_fake))
        d_loss = d_loss_real + d_loss_fake

        if loss_type == 'standard':  # the non-saturating GAN loss
            g_loss = bce_loss(d_out_fake, torch.ones_like(d_out_fake))
        elif loss_type == 'JS':  # the vanilla GAN loss
            g_loss = -d_loss_fake
        else:  # 'KL': the GAN loss implicitly minimizing KL-divergence
            g_loss = torch.mean(-d_out_fake)

    elif loss_type == 'hinge':  # the hinge loss
        # F.relu applies the function; nn.ReLU(x) would only construct a module.
        d_loss_real = torch.mean(F.relu(1.0 - d_out_real))
        d_loss_fake = torch.mean(F.relu(1.0 + d_out_fake))
        d_loss = d_loss_real + d_loss_fake

        g_loss = -torch.mean(d_out_fake)

    elif loss_type == 'tv':  # the total variation distance
        # torch.tanh applies the function; nn.Tanh(x) would only construct a module.
        d_loss = torch.mean(torch.tanh(d_out_fake) - torch.tanh(d_out_real))
        g_loss = torch.mean(-torch.tanh(d_out_fake))

    elif loss_type == 'rsgan':  # relativistic standard GAN
        d_loss = bce_loss(d_out_real - d_out_fake, torch.ones_like(d_out_real))
        g_loss = bce_loss(d_out_fake - d_out_real, torch.ones_like(d_out_fake))

    else:
        raise NotImplementedError("Divergence '%s' is not implemented" % loss_type)

    # Identity test: `== None` on a tensor goes through tensor comparison.
    if gen_samples is None or real_samples is None:
        return g_loss, d_loss

    # Decode both batches to text by taking the arg-max token at every position.
    r_s = torch.argmax(real_samples, dim=2)
    r_s = tokenizer_lyr.sequences_to_texts(r_s.cpu().numpy())
    g_s = torch.argmax(gen_samples, dim=2)
    g_s = tokenizer_lyr.sequences_to_texts(g_s.cpu().numpy())

    r_s = remove_start_and_end(r_s)
    g_s = remove_start_and_end(g_s)

    # NOTE: the terms below are built from Python floats, so they change the
    # value of g_loss but carry no gradient.
    bleus_4, precisions, recalls = [], [], []
    try:
        for test_ref, test_pred in zip(r_s, g_s):
            ref_tokens = test_ref.split()
            pred_tokens = test_pred.split()
            # sentence_bleu takes a list of tokenised references and a
            # tokenised hypothesis; raw strings would be scored per character.
            bleu4 = sentence_bleu([ref_tokens], pred_tokens, smoothing_function=chencherry.method1)
            bleus_4.append(bleu4)

            prec = precision(set(ref_tokens), set(pred_tokens))
            rec = recall(set(ref_tokens), set(pred_tokens))
            # nltk returns None when the relevant set is empty.
            prec = 1e-10 if prec is None else prec
            rec = 1e-10 if rec is None else rec

            precisions.append(prec)
            recalls.append(rec)

        bleu = torch.mean(torch.tensor(bleus_4))
        precision_mean = torch.mean(torch.tensor(precisions))
        recall_mean = torch.mean(torch.tensor(recalls))

        g_loss += -torch.log(bleu) - torch.log(precision_mean) - torch.log(recall_mean)

    except KeyError:
        # Fixed penalty when the text metrics cannot be computed.
        g_loss += torch.tensor(20.0)
        return g_loss, d_loss

    return g_loss, d_loss

In [None]:
def pretrain_discriminator(gen, dis, dis_opt, train_data, epochs, start_letter=1):
    """
    Pre-train the discriminator against the current generator with the JS loss.

    Each iteration draws one random batch, samples generated sequences for
    the batch's melodies and takes one discriminator step.

    :param gen: generator providing sample(...)
    :param dis: discriminator to train
    :param dis_opt: optimizer over the discriminator's parameters
    :param train_data: data iterator providing random_batch() with 'target' and 'melody'
    :param epochs: number of single-batch update steps
    :param start_letter: start token id forwarded to gen.sample
    :return: list with the discriminator loss of every step

    Uses the module-level batch_size and vocab_size.
    """
    d_losses = []
    progress = tqdm(range(epochs))
    for _ in progress:
        rand_batch = train_data.random_batch()
        real_samples = rand_batch['target']
        mels = rand_batch['melody']
        gen_samples = gen.sample(mels, batch_size, batch_size, one_hot=True, start_letter=start_letter)
        real_samples, gen_samples = real_samples.cuda(), gen_samples.cuda()
        # The discriminator consumes one-hot vectors, so encode the real token ids.
        real_samples = F.one_hot(real_samples, vocab_size).float()

        # ===Train===
        d_out_real = dis(real_samples)
        d_out_fake = dis(gen_samples)
        # loss_type must go by keyword: the third positional slot of
        # get_losses is gen_samples.
        _, d_loss = get_losses(d_out_real, d_out_fake, loss_type='JS')
        optimize(dis_opt, d_loss, dis)
        step_loss = d_loss.item()
        d_losses.append(step_loss)
        progress.set_description('d_loss: %.4f' % step_loss)

    return d_losses

In [None]:
# Pre-train the discriminator for 100 single-batch steps; keeps one loss value per step.
d_losses_pretrain = pretrain_discriminator(gen, dis, dis_opt, gen_data_iter, 100, start_letter=start_id_lyr)

In [None]:
# The checkpoint loaded below was written with:
# torch.save(dis, "RelGAN_Discriminator_melody_conditioned_full_dataset_pretrain.pth")
# NOTE: this file holds a whole pickled module, and unpickling can execute
# arbitrary code, so only load checkpoints you created yourself.
dis = torch.load("RelGAN_Discriminator_melody_conditioned_full_dataset_pretrain.pth")

# `dis` is now a different object from the one the optimizers were built for.
# Rebuild them, otherwise they keep stepping the discarded network's
# parameters and the loaded discriminator is never updated.
dis_opt = optim.Adam(dis.parameters(), lr=d_lr_pre)
dis_opt_adv = optim.Adam(dis.parameters(), lr=d_lr_adv)

In [None]:
# Discriminator pretraining curve.
plt.plot(d_losses_pretrain)
plt.xlabel("Epoch")
plt.ylabel("Loss")  # the loss belongs on the y-axis; a second xlabel would replace "Epoch"
plt.title("Pretraining losses discriminator")
plt.savefig("figures/relgan/pretraining_dis_full.png")
plt.show()

## Adversarial Training

In [None]:
def adv_train_generator(gen, dis, gen_adv_opt, train_data, g_step, vocab_size,
                        batch_size, loss_type="rsgan", start_letter=1):
    """
    Run ``g_step`` adversarial updates of the generator.

    :param gen: generator providing sample(...)
    :param dis: discriminator used to score real and generated samples
    :param gen_adv_opt: optimizer over the generator's parameters
    :param train_data: data iterator providing random_batch() with 'target' and 'melody'
    :param g_step: number of generator updates
    :param vocab_size: number of classes for the one-hot encoding of real samples
    :param batch_size: forwarded twice to gen.sample
    :param loss_type: adversarial loss name understood by get_losses
    :param start_letter: start token id forwarded to gen.sample
    :return: mean generator loss over the steps, or 0 when g_step is 0
    """
    total_loss = 0
    for _ in range(g_step):
        rand_batch = train_data.random_batch()
        real_samples = rand_batch['target']
        mels = rand_batch['melody']
        gen_samples = gen.sample(mels, batch_size, batch_size, one_hot=True, start_letter=start_letter)
        real_samples, gen_samples = real_samples.cuda(), gen_samples.cuda()
        # The discriminator consumes one-hot vectors, so encode the real token ids.
        real_samples = F.one_hot(real_samples, vocab_size).float()

        # ===Train===
        d_out_real = dis(real_samples)
        d_out_fake = dis(gen_samples)
        # Keywords keep real and generated samples in the right slots:
        # get_losses declares gen_samples before real_samples.
        g_loss, _ = get_losses(d_out_real, d_out_fake,
                               gen_samples=gen_samples,
                               real_samples=real_samples,
                               loss_type=loss_type)

        optimize(gen_adv_opt, g_loss, gen)
        total_loss += g_loss.item()

    return total_loss / g_step if g_step != 0 else 0

In [None]:
def adv_train_discriminator(gen, dis, train_data, dis_opt, d_step, batch_size,
                            vocab_size, loss_type="rsgan", start_letter=1):
    """
    Run ``d_step`` adversarial updates of the discriminator.

    :param gen: generator providing sample(...)
    :param dis: discriminator to train
    :param train_data: data iterator providing random_batch() with 'target' and 'melody'
    :param dis_opt: optimizer over the discriminator's parameters
    :param d_step: number of discriminator updates
    :param batch_size: forwarded twice to gen.sample
    :param vocab_size: number of classes for the one-hot encoding of real samples
    :param loss_type: adversarial loss name understood by get_losses
    :param start_letter: start token id forwarded to gen.sample
    :return: mean discriminator loss over the steps, or 0 when d_step is 0
    """
    total_loss = 0
    for _ in range(d_step):
        rand_batch = train_data.random_batch()
        real_samples = rand_batch['target']
        mels = rand_batch['melody']
        gen_samples = gen.sample(mels, batch_size, batch_size, one_hot=True, start_letter=start_letter)
        real_samples, gen_samples = real_samples.cuda(), gen_samples.cuda()
        # The discriminator consumes one-hot vectors, so encode the real token ids.
        real_samples = F.one_hot(real_samples, vocab_size).float()

        # ===Train===
        d_out_real = dis(real_samples)
        d_out_fake = dis(gen_samples)
        # loss_type must go by keyword: passed positionally it lands in the
        # gen_samples slot and the default 'JS' loss is used instead.
        _, d_loss = get_losses(d_out_real, d_out_fake, loss_type=loss_type)

        optimize(dis_opt, d_loss, dis)
        total_loss += d_loss.item()

    return total_loss / d_step if d_step != 0 else 0

In [None]:
def get_fixed_temperature(temper, i, N, adapt):
    """
    Evaluate a temperature schedule at step ``i`` of ``N``.

    :param temper: value the increasing schedules are scaled by
    :param i: current step
    :param N: total number of steps
    :param adapt: schedule name: 'no', 'lin', 'exp', 'log', 'sigmoid', 'quad' or 'sqrt'
    :return: temperature for this step ('no' always gives 1.0)
    :raises Exception: for an unknown schedule name
    """
    if adapt == 'no':
        # no increase, origin: temper
        return 1.0
    if adapt == 'lin':
        # linear increase
        return 1 + i / (N - 1) * (temper - 1)
    if adapt == 'exp':
        # exponential increase
        return temper ** (i / N)
    if adapt == 'log':
        # logarithm increase
        return 1 + (temper - 1) / np.log(N) * np.log(i + 1)
    if adapt == 'sigmoid':
        # sigmoid increase
        return (temper - 1) * 1 / (1 + np.exp((N / 2 - i) * 20 / N)) + 1
    if adapt == 'quad':
        return (temper - 1) / (N - 1) ** 2 * i ** 2 + 1
    if adapt == 'sqrt':
        return (temper - 1) / np.sqrt(N - 1) * np.sqrt(i) + 1

    raise Exception("Unknown adapt type!")

In [None]:
def update_temperature(i, N, temp_adpt="exp"):
    """Temperature for step ``i`` of ``N``, using the ``temp_adpt`` schedule with temper fixed at 1000.0."""
    return get_fixed_temperature(temper=1000.0, i=i, N=N, adapt=temp_adpt)

In [None]:
def adversarial_train(gen, dis, gen_adv_opt, dis_adv_opt, train_data, vocab_size,
                      batch_size, epochs, start_letter=1):
    """
    Alternate generator and discriminator updates for ``epochs`` rounds.

    Every round runs 5 generator steps and 1 discriminator step with the
    'rsgan' loss, then sets gen.temperature from update_temperature.

    :return: (g_losses, d_losses), one mean loss per round each
    """
    g_losses = []
    d_losses = []
    progress = tqdm(range(epochs))
    for adv_epoch in progress:
        # Generator
        g_loss = adv_train_generator(
            gen, dis, gen_adv_opt, train_data, 5, vocab_size, batch_size,
            loss_type='rsgan', start_letter=start_letter)
        # Discriminator
        d_loss = adv_train_discriminator(
            gen, dis, train_data, dis_adv_opt, 1, batch_size, vocab_size,
            loss_type='rsgan', start_letter=start_letter)

        # update temperature
        gen.temperature = update_temperature(adv_epoch, epochs)

        g_losses.append(g_loss)
        d_losses.append(d_loss)
        status = 'g_loss: %.4f, d_loss: %.4f, temperature: %.4f' % (g_loss, d_loss, gen.temperature)
        progress.set_description(status)
    return g_losses, d_losses

In [None]:
# Run 10000 adversarial rounds; keeps the per-round mean generator and discriminator losses.
adv_g_losses, adv_d_losses = adversarial_train(gen, dis, gen_adv_opt, dis_opt_adv, gen_data_iter,
                                               vocab_size, batch_size, 10000,
                                               start_letter=start_id_lyr)

In [None]:
# Generator pretraining curve (saved under a second file name).
plt.plot(losses_pretrain)
plt.xlabel("Epoch")
plt.ylabel("Loss")  # the loss belongs on the y-axis; a second xlabel would replace "Epoch"
plt.title("Pretraining losses generator")
plt.savefig("figures/relgan/pretraining_full.png")
plt.show()

In [None]:
# Adversarial generator loss: one point per adversarial round.
plt.plot(adv_g_losses)
plt.xlabel("Epoch")
plt.ylabel("Loss")  # the loss belongs on the y-axis; a second xlabel would replace "Epoch"
plt.title("Adversarial losses generator")
plt.savefig("figures/relgan/adv_gen_full_mod_loss.png")
plt.show()

In [None]:
# Adversarial discriminator loss: one point per adversarial round.
plt.plot(adv_d_losses)
plt.xlabel("Epoch")
plt.ylabel("Loss")  # the loss belongs on the y-axis; a second xlabel would replace "Epoch"
plt.title("Adversarial losses discriminator")
plt.savefig("figures/relgan/adv_dis_full_mod_loss.png")
plt.show()

## Evaluation

In [None]:
# Draw 80 evaluation indices from test_inds.
# NOTE(review): np.random.choice samples with replacement by default, so the
# same song can be drawn more than once — confirm this is intended.
test_is = np.random.choice(test_inds, 80)

# Gather each melody attribute for the drawn songs and add a trailing axis of size 1.
test_nos = notes[test_is][:, :, np.newaxis]
test_dus = durations[test_is][:, :, np.newaxis]
test_res = rests[test_is][:, :, np.newaxis]
test_lyr = [lyrics[song_idx] for song_idx in test_is]

# Last axis holds (note, duration, rest).
test_mels = torch.LongTensor(np.concatenate((test_nos, test_dus, test_res), axis=2))
test_mels.shape

In [None]:
# Generate lyrics for the evaluation melodies.
# NOTE(review): 80 and 32 are presumably the number of samples and the batch
# size expected by RelGAN_G.sample — confirm against its signature.
samples = gen.sample(test_mels, 80, 32, start_letter=start_id_lyr)
# Decode generated and reference token ids back to text with the lyric tokenizer.
preds = tokenizer_lyr.sequences_to_texts(samples.cpu().detach().numpy())
orig = tokenizer_lyr.sequences_to_texts(test_lyr)

In [None]:
# Drop the BOS/EOS marker tokens from both generated and reference texts.
# The names are rebound in place; re-running the cell leaves already-cleaned text unchanged.
preds = remove_start_and_end(preds)
orig = remove_start_and_end(orig)

## Rouge Scores

In [None]:
rouge = ROUGEScore()

r_f_measure_1, r_precision_1, r_recall_1 = [], [], []
r_f_measure_2, r_precision_2, r_recall_2 = [], [], []
r_f_measure_l, r_precision_l, r_recall_l = [], [], []

# Route every entry of the per-pair result dict to its accumulator list.
rouge_columns = {
    "rouge1_fmeasure": r_f_measure_1,
    "rouge1_precision": r_precision_1,
    "rouge1_recall": r_recall_1,
    "rouge2_fmeasure": r_f_measure_2,
    "rouge2_precision": r_precision_2,
    "rouge2_recall": r_recall_2,
    "rougeL_fmeasure": r_f_measure_l,
    "rougeL_precision": r_precision_l,
    "rougeL_recall": r_recall_l,
}

for test_ref, test_pred in tqdm(zip(orig, preds)):
    # Scored one (prediction, reference) pair at a time.
    rouge_dict = rouge(test_pred, test_ref)
    for metric_key, column in rouge_columns.items():
        column.append(rouge_dict[metric_key])

# One line per ROUGE variant: mean f-measure, precision, recall.
for column_group in (
    (r_f_measure_1, r_precision_1, r_recall_1),
    (r_f_measure_2, r_precision_2, r_recall_2),
    (r_f_measure_l, r_precision_l, r_recall_l),
):
    print(*(np.mean(column) for column in column_group))

## BLEU Scores

In [None]:
# Sentence-level BLEU-4 / BLEU-3 / BLEU-2 for every (reference, prediction) pair.
bleus_4, bleus_3, bleus_2 = [], [], []
for test_ref, test_pred in tqdm(zip(orig, preds)):
    # sentence_bleu takes a list of tokenised references and a tokenised
    # hypothesis; raw strings would be compared character by character.
    references = [test_ref.split()]
    hypothesis = test_pred.split()
    bleu4 = sentence_bleu(references, hypothesis, smoothing_function=chencherry.method7)
    bleu3 = sentence_bleu(references, hypothesis, weights=[1/3, 1/3, 1/3], smoothing_function=chencherry.method7)
    bleu2 = sentence_bleu(references, hypothesis, weights=[1/2, 1/2], smoothing_function=chencherry.method7)
    bleus_4.append(bleu4)
    bleus_3.append(bleu3)
    bleus_2.append(bleu2)

np.mean(bleus_2), np.mean(bleus_3), np.mean(bleus_4)

## BERT Scores

In [None]:
bertscore = BERTScore()
# torchmetrics metrics are called as metric(preds, target); passing the
# references first would swap the reported precision and recall.
scores = bertscore(preds, orig)
np.mean(scores["precision"]), np.mean(scores["recall"]), np.mean(scores["f1"])

## Sentiment Analysis

In [None]:
def _sentiment_ratio(orig_score, pred_score):
    """
    Ratio of the smaller to the larger of two non-negative sentiment scores.

    Returns 1 when both scores are 0 (perfect agreement) and 0 when exactly
    one of them is 0.
    """
    if orig_score > pred_score:
        return pred_score / orig_score
    if orig_score == 0 and pred_score == 0:
        return 1
    # Here pred_score >= orig_score and pred_score > 0, so the division is safe.
    return orig_score / pred_score


# Compare VADER polarity scores of every reference with its prediction.
sid = SentimentIntensityAnalyzer()
pos_ratios, neu_ratios, neg_ratios = [], [], []
for o, p in zip(orig, preds):
    ss_orig = sid.polarity_scores(o)
    ss_pred = sid.polarity_scores(p)

    neg_ratios.append(_sentiment_ratio(ss_orig["neg"], ss_pred["neg"]))
    neu_ratios.append(_sentiment_ratio(ss_orig["neu"], ss_pred["neu"]))
    # The positive channel now checks the prediction too; a zero reference
    # with a non-zero prediction counts as 0, not as perfect agreement.
    pos_ratios.append(_sentiment_ratio(ss_orig["pos"], ss_pred["pos"]))

np.mean(pos_ratios), np.mean(neu_ratios), np.mean(neg_ratios)