In [1]:
import torch
from torch import nn
from tqdm import tqdm

In [2]:
import collections

class Vocabulary:

  def __init__(self, tokens, unk_token="<unk>", min_count=None):
    if min_count is None:
      min_count = 0
    count = collections.Counter(tokens)
    tokens = [w for w, c in count.most_common() if c >= min_count] # added


    self.unk_token = unk_token
    self.unk_index = 0
    self._itos = [unk_token] + tokens
    self._stoi = {token: index for index, token in enumerate(self._itos)}

  def stoi(self, token: str) -> int:
    return self._stoi.get(token, self.unk_index)


  def itos(self, index: int) -> str:
    if index < 0 or index >= len(self._itos):
      raise LookupError(f"Index {index} out of range for vocabulary size {len(self._itos)}")
    return self._itos[index]

  @property
  def tokens(self):
    return self._itos

  def __len__(self) -> int:
    return len(self._itos)

  def __getitem__(self, key):
    if isinstance(key, str):
      return self.stoi(key)
    elif isinstance(key, int):
      return self.itos(key)
    else:
      TypeError("Unsupported ley type")

def vectorize(tokens, vocab):
  return torch.tensor([vocab.stoi(t) for t in tokens])

In [3]:
def build_context_target(tokens, seq_length):
    contexts = []
    targets = []
    for i in range(0, len(tokens) - seq_length):
        context = tokens[i: i + seq_length]              
        target = tokens[i + 1: i + seq_length + 1]        
        contexts.append(context)
        targets.append(target)
    return contexts, targets

In [4]:
build_context_target(list("Hello".lower()), 4)

([['h', 'e', 'l', 'l']], [['e', 'l', 'l', 'o']])

In [5]:
def tokenize(text):
    return list(text)

In [6]:
def batchify(data, batch_size):
    for i in range(0, len(data), batch_size):
        yield data[i:i + batch_size]

In [7]:
text = "[Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX]" * 20
text

'[Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX][Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZ

In [8]:
tokens = tokenize(text)
vocab = Vocabulary(tokens)
len(vocab)

44

In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [10]:
class ElmanRnn(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim

        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.W_h = nn.Linear(embedding_dim, hidden_dim)
        self.U_h = nn.Linear(hidden_dim, hidden_dim)
        self.tanh = nn.Tanh()
        self.W_y = nn.Linear(hidden_dim, vocab_size)

    def forward(self, inputs, hidden_state=None):
        batch_size, seq_len = inputs.size()
        if hidden_state is None:
            hidden_state = torch.zeros(batch_size, self.hidden_dim, device=inputs.device)

        log_probs_seq = []

        for t in range(seq_len):
            x_t = self.embeddings(inputs[:, t])  
            hidden_state = self.tanh(self.W_h(x_t) + self.U_h(hidden_state))  
            logits = self.W_y(hidden_state)  
            log_probs = torch.log_softmax(logits, dim=-1)
            log_probs_seq.append(log_probs)

        log_probs_seq = torch.stack(log_probs_seq, dim=1)
        return log_probs_seq, hidden_state
        
    def init_hidden(self, batch_size):
        return torch.zeros(batch_size, self.hidden_dim)


In [11]:
def train_loop(model, tokens, vocab, context_size, loss_fn, optimizer):
    contexts, targets = build_context_target(tokens, context_size)  
    total_loss = 0.0
    total_sequences = len(targets)
    total_batch_num = (total_sequences // params["batch_size"]) + (1 if total_sequences % params["batch_size"] != 0 else 0)

    for context_batch, target_batch in tqdm(zip(batchify(contexts, params["batch_size"]), batchify(targets, params["batch_size"])),
                                            total=total_batch_num):
        X = torch.stack([vectorize(context, vocab) for context in context_batch]).to(device)  
        Y = torch.stack([vectorize(target, vocab) for target in target_batch]).to(device)  
        
        hidden_state = model.init_hidden(batch_size=X.size(0)).to(X.device)
        
        optimizer.zero_grad()
        log_probs, hidden_state = model(X, hidden_state)

        loss = loss_fn(log_probs.view(-1, model.vocab_size), Y.view(-1))
        total_loss += loss.item()
        
        loss.backward()
        optimizer.step()

    avg_loss = total_loss / total_sequences
    return avg_loss


In [12]:
def train(model, tokens, vocab, context_size, loss_fn, optimizer):
  for e in range(params["num_epochs"]):
    print("EPOCH :", e, "/",params["num_epochs"])
    avg_loss = train_loop(model, tokens, vocab, context_size, loss_fn, optimizer)
    print(f"Average Loss: {avg_loss:.4f}")

In [13]:
@torch.no_grad()
def greedy_decode(model, vocab, start_token, end_token):
    start_index = vocab.stoi(start_token)
    result = [start_index]
    hidden_state = model.init_hidden(batch_size=1)  
    while len(result) < 100:  
        input_ = torch.tensor([[result[-1]]]).to(device)  
        log_probs, hidden_state = model(input_, hidden_state)

        token_index = log_probs.argmax().item()
        if token_index == vocab.stoi(end_token):
            break
        result.append(token_index)
    return result[1:]

In [14]:
params = {
    "vocab_size": len(vocab),
    "embedding_dim": 48,
    "context_size": 5,
    "hidden_dim": 256,
    "num_epochs": 20,
    "lr": 0.001,
    "batch_size": 16
}

model = ElmanRnn(params["vocab_size"], params["embedding_dim"], params["hidden_dim"])
loss_fn = nn.NLLLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=params["lr"])

In [15]:
train(model, tokens, vocab, params["context_size"], loss_fn, optimizer)

EPOCH : 0 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 429.53it/s]


