# PyTorch RNN

[RNN Doc PyTorch](https://docs.pytorch.org/docs/stable/generated/torch.nn.RNN.html#rnn)

**Parameters:**

- input_size – The number of expected features in the input x
- hidden_size – The number of features in the hidden state h

- batch_first – If True, then the input and output tensors are provided as (batch, seq, feature)
  instead of (seq, batch, feature).

Note that this does not apply to hidden or cell states. 

# Generating a Synthetic Weather Time-Series

To train an RNN, we need a sequence with memory. Weather is a natural example: 
today’s temperature depends on previous days plus some randomness.

We generate a simple autoregressive process:

`T_t = 0.7 * T_(t-1) + 0.2 * T_(t-2) + Normal(0, 0.5)`

This produces a smooth temperature sequence with short-term memory and noise.
We will use the first 5 days to predict day 6.


In [None]:
import numpy as np

# Two fixed starting temperatures (day 0 and day 1): the recurrence below
# needs two previous values before it can produce a third.
temps = [20, 21]

# Days 2..99: weighted blend of the two most recent days plus Gaussian noise
# with mean 0 and standard deviation 0.5.
for _ in range(98):
    yesterday, two_days_ago = temps[-1], temps[-2]
    noise = np.random.normal(0, 0.5)
    temps.append(0.7 * yesterday + 0.2 * two_days_ago + noise)

# First example: days 0-4 are the input window, day 5 is the value to predict.
input_sq, target = temps[:5], temps[5]
print(input_sq, target)

In [None]:
import torch.nn as nn
import torch
import torch.optim as optim


class RNNPredictor(nn.Module):
    """Many-to-one RNN regressor: reads a sequence, emits one value per sample.

    A single ``nn.RNN`` layer (built with ``batch_first=True``) processes the
    sequence and a linear head maps its output at the last time step to a
    scalar.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the RNN hidden state.
    """

    def __init__(self, input_size=1, hidden_size=20):
        super().__init__()
        rnn_config = {
            "input_size": input_size,
            "hidden_size": hidden_size,
            "batch_first": True,
        }
        self.rnn = nn.RNN(**rnn_config)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Predict one value per sequence.

        Args:
            x: tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, 1).
        """
        step_outputs, _ = self.rnn(x)
        # Only the output at the final time step feeds the linear head.
        return self.fc(step_outputs[:, -1, :])

## Build the dataset as tensors instead of NumPy arrays

In [None]:
def make_dataset(data, seq_len=5):
    """Slice a series into sliding windows and their next-step targets.

    Window ``i`` is ``data[i:i + seq_len]`` and its target is
    ``data[i + seq_len]``.

    Args:
        data: indexable, sliceable sequence of values.
        seq_len: number of consecutive values in each input window.

    Returns:
        ``(X, y)`` as NumPy arrays. Both are empty when ``data`` holds no
        more than ``seq_len`` items.
    """
    starts = range(len(data) - seq_len)
    windows = [data[start:start + seq_len] for start in starts]
    targets = [data[start + seq_len] for start in starts]
    return np.array(windows), np.array(targets)

# Every (5-day window, following day) pair from the temperature series.
X, y = make_dataset(temps, seq_len=5)

# Convert to float32 tensors and append a trailing axis of size 1 so the
# windows have the (batch, seq_len, input_size) layout the RNN is fed with.
X = torch.tensor(X, dtype=torch.float32)[..., None]
y = torch.tensor(y, dtype=torch.float32)[..., None]

print(X.shape, y.shape)
print(X[1].shape)

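### input: tensor of shape (L, H_in) for unbatched input, or (N, L, H_in) when batch_first=True
### hx: tensor of shape (D * num_layers, H_out) for unbatched input, or (D * num_layers, N, H_out) for batched input

- N = batch size
- L = sequence length
- H_in = input size
- D = 1 by default, 2 if bidirectional=True
- H_out = hidden size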

## Just like before 

- define the model
- define the loss_fn
- define the optimizer
- write the training loop

In [None]:
# Many-to-one regressor: one input feature per time step, 20 hidden units.
model = RNNPredictor(input_size=1, hidden_size=20)

# Mean-squared error between the predicted and the actual next value.
loss_fn = nn.MSELoss()

# `optim` is the `torch.optim` alias imported alongside torch.
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [None]:
# Full-batch training: each epoch is one optimisation step over all windows.
epochs = 3000

for epoch in range(epochs):
    # Forward pass and loss over the whole dataset.
    y_pred = model(X)
    loss = loss_fn(y_pred, y)

    # Clear the previous step's gradients, backpropagate, update the weights.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Progress report every 300 epochs (ten lines over the run).
    if not epoch % 300:
        print(f"epoch {epoch}, loss={loss.item():.4f}")


In [None]:
# Sanity check on the first window: days 0-4 in, day 5 out.
# The dtype is spelled out so the input matches the float32 training tensors
# regardless of which numeric types `temps` happens to hold.
test_seq = torch.tensor(temps[:5], dtype=torch.float32).unsqueeze(0).unsqueeze(-1)

# Inference only: no autograd graph is needed for a single prediction.
with torch.no_grad():
    pred = model(test_seq)

print("Input:", temps[:5])
print("Prediction:", pred.item())
print("True next value:", temps[5])

# One To Many

In [None]:
class RNNotm(nn.Module):
    """RNN next-value predictor that can also roll itself forward in time.

    ``forward`` maps a window of past values to a single next value;
    ``generate`` applies ``forward`` repeatedly, feeding each prediction back
    in as the newest element of the window.

    Args:
        input_size: number of features per time step. ``generate`` requires
            this to be 1 because the scalar prediction is appended to the
            window as its next feature vector.
        hidden_size: width of the RNN hidden state.
    """

    def __init__(self, input_size=1, hidden_size=20):
        super().__init__()
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            batch_first=True
        )
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Predict the value that follows each window.

        Args:
            x: tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, 1).
        """
        out, _ = self.rnn(x)
        last_h = out[:, -1, :]   # RNN output at the final time step
        return self.fc(last_h)

    def generate(self, x0, steps=10):
        """Autoregressively extend ``x0`` by ``steps`` predicted values.

        Each step runs ``forward`` on the current window from a fresh hidden
        state, exactly as during training, then slides the window: the oldest
        value is dropped and the prediction appended. The hidden state is
        deliberately NOT carried between steps: the window already contains
        the full context, so reusing the state would process the overlapping
        values twice.

        Runs under ``torch.no_grad()`` and leaves the module in eval mode.

        Args:
            x0: initial window of shape (batch, seq_len, 1).
            steps: how many future values to generate.

        Returns:
            Tensor of shape (batch, steps, 1) without gradient history;
            empty along the step axis when ``steps`` is not positive.
        """
        self.eval()  # generation mode

        if steps <= 0:
            # torch.stack cannot stack an empty list; return an empty result.
            return x0.new_empty((x0.size(0), 0, 1))

        outputs = []
        window = x0

        with torch.no_grad():
            for _ in range(steps):
                y = self(window)             # next predicted value, (batch, 1)
                outputs.append(y)

                # slide the window: drop the oldest step, append the prediction
                window = torch.cat([window[:, 1:, :], y.unsqueeze(1)], dim=1)

        return torch.stack(outputs, dim=1)   # (batch, steps, 1)


In [None]:
# Same architecture and hyper-parameters as before, plus a generate() method.
model_otm = RNNotm(input_size=1, hidden_size=20)

# Fresh loss and optimizer; these rebind the names used by the earlier model.
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model_otm.parameters(), lr=0.01)

In [None]:
# Train on next-step prediction: 3000 full-batch steps over all windows.
for epoch in range(3000):
    y_pred = model_otm(X)
    loss = loss_fn(y_pred, y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Roll the trained model forward 10 steps, starting from the first window.
# Slicing with [:1] keeps the batch axis, giving shape (1, seq_len, 1).
seed = X[:1]
future = model_otm.generate(seed, steps=10)

print("seed =", seed)
print("future shape =", future.shape)
print("future =", future)


# seq to seq

In [None]:
# Toy parallel corpus: (English sentence, Spanish translation) tuples.
# All text is lower-case with no punctuation, so splitting on whitespace
# yields the word tokens directly.
pairs = [
    ("hello", "hola"),
    ("how are you", "cómo estás"),
    ("i am fine", "estoy bien"),
    ("thank you", "gracias"),
    ("good morning", "buenos días"),
    ("good night", "buenas noches"),
    ("see you later", "hasta luego"),
    ("i love you", "te quiero"),
    ("what is your name", "cómo te llamas"),
    ("my name is john", "me llamo john"),
]


In [None]:
from collections import Counter

def build_vocab(sentences):
    """Build a word-level vocabulary from whitespace-tokenised sentences.

    Args:
        sentences: iterable of strings.

    Returns:
        ``(vocab, stoi, itos)``: ``vocab`` lists the special tokens
        ``<pad>``, ``<sos>``, ``<eos>`` (ids 0, 1, 2) followed by the
        distinct words in sorted order; ``stoi`` maps token to id and
        ``itos`` is the inverse of ``stoi``.
    """
    words = set()
    for sentence in sentences:
        words.update(sentence.split())

    vocab = ["<pad>", "<sos>", "<eos>", *sorted(words)]

    stoi = {}
    for index, word in enumerate(vocab):
        stoi[word] = index

    itos = {index: word for word, index in stoi.items()}
    return vocab, stoi, itos


In [None]:
# Split the parallel corpus into its English and Spanish columns.
eng_sentences = [e for e, _ in pairs]
spa_sentences = [s for _, s in pairs]

# One independent vocabulary per language: token list, token->id, id->token.
eng_vocab, eng_stoi, eng_itos = build_vocab(eng_sentences)
spa_vocab, spa_stoi, spa_itos = build_vocab(spa_sentences)


In [None]:
def encode_sentence(sentence, stoi):
    """Convert a sentence to token ids framed by ``<sos>`` and ``<eos>``.

    Args:
        sentence: string, tokenised by splitting on whitespace.
        stoi: token-to-id mapping containing ``<sos>``, ``<eos>`` and every
            word of ``sentence``.

    Returns:
        List of ids: ``[<sos>, word ids..., <eos>]``.

    Raises:
        KeyError: if a word (or a special token) is missing from ``stoi``.
    """
    ids = [stoi["<sos>"]]
    ids.extend(stoi[word] for word in sentence.split())
    ids.append(stoi["<eos>"])
    return ids


In [None]:
# Encode both sides of every pair with its own language's vocabulary.
# Each item is (English ids, Spanish ids), both framed by <sos> ... <eos>.
encoded_pairs = [
    (encode_sentence(e, eng_stoi), encode_sentence(s, spa_stoi))
    for e, s in pairs
]


In [None]:
def pad_sequence(seq, max_len, pad_idx):
    """Return a new list: ``seq`` right-padded with ``pad_idx`` to ``max_len``.

    A ``seq`` already at or beyond ``max_len`` comes back as an unpadded
    copy; it is never truncated.
    """
    missing = max_len - len(seq)
    filler = [pad_idx] * missing  # empty when nothing is missing
    return seq + filler

In [None]:
def make_batch(encoded_pairs):
    """Pad every encoded pair to a common length and stack into two tensors.

    Source sequences are padded to the longest source and targets to the
    longest target, using the ``<pad>`` id of the module-level ``eng_stoi``
    and ``spa_stoi`` vocabularies respectively.

    Args:
        encoded_pairs: sequence of (source ids, target ids) lists.

    Returns:
        ``(src, tgt)`` tensors, one row per pair.
    """
    src_width = max(len(src) for src, _ in encoded_pairs)
    tgt_width = max(len(tgt) for _, tgt in encoded_pairs)
    src_pad, tgt_pad = eng_stoi["<pad>"], spa_stoi["<pad>"]

    eng_batch = [pad_sequence(src, src_width, src_pad) for src, _ in encoded_pairs]
    spa_batch = [pad_sequence(tgt, tgt_width, tgt_pad) for _, tgt in encoded_pairs]

    return torch.tensor(eng_batch), torch.tensor(spa_batch)

# Pad and tensorise the whole corpus as one batch.
# NOTE(review): these two names are not referenced again in the visible cells.
src_batch, tgt_batch = make_batch(encoded_pairs)


In [None]:
from torch.utils.data import Dataset, DataLoader

class TranslationDataset(Dataset):
    """Map-style dataset over pre-encoded (source ids, target ids) pairs.

    Items are returned unpadded; padding is left to whoever batches them.
    """

    def __init__(self, encoded_pairs):
        # Stored by reference, not copied.
        self.data = encoded_pairs

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return pair ``idx`` as a ``(source, target)`` tuple of tensors."""
        src_ids, tgt_ids = self.data[idx]
        src_tensor = torch.tensor(src_ids)
        tgt_tensor = torch.tensor(tgt_ids)
        return src_tensor, tgt_tensor


In [None]:
def collate_fn(batch):
    """Collate (source, target) tensor pairs into two padded long tensors.

    Each side is right-padded to the longest sequence on that side within
    this batch, using the ``<pad>`` id of the module-level ``eng_stoi``
    (source) and ``spa_stoi`` (target) vocabularies.

    Args:
        batch: list of ``(src, tgt)`` 1-D tensors.

    Returns:
        ``(src_padded, tgt_padded)``, each of shape (batch, longest length)
        and dtype ``torch.long``.
    """
    src_seqs, tgt_seqs = zip(*batch)

    def pad_and_stack(seqs, pad_idx):
        # Right-pad every sequence to the longest one, then stack into rows.
        width = max(len(seq) for seq in seqs)
        rows = []
        for seq in seqs:
            filler = torch.tensor([pad_idx] * (width - len(seq)), dtype=torch.long)
            rows.append(torch.cat([seq, filler]))
        return torch.stack(rows)

    src_padded = pad_and_stack(src_seqs, eng_stoi["<pad>"])
    tgt_padded = pad_and_stack(tgt_seqs, spa_stoi["<pad>"])

    return src_padded.long(), tgt_padded.long()


In [None]:
# Wrap the encoded pairs and serve them in shuffled mini-batches of 4.
# collate_fn pads each mini-batch to its own longest source/target length.
dataset = TranslationDataset(encoded_pairs)
loader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)


In [None]:
class Encoder(nn.Module):
    """Embed source token ids and summarise them as a final RNN hidden state.

    No padding mask is applied: every position of the input, including any
    pad ids, is run through the RNN.

    Args:
        vocab_size: number of distinct source token ids.
        embed_size: embedding dimension.
        hidden_size: width of the RNN hidden state.
    """

    def __init__(self, vocab_size, embed_size, hidden_size):
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.rnn = nn.RNN(input_size=embed_size, hidden_size=hidden_size, batch_first=True)

    def forward(self, x):
        """Encode ``x`` of shape (batch, src_len) into a (1, batch, hidden) state."""
        # The per-step outputs are discarded; only the final state is returned.
        _, final_state = self.rnn(self.embed(x))
        return final_state
    

class Decoder(nn.Module):
    """Embed target token ids, run an RNN from a given state, project to logits.

    Args:
        vocab_size: number of distinct target token ids (also the logit width).
        embed_size: embedding dimension.
        hidden_size: width of the RNN hidden state.
    """

    def __init__(self, vocab_size, embed_size, hidden_size):
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.rnn = nn.RNN(input_size=embed_size, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, x, hidden):
        """Decode one or more steps.

        Args:
            x: token ids of shape (batch, steps).
            hidden: initial RNN state of shape (1, batch, hidden).

        Returns:
            ``(logits, hidden)``: logits of shape (batch, steps, vocab_size)
            and the RNN state after the last step.
        """
        step_outputs, next_hidden = self.rnn(self.embed(x), hidden)
        return self.fc(step_outputs), next_hidden
    

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper trained with teacher forcing.

    Args:
        encoder: callable mapping source ids to an initial decoder state.
        decoder: callable mapping (target ids, state) to (logits, state).
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt):
        """Return decoder outputs for ``tgt`` with its last token removed.

        The decoder is fed ``tgt[:, :-1]``, so its output at position ``i``
        lines up with target token ``i + 1``.
        """
        context = self.encoder(src)
        decoder_inputs = tgt[:, :-1]
        logits, _ = self.decoder(decoder_inputs, context)
        return logits


In [None]:
# Use the GPU when one is available, otherwise stay on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

embed_size, hidden_size = 64, 128

# The encoder is sized by the English vocabulary, the decoder by the Spanish.
encoder = Encoder(
    vocab_size=len(eng_vocab), embed_size=embed_size, hidden_size=hidden_size
).to(device)
decoder = Decoder(
    vocab_size=len(spa_vocab), embed_size=embed_size, hidden_size=hidden_size
).to(device)
model = Seq2Seq(encoder, decoder).to(device)

# Padded target positions are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=spa_stoi["<pad>"])
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [None]:
EPOCHS = 200

for epoch in range(EPOCHS):
    # Sum of the mini-batch losses seen during this epoch.
    total_loss = 0

    for src, tgt in loader:
        src = src.to(device)
        tgt = tgt.to(device)

        # Teacher forcing: the model sees tgt without its last token ...
        logits = model(src, tgt)  # (batch, seq_len-1, vocab)

        # ... and is scored against tgt without its first token, so every
        # position is trained to predict the token that follows it.
        vocab_dim = logits.size(-1)
        shifted_targets = tgt[:, 1:]
        loss = criterion(
            logits.reshape(-1, vocab_dim),
            shifted_targets.reshape(-1),
        )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    if not epoch % 20:
        print(f"Epoch {epoch} Loss: {total_loss:.4f}")


In [None]:
def translate(sentence, max_len=20):
    """Greedily translate an English sentence with the trained seq2seq model.

    Uses the module-level ``model``, ``encoder``, ``decoder``, ``device`` and
    vocabularies. The sentence is encoded once; the decoder then starts from
    ``<sos>`` and is fed its own most likely token at every step until it
    emits ``<eos>`` or ``max_len`` tokens have been produced.

    Runs under ``torch.no_grad()`` (inference needs no autograd graph) and
    leaves ``model`` in eval mode.

    Args:
        sentence: whitespace-separated English words.
        max_len: maximum number of output tokens.

    Returns:
        The predicted Spanish words joined by single spaces.

    Raises:
        KeyError: if ``sentence`` contains a word missing from ``eng_stoi``.
    """
    model.eval()

    # Encode the English sentence as a batch of one.
    src_ids = encode_sentence(sentence, eng_stoi)
    src = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0)

    eos_idx = spa_stoi["<eos>"]
    result_tokens = []

    with torch.no_grad():
        hidden = encoder(src)

        # Start the decoder with <sos>.
        tgt = torch.tensor([[spa_stoi["<sos>"]]], dtype=torch.long, device=device)

        for _ in range(max_len):
            logits, hidden = decoder(tgt, hidden)

            # Greedy choice: most likely token at the last decoded position.
            next_token = logits[:, -1].argmax(dim=-1).item()

            if next_token == eos_idx:
                break

            result_tokens.append(spa_itos[next_token])

            # Feed the predicted token back in; the context lives in `hidden`.
            tgt = torch.tensor([[next_token]], dtype=torch.long, device=device)

    return " ".join(result_tokens)


In [None]:
# Spot-check a few sentences taken from the training pairs.
for sentence in ("hello", "i love you", "thank you", "what is your name"):
    print(translate(sentence))
