In [None]:
# Graph and model configuration.
#
# This dictionary holds the settings used to build a graph from a TSSB time
# series ("graph") and to train the model on it ("model").

Config = {
    "graph": {
        # Path to the TSSB dataset. It can be downloaded from:
        # https://github.com/ermshaua/time-series-segmentation-benchmark/tree/main/tssb/datasets
        "dataset_path": "datasets/TSSB/",
        # Graph type: MTF, VG, Dual_VG or OPTN.
        "type": "Dual_VG",
        "MTF": {
            # Number of bins for the MTF graph: an integer or "auto".
            "num_bins": "auto",
        },
        "VG": {
            # Edge calculation: natural or horizontal.
            "edge_type": "natural",
            # Distance metric: slope, abs_slope, distance, h_distance,
            # v_distance or abs_v_distance.
            "distance": "distance",
            # Edge directionality: undirected or directed.
            "edge_dir": "directed",
        },
        "OPTN": {
            # Stored as strings; consumers convert them with int().
            "dx": "10",
            "taux": "2",
        },
    },

    "model": {
        # Random seed for reproducibility.
        "SEED": 820,
        # Learning rate for training.
        "learning_rate": 0.05,
        # Batch size for training.
        "batch_size": 64,
        # Number of training epochs.
        "range_epoch": 200,
        # File name for saving the trained model.
        "save_file": "test_test",
        # Name of the save (e.g. checkpoint name).
        "name_of_save": "test_u-time",
        # Patience for early stopping.
        "patience": 500,
        # Fractions of the data used for training, validation and testing.
        "train/val/test": {
            "train": 0.8,
            "val": 0.0,
            "test": 0.2,
        },
    },
}

In [1]:
import torch
# Ask PyTorch whether a CUDA device is available; the value is shown as the cell output.
torch.cuda.is_available()

True

In [3]:
import numpy as np
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.callbacks import ModelCheckpoint
from dvclive.lightning import DVCLiveLogger
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GATConv 
from torch.utils.checkpoint import checkpoint

import torch.nn.functional as F
from torch.nn import CrossEntropyLoss
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score
from sklearn.metrics import classification_report
from pyts.image import MarkovTransitionField
import matplotlib.pyplot as plt
import glob
import pandas as pd
import ordpy
import seaborn as sns
from csv import writer

from sklearn.neighbors import kneighbors_graph
from sklearn.neighbors import radius_neighbors_graph
from sklearn.utils import class_weight

import os
import random
import networkx as nx
from PIL import Image
from pyts.image import MarkovTransitionField
from ts2vg import NaturalVG
from ts2vg import HorizontalVG

from torch_geometric.data import Data



