# Loading: imports, model definitions, and data

In [50]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from torch.nn import Transformer
import torch.nn.functional as F
import math
import random
import numpy as np
from transformers import ASTConfig, ASTModel, GPT2Config, GPT2Model, AutoModelForCausalLM, GPT2LMHeadModel, BartConfig

class GPT2Model(nn.Module):
    """Wrapper around a Hugging Face ``GPT2LMHeadModel`` built from a fresh config.

    No pretrained weights are loaded here; callers load a state dict
    afterwards. Token id 0 is treated as padding (see ``make_mask``), and the
    configuration declares 2 as BOS and 1 as EOS.

    NOTE(review): this class shadows the ``GPT2Model`` name imported from
    ``transformers`` at the top of the file. The name is kept because other
    definitions in this file instantiate the wrapper under it.
    """

    def __init__(self, vocab_size=140, n_embd=768, n_layer=12, n_head=12):
        super(GPT2Model, self).__init__()
        self.configuration = GPT2Config(
            vocab_size=vocab_size,
            n_embd=n_embd,
            n_layer=n_layer,
            n_head=n_head,
            bos_token_id=2,
            eos_token_id=1,
        )
        self.model = GPT2LMHeadModel(self.configuration)

    def get_embed(self, idx):
        """Return the token embedding for the single token id ``idx``.

        The result has a leading dimension of 1 because the id is wrapped in
        a one-element tensor before the lookup.
        """
        embedding_layer = self.model.transformer.wte
        # Build the index on the embedding's device so the lookup also works
        # after the model has been moved off the CPU.
        token_ids = torch.tensor([idx], device=embedding_layer.weight.device)
        return embedding_layer(token_ids)

    def extract_vocab_embeddings(self):
        """Return a detached copy of the whole token-embedding matrix."""
        embedding_layer = self.model.transformer.wte
        return embedding_layer.weight.detach().clone()

    def forward(self, input_ids, labels=None, return_hidden_states=False):
        """Run the transformer and optionally project to vocabulary logits.

        Args:
            input_ids: integer token ids; id 0 is masked out as padding.
            labels: optional targets with as many elements as ``input_ids``.
                They are compared position-by-position with the logits; no
                shifting is applied here.
            return_hidden_states: when true, return the last hidden states
                and skip the vocabulary projection.

        Returns:
            The last hidden states if ``return_hidden_states`` is true;
            otherwise ``(loss, logits)`` when ``labels`` is given, or just
            ``logits``.
        """
        attention_mask = self.make_mask(input_ids)
        transformer_outputs = self.model.transformer(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True,
        )

        # Hidden states before the vocabulary projection.
        hidden_states = transformer_outputs.last_hidden_state
        if return_hidden_states:
            return hidden_states

        logits = self.model.lm_head(hidden_states)
        if labels is None:
            return logits

        # NOTE(review): no ignore_index is set, so label positions equal to
        # the padding id (0) contribute to the loss as well.
        loss_fct = nn.CrossEntropyLoss()
        loss = loss_fct(logits.view(-1, self.configuration.vocab_size), labels.view(-1))
        return loss, logits

    def make_mask(self, input_ids):
        """Return an integer mask: 1 where ``input_ids`` is non-zero, else 0."""
        return (input_ids != 0).long()

    def infer(self, input_ids, length=2048):
        """Greedily extend ``input_ids`` by up to ``length`` tokens.

        Args:
            input_ids: 1-D (single sequence) or 2-D (batch, seq) token ids.
            length: number of tokens to append; values above 2048 are
                clamped to 2048.

        Returns:
            A 2-D tensor holding the prompt followed by the generated tokens.
            Generation stops early once the sequence no longer fits the
            model's position table, and the prompt is returned unchanged when
            ``length <= 0``.

        Raises:
            ValueError: if ``input_ids`` has more than two dimensions.
        """
        if input_ids.dim() == 1:
            input_ids = input_ids.unsqueeze(0)
        if input_ids.dim() > 2:
            raise ValueError(
                "input_ids must be 1-D or 2-D, got %d dimensions" % input_ids.dim()
            )

        if length > 2048:
            print("Max Length is 2048. Change Length Auto to 2048")
            length = 2048

        # The learned position embeddings only cover n_positions tokens, so a
        # longer input would fail inside the position lookup.
        max_input_len = min(2048, self.configuration.n_positions)

        # Bound before the loop so a non-positive length returns the prompt
        # instead of hitting an unbound local.
        output_ids = input_ids
        with torch.no_grad():
            for _ in range(length):
                if output_ids.shape[1] > max_input_len:
                    break
                logits = self.forward(output_ids)
                # Only the final position decides the next token.
                predict = logits[:, -1, :].argmax(dim=-1, keepdim=True)
                output_ids = torch.cat((output_ids, predict), dim=-1)

        return output_ids
    
