# LSTM

In [None]:
# Rossler from Scratch

from math import sqrt
from numpy import split
from numpy import array
from pandas import read_csv
from sklearn.metrics import mean_squared_error
from matplotlib import pyplot
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import RepeatVector
from keras.layers import TimeDistributed
import matplotlib.pyplot as plt

In [None]:
### Loading Libraries and Data

In [None]:
### Rossler sample

# Keep only the 'x' column of the rows whose 'id' equals 1.
# NOTE(review): `all_x20` is not created in the visible part of this notebook;
# it has to exist in the kernel before this cell runs.
is_first_series = all_x20['id'] == 1
rossler_sample_1 = all_x20.loc[is_first_series, ['x']].copy()

rossler_sample_1.head()

In [None]:
# Line plot of the unscaled 'x' series against the DataFrame index.
x_series = rossler_sample_1['x']
x_series.plot()
plt.show()

### Data Preparation

In [None]:
# Rescale the 'x' series with scikit-learn's MinMaxScaler (default settings)
# and wrap the result in a fresh single-column DataFrame.
# NOTE(review): the scaler is fitted on the complete series, i.e. before the
# train/test split performed further down, so test-set extrema influence the
# scaling of the training data.
import pandas as pd  # `pd` is used below but was never imported in this notebook
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
df = pd.DataFrame(scaler.fit_transform(rossler_sample_1), columns=['x'])

In [None]:
df['x'].head(10)

In [None]:
# Preparing the sequence data: turn the scaled series into pairs of
# (n_past input values, n_future target values).
import numpy as np

ylist = list(df['x'])

n_future = 10                      # length of each target window
n_past = 10                        # length of each input window
total_period = n_past + n_future   # derived, so it always follows the two settings above

idx_end = len(ylist)
idx_start = idx_end - total_period

X_new = []
y_new = []

# Walk backwards from the last complete window, so both lists are filled
# latest-window-first.
# NOTE(review): the loop stops before idx_start == 0, so the window that begins
# at the very first sample is never emitted — confirm whether that is intended.
while idx_start > 0:
    split_at = idx_start + n_past
    X_new.append(ylist[idx_start:split_at])
    y_new.append(ylist[split_at:idx_start + total_period])
    idx_start -= 1

# converting list of lists to numpy array
X_new = np.array(X_new)
y_new = np.array(y_new)

In [None]:
# Flip both arrays along the sample axis so that index 0 holds the earliest
# window (the construction loop emitted the latest window first).
X_new = np.flip(X_new, axis=0)
y_new = np.flip(y_new, axis=0)

In [None]:
X_new[0]

In [None]:
X_new[1]

In [None]:
y_new[0]

In [None]:
# Hold-out split without shuffling: the first 95% of the windows go to
# training, the remainder to testing.
# Assumes X_new / y_new are already ordered oldest -> newest.
n = len(X_new)
train_end = int(0.95 * n)

X_train, X_test = X_new[:train_end], X_new[train_end:]
y_train, y_test = y_new[:train_end], y_new[train_end:]

In [None]:
X_train

In [None]:
y_train

In [None]:
# Give the inputs the 3-D (samples, timesteps, features) layout that the
# Keras layers below are declared with.

batch_size = 32

n_samples, n_timesteps = X_train.shape[0], X_train.shape[1]
n_steps = y_train.shape[1]   # length of each target window
n_features = 1               # single input variable

X_train_rs = np.reshape(X_train, (n_samples, n_timesteps, n_features))
X_test_rs = np.reshape(X_test, (X_test.shape[0], n_timesteps, n_features))

### Model 1

In [None]:

# One-Layer LSTM
import random

import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM

# Seed every random number generator involved, not only Python's `random`:
# Keras weight initialisation draws from TensorFlow's generator.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

batch_size = 32

# A stray `git add` shell fragment used to sit here (a syntax error), and
# `model_1`, which the "Model initialization" section selects, was never defined.
# NOTE(review): the architecture below is a reconstruction that mirrors the
# layer sizes of the other models in this notebook — confirm it matches the
# intended one-layer network.
model_1 = Sequential([
    LSTM(64, activation='tanh', input_shape=(n_timesteps, n_features)),
    Dense(y_train.shape[1]),   # one output unit per forecast step
])

### Model 2

In [None]:
### Model 2