class GAT(pl.LightningModule):
    """Four-layer graph attention network for per-node classification.

    The network takes one feature per node and stacks four ``GATConv``
    layers (1 -> 4x32 -> 4x32 -> 8x32 -> ``num_classes``). The last layer is
    built with ``concat=False`` so its output has ``num_classes`` columns.
    Training and testing select nodes through ``data.train_mask`` and
    ``data.test_mask``.
    """

    def __init__(self, class_weights,num_classes,Config):
        """
        Initializes the GAT model.

        Args:
            class_weights: Per-class weights passed to the cross-entropy loss.
            num_classes: Number of output classes (width of the last layer).
            Config: Configuration dictionary; ``Config["model"]["learning_rate"]``
                is read when the optimizer is configured.
        """
        super().__init__()

        self.class_weights = class_weights
        self.Config = Config

        self.conv1 = GATConv(1, 32, heads=4)
        self.conv2 = GATConv(4 * 32, 32, heads=4)
        self.conv3 = GATConv(4 * 32, 32, heads=8)
        self.conv4 = GATConv(8 * 32, num_classes, heads=6,concat=False)
        # Collects [pred, y] pairs produced by test_step for on_test_epoch_end.
        self.outputs = []

    def forward(self, data):
        """
        Performs the forward pass of the model.

        Args:
            data: Graph object providing ``x``, ``edge_index`` and ``edge_attr``.

        Returns:
            x: Per-node class scores (logits) produced by the last layer.
        """
        x, edge_index, edge_weight = data.x, data.edge_index, data.edge_attr

        # NOTE(review): the GATConv layers are created without `edge_dim`, so
        # the edge attributes passed here are presumably ignored by the
        # attention computation -- confirm against the torch_geometric docs.
        x = F.elu(self.conv1(x, edge_index, edge_weight))
        x = F.elu(self.conv2(x, edge_index, edge_weight))
        x = F.elu(self.conv3(x, edge_index, edge_weight))
        x = self.conv4(x, edge_index, edge_weight)
        return x

    def configure_optimizers(self):
        """
        Configures the optimizer for training the model.

        Returns:
            optimizer: Adam over all parameters, using the configured
                learning rate and a weight decay of 5e-4.
        """
        optimizer = torch.optim.Adam(self.parameters(), lr=self.Config["model"]["learning_rate"], weight_decay=5e-4)
        return optimizer

    def training_step(self, data, batch_idx):
        """
        Performs a single training step on the given batch of train data.

        Args:
            data: Input data for the training step.
            batch_idx: Index of the current batch.

        Returns:
            train_loss: Class-weighted cross-entropy over the training nodes.
        """
        out = self(data)
        loss_function = CrossEntropyLoss(weight=self.class_weights).to(self.device)
        # reshape(-1) instead of squeeze(): squeeze() would turn a single
        # selected node into a 0-d target, which does not match the logits.
        target = data.y[data.train_mask].reshape(-1).to(torch.int64)
        train_loss = loss_function(out[data.train_mask], target)

        return train_loss

    def on_test_epoch_start(self):
        """
        Clears predictions left over from a previous test run so that
        repeated ``test()`` calls on the same model do not mix results.
        """
        self.outputs = []

    def test_step(self, data, batch_idx):
        """
        Runs the model on the test nodes, logs the test loss and accuracy, and
        stores the predicted and true labels for on_test_epoch_end.

        Args:
            data: Test data for the current batch.
            batch_idx: Index of the current batch.

        Returns:
            pred: Predicted labels for the test nodes (column vector on CPU).
            y: True labels for the test nodes (flattened).
        """
        out = self(data)
        loss_function = CrossEntropyLoss(weight=self.class_weights).to(self.device)

        test_logits = out[data.test_mask]
        y = data.y[data.test_mask]
        test_loss = loss_function(test_logits, y.reshape(-1).to(torch.int64))

        test_label = y.cpu()
        pred = test_logits.argmax(-1).float().cpu().reshape(-1, 1)
        # Compare flattened tensors so the result does not depend on whether
        # the labels are stored as a column vector or as a 1D tensor.
        accuracy = (pred.reshape(-1) == test_label.reshape(-1)).sum() / pred.shape[0]

        self.log("test_loss", test_loss)
        self.log("test_acc", accuracy)

        # reshape(-1) keeps a 1D tensor even for a single test node, which
        # np.concatenate in on_test_epoch_end requires.
        y_flat = y.reshape(-1)
        self.outputs.append([pred, y_flat])
        return pred, y_flat

    def on_test_epoch_end(self):
        """
        Test epoch end function.

        Concatenates the predicted and true labels collected by test_step,
        prints a confusion matrix and a classification report, and writes the
        report to ``nvg5/<ts_name>.csv``. ``ts_name`` is read from the module
        namespace. The concatenated labels are published through the
        module-level globals ``true_array`` and ``pred_array``.
        """
        global true_array, pred_array
        outputs = self.outputs
        true_array = np.concatenate([output[1].cpu().numpy() for output in outputs], axis=0)
        pred_array = np.concatenate([output[0].cpu().numpy() for output in outputs], axis=0)

        print(confusion_matrix(true_array, pred_array))
        print(classification_report(true_array, pred_array))

        report = classification_report(true_array, pred_array, output_dict=True)
        df = pd.DataFrame(report).transpose()
        # Make sure the output directory exists before writing the report.
        os.makedirs('nvg5', exist_ok=True)
        df.to_csv('nvg5/'+ts_name+".csv")

