# Demo Long Short-Term Memory

## Model Design

Denote input $\mathbf x$, hidden state (short term memory) $\mathbf h\in\mathbb{R}^K$, cell state (long term memory) $\mathbf c\in\mathbb{R}^K$. Define the function generating gates

$$
G(\mathbf x, \mathbf h;W_x, W_h,\beta) = \sigma(W_x\mathbf x+W_h\mathbf h+\beta).
$$

Define the function generating the candidate for the long-term memory

$$
H(\mathbf x, \mathbf h;W_x, W_h,\beta) = \tanh(W_x\mathbf x+W_h\mathbf h+\beta).
$$

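The same parameters are shared across all stages. For each stage $t$, given the input $\mathbf x = \mathbf x_t$ and the previous states $\mathbf h = \mathbf h_{t-1}$, $\mathbf c_{t-1}$, we have the forget gate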

$$
f = G(\mathbf x, \mathbf h;W_{xf}, W_{hf},\beta_f),
$$

input gate

$$
i = G(\mathbf x, \mathbf h;W_{xi}, W_{hi},\beta_i),
$$

new cell state as

$$
\mathbf c_{t} =  f\mathbf c_{t-1}+ iH(\mathbf x, \mathbf h;W_{xx}, W_{hx},\beta_x).
$$

The new hidden state is

$$
\mathbf{h}_t = G(\mathbf x, \mathbf h;W_{xo}, W_{ho},\beta_o)\tanh(\mathbf{c}_t).
$$

## Data Preparation

### Generate Numpy Sequential Data

In [1]:
import numpy as np
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader, random_split

def gen_wave(wave_type: int, cycles: int, phase: float, seq_len: int):
    """Sample one synthetic waveform on ``seq_len`` evenly spaced points of [0, 1].

    Args:
        wave_type: 0 selects a sine, 1 a square wave (sign of the sine);
            any other value selects a sawtooth.
        cycles: Number of periods across [0, 1].
        phase: Phase offset in radians.
        seq_len: Number of samples to generate.

    Returns:
        A 1D ``float32`` NumPy array of length ``seq_len``.
    """
    grid = np.linspace(0, 1, seq_len, dtype=np.float32)
    freq = float(cycles)

    if wave_type in (0, 1):
        # The square wave is just the sign of the sine carrier.
        carrier = np.sin(2 * np.pi * freq * grid + phase)
        samples = carrier if wave_type == 0 else np.sign(carrier)
    else:
        # Fractional position inside the current period, rescaled from
        # [0, 1) to [-1, 1).
        position = (freq * grid + phase / (2*np.pi)) % 1.0
        samples = 2.0 * (position - 0.5)
    return samples.astype(np.float32)

N_SAMPLES = 3000   # number of generated sequences
SEQ_LEN = 100      # samples per sequence

# Fixed seed so the generated dataset is the same on every run.
np.random.seed(42)
X_list, y_list = [], []
for _ in range(N_SAMPLES):
    # The draw order (type, cycles, phase) is part of the reproducible stream.
    wave_type = np.random.randint(0, 3)          # class label: 0, 1 or 2
    cycles = np.random.randint(5, 11)            # 5 to 10 periods (upper bound exclusive)
    phase = np.random.uniform(0.0, 2*np.pi)      # phase offset in radians
    wave = gen_wave(wave_type, cycles, phase, SEQ_LEN)
    X_list.append(wave)                          # one (T,) sequence
    y_list.append(wave_type)

# Assemble the per-sample pieces into dense arrays.
X = np.asarray(X_list, dtype=np.float32)   # (N, T)
y = np.asarray(y_list, dtype=np.int64)     # (N,)

### Generate DataLoader

In [2]:
BATCH_SIZE = 64

# Seed both global generators before building the split and the loaders.
torch.manual_seed(42)
np.random.seed(42)

# Add a trailing feature axis to the inputs; labels stay one-dimensional.
X_torch = torch.unsqueeze(torch.from_numpy(X), dim=-1)    # (N, T, 1)
y_torch = torch.from_numpy(y)                             # (N,)
dataset = TensorDataset(X_torch, y_torch)

# Hold out a fixed fraction of the samples for validation; the split uses
# its own seeded generator.
val_ratio = 0.2
n_val = int(val_ratio * len(dataset))
n_train = len(dataset) - n_val
train_ds, val_ds = random_split(
    dataset,
    lengths=[n_train, n_val],
    generator=torch.Generator().manual_seed(42),
)

# Only the training loader shuffles; neither loader drops the last batch.
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=False)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)

## Define RNN Model

In [3]:
class LSTMCell(nn.Module):
    """A single LSTM step built from two linear maps.

    ``x2g`` maps the input and ``h2g`` maps the previous hidden state to the
    four stacked gate pre-activations, laid out as ``[i | f | g | o]`` along
    the last dimension. The two maps are summed in ``forward``.

    Args:
        input_size: Number of input features per time step.
        hidden_size: Size ``H`` of the hidden and cell states.
        forget_bias: Initial value of the effective forget-gate bias.
    """

    def __init__(self, input_size: int, hidden_size: int, forget_bias: float = 1.0):
        super().__init__()
        self.hidden_size = hidden_size
        # x->gates and h->gates (concat: [i | f | g | o])
        self.x2g = nn.Linear(input_size, 4 * hidden_size, bias=True)
        self.h2g = nn.Linear(hidden_size, 4 * hidden_size, bias=True)
        with torch.no_grad():
            H = hidden_size
            # The two bias vectors are added together in forward(), so only
            # one of them may carry forget_bias. The forget slice of the
            # other is zeroed, which makes the effective initial forget bias
            # exactly forget_bias instead of twice that value.
            self.x2g.bias[H:2*H].fill_(forget_bias)
            self.h2g.bias[H:2*H].zero_()

    def forward(self, x_t, h_prev, c_prev):
        """
        x_t:    (B, input_size)
        h_prev: (B, H)
        c_prev: (B, H)
        returns: h_t, c_t, each shaped like h_prev
        """
        gates = self.x2g(x_t) + self.h2g(h_prev)   # (B, 4H)
        # Split the stacked pre-activations along the last dimension into
        # four equal parts of width H, in [i | f | g | o] order.
        i_raw, f_raw, g_raw, o_raw = gates.chunk(4, dim=-1)
        i = torch.sigmoid(i_raw)                   # input gate
        f = torch.sigmoid(f_raw)                   # forget gate
        g = torch.tanh(g_raw)                      # candidate
        o = torch.sigmoid(o_raw)                   # output gate
        c_t = f * c_prev + i * g                   # long-term memory
        h_t = o * torch.tanh(c_t)                  # short-term / output
        return h_t, c_t

- Demo use of LSTMCell

In [4]:
# Run a single LSTMCell step on one sequence taken from a training batch.
x_batch, y_batch = next(iter(train_loader))   # x_batch: (B, T, 1)
# Fixed index 36 within the batch; the batch must hold at least 37 samples.
x = x_batch[36]                               # x: (100, 1)
print("x shape:", x.shape)                    # (seq_len, 1)

cell = LSTMCell(input_size=1, hidden_size=4)
h = torch.zeros(1, 4)   # initial hidden state; keep a batch dim of 1
c = torch.zeros(1, 4)   # initial cell state, same shape as h

t = 0                   # only the first time step is fed to the cell
# x[t] is (1,); add a leading batch axis so it matches (B=1, input_size=1)
x_t = x[t].unsqueeze(0)                       # (1, 1)
h_new, c_new = cell(x_t, h, c)

# Drop the batch axis again for display.
print("h_new:", h_new.squeeze(0))             # (4,)
print("c_new:", c_new.squeeze(0))             # (4,)

x shape: torch.Size([100, 1])
h_new: tensor([-0.0556, -0.1769,  0.1051, -0.2274], grad_fn=<SqueezeBackward1>)
c_new: tensor([-0.0757, -0.4367,  0.3316, -0.3853], grad_fn=<SqueezeBackward1>)


In [5]:
import torch
from torch import nn

class LSTMClassifier(nn.Module):
    """Sequence classifier: one LSTMCell unrolled over time plus a linear head."""

    def __init__(self, input_size=1, hidden_size=4, num_classes=3):
        super().__init__()
        self.hidden_size = hidden_size
        # A single recurrent layer, followed by the classification head.
        self.cell = LSTMCell(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Classify a batch of sequences.

        x: (B, T, F)
        returns: logits (B, num_classes)
        """
        batch_size, n_steps, _ = x.shape
        # Both states start at zero, with the dtype and device of the input.
        state_shape = (batch_size, self.hidden_size)
        h = x.new_zeros(state_shape)
        c = x.new_zeros(state_shape)
        for step in range(n_steps):
            # x[:, step] is the (B, F) slice for this time step.
            h, c = self.cell(x[:, step], h, c)
        # Only the hidden state after the final step feeds the head.
        return self.fc(h)

## Train RNN Model

### Define training step

In [6]:
# Classifier configuration: scalar input per time step, 4 hidden units, 3 classes.
model = LSTMClassifier(input_size=1, hidden_size=4, num_classes=3)
# Cross-entropy loss applied to the logits returned by the model.
criterion = nn.CrossEntropyLoss()
# Adam over all model parameters with learning rate 1e-3.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

def train_epoch(model, loader, criterion, optimizer):
    """Run one optimisation pass over ``loader``.

    Args:
        model: Module mapping an input batch to class logits.
        loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss called as ``criterion(logits, targets)``.
        optimizer: Optimizer stepping the model's parameters.

    Returns:
        ``(loss, accuracy)``: the batch-size-weighted sum of the batch losses
        and the number of correct predictions, each divided by the number of
        samples seen.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for inputs, targets in loader:
        batch_size = inputs.size(0)

        optimizer.zero_grad()
        logits = model(inputs)
        loss = criterion(logits, targets)
        loss.backward()
        # Clip the global gradient norm to 1.0 before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        # Weight each batch loss by its size (assumes criterion returns a
        # per-batch mean) so the final ratio is a per-sample figure.
        loss_sum += loss.item() * batch_size
        n_correct += (logits.argmax(1) == targets).sum().item()
        n_seen += batch_size
    return loss_sum / n_seen, n_correct / n_seen

def evaluate(model, loader, criterion):
    """Score ``model`` on ``loader`` without tracking gradients.

    Args:
        model: Module mapping an input batch to class logits.
        loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss called as ``criterion(logits, targets)``.

    Returns:
        ``(loss, accuracy)``: the batch-size-weighted sum of the batch losses
        and the number of correct predictions, each divided by the number of
        samples seen.
    """
    with torch.no_grad():
        model.eval()
        loss_sum, n_correct, n_seen = 0.0, 0, 0
        for inputs, targets in loader:
            batch_size = inputs.size(0)
            logits = model(inputs)
            # Weight each batch loss by its size before averaging.
            loss_sum += criterion(logits, targets).item() * batch_size
            n_correct += (logits.argmax(1) == targets).sum().item()
            n_seen += batch_size
        return loss_sum / n_seen, n_correct / n_seen

### Training Loop

In [None]:
n_epochs = 30
# Epochs are numbered 1..n_epochs inclusive, so exactly n_epochs passes are
# run and the printed number equals the number of completed epochs.
for epoch in range(1, n_epochs + 1):
    tr_loss, tr_acc = train_epoch(model, train_loader, criterion, optimizer)
    va_loss, va_acc = evaluate(model, val_loader, criterion)
    if epoch % 5 == 0:  # report every fifth epoch
        print(f"epoch {epoch:02d} | train {tr_loss:.4f}/{tr_acc:.3f} | val {va_loss:.4f}/{va_acc:.3f}")

# Spot-check: labels and predictions for the first ten samples of the first
# validation batch.
model.eval()
with torch.no_grad():
    xb, yb = next(iter(val_loader))
    preds = model(xb).argmax(dim=1)[:10].cpu().numpy()
    print(f"true:{yb[:10].numpy()}\n pred:{preds}")