In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision.transforms as transforms
from torchvision.datasets import CIFAR100
from torch.utils.data import DataLoader

import numpy as np

In [2]:
# data loader
path = './datasets/'

transform = transforms.Compose([transforms.ToTensor()])

train_data = CIFAR100(root=path,train=True,transform=transform,download=True)
test_data = CIFAR100(root=path,train=False,transform=transform,download=True)

batch_size = 100

train_loader = DataLoader(dataset=train_data,batch_size=batch_size,shuffle=True,num_workers=0)
test_loader = DataLoader(dataset=test_data,batch_size=batch_size,shuffle=False,num_workers=0)

input_shape = train_data[0][0].shape
output_shape = len(train_data.classes)
print()




# Blocks

In [25]:
class PositionalEncoding(nn.Module):
    def __init__(self, device, max_len=512, d_model=16):
        super().__init__()
        # fill out here
        # how should we fill out self.pos_enc?
        self.device = device
        self.max_len = max_len
        self.d_model = d_model

        self.pos_enc = torch.zeros(max_len,d_model,requires_grad=False) # [max_len, d_model]
        pos = torch.arange(1, max_len+1, 1, device=device, requires_grad=False).reshape(-1, 1) # [max_len, 1]
        i = torch.arange(1, d_model // 2 + 1, 1, device=device, requires_grad=False) # [d_model/2]
        pos_value = 10000 ** (-2 * i / d_model) # [d_model/2]
        self.pos_enc[:, 0::2] = torch.cos(pos * pos_value)
        self.pos_enc[:, 1::2] = torch.sin(pos * pos_value)
        

    def forward(self, x):
        # fill out here
        """
        x: transformed input embedding where x.shape = [batch_size, seq_len, data_dim]
        """
        return x + self.pos_enc[:x.shape[1], :].unsqueeze(0) # 배치 차원 고려

In [26]:
class ScaledDotProductAttention(nn.Module):
# refer to Section 3.2.1 and Fig 2 (left) in the paper

    def __init__(self, d_ff):
        super().__init__()
        # there is nothing to do here
        self.d_ff = d_ff

    def forward(self, q, k, v, mask=False):
        # fill out here
        # compute attention value based on transformed query, key, value where mask is given conditionally
        """
        q, k, v = transformed query, key, value
        q.shape, k.shape, v.shpae = [batch_size, num_head, seq_len, d_ff=d_model/num_head]
        mask = masking matrix, if the index has value False, kill the value; else, leave the value
        """
        # attention 계산
        scores = torch.matmul(q, k.transpose(-2, -1))/(self.d_ff ** 0.5) # [batch_size, num_head, seq_len, seq_len]

        # 마스킹 추가
        if mask:
            mask_array = torch.tensor(np.triu(np.full(scores.shape, fill_value= -np.inf), k= 1), dtype=torch.float32)
            scores = scores + mask_array
        
        weights = F.softmax(scores) # [batch_size, num_head, seq_len, seq_len]
        attention_value = torch.matmul(weights, v) # [batch_size, num_head, seq_len, d_ff]

        return attention_value

In [27]:
class MultiHeadAttention(nn.Module):

    def __init__(self, d_model=16, num_head=4):
        super().__init__()

        assert d_model % num_head == 0, "check if d_model is divisible by num_head"

        # dimensions
        self.d_model = d_model
        self.num_head = num_head
        self.d_ff = d_model//num_head

        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        
        self.attention = ScaledDotProductAttention(self.d_ff)
        self.w_o = nn.Linear(self.d_model, self.d_model)
        

    def forward(self, dec, mask=False, cross=None):
        # fill out here
        # compute multi-head attention value
        # here, query, key, value are pre-transformed, so you need to transfrom them in this module
        """
        q, k, v = pre-transformed query, key, value
        q.shape, k.shape, v.shape = [batch_size, seq_len, d_model]
        mask = masking matrix, if the index has value False, kill the value; else, leave the value
        """
        q = self.w_q(cross) if cross!=None else self.w_q(dec)
        k = self.w_k(dec)
        v = self.w_v(dec)

        batch_size = q.shape[0]

        # 헤드 분할 (dim: [batch_size, num_head, seq_len, d_ff])
        q = q.view(batch_size, -1, self.num_head, self.d_ff).transpose(1, 2)
        k = k.view(batch_size, -1, self.num_head, self.d_ff).transpose(1, 2)
        v = v.view(batch_size, -1, self.num_head, self.d_ff).transpose(1, 2)

        # attention 계산
        attention_value = self.attention(q, k, v, mask) # [batch_size, num_head, seq_len, d_ff]
        
        result = attention_value.transpose(1, 2) # [batch_size, seq_len, num_head, d_ff]
        output = self.w_o(result.contiguous().view(batch_size, -1, self.d_model)) # [batch_size, seq_len, d_model]
        
        return output

In [28]:
class PositionwiseFeedForwardNetwork(nn.Module):
# refer to Section 3.3 in the paper
# do not use torch.nn.Conv1d

    def __init__(self, d_model=16, d_ff=32):
        super().__init__()
        # fill out here
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.linear_2 = nn.Linear(d_ff, d_model)


    def forward(self,x):
        # fill out here
        # x: [batch_size, seq_len, d_model]
        x = F.relu(self.linear_1(x)) # [batch_size, seq_len, d_ff]
        output = self.linear_2(x) # [batch_size, seq_len, d_model]

        return output

In [29]:
class LayerNormalization(nn.Module):
# do not use torch.nn.LayerNorm

    def __init__(self, d_model=16, eps=1e-5):
        super().__init__()
        # fill out here
        
        self.linear = nn.Linear(d_model, d_model)
        self.eps = eps


    def forward(self,x):
        # fill out here
        # x: [batch_size, seq_len, d_model]
        mean = torch.mean(x, dim=-1, keepdim= True)
        var = torch.var(x, dim=-1, unbiased= False, keepdim= True)
        
        normed = (x - mean)/torch.sqrt(var + self.eps)
        normed = self.linear(normed)

        return normed

# Encoder, Decoder Layer

In [30]:
class EncoderLayer(nn.Module):
# refer to Section 3.1 and Figure 1 in the paper
# this is a single encoder block consists of the following
# multi-head attention, positionwise feed forward network, residual connections, layer normalizations 

    def __init__(self, d_model=16, num_head=4, d_ff=32, drop_prob=.1):
        super().__init__()
        # fill out here
        self.attention = MultiHeadAttention(d_model, num_head)
        self.ffn = PositionwiseFeedForwardNetwork()
        
        self.layer_norm1 = LayerNormalization()
        self.layer_norm2 = LayerNormalization()

        # dropout


    def forward(self, enc):
        # fill out here
        '''
        (1 layer)
        enc -> w_q, w_k, w_v -> q, k, v
        -> multi-head attention
        -> residual learning, layer normalizing
        -> FFN -> residual learning, layer normalizing => output
        '''
        _res = enc
        
        # self attention
        attention_result = self.attention(enc) # [batch_size, seq_len, d_model]

        # residual learning + layer normalizing
        # enc: [batch_size, seq_len, d_model]
        sub_layer_1 = self.layer_norm1(_res + attention_result)

        # FFN
        _res = sub_layer_1
        ffn_result = self.ffn(sub_layer_1)

        # residual learning + layer normalizing
        sub_layer_2 = self.layer_norm2(_res + ffn_result)
        
        return sub_layer_2

In [31]:
class DecoderLayer(nn.Module):
# refer to Section 3.1 and Figure 1 in the paper
# this is a single decoder block consists of the following
# mawsked multi-head attention, multi-head attention, positionwise feed forward network, residual connections, layer normalizations

    def __init__(self,d_model=16,num_head=4,d_ff=32,drop_prob=.1):
        super().__init__()
        # fill out here
        self.self_attention = MultiHeadAttention(d_model, num_head)
        self.cross_attention = MultiHeadAttention(d_model, num_head)
        self.ffn = PositionwiseFeedForwardNetwork()

        self.layer_norm1 = LayerNormalization()
        self.layer_norm2 = LayerNormalization()
        self.layer_norm3 = LayerNormalization()


    def forward(self,enc_output,dec):
        # fill out here
        '''
        (1 layer)
        dec -> w_q, w_k, w_v -> q1, k1, v1
        -> multi-head attention (self attention)
        -> residual learning, layer normalizing => q2

        enc_output -> w_k, w_v -> k2, v2
        q2, k2, v2 -> cross attention
        -> residual learning, layer normalizing
        -> FFN -> residual learning, layer normalizing => output
        '''
        ########## self attention
        _res = dec

        # self attention
        self_result = self.self_attention(dec, mask=True) # [batch_size, dec_len, d_model]

        # residual learning + layer normalizing
        sub_layer_1 = self.layer_norm1(_res + self_result)

        ########## cross attention

        # cross attention
        cross_result = self.cross_attention(enc_output, cross=sub_layer_1) # [batch_size, dec_len, d_model]

        # residual learning + layer normalizing
        sub_layer_2 = self.layer_norm2(sub_layer_1 + cross_result)
         
        # FFN
        ffn_result = self.ffn(sub_layer_2)

        # residual learning + layer normalizing
        sub_layer_3 = self.layer_norm3(sub_layer_2 + ffn_result)

        return sub_layer_3

# Encoder, Decoder

In [32]:
class Encoder(nn.Module):
# refer to Section 3.1 and Figure 1 in the paper
# this is a whole encoder, i.e., the left side of Figure 1, consists of the following as well
# input embedding, positional encoding
    """
    in this homework, encoder inputs are not tokens, it is already embeddings in the input dimension
    hence, you don't have to set input embedding layer
    instead, you have to transform the input into the hidden dimension with single linear transformation
    """
    def __init__(self,device,input_dim=3,num_layer=3,max_len=512,d_model=16,num_head=4,d_ff=32,drop_prob=.1):
        super().__init__()
        # fill out here
        
        self.num_layer = num_layer

        self.positional_encoding = PositionalEncoding(device)

        # 다 다른 파라미터를 가지도록 해야함!!
        self.encoder_layer = nn.ModuleList()
        for _ in range(self.num_layer):
            self.encoder_layer.append(EncoderLayer())


    def forward(self, x):
        # fill out here
        '''
        positional encoding
        encoder layer * n
        '''
        # positional encoding
        x = self.positional_encoding(x)

        # encoder layer
        for i in range(self.num_layer):
            hidden = self.encoder_layer[i](x)

        return hidden

In [33]:
class Decoder(nn.Module):
# refer to Section 3.1 and Figure 1 in the paper
# this is a whole decoder, i.e., the left side of Figure 1, consists of the following as well
# input embedding, positional encoding, linear classifier
    """
    in this homework, decoder inputs are not tokens, it is already embeddings in the input dimension
    hence, you don't have to set input embedding layer
    instead, you have to transform the input into the hidden dimension with single linear transformation
    """
    def __init__(self,device,input_dim=3,num_layer=3,max_len=512,d_model=16,num_head=4,d_ff=32,drop_prob=.1):
        super().__init__()
        # fill out here

        self.num_layer = num_layer

        self.positional_encoding = PositionalEncoding(device)

        self.decoder_layer = nn.ModuleList()
        for _ in range(self.num_layer):
            self.decoder_layer.append(DecoderLayer())

        self.w_o = nn.Linear(d_model, d_model)


    def forward(self,enc_output, y):
        # fill out here
        '''
        positional encoding
        decoder layer * n
        linear -> softmax
        '''
        # positional encoding
        y = self.positional_encoding(y)

        # decoder layer
        for i in range(self.num_layer):
            output = self.decoder_layer[i](enc_output, y)

        output = F.softmax(self.w_o(output))

        return output

# Transformer

In [34]:
class Transformer(nn.Module):
# refer to Section 3.1 and Figure 1 in the paper
# sum up encoder and decoder

    def __init__(self,device,input_dim=3,num_layer=3,max_len=512,d_model=16,num_head=4,d_ff=32,drop_prob=.1):
        super().__init__()
        # fill out here

        self.encoder = Encoder(device)
        self.decoder = Decoder(device)


    def forward(self,x,y):
        # fill out here
        '''
        x -> encoder => hidden
        y, hidden -> decoder => dec_output
        '''
        hidden = self.encoder(x)
        dec_output = self.decoder(hidden, y)

        return dec_output

# 오류 테스트
gpt가 짜줌

In [35]:
def test_transformer_modules():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # 테스트용 더미 데이터 생성
    batch_size = 4
    seq_len = 512
    d_model = 16
    x = torch.randn(batch_size, seq_len, d_model).to(device)
    y = torch.randn(batch_size, seq_len, d_model).to(device)

    try:
        # Positional Encoding 테스트
        pe = PositionalEncoding(device, max_len=512, d_model=16)
        out_pe = pe(x)
        print("✅ PositionalEncoding 통과")
        
        # 인코더 레이어 테스트
        encoder_layer = EncoderLayer(d_model=16, num_head=4, d_ff=32)
        out_enc_layer = encoder_layer(x)
        print("✅ EncoderLayer 통과")
        
        # 디코더 레이어 테스트
        decoder_layer = DecoderLayer(d_model=16, num_head=4, d_ff=32)
        out_dec_layer = decoder_layer(x, y)
        print("✅ DecoderLayer 통과")
        
        # 전체 트랜스포머 테스트
        transformer = Transformer(device, input_dim=3, num_layer=3, 
                                max_len=512, d_model=16, num_head=4)
        out_transformer = transformer(x, y)
        print("✅ Transformer 통과")
        
    except Exception as e:
        print(f"❌ 오류 발생: {str(e)}")
        raise

In [36]:
test_transformer_modules()

✅ PositionalEncoding 통과
✅ EncoderLayer 통과
✅ DecoderLayer 통과


  weights = F.softmax(scores) # [batch_size, num_head, seq_len, seq_len]


✅ Transformer 통과


  output = F.softmax(self.w_o(output))


# Train

In [None]:
##########################################
#### there is nothing to do from here ####
##########################################

class ScheduledOptimizer:

    def __init__(self,optimizer,d_model=16,warmup_steps=4000):
        self.optimizer = optimizer
        self.d_model = d_model
        self.warmup_steps = warmup_steps
        self.step_num = 0

    def zero_grad(self):
        self.optimizer.zero_grad()

    def update_parameter_and_learning_rate(self):
        self.optimizer.step()
        self.step_num += 1
        self.lr = self.d_model**(-.5) * min(self.step_num**(-.5),self.step_num*self.warmup_steps**(-1.5))
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self.lr

In [None]:
# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# mps device로 변경

model = Transformer(device=device,input_dim=3,num_layer=3,max_len=512,d_model=16,num_head=4,d_ff=64,drop_prob=.1).to(device)
loss = nn.BCEWithLogitsLoss(reduction='sum')
optimizer = torch.optim.Adam(model.parameters(),betas=(.9,.98),eps=1e-9)
scheduled_optimizer = ScheduledOptimizer(optimizer,d_model=16)


num_epoch = 100
train_loss_list, test_loss_list = list(), list()

total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print("num_param:", total_params)

for i in range(num_epoch):
    
    ## train
    model.train()

    total_loss = 0
    count = 0

    for batch_idx, (image, label) in enumerate(train_loader):

        image = image.reshape(-1,3,1024).transpose(1,2)
        x, y = image[:,:512,:].to(device), image[:,512:,:].to(device)

        y_ = torch.zeros([batch_size,1,3],requires_grad=False).to(device)
        y_ = torch.cat([y_,y[:,:-1,:]],dim=1)
        
        logit = model.forward(x,y_)
        cost = loss(logit, y)/(3*512)
        
        total_loss += cost.item()

        scheduled_optimizer.zero_grad()
        cost.backward()
        scheduled_optimizer.update_parameter_and_learning_rate()
        
    ave_loss = total_loss/len(train_data)
    train_loss_list.append(ave_loss)

    if i % 1 == 0:
        print("\nEpoch %d Train: %.3f w/ Learning Rate: %.5f"%(i,ave_loss,scheduled_optimizer.lr))

    ## test
    model.eval()

    total_loss = 0
    count = 0

    with torch.no_grad():
        for batch_idx, (image, label) in enumerate(test_loader):

            image = image.reshape(-1,3,1024).transpose(1,2)
            x, y = image[:,:512,:].to(device), image[:,512:,:].to(device)

            y_ = torch.zeros([batch_size,1,3],requires_grad=False).to(device)
            y_ = torch.cat([y_,y[:,:-1,:]],dim=1)
            
            logit = model.forward(x,y_)
            cost = loss(logit, y)/(3*512)

            total_loss += cost.item()

    ave_loss = total_loss/len(test_data)
    test_loss_list.append(ave_loss)

    if i % 1 == 0:
        print("Epoch %d Test: %.3f"%(i,ave_loss))