from tensorflow.keras.layers import Input, Add
from tensorflow.keras.models import Model

n_outputs = y_train.shape[1]   # one output unit per forecast step

inputs = Input(shape=(n_timesteps, n_features))

# Main path: LSTM over the whole window, then a linear projection.
x = LSTM(64, activation='tanh')(inputs)
y = Dense(n_outputs)(x)

# Skip path: linear projection of the last timestep of the input window,
# added to the main path's output.
last_step = inputs[:, -1, :]
residual = Dense(n_outputs)(last_step)

output = Add()([y, residual])
model_2 = Model(inputs, output)

### Model 3

In [None]:
from tensorflow.keras.layers import Bidirectional

# Bidirectional LSTM encoder followed by a dense layer with one unit per
# forecast step. Rebinds the shared name `model`.
model = Sequential()
model.add(
    Bidirectional(LSTM(64, activation='tanh'), input_shape=(n_timesteps, n_features))
)
model.add(Dense(y_train.shape[1]))


### Model 4

In [None]:
from tensorflow.keras.layers import Conv1D, LSTM, Flatten

# 1-D convolution over the input window, an LSTM on the convolved sequence,
# then a dense layer with one unit per forecast step.
# Rebinds the shared name `model`.
model = Sequential()
model.add(
    Conv1D(filters=32, kernel_size=3, activation='relu',
           input_shape=(n_timesteps, n_features))
)
model.add(LSTM(64))
model.add(Dense(y_train.shape[1]))


### Model 5

In [None]:
from tensorflow.keras.layers import RepeatVector, TimeDistributed

# Encoder LSTM -> repeat the encoding once -> decoder LSTM -> dense layer
# applied per decoder step. Rebinds the shared name `model`.
# NOTE(review): with RepeatVector(1) the output has a length-1 time axis in
# front of the y_train.shape[1] units, while y_train itself is 2-D — confirm
# the target layout before training this variant.
model = Sequential()
model.add(LSTM(64, activation='tanh', input_shape=(n_timesteps, n_features)))
model.add(RepeatVector(1))  # a single decoder step
model.add(LSTM(64, activation='tanh', return_sequences=True))
model.add(TimeDistributed(Dense(y_train.shape[1])))


### Model not included

In [None]:

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, RepeatVector, TimeDistributed, Dense, Dropout

# Encoder-decoder (sequence-to-sequence) LSTM. Rebinds the shared name `model`.
# The decoder emits one value per forecast step, so its length is the target
# length `n_steps`, not the input length `n_timesteps` (the two only happen to
# be equal in the current configuration).
model = Sequential([
    # encoder: compress the input window into a single vector
    LSTM(128, activation='tanh', input_shape=(n_timesteps, n_features)),
    Dropout(0.2),
    RepeatVector(n_steps),                 # repeat latent for each future step

    # decoder: unroll the repeated vector into the forecast sequence
    LSTM(128, activation='tanh', return_sequences=True),
    Dropout(0.2),
    TimeDistributed(Dense(64, activation='relu')),
    TimeDistributed(Dense(1)),             # one output per timestep
])
model.compile('adam', loss='mae')

### Beast Model

In [None]:
# Functional-API model combining a convolutional front end, two bidirectional
# LSTM stages with dot-product attention in between, and a linear skip path
# from the last input timestep. The result is bound to `beast_model`; it is
# only defined here, not compiled or trained.
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import (
    Input, Conv1D, LayerNormalization,
    Bidirectional, LSTM, Dropout, Attention,
    Add, Dense
)
from tensorflow.keras.models import Model

# --- Define the beast ---
inputs = Input(shape=(n_timesteps, n_features))

# 1) Local feature extraction; padding='same' keeps the sequence length
x = Conv1D(64, kernel_size=3, padding='same', activation='relu')(inputs)
x = LayerNormalization()(x)

# 2) First bidirectional LSTM; returns the full sequence for the attention layer
x_enc = Bidirectional(LSTM(128, return_sequences=True))(x)
x_enc = Dropout(0.3)(x_enc)

# 3) Self-attention: the encoder output serves as both query and value
attn = Attention()([x_enc, x_enc])
x_res1 = Add()([x_enc, attn])             # residual connection around the attention

