In [1]:
!pip install torchtext



In [15]:
import torch
import torchtext
from collections import defaultdict

# tokenize
haiku1 = "Tranquil waters flow, Whispering secrets of time, Embraced by the night."
haiku2 = "Moonlight dances soft, Through branches of ancient oak, Embraced by the night."
haiku3 = "Serene silence reigns, Stars shimmer in the night sky, Embraced by the night."
haiku4 = "Shadows dance gently, Across fields of golden wheat, Embraced by the night."
haiku5 = "Fireflies flicker bright, Illuminating the dark, Embraced by the night."
haikus = [haiku1, haiku2, haiku3, haiku4, haiku5]

tokenizer = torchtext.data.utils.get_tokenizer("basic_english")
tokenized_haikus = [tokenizer(haiku) for haiku in haikus]

adjusted_tokenized_haikus = []
for haiku_tokens in tokenized_haikus:
    adjusted_tokens = []
    for token in haiku_tokens:
        if ',' in token:
            parts = token.split(',')
            adjusted_tokens.extend([parts[0], ','])
        elif '.' in token:
            parts = token.split('.')
            adjusted_tokens.extend([parts[0], '.'])
        else:
            adjusted_tokens.append(token)
    adjusted_tokenized_haikus.append(adjusted_tokens)

# vocab
vocab = defaultdict(lambda: len(vocab))
vocab['<unk>'] = 0  
vocab['<pad>'] = 1  


for haiku_tokens in adjusted_tokenized_haikus:
    for token in haiku_tokens:
        vocab[token]  

# Ctokens 
token_ids_haikus = []
for haiku_tokens in adjusted_tokenized_haikus:
    token_ids = [vocab[token] for token in haiku_tokens]
    token_ids_haikus.append(token_ids)

data_x = []
data_y = []
for token_ids in token_ids_haikus:
    for i in range(len(token_ids) - 3):
        data_x.append(token_ids[i:i+3])
        data_y.append(token_ids[i+3])

data_x = torch.tensor(data_x, dtype=torch.long)
data_y = torch.tensor(data_y, dtype=torch.long)


dataset = []
for haiku_tokens in adjusted_tokenized_haikus:
    for i in range(len(haiku_tokens) - 3):
        x = haiku_tokens[i:i+3]
        y = haiku_tokens[i+3]
        dataset.append({'x': x, 'y': y})

# change number 12 
for data in dataset[:12]:
    print(f"x: {data['x']}, y: [{data['y']}]")


x: ['tranquil', 'waters', 'flow'], y: []
x: ['waters', 'flow', ''], y: [,]
x: ['flow', '', ','], y: [whispering]
x: ['', ',', 'whispering'], y: [secrets]
x: [',', 'whispering', 'secrets'], y: [of]
x: ['whispering', 'secrets', 'of'], y: [time]
x: ['secrets', 'of', 'time'], y: []
x: ['of', 'time', ''], y: [,]
x: ['time', '', ','], y: [embraced]
x: ['', ',', 'embraced'], y: [by]
x: [',', 'embraced', 'by'], y: [the]
x: ['embraced', 'by', 'the'], y: [night]


In [None]:
import torch
from torch import nn
import torch.optim as optim
import math

class TransformerModule(nn.Module):
    def __init__(self, d_model=768, n_head=12, d_ffn=2048, dropout=0.1, device='cpu'):
        super(TransformerModule, self).__init__()
        self.d_model = d_model
        self.n_head = n_head
        self.d_ffn = d_ffn
        self.dropout = nn.Dropout(dropout)
        self.device = device

        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)

        self.ffn_1 = nn.Linear(d_model, d_ffn)
        self.ffn_2 = nn.Linear(d_ffn, d_model)

        self.gelu = nn.GELU()

        self.attention = nn.MultiheadAttention(d_model, n_head, dropout, batch_first=True, device=device)

    def forward(self, x):
        x_1 = self.norm_1(x)  

        
        mask = torch.triu(torch.ones(x.shape[1], x.shape[1]), diagonal=1).bool().to(self.device)
        x_1, _ = self.attention(x_1, x_1, x_1, attn_mask=mask, need_weights=False)

        x_1 = self.dropout(x_1)
        x_1 = x_1 + x
        x_1 = self.norm_2(x_1)

        x_2 = self.ffn_1(x_1)
        x_2 = self.gelu(x_2)
        x_2 = self.ffn_2(x_2)
        x_2 = self.dropout(x_2)

        return x_2 + x_1

