In [92]:
import torch
import torch.nn as nn
from torch.nn.parameter import Parameter
import torch.nn.functional as F
import torch.nn.init as init
import math

In [93]:
class SelfRNN(nn.Module):
    def __init__(self,in_dim,hidden_dim,num_layers):
        super().__init__()
        self.in_dim = in_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.rnn = nn.ModuleList()
        for _ in range(self.num_layers):
            self.rnn.append(
                nn.Sequential(
                    nn.Linear(in_dim + hidden_dim,hidden_dim),
                    nn.ReLU(),
                ))
            in_dim = hidden_dim
    def forward(self,in_features,h0 = None):
        seq_len,batch_size,_ = in_features.shape
        if h0 == None:
            h0 = torch.zeros(batch_size,self.hidden_dim)
        output = []
        for time_step in range(seq_len):
            x = in_features[time_step,:,:]
            hn = []
            for i in range(self.num_layers):
                hi = h0[i]
                x = self.rnn[i](torch.cat((x,hi),dim = 1))
                hn.append(x)
            h0 = torch.stack(hn)
            output.append(x)
        out = torch.stack(output)
        return out,h0
rnn = SelfRNN(10, 20, 50)
input = torch.randn(5, 3, 10)
h0 = torch.randn(50, 3, 20)
output,hn = rnn(input, h0)
output.shape, hn.shape



(torch.Size([5, 3, 20]), torch.Size([50, 3, 20]))

In [94]:
class SelfLSTM(nn.Module):
    def __init__(self,in_dim,embedding_dim,num_layers):
        super().__init__()
        self.in_dim = in_dim 
        self.embedding_dim = embedding_dim
        self.num_layers = num_layers
        self.lstm = nn.ModuleList()
        for _ in range(self.num_layers):
            self.lstm.append(nn.Linear(in_dim + embedding_dim,embedding_dim * 4))
            in_dim = embedding_dim
    def forward(self,in_features,h0 = None,c0 = None):
        seq_len,batch_size,_ = in_features.shape
        if h0 == None:
            h0 = torch.zeros(seq_len,batch_size,self.embedding_dim)
        if c0 == None:
            c0 = torch.zeros(seq_len,batch_size,self.embedding_dim)
        output = []
        for time_step in range(seq_len):
            x = in_features[time_step].squeeze(0)
            hn = []
            cn = []
            for i in range(self.num_layers):
                hi = h0[i]
                ci = c0[i]
                x = self.lstm[i](torch.cat((x,hi),dim = 1))
                i_t,f_t,g_t,o_t, = x.split(self.embedding_dim,dim = -1)
                i_t = torch.sigmoid(i_t)
                f_t = torch.sigmoid(f_t)
                g_t = torch.tanh(g_t)
                o_t = torch.sigmoid(o_t)
                ci = g_t * i_t + f_t * ci
                x = ci * o_t
                hn.append(x)
                cn.append(ci)
            h0 = torch.stack(hn)
            output.append(x)
        out = torch.stack(output)
        return out,h0,c0

        
lstm = SelfLSTM(10, 20, 2)
input = torch.randn(5, 3, 10)
h0 = torch.randn(2, 3, 20)
output, hn,C = lstm(input, h0)
print(output.shape,hn.shape,C.shape)


torch.Size([5, 3, 20]) torch.Size([2, 3, 20]) torch.Size([5, 3, 20])


In [95]:
class SelfMultiHeadSelfAttention(nn.Module):
    def __init__(self,embedding_dim,n_head):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.n_head = n_head
        self.head_dim = embedding_dim // n_head
        assert embedding_dim % n_head == 0
        self.attenion = nn.Linear(embedding_dim,embedding_dim*3)
        self.f_out = nn.Linear(embedding_dim,embedding_dim)
    def forward(self,in_features):
        batch_size,seq_len,embedding_dim = in_features.shape
        k,q,v = self.attenion(in_features).split(self.embedding_dim,-1)
        k = k.view(batch_size,seq_len,self.n_head,self.head_dim).transpose(1,2)
        q = q.view(batch_size,seq_len,self.n_head,self.head_dim).transpose(1,2)
        v = v.view(batch_size,seq_len,self.n_head,self.head_dim).transpose(1,2)
        score =  torch.sigmoid(torch.matmul(q,k.transpose(-1,-2)) / math.sqrt(embedding_dim))
        output = score @ v
        output = output.transpose(1,2).contiguous().view(batch_size,seq_len,embedding_dim)
        output = self.f_out(output)
        return output

# Line Test