In [4]:
def get_TSSB(name_of_X):
    """
    Gets the X and mask of a chosen TSSB time series

    The series is loaded from ``<dataset_path>/<name_of_X>`` and its entry is
    looked up in ``<dataset_path>/desc.txt`` by the file name without its
    extension. In the matching row, every column from the third onward is
    parsed as an integer position and marked with 1 in the mask.

    Args:
        name_of_X: file name of the chosen TS (including its extension)

    Returns:
       _X: the values loaded from the time series file
       mask: a 1D array of zeros with the same length as _X, set to 1 at the
           positions listed for this series in desc.txt (positions beyond the
           end of the series are ignored)
    """

    dataset_dir = Config["graph"]["dataset_path"]
    desc_file = os.path.join(dataset_dir, "desc.txt")
    ts_file = os.path.join(dataset_dir, name_of_X)

    print(ts_file)

    # Strip the real extension instead of assuming it is four characters long.
    series_name = os.path.splitext(name_of_X)[0]

    # Only the row of the requested series is parsed. There is no early exit,
    # so if the name appears more than once the last row wins.
    positions = []
    with open(desc_file) as f:
        for line in f:
            row = line.strip().split(",")
            if row[0] == series_name:
                positions = [int(value) for value in row[2:]]

    _X = np.loadtxt(ts_file)
    mask = np.zeros(len(_X))

    for value in positions:
        if value < len(mask):
            mask[value] = 1

    return _X, mask


def transform_mask(_X, _mask):

    """
    Transforms the mask from the shape of, for example, [0,0,1,0,0,0,1,0,0,1,0,1,0,0,0] to [0,0,1,1,1,1,2,2,2,3,3,4,4,4,4] and repairs it to [0,0,1,1,1,1,2,2,2,3,3,1,1,1,1] if 1 and 4 are the same segments

    Two segments count as the same when their first 100 samples are identical
    (same length and same values). Labels are numbered 0, 1, 2, ... in the
    order in which distinct segments first appear.

    Args:
        _X: a 1D array containing time series, used to check if _X is repeating anywhere inside it
        _mask: a 1D array that will be converted from 1 and 0 to a multi-label mask

    Returns:
        new_mask: an array of shape (1, len(_mask)) where the mask is converted from 1 and 0 to a multi-label mask, mostly used for time series segmentation
    """

    _X = np.asarray(_X)

    # Running count of the 1-entries seen so far: the raw segment id of every
    # time step. It starts at 1 rather than 0 when the mask begins with a 1.
    segment_ids = np.cumsum(np.asarray(_mask) == 1)

    # `inverse` maps each time step to the position of its raw id in `raw_ids`,
    # so labels are looked up by position and never by the raw id itself.
    raw_ids, inverse = np.unique(segment_ids, return_inverse=True)
    labels = np.empty(len(raw_ids), dtype=segment_ids.dtype)

    # Head (first 100 samples) of every distinct segment seen so far; the
    # index in this list is the final label of that segment.
    known_heads = []
    for position, raw_id in enumerate(raw_ids):
        head = _X[segment_ids == raw_id][:100]
        for label, known_head in enumerate(known_heads):
            # array_equal also compares shapes, so heads of different length
            # are never broadcast against each other.
            if np.array_equal(known_head, head):
                labels[position] = label
                break
        else:
            labels[position] = len(known_heads)
            known_heads.append(head)

    new_mask = labels[inverse]
    return new_mask.reshape(1, -1)