class Transformer(nn.Module):
    def __init__(self, context_size, vocab_size, d_model=768, dropout=0.1, n_block=12, device='cpu'):
        super(Transformer, self).__init__()
        self.context_size = context_size
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.d_ffn = 2048
        self.dropout = nn.Dropout(dropout)
        self.n_block = n_block
        self.device = device

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.dec_blocks = nn.ModuleList([
            TransformerModule(device=device) for _ in range(n_block)
        ])

        self.pe = self.gen_pe(context_size, d_model)
        self.norm = nn.LayerNorm(d_model)
        self.ffn = nn.Linear(d_model, vocab_size)

    def gen_pe(self, r, c):
        pe = torch.zeros(r, c).to(self.device)
        for k in range(r):
            for i in range(c):
                if i % 2 == 0:
                    theta = math.exp((-i/c) * math.log(10_000))
                    pe[k, i] = math.sin(k * theta)
                else:
                    theta = math.exp(((1-i)/c) * math.log(10_000))
                    pe[k, i] = math.cos(k * theta)
        return pe

    def forward(self, x):
        x = self.embedding(x)
        length = x.shape[1]
        x = x + self.pe[:length]
        x = self.dropout(x)

        for dec in self.dec_blocks:
            x = dec(x)

        x = self.ffn(x[:, -1])
        return x


# Model setup
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = Transformer(context_size=3, vocab_size=len(vocab), device=device)
model.to(device)

# loss
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


epochs = 10
batch_size = 1  #adjust

for epoch in range(epochs):
    model.train()
    total_loss = 0

    for i in range(0, len(data_x), batch_size):
        batch_x = data_x[i:i + batch_size].to(device)
        batch_y = data_y[i:i + batch_size].to(device)

        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f'Epoch {epoch+1}, Loss: {total_loss / len(data_x)}')


Epoch 1, Loss: 4.987115935103534
Epoch 2, Loss: 3.526947406873311
Epoch 3, Loss: 3.328416060094964
Epoch 4, Loss: 3.2907073530432296
Epoch 5, Loss: 3.199951225764131
Epoch 6, Loss: 3.1080526472771006


In [None]:
# import torch

# def generate_line(model, vocab, inv_vocab, device, seed_text, length):
#     model.eval()
#     with torch.no_grad():

#         input_ids = [vocab.get(word, vocab['<unk>']) for word in seed_text]
#         input_tensor = torch.tensor([input_ids], dtype=torch.long).to(device)


#         for _ in range(length):
#             output = model(input_tensor)
#             next_token_id = output[0, -1].argmax().item()  
#             input_ids.append(next_token_id)  
#             input_tensor = torch.tensor([input_ids], dtype=torch.long).to(device)


#         generated_words = [inv_vocab.get(id, '<unk>') for id in input_ids]
#         return generated_words

# def generate_haiku(model, vocab, inv_vocab, device, seed_text=None):
#     if seed_text is None:
#         seed_text = []  
    

#     line1 = generate_line(model, vocab, inv_vocab, device, seed_text, 5)
#     line2 = generate_line(model, vocab, inv_vocab, device, [], 7)  
#     line3 = generate_line(model, vocab, inv_vocab, device, [], 5)  


#     haiku = ' '.join(line1) + '\n' + ' '.join(line2) + '\n' + ' '.join(line3)
#     return haiku


# vocab = {word: idx for idx, word in enumerate(vocab)}  
# inv_vocab = {idx: word for word, idx in vocab.items()}  
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
# model = model.to(device)  

# generated_haiku = generate_haiku(model, vocab, inv_vocab, device)
# print(generated_haiku)
