In [334]:
from torch import nn
import torch

def replicate(layer,N):
    return nn.ModuleList([layer for _ in range(N)])

class Encode(nn.Module):

    def __init__(self,layer,N):
        super(Encode, self).__init__()
        self.layer=replicate(layer,N)
        self.N=N

    def forward(self,X,mask):
        for layer in self.layer:
            X=layer(X,mask)
        return X

In [291]:
class LayerNorm(nn.Module):

    def __init__(self,features,eps=1e-16):
        super(LayerNorm, self).__init__()
        self.a_2=nn.Parameter(torch.ones(features))
        self.b_2=nn.Parameter(torch.zeros(features))
        self.eps=eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2


In [292]:
class AddNorm(nn.Module):
    def __init__(self, size, dropout_rate, eps=1e-6):
        super(AddNorm, self).__init__()
        
        # Layer normalization component
        self.norm = LayerNorm(size, eps=eps)
        
        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, sublayer_output):
        "Apply residual connection followed by layer normalization"
        # Residual connection
        added_output = x + self.dropout(sublayer_output)
        
        # Layer normalization
        return self.norm(added_output)

In [293]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        """
        d_model: the number of expected features in the input (required).
        d_ff: the number of features of the feedforward network model.
        dropout: the dropout value (default=0.1).
        """
        super(FeedForward, self).__init__()
        
        # Two linear layers with a ReLU activation in between
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.w_2(self.dropout(F.relu(self.w_1(x))))

In [294]:
def subsequent_mask(size):
    "Mask out subsequent positions."
    attn_shape = (1, size, size)
    subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(
        torch.uint8
    )
    return subsequent_mask == 0

In [295]:
import torch.nn.functional as F
def attention(query,Q,K,V):
    #K transpose might cause a trpoouble if we have multi dimntioanmatrix
    scores=torch.matmul(Q,K.T)/math.sqrt(d_k)
    p_atten=scores.softmax(scores, dim=2)
    return torch.matmul(p_atten,V),p_atten

In [296]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0

        self.d_k = d_model // num_heads
        self.num_heads = num_heads

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        
        self.fc_out = nn.Linear(d_model, d_model)

    def forward(self, query, key,value,mask=None):
        batch_size = query.size(0)

        # Linear layers
        Q = self.query(query)
        K = self.key(key)
        V = self.value(value)

        # Split into multiple heads
        Q = Q.view(batch_size, -1, self.num_heads, self.d_k).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.num_heads, self.d_k).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.num_heads, self.d_k).permute(0, 2, 1, 3)

        # Scaled dot-product attention
        attention_scores = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.d_k**0.5
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, -1e9)
        attention_probs = F.softmax(attention_scores, dim=-1)
        attention_output = torch.matmul(attention_probs, V)

        # Concatenate heads and pass through final linear layer
        attention_output = attention_output.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.d_k * self.num_heads)
        output = self.fc_out(attention_output)

        return output

In [312]:
class Encoder(nn.Module):
    def __init__(self, size, FeedForward, Multi_Head_Attention,AddNorm):
        super(Encoder, self).__init__()
        self.self_attn = Multi_Head_Attention
        self.feed_forward = FeedForward
        self.AddNorm=AddNorm
        self.size = size

    def forward(self,x,mask):
        x=self.AddNorm(x, self.self_attn(x,x,x,mask))
        x=self.AddNorm(x, self.feed_forward(x))
        return x      

In [284]:
# src=torch.LongTensor([0,1,2,3,4,5,6,7,8,9])
# tgt=torch.LongTensor([0,1,2,3,4,5,6,7,8,9])

