## **Prepare data**

In [None]:
import torch
import pandas as pd
import math
import time
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
from itertools import chain

def flatten(ls):
    """
    Concatenate the inner iterables of `ls` into a single flat list.

    Only one level of nesting is removed; deeper nesting is kept as-is.
    """
    flat = []
    for inner in ls:
        flat.extend(inner)
    return flat

def clean_tokens(tokens):
    """
    Normalise a list of string tokens.

    Tokens that are "(", ")", "*" or "." once surrounding whitespace is
    removed are dropped. Every remaining token is stripped; a token that
    is empty after stripping is kept as a single space " ".
    """
    dropped = ("(", ")", "*", ".")
    cleaned = []
    for token in tokens:
        stripped = token.strip()
        if stripped in dropped:
            continue
        # Whitespace-only tokens are preserved as one explicit space token.
        cleaned.append(stripped if stripped else " ")
    return cleaned

# Load the pre-processed lyrics table and hold out 15% of it for validation.
siamzone_df = pd.read_pickle("siamzone-process-v2.pickle")
siamzone_trian_df, siamzone_val_df = train_test_split(
    siamzone_df,
    test_size=0.15,
    random_state=126,
)

# Clean each entry's token list, then concatenate the entries into one token
# stream per split.
train_lyrics = list(chain.from_iterable(siamzone_trian_df.tokenized_lyrics.map(clean_tokens)))
val_lyrics = list(chain.from_iterable(siamzone_val_df.tokenized_lyrics.map(clean_tokens)))

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

## **Create Model**

