In [10]:
from torch.utils.data import DataLoader, Dataset
from torchinfo import summary
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from tqdm import tqdm
import pandas as pd
import os
import numpy as np
import itertools
import matplotlib.pyplot as plt
import random
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt
import random
import os
import pandas as pd
import keras
from abc import ABC, abstractmethod
from keras.datasets import cifar10
from typing import Tuple
import numpy as np
from keras import Model
from keras.optimizers import Adam
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Flatten, Dense, Conv2D, MaxPooling2D

In [11]:
def find_stabilization_point(
    sequence,
    window=5,
    slope_threshold=0.005,
    curvature_threshold=0.05,
    patience=1,
    oscillation_tolerance=0.001,
    increasing_trend_threshold=0.01,
    flat_change_threshold=0.003
):
    """Find the first point at which a numeric sequence stops changing (or starts rising).

    The sequence is smoothed with a moving average of length ``window``. A
    window of the same length is then slid over the smoothed series, and each
    position is classified as "settled" when at least one of these holds:

    * every relative slope and every curvature in the window is below its threshold,
    * the spread (max - min) of the smoothed values is below ``oscillation_tolerance``,
    * the first and last smoothed value of the window differ by less than
      ``flat_change_threshold``.

    A window that is not settled but whose relative slopes all exceed
    ``increasing_trend_threshold`` ends the search immediately.

    Args:
        sequence: 1-D sequence of numbers (list or array).
        window: Length of both the moving average and the sliding window.
        slope_threshold: Upper bound on ``|relative slope|`` for a settled window.
        curvature_threshold: Upper bound on ``|slope difference|`` for a settled window.
        patience: Number of consecutive settled windows required before returning.
        oscillation_tolerance: Maximum peak-to-peak range treated as flat.
        increasing_trend_threshold: Relative slope above which a window counts as rising.
        flat_change_threshold: Maximum absolute start-to-end change treated as flat.

    Returns:
        ``i + window`` where ``i`` is the start of the deciding window in the
        smoothed series, or ``float("inf")`` when the input has fewer than
        ``2 * window`` points or no window qualifies.
    """
    if len(sequence) < window * 2:
        return float("inf")

    # Moving average; 'valid' drops the partially-overlapping edges.
    smoothed_sequence = np.convolve(sequence, np.ones(window) / window, mode='valid')

    # Slopes are relative to the preceding smoothed value, so a smoothed value
    # of exactly 0 yields inf/nan here; such entries fail every comparison below.
    slopes = np.diff(smoothed_sequence) / smoothed_sequence[:-1]
    curvatures = np.diff(slopes)

    stabilization_count = 0
    # "+ 1" so the final complete window is inspected as well. Without it a
    # sequence of exactly 2 * window points could never produce a result.
    for i in range(len(slopes) - window + 1):
        recent_slopes = slopes[i: i + window]
        recent_curvatures = curvatures[i: i + window - 1]
        recent_values = smoothed_sequence[i: i + window]

        is_stabilized = (
            np.all(np.abs(recent_slopes) < slope_threshold) and
            np.all(np.abs(recent_curvatures) < curvature_threshold)
        )
        is_oscillating = np.ptp(recent_values) < oscillation_tolerance
        flat_change = np.abs(recent_values[-1] - recent_values[0]) < flat_change_threshold
        has_increasing_trend = np.all(recent_slopes > increasing_trend_threshold)

        if is_stabilized or is_oscillating or flat_change:
            stabilization_count += 1
            if stabilization_count >= patience:
                return i + window
        elif has_increasing_trend:
            # A steadily rising series is reported right away, regardless of patience.
            return i + window
        else:
            stabilization_count = 0

    return float("inf")