# model=Encoder_Decoder(
#         Encode(Encoder(512,FeedForward(512, 2048, dropout=0.1), MultiHeadAttention(512, 8),AddNorm(512, 0.1, eps=1e-6)),6),
#         Decode(Decoder(512, FeedForward(512, 2048, dropout=0.1), MultiHeadAttention(512, 8),MultiHeadAttention(512,8)),6),
#         nn.Sequential(Embedding(512,10),Positional_Encoding(512)),
#         nn.Sequential(Embedding(512,10),Positional_Encoding(512)),
#         Generator(512,10)
#     )
# A=model.src_embed(src)
# B=model.Encode(A)

In [262]:
# B=MultiHeadAttention(512, 8)
# B(A,A,A).shape


torch.Size([10, 1, 512])

In [350]:
#this might be a replicate
class Decode(nn.Module):

    def __init__(self,layer,N):
        super(Decode, self).__init__()
        self.layer=replicate(layer,N)
        self.N=N

    def forward(self,X,y,src_mask,tgt_mask):
        for layer in self.layer:
            X=layer(X,y,src_mask,tgt_mask)
        return X

In [351]:
class Decoder(nn.Module):
    def __init__(self, size, FeedForward, Self_Multi_Head_Attention,Encoder_Multi_Head_Attention):
        super(Decoder, self).__init__()
        self.self_attn = Self_Multi_Head_Attention
        self.feed_forward = FeedForward
        self.encoder_attention=Encoder_Multi_Head_Attention
        self.size = size

    def forward(self,x,m,src_mask,tgt_mask):
        x=self.AddNorm(x, self.self_attn(x,x,x,tgt_mask))
        x=self.AddNorm(x, self.encoder_attention(x,m,m,src_mask))
        x=self.AddNorm(x, FeedForward(x))
        return x   

In [300]:
class Embedding(nn.Module):
    def __init__(self, d_model,vocab):
        super(Embedding, self).__init__()
        self.embedding=nn.Embedding(vocab,d_model)

    def forward(self,x):
        return self.embedding(x)   

In [301]:
class Positional_Encoding(nn.Module):
    def __init__(self, d_model):
        super(Positional_Encoding, self).__init__()
        self.dmodel=d_model
        
    def forward(self,x):
        n=x.shape[0]
        div_term = torch.exp(torch.arange(0., self.dmodel, 2) * -(math.log(10000.0) / self.dmodel))

        positions = torch.arange(n).unsqueeze(1).float()
        div_term = div_term.unsqueeze(0)
        sin_vals = torch.sin(positions * div_term)
        cos_vals = torch.cos(positions * div_term)

        ZZ = torch.empty(n, self.dmodel)
        ZZ[:, 0::2] = sin_vals
        ZZ[:, 1::2] = cos_vals
        return x+ZZ

In [174]:
# def Positional_Encoding(Embeded):
#     n=Embeded.shape[0]
#     dmodel=Embeded.shape[1]
#     div_term = torch.exp(torch.arange(0., dmodel, 2) * -(math.log(10000.0) / dmodel))

#     # Expand dimensions for broadcasting
#     positions = torch.arange(n).unsqueeze(1).float()
#     div_term = div_term.unsqueeze(0)

#     # Calculate sin and cos values
#     sin_vals = torch.sin(positions * div_term)
#     cos_vals = torch.cos(positions * div_term)

#     # Interleave sin and cos values
#     ZZ = torch.empty(n, dmodel)
#     ZZ[:, 0::2] = sin_vals
#     ZZ[:, 1::2] = cos_vals