Average Loss: 0.0833
EPOCH : 1 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 629.51it/s]


Average Loss: 0.0168
EPOCH : 2 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 573.14it/s]


Average Loss: 0.0126
EPOCH : 3 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 580.31it/s]


Average Loss: 0.0114
EPOCH : 4 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 589.50it/s]


Average Loss: 0.0110
EPOCH : 5 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 622.37it/s]


Average Loss: 0.0107
EPOCH : 6 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 600.69it/s]


Average Loss: 0.0106
EPOCH : 7 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 561.32it/s]


Average Loss: 0.0105
EPOCH : 8 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 550.80it/s]


Average Loss: 0.0104
EPOCH : 9 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 521.88it/s]


Average Loss: 0.0104
EPOCH : 10 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 491.14it/s]


Average Loss: 0.0103
EPOCH : 11 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 555.04it/s]


Average Loss: 0.0103
EPOCH : 12 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 535.59it/s]


Average Loss: 0.0103
EPOCH : 13 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 585.25it/s]


Average Loss: 0.0103
EPOCH : 14 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 599.70it/s]


Average Loss: 0.0103
EPOCH : 15 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 602.50it/s]


Average Loss: 0.0103
EPOCH : 16 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 568.93it/s]


Average Loss: 0.0102
EPOCH : 17 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 560.50it/s]


Average Loss: 0.0102
EPOCH : 18 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 464.79it/s]


Average Loss: 0.0102
EPOCH : 19 / 20


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 605.41it/s]

Average Loss: 0.0102





In [16]:
start_token = '['
end_token = ']'

In [17]:
generated_token_ids = greedy_decode(model, vocab, start_token, end_token)

In [18]:
tokens = [vocab[i] for i in generated_token_ids]
"".join(tokens)

'Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX'

In [19]:
model.parameters

<bound method Module.parameters of ElmanRnn(
  (embeddings): Embedding(44, 48)
  (W_h): Linear(in_features=48, out_features=256, bias=True)
  (U_h): Linear(in_features=256, out_features=256, bias=True)
  (tanh): Tanh()
  (W_y): Linear(in_features=256, out_features=44, bias=True)
)>