# 4) Second bidirectional LSTM; return_sequences=False reduces the sequence to one vector
x_dec = Bidirectional(LSTM(64, return_sequences=False))(x_res1)
x_dec = Dropout(0.3)(x_dec)

# 5) Skip path: linear projection of the last input timestep to the output width
skip = Dense(y_train.shape[1])(inputs[:, -1, :])

# 6) Final projection (one unit per forecast step) plus the skip path
out_dense = Dense(y_train.shape[1])(x_dec)
outputs = Add()([out_dense, skip])

beast_model = Model(inputs=inputs, outputs=outputs, name="BeastLSTM")

### Model initialization

In [None]:
# Select the model that the summary, compile and fit cells below operate on.
current_model = model_1

In [None]:
current_model.summary()

In [None]:
# Take the optimizer from the same Keras implementation the models in this
# notebook are built with (tensorflow.keras). Mixing it with a separately
# installed standalone `keras` package can fail when the two versions differ.
from tensorflow import keras

# Mean absolute error is used both as the training loss and as the reported metric.
current_model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.01),
    loss='mean_absolute_error',
    metrics=['mean_absolute_error'],
)

In [None]:
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

# Both callbacks watch the validation loss: the first halves the learning rate
# after 5 epochs without improvement (never below 1e-6), the second stops
# training after 15 such epochs and restores the best weights seen.
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
early_stopper = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)

callbacks = [lr_scheduler, early_stopper]

In [None]:
# Train the selected model. No shuffling is applied, and the last 20% of the
# training windows are held out for validation.
fit_options = dict(
    validation_split=0.2,
    epochs=100,
    batch_size=batch_size,
    shuffle=False,
    callbacks=callbacks,
    verbose=1,
)
smod_history = current_model.fit(X_train_rs, y_train, **fit_options)

### One step to one step forecasting

In [None]:
# Re-derive the held-out windows from the full arrays; same slices as in the
# train/test split cell earlier in the notebook.
X_test  = X_new[train_end:]
y_test  = y_new[train_end:]

In [None]:
# Test inputs in the 3-D (samples, timesteps, features) layout.
X_test_rs = X_test.reshape(X_test.shape[0], n_timesteps, n_features)

In [None]:
X_test_rs

In [None]:
y_test

In [None]:
0.2802436


In [None]:
# demonstrate prediction on the first held-out window
# The slice keeps the batch axis, giving the (1, n_timesteps, n_features)
# layout the models are declared with; a single hard-coded scalar cannot be
# reshaped to that unless the window length is 1.
# `current_model` is the network that was compiled and fitted above, whereas
# `beast_model` has not been compiled or trained at this point in the notebook.
x_input = X_test_rs[:1]
yhat = current_model.predict(x_input, verbose=0)

In [None]:
print(yhat)

In [None]:
0.28430681

In [None]:
X_test_rs.shape

In [None]:
X_test_rs[:50, 0, :]
#X_test_rs[1,:]

In [None]:
y_test[:50,0]

In [None]:
# First timestep (all features) of the first 50 test windows.
yupi = X_test_rs[:50, 0, :]

In [None]:
# Number of rows in `yupi` (a stray trailing character made this a syntax error).
len(yupi)

In [None]:
# View the first row as a single-row 2-D array. Using -1 lets NumPy infer the
# width; the previous fixed width of n_steps only worked when n_steps matched
# the row length.
yupi[0].reshape((1, -1))

In [None]:
y_test[:50]

In [None]:
len(X_test_rs)

In [None]:
# Recursive one-step-ahead forecast over the whole test period.
# Start from the first test window; after every prediction the window is slid
# forward by one step and the model's own forecast is appended, so later
# predictions are based on earlier predictions rather than on observed data.
# `current_model` is the network trained above (`beast_model` has not been
# compiled or fitted at this point in the notebook).
window = X_test_rs[0].reshape(-1)   # flat copy of the seed window
predictions = []

# Loop to predict one step at a time
for _ in range(len(X_test_rs)):
    x_input = window.reshape((1, n_timesteps, n_features))
    yhat = current_model.predict(x_input, verbose=0)
    next_value = np.ravel(yhat)[0]   # first forecast step only
    predictions.append(next_value)

    # update input to next step — drop the oldest value, append the prediction
    window = np.append(window[1:], next_value)

# Convert predictions to numpy array
predictions = np.array(predictions)

