In [1]:
import os
import random
import pandas as pd
import numpy as np
import warnings

warnings.filterwarnings(action='ignore')
from tqdm import tqdm as tq

import torch
from torch.utils.data import Dataset, DataLoader

from transformers import AutoModel, AutoTokenizer

In [None]:
# Locate the competition data folder next to the notebook. os.path.join keeps
# the path portable (the original hard-coded a Windows '\\' separator).
folder = os.path.join(os.getcwd(), 'open')

# os.listdir returns entries in arbitrary order, so sort before indexing.
# NOTE(review): the positional mapping is kept from the original and assumes
# the sorted folder contents are [submission, test, train] -- confirm against
# the actual file names.
data_files = sorted(os.listdir(folder))
train_file = data_files[2]
test_file = data_files[1]
submit_file = data_files[0]

train = pd.read_csv(os.path.join(folder, train_file))
test = pd.read_csv(os.path.join(folder, test_file))
submit = pd.read_csv(os.path.join(folder, submit_file))

In [None]:
def truncate_middle(text, head=20, tail=20):
    """Keep only the first `head` and last `tail` space-separated tokens.

    Args:
        text: sentence to shorten; split on single spaces.
        head: number of leading tokens to keep.
        tail: number of trailing tokens to keep.

    Returns:
        `text` unchanged when it has fewer than `head + tail` tokens,
        otherwise the kept tokens re-joined with single spaces.
    """
    tokens = text.split(' ')
    if len(tokens) >= head + tail:
        # Slice from an explicit start index so tail=0 keeps no trailing tokens.
        return ' '.join(tokens[:head] + tokens[len(tokens) - tail:])
    return text


# The original cell assigned `train_texts` but then read the undefined name
# `texts`; both names are bound here so the cell runs on a fresh kernel.
train_texts = train.문장.tolist()
texts = train_texts

# Truncate long sentences (keep the first 20 and last 20 tokens).
truncated_texts = [truncate_middle(txt) for txt in texts]
truncated_texts[:3]

### Embeddings

In [None]:
# embeddings = AutoModel.from_pretrained("klue/roberta-large").embeddings.word_embeddings
# embeddings

### Modules

In [2]:
class AttentionHead(torch.nn.Module):
    """Single scaled dot-product attention head with learned Q/K/V projections.

    Args:
        dim_in: feature size of the incoming Q, K and V tensors.
        dim_Q: feature size of the projected queries.
        dim_K: feature size of the projected keys and values. Must equal
            dim_Q, otherwise the query-key product is undefined.
    """
    def __init__(self, dim_in, dim_Q, dim_K):
        super().__init__()
        self.q_linear = torch.nn.Linear(dim_in, dim_Q)
        self.k_linear = torch.nn.Linear(dim_in, dim_K)
        self.v_linear = torch.nn.Linear(dim_in, dim_K)

    def forward(self, Q, K, V):
        """Project Q, K, V and attend; inputs are 3-D (bmm is used), output has dim_K features."""
        # The original passed Q, K, V straight through, so the three linear
        # layers above were never used and never received gradients.
        return self.scaled_dotproduct_attn(
            self.q_linear(Q), self.k_linear(K), self.v_linear(V)
        )

    def scaled_dotproduct_attn(self, Q, K, V):
        """Return softmax(Q K^T / sqrt(d_k)) V for already-projected 3-D tensors."""
        numerator   = Q.bmm(K.transpose(1, 2))
        denominator = Q.size(-1)**0.5 + 1e-08  # root d_k
        softmax = torch.nn.functional.softmax(numerator/denominator, dim=-1)
        return softmax.bmm(V)


class MultiHeadAttention(torch.nn.Module):
    """Runs `num_heads` AttentionHead modules and mixes them back to dim_in features.

    Args:
        num_heads: number of parallel attention heads.
        dim_in: feature size of the inputs and of the returned tensor.
        dim_Q: per-head query size.
        dim_K: per-head key/value size (must equal dim_Q).
    """
    def __init__(self, num_heads, dim_in, dim_Q, dim_K):
        super().__init__()
        self.heads = torch.nn.ModuleList(
            [AttentionHead(dim_in, dim_Q, dim_K) for _ in range(num_heads)]
        )
        # Each head now emits dim_K features, so the concatenation is
        # num_heads * dim_K wide (it was num_heads * dim_in when the heads
        # skipped their projections).
        self.linear = torch.nn.Linear(num_heads * dim_K, dim_in)

    def forward(self, Q, K, V):
        """Concatenate every head's output on the last axis and project to dim_in."""
        multi_head_result = torch.cat([ head(Q, K, V) for head in self.heads ], dim = -1)
        return self.linear(multi_head_result)
    