In [341]:
class Encoder_Decoder(nn.Module):
    def __init__(self, Encode,Decode,src_embed, tgt_embed, generator):
        super(Encoder_Decoder, self).__init__()
        self.Encode=Encode
        self.Decode=Decode
        self.src_embed=src_embed
        self.tgt_embed=tgt_embed
        self.generator=generator

    def forward(self, src, tgt, src_mask, tgt_mask):
        "Take in and process masked src and target sequences."
        return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)

    def encode(self, src, src_mask):
        return self.Encode(self.src_embed(src), src_mask)

    def decode(self, memory, src_mask, tgt, tgt_mask):
        return self.Decode(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

In [303]:
class Generator(nn.Module):
    def __init__(self,d_model,tgt_vocab):
        super(Generator, self).__init__()
        self.linear=nn.Linear(d_model,tgt_vocab)

    def forward(sefl,x):
        x=self.linear(x)
        return log_softmax(self.proj(x), dim=-1)

In [352]:
def make_model(src_vocab,tgt_vocab,N=6,d_model=512,h=8,dropout=0.1,d_ff=2048):
    
    model=Encoder_Decoder(
        Encode(Encoder(d_model,FeedForward(d_model, d_ff, dropout=dropout), MultiHeadAttention(d_model, h),AddNorm(d_model,dropout, eps=1e-6)),N),
        Decode(Decoder(d_model, FeedForward(d_model, d_ff, dropout=dropout), MultiHeadAttention(d_model, h),MultiHeadAttention(d_model,h)),N),
        nn.Sequential(Embedding(d_model,src_vocab),Positional_Encoding(d_model)),
        nn.Sequential(Embedding(d_model,tgt_vocab),Positional_Encoding(d_model)),
        Generator(d_model,tgt_vocab)
    )
    for p in model.parameters():
        if p.dim()>1:
            nn.init.xavier_uniform_(p)
    return model

In [353]:
test_model = make_model(11, 11, 2)
test_model.eval()
src = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
src_mask = torch.ones(1, 1, 10)
memory = test_model.encode(src, src_mask)
ys = torch.zeros(1, 1).type_as(src)


for i in range(9):
    out = test_model.decode(
        memory, src_mask, embed_ys, subsequent_mask(ys.size(1)).type_as(src.data)
        )
    prob = test_model.generator(out[:, -1])
    _, next_word = torch.max(prob, dim=1)
    next_word = next_word.data[0]
    ys = torch.cat(
        [ys, torch.empty(1, 1).type_as(src.data).fill_(next_word)], dim=1
    )

print("Example Untrained Model Prediction:", ys)



RuntimeError: Expected tensor for argument #1 'indices' to have one of the following scalar types: Long, Int; but got torch.FloatTensor instead (while checking arguments for embedding)

In [354]:
from torch import nn
import torch
import math 
import torch.nn.functional as F

def replicate(layer,N):
    return nn.ModuleList([layer for _ in range(N)])

class Encode(nn.Module):

    def __init__(self,layer,N):
        super(Encode, self).__init__()
        self.layer=replicate(layer,N)
        self.N=N

    def forward(self,X,mask):
        for layer in self.layer:
            X=layer(X,mask)
        return X
    
class LayerNorm(nn.Module):

    def __init__(self,features,eps=1e-16):
        super(LayerNorm, self).__init__()
        self.a_2=nn.Parameter(torch.ones(features))
        self.b_2=nn.Parameter(torch.zeros(features))
        self.eps=eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
    
class AddNorm(nn.Module):
    def __init__(self, size, dropout_rate, eps=1e-6):
        super(AddNorm, self).__init__()
        
        # Layer normalization component
        self.norm = LayerNorm(size, eps=eps)
        
        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, sublayer_output):
        "Apply residual connection followed by layer normalization"
        # Residual connection
        added_output = x + self.dropout(sublayer_output)
        
        # Layer normalization
        return self.norm(added_output)
    
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        """
        d_model: the number of expected features in the input (required).
        d_ff: the number of features of the feedforward network model.
        dropout: the dropout value (default=0.1).
        """
        super(FeedForward, self).__init__()
        
        # Two linear layers with a ReLU activation in between
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.w_2(self.dropout(F.relu(self.w_1(x))))
    
