In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch 
import torch.nn as nn
import torch.nn.functional as F
import statistics

from pynamical import cubic_map, logistic_map, simulate, singer_map
from torchmetrics import ExplainedVariance
from torch.utils.data import DataLoader, Dataset

np.set_printoptions(suppress=True)

In [None]:
class timeseries(Dataset):
    """
    To create and load custom datasets
    """
    def __init__(self, x, y):
        self.x = torch.tensor(x, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
        
    def __len__(self):
        return len(self.x)
    
    def __getitem__(self, idx):
        """
        returns a sample from the dataset at the given index idx
        """
        return self.x[idx], self.y[idx]
    
    
class RMSELoss(torch.nn.Module):
    """
    Deriving RMSE from MSE
    """
    def __init__(self):
        super(RMSELoss,self).__init__()

    def forward(self,y,y_pred):
        criterion = nn.MSELoss()
        eps = 1e-6
        loss = torch.sqrt(criterion(y,y_pred) + eps)
        return loss
    
    
class MLP(pl.LightningModule):

    def __init__(self, window_size, num_points):
        super().__init__()
        self.window_size = window_size
        self.l1 = nn.Linear(1, window_size)
        self.relu = nn.ReLU()
        self.l2 = nn.Linear(window_size, num_points)

    def forward(self, x):      
        out = self.l1(x)
        out = out[:,-1,:]
        out = self.relu(out)
        out = self.l2(out)
        return out
    
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.0001)
        return optimizer
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        x = x = x[:][0].view(-1,self.window_size,1)
        y_pred = self(x)
        criterion = RMSELoss()
        loss = criterion(y_pred, y)
        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        x = x = x[:][0].view(-1,self.window_size,1)
        y_pred = self(x)
        criterion = RMSELoss()
        loss = criterion(y_pred, y)
        self.log("test_loss", loss, prog_bar=True, logger=True)
        return loss


In [None]:
N = [1, 2, 4, 8, 16]
runs = 10
window_sizes = [5, 10, 2]

def sine_wave_prediction(window_size, num_points):
    
    # generate the sine wave  
    frequency = 3 # number of oscillations
    amplitude = 3 # peak height of wave
    theta = 0 # amplitude of wave at time 0
    points = 100

    x_ = np.linspace(0, 2*np.pi*frequency, points)
    data = amplitude * np.sin(x_) + theta
    
    # split sine wave into inputs and targets
    inputs = []
    targets = []
    for i in range(len(data)):  
        first_index = i + window_size
        last_index = first_index + num_points
        if last_index > len(data) - 1:
            break # prevent "index is out of range" error
        inputs.append(data[i:first_index])
        targets.append(data[first_index:last_index])
    inputs = np.array(inputs)
    targets = np.array(targets)

    # train test split
    split_ratio = int(0.7*points)
    x_train = inputs[:split_ratio]
    x_test = inputs[split_ratio:]
    y_train = targets[:split_ratio]
    y_test = targets[split_ratio:]

    # create dataloaders
    train_data = timeseries(x_train, y_train)
    test_data = timeseries(x_test, y_test)
    train_dataloader = DataLoader(train_data) 
    test_dataloader = DataLoader(test_data)
    
    # train and test
    model = MLP(window_size, num_points)
    trainer = pl.Trainer(max_epochs=10)
    trainer.fit(model, train_dataloader)
    output = trainer.test(test_dataloaders=test_dataloader)
    return output
    
sine_wave_df = pd.DataFrame(columns=["Num Points", "Mean RMSE"])
for num_points in N:
    rmse = []
    for i in range(runs):   
        print(f"Points to predict: {num_points}")
        print(f"Run number {i+1}")
        output = sine_wave_prediction(window_sizes[0], num_points)
        rmse.append(output[0]["test_loss"])
    # get mean RMSE across 10 runs for each num_points value
    mean_rmse = statistics.mean(rmse)
    sine_wave_df.loc[len(sine_wave_df.index)] = [int(num_points), mean_rmse]
sine_wave_df.style 
    

