In [3]:
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F

from tokenization_kobert import KoBertTokenizer

ModuleNotFoundError: No module named 'tokenization_kobert'

In [61]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal position table.

    Entry ``(pos, j)`` of the table is ``sin(pos / 10000**(j / d_model))`` for
    even ``j`` and ``cos(pos / 10000**((j - 1) / d_model))`` for odd ``j``.
    The table is computed once at construction time and stored as a buffer,
    so it is not trained and is not rebuilt on every forward pass.

    Args:
        d_model: width of each position vector (size of the last input axis).
        d_ff: number of positions to precompute, i.e. the longest sequence
            ``forward`` accepts.
    """

    def __init__(self, d_model, d_ff):
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model
        self.d_ff = d_ff

        position = torch.arange(d_ff, dtype=torch.float32).unsqueeze(1)  # (d_ff, 1)
        column = torch.arange(d_model)                                   # (d_model,)
        # Columns 2k and 2k + 1 share the exponent 2k / d_model.
        paired = (column - column % 2).to(torch.float32)
        angle = position / (10000.0 ** (paired / d_model))               # (d_ff, d_model)

        table = torch.zeros(d_ff, d_model)
        table[:, 0::2] = torch.sin(angle[:, 0::2])
        table[:, 1::2] = torch.cos(angle[:, 1::2])
        # A buffer is saved in state_dict and follows .to(device), but is not
        # returned by parameters(), so the optimizer never touches it.
        self.register_buffer("encoding", table)

    def forward(self, x):
        """Return the encoding rows for the positions present in ``x``.

        Args:
            x: tensor whose second-to-last axis is the sequence axis,
                e.g. ``(batch, seq_len, d_model)``.

        Returns:
            Tensor of shape ``(seq_len, d_model)`` that broadcasts against ``x``.

        Raises:
            ValueError: if ``x`` is longer than the precomputed table.
        """
        seq_len = x.size(-2)
        if seq_len > self.encoding.size(0):
            raise ValueError(
                "sequence length %d exceeds the %d precomputed positions"
                % (seq_len, self.encoding.size(0))
            )
        return self.encoding[:seq_len]

In [62]:
class PreProcess(nn.Module):
    """Embed token ids and add the positional encoding to the embeddings.

    Args:
        d_model: embedding width.
        d_ff: forwarded to ``PositionalEncoding`` and used as
            ``num_embeddings`` of the ``nn.Embedding`` table.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.d_model = d_model
        # Sub-modules are registered in this order: encoding first, then embedding.
        self.encoding = PositionalEncoding(d_model, d_ff)
        self.embedding = nn.Embedding(d_ff, d_model)

    def forward(self, x):
        """Look up the embeddings of ``x`` and add the positional term to them."""
        embedded = self.embedding(x)
        embedded += self.encoding(embedded)
        return embedded

In [63]:
def init_params(*params, init_fnc=None):
    """Initialise every tensor in ``params`` in place.

    Args:
        *params: tensors / parameters to initialise.
        init_fnc: callable applied to each tensor; when ``None`` the tensors
            are filled by ``torch.nn.init.normal_``.
    """
    initializer = torch.nn.init.normal_ if init_fnc is None else init_fnc
    for tensor in params:
        initializer(tensor)


class Attention(nn.Module):
    """Single-head scaled dot-product attention with learned projections.

    Args:
        d_model: size of the last axis of ``query``, ``key`` and ``value``.
        d_k: width of the projected queries and keys (defaults to ``d_model``).
        d_v: width of the projected values (defaults to ``d_model``).
    """

    def __init__(self, d_model, d_k=None, d_v=None):
        super(Attention, self).__init__()
        if d_k is None:
            d_k = d_model
        if d_v is None:
            d_v = d_model

        self.W_k = nn.Parameter(torch.empty(d_model, d_k))
        self.W_q = nn.Parameter(torch.empty(d_model, d_k))
        self.W_v = nn.Parameter(torch.empty(d_model, d_v))
        # The scores are dot products of d_k-wide vectors, so they are scaled
        # by sqrt(d_k), not by sqrt(d_model).
        self.dot_scale = d_k ** .5
        init_params(self.W_k, self.W_q, self.W_v)

    def forward(self, *, query, key, value):
        """Attend from ``query`` positions over ``key``/``value`` positions.

        Args:
            query: ``(..., q_len, d_model)``.
            key: ``(..., k_len, d_model)``.
            value: ``(..., k_len, d_model)``.

        Returns:
            Tensor of shape ``(..., q_len, d_v)``.
        """
        Q = query.matmul(self.W_q)
        K = key.matmul(self.W_k)
        V = value.matmul(self.W_v)
        scores = Q.matmul(K.transpose(-2, -1)) / self.dot_scale
        # Normalise over the key axis so each query's weights sum to one.
        A = F.softmax(scores, dim=-1)
        return A.matmul(V)