class FloatSequenceTransformer(nn.Module):
    """Transformer encoder that maps a sequence of scalars to a single scalar.

    Each scalar is projected to ``embedding_dim`` features, a down-weighted
    sinusoidal positional encoding is added, the result passes through a
    ``nn.TransformerEncoder`` and a linear head reduces every position to one
    value. ``forward`` returns the head output at the last sequence position.

    The sub-module names (``embedding``, ``transformer``, ``output_layer``)
    determine the state-dict keys and must stay unchanged for previously saved
    checkpoints to load.

    Args:
        embedding_dim: Feature width of the encoder (``d_model``).
        num_heads: Number of attention heads; must divide ``embedding_dim``.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout probability inside the encoder layers.
    """

    def __init__(self, embedding_dim=8, num_heads=1, num_layers=1, dropout=0.2):
        super().__init__()

        # Hyper-parameters are kept on the instance because __str__ encodes them.
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.dropout = dropout

        self.embedding = nn.Linear(1, embedding_dim)

        transformer_layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=num_heads,
            batch_first=True,
            dropout=dropout,
        )
        self.transformer = nn.TransformerEncoder(transformer_layer, num_layers=num_layers)

        # Single output: a point prediction without an uncertainty estimate.
        self.output_layer = nn.Linear(embedding_dim, 1)

    def generate_positional_encoding(self, seq_length, device):
        """Build a sinusoidal positional encoding.

        Even feature columns hold sines and odd columns hold cosines of
        ``position * div_term``.

        Args:
            seq_length: Number of positions to encode.
            device: Device on which the encoding is created.

        Returns:
            Tensor of shape ``(seq_length, embedding_dim)``.
        """
        position = torch.arange(seq_length, device=device).unsqueeze(1).float()
        div_term = torch.exp(
            torch.arange(0, self.embedding_dim, 2, device=device).float()
            * -(np.log(10000.0) / self.embedding_dim)
        )
        angles = position * div_term

        pe = torch.zeros(seq_length, self.embedding_dim, device=device)
        pe[:, 0::2] = torch.sin(angles)
        # For an odd embedding_dim there is one cosine column fewer than sine
        # columns, so the angles are trimmed (a no-op when embedding_dim is even).
        pe[:, 1::2] = torch.cos(angles[:, : self.embedding_dim // 2])
        return pe

    def forward(self, x, key_padding_mask=None):
        """Run the encoder and return the prediction read at the last position.

        Args:
            x: Tensor of shape ``(batch, seq_length, 1)``.
            key_padding_mask: Optional bool tensor of shape
                ``(batch, seq_length)``, ``True`` where a position is padding.
                ``None`` means no position is masked.

        Returns:
            Tensor of shape ``(batch, 1)``.
        """
        seq_length = x.size(1)
        x = self.embedding(x)
        # The encoding is scaled by 0.1 so it does not dominate the embedded values.
        x = x + 0.1 * self.generate_positional_encoding(seq_length, x.device)

        x = self.transformer(x, src_key_padding_mask=key_padding_mask)
        x = self.output_layer(x)
        # NOTE(review): with right-padded inputs this reads the output at a
        # padded position. Kept unchanged because existing checkpoints were
        # presumably trained against this behaviour - confirm.
        return x[:, -1, :]

    def __str__(self):
        """Return a name encoding the hyper-parameters and today's date (dd-mm-YYYY)."""
        current_date = datetime.now().strftime("%d-%m-%Y")
        s = (f"transformer-model_emb{self.embedding_dim}_dropout{self.dropout}_layers{self.num_layers}_"
             f"heads{self.num_heads}_date{current_date}")
        return s

def predict_next_value(model, sequence, pad_length=None):
    """Predict the value following ``sequence`` with a deterministic forward pass.

    The sequence is right-padded with zeros to ``pad_length`` and a key padding
    mask marks the padded slots. The model is switched to eval mode and is left
    in eval mode afterwards.

    Args:
        model: Module called as ``model(padded_input, key_padding_mask)`` whose
            result holds exactly one element.
        sequence: 1-D sequence of numbers, no longer than ``pad_length``.
        pad_length: Fixed input length expected by the model. When ``None`` the
            notebook-level ``input_length`` is used; it is not defined in the
            cells shown here, so it must exist before the call.

    Returns:
        The prediction as a Python float.
    """
    if pad_length is None:
        pad_length = input_length

    # Build the inputs on the model's device so a model moved to an accelerator
    # works as well; parameter-free models fall back to the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    with torch.no_grad():
        values = torch.tensor(sequence, dtype=torch.float32, device=device).unsqueeze(-1)  # (seq_length, 1)
        padded_input = torch.zeros(pad_length, 1, device=device)
        padded_input[:len(sequence)] = values

        # True marks padding; only the leading len(sequence) slots are real data.
        key_padding_mask = torch.ones(1, pad_length, dtype=torch.bool, device=device)
        key_padding_mask[0, :len(sequence)] = False

        # Add the batch dimension: (1, pad_length, 1).
        prediction = model(padded_input.unsqueeze(0), key_padding_mask)
        return prediction.item()

# test uncertainty of model
def predict_next_value_with_uncertainty(model, sequence, num_samples=60, pad_length=None):
    """Estimate the next value and its spread from repeated stochastic forward passes.

    The model is put in train mode so that dropout stays active, the same
    padded input is evaluated ``num_samples`` times without gradients, and the
    mean and standard deviation of the results are returned. The model's
    previous train/eval mode is restored before returning.

    Args:
        model: Module called as ``model(padded_input, key_padding_mask)`` whose
            result holds exactly one element.
        sequence: 1-D sequence of numbers, no longer than ``pad_length``.
        num_samples: Number of forward passes to average over.
        pad_length: Fixed input length expected by the model. When ``None`` the
            notebook-level ``input_length`` is used; it is not defined in the
            cells shown here, so it must exist before the call.

    Returns:
        Tuple ``(mean_prediction, std_prediction)`` of NumPy floats.
    """
    if pad_length is None:
        pad_length = input_length

    # Build the inputs on the model's device; parameter-free models use the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    values = torch.tensor(sequence, dtype=torch.float32, device=device).unsqueeze(-1)  # (seq_length, 1)
    padded_input = torch.zeros(pad_length, 1, device=device)
    padded_input[:len(sequence)] = values

    # True marks padding; only the leading len(sequence) slots are real data.
    key_padding_mask = torch.ones(1, pad_length, dtype=torch.bool, device=device)
    key_padding_mask[0, :len(sequence)] = False

    batch = padded_input.unsqueeze(0)  # (1, pad_length, 1)

    was_training = model.training
    model.train()  # keeps dropout active during sampling
    try:
        with torch.no_grad():
            samples = [model(batch, key_padding_mask).item() for _ in range(num_samples)]
    finally:
        # Do not leave the caller's model in train mode as a side effect.
        model.train(was_training)

    predictions = np.array(samples)
    mean_prediction = predictions.mean(axis=0)
    std_prediction = predictions.std(axis=0)

    return mean_prediction, std_prediction

class TaskInterface(ABC):
    """Abstract contract for a trainable task.

    Every method, including the initialiser, is abstract, so a concrete task
    has to override all of them before it can be instantiated.
    """

    @abstractmethod
    def __init__(self):
        """Create the attributes shared by all tasks with empty/unset values."""
        # The model and the hyper-parameters currently in use.
        self.model = None
        self.learning_rate = None
        self.batch_size = None
        # Presumably the candidate learning rates and batch sizes - TODO confirm.
        self.lrs = []
        self.batches = []
        # Metric name -> list of recorded values.
        self.history = dict(loss=[])

    @abstractmethod
    def load_data(self):
        """Load the dataset and split it into a training and a test set.

        Returns:
            A tuple ``((X_train, y_train), (X_test, y_test))``.
        """

    @abstractmethod
    def create_model(self) -> Model:
        """Initialise a compiled Keras model.

        Returns:
            Model: the compiled Keras model instance.
        """

    @abstractmethod
    def train(self, **args):
        """Train the task's model; the accepted keyword arguments are task-specific."""

    @abstractmethod
    def evaluate(self, **args):
        """Evaluate the task's model; the accepted keyword arguments are task-specific."""

    @abstractmethod
    def save(self):
        """Persist the task's model."""

    @abstractmethod
    def load_model(self, **args):
        """Restore a previously saved model; the accepted keyword arguments are task-specific."""

class CIFAR10task(TaskInterface):
    """CIFAR-10 classification task built around a small Keras CNN.

    Typical use: construct, call ``load_data()``, set ``learning_rate`` and
    ``batch_size``, call ``create_model()``, then ``train()`` / ``evaluate()``.
    """

    def __init__(self, save_path):
        """Set up an empty task.

        Args:
            save_path: Location passed to ``model.save`` by ``save()``.
        """
        super().__init__()
        self.save_path = save_path
        self.model = None
        self.x_train = None
        self.x_test = None
        self.y_train = None
        self.y_test = None
        self.batch_size = None
        self.learning_rate = None
        self.batches = [8 * i for i in range(1, 17)]   # 8, 16, ..., 128
        self.lrs = [0.001 * i for i in range(1, 11)]   # 0.001 ... 0.010 in steps of 0.001
        # Replaces the base-class default: metrics are added as training reports them.
        self.history = {}

    def load_data(self):
        """Load CIFAR-10 and store scaled images and one-hot labels on the instance.

        Only the first third of the training split is kept. Pixel values are
        scaled to [0, 1] and labels are one-hot encoded over 10 classes.
        Nothing is returned; the arrays are stored in ``x_train``, ``y_train``,
        ``x_test`` and ``y_test``.
        """
        (x_train, y_train), (x_test, y_test) = cifar10.load_data()
        subset_size = len(x_train) // 3
        x_train, y_train = x_train[:subset_size], y_train[:subset_size]

        self.x_train = x_train.astype("float32") / 255.0
        self.x_test = x_test.astype("float32") / 255.0
        self.y_train = keras.utils.to_categorical(y_train, 10)
        self.y_test = keras.utils.to_categorical(y_test, 10)
        print(f"Loaded CIFAR10 dataset\nShape X: {self.x_train.shape}\nShape Y: {self.y_train.shape}")

    def evaluate(self, test_data=True):
        """Evaluate the model silently.

        Args:
            test_data: Evaluate on the test split when true, otherwise on the
                training split.

        Returns:
            Whatever ``model.evaluate`` returns for the compiled loss and metrics.
        """
        if test_data:
            features, labels = self.x_test, self.y_test
        else:
            features, labels = self.x_train, self.y_train
        return self.model.evaluate(features, labels, verbose=0)

    def train(self, epochs, callbacks=None):
        """Fit the model and append the per-epoch metrics to ``self.history``.

        The test split is passed as validation data, so the ``val_*`` metrics
        are measured on the test set.

        Args:
            epochs: Number of epochs for this call.
            callbacks: Optional Keras callbacks forwarded to ``fit``.
        """
        history = self.model.fit(self.x_train,
                                 self.y_train,
                                 epochs=epochs,
                                 batch_size=self.batch_size,
                                 verbose=0,
                                 callbacks=callbacks,
                                 validation_data=(self.x_test, self.y_test))
        # Extend rather than overwrite so repeated calls build one continuous curve.
        for key, values in history.history.items():
            self.history.setdefault(key, []).extend(values)

    def _plot_metric(self, ax, metric, label):
        """Draw the training curve (and the validation curve, if recorded) of one metric."""
        ax.plot(self.history[metric], label=f'Training {label}')
        val_metric = f'val_{metric}'
        if val_metric in self.history:
            ax.plot(self.history[val_metric], label=f'Validation {label}')
        ax.set_title(f'Model {label}')
        ax.set_xlabel('Epoch')
        ax.set_ylabel(label)
        ax.legend()

    def plot_metrics(self):
        """Plot accuracy and loss side by side, save the figure as PNG and show it.

        The file is written to the working directory and is named after the
        current learning rate and batch size. Raises ``KeyError`` when no
        ``'accuracy'`` or ``'loss'`` history has been recorded yet.
        """
        fig, (ax_accuracy, ax_loss) = plt.subplots(1, 2, figsize=(12, 5))
        self._plot_metric(ax_accuracy, 'accuracy', 'Accuracy')
        self._plot_metric(ax_loss, 'loss', 'Loss')
        fig.tight_layout()

        # The prefix used to read "MNIST" (copied from another task), which
        # mislabelled the saved CIFAR-10 plots.
        img_path = f"CIFAR10-lr{self.learning_rate}-bs{self.batch_size}.png"
        fig.savefig(img_path)

        plt.show()

    def create_model(self):
        """Build and compile the CNN, store it in ``self.model`` and return it.

        ``self.learning_rate`` is handed to Adam, so it has to be set first.

        Returns:
            The compiled Keras model, as the ``TaskInterface`` contract declares.
        """
        model = Sequential([
            Conv2D(16, (3, 3), activation='relu', input_shape=(32, 32, 3)),
            MaxPooling2D((2, 2)),
            Conv2D(32, (3, 3), activation='relu'),
            MaxPooling2D((2, 2)),
            Flatten(),
            Dense(64, activation='relu'),
            Dense(10, activation='softmax')
        ])
        model.compile(optimizer=Adam(self.learning_rate), loss="categorical_crossentropy", metrics=["accuracy"])
        self.model = model
        return model

    def save(self):
        """Save the current model to ``self.save_path``."""
        self.model.save(self.save_path)

    def load_model(self, load_path):
        """Replace ``self.model`` with the Keras model stored at ``load_path``."""
        self.model = keras.models.load_model(load_path)

In [15]:
# Checkpoint of the sequence model; the hyper-parameters in the file name
# (emb8, dropout0.2, layers1, heads1) match the FloatSequenceTransformer defaults.
p = "models/cnns_cifar10_categorical/transformer-model_emb8_dropout0.2_layers1_heads1_date27-12-2024.pth"
model = FloatSequenceTransformer()
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine;
# weights_only restricts unpickling to tensors and plain containers.
model.load_state_dict(torch.load(p, map_location="cpu", weights_only=True))

# NOTE(review): recent Keras releases appear to require a ".keras" or ".h5"
# suffix in Model.save - confirm before calling task.save() with this path.
task = CIFAR10task("./test.model")
task.load_data()

Loaded CIFAR10 dataset
Shape X: (16666, 32, 32, 3)
Shape Y: (16666, 10)