In [None]:
def noisy_sine_wave_prediction(window_size, num_points):
    
    # generate the sine wave  
    frequency = 3 # number of oscillations
    amplitude = 3 # peak height of wave
    theta = 0 # amplitude of wave at time 0
    points = 100

    x_ = np.linspace(0, 2*np.pi*frequency, points)
    noise = np.random.normal(0,1,100)
    data = (amplitude * np.sin(x_) + theta) + noise
    
    # split sine wave into inputs and targets
    inputs = []
    targets = []
    for i in range(len(data)):  
        first_index = i + window_size
        last_index = first_index + num_points
        if last_index > len(data) - 1:
            break # prevent "index is out of range" error
        inputs.append(data[i:first_index])
        targets.append(data[first_index:last_index])
    inputs = np.array(inputs)
    targets = np.array(targets)

    # train test split
    split_ratio = int(0.7*points)
    x_train = inputs[:split_ratio]
    x_test = inputs[split_ratio:]
    y_train = targets[:split_ratio]
    y_test = targets[split_ratio:]

    # create dataloaders
    train_data = timeseries(x_train, y_train)
    test_data = timeseries(x_test, y_test)
    train_dataloader = DataLoader(train_data) 
    test_dataloader = DataLoader(test_data)
    
    # train and test
    model = MLP(window_size, num_points)
    trainer = pl.Trainer(max_epochs=10)
    trainer.fit(model, train_dataloader)
    output = trainer.test(test_dataloaders=test_dataloader)
    return output
    
noisy_sine_wave_df = pd.DataFrame(columns=["Num Points", "Mean RMSE"])
for num_points in N:
    rmse = []
    for i in range(runs):   
        print(f"Points to predict: {num_points}")
        print(f"Run number {i+1}")
        output = sine_wave_prediction(window_sizes[0], num_points)
        rmse.append(output[0]["test_loss"])
    # get mean RMSE across 10 runs for each num_points value
    mean_rmse = statistics.mean(rmse)
    noisy_sine_wave_df.loc[len(noisy_sine_wave_df.index)] = [int(num_points), mean_rmse]
noisy_sine_wave_df.style 
    

In [None]:
def logistic_map_prediction(window_size, num_points):
    
    points = 100
    data = simulate(model=logistic_map, num_gens=1, rate_min=0, rate_max=4, num_rates=points, num_discard=100)
    if type(data) != np.ndarray:
        data = data.to_numpy().flatten()
    
    # split sine wave into inputs and targets
    inputs = []
    targets = []
    for i in range(len(data)):  
        first_index = i + window_size
        last_index = first_index + num_points
        if last_index > len(data) - 1:
            break # prevent "index is out of range" error
        inputs.append(data[i:first_index])
        targets.append(data[first_index:last_index])
    inputs = np.array(inputs)
    targets = np.array(targets)

    # train test split
    split_ratio = int(0.7*points)
    x_train = inputs[:split_ratio]
    x_test = inputs[split_ratio:]
    y_train = targets[:split_ratio]
    y_test = targets[split_ratio:]

    # create dataloaders
    train_data = timeseries(x_train, y_train)
    test_data = timeseries(x_test, y_test)
    train_dataloader = DataLoader(train_data) 
    test_dataloader = DataLoader(test_data)
    
    # train and test
    model = MLP(window_size, num_points)
    trainer = pl.Trainer(max_epochs=10)
    trainer.fit(model, train_dataloader)
    output = trainer.test(test_dataloaders=test_dataloader)
    return output
    
logistic_map_df = pd.DataFrame(columns=["Num Points", "Mean RMSE"])
for num_points in N:
    rmse = []
    for i in range(runs):   
        print(f"Points to predict: {num_points}")
        print(f"Run number {i+1}")
        output = logistic_map_prediction(window_sizes[0], num_points)
        rmse.append(output[0]["test_loss"])
    # get mean RMSE across 10 runs for each num_points value
    mean_rmse = statistics.mean(rmse)
    logistic_map_df.loc[len(logistic_map_df.index)] = [int(num_points), mean_rmse]
logistic_map_df.style 

In [None]:
# this function is currently producing nan/inf loss, still figuring out why

def singer_map_prediction(window_size, num_points):
    
    points = 100
    data = simulate(model=singer_map, num_gens=1, rate_min=0.9, rate_max=1.08, num_rates=100, num_discard=100)
    if type(data) != np.ndarray:
        data = data.to_numpy().flatten()
    
    # split sine wave into inputs and targets
    inputs = []
    targets = []
    for i in range(len(data)):  
        first_index = i + window_size
        last_index = first_index + num_points
        if last_index > len(data) - 1:
            break # prevent "index is out of range" error
        inputs.append(data[i:first_index])
        targets.append(data[first_index:last_index])
    inputs = np.array(inputs)
    targets = np.array(targets)

    # train test split
    split_ratio = int(0.7*points)
    x_train = inputs[:split_ratio]
    x_test = inputs[split_ratio:]
    y_train = targets[:split_ratio]
    y_test = targets[split_ratio:]

    # create dataloaders
    train_data = timeseries(x_train, y_train)
    test_data = timeseries(x_test, y_test)
    train_dataloader = DataLoader(train_data) 
    test_dataloader = DataLoader(test_data)
    
    # train and test
    model = MLP(window_size, num_points)
    trainer = pl.Trainer(max_epochs=10, terminate_on_nan=True)
    trainer.fit(model, train_dataloader)
    output = trainer.test(test_dataloaders=test_dataloader)
    return output
    