class ResidualLinearLayer(nn.Module):
    """Residual block computing ``ReLU(Linear(x)) + x`` at a fixed feature size."""

    def __init__(self, d_dim):
        super().__init__()
        # Square projection so the result can be summed with the input.
        self.linear = nn.Linear(in_features=d_dim, out_features=d_dim)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Return the activated linear projection of ``x`` plus ``x`` itself."""
        projected = self.linear(x)
        return self.activation(projected) + x

class InstPoolingLayer(nn.Module):
    """Stack of ``num_layers`` ``ResidualLinearLayer`` blocks applied in order."""

    def __init__(self, d_dim=1024, num_layers=8):
        super().__init__()

        blocks = []
        for _ in range(num_layers):
            blocks.append(ResidualLinearLayer(d_dim))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Feed ``x`` through every residual block and return the final result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return hidden

class InstGRU(nn.Module):
    """Per-instrument GRU heads on top of a frozen chord transformer.

    The chord hidden states are concatenated with a projected instrument
    vector and fed to one GRU plus one linear head per instrument; each head
    emits one logit per position.
    """

    # Number of instrument classes (one GRU and one output head each).
    NUM_INSTS = 133
    # Width of the projected initial-instrument vector.
    INST_EMBED_DIM = 256

    def __init__(self, hidden_dim=128, n_layers=3,
                 chord_ckpt_path='/workspace/out/chord_bpe/GPT2_BPE_V150/model_207_0.4520_0.3645.pt'):
        """Build the model and load the frozen chord transformer.

        Args:
            hidden_dim: hidden size of every GRU.
            n_layers: number of stacked layers in every GRU.
            chord_ckpt_path: state-dict checkpoint for the chord transformer.
                NOTE(review): the default lives under ``/workspace/out``
                while the sibling pooling models read ``/workspace/pj/out``;
                confirm which location is current.
        """
        super(InstGRU, self).__init__()

        self.chord_transformer = GPT2Model(vocab_size=150)
        # map_location keeps construction working on CPU-only hosts even if
        # the checkpoint was saved from a GPU; load_state_dict copies the
        # values into the existing parameters.
        chord_state = torch.load(chord_ckpt_path, map_location='cpu')
        self.chord_transformer.load_state_dict(chord_state)
        # Freeze the chord transformer.
        # NOTE(review): it still follows train()/eval() of this module, so
        # its dropout is active while the outer model is in training mode.
        for param in self.chord_transformer.parameters():
            param.requires_grad = False

        # GRU input = chord hidden size + projected instrument vector
        # (768 + 256 = 1024 with the defaults).
        input_dim = self.chord_transformer.configuration.n_embd + self.INST_EMBED_DIM

        self.grus = nn.ModuleList([
            nn.GRU(input_dim, hidden_dim, num_layers=n_layers, batch_first=True)
            for _ in range(self.NUM_INSTS)
        ])
        self.output_layers = nn.ModuleList([
            nn.Linear(hidden_dim, 1) for _ in range(self.NUM_INSTS)
        ])
        self.init_inst_proj = nn.Linear(self.NUM_INSTS, self.INST_EMBED_DIM)

    def forward(self, chord_tensor, init_inst_tensor, inst_all_container=None):
        """Return one logit sequence per instrument.

        Args:
            chord_tensor: (batch, seq) chord token ids.
            init_inst_tensor: (batch, NUM_INSTS) instrument indicator vector.
            inst_all_container: unused; kept for interface compatibility.

        Returns:
            A list of ``NUM_INSTS`` tensors, each (batch, seq - 2, 1).
        """
        chord_embed = self.chord_transformer(chord_tensor, return_hidden_states=True)
        # Drop the first and last positions.
        # NOTE(review): presumably the BOS/EOS slots; with right-padded
        # batches the last position may be padding instead - confirm.
        chord_embed = chord_embed[:, 1:-1, :]
        length = chord_embed.shape[1]

        # Broadcast the instrument vector along the time axis.
        init_inst_embed = self.init_inst_proj(init_inst_tensor)
        init_inst_embed = init_inst_embed.unsqueeze(1).expand(-1, length, -1)

        input_embed = torch.cat((chord_embed, init_inst_embed), dim=2)

        output_container = []
        for gru, head in zip(self.grus, self.output_layers):
            out_embed, _ = gru(input_embed)
            output_container.append(head(out_embed))
        return output_container

class InstPooling(nn.Module):
    """Per-instrument residual MLP heads on top of a frozen chord transformer.

    The chord hidden states are concatenated with a projected instrument
    vector; every instrument has its own ``InstPoolingLayer`` followed by a
    linear head that emits one logit per position.
    """

    # Number of instrument classes (one pooler and one output head each).
    NUM_INSTS = 133
    # Width of the projected initial-instrument vector.
    INST_EMBED_DIM = 256

    def __init__(self, n_layers=2,
                 chord_ckpt_path='/workspace/pj/out/chord_bpe/GPT2_BPE_V150/model_207_0.4520_0.3645.pt'):
        """Build the model and load the frozen chord transformer.

        Args:
            n_layers: number of residual blocks inside every pooler.
            chord_ckpt_path: state-dict checkpoint for the chord transformer.
        """
        super(InstPooling, self).__init__()

        self.chord_transformer = GPT2Model(vocab_size=150)
        # map_location keeps construction working on CPU-only hosts even if
        # the checkpoint was saved from a GPU; load_state_dict copies the
        # values into the existing parameters.
        chord_state = torch.load(chord_ckpt_path, map_location='cpu')
        self.chord_transformer.load_state_dict(chord_state)
        # Freeze the chord transformer.
        # NOTE(review): it still follows train()/eval() of this module, so
        # its dropout is active while the outer model is in training mode.
        for param in self.chord_transformer.parameters():
            param.requires_grad = False

        # Feature width = chord hidden size + projected instrument vector
        # (768 + 256 = 1024 with the defaults).
        feature_dim = self.chord_transformer.configuration.n_embd + self.INST_EMBED_DIM

        self.init_inst_proj = nn.Linear(self.NUM_INSTS, self.INST_EMBED_DIM)

        self.inst_poolers = nn.ModuleList([
            InstPoolingLayer(d_dim=feature_dim, num_layers=n_layers)
            for _ in range(self.NUM_INSTS)
        ])
        self.output_layers = nn.ModuleList([
            nn.Linear(feature_dim, 1) for _ in range(self.NUM_INSTS)
        ])

    def forward(self, chord_tensor, init_inst_tensor, inst_all_container=None):
        """Return one logit sequence per instrument.

        Args:
            chord_tensor: (batch, seq) chord token ids.
            init_inst_tensor: (batch, NUM_INSTS) instrument indicator vector.
            inst_all_container: unused; kept for interface compatibility.

        Returns:
            A list of ``NUM_INSTS`` tensors, each (batch, seq - 2, 1).
        """
        chord_embed = self.chord_transformer(chord_tensor, return_hidden_states=True)
        # Drop the first and last positions.
        # NOTE(review): presumably the BOS/EOS slots; with right-padded
        # batches the last position may be padding instead - confirm.
        chord_embed = chord_embed[:, 1:-1, :]
        length = chord_embed.shape[1]

        # Broadcast the instrument vector along the time axis.
        init_inst_embed = self.init_inst_proj(init_inst_tensor)
        init_inst_embed = init_inst_embed.unsqueeze(1).expand(-1, length, -1)

        input_embed = torch.cat((chord_embed, init_inst_embed), dim=2)

        return [
            head(pooler(input_embed))
            for pooler, head in zip(self.inst_poolers, self.output_layers)
        ]

class InstSimplePooling(nn.Module):
    """Per-instrument linear heads on top of a frozen chord transformer.

    The chord hidden states are concatenated with a projected instrument
    vector, and a single linear layer per instrument maps each position to
    one logit. No intermediate pooling layers are used.
    """

    # Number of instrument classes (one output head each).
    NUM_INSTS = 133
    # Width of the projected initial-instrument vector.
    INST_EMBED_DIM = 256

    def __init__(self, n_layers=2,
                 chord_ckpt_path='/workspace/pj/out/chord_bpe/GPT2_BPE_V150/model_207_0.4520_0.3645.pt'):
        """Build the model and load the frozen chord transformer.

        Args:
            n_layers: unused by this variant; kept so the constructor matches
                the other instrument models in this file.
            chord_ckpt_path: state-dict checkpoint for the chord transformer.
        """
        super(InstSimplePooling, self).__init__()

        self.chord_transformer = GPT2Model(vocab_size=150)
        # map_location keeps construction working on CPU-only hosts even if
        # the checkpoint was saved from a GPU; load_state_dict copies the
        # values into the existing parameters.
        chord_state = torch.load(chord_ckpt_path, map_location='cpu')
        self.chord_transformer.load_state_dict(chord_state)
        # Freeze the chord transformer.
        # NOTE(review): it still follows train()/eval() of this module, so
        # its dropout is active while the outer model is in training mode.
        for param in self.chord_transformer.parameters():
            param.requires_grad = False

        # Feature width = chord hidden size + projected instrument vector
        # (768 + 256 = 1024 with the defaults).
        feature_dim = self.chord_transformer.configuration.n_embd + self.INST_EMBED_DIM

        self.init_inst_proj = nn.Linear(self.NUM_INSTS, self.INST_EMBED_DIM)

        self.output_layers = nn.ModuleList([
            nn.Linear(feature_dim, 1) for _ in range(self.NUM_INSTS)
        ])

    def forward(self, chord_tensor, init_inst_tensor, inst_all_container=None):
        """Return one logit sequence per instrument.

        Args:
            chord_tensor: (batch, seq) chord token ids.
            init_inst_tensor: (batch, NUM_INSTS) instrument indicator vector.
            inst_all_container: unused; kept for interface compatibility.

        Returns:
            A list of ``NUM_INSTS`` tensors, each (batch, seq - 2, 1).
        """
        chord_embed = self.chord_transformer(chord_tensor, return_hidden_states=True)
        # Drop the first and last positions.
        # NOTE(review): presumably the BOS/EOS slots; with right-padded
        # batches the last position may be padding instead - confirm.
        chord_embed = chord_embed[:, 1:-1, :]
        length = chord_embed.shape[1]

        # Broadcast the instrument vector along the time axis.
        init_inst_embed = self.init_inst_proj(init_inst_tensor)
        init_inst_embed = init_inst_embed.unsqueeze(1).expand(-1, length, -1)

        input_embed = torch.cat((chord_embed, init_inst_embed), dim=2)

        return [head(input_embed) for head in self.output_layers]


In [51]:
prefix = '/workspace/pj/out/inst_gru'
# Inference in this notebook is pinned to the CPU. The previous CUDA
# auto-detection was overwritten on the very next line, so it is dropped.
# To use a GPU instead:
#   device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device = torch.device('cpu')
print(device)

# Restore the trained pooling model and switch it to evaluation mode.
model = InstPooling()
model.load_state_dict(torch.load(prefix + '/POOL_Vanila/model_10_0.9701_0.9644_0.6331.pt', map_location=device))
model.to(device)
model.eval()

cpu


InstPooling(
  (chord_transformer): GPT2Model(
    (model): GPT2LMHeadModel(
      (transformer): GPT2Model(
        (wte): Embedding(150, 768)
        (wpe): Embedding(1024, 768)
        (drop): Dropout(p=0.1, inplace=False)
        (h): ModuleList(
          (0): GPT2Block(
            (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
            (attn): GPT2Attention(
              (c_attn): Conv1D()
              (c_proj): Conv1D()
              (attn_dropout): Dropout(p=0.1, inplace=False)
              (resid_dropout): Dropout(p=0.1, inplace=False)
            )
            (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
            (mlp): GPT2MLP(
              (c_fc): Conv1D()
              (c_proj): Conv1D()
              (act): NewGELUActivation()
              (dropout): Dropout(p=0.1, inplace=False)
            )
          )
          (1): GPT2Block(
            (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
            (attn): GPT2A

In [6]:
import torch
import json
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
  
class InstGRUDataset(Dataset):
    """Dataset turning token sequences into chord ids and instrument targets.

    Every item of ``data`` is a whitespace-separated token string (or an
    already split token sequence) read in groups of four tokens. Only the
    first and fourth token of each group are inspected.
    """

    # Number of instrument classes (length of the indicator vector).
    NUM_INSTS = 133
    # Maximum number of chords / measures kept per sample.
    MAX_CHORDS = 766
    # Tokens are consumed in fixed-size groups of this length.
    TOKENS_PER_EVENT = 4

    def __init__(self, data,
                 inst_vocab_path='/workspace/pj/data/vocabs/inst.json',
                 chord_vocab_path='/workspace/pj/data/vocabs/chord.json'):
        """Store the samples and load both vocabularies.

        Args:
            data: indexable collection of token strings or token sequences.
            inst_vocab_path: JSON file mapping instrument tokens to indices.
            chord_vocab_path: JSON file mapping chord tokens to ids.
        """
        super().__init__()
        self.data = data
        with open(inst_vocab_path, 'r', encoding='utf-8') as file:
            self.inst_vocab = json.load(file)
        with open(chord_vocab_path, 'r', encoding='utf-8') as file:
            self.chord_vocab = json.load(file)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Build the model inputs and targets for sample ``idx``.

        Returns:
            ``(target_chord_tensor, init_inst_tensor, ans_inst_container)``:
            the chord ids wrapped as ``[2] + chords[:766] + [1]``, a float
            indicator vector of length 133 marking every instrument seen in
            the sample, and a list of 133 integer tensors with one 0/1 entry
            per measure (truncated to 766).

        Raises:
            ValueError: if the token count is not a multiple of four.
            KeyError: if a chord or instrument token is missing from its vocab.
        """
        text_seq = self.data[idx]
        # Accept raw strings and pre-split token sequences alike; previously
        # a non-string item left ``toks`` unbound.
        toks = text_seq.split() if isinstance(text_seq, str) else list(text_seq)

        group = self.TOKENS_PER_EVENT
        if len(toks) % group != 0:
            raise ValueError(
                "sample %r has %d tokens, which is not a multiple of %d"
                % (idx, len(toks), group)
            )

        chord_list = []
        inst_list = []
        inst_in_measure = []
        inst_tensor = torch.zeros(self.NUM_INSTS)

        for start in range(0, len(toks), group):
            t1, _, _, t4 = toks[start:start + group]

            # Tokens starting with 'h'/'H' are collected as chords.
            if t1[0] in ('h', 'H'):
                chord_list.append(t1)

            # Instrument tokens mark the sample-level indicator and are
            # collected for the measure currently being filled.
            if t4[0] in ('x', 'X', 'y') or t4 == '<unk>':
                inst_tensor[self.inst_vocab[t4]] = 1
                inst_in_measure.append(t4)

            # A measure token closes the current measure, but only once at
            # least one chord has been seen. This check runs after the
            # instrument check, so the group's own instrument is included.
            if t1[0] in ('m', 'M') and chord_list:
                inst_list.append(inst_in_measure)
                inst_in_measure = []

        # Flush whatever was collected after the last measure token.
        inst_list.append(inst_in_measure)

        chord_ids = [self.chord_vocab[chd] for chd in chord_list]
        ans_inst_container = self.convert_inst_to_onehot(inst_list, [])

        # 2 and 1 are the BOS / EOS ids of the chord transformer config.
        target_chord_tensor = torch.tensor([2] + chord_ids[:self.MAX_CHORDS] + [1])

        return target_chord_tensor, inst_tensor, ans_inst_container

    def convert_inst_to_onehot(self, inst_list, ans_inst_container):
        """Append one 0/1 tensor per instrument to ``ans_inst_container``.

        Args:
            inst_list: list of measures, each a list of instrument tokens.
            ans_inst_container: list to extend; callers pass an empty list.

        Returns:
            ``ans_inst_container`` holding 133 integer tensors; entry ``i``
            is 1 at every measure in which instrument ``i`` occurs. Each
            tensor is truncated to 766 measures.
        """
        n_measures = len(inst_list)
        rows = [[0] * n_measures for _ in range(self.NUM_INSTS)]

        for measure_idx, insts in enumerate(inst_list):
            for inst in insts:
                rows[self.inst_vocab[inst]][measure_idx] = 1

        ans_inst_container.extend(
            torch.tensor(row[:self.MAX_CHORDS]) for row in rows
        )
        return ans_inst_container