References: the PyTorch transformer tutorial (https://pytorch.org/tutorials/beginner/transformer_tutorial.html)
and this Colab notebook: https://colab.research.google.com/drive/1u34ME_e1wezCmCRdjTiTHlWg9DZsfheb?usp=sharing&fbclid=IwAR3koG2yz6nAnXYCeg7aC-rSBgmWwrX8mRgWggZCK630hDvbKZZvH6Q1QRk#scrollTo=eGMG9zFDscev

In [None]:
import torch
import torch.nn as nn
from torch.nn import Transformer
from torch.nn import TransformerEncoder, TransformerEncoderLayer, Transformer
from torch.utils.data import TensorDataset, DataLoader

import torchtext
from torchtext.data import Field, BPTTIterator
from torchtext.data.utils import get_tokenizer
from torchtext.datasets import LanguageModelingDataset

from collections import Counter


class TransformerModel(nn.Module):
    """
    Language model built from a token embedding, a positional encoding, a
    stack of transformer encoder layers and a linear projection back to
    vocabulary-sized logits.
    """

    def __init__(self, ntoken, ninp, nhead, nhid, nlayers, dropout=0.5):
        """
        ntoken: vocabulary size (embedding rows and output logits).
        ninp: embedding / model dimension.
        nhead: number of attention heads per encoder layer.
        nhid: feed-forward dimension inside each encoder layer.
        nlayers: number of stacked encoder layers.
        dropout: dropout probability for the positional encoding and the
            encoder layers.
        """
        super().__init__()
        self.model_type = 'Transformer'
        self.ninp = ninp
        # Submodules are created in a fixed order so parameter ordering and
        # random initialisation stay reproducible.
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        self.transformer_encoder = TransformerEncoder(
            TransformerEncoderLayer(ninp, nhead, nhid, dropout),
            nlayers,
        )
        self.encoder = nn.Embedding(ntoken, ninp)
        self.decoder = nn.Linear(ninp, ntoken)

        self.init_weights()

    def generate_square_subsequent_mask(self, sz):
        """
        Return a (sz, sz) float mask holding 0.0 on and below the diagonal
        and -inf strictly above it.
        """
        above_diagonal = torch.ones(sz, sz).triu(diagonal=1).bool()
        return torch.zeros(sz, sz).masked_fill(above_diagonal, float('-inf'))

    def init_weights(self):
        """
        Draw embedding and decoder weights from U(-0.1, 0.1) and zero the
        decoder bias.
        """
        bound = 0.1
        nn.init.uniform_(self.encoder.weight, -bound, bound)
        nn.init.zeros_(self.decoder.bias)
        nn.init.uniform_(self.decoder.weight, -bound, bound)

    def forward(self, src, src_mask):
        """
        Map token ids to logits over the vocabulary.

        src: integer tensor of token ids; dim 0 is the sequence position.
        src_mask: attention mask handed to the encoder stack.
        Returns a tensor shaped like `src` with a trailing vocabulary dim.
        """
        # Embeddings are scaled by sqrt(ninp) before the positions are added.
        embedded = self.encoder(src) * math.sqrt(self.ninp)
        encoded = self.transformer_encoder(self.pos_encoder(embedded), src_mask)
        return self.decoder(encoded)


class PositionalEncoding(nn.Module):
    """
    Add fixed sinusoidal position information to a sequence-first tensor and
    apply dropout.

    Even feature columns hold sines and odd columns hold cosines of the
    position multiplied by a geometrically decreasing frequency.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        """
        d_model: size of the last (feature) dimension; may be odd or even.
        dropout: dropout probability applied after adding the encoding.
        max_len: number of positions pre-computed; longer inputs are not
            supported by `forward`.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even ones,
        # so the cosine half only uses the first d_model // 2 frequencies.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Stored as (max_len, 1, d_model) so it broadcasts over dim 1 of the input.
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        x: tensor whose dim 0 is the sequence position and whose last dim is
           d_model. Returns `x` plus the encoding of its first x.size(0)
           positions, passed through dropout.
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


def create_lookup_dict(tokenized_lyrics, n_min=None):
    """
    Build word <-> integer id lookup tables from a flat list of tokens.

    Ids start at 1; id 0 is reserved for "<unk>".

    tokenized_lyrics: iterable of hashable tokens.
    n_min: when given, only tokens occurring at least `n_min` times are kept.
    Returns (vocab_to_int, int_to_vocab).

    NOTE: without `n_min`, ids follow descending frequency. With `n_min`,
    ids follow the order in which tokens first appear in the input, not
    frequency order.
    """
    frequencies = Counter(tokenized_lyrics)
    if n_min is None:
        kept_words = sorted(frequencies, key=frequencies.get, reverse=True)
    else:
        kept_words = [word for word, count in frequencies.items() if count >= n_min]

    vocab_to_int = dict(zip(kept_words, range(1, len(kept_words) + 1)))
    vocab_to_int["<unk>"] = 0
    int_to_vocab = {index: word for word, index in vocab_to_int.items()}
    return vocab_to_int, int_to_vocab

    
def batch_data(words, sequence_length, batch_size, shuffle=False):
    """
    Build a DataLoader of (window, next item) pairs from a flat sequence.

    ref: Udacity Deep learning class

    words: sequence of integer ids.
    sequence_length: number of consecutive items in every input window.
    batch_size: number of windows per DataLoader batch.
    shuffle: forwarded to the DataLoader.

    `words` is first trimmed to a multiple of batch_size * sequence_length.
    Then a window is taken at every position, paired with the item that
    immediately follows it.
    """
    # numpy is not imported at the top of the notebook, so bring it in here.
    import numpy as np

    batch_size_total = batch_size * sequence_length
    n_batches = len(words) // batch_size_total
    words = words[: n_batches * batch_size_total]

    # An empty range (too few words) simply yields an empty dataset.
    starts = range(len(words) - sequence_length)
    X = np.array([words[n: n + sequence_length] for n in starts])
    target = np.array([words[n + sequence_length] for n in starts])

    dataset = TensorDataset(torch.from_numpy(X), torch.from_numpy(target))
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

In [None]:
# The vocabulary covers both splits; tokens seen fewer than 3 times are left
# out and therefore map to id 0 ("<unk>") below.
vocab_to_int, int_to_vocab = create_lookup_dict(train_lyrics + val_lyrics, n_min=3)
tokenized_indices_train, tokenized_indices_val = (
    [vocab_to_int.get(token, 0) for token in lyrics]
    for lyrics in (train_lyrics, val_lyrics)
)

In [None]:
# Vocabulary size, including the "<unk>" entry.
len(vocab_to_int)

In [None]:
# Id assigned to the single-space token (raises KeyError if it is not in the vocabulary).
vocab_to_int[" "]

In [None]:
import pickle

# Persist both lookup tables so the same ids can be restored alongside the
# saved weights. The context manager guarantees the file is flushed and closed.
with open("vocab_transformer_space.pkl", "wb") as vocab_file:
    pickle.dump({"vocab_to_int": vocab_to_int, "int_to_vocab": int_to_vocab}, vocab_file)

In [None]:
# Model hyper-parameters.
ntokens = len(vocab_to_int)  # vocabulary size
emsize = 512                 # embedding dimension
nhid = 512                   # feed-forward dimension inside each encoder layer
nlayers = 4                  # number of stacked encoder layers
nhead = 4                    # attention heads per encoder layer
dropout = 0.2                # dropout probability

model = TransformerModel(
    ntoken=ntokens,
    ninp=emsize,
    nhead=nhead,
    nhid=nhid,
    nlayers=nlayers,
    dropout=dropout,
).to(device)

In [None]:
def batchify(data, bsz):
    """
    Arrange a flat list of token ids into `bsz` parallel columns.

    The tail that does not fill a whole row is discarded. The result is a
    (len(data) // bsz, bsz) LongTensor on `device`; column j holds the j-th
    contiguous slice of the input.
    """
    tokens = torch.LongTensor(data)
    usable = (tokens.size(0) // bsz) * bsz
    # Split into bsz equal contiguous chunks, then transpose so that each
    # chunk becomes a column.
    columns = tokens[:usable].view(bsz, -1).t().contiguous()
    return columns.to(device)

# Number of parallel columns used for training and for evaluation.
batch_size = 64
eval_batch_size = 10
# NOTE(review): the training stream is the train split followed by the
# validation split, so the validation tokens are also trained on and the
# reported validation loss is not a held-out measure. Confirm this is intended.
train_data = batchify(tokenized_indices_train + tokenized_indices_val, batch_size)
val_data = batchify(tokenized_indices_val, eval_batch_size)

In [None]:
# Each column of train_data is one contiguous stretch of the token stream;
# decode the first 100 ids of column 0 as a sanity check.
print([int_to_vocab.get(idx, '') for idx in train_data[:100, 0].tolist()])

In [None]:
# Maximum number of rows (time steps) per training chunk.
bptt = 35


def get_batch(source, i):
    """
    Slice one chunk out of `source`, starting at row `i`.

    Returns (data, target): `data` is up to `bptt` rows of `source`;
    `target` is the same window shifted one row forward and flattened
    to 1-D. The window is shortened near the end so the shifted slice
    stays inside `source`.
    """
    rows_left = len(source) - 1 - i
    seq_len = bptt if bptt < rows_left else rows_left
    stop = i + seq_len
    inputs = source[i:stop]
    shifted = source[i + 1:stop + 1]
    return inputs, shifted.reshape(-1)

In [None]:
# Loss, optimiser and learning-rate schedule shared by train() and evaluate().
criterion = nn.CrossEntropyLoss()
lr = 5.0  # initial learning rate
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
# Multiply the learning rate by 0.95 after every scheduler step.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1.0, gamma=0.95)

def train(model, data_source, epoch=None, log_interval=200):
    """
    Run one pass over `data_source`, updating `model` in place.

    Uses the notebook-level `optimizer`, `criterion`, `bptt`, `device` and
    `get_batch`.

    model: network exposing `generate_square_subsequent_mask(sz)` and called
        as `model(data, src_mask)`.
    data_source: 2-D tensor consumed along dim 0 in chunks of `bptt` rows
        (as produced by `batchify`).
    epoch: number shown in the progress line. When omitted, the notebook-level
        `epoch` loop variable is used if it exists, otherwise 0.
    log_interval: number of batches between progress lines.
    """
    if epoch is None:
        # Keep reporting the enclosing training loop's counter, but do not
        # crash with a NameError when called outside such a loop.
        epoch = globals().get("epoch", 0)

    model.train()  # Turn on the train mode
    total_loss = 0.
    start_time = time.time()
    n_batches = len(data_source) // bptt
    full_mask = model.generate_square_subsequent_mask(bptt).to(device)

    for batch, i in enumerate(range(0, data_source.size(0) - 1, bptt)):
        data, targets = get_batch(data_source, i)
        # The last chunk may be shorter than bptt and needs a matching mask.
        if data.size(0) == bptt:
            src_mask = full_mask
        else:
            src_mask = model.generate_square_subsequent_mask(data.size(0)).to(device)

        optimizer.zero_grad()
        output = model(data, src_mask)
        loss = criterion(output.view(-1, output.size(-1)), targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
        if batch % log_interval == 0 and batch > 0:
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            # Read the rate the optimiser is actually using; scheduler.get_lr()
            # is deprecated and can misreport it when called outside step().
            current_lr = optimizer.param_groups[0]['lr']
            print('| epoch {:3d} | {:5d}/{:5d} batches | '
                  'lr {:02.2f} | ms/batch {:5.2f} | '
                  'loss {:5.2f} | ppl {:8.2f}'.format(
                    epoch, batch, n_batches, current_lr,
                    elapsed * 1000 / log_interval,
                    cur_loss, math.exp(cur_loss)))
            total_loss = 0
            start_time = time.time()
    

def evaluate(eval_model, data_source):
    """
    Return the average per-row loss of `eval_model` over `data_source`.

    Uses the notebook-level `criterion`, `bptt`, `device` and `get_batch`.
    Gradients are disabled and the model is switched to evaluation mode; it
    is left in that mode afterwards.

    eval_model: network exposing `generate_square_subsequent_mask(sz)` and
        called as `eval_model(data, src_mask)`.
    data_source: 2-D tensor consumed along dim 0 in chunks of `bptt` rows.
    """
    eval_model.eval()  # Turn on the evaluation mode
    total_loss = 0.
    # Build masks from the model being evaluated, not from the global `model`.
    src_mask = eval_model.generate_square_subsequent_mask(bptt).to(device)
    with torch.no_grad():
        for i in range(0, data_source.size(0) - 1, bptt):
            data, targets = get_batch(data_source, i)
            # The last chunk may be shorter than bptt and needs a matching mask.
            if data.size(0) != bptt:
                src_mask = eval_model.generate_square_subsequent_mask(data.size(0)).to(device)
            output = eval_model(data, src_mask)
            output_flat = output.view(-1, output.size(-1))
            # Weight each chunk's mean loss by its number of rows.
            total_loss += len(data) * criterion(output_flat, targets).item()
    return total_loss / (len(data_source) - 1)

# Load Weight

### If you are training from scratch, do not run this section

In [None]:
# Recreate the optimiser, then restore its saved state.
optimizer = torch.optim.SGD(model.parameters(), lr=lr)


optimizer_save_path = "optimizer-lm-siamzone-v4-space-342.pkl"
# map_location lets a checkpoint written on a GPU machine load on a CPU-only one.
optimizer.load_state_dict(torch.load(optimizer_save_path, map_location=device))

# Restart from a fixed learning rate instead of the one stored in the checkpoint.
for g in optimizer.param_groups:
    g['lr'] = 3


# The schedule has to be rebuilt because it is bound to the new optimiser object.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

In [None]:
# Manually override the learning rate of every optimiser parameter group.
for g in optimizer.param_groups:
    g['lr'] = 0.1

In [None]:
model_save_path = "lm-siamzone-v4-space-342.pkl"
# map_location lets weights saved from a GPU run load on a CPU-only machine;
# without it torch.load tries to restore tensors onto the original device.
model.load_state_dict(torch.load(model_save_path, map_location=device))
# Switch to evaluation mode (disables dropout) until train() is called again.
model.eval()

# Train

In [None]:
import copy

best_val_loss = float("inf")
epochs = 2 # The number of epochs
best_model = None

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(model, train_data)
    val_loss = evaluate(model, val_data)
    print('-' * 89)
    print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.2f} | '
          'valid ppl {:8.2f}'.format(epoch, (time.time() - epoch_start_time),
                                     val_loss, math.exp(val_loss)))
    print('-' * 89)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Snapshot the weights. Assigning `model` directly would only alias
        # the live model, which keeps changing in later epochs.
        best_model = copy.deepcopy(model)

    # Decay the learning rate once per epoch.
    scheduler.step()

## Save model

In [None]:
# Checkpoint file for the model weights; the optimiser checkpoint name is derived from it.
model_save_path = "lm-siamzone-v4-space-342.pkl"

In [None]:
# The optimiser state is stored next to the model under an "optimizer-" prefix.
optimizer_save_path = f"optimizer-{model_save_path}"
torch.save(optimizer.state_dict(), optimizer_save_path)

In [None]:
# Persist only the model's state_dict, not the module object itself.
torch.save(model.state_dict(), model_save_path)

## **Generate new lyrics**

In [None]:
import deepcut
import torch.nn.functional as F

def top_k_top_p_filtering(
    logits, top_k, top_p, temperature, filter_value=-float("Inf")
):
    """
    Apply temperature scaling, then top-k and nucleus (top-p) filtering.

    Adapted from the Hugging Face sampling script.

    logits: tensor whose last dim indexes the vocabulary. 1-D and batched
        inputs are both supported. The input tensor is not modified.
    top_k: keep only the `top_k` highest logits per row; <= 0 disables it.
    top_p: keep the smallest set of highest-probability tokens whose
        cumulative probability exceeds `top_p`; values outside (0, 1)
        disable it.
    temperature: divisor applied to the logits before filtering.
    filter_value: value written into every removed position.
    Returns a new tensor of the same shape as `logits`.
    """
    logits = logits / temperature  # creates a new tensor

    top_k = min(top_k, logits.size(-1))  # Safety check
    if top_k > 0:
        # Remove all tokens scoring below the k-th best token of their row.
        kth_best = torch.topk(logits, top_k)[0][..., -1, None]
        logits = logits.masked_fill(logits < kth_best, filter_value)

    # top_p >= 1 keeps everything by definition. Skipping it also avoids
    # dropping tokens when the cumulative sum rounds slightly above 1.
    if 0.0 < top_p < 1.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

        # Remove tokens with cumulative probability above the threshold
        sorted_indices_to_remove = cumulative_probs > top_p
        # Shift right so the first token above the threshold is kept too
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = False

        # Scatter the mask back to vocabulary order row by row. Plain fancy
        # indexing would only be correct for 1-D input.
        indices_to_remove = sorted_indices_to_remove.scatter(
            -1, sorted_indices, sorted_indices_to_remove
        )
        logits = logits.masked_fill(indices_to_remove, filter_value)

    return logits


def predict(model, start_word='อยากจะไป', size=50, top_k=100, top_p=0.95, temperature=1):
    """
    Sample a continuation of `start_word` from the language model.

    Uses the notebook-level `vocab_to_int`, `int_to_vocab`, `device`,
    `deepcut` and `top_k_top_p_filtering`.

    model: network exposing `generate_square_subsequent_mask(sz)` and called
        as `model(data, src_mask)`.
    start_word: prompt text; it is tokenized with deepcut and tokens missing
        from the vocabulary map to id 0.
    size: number of tokens to sample after the prompt.
    top_k, top_p, temperature: sampling controls forwarded to
        `top_k_top_p_filtering`.
    Returns the prompt tokens followed by the sampled tokens, as strings.
    Raises ValueError when the prompt produces no tokens.
    """
    outputs = [vocab_to_int.get(w, 0) for w in deepcut.tokenize(start_word)]
    if not outputs:
        raise ValueError("start_word must contain at least one token")

    # Sample with dropout disabled and without autograd bookkeeping, then
    # restore whatever mode the caller had (train() leaves the model in
    # training mode).
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for _ in range(size):
                # Feed the whole sequence so far as one column of ids.
                data = torch.tensor(outputs).unsqueeze(-1).to(device)
                src_mask = model.generate_square_subsequent_mask(data.size(0)).to(device)
                logits = model(data, src_mask)
                # Only the logits of the last position decide the next token.
                filtered_logits = top_k_top_p_filtering(
                    logits[-1, 0], top_k=top_k, top_p=top_p, temperature=temperature
                )
                probabilities = F.softmax(filtered_logits, dim=-1)
                predicted_token = torch.multinomial(probabilities, 1)
                outputs.append(int(predicted_token))
    finally:
        model.train(was_training)

    return [int_to_vocab.get(idx, ' ') for idx in outputs]

In [None]:
# Print ten independent samples for the same prompt, each followed by a rule.
start_word = "คิดถึงเธอ"
for i in range(10):
    pred = predict(model, start_word)
    print(''.join(pred), "-" * 80, sep="\n")

## **Load weight and fine-tune**

In [None]:
artist_names = ['คาราบาว', 'ปู พงษ์สิทธิ์ คำภีร์']
batch_size = 20

# Select the chosen artists' rows and flatten their tokens into one stream.
# NOTE(review): this reads `tokenized_lyrics_word_only` without clean_tokens,
# unlike the training data above. Confirm the two tokenizations are compatible.
artist_mask = siamzone_df.artist_name.isin(artist_names)
finetune_lyrics = flatten(siamzone_df[artist_mask].tokenized_lyrics_word_only)
tokenized_indices_ft = [vocab_to_int.get(token, 0) for token in finetune_lyrics]
finetune_data = batchify(tokenized_indices_ft, batch_size)
finetune_data.shape

In [None]:
# fine tuning
for param in model.parameters():
    param.requires_grad = False
model.decoder = nn.Linear(emsize, ntokens).cuda()

lr = 0.01
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 20, gamma=0.9)
finetune_epochs = 200

for epoch in range(1, finetune_epochs + 1):
    print(f"epoch: {epoch}")
    train(model, finetune_data)
    scheduler.step()

In [None]:
# Print ten independent samples from the fine-tuned model, each followed by a rule.
start_word = "กัญชา"
for i in range(10):
    pred = predict(model, start_word)
    print(''.join(pred), "-" * 80, sep="\n")