singer_map_df = pd.DataFrame(columns=["Num Points", "Mean RMSE"])
for num_points in N:
    rmse = []
    for i in range(runs):   
        print(f"Points to predict: {num_points}")
        print(f"Run number {i+1}")
        output = singer_map_prediction(window_sizes[0], num_points)
        rmse.append(output[0]["test_loss"])
    # get mean RMSE across 10 runs for each num_points value
    mean_rmse = statistics.mean(rmse)
    singer_map_df.loc[len(singer_map_df.index)] = [int(num_points), mean_rmse]
singer_map_df.style 

In [None]:
def cubic_map_prediction(window_size, num_points):
    
    points = 100
    data = simulate(model=cubic_map, num_gens=1, rate_min=1, rate_max=4, num_rates=100, num_discard=100)
    if type(data) != np.ndarray:
        data = data.to_numpy().flatten()
    
    # split sine wave into inputs and targets
    inputs = []
    targets = []
    for i in range(len(data)):  
        first_index = i + window_size
        last_index = first_index + num_points
        if last_index > len(data) - 1:
            break # prevent "index is out of range" error
        inputs.append(data[i:first_index])
        targets.append(data[first_index:last_index])
    inputs = np.array(inputs)
    targets = np.array(targets)

    # train test split
    split_ratio = int(0.7*points)
    x_train = inputs[:split_ratio]
    x_test = inputs[split_ratio:]
    y_train = targets[:split_ratio]
    y_test = targets[split_ratio:]

    # create dataloaders
    train_data = timeseries(x_train, y_train)
    test_data = timeseries(x_test, y_test)
    train_dataloader = DataLoader(train_data) 
    test_dataloader = DataLoader(test_data)
    
    # train and test
    model = MLP(window_size, num_points)
    trainer = pl.Trainer(max_epochs=10)
    trainer.fit(model, train_dataloader)
    output = trainer.test(test_dataloaders=test_dataloader)
    return output
    
cubic_map_df = pd.DataFrame(columns=["Num Points", "Mean RMSE"])
for num_points in N:
    rmse = []
    for i in range(runs):   
        print(f"Points to predict: {num_points}")
        print(f"Run number {i+1}")
        output = cubic_map_prediction(window_sizes[0], num_points)
        rmse.append(output[0]["test_loss"])
    # get mean RMSE across 10 runs for each num_points value
    mean_rmse = statistics.mean(rmse)
    cubic_map_df.loc[len(cubic_map_df.index)] = [int(num_points), mean_rmse]
cubic_map_df.style 

In [None]:
# plotting mean RSME against number of points

fig = plt.figure()
fig.set_figwidth(10)
plt.plot(sine_wave_df["Num Points"], sine_wave_df["Mean RMSE"],label="sine wave")
plt.plot(noisy_sine_wave_df["Num Points"], noisy_sine_wave_df["Mean RMSE"],label="noisy sine wave")
plt.plot(logistic_map_df["Num Points"], logistic_map_df["Mean RMSE"],label="logistic map")
# singer map currently having inf/nan loss so it doesn't appear on the graph
plt.plot(singer_map_df["Num Points"], singer_map_df["Mean RMSE"],label="singer map")
plt.plot(cubic_map_df["Num Points"], cubic_map_df["Mean RMSE"],label="cubic map")
plt.xlabel("Number of points predicted")
plt.ylabel("Mean RMSE across 10 runs")
plt.legend()
plt.show()

In [None]:
# window size of 5
sine_wave_5_df = sine_wave_df

# window size of 10
sine_wave_10_df = pd.DataFrame(columns=["Num Points", "Mean RMSE"])
for num_points in N:
    rmse = []
    for i in range(runs):   
        print(f"Points to predict: {num_points}")
        print(f"Run number {i+1}")
        output = sine_wave_prediction(window_sizes[1], num_points)
        rmse.append(output[0]["test_loss"])
    # get mean RMSE across 10 runs for each num_points value
    mean_rmse = statistics.mean(rmse)
    sine_wave_10_df.loc[len(sine_wave_10_df.index)] = [int(num_points), mean_rmse]
    

In [None]:
# plotting effect of window size on mean RMSE
# should add more window sizes here :)

plt.plot(sine_wave_5_df["Num Points"], sine_wave_5_df["Mean RMSE"],label="5")
plt.plot(sine_wave_10_df["Num Points"], sine_wave_10_df["Mean RMSE"],label="10")
plt.xlabel("Number of points predicted")
plt.ylabel("Mean RMSE across 10 runs")
plt.legend()
plt.show()