class MHA(nn.Module):
    """Multi-head attention built from independent ``Attention`` heads.

    Each head receives the same inputs; the head outputs are concatenated on
    the last axis and multiplied by the output matrix ``W_o``.

    Args:
        n_heads: number of ``Attention`` heads.
        d_model: input and output width.
        d_k: per-head query/key width (defaults to ``d_model // n_heads``).
        d_v: per-head value width (defaults to ``d_model // n_heads``).
    """

    def __init__(self, n_heads, d_model, d_k=None, d_v=None):
        super().__init__()
        d_k = d_model // n_heads if d_k is None else d_k
        d_v = d_model // n_heads if d_v is None else d_v

        head_modules = []
        for _ in range(n_heads):
            head_modules.append(Attention(d_model=d_model, d_k=d_k, d_v=d_v))
        self.heads = nn.ModuleList(head_modules)

        self.W_o = nn.Parameter(torch.empty(d_v * n_heads, d_model))
        init_params(self.W_o)

    def forward(self, query, key, value):
        """Run every head, concatenate the results and project with ``W_o``."""
        per_head = [head(query=query, key=key, value=value) for head in self.heads]
        joined = torch.cat(per_head, dim=-1)
        return joined.matmul(self.W_o)


class MHACombined(nn.Module):
    """Multi-head attention with all heads stored in stacked weight tensors.

    Args:
        n_heads: number of attention heads.
        d_model: input and output width.
        d_k: per-head query/key width (defaults to ``d_model // n_heads``).
        d_v: per-head value width (defaults to ``d_model // n_heads``).
        masking: when truthy, position ``i`` may only attend to key
            positions ``<= i``.
    """

    def __init__(self, n_heads, d_model, d_k=None, d_v=None, masking = False):
        super(MHACombined, self).__init__()
        if d_k is None:
            d_k = d_model // n_heads
        if d_v is None:
            d_v = d_model // n_heads
        self.masking = masking
        self.W_k = nn.Parameter(torch.empty(n_heads, d_model, d_k))
        self.W_q = nn.Parameter(torch.empty(n_heads, d_model, d_k))
        self.W_v = nn.Parameter(torch.empty(n_heads, d_model, d_v))
        # Each head's scores are dot products of d_k-wide vectors, so the
        # scale is sqrt(d_k) rather than sqrt(d_model).
        self.dot_scale = d_k ** .5
        self.W_o = nn.Parameter(torch.empty(d_v * n_heads, d_model))
        init_params(self.W_k, self.W_q, self.W_v, self.W_o)

    def forward(self, query, key, value):
        """Compute multi-head attention.

        Args:
            query: ``(batch, q_len, d_model)``.
            key: ``(batch, k_len, d_model)``.
            value: ``(batch, k_len, d_model)``.

        Returns:
            Tensor of shape ``(batch, q_len, d_model)``.
        """
        # unsqueeze(1) adds a head axis that broadcasts against the stacked
        # (n_heads, d_model, d) weights -> (batch, n_heads, len, d).
        Q = query.unsqueeze(1).matmul(self.W_q)
        K = key.unsqueeze(1).matmul(self.W_k)
        V = value.unsqueeze(1).matmul(self.W_v)
        scores = Q.matmul(K.transpose(-2, -1)) / self.dot_scale
        if self.masking:
            # -inf strictly above the diagonal, 0 elsewhere.
            mask = torch.full_like(scores, -float('Inf')).triu(1)
            scores = scores + mask
        # softmax yields NaN for a row that is entirely -inf; nan_to_num
        # turns such rows into zeros.
        weights = torch.nan_to_num(F.softmax(scores, dim=-1))
        heads = weights.matmul(V)
        # (batch, n_heads, q_len, d_v) -> (batch, q_len, n_heads * d_v)
        merged = heads.permute(0, 2, 1, 3).reshape(heads.shape[0], heads.shape[2], -1)

        return merged.matmul(self.W_o)