def get_matrix(X_current):

    """
    This function gets the adjacency matrix for the graph type selected in
    Config["graph"]["type"]: a visibility graph (VG / Dual_VG), a Markov
    Transition Field (MTF) or an ordinal pattern transition network (OPTN).

    Args:
        X_current: a 1D array usually containing time series values

    Returns:
        adj_mat: a 2D adjacency matrix

    Raises:
        ValueError: if the configured graph type or VG edge type is not supported
    """

    graph_type = Config["graph"]["type"]

    if graph_type in ("VG", "Dual_VG"):
        VGConfig = Config["graph"]["VG"]
        edge_type = VGConfig["edge_type"]
        # Use the configured distance metric; fall back to the value that was
        # previously hard-coded when the key is missing.
        weighted = VGConfig.get("distance", "distance")

        if edge_type == "natural":
            g = NaturalVG(weighted=weighted)
        elif edge_type == "horizontal":
            g = HorizontalVG(weighted=weighted)
        else:
            raise ValueError(f"Unsupported VG edge type: {edge_type!r}")

        # Build the visibility graph using the provided time series
        g.build(X_current)
        # NOTE(review): adjacency_matrix() is called with its defaults --
        # confirm whether that returns the edge weights or a binary matrix.
        adj_mat = g.adjacency_matrix()

    elif graph_type == "MTF":
        n_bins = Config["graph"]["MTF"]["num_bins"]
        if n_bins == "auto":
            # "auto" uses half as many bins as there are time steps.
            n_bins = int(len(X_current)/2)

        # Create and compute an instance of the Markov Transition Field class
        MTF = MarkovTransitionField(n_bins=n_bins, strategy='quantile') # quantile/uniform/normal
        adj_mat = MTF.fit_transform(X_current.reshape(1, -1))[0]

    elif graph_type == "OPTN":
        # The OPTN settings are stored as strings in the configuration.
        dx = int(Config["graph"]["OPTN"]["dx"])
        taux = int(Config["graph"]["OPTN"]["taux"])

        nodes, edges, edge_weights = ordpy.ordinal_network(X_current, dx=dx, taux=taux)

        # Give every node a row/column position in the dense matrix.
        node_to_idx = {node: idx for idx, node in enumerate(nodes)}
        adj_mat = np.zeros((len(nodes), len(nodes)))

        for i, (src, dst) in enumerate(edges):
            adj_mat[node_to_idx[src]][node_to_idx[dst]] = edge_weights[i]

    else:
        raise ValueError(f"Unsupported graph type: {graph_type!r}")

    return adj_mat


def adjToEdgidx(adj_mat):

    """
    Converts an adjacency matrix into edge indexes and edge weights.

    Args:
        adj_mat: a 2D numpy array; every non-zero entry is treated as an edge

    Returns:
        edge_index: a 2D torch tensor with one column per edge; the first row
            holds the row indexes and the second row the column indexes of
            the non-zero entries
        edge_weight: a 1D array with the value of adj_mat at each edge
    """

    # One (row, col) pair per non-zero entry, transposed to shape (2, n_edges).
    nonzero_positions = torch.nonzero(torch.from_numpy(adj_mat))
    edge_index = nonzero_positions.t().contiguous()

    sources, targets = edge_index
    edge_weight = adj_mat[sources, targets]
    return edge_index, edge_weight

def adjToEdgidx_Dual_VG(X_current):

    """
    Creates a dual visibility graph: get_matrix is run once on the series and
    once on the negated series, and the two graphs are joined.

    An edge exists wherever the sum of the two adjacency matrices is non-zero.

    Args:
        X_current: 1D array usually containing time series values

    Returns:
        edge_index: 2D torch tensor with one column per edge
        edge_weight: 2D array with one row per edge; column 0 holds the value
            from the matrix of the original series, column 1 the value from
            the matrix of the negated series
    """

    upper_adj = get_matrix(X_current)
    lower_adj = get_matrix(-X_current)

    combined = torch.from_numpy(upper_adj + lower_adj)
    edge_index = combined.nonzero().t().contiguous()

    # Look up each edge in both matrices and place the two values side by side.
    sources, targets = edge_index
    edge_weight = np.zeros([len(sources), 2], dtype='float')
    for column, matrix in enumerate((upper_adj, lower_adj)):
        edge_weight[:, column] = matrix[sources, targets]

    return edge_index, edge_weight


