In [None]:
import numpy as np
import random
import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import trange

class LSTMEncoder(nn.Module):
    ''' Encodes time-series sequence '''

    def __init__(self, input_size, hidden_size, num_layers=1):
        super(LSTMEncoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # define LSTM layer
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

    def forward(self, x_input):
        lstm_out, hidden = self.lstm(x_input)
        return lstm_out, hidden

    def init_hidden(self, batch_size):
        return (torch.zeros(self.num_layers, batch_size, self.hidden_size),
                torch.zeros(self.num_layers, batch_size, self.hidden_size))

class LSTMDecoder(nn.Module):
    ''' Decodes hidden state output by encoder '''

    def __init__(self, input_size, hidden_size, num_layers=1):
        super(LSTMDecoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, input_size)

    def forward(self, x_input, encoder_hidden_states):
        lstm_out, hidden = self.lstm(x_input.unsqueeze(1), encoder_hidden_states)
        output = self.linear(lstm_out.squeeze(1))
        return output, hidden

class LSTMSeq2Seq(nn.Module):
    ''' Train LSTM encoder-decoder and make predictions '''

    def __init__(self, input_size, hidden_size, num_layers=1):
        super(LSTMSeq2Seq, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.encoder = LSTMEncoder(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
        self.decoder = LSTMDecoder(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)

    def train_model(self, train_loader, n_epochs, target_len, teacher_forcing_ratio=0.5, learning_rate=0.01, dynamic_tf=False):
        losses = np.full(n_epochs, np.nan)
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        criterion = nn.MSELoss()

        with trange(n_epochs) as tr:
            for it in tr:
                batch_loss = 0.

                for batch in train_loader:
                    input_batch, target_batch = batch

                    batch_size = input_batch.size(0)
                    outputs = torch.zeros(batch_size, target_len, self.input_size)

                    encoder_hidden = self.encoder.init_hidden(batch_size)
                    optimizer.zero_grad()

                    encoder_output, encoder_hidden = self.encoder(input_batch)
                    decoder_input = input_batch[:, -1, :]
                    decoder_hidden = encoder_hidden

                    for t in range(target_len):
                        decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
                        outputs[:, t, :] = decoder_output

                        teacher_force = random.random() < teacher_forcing_ratio
                        decoder_input = target_batch[:, t, :] if teacher_force else decoder_output

                    loss = criterion(outputs, target_batch)
                    batch_loss += loss.item()

                    loss.backward()
                    optimizer.step()

                batch_loss /= len(train_loader)
                losses[it] = batch_loss

                if dynamic_tf and teacher_forcing_ratio > 0:
                    teacher_forcing_ratio = max(0, teacher_forcing_ratio - 0.02)

                tr.set_postfix(loss="{0:.3f}".format(batch_loss))

        return losses

    def predict(self, input_tensor, target_len):
        input_tensor = input_tensor.unsqueeze(0)
        encoder_output, encoder_hidden = self.encoder(input_tensor)

        outputs = torch.zeros(target_len, self.input_size)
        decoder_input = input_tensor[:, -1, :]
        decoder_hidden = encoder_hidden

        for t in range(target_len):
            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
            outputs[t] = decoder_output.squeeze(0)
            decoder_input = decoder_output

        return outputs.detach().numpy()

# Example usage
input_size = 10
hidden_size = 128
num_layers = 2
seq_len = 20
target_len = 10
batch_size = 32
n_epochs = 100
learning_rate = 0.01
teacher_forcing_ratio = 0.5

# Dummy data
input_tensor = torch.randn(1000, seq_len, input_size)
target_tensor = torch.randn(1000, target_len, input_size)

# Create DataLoader
dataset = TensorDataset(input_tensor, target_tensor)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model
model = LSTMSeq2Seq(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)

# Train model
losses = model.train_model(train_loader, n_epochs, target_len, teacher_forcing_ratio, learning_rate)

# Predict
test_input = torch.randn(seq_len, input_size)
predictions = model.predict(test_input, target_len)
print(predictions)

In [None]:
import numpy as np
import random
import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import trange

class LSTMEncoder(nn.Module):
    ''' Encodes time-series sequence '''

    def __init__(self, input_size, hidden_size, num_layers=1):
        super(LSTMEncoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # define LSTM layer
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

    def forward(self, x_input):
        lstm_out, hidden = self.lstm(x_input)
        return lstm_out, hidden

    def init_hidden(self, batch_size):
        return (torch.zeros(self.num_layers, batch_size, self.hidden_size),
                torch.zeros(self.num_layers, batch_size, self.hidden_size))

class Attention(nn.Module):
    ''' Attention mechanism '''

    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.attn = nn.Linear(hidden_size * 2, hidden_size)
        self.v = nn.Parameter(torch.rand(hidden_size))
        self.init_weights()

    def init_weights(self):
        nn.init.xavier_uniform_(self.attn.weight)
        nn.init.constant_(self.attn.bias, 0)
        nn.init.uniform_(self.v, -0.1, 0.1)

    def forward(self, hidden, encoder_outputs):
        seq_len = encoder_outputs.size(1)
        hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        energy = energy.transpose(1, 2)
        v = self.v.repeat(encoder_outputs.size(0), 1).unsqueeze(1)
        attention_weights = torch.bmm(v, energy).squeeze(1)
        return torch.softmax(attention_weights, dim=1)

class LSTMDecoderWithAttention(nn.Module):
    ''' Decodes hidden state output by encoder with attention '''

    def __init__(self, input_size, hidden_size, num_layers=1):
        super(LSTMDecoderWithAttention, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.attention = Attention(hidden_size)
        self.lstm = nn.LSTM(hidden_size * 2 + input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, input_size)

    def forward(self, x_input, encoder_outputs, hidden, cell):
        attn_weights = self.attention(hidden[-1], encoder_outputs)
        context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs).squeeze(1)
        lstm_input = torch.cat([x_input, context], dim=1).unsqueeze(1)
        lstm_out, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))
        output = self.linear(lstm_out.squeeze(1))
        return output, hidden, cell

class LSTMSeq2Seq(nn.Module):
    ''' Train LSTM encoder-decoder with attention and make predictions '''

    def __init__(self, input_size, hidden_size, num_layers=1):
        super(LSTMSeq2Seq, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.encoder = LSTMEncoder(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
        self.decoder = LSTMDecoderWithAttention(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)

    def train_model(self, train_loader, n_epochs, target_len, teacher_forcing_ratio=0.5, learning_rate=0.01, dynamic_tf=False):
        losses = np.full(n_epochs, np.nan)
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        criterion = nn.MSELoss()

        for epoch in trange(n_epochs, desc="Epochs"):
            self.train()
            total_loss = 0

            for batch in train_loader:
                input_batch, target_batch = batch

                batch_size = input_batch.size(0)
                outputs = torch.zeros(batch_size, target_len, self.input_size)

                encoder_hidden = self.encoder.init_hidden(batch_size)
                optimizer.zero_grad()

                encoder_outputs, (hidden, cell) = self.encoder(input_batch)
                decoder_input = input_batch[:, -1, :]
                decoder_hidden = hidden
                decoder_cell = cell

                for t in range(target_len):
                    decoder_output, decoder_hidden, decoder_cell = self.decoder(decoder_input, encoder_outputs, decoder_hidden, decoder_cell)
                    outputs[:, t, :] = decoder_output

                    teacher_force = random.random() < teacher_forcing_ratio
                    decoder_input = target_batch[:, t, :] if teacher_force else decoder_output

                loss = criterion(outputs, target_batch)
                total_loss += loss.item()

                loss.backward()
                optimizer.step()

            total_loss /= len(train_loader)
            losses[epoch] = total_loss

            if dynamic_tf and teacher_forcing_ratio > 0:
                teacher_forcing_ratio = max(0, teacher_forcing_ratio - 0.02)

            trange.set_postfix(loss="{0:.3f}".format(total_loss))

        return losses

    def predict(self, input_tensor, target_len):
        input_tensor = input_tensor.unsqueeze(0)
        encoder_outputs, (hidden, cell) = self.encoder(input_tensor)

        outputs = torch.zeros(target_len, self.input_size)
        decoder_input = input_tensor[:, -1, :]
        decoder_hidden = hidden
        decoder_cell = cell

        for t in range(target_len):
            decoder_output, decoder_hidden, decoder_cell = self.decoder(decoder_input, encoder_outputs, decoder_hidden, decoder_cell)
            outputs[t] = decoder_output.squeeze(0)
            decoder_input = decoder_output

        return outputs.detach().numpy()

# Example usage
input_size = 10
hidden_size = 128
num_layers = 2
seq_len = 20
target_len = 10
batch_size = 32
n_epochs = 100
learning_rate = 0.01
teacher_forcing_ratio = 0.5

# Dummy data
input_tensor = torch.randn(1000, seq_len, input_size)
target_tensor = torch.randn(1000, target_len, input_size)

# Create DataLoader
dataset = TensorDataset(input_tensor, target_tensor)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model
model = LSTMSeq2Seq(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)

# Train model
losses = model.train_model(train_loader, n_epochs, target_len, teacher_forcing_ratio, learning_rate)

# Predict
test_input = torch.randn(seq_len, input_size)
predictions = model.predict(test_input, target_len)
print(predictions)