In [None]:
# Plot the observed next-step values against the recursive predictions.
# Only the first forecast step of every test window is comparable with the
# one-value-per-window predictions; this also works if y_test is already 1-D.
actual_next = np.asarray(y_test).reshape(len(y_test), -1)[:, 0]

plt.plot(actual_next, label='Original', marker='o')
plt.plot(predictions, label='Predicted', marker='x')
plt.legend()
plt.title('One-Step Recursive Forecasting')
plt.xlim(0, 5)   # zoom in on the first few steps
plt.show()

In [None]:
predictions.shape

In [None]:
# Reduce the targets to a 1-D vector holding the first forecast step of every
# test window, without hard-coding the number of test samples. Re-running the
# cell leaves an already 1-D y_test unchanged.
# NOTE: this rebinds `y_test`; the multi-step targets remain available as
# y_new[train_end:].
y_test = np.asarray(y_test).reshape(len(y_test), -1)[:, 0]

In [None]:
# Short aliases used by the metric computations below.
true, pred = y_test, predictions

In [None]:
from sklearn import metrics
from scipy.stats import skew
import numpy as np

# Quantities shared by several entries of the table below.
abs_err = np.abs(pred - true)
mse = metrics.mean_squared_error(true, pred)
rmse = np.sqrt(mse)

# MAPE divides by the ground truth, so it is only reported when no true value is zero.
if np.all(true != 0):
    mape = np.mean(np.abs((true - pred) / true)) * 100
else:
    mape = np.nan

# Collection of error, bias, scale and similarity measures comparing `pred`
# with `true`; insertion order determines the order of the printed report.
metric_dict = {
    "MAE": metrics.mean_absolute_error(true, pred),
    "MSE": mse,
    "RMSE": rmse,
    "R2": metrics.r2_score(true, pred),
    "Explained Variance": metrics.explained_variance_score(true, pred),
    "Max Error": metrics.max_error(true, pred),
    "Median AE": metrics.median_absolute_error(true, pred),
    "Mean Bias": np.mean(pred - true),
    "Mean Forecast": np.mean(pred),
    "Mean Ground Truth": np.mean(true),
    "Std Forecast": np.std(pred),
    "Std Ground Truth": np.std(true),
    # mean absolute error relative to the mean absolute step-to-step change of `true`
    "MASE": np.mean(abs_err) / np.mean(np.abs(np.diff(true))),
    "MAPE": mape,
    "SMAPE": 100 * np.mean(2 * abs_err / (np.abs(pred) + np.abs(true))),
    # RMSE normalised by the range of the ground truth
    "NRMSE": rmse / (np.max(true) - np.min(true)),
    "Cosine Similarity": metrics.pairwise.cosine_similarity(true.reshape(1, -1), pred.reshape(1, -1))[0, 0],
    "Pearson r": np.corrcoef(true, pred)[0, 1],
    "Skew True": skew(true),
    "Skew Pred": skew(pred),
}

In [None]:
# Print every metric left-aligned in a 20-character column with 6 decimals.
# The header no longer claims "Last 20 Points": the metrics are computed over
# every element of `true` / `pred`, not over a 20-point tail.
print("📊 Forecast Evaluation Metrics:\n")
for name, val in metric_dict.items():
    print(f"{name:<20}: {val:.6f}")

In [None]:
### Sequence to one step forecasting

In [None]:
X_test_rs[0]

In [None]:
X_test_rs[1]

In [None]:
y_test[0]

In [None]:
y_test[1]

In [None]:
X_test_rs[0].shape

In [None]:
# demonstrate prediction for the first test window
# The model input layout is (batch, n_timesteps, n_features). The reshape must
# therefore use the input window length, not the target length `n_steps`
# (they are only interchangeable while both happen to be equal).
x_input = X_test_rs[0]
x_input = x_input.reshape((1, n_timesteps, n_features))
yhat = current_model.predict(x_input, verbose=0)

In [None]:
X_test_rs[0].shape

In [None]:
yhat[0].shape

In [None]:
print(yhat)

In [None]:
y_test[0]

In [None]:
# Overlay the target of the first test sample with the model output for it.
fig, ax = plt.subplots()
ax.plot(y_test[0], color='r', label='real')
ax.plot(yhat[0], color='blue', label='predicted')
ax.legend()
ax.grid()
plt.show()

### Beast Model