def position_encoding(seq_len, dim_model, device):
    """Build sinusoidal position encodings.

    Even feature indices hold sin(pos / 10000^(2i/dim_model)) and odd indices
    hold the matching cos, where 2i is the even index of the sin/cos pair.

    Args:
        seq_len: number of positions.
        dim_model: number of features per position.
        device: device on which the tensors are created.

    Returns:
        Float tensor of shape (1, seq_len, dim_model).
    """
    pos = torch.arange(seq_len,   dtype=torch.float, device=device).reshape(1,-1,1)
    dim = torch.arange(dim_model, dtype=torch.float, device=device).reshape(1,1,-1)
    # The original used dim // dim_model, which is 0 for every index and made
    # all features share a single frequency. Use 2i / dim_model instead, with
    # both members of a sin/cos pair sharing the same 2i.
    pair_index = dim - torch.remainder(dim, 2)
    phase = pos/(1e4 ** (pair_index/dim_model))
    return torch.where(dim.long() % 2 == 0, torch.sin(phase), torch.cos(phase))

class feed_forward(torch.nn.Module):
    """Two-layer MLP on the last axis: Linear -> LeakyReLU -> Linear.

    Args:
        dim_model: input and output feature size.
        dim_feedforward: hidden feature size between the two linear layers.
    """
    def __init__(self, dim_model=512, dim_feedforward=2048):
        super().__init__()
        expand = torch.nn.Linear(dim_model, dim_feedforward)
        activation = torch.nn.LeakyReLU()
        contract = torch.nn.Linear(dim_feedforward, dim_model)
        self.layers = torch.nn.Sequential(expand, activation, contract)

    def forward(self, x):
        """Apply the MLP to `x` and return the result."""
        hidden = self.layers(x)
        return hidden
    
class Residual(torch.nn.Module):
    """Wrap a sublayer as LayerNorm(first_input + Dropout(sublayer(*inputs))).

    Args:
        sublayer: module called with every tensor passed to forward.
        dim: feature size normalised by the LayerNorm.
        dropout: dropout probability applied to the sublayer output.
    """
    def __init__(self, sublayer, dim, dropout=0.1):
        super().__init__()
        self.sublayer = sublayer
        self.norm     = torch.nn.LayerNorm(dim)
        self.dropout  = torch.nn.Dropout(dropout)

    def forward(self, *tensors):
        """The first tensor is the skip connection (pass the query first)."""
        skip = tensors[0]
        update = self.sublayer(*tensors)
        update = self.dropout(update)
        return self.norm(skip + update)
    