def subsequent_mask(size):
    "Mask out subsequent positions."
    attn_shape = (1, size, size)
    subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(
        torch.uint8
    )
    return subsequent_mask == 0

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0

        self.d_k = d_model // num_heads
        self.num_heads = num_heads

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        
        self.fc_out = nn.Linear(d_model, d_model)

    def forward(self, query, key,value,mask=None):
        batch_size = query.size(0)

        # Linear layers
        Q = self.query(query)
        K = self.key(key)
        V = self.value(value)

        # Split into multiple heads
        Q = Q.view(batch_size, -1, self.num_heads, self.d_k).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.num_heads, self.d_k).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.num_heads, self.d_k).permute(0, 2, 1, 3)

        # Scaled dot-product attention
        attention_scores = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.d_k**0.5
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, -1e9)
        attention_probs = F.softmax(attention_scores, dim=-1)
        attention_output = torch.matmul(attention_probs, V)

        # Concatenate heads and pass through final linear layer
        attention_output = attention_output.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.d_k * self.num_heads)
        output = self.fc_out(attention_output)

        return output
    
class Encoder(nn.Module):
    def __init__(self, size, FeedForward, Multi_Head_Attention,AddNorm):
        super(Encoder, self).__init__()
        self.self_attn = Multi_Head_Attention
        self.feed_forward = FeedForward
        self.AddNorm=AddNorm
        self.size = size

    def forward(self,x,mask):
        x=self.AddNorm(x, self.self_attn(x,x,x,mask))
        x=self.AddNorm(x, self.feed_forward(x))
        return x  
    
class Embedding(nn.Module):
    def __init__(self, d_model,vocab):
        super(Embedding, self).__init__()
        self.embedding=nn.Embedding(vocab,d_model)

    def forward(self,x):
        return self.embedding(x)   
    
class Positional_Encoding(nn.Module):
    def __init__(self, d_model):
        super(Positional_Encoding, self).__init__()
        self.dmodel=d_model
        
    def forward(self,x):
        n=x.shape[0]
        div_term = torch.exp(torch.arange(0., self.dmodel, 2) * -(math.log(10000.0) / self.dmodel))

        positions = torch.arange(n).unsqueeze(1).float()
        div_term = div_term.unsqueeze(0)
        sin_vals = torch.sin(positions * div_term)
        cos_vals = torch.cos(positions * div_term)

        ZZ = torch.empty(n, self.dmodel)
        ZZ[:, 0::2] = sin_vals
        ZZ[:, 1::2] = cos_vals
        return x+ZZ
    
def make_model(src_vocab,tgt_vocab,N=6,d_model=512,h=8,dropout=0.1,d_ff=2048):
    
    model=Encoder_Decoder(
        Encode(Encoder(d_model,FeedForward(d_model, d_ff, dropout=dropout), MultiHeadAttention(d_model, h),AddNorm(d_model,dropout, eps=1e-6)),N),
        Decode(Decoder(d_model, FeedForward(d_model, d_ff, dropout=dropout), MultiHeadAttention(d_model, h),MultiHeadAttention(d_model,h)),N),
        nn.Sequential(Embedding(d_model,src_vocab),Positional_Encoding(d_model)),
        nn.Sequential(Embedding(d_model,tgt_vocab),Positional_Encoding(d_model)),
        Generator(d_model,tgt_vocab)
    )
    for p in model.parameters():
        if p.dim()>1:
            nn.init.xavier_uniform_(p)
    return model

class Generator(nn.Module):
    def __init__(self,d_model,tgt_vocab):
        super(Generator, self).__init__()
        self.linear=nn.Linear(d_model,tgt_vocab)

    def forward(self,x):
        x=self.linear(x)
        return F.log_softmax(x, dim=-1)
    