class FullyConnected(nn.Module):
    """Two linear layers with an activation in between.

    Args:
        d_model: input and output width.
        d_ff: width of the hidden layer.
        activation: module placed between the two linear layers; ``nn.ReLU()``
            is used when this is ``None``.
    """

    def __init__(self, d_model, d_ff, activation=None):
        super().__init__()
        expand = nn.Linear(d_model, d_ff)
        if activation is None:
            activation = nn.ReLU()
        contract = nn.Linear(d_ff, d_model)
        self.net = nn.Sequential(expand, activation, contract)

    def forward(self, x):
        """Apply expand -> activation -> contract to ``x``."""
        hidden = self.net(x)
        return hidden


class EncoderLayer(nn.Module):
    """Self-attention block followed by a feed-forward block.

    Each block is wrapped as ``LayerNorm(x + block(x))``.

    Args:
        d_model: width of the layer input and output.
        d_ff: hidden width of the feed-forward block.
        d_k: per-head query/key width passed to ``MHACombined``.
        d_v: per-head value width passed to ``MHACombined``.
        n_heads: number of attention heads.
    """

    def __init__(self, d_model, d_ff, d_k=None, d_v=None, n_heads=8):
        super(EncoderLayer, self).__init__()
        self.attention = MHACombined(n_heads=n_heads, d_model=d_model, d_k=d_k, d_v=d_v)
        self.fc = FullyConnected(d_model, d_ff)
        self.norm_att = nn.LayerNorm(d_model)
        self.norm_fc = nn.LayerNorm(d_model)

    def forward(self, x):
        """Return the transformed copy of ``x``; the argument is left untouched.

        The residual sums are out-of-place: an in-place ``x += ...`` would
        overwrite both the caller's tensor and values autograd has saved for
        the backward pass.
        """
        x = self.norm_att(x + self.attention(x, x, x))
        x = self.norm_fc(x + self.fc(x))
        return x