def create_mask(train, val, test, max_size):

    """
    Generates masks for train, validation, and test sets based on specified percentages and a maximum size.

    Every position is assigned to exactly one of the three sets. The
    assignment is shuffled with the global ``random`` generator, which is
    re-seeded with Config['model']['SEED'] on every call.

    Args:
        train: float value representing the fraction of the train set
        val: float value representing the fraction of the validation set
        test: float value representing the fraction of the test set
        max_size: integer representing the size of the masks

    Returns:
        train_mask: boolean array indicating the train set
        val_mask: boolean array indicating the validation set
        test_mask: boolean array indicating the test set

    Raises:
        ValueError: if train, val and test do not add up to 1
    """
    import math

    # Compare with a tolerance: e.g. 0.7 + 0.2 + 0.1 is not exactly 1.0 in
    # floating point. Raise instead of returning None, which the caller
    # cannot unpack.
    if not math.isclose(train + val + test, 1.0, rel_tol=0.0, abs_tol=1e-9):
        raise ValueError("train, val, and test do not add up to 1")

    random.seed(Config['model']['SEED'])

    counts = [int(max_size * p) for p in (train, val, test)]  # Count of each set
    counts[-1] += max_size - sum(counts)  # Rounding remainder goes to the test set

    # 1 = train, 2 = validation, 3 = test
    lst = []
    for value, count in zip((1, 2, 3), counts):
        lst.extend([value] * count)

    random.shuffle(lst)

    assignment = np.array(lst, dtype=int)
    train_mask = assignment == 1
    val_mask = assignment == 2
    test_mask = assignment == 3

    return train_mask, val_mask, test_mask

def create_graph(X, mask):

    """
    Creates a graph in the torch geometric Data format, containing the node values x, mask values for training, testing, and validation, edge indexes, and edge attributes.

    Args:
        X: 1D sequence of node values (the time series).
        mask: 2D array; its first row is used as the node labels.

    Returns:
        output: A torch geometric Data object describing a single graph.

    Raises:
        ValueError: if Config["graph"]["type"] is not a supported graph type.
    """

    graph_type = Config["graph"]["type"]
    if graph_type in ("VG", "MTF", "OPTN"):
        edge_index, edge_weight = adjToEdgidx(get_matrix(X))
    elif graph_type == "Dual_VG":
        edge_index, edge_weight = adjToEdgidx_Dual_VG(X)
    else:
        # Fail with a clear message instead of an unbound-variable error below.
        raise ValueError(f"Unsupported graph type: {graph_type!r}")

    # Node features, edge attributes and labels all get a trailing axis of size 1.
    x = torch.unsqueeze(torch.tensor(X, dtype=torch.double), 1)
    edge_index = edge_index.to(torch.int64)
    edge_attr = torch.unsqueeze(torch.tensor(edge_weight, dtype=torch.double), 1)
    y = torch.unsqueeze(torch.tensor(mask[0], dtype=torch.double), 1)

    split = Config["model"]["train/val/test"]
    train_mask, val_mask, test_mask = create_mask(split["train"], split["val"], split["test"], len(X))
    train_mask = torch.tensor(train_mask, dtype=torch.bool)
    val_mask = torch.tensor(val_mask, dtype=torch.bool)
    test_mask = torch.tensor(test_mask, dtype=torch.bool)

    output = Data(x=x, train_mask=train_mask, val_mask=val_mask, test_mask=test_mask, edge_index=edge_index, edge_attr=edge_attr, y=y)
    return output


In [5]:
from tssb.utils import load_time_series_segmentation_datasets