In [20]:
import json
import torch

def save_model_parameters(model, file_prefix="own"):
    for name, param in model.named_parameters():
        if param.requires_grad:  
            param_data = param.data.tolist()  
            filename = f"{file_prefix}_{name}.json"
            with open(filename, 'w') as f:
                json.dump(param_data, f)
            print(f"Saved {name} parameters to {filename}")

def save_vocab(vocab, filename="own_vocab.json"):
    vocab_data = vocab.tokens  
    with open(filename, 'w') as f:
        json.dump(vocab_data, f)
    print(f"Saved vocabulary to {filename}")

def save_embeddings(embedding_layer, filename="own_embeddings.json"):
    embeddings = embedding_layer.weight.data.tolist() 
    with open(filename, 'w') as f:
        json.dump(embeddings, f)
    print(f"Saved embeddings to {filename}")

save_model_parameters(model)
save_vocab(vocab)
save_embeddings(model.embeddings)

Saved embeddings.weight parameters to own_embeddings.weight.json
Saved W_h.weight parameters to own_W_h.weight.json
Saved W_h.bias parameters to own_W_h.bias.json
Saved U_h.weight parameters to own_U_h.weight.json
Saved U_h.bias parameters to own_U_h.bias.json
Saved W_y.weight parameters to own_W_y.weight.json
Saved W_y.bias parameters to own_W_y.bias.json
Saved vocabulary to own_vocab.json
Saved embeddings to own_embeddings.json


In [21]:
# solving own quest below

In [22]:
embedding_dim = 48
hidden_dim = 256
vocab_size = 44

In [23]:
def json_to_tensor(json_name):
    with open(json_name, 'r') as file:
        tensor = json.load(file)
    return torch.tensor(tensor)

In [24]:
W_h_weight = json_to_tensor('own_W_h.weight.json')
W_h_weight.shape

torch.Size([256, 48])

In [25]:
W_y_weight = json_to_tensor('own_W_y.weight.json')
W_y_weight.shape

torch.Size([44, 256])

In [26]:
U_h_weight = json_to_tensor('own_U_h.weight.json')
U_h_weight.shape

torch.Size([256, 256])

In [27]:
Embedding_weight = json_to_tensor('own_embeddings.weight.json')
Embedding_weight.shape

torch.Size([44, 48])

In [28]:
with open('own_vocab.json', 'r') as file:
    vocab = json.load(file)
len(vocab)

44

In [29]:
class GeneratorElmanRnn(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim

        self.embeddings = Embedding_weight
        self.W_h = W_h_weight
        self.U_h = U_h_weight
        self.tanh = nn.Tanh()
        self.h = torch.zeros(self.hidden_dim)
        self.W_y = W_y_weight

        
    def forward(self, token_idx):
        x = self.embeddings[token_idx]
        h_t = self.tanh(self.W_h @ x + self.U_h @ self.h)
        self.h = h_t
        logits = self.W_y @ h_t
        log_probs = torch.log_softmax(logits, dim=-1)
        return log_probs

In [30]:
gen_model = GeneratorElmanRnn(vocab_size, embedding_dim, hidden_dim)

In [31]:
start_token = "["
end_token = "]"
@torch.no_grad()
def greedy_decode_generative(model, vocab):
    start_index = vocab.index(start_token)
    result = [start_index]
    while len(result) < 100:

        input_ = result[-1]
        log_probs = model(input_)

        token_index = log_probs.argmax()

        if token_index == vocab.index(end_token):
            break

        result.append(token_index.item())

    return result[1:]

In [32]:
generated_token_ids = greedy_decode_generative(gen_model, vocab)

In [33]:
tokens = [vocab[i] for i in generated_token_ids]
"".join(tokens)

'Somewhere over the rainbow https://youtu.be/w_DKWlrA24k?si=xDJWHeC37RutylZX'