class Encoder_Decoder(nn.Module):
    def __init__(self, Encode,Decode,src_embed, tgt_embed, generator):
        super(Encoder_Decoder, self).__init__()
        self.Encode=Encode
        self.Decode=Decode
        self.src_embed=src_embed
        self.tgt_embed=tgt_embed
        self.generator=generator

    def forward(self, src, tgt, src_mask, tgt_mask):
        "Take in and process masked src and target sequences."
        return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)

    def encode(self, src, src_mask):
        return self.Encode(self.src_embed(src), src_mask)

    def decode(self, memory, src_mask, tgt, tgt_mask):
        return self.Decode(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
    
class Decode(nn.Module):

    def __init__(self,layer,N):
        super(Decode, self).__init__()
        self.layer=replicate(layer,N)
        self.N=N

    def forward(self,X,y,src_mask,tgt_mask):
        for layer in self.layer:
            X=layer(X,y,src_mask,tgt_mask)
        return X
    
class Decoder(nn.Module):
    def __init__(self, size, FeedForward, Self_Multi_Head_Attention,Encoder_Multi_Head_Attention,AddNorm):
        super(Decoder, self).__init__()
        self.self_attn = Self_Multi_Head_Attention
        self.feed_forward = FeedForward
        self.encoder_attention=Encoder_Multi_Head_Attention
        self.AddNorm=AddNorm
        self.size = size

    def forward(self,x,m,src_mask,tgt_mask):
        x=self.AddNorm(x, self.self_attn(x,x,x,tgt_mask))
        x=self.AddNorm(x, self.encoder_attention(x,m,m,src_mask))
        x=self.AddNorm(x, self.feed_forward(x))
        return x   
    
def make_model(src_vocab,tgt_vocab,N=6,d_model=512,h=8,dropout=0.1,d_ff=2048):
    
    model=Encoder_Decoder(
        Encode(Encoder(d_model,FeedForward(d_model, d_ff, dropout=dropout), MultiHeadAttention(d_model, h),AddNorm(d_model,dropout, eps=1e-6)),N),
        Decode(Decoder(d_model, FeedForward(d_model, d_ff, dropout=dropout), MultiHeadAttention(d_model, h),MultiHeadAttention(d_model,h),AddNorm(d_model,dropout, eps=1e-6)),N),
        nn.Sequential(Embedding(d_model,src_vocab),Positional_Encoding(d_model)),
        nn.Sequential(Embedding(d_model,tgt_vocab),Positional_Encoding(d_model)),
        Generator(d_model,tgt_vocab)
    )
    for p in model.parameters():
        if p.dim()>1:
            nn.init.xavier_uniform_(p)
    return model

In [364]:
for i in range(50):
    data=torch.randint(1,11,size=(50,10))
    data[:,0]=1
    src=data.requires_grad_(False).clone().detach()
    tgt=data.requires_grad_(False).clone().detach()

In [374]:
import torch
import torch.nn as nn

# Define the loss function with reduction set to 'sum'
loss_function = nn.KLDivLoss(reduction='sum')

# Define two probability distributions (as PyTorch tensors)
# Input (log probabilities)
input = torch.tensor([[0.2, 0.7, 0.1], [0.9, 0.05, 0.05]], dtype=torch.float).log()
# Target (probabilities)
target = torch.tensor([[0.1, 0.8, 0.1], [0.5, 0.2, 0.3]], dtype=torch.float)

# Compute the loss
loss = loss_function(input, target)
print(loss)

tensor(0.5584)


In [None]:
size=5
padding_idx=0
smoothing=0.4
x=torch.tensor([1,4,5,6,9,10])
predict=torch.FloatTensor([[0,0.2,0.7,0.1,0],
                           [0,0.2,0.7,0.1,0],
                           [0,0.2,0.7,0.1,0],
                           [0,0.2,0.7,0.1,0],
                           [0,0.2,0.7,0.1,0],
                           [0,0.2,0.7,0.1,0]])

true_dist = x.data.clone()
true_dist.fill_(smoothing / (size - 2))
true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
true_dist[:, self.padding_idx] = 0
mask = torch.nonzero(target.data == self.padding_idx)
if mask.dim() > 0:
    true_dist.index_fill_(0, mask.squeeze(), 0.0)
true_dist = true_dist
return loss_function(x, true_dist.clone().detach())