In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [None]:
class RNN(nn.Module):
    """Configurable recurrent encoder followed by a linear output layer.

    The input sequence is run through a (possibly stacked, possibly
    bidirectional) RNN/LSTM/GRU. The final hidden state of the last layer is
    optionally batch-normalised, passed through dropout and projected to
    ``output_size`` values.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Hidden units per direction.
        output_size: Size of the vector returned by ``forward``.
        rnn_type: ``'RNN'``, ``'LSTM'`` or ``'GRU'`` (case-insensitive).
        num_layers: Number of stacked recurrent layers.
        dropout: Dropout probability, applied between recurrent layers (only
            when ``num_layers > 1``) and before the output layer.
        batch_norm: If true, apply ``BatchNorm1d`` to the sequence summary.
        bidirectional: If true, run the recurrent layers in both directions.

    Raises:
        ValueError: If ``rnn_type`` is not one of the supported names.
    """

    # Supported recurrent layer classes, keyed by upper-cased name.
    _RNN_CLASSES = {'RNN': nn.RNN, 'LSTM': nn.LSTM, 'GRU': nn.GRU}

    def __init__(self, input_size, hidden_size, output_size, rnn_type='LSTM',
                 num_layers=1, dropout=0.0, batch_norm=False, bidirectional=False):
        super(RNN, self).__init__()
        self.rnn_type = rnn_type.upper()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.batch_norm = batch_norm
        self.num_directions = 2 if bidirectional else 1

        # RNN layer
        rnn_cls = self._RNN_CLASSES.get(self.rnn_type)
        if rnn_cls is None:
            raise ValueError("Unsupported RNN type")
        self.rnn = rnn_cls(
            input_size, hidden_size, num_layers=num_layers, batch_first=True,
            # PyTorch only applies (and only accepts without a warning)
            # inter-layer dropout when there is more than one layer.
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=bidirectional,
        )

        # BatchNorm over the sequence summary
        if batch_norm:
            self.bn = nn.BatchNorm1d(hidden_size * self.num_directions)

        # Dropout layer
        self.dropout_layer = nn.Dropout(dropout)

        # Output layer
        self.fc = nn.Linear(hidden_size * self.num_directions, output_size)

    def forward(self, x):
        """Encode a batch of sequences and return one output vector per sequence.

        Args:
            x: Batched input laid out batch-first (the recurrent layer is
                built with ``batch_first=True``).

        Returns:
            Tensor with ``output_size`` values per batch element.
        """
        _, state = self.rnn(x)
        # LSTM returns (h_n, c_n); RNN and GRU return h_n directly.
        h_n = state[0] if isinstance(state, tuple) else state

        if self.bidirectional:
            # The last two entries of h_n are the last layer's forward and
            # backward final states. Slicing the output sequence at the last
            # timestep would instead give a backward state that has seen only
            # a single timestep.
            out = torch.cat((h_n[-2], h_n[-1]), dim=1)
        else:
            # Identical to the last timestep of the output sequence.
            out = h_n[-1]

        if self.batch_norm:
            out = self.bn(out)
        out = self.dropout_layer(out)
        out = self.fc(out)
        return out


In [None]:
def build_optimizer(model, optimizer_name='adam', lr=0.001, reg_lambda=0.0):
    """Create an optimizer over ``model.parameters()``.

    Args:
        model: Module whose parameters will be optimised.
        optimizer_name: ``'adam'`` or ``'sgd'`` (case-insensitive).
        lr: Learning rate.
        reg_lambda: L2 penalty, forwarded as ``weight_decay``.

    Returns:
        The constructed ``torch.optim`` optimizer.

    Raises:
        ValueError: If ``optimizer_name`` is not supported.
    """
    optimizer_classes = {'adam': optim.Adam, 'sgd': optim.SGD}
    optimizer_cls = optimizer_classes.get(optimizer_name.lower())
    if optimizer_cls is None:
        raise ValueError("Unsupported optimizer")
    return optimizer_cls(model.parameters(), lr=lr, weight_decay=reg_lambda)

def get_loss_fn(loss_name='mse'):
    """Return a freshly constructed loss module for ``loss_name``.

    Args:
        loss_name: ``'mse'`` or ``'crossentropy'`` (case-insensitive).

    Returns:
        ``nn.MSELoss()`` or ``nn.CrossEntropyLoss()``.

    Raises:
        ValueError: If ``loss_name`` is not supported.
    """
    key = loss_name.lower()
    if key == 'mse':
        return nn.MSELoss()
    if key == 'crossentropy':
        return nn.CrossEntropyLoss()
    raise ValueError("Unsupported loss function")

# -------------------------
# Training Loop
# -------------------------
def _apply_sequence_mask(y_pred, y_true, sequence_mask, device):
    """Flatten per-timestep outputs/targets and keep only unmasked positions.

    ``y_pred`` is reshaped to ``(-1, last_dim)`` and ``y_true`` to ``(-1,)``;
    entries whose flattened ``sequence_mask`` value equals 1 are kept.
    """
    keep = sequence_mask.to(device).reshape(-1) == 1
    y_pred = y_pred.reshape(-1, y_pred.size(-1))[keep]
    y_true = y_true.reshape(-1)[keep]
    return y_pred, y_true


def _run_epoch(model, loader, loss_fn, device, use_mask, sequence_mask, optimizer=None):
    """Run one pass over ``loader`` and return the sample-weighted mean loss.

    Parameters are updated only when ``optimizer`` is given; otherwise the pass
    runs with gradients disabled. Returns ``None`` if ``loader`` yields no
    batches.
    """
    training = optimizer is not None
    total_loss, n_seen = 0.0, 0
    with torch.set_grad_enabled(training):
        for X_batch, y_batch in loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            if training:
                optimizer.zero_grad()

            y_pred = model(X_batch)
            if use_mask:
                y_pred, y_batch = _apply_sequence_mask(y_pred, y_batch, sequence_mask, device)

            loss = loss_fn(y_pred, y_batch)
            if training:
                loss.backward()
                optimizer.step()

            total_loss += loss.item() * X_batch.size(0)
            n_seen += X_batch.size(0)
    # Normalise by the samples actually processed, so loaders using
    # drop_last or datasets without __len__ are averaged correctly.
    return total_loss / n_seen if n_seen else None


def train_rnn(model, optimizer, loss_fn, train_loader, val_loader=None, 
              epochs=50, device='cpu', early_stopping=False, patience=5, 
              use_scheduler=False, task_type='regression', sequence_mask=None):
    """Train ``model`` and print the per-epoch loss.

    Args:
        model: Module to train; moved to ``device``.
        optimizer: Optimizer over the model's parameters.
        loss_fn: Callable ``loss_fn(y_pred, y_true)`` returning a scalar tensor.
        train_loader: Iterable of ``(X_batch, y_batch)`` pairs.
        val_loader: Optional iterable of ``(X_batch, y_batch)`` pairs. When
            given (and non-empty) the validation loss is printed as well.
        epochs: Maximum number of epochs.
        device: Device on which batches and the model are placed.
        early_stopping: Stop once the validation loss has not improved for
            ``patience`` consecutive epochs. Has no effect without validation
            data.
        patience: Number of non-improving epochs tolerated by early stopping.
        use_scheduler: If true, halve the learning rate every 10 epochs
            (``StepLR(step_size=10, gamma=0.5)``).
        task_type: 'regression', 'classification_last' or
            'classification_sequence'. Only 'classification_sequence' changes
            behaviour, and only when ``sequence_mask`` is given.
        sequence_mask: tensor of shape (batch, seq_len) with 1 for valid
            timesteps, 0 for padding. The same mask is applied to every batch,
            so every batch must have exactly that shape.

    Raises:
        ValueError: If ``train_loader`` yields no batches.
    """
    model.to(device)
    best_loss = float('inf')
    counter = 0
    use_mask = task_type == 'classification_sequence' and sequence_mask is not None

    scheduler = None
    if use_scheduler:
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

    for epoch in range(epochs):
        model.train()
        train_loss = _run_epoch(model, train_loader, loss_fn, device,
                                use_mask, sequence_mask, optimizer=optimizer)
        if train_loss is None:
            raise ValueError("train_loader produced no batches")

        # Validation. Compare against None explicitly: truth-testing a
        # DataLoader calls len() on it, which fails for unsized datasets.
        val_loss = None
        if val_loader is not None:
            model.eval()
            val_loss = _run_epoch(model, val_loader, loss_fn, device,
                                  use_mask, sequence_mask)

        if val_loss is None:
            print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f}")
        else:
            print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

            # Early stopping
            if early_stopping:
                if val_loss < best_loss:
                    best_loss = val_loss
                    counter = 0
                else:
                    counter += 1
                if counter >= patience:
                    print("Early stopping triggered!")
                    break

        if use_scheduler:
            scheduler.step()

In [None]:
# Synthetic dataset: random sequences paired with random class labels
seq_len = 10
input_size = 5
hidden_size = 16
output_size = 3  # For classification
batch_size = 8
n_samples = 100

torch.manual_seed(64)

X = torch.randn(n_samples, seq_len, input_size)
y = torch.randint(0, output_size, (n_samples,))

# 80/20 train/validation split and the matching loaders
dataset = TensorDataset(X, y)
train_size = int(0.8 * n_samples)
val_size = n_samples - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, val_size],
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Model, optimizer, loss function
model = RNN(
    input_size,
    hidden_size,
    output_size,
    rnn_type='LSTM',
    num_layers=2,
    dropout=0.2,
    batch_norm=True,
    bidirectional=True,
)
optimizer = build_optimizer(
    model,
    optimizer_name='adam',
    lr=0.001,
    reg_lambda=0.0001,
)
loss_fn = get_loss_fn(loss_name='crossentropy')

# Train the model
train_rnn(
    model,
    optimizer,
    loss_fn,
    train_loader,
    val_loader,
    epochs=100,
    device='cpu',
    early_stopping=True,
    patience=23,
    use_scheduler=True,
    task_type='classification_last',
)
            


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class OneToOneRNN(nn.Module):
    """Vanilla RNN that maps a sequence to a single probability.

    The hidden state at the last timestep is projected to one value and
    squashed with a sigmoid.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Number of hidden units in the RNN.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return one sigmoid-activated value per batch element."""
        hidden_seq = self.rnn(x)[0]
        last_step = hidden_seq[:, -1, :]  # last timestep
        return self.sigmoid(self.fc(last_step))

# Example usage: the model already ends in a sigmoid, so plain BCELoss is used.
criterion = nn.BCELoss()
model = OneToOneRNN(
    input_size=10,
    hidden_size=32,
)
optimizer = optim.Adam(model.parameters(), lr=1e-3)


In [None]:
class OneToManyRNN(nn.Module):
    """Expand a single input vector into a sequence of per-step scores.

    The input is projected to ``hidden_size``, repeated ``output_seq_len``
    times along a new time axis, run through a vanilla RNN and mapped to
    ``vocab_size`` scores at every step.

    Args:
        input_size: Size of the input vector.
        hidden_size: Size of the projected input and of the RNN state.
        output_seq_len: Number of timesteps to generate.
        vocab_size: Number of scores produced per timestep.
    """

    def __init__(self, input_size, hidden_size, output_seq_len, vocab_size):
        super().__init__()
        self.fc_in = nn.Linear(input_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)
        self.fc_out = nn.Linear(hidden_size, vocab_size)
        self.output_seq_len = output_seq_len

    def forward(self, x):
        """Return scores for ``output_seq_len`` steps per batch element."""
        projected = self.fc_in(x)
        # Insert a time axis and feed the same vector at every step.
        steps = projected.unsqueeze(1).repeat(1, self.output_seq_len, 1)
        hidden_seq, _ = self.rnn(steps)
        return self.fc_out(hidden_seq)

# Example usage: 2048-dimensional input expanded to 20 steps over a 10000-entry vocabulary.
criterion = nn.CrossEntropyLoss()
model = OneToManyRNN(
    input_size=2048,
    hidden_size=128,
    output_seq_len=20,
    vocab_size=10000,
)
optimizer = optim.Adam(model.parameters())


In [None]:
class ManyToOneRNN(nn.Module):
    """Sequence classifier: recurrent encoder plus a two-layer head.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Number of hidden units in the recurrent layer.
        num_classes: Number of class scores returned.
        rnn_type: ``'LSTM'`` or ``'GRU'`` (case-insensitive); any other value
            selects a vanilla ``nn.RNN``.
    """

    def __init__(self, input_size, hidden_size, num_classes, rnn_type='LSTM'):
        super().__init__()
        gated_layers = {'LSTM': nn.LSTM, 'GRU': nn.GRU}
        # Unknown names deliberately fall back to the vanilla RNN.
        rnn_cls = gated_layers.get(rnn_type.upper(), nn.RNN)
        self.rnn = rnn_cls(input_size, hidden_size, batch_first=True)
        self.fc1 = nn.Linear(hidden_size, 32)
        self.fc2 = nn.Linear(32, num_classes)

    def forward(self, x):
        """Return raw class scores computed from the last timestep."""
        hidden_seq = self.rnn(x)[0]
        features = torch.relu(self.fc1(hidden_seq[:, -1, :]))  # last timestep
        return self.fc2(features)

# Example usage: three-class classifier; the model returns raw scores for CrossEntropyLoss.
criterion = nn.CrossEntropyLoss()
model = ManyToOneRNN(
    input_size=128,
    hidden_size=64,
    num_classes=3,
)
optimizer = optim.Adam(model.parameters(), lr=1e-3)


In [None]:
class Encoder(nn.Module):
    """Vanilla RNN encoder that returns only its final hidden state.

    Args:
        input_size: Number of features per source timestep.
        hidden_size: Number of hidden units.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)

    def forward(self, x):
        """Return the RNN's final hidden state (the context for the decoder)."""
        _, context = self.rnn(x)
        return context

class Decoder(nn.Module):
    """Vanilla RNN decoder that emits ``output_size`` scores per step.

    Args:
        output_size: Feature size of the decoder input and of each output step.
        hidden_size: Number of hidden units (must match the incoming state).
        seq_len: Repeat factor applied to the input along the time axis.
    """

    def __init__(self, output_size, hidden_size, seq_len):
        super().__init__()
        self.rnn = nn.RNN(output_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.seq_len = seq_len

    def forward(self, x, hidden):
        """Tile ``x`` along the time axis, decode from ``hidden`` and project."""
        tiled = x.repeat(1, self.seq_len, 1)  # initial decoder input
        states = self.rnn(tiled, hidden)[0]
        return self.fc(states)

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper: encode ``src``, then decode ``tgt`` from that state.

    Args:
        input_size: Feature size of the source sequence.
        hidden_size: Hidden size shared by encoder and decoder.
        output_size: Feature size of the decoder input/output.
        output_seq_len: Passed to the decoder as its ``seq_len``.
    """

    def __init__(self, input_size, hidden_size, output_size, output_seq_len):
        super().__init__()
        self.encoder = Encoder(input_size, hidden_size)
        self.decoder = Decoder(output_size, hidden_size, output_seq_len)

    def forward(self, src, tgt):
        """Return the decoder output for ``tgt`` conditioned on the encoded ``src``."""
        context = self.encoder(src)
        decoded = self.decoder(tgt, context)
        return decoded

# Example usage: source feature size 10000, target feature size 12000, 60 decoding steps.
input_seq_len = 50
output_seq_len = 60
criterion = nn.CrossEntropyLoss()
model = Seq2Seq(
    input_size=10000,
    hidden_size=256,
    output_size=12000,
    output_seq_len=output_seq_len,
)
optimizer = optim.Adam(model.parameters())


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, classification_report
import numpy as np

# ----------------------------
# Example BiRNN Model
# ----------------------------
class BiRNN(nn.Module):
    """Bidirectional vanilla-RNN classifier over token-id sequences.

    Token ids are embedded and encoded by a bidirectional ``nn.RNN``. The final
    forward and backward hidden states of the last layer are concatenated,
    passed through dropout and a linear layer, and squashed with a sigmoid.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding dimension.
        hidden_size: Hidden units per direction.
        output_size: Number of sigmoid outputs per sequence.
        num_layers: Number of stacked recurrent layers.
        dropout: Dropout probability applied before the output layer.
    """

    def __init__(self, vocab_size, embed_dim, hidden_size, output_size=1, num_layers=1, dropout=0.5):
        super(BiRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn = nn.RNN(embed_dim, hidden_size, num_layers=num_layers,
                          batch_first=True, bidirectional=True, nonlinearity='tanh')
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size * 2, output_size)  # bidirectional → hidden*2
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        """Return sigmoid outputs of shape ``[batch, output_size]``."""
        x = self.embedding(x)               # [batch, seq_len] -> [batch, seq_len, embed_dim]
        _, h_n = self.rnn(x)                # h_n: [num_layers*2, batch, hidden]
        # Use the final state of each direction. Slicing the output sequence at
        # the last timestep would give a backward state that has processed only
        # the last token, discarding the backward pass over the sequence.
        out = torch.cat((h_n[-2], h_n[-1]), dim=1)   # [batch, hidden*2]
        out = self.dropout(out)
        out = self.fc(out)
        out = self.sigmoid(out)
        return out

# ----------------------------
# Hyperparameters
# ----------------------------
# Sizes and optimisation settings used by the data simulation, the model and
# the training loop below.
vocab_size = 2000
embed_dim = 128
hidden_size = 64
max_len = 50
batch_size = 32
epochs = 5
lr = 0.001
# Use the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# ----------------------------
# Simulate data (replace with actual IMDb preprocessing)
# ----------------------------
# Random token ids in [0, vocab_size) and random 0/1 labels of shape (N, 1).
# NOTE(review): NumPy's global RNG is not seeded in this cell, so the simulated
# data differ between runs.
X_train = np.random.randint(0, vocab_size, (1000, max_len))
y_train = np.random.randint(0, 2, (1000, 1))

X_test = np.random.randint(0, vocab_size, (200, max_len))
y_test = np.random.randint(0, 2, (200, 1))

# Convert to PyTorch tensors
# Token ids become int64 (embedding lookup); labels become float32 for BCELoss.
train_dataset = TensorDataset(torch.LongTensor(X_train), torch.FloatTensor(y_train))
test_dataset = TensorDataset(torch.LongTensor(X_test), torch.FloatTensor(y_test))

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# ----------------------------
# Model, Loss, Optimizer
# ----------------------------
model = BiRNN(vocab_size, embed_dim, hidden_size).to(device)
# BCELoss expects probabilities; the model's forward ends with a sigmoid.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# ----------------------------
# Training Loop
# ----------------------------
for epoch in range(epochs):
    model.train()
    train_losses = []
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        y_pred = model(X_batch)
        loss = criterion(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())
    
    # Unweighted mean of the per-batch losses.
    avg_train_loss = np.mean(train_losses)

    # Evaluation
    model.eval()
    y_true, y_pred_all = [], []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            y_true.extend(y_batch.cpu().numpy())
            # Threshold the predicted probabilities at 0.5 to get 0/1 labels.
            y_pred_all.extend((y_pred.cpu().numpy() > 0.5).astype(int))
    
    acc = accuracy_score(y_true, y_pred_all)
    print(f"Epoch {epoch+1}/{epochs} | Train Loss: {avg_train_loss:.4f} | Test Acc: {acc:.4f}")

# ----------------------------
# Classification Report
# ----------------------------
# y_true / y_pred_all are rebuilt every epoch, so this reports the predictions
# from the last epoch's evaluation pass.
print(classification_report(y_true, y_pred_all, target_names=['Negative', 'Positive']))


In [None]:
"""
Bidirectional LSTM (BiLSTM) for Sentiment Analysis in PyTorch

Theory:
---------
1. BiLSTM is an extension of standard LSTM that processes sequences in both forward
   and backward directions, capturing past and future context simultaneously.
   
2. Each BiLSTM layer contains two LSTMs:
   - Forward LSTM: processes the sequence from start to end.
   - Backward LSTM: processes the sequence from end to start.
   
3. Outputs from both directions are concatenated to form the final representation:
   final_output_t = [forward_output_t ; backward_output_t]   (concatenation, not an element-wise sum)

4. This makes BiLSTMs effective for NLP tasks like sentiment analysis, where context
   from both past and future words can improve classification performance.

5. Typical architecture:
   - Text tokenization/vectorization
   - Embedding layer
   - One or more BiLSTM layers (with optional dropout)
   - Dense layers for classification
"""

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchtext.datasets import IMDB
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from sklearn.metrics import accuracy_score, classification_report

# ----------------------------
# Dataset Preparation
# ----------------------------
# Tokenizer obtained from torchtext's get_tokenizer with the "basic_english" preset.
tokenizer = get_tokenizer("basic_english")
# Every review is truncated or padded to exactly this many tokens in
# IMDBDataset.__getitem__.
max_len = 100
# Batch size shared by the train and test DataLoaders below.
batch_size = 32

# Custom dataset class
class IMDBDataset(Dataset):
    """IMDB reviews as fixed-length token lists with binary labels.

    Each item is ``(tokens, label)``: ``tokens`` is a list of exactly
    ``max_len`` strings (truncated, or right-padded with ``'<pad>'``) and
    ``label`` is 0 for a negative and 1 for a positive review.

    Args:
        split: Dataset split forwarded to ``IMDB`` (e.g. ``'train'``, ``'test'``).

    Raises:
        ValueError: If the dataset yields a label that is not recognised.
    """

    # torchtext has used two label encodings for IMDB: the strings
    # 'neg'/'pos' and the integers 1/2. Comparing only against 'neg' would
    # silently turn every integer label into the positive class.
    _LABEL_TO_BINARY = {'neg': 0, 'pos': 1, 1: 0, 2: 1}

    def __init__(self, split='train'):
        self.data = list(IMDB(split=split))
        self.texts = [text for (label, text) in self.data]
        self.labels = [self._to_binary_label(label) for (label, text) in self.data]

    @staticmethod
    def _to_binary_label(label):
        """Map a raw IMDB label to 0 (negative) or 1 (positive)."""
        try:
            return IMDBDataset._LABEL_TO_BINARY[label]
        except (KeyError, TypeError):
            raise ValueError(f"Unrecognised IMDB label: {label!r}") from None

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Truncate to max_len, then right-pad; the pad count is zero (an empty
        # list) whenever the review already has max_len tokens.
        tokens = tokenizer(self.texts[idx])[:max_len]
        tokens = tokens + ['<pad>'] * (max_len - len(tokens))
        return tokens, self.labels[idx]

# Build vocabulary
def yield_tokens(dataset):
    """Yield the token list from each ``(tokens, label)`` pair in ``dataset``."""
    yield from (token_list for token_list, _label in dataset)

# Load the training split and build the vocabulary from its (already
# truncated/padded) token lists.
train_dataset = IMDBDataset('train')
vocab = build_vocab_from_iterator(yield_tokens(train_dataset), specials=["<unk>", "<pad>"])
# Tokens missing from the vocabulary are looked up as "<unk>".
vocab.set_default_index(vocab["<unk>"])

# Collate function
def collate_batch(batch):
    """Turn a list of ``(tokens, label)`` pairs into id and label tensors.

    Returns:
        ``(text_ids, labels)``: an int64 tensor of vocabulary ids with one row
        per sample, and a float tensor of labels with a trailing dimension of
        size 1.
    """
    token_lists, targets = zip(*batch)
    id_rows = [vocab(token_list) for token_list in token_lists]
    text_ids = torch.tensor(id_rows, dtype=torch.long)
    labels = torch.tensor(targets, dtype=torch.float).unsqueeze(1)
    return text_ids, labels

# Only the training loader shuffles; both loaders numericalise tokens through
# collate_batch.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_batch)
test_loader = DataLoader(IMDBDataset('test'), batch_size=batch_size, shuffle=False, collate_fn=collate_batch)

# ----------------------------
# BiLSTM Model
# ----------------------------
class BiLSTMModel(nn.Module):
    """Two stacked bidirectional LSTMs with a two-layer sigmoid classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding dimension.
        hidden_size1: Hidden units per direction in the first BiLSTM.
        hidden_size2: Hidden units per direction in the second BiLSTM.
        output_size: Number of sigmoid outputs per sequence.
        dropout: Dropout probability applied after each BiLSTM.
        padding_idx: Index of the padding token in the embedding table. When
            omitted, the notebook-level ``vocab["<pad>"]`` is used, matching
            the previous behaviour.
    """

    def __init__(self, vocab_size, embed_dim=64, hidden_size1=64, hidden_size2=32, output_size=1, dropout=0.4,
                 padding_idx=None):
        super(BiLSTMModel, self).__init__()
        if padding_idx is None:
            # Fall back to the global vocabulary so existing calls keep working.
            padding_idx = vocab["<pad>"]
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=padding_idx)
        self.bilstm1 = nn.LSTM(embed_dim, hidden_size1, batch_first=True, bidirectional=True)
        self.dropout1 = nn.Dropout(dropout)
        self.bilstm2 = nn.LSTM(hidden_size1*2, hidden_size2, batch_first=True, bidirectional=True)
        self.dropout2 = nn.Dropout(dropout)
        self.fc1 = nn.Linear(hidden_size2*2, 64)
        self.fc2 = nn.Linear(64, output_size)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        """Return sigmoid outputs of shape ``[batch, output_size]``."""
        x = self.embedding(x)                # [batch, seq_len, embed_dim]
        x, _ = self.bilstm1(x)              # [batch, seq_len, hidden1*2]
        x = self.dropout1(x)
        _, (h_n, _) = self.bilstm2(x)       # h_n: [2, batch, hidden2]
        # Concatenate the final forward and backward states. Slicing the output
        # at the last timestep would pair the forward summary with a backward
        # state that has processed only the final (possibly padding) position.
        # NOTE(review): padding positions still pass through both LSTMs; packing
        # the sequences would be needed to skip them entirely.
        x = torch.cat((h_n[-2], h_n[-1]), dim=1)   # [batch, hidden2*2]
        x = self.dropout2(x)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        x = self.sigmoid(x)
        return x

# ----------------------------
# Training Setup
# ----------------------------
# Use the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# The embedding table gets one row per vocabulary entry.
model = BiLSTMModel(len(vocab)).to(device)
# BCELoss expects probabilities; the model's forward ends with a sigmoid.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 3

# ----------------------------
# Training Loop
# ----------------------------
for epoch in range(epochs):
    model.train()
    train_losses = []
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        y_pred = model(X_batch)
        loss = criterion(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())
    # Unweighted mean of the per-batch losses.
    avg_loss = sum(train_losses)/len(train_losses)
    
    # Evaluation
    model.eval()
    y_true, y_pred_all = [], []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            y_true.extend(y_batch.cpu().numpy())
            # Threshold the predicted probabilities at 0.5 to get 0/1 labels.
            y_pred_all.extend((y_pred.cpu().numpy()>0.5).astype(int))
    acc = accuracy_score(y_true, y_pred_all)
    print(f"Epoch {epoch+1}/{epochs} | Train Loss: {avg_loss:.4f} | Test Acc: {acc:.4f}")

# ----------------------------
# Classification Report
# ----------------------------
# y_true / y_pred_all are rebuilt every epoch, so this reports the predictions
# from the last epoch's evaluation pass.
print(classification_report(y_true, y_pred_all, target_names=['Negative', 'Positive']))


In [None]:
"""
Many-to-Many Bidirectional LSTM (BiLSTM) for Sequence Labeling (NER/POS Tagging)

Theory:
--------
1. Many-to-Many RNNs take an input sequence and output a sequence of the same length,
   making them suitable for tasks like Named Entity Recognition (NER) or POS tagging.

2. Bidirectional LSTMs capture both past and future context for each token by processing
   sequences in forward and backward directions. The hidden states are concatenated.

3. Architecture:
   - Embedding layer: converts tokens to dense vectors.
   - BiLSTM layer(s): output hidden states for each timestep.
   - Linear layer: maps hidden states at each timestep to class scores (num_classes).
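   - Softmax over the class scores gives token-level label probabilities; during training,
     CrossEntropyLoss is applied directly to the raw scores (it applies log-softmax internally).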

4. Output:
   For input sequence of shape [batch, seq_len], the output is [batch, seq_len, num_classes].
   Each timestep is classified independently using context-aware BiLSTM hidden states.
"""

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
from sklearn.metrics import classification_report

# ----------------------------
# Example Dataset (simulated)
# ----------------------------
class SequenceDataset(Dataset):
    """Randomly generated token sequences with a random label per token.

    Args:
        num_samples: Number of sequences.
        seq_len: Length of every sequence.
        vocab_size: Token ids are drawn from ``[1, vocab_size)``.
        num_classes: Labels are drawn from ``[0, num_classes)``.
    """

    def __init__(self, num_samples=500, seq_len=10, vocab_size=50, num_classes=5):
        shape = (num_samples, seq_len)
        # Tokens are drawn before labels, both from NumPy's global RNG.
        self.X = np.random.randint(1, vocab_size, size=shape)
        self.y = np.random.randint(0, num_classes, size=shape)
        self.vocab_size = vocab_size
        self.num_classes = num_classes

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        """Return ``(tokens, labels)`` for sample ``idx`` as int64 tensors."""
        tokens = torch.tensor(self.X[idx], dtype=torch.long)
        tags = torch.tensor(self.y[idx], dtype=torch.long)
        return tokens, tags

# Dataset and DataLoader
# Default SequenceDataset arguments: 500 sequences of length 10, vocab_size=50, 5 classes.
train_dataset = SequenceDataset()
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# ----------------------------
# Many-to-Many BiLSTM Model
# ----------------------------
class ManyToManyBiLSTM(nn.Module):
    """Bidirectional LSTM tagger that scores every timestep.

    Args:
        vocab_size: Number of rows in the embedding table (index 0 is padding).
        embed_dim: Embedding dimension.
        hidden_size: Hidden units per direction.
        num_classes: Number of class scores per timestep.
        dropout: Dropout probability applied to the BiLSTM outputs.
    """

    def __init__(self, vocab_size, embed_dim=64, hidden_size=128, num_classes=5, dropout=0.3):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.bilstm = nn.LSTM(embed_dim, hidden_size, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        # Both directions are concatenated, hence twice the hidden size.
        self.fc = nn.Linear(2 * hidden_size, num_classes)

    def forward(self, x):
        """Map ``[batch, seq_len]`` token ids to ``[batch, seq_len, num_classes]`` scores."""
        embedded = self.embedding(x)              # [batch, seq_len, embed_dim]
        contextual = self.bilstm(embedded)[0]     # [batch, seq_len, hidden_size*2]
        return self.fc(self.dropout(contextual))  # [batch, seq_len, num_classes]

# ----------------------------
# Training Setup
# ----------------------------
# Use the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# vocab_size and num_classes match the SequenceDataset defaults.
model = ManyToManyBiLSTM(vocab_size=50, num_classes=5).to(device)
criterion = nn.CrossEntropyLoss()  # expects input [batch*seq_len, num_classes] and target [batch*seq_len]
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 5

# ----------------------------
# Training Loop
# ----------------------------
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        y_pred = model(X_batch)  # [batch, seq_len, num_classes]
        
        # Reshape for CrossEntropyLoss
        # Every timestep of every sequence becomes one classification example.
        y_pred = y_pred.view(-1, y_pred.shape[2])  # [batch*seq_len, num_classes]
        y_batch = y_batch.view(-1)                 # [batch*seq_len]
        
        loss = criterion(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    # Unweighted mean of the per-batch losses.
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f}")

# ----------------------------
# Evaluation (on training data for example)
# ----------------------------
model.eval()
y_true_all, y_pred_all = [], []
with torch.no_grad():
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        y_pred = model(X_batch)
        # Highest-scoring class at each timestep.
        y_pred_labels = torch.argmax(y_pred, dim=2)
        # Flatten so that each token contributes one entry to the report.
        y_true_all.extend(y_batch.cpu().numpy().flatten())
        y_pred_all.extend(y_pred_labels.cpu().numpy().flatten())

print(classification_report(y_true_all, y_pred_all))


In [None]:
"""
Gated Recurrent Unit (GRU) Networks

GRUs are a type of Recurrent Neural Network (RNN) introduced to efficiently model sequential data such as time-series, text, and speech.
While traditional RNNs struggle with long-term dependencies due to vanishing gradients, GRUs use gating mechanisms to selectively 
update the hidden state at each time step.

GRU Components:
1. Update Gate (z_t): Controls the mix between the previous hidden state and the candidate state; with the equations below, (1 - z_t) is the fraction of h_{t-1} that is retained.
2. Reset Gate (r_t): Determines how much of the past hidden state should be forgotten.
3. Candidate hidden state (h_t'): Computed using the reset gate and current input.
4. Hidden state (h_t): Weighted average of previous hidden state h_{t-1} and candidate h_t', controlled by the update gate.

GRU Equations:
r_t = σ(W_r * [h_{t-1}, x_t])
z_t = σ(W_z * [h_{t-1}, x_t])
h_t' = tanh(W_h * [r_t * h_{t-1}, x_t])
h_t = (1 - z_t) * h_{t-1} + z_t * h_t'

Advantages of GRUs:
- Simplified architecture compared to LSTM (2 gates vs 3)
- Faster training due to fewer parameters
- Performs similarly to LSTM in many sequential tasks

Typical Usage:
- Time-series forecasting
- NLP tasks
- Speech recognition
- Sequence classification
"""

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# ---------------------------
# 1. Load Dataset
# ---------------------------
# Read 'data.csv' from the working directory; the 'Date' column is parsed as
# dates and used as the index.
df = pd.read_csv('data.csv', parse_dates=['Date'], index_col='Date')
# Remaining columns as a NumPy array (only column 0 is used by create_dataset).
data = df.values

# ---------------------------
# 2. Preprocess Data
# ---------------------------
# Scale every column to [0, 1]; the scaler is fitted on the full dataset.
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)

def create_dataset(data, time_step=1):
    """Build sliding-window samples from column 0 of ``data``.

    Sample ``i`` uses ``data[i:i + time_step, 0]`` as input and
    ``data[i + time_step, 0]`` as target.

    Args:
        data: 2-D array; only the first column is used.
        time_step: Number of consecutive values in each input window.

    Returns:
        ``(X, y)`` NumPy arrays with ``len(data) - time_step`` samples each
        (both empty when ``data`` has no more than ``time_step`` rows).
    """
    # The last valid window starts at len(data) - time_step - 1, so there are
    # len(data) - time_step windows. Subtracting a further 1 here would drop
    # the most recent sample.
    n_windows = max(len(data) - time_step, 0)
    X = [data[i:i + time_step, 0] for i in range(n_windows)]
    y = [data[i + time_step, 0] for i in range(n_windows)]
    return np.array(X), np.array(y)

# Each training input is a window of 100 consecutive scaled values.
time_step = 100
X, y = create_dataset(scaled_data, time_step)

# Convert to PyTorch tensors
# A trailing axis of size 1 is added to both tensors (single feature / single target).
X = torch.from_numpy(X).float().unsqueeze(-1)  # shape: [samples, time_step, features]
y = torch.from_numpy(y).float().unsqueeze(-1)  # shape: [samples, 1]

# ---------------------------
# 3. Define GRU Model
# ---------------------------
class GRUTimeSeries(nn.Module):
    """Stacked GRU that predicts from the last timestep of a sequence.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Number of hidden units per GRU layer.
        num_layers: Number of stacked GRU layers.
        output_size: Number of values predicted per sequence.
        dropout: Dropout probability between GRU layers; ignored when
            ``num_layers == 1`` because there is no inter-layer connection.
    """

    def __init__(self, input_size=1, hidden_size=50, num_layers=2, output_size=1, dropout=0.2):
        super(GRUTimeSeries, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # GRU layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers=num_layers, 
                          batch_first=True,
                          # PyTorch warns about non-zero dropout on a
                          # single-layer GRU, so pass 0 in that case.
                          dropout=dropout if num_layers > 1 else 0)
        # Fully connected output layer
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return predictions of shape ``[batch, output_size]`` for ``x`` of shape ``[batch, seq_len, input_size]``."""
        # Zero initial hidden state, created with x's dtype and device so the
        # model also works after .double()/.half() or on another device.
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)

        # GRU forward pass
        out, _ = self.gru(x, h0)  # out: [batch, seq_len, hidden_size]

        # Take the last timestep output for prediction
        out = self.fc(out[:, -1, :])  # shape: [batch, output_size]
        return out

# ---------------------------
# 4. Train the Model
# ---------------------------
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = GRUTimeSeries().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# The whole dataset is moved to the device once; mini-batches are sliced from it.
X_train, y_train = X.to(device), y.to(device)

epochs = 10
batch_size = 32

for epoch in range(epochs):
    # Fresh random sample order every epoch.
    permutation = torch.randperm(X_train.size(0))
    epoch_loss = 0

    for i in range(0, X_train.size(0), batch_size):
        indices = permutation[i:i+batch_size]
        batch_x, batch_y = X_train[indices], y_train[indices]

        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        # Weight by batch size so the (possibly smaller) final batch is
        # averaged correctly below.
        epoch_loss += loss.item() * batch_x.size(0)

    epoch_loss /= X_train.size(0)
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.6f}")

# ---------------------------
# 5. Make Predictions
# ---------------------------
model.eval()
with torch.no_grad():
    # Forecast the first unseen step from the most recent `time_step` scaled
    # observations. The last training window ends before the final
    # observations, so its target is a value already present in the data
    # rather than the next step.
    last_window = scaled_data[-time_step:, 0]
    input_seq = torch.from_numpy(last_window).float().view(1, -1, 1).to(device)
    predicted = model(input_seq)
    # Undo the min-max scaling for column 0 only (X_scaled = X * scale_ + min_).
    # This equals scaler.inverse_transform for single-column data and also works
    # when the CSV has several columns, where inverse_transform would reject
    # the (1, 1) prediction.
    predicted_value = (predicted.cpu().numpy() - scaler.min_[0]) / scaler.scale_[0]
    print(f"Predicted value for next step: {predicted_value[0][0]:.2f}")