def create_InstGRU(batch_size, raw_data_path='../../../workspace/pj/data/corpus/raw_corpus_bpe.txt'):
    """Read the corpus and build train / validation / test data loaders.

    The corpus is split 90% / 10% into train and hold-out, and the hold-out
    is split 80% / 20% into validation and test, both with ``random_state=5``.

    Args:
        batch_size: batch size used by all three loaders.
        raw_data_path: text file with one token sequence per line. The
            default is relative to the current working directory.

    Returns:
        ``(train_loader, val_loader, test_loader)``.

    NOTE(review): no loader shuffles, including the training one - confirm
    this is intended before using it for training.
    """
    with open(raw_data_path, 'r', encoding='utf-8') as f:
        raw_data = [
            line.strip()
            for line in tqdm(f, desc="reading original txt file...")
        ]

    train, val_test = train_test_split(raw_data, test_size=0.1, random_state=5)
    val, test = train_test_split(val_test, test_size=0.2, random_state=5)

    train_loader, val_loader, test_loader = (
        DataLoader(
            InstGRUDataset(split),
            batch_size=batch_size,
            collate_fn=collate_batch_InstGRU,
        )
        for split in (train, val, test)
    )

    return train_loader, val_loader, test_loader

def collate_batch_InstGRU(batch):
    """Collate dataset items into padded chord / instrument batches.

    Chord sequences and instrument vectors are right-padded with 0 and
    stacked batch-first. The per-sample target containers are returned as an
    un-padded tuple, one entry per sample.
    """
    chords, init_insts, answers = tuple(zip(*batch))

    # 0 is the padding value for both padded tensors.
    chord_padded = pad_sequence(chords, batch_first=True, padding_value=0)
    inst_padded = pad_sequence(init_insts, batch_first=True, padding_value=0)

    return chord_padded, inst_padded, answers