net = SelfMultiHeadSelfAttention(40,5)
# loss = nn.modules.MSELoss()
# optimizer = torch.optim.SGD(net.parameters())
in_features = torch.ones(40,4,40)
out_feature= net(in_features)
print(out_feature.shape)
# l = loss(in_features,out_feature)
# l.backward()
# optimizer.step()
# optimizer.zero_grad()






torch.Size([40, 4, 40])


In [96]:
class SelfMultiHeadAttention(nn.Module):
    def __init__(self,embedding_dim,n_head):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.n_head = n_head
        self.head_dim = embedding_dim // n_head
        assert embedding_dim % n_head == 0
        self.query_linear = nn.Linear(embedding_dim,embedding_dim)
        self.key_linear = nn.Linear(embedding_dim,embedding_dim)
        self.value_linear = nn.Linear(embedding_dim,embedding_dim)
        self.f_out = nn.Linear(embedding_dim,embedding_dim)
    def forward(self,q,k,v,mask = None):
        # in_features = torch.cat((q,k,v),dim = -1)
        batch_size,seq_len,embedding_dim = q.shape
        # print(batch_size,seq_len,embedding_dim)
        q = self.query_linear(q)
        k = self.key_linear(k)
        v = self.key_linear(v)
        k = k.view(batch_size,seq_len,self.n_head,self.head_dim).transpose(1,2)
        q = q.view(batch_size,seq_len,self.n_head,self.head_dim).transpose(1,2)
        v = v.view(batch_size,seq_len,self.n_head,self.head_dim).transpose(1,2)
        score =  torch.sigmoid(torch.matmul(q,k.transpose(-1,-2)) / math.sqrt(self.embedding_dim))
        if mask != None:
            score = score.masked_fill(mask == 1,1e-9)
        output = score @ v
        output = output.transpose(1,2).contiguous().view(batch_size,seq_len,embedding_dim)
        output = self.f_out(output)
        return output

# Line Test

net = SelfMultiHeadAttention(40,5)
# loss = nn.modules.MSELoss()
# optimizer = torch.optim.SGD(net.parameters())
in_features = torch.ones(40,4,40)
out_feature= net(in_features,in_features,in_features)
print(out_feature.shape)
# l = loss(in_features,out_feature)
# l.backward()
# optimizer.step()
# optimizer.zero_grad()






torch.Size([40, 4, 40])


In [97]:
class SelfPositionalEncoding(nn.Module):
    def __init__(self,embedding_dim,max_length = 5000):
        super().__init__()
        self.pe = torch.zeros(max_length,embedding_dim)
        pos = torch.arange(0,max_length).unsqueeze(1)
        div_item = torch.exp(torch.arange(0,embedding_dim,2).float() *(- math.log(1000)) / (embedding_dim) )

        self.pe[:,0::2] = torch.sin(pos*div_item)
        self.pe[:,1::2] = torch.cos(pos*div_item)
    def forward(self,in_features):
        batch_size,seq_len,_ = in_features.shape
        in_features = in_features + self.pe[0:seq_len]
        return in_features
    
net = SelfPositionalEncoding(500)

in_features = torch.rand(4,20,500)

out_features = net(in_features)


In [98]:
class SelfFeedForward(nn.Module):
    def __init__(self,d_model,d_ffn):
        super().__init__()
        self.ffn = nn.Sequential(
            nn.Linear(d_model,d_ffn),
            nn.ReLU(),
            nn.Linear(d_ffn,d_model)
        )
    def forward(self,x):
        return self.ffn(x)