# Names of the TSSB time series to load; passed as `names` to the loader below.
list_data = ["Adiac",
"ArrowHead",
"Beef",
"BeetleFly",
"BirdChicken",
"Car",
"CBF",
"ChlorineConcentration",
"CinCECGTorso",
"Coffee",
"Computers",
"CricketX",
"CricketY",
"CricketZ",
"DiatomSizeReduction",
"DistalPhalanxOutlineAgeGroup",
"DistalPhalanxTW",
"ECG200",
"ECGFiveDays",
"FaceAll",
"FaceFour",
"FacesUCR",
"FiftyWords",
"Fish",
"GunPoint",
"Haptics",
"InlineSkate",
"InsectWingbeatSound",
"ItalyPowerDemand",
"LargeKitchenAppliances",
"Lightning2",
"Lightning7",
"Mallat",
"Meat",
"MedicalImages",
"MoteStrain",
"NonInvasiveFetalECGThorax1",
"NonInvasiveFetalECGThorax2",
"OliveOil",
"OSULeaf",
"Plane",
"ProximalPhalanxOutlineAgeGroup",
"ProximalPhalanxTW",
"ShapesAll",
"SonyAIBORobotSurface1",
"SonyAIBORobotSurface2",
"SwedishLeaf", 
"Symbols",
"SyntheticControl",
"ToeSegmentation1",
"ToeSegmentation2",
"Trace",
"TwoLeadECG",
"UWaveGestureLibraryAll",
"UWaveGestureLibraryX",
"UWaveGestureLibraryY",
"UWaveGestureLibraryZ",
"WordSynonyms",
"Yoga",
]

# Load the selected series; the result is iterated row by row with
# .iterrows() in the training cell.
tssb = load_time_series_segmentation_datasets(names=list_data)

In [None]:
# Set up the DVC logger, the seed and the device, then train and test one
# model per TSSB time series.

logger = DVCLiveLogger(run_name = Config["model"]["name_of_save"])

torch.manual_seed(Config['model']['SEED'])
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Use the GPU only when one is actually available instead of hard-coding it.
accelerator = 'gpu' if device.type == 'cuda' else 'cpu'

# Override the epoch count and learning rate from the configuration cell.
Config["model"]["range_epoch"]= 1500
Config["model"]["learning_rate"]= 0.0005


for _, (ts_name, window_size, cps, ts) in tssb.iterrows():
    # Label every time step with the index of its segment: 0 before the first
    # change point, 1 between the first and the second, and so on.
    mask = np.zeros((len(ts)), dtype="int")
    boundaries = [0, *cps]
    for label, (start, stop) in enumerate(zip(boundaries, boundaries[1:])):
        mask[start:stop] = label
    if len(cps) > 0:
        mask[cps[-1]:] = len(cps)

    class_weights = torch.tensor(class_weight.compute_class_weight(class_weight='balanced',
                                                               classes=np.unique(mask),
                                                               y=mask))
    dataset = create_graph(ts, mask.reshape(1, -1))
    # Aliases kept in case other notebook cells refer to these names.
    output = dataset
    data = output

    # Create a loader; only one is needed as train and test masks are used.
    loader = DataLoader([output], batch_size=1, shuffle=False)
    torch.cuda.empty_cache()

    # Initialize the model.
    model = GAT(class_weights,(len(cps)+1),Config).double()

    # Create fresh callbacks for every series so that early-stopping and
    # checkpoint state is not carried over from the previous series.
    # NOTE(review): fit() receives no validation loader and GAT defines no
    # validation_step, so the val_* monitors appear to have nothing to
    # monitor -- confirm.
    early_stop = EarlyStopping(monitor='val_loss',patience=Config["model"]["patience"], strict=False,verbose=False, mode='min')
    val_checkpoint_best_acc = ModelCheckpoint(filename="best_acc", monitor = "val_acc", mode="max")
    val_checkpoint_best_loss = ModelCheckpoint(filename="best_loss", monitor = "val_loss", mode="min")

    # Train the model architecture.
    trainer = pl.Trainer(logger=logger, max_epochs = Config["model"]["range_epoch"], precision='bf16-mixed', callbacks=[val_checkpoint_best_acc,val_checkpoint_best_loss,early_stop],accelerator=accelerator,devices=1)
    trainer.fit(model, loader)

    # Test the model architecture and print the results.
    tester = pl.Trainer(accelerator=accelerator,devices=1)
    tester.test(model, loader)
    