In [None]:
# --- Compile as requested ---
# This section is headed "Beast Model", but `model` is still bound to the last
# network assigned to that name (the encoder-decoder variant). Point it at
# `beast_model` so that the compile, fit and predict cells of this section
# operate on the model the heading refers to.
model = beast_model

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.01),
    loss='mean_absolute_error',
    metrics=['mean_absolute_error'],
)

In [None]:
# --- Fit as requested ---
# Up to 100 epochs without shuffling; the last 20% of the training windows
# are held out for validation. No callbacks are attached here.
training_kwargs = dict(
    validation_split=0.2,
    epochs=100,
    batch_size=batch_size,
    shuffle=False,
)
smod_history = model.fit(X_train_rs, y_train, **training_kwargs)

In [None]:
### Interpreting model

# Training and validation loss per epoch, taken from the Keras History object.
loss_history = smod_history.history
for curve in ('loss', 'val_loss'):
    plt.plot(loss_history[curve])
plt.title('model loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
preds = model.predict(X_test_rs)

# Flatten predictions and targets to 1-D so they can be drawn as plain series.
y_pred = preds.ravel()
y_true = y_test.ravel()

# x-axis: running index over the flattened predictions
t = np.arange(len(y_pred))

# Figure 1: actual and predicted values over the running index.
fig_ts, ax_ts = plt.subplots(figsize=(12, 4))
ax_ts.plot(t, y_true, label='actual', linewidth=1)
ax_ts.plot(t, y_pred, label='predicted', linewidth=1, alpha=0.8)
ax_ts.set_xlabel('Time step')
ax_ts.set_ylabel('Value')
ax_ts.legend()
ax_ts.grid()
plt.show()

# Figure 2: predicted vs. actual scatter with the identity line as reference.
value_range = [y_true.min(), y_true.max()]
fig_sc, ax_sc = plt.subplots(figsize=(6, 6))
ax_sc.scatter(y_true, y_pred, s=10, alpha=0.3)
ax_sc.plot(value_range, value_range, 'r--', linewidth=1, label='perfect')
ax_sc.set_xlabel('Actual')
ax_sc.set_ylabel('Predicted')
ax_sc.legend()
ax_sc.grid()
plt.show()

In [None]:
from sklearn.metrics import r2_score

# r2_score takes the ground truth first and the predictions second. The score
# is not symmetric in its arguments, so the order matters.
print(r2_score(y_test, preds))

In [None]:
### One Step Forecasting

import numpy as np
from sklearn.metrics import mean_absolute_error
import matplotlib.pyplot as plt

# Iterative forecast with `model`: start from the first test window, predict
# the next value, feed that prediction back in, and repeat `horizon` times.
horizon = 2

# 1. Seed window and the matching ground truth.
#    The test windows advance one sample at a time, so the first target of
#    window i is the value i+1 steps after the seed window. The reshape makes
#    this work whether y_test is 1-D or 2-D.
seed_window = X_test_rs[0].reshape(-1)
true_continuation = np.asarray(y_test).reshape(len(y_test), -1)[:horizon, 0]

# 2. Iteratively forecast
preds_iter = []
current_input = seed_window.copy()       # will update this each step

for i in range(horizon):
    # model.predict expects shape (batch, timesteps, features); -1 lets the
    # window keep whatever length the seed has (univariate series)
    x_in = current_input.reshape(1, -1, 1)
    y_hat = np.ravel(model.predict(x_in, verbose=0))[0]   # first forecast step
    preds_iter.append(y_hat)

    # slide the window: drop oldest point, append prediction
    current_input = np.append(current_input[1:], y_hat)

preds_iter = np.array(preds_iter)

# 3. Compute error metrics
mae20 = mean_absolute_error(true_continuation, preds_iter)
print(f"MAE over {horizon}-step iterative forecast: {mae20:.4f}")

# 4. Plot true vs. predicted over the forecast horizon
plt.figure(figsize=(8,3))
steps = np.arange(1, horizon+1)
plt.plot(steps, true_continuation, 'o-', label='true')
plt.plot(steps, preds_iter, 's--', label='iterative pred')
plt.xlabel('Forecast step')
plt.ylabel('Value')
plt.title(f'{horizon}-step Iterative Forecast (MAE={mae20:.4f})')
plt.legend()
plt.grid(True)
plt.show()