In [99]:
class SelfEncoderLayer(nn.Module):
    def __init__(self,d_model,d_ff,n_head,dropout):
        super().__init__()
        self.attention = SelfMultiHeadAttention(d_model,n_head)
        self.ffn = SelfFeedForward(d_model,d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    def forward(self,x):
        attention_out = self.attention(x,x,x)
        x = self.norm1(x + self.dropout(attention_out))
        ffn_out = self.ffn(x)
        x = self.norm1(x + self.dropout(ffn_out))
        return x

in_features = torch.rand(30,5,80)
layer = SelfEncoderLayer(80,200,20,0.1)
out_features = layer(in_features)

In [100]:
class SelfDecoderLayer(nn.Module):
    def __init__(self,d_model,d_ff,n_head,dropout):
        super().__init__()
        self.self_attention = SelfMultiHeadAttention(d_model,n_head)
        self.cross_attention = SelfMultiHeadAttention(d_model,n_head)
        self.ff = SelfFeedForward(d_model,d_ff)
        self.dropout = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
    def forward(self,x,encode_out,src_mask = None,target_mask = None):
        attention1_out = self.self_attention(x,x,x,target_mask)
        x = self.norm1(x + self.dropout(attention1_out))

        attention2_out = self.cross_attention(x,encode_out,encode_out,src_mask)
        x = self.norm2(x + self.dropout(attention2_out))

        ff_out = self.ff(x)
        x = self.norm3(x + self.dropout(ff_out))
        return x


encoder_out = torch.rand(20,5,80)
x = torch.rand(20,5,80)

decoder = SelfDecoderLayer(80,100,8,0.1)
src_mask = torch.triu(torch.ones(5,5))

out = decoder(x,encoder_out,src_mask,src_mask)

In [104]:
class SelfTransformer(nn.Module):
    def __init__(self,src_vocab_size,tgt_vocab_size,d_model,d_ff,n_head,num_encode_layers,num_decode_layers,dropout):
        super().__init__()
        self.src_embedding = nn.Embedding(src_vocab_size,d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size,d_model)

        self.pe = SelfPositionalEncoding(d_model)

        self.encoder = nn.ModuleList()
        for _ in range(num_encode_layers):
            self.encoder.append(SelfEncoderLayer(d_model,d_ff,n_head,dropout))
        self.decoder = nn.ModuleList()
        for _ in range(num_decode_layers):
            self.decoder.append(SelfDecoderLayer(d_model,d_ff,n_head,dropout))
        
        self.linear = nn.Linear(d_model,tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
    def forward(self,src,tgt,src_mask = None,tgt_mask = None):
        src_embedding = self.src_embedding(src)
        src_embedding_pe = self.pe(src_embedding)

        tgt_embedding = self.tgt_embedding(tgt)
        tgt_embedding_pe = self.pe(tgt_embedding)

        encoder_out = src_embedding_pe
        for encode_layer in self.encoder:
            encoder_out = encode_layer(encoder_out)

        decode_out = tgt_embedding_pe
        for decoder_layer in self.decoder:
            decode_out = decoder_layer(encoder_out,decode_out,src_mask,tgt_mask)


        out = torch.softmax(self.linear(decode_out),dim = -1)
        return out
    

src_vocab_size = 500
tgt_vocab_size = 500
d_model = 200
d_ff = 250 
n_head = 4 
num_encode_layers = 6
num_decode_layers = 6
dropout = 0.1

batch_size = 50
seq_len = 10
src = torch.randint(0,src_vocab_size,(batch_size,seq_len))
tgt = torch.randint(0,src_vocab_size,(batch_size,seq_len))
net = SelfTransformer(src_vocab_size,tgt_vocab_size,d_model,d_ff,n_head,num_encode_layers,num_decode_layers,dropout)

tgt_mask = torch.triu(torch.ones(seq_len,seq_len),diagonal=1)

net(src,tgt,tgt_mask= tgt_mask)

tensor([[[0.0013, 0.0042, 0.0042,  ..., 0.0022, 0.0041, 0.0007],
         [0.0007, 0.0034, 0.0030,  ..., 0.0038, 0.0039, 0.0007],
         [0.0008, 0.0028, 0.0036,  ..., 0.0037, 0.0037, 0.0008],
         ...,
         [0.0009, 0.0031, 0.0033,  ..., 0.0031, 0.0038, 0.0008],
         [0.0006, 0.0022, 0.0032,  ..., 0.0032, 0.0034, 0.0007],
         [0.0008, 0.0024, 0.0031,  ..., 0.0035, 0.0035, 0.0006]],

        [[0.0012, 0.0013, 0.0024,  ..., 0.0013, 0.0017, 0.0010],
         [0.0009, 0.0012, 0.0032,  ..., 0.0013, 0.0017, 0.0008],
         [0.0011, 0.0019, 0.0016,  ..., 0.0014, 0.0014, 0.0010],
         ...,
         [0.0007, 0.0018, 0.0016,  ..., 0.0019, 0.0017, 0.0009],
         [0.0008, 0.0015, 0.0015,  ..., 0.0017, 0.0018, 0.0008],
         [0.0010, 0.0012, 0.0010,  ..., 0.0026, 0.0014, 0.0007]],

        [[0.0008, 0.0037, 0.0023,  ..., 0.0016, 0.0018, 0.0016],
         [0.0010, 0.0036, 0.0025,  ..., 0.0014, 0.0020, 0.0012],
         [0.0011, 0.0032, 0.0018,  ..., 0.0016, 0.0016, 0.