# train_loader, val_loader, test_loader = create_InstGRU(2)
# for chord, inst, length in train_loader:
#     print(chord)
#     print(chord.shape)
#     print(inst.shape)
#     print(inst)
#     print(length)
#     break

# Build the three loaders with one sample per batch.
train_loader, val_loader, test_loader = create_InstGRU(batch_size=1)

reading original txt file...: 0it [00:00, ?it/s]

reading original txt file...: 46188it [00:18, 2512.35it/s]


# Output Test

In [54]:
# Keep the first batch whose chord sequence is longer than 150 tokens, or the
# last batch if none qualifies. The batch is stored before the length check
# so the cell also runs on a fresh kernel, where `test_chord` does not exist.
for chord_tensor, init_inst, targets in train_loader:
    test_target = targets
    test_init = init_inst
    test_chord = chord_tensor
    if test_chord.shape[1] > 150:
        break

print(test_init)
# Targets of instrument 62 for the first (only) sample in the batch.
print(test_target[0][62])

tensor([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
         1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 1., 1., 1., 0., 0., 0., 0., 0., 0., 1., 1., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 1.]])
tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 

In [41]:
# Sequence length (dimension 1) of the selected chord batch.
print(test_chord.size(1))

158


In [53]:
# This cell only inspects predictions, so skip building an autograd graph
# through the trainable instrument heads.
with torch.no_grad():
    infer = model(test_chord, test_init)

for idx, logits in enumerate(infer):
    print(idx)
    output = torch.sigmoid(logits)  # logits -> probabilities
    predictions = (output > 0.5).float()  # threshold to binary (0 or 1)
    print(predictions.view(-1))

0
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
1
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
   