class TransformerEncoderBlock(torch.nn.Module):
    """One encoder layer: self-attention then a feed-forward MLP, each wrapped in Residual.

    Args:
        dim_model: feature size of the input and output.
        num_heads: number of attention heads.
        dim_feedforward: hidden size of the feed-forward MLP.
        dropout: dropout probability used by both Residual wrappers.
    """
    def __init__(self, dim_model=512, num_heads=7, dim_feedforward=2048, dropout=0.1):
        super().__init__()

        # Per-head width is floor(dim_model / num_heads), but never below 1.
        head_dim = max(dim_model // num_heads, 1)

        self_attention = MultiHeadAttention(num_heads, dim_model, head_dim, head_dim)
        self.attention = Residual(sublayer=self_attention, dim=dim_model, dropout=dropout)

        mlp = feed_forward(dim_model, dim_feedforward)
        self.feed_forward = Residual(sublayer=mlp, dim=dim_model, dropout=dropout)

    def forward(self, source):
        """Self-attend over `source` (used as Q, K and V), then apply the MLP."""
        attended = self.attention(source, source, source)
        return self.feed_forward(attended)
    
class TransformerEncoder(torch.nn.Module):
    """Stack of TransformerEncoderBlock layers with position encodings added to the input.

    Args:
        num_layers: number of encoder blocks.
        dim_model: feature size of the input and output.
        num_heads: attention heads per block.
        dim_feedforward: hidden size of each block's feed-forward MLP.
        dropout: dropout probability inside each block.
    """
    def __init__(
        self,
        num_layers = 6,
        dim_model = 512,
        num_heads = 7,
        dim_feedforward = 2048,
        dropout = 0.1):
        super().__init__()
        self.layers = torch.nn.ModuleList(
            [TransformerEncoderBlock(dim_model, num_heads, dim_feedforward, dropout) for _ in range(num_layers)]
        )
        # Kept for backward compatibility; forward() no longer relies on it.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

    def forward(self, x):
        """Add position encodings to `x` (indexed as (batch, seq, dim)) and run every block.

        The input tensor is left unmodified.
        """
        T, D = x.shape[1], x.shape[2]
        # Build the encoding on the input's own device: the guess stored in
        # self.device is 'cuda' whenever a GPU exists, even if the data is
        # still on the CPU, which would raise a device-mismatch error.
        encoding = position_encoding(T, D, x.device).to(x.dtype)
        # Out-of-place add: the original `x +=` overwrote the caller's tensor.
        x = x + encoding

        for layer in self.layers:
            x = layer(x)
        return x

In [30]:
class TempModel(torch.nn.Module):
    """Frozen klue/roberta-large word embeddings followed by a TransformerEncoder.

    Args:
        num_layers: number of encoder blocks.
        dim_model: encoder feature size.
        num_heads: attention heads per block.
        dim_feedforward: hidden size of each block's feed-forward MLP.
        dropout: dropout probability inside the encoder.
    """
    def __init__(
        self,
        num_layers = 3,
        dim_model = 1024,
        num_heads = 7,
        dim_feedforward = 2048,
        dropout = 0.1):
        super().__init__()
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # Only the word-embedding table of the pretrained model is kept,
        # and it is excluded from training.
        pretrained = AutoModel.from_pretrained("klue/roberta-large")
        self.embedding = pretrained.embeddings.word_embeddings
        self.embedding.weight.requires_grad = False

        encoder_kwargs = dict(
            num_layers=num_layers,
            dim_model=dim_model,
            num_heads=num_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.encoder = TransformerEncoder(**encoder_kwargs)

    def forward(self, x_ids):
        """Embed token ids and encode them; a 1-D id tensor is treated as a batch of one."""
        token_ids = x_ids.reshape(1, -1) if x_ids.ndim == 1 else x_ids
        # Unpacking two values fails for anything that is not rank 2.
        N, T = token_ids.shape
        embedded = self.embedding(token_ids)
        return self.encoder(embedded)

In [31]:
# Build the model with its default hyper-parameters; construction loads the
# klue/roberta-large checkpoint to obtain the word-embedding table.
model = TempModel()

Some weights of the model checkpoint at klue/roberta-large were not used when initializing RobertaModel: ['lm_head.decoder.weight', 'lm_head.decoder.bias', 'lm_head.bias', 'lm_head.dense.bias', 'lm_head.layer_norm.weight', 'lm_head.layer_norm.bias', 'lm_head.dense.weight']
- This IS expected if you are initializing RobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of RobertaModel were not initialized from the model checkpoint at klue/roberta-large and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it f

In [32]:
x_ids.shape

torch.Size([1, 5])

In [33]:
x_ids = torch.tensor([[1,2,3,4,5]])
out = model(x_ids)

In [28]:
# Shape of the model output for the sample ids.
out.shape

torch.Size([1, 5, 1024])

In [29]:
# Inspect the embedding table taken from the pretrained model.
model.embedding

Embedding(32000, 1024, padding_idx=1)

In [35]:
# Conv1d shape check: 16 input channels -> 33 output channels, kernel 3, stride 2.
# The doctest '>>>' prompts were removed; they only run because IPython strips
# them and are a syntax error in plain Python.
m = torch.nn.Conv1d(16, 33, 3, stride=2)
# NOTE: `input` shadows the builtin; the name is kept because the following
# cell prints `input.shape`.
input = torch.randn(20, 16, 50)
output = m(input)

In [38]:
# Show the Conv1d input and output shapes on one line.
print(f"{input.shape} {output.shape}")

torch.Size([20, 16, 50]) torch.Size([20, 33, 24])