class Encoder(nn.Module):
    """Stack of ``n_layers`` ``EncoderLayer`` modules applied one after another.

    Args:
        d_model: width of the input and output.
        d_ff: hidden width of each layer's feed-forward block.
        n_heads: attention heads per layer.
        n_layers: number of stacked layers.
        d_v: per-head value width forwarded to every layer.
        d_k: per-head query/key width forwarded to every layer.
    """

    def __init__(self, d_model, d_ff, n_heads=8, n_layers=6, d_v=None, d_k=None):
        super(Encoder, self).__init__()
        self.layers = nn.ModuleList([EncoderLayer(
            d_model=d_model,
            d_ff=d_ff,
            n_heads=n_heads,
            d_v=d_v,
            d_k=d_k,
        ) for _ in range(n_layers)])

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the last output."""
        for layer in self.layers:
            # EncoderLayer already adds its own residuals and normalises its
            # output, so the stack chains the layers instead of summing their
            # outputs onto the input again.
            x = layer(x)
        return x

In [64]:
class DecoderLayer(nn.Module):
    """Masked self-attention, encoder-decoder attention, then feed-forward.

    Each of the three blocks is wrapped as ``LayerNorm(x + block(...))``.

    Args:
        d_model: width of the layer input and output.
        d_ff: hidden width of the feed-forward block.
        d_k: per-head query/key width passed to both attention modules.
        d_v: per-head value width passed to both attention modules.
        n_heads: number of attention heads.
    """

    def __init__(self, d_model, d_ff, d_k = None, d_v = None, n_heads = 8):
        super(DecoderLayer, self).__init__()
        self.masked_multi_head = MHACombined(n_heads = n_heads, d_model = d_model, d_k = d_k, d_v = d_v, masking = True)
        self.multi_head = MHACombined(n_heads = n_heads, d_model = d_model, d_k = d_k, d_v = d_v, masking = False)
        self.fc = FullyConnected(d_model, d_ff)
        self.norm_masked_att = nn.LayerNorm(d_model)
        self.norm_att = nn.LayerNorm(d_model)
        self.norm_fc = nn.LayerNorm(d_model)

    def forward(self, x, encoder_out):
        """Transform decoder state ``x`` using ``encoder_out`` as memory.

        Args:
            x: decoder-side input, ``(batch, tgt_len, d_model)``.
            encoder_out: encoder output, ``(batch, src_len, d_model)``.

        Returns:
            Tensor with the shape of ``x``. Neither argument is modified.
        """
        x = self.norm_masked_att(x + self.masked_multi_head(x, x, x))

        # MHACombined.forward takes (query, key, value): the decoder state
        # asks the questions, the encoder output supplies keys and values.
        x = self.norm_att(x + self.multi_head(x, encoder_out, encoder_out))

        x = self.norm_fc(x + self.fc(x))

        return x
    
    
class Decoder(nn.Module):
    """Stack of ``n_layers`` ``DecoderLayer`` modules sharing one encoder memory.

    Args:
        d_model: width of the input and output.
        d_ff: hidden width of each layer's feed-forward block.
        n_heads: attention heads per layer.
        n_layers: number of stacked layers.
        d_v: per-head value width forwarded to every layer.
        d_k: per-head query/key width forwarded to every layer.
    """

    def __init__(self, d_model, d_ff, n_heads = 8, n_layers = 6, d_v = None, d_k = None):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList([DecoderLayer(
            d_model = d_model,
            d_ff = d_ff,
            n_heads = n_heads,
            d_v = d_v,
            d_k = d_k
        ) for _ in range(n_layers)])

    def forward(self, x, encoder_out):
        """Feed ``x`` through every layer; each layer receives ``encoder_out``."""
        for layer in self.layers:
            # DecoderLayer already adds its own residuals and normalises its
            # output, so the stack chains the layers instead of summing their
            # outputs onto the input again.
            x = layer(x, encoder_out)
        return x

In [65]:
class Output(nn.Module):
    """Linear projection followed by a softmax over the projected axis.

    Args:
        d_model: width of the incoming features.
        d_ff: number of output classes (``out_features`` of the projection).
    """

    def __init__(self, d_model, d_ff):
        super(Output, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, d_ff),
            # Normalise over the class axis (the last one). dim=1 would be
            # the sequence axis for (batch, seq_len, d_model) input.
            nn.Softmax(dim = -1)
        )

    def forward(self, x):
        """Return class probabilities; the last axis of the result sums to 1."""
        x = self.net(x)
        return x

In [70]:
# Hyperparameters shared by every component below.
# NOTE: d_ff is reused for three roles - the embedding table size in
# PreProcess, the feed-forward width in Encoder/Decoder, and the number of
# output classes in Output.
d_model, d_ff = 512, 10000
n_heads, n_layers = 8, 6

preprocess = PreProcess(d_model=d_model, d_ff=d_ff)
encoder = Encoder(d_model=d_model, d_ff=d_ff, n_heads=n_heads, n_layers=n_layers)
decoder = Decoder(d_model=d_model, d_ff=d_ff, n_heads=n_heads, n_layers=n_layers)
output = Output(d_model=d_model, d_ff=d_ff)

In [None]:
# Read the tab-separated ratings file and keep the first 100 documents.
df_train = pd.read_csv('./ratings_train.txt', delimiter='\t', index_col='id')
x_train = df_train['document'].values[:100]

# The tokenizer was never instantiated in this notebook.
# NOTE(review): 'monologg/kobert' is the checkpoint name used in the
# KoBERT-Transformers README - confirm it matches the intended vocabulary.
tokenizer = KoBertTokenizer.from_pretrained('monologg/kobert')

# Pass a plain list of Python strings (not a NumPy array) to the tokenizer.
inp = tokenizer.batch_encode_plus(
    x_train.astype('U').tolist(),
    padding=True,
    return_tensors='pt'
)['input_ids']


In [76]:
# Forward-pass smoke test: print shapes rather than full tensors.
src_embedded = preprocess(inp)
print(src_embedded.shape)

encoder_out = encoder(src_embedded)
print(encoder_out.shape)

# The decoder needs its own input plus the encoder output.
# NOTE(review): the notebook has no target sequence, so the source tokens are
# embedded a second time as a stand-in decoder input - replace with real
# target ids when they exist.
tgt_embedded = preprocess(inp)
decoder_out = decoder(tgt_embedded, encoder_out)
print(decoder_out.shape)

# Call the `output` instance; `Output(...)` would construct a new module.
out = output(decoder_out)
print(out.shape)

ValueError: too many dimensions 'str'