In [None]:
!mamba install tensorflow-gpu==2.11.0 -y -q
!mamba install pytorch-cuda=11.6 -c pytorch -c conda-forge -c nvidia -y -q
!mamba install -c conda-forge pyts==0.12.0 -y -q
!pip install torch==1.13 torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu116 -q
!pip install torch-scatter torch-sparse torch-cluster torch-spline-conv torch-geometric -f https://data.pyg.org/whl/torch-1.13.1+cu116.html -q
!pip install ts2vg==1.2.1 -q
!pip install pytorch_lightning==1.9.1 -q
!pip install torchsummary==1.5.1 -q
!pip install dvclive==2.0.2 -q
!pip install pyedflib -q
!pip install mne -q

In [2]:
import pandas as pd
import numpy as np
import torch
import warnings

from pyts.image import MarkovTransitionField

from ts2vg import NaturalVG
from ts2vg import HorizontalVG

from torch_geometric.data import Data

def get_Rutgers():
    """
    Gets X, labels Y for graph clasification, and mask of Y for every sample for node classification, of a chosen Rutgers time series

    Affected by:
        Config["graph"]["len_type"]
        Config["graph"]["data_path"]
        Config["graph"]["folder_name"]
        Config["graph"]["properties_name"]
        Config["graph"]["mask_name"]
        
    Returns:
       _X: a 2D array containing the time steps of the rutgers dataset
       mask: a 2D array containing the mask for node classification for the rutgers dataset
       Y: a 1D array containing the mask for graph classification for the rutgers dataset
    """
    # for the path containing time series, all with the same number of samples
    if Config["graph"]["len_type"] == "un/cut":
        df = pd.read_csv(Config["graph"]["data_path"] + Config["graph"]["folder_name"])  
        del df['Unnamed: 0']
        df.index, df.columns = [range(df.index.size), range(df.columns.size)]
        length_rss = int((df.columns.stop-2)/2)
        
        X = df.loc[:,df.columns[:length_rss]].to_numpy()
        Y = df[length_rss+1].to_numpy(dtype=np.uint8)
        mask = df.loc[:,df.columns[length_rss+2:]].to_numpy()
        
    # for the path containing time series, with diferent number of samples
    elif Config["graph"]["len_type"] == "random":
        dataset_rss = np.load(Config["graph"]["data_path"] + Config["graph"]["folder_name"], allow_pickle=True)['arr_0']
        dataset_properties = np.load(Config["graph"]["data_path"] + Config["graph"]["properties_name"], allow_pickle=True)['arr_0']
        dataset_mask = np.load(Config["graph"]["data_path"] + Config["graph"]["mask_name"], allow_pickle=True)['arr_0']

        for i in range(len(dataset_properties)):
            dataset_properties[i,1] = int(dataset_properties[i,1])
        
        X = dataset_rss 
        mask = dataset_mask 
        Y = dataset_properties[:,2] 
        
    return X, mask, Y

def get_matrix(X_current):
    """
    This function gets the adjacency matrix through either visibility, MTF, or dual VG graph
    
    Affected:
        Config["graph"]["type"]
        Config["graph"]["VG"]["edge_type"]
        Config["graph"]["VG"]["edge_dir"]
        Config["graph"]["MTF"]["num_bins"]
        
    Args:
        X_current: a 1D array usually containing time series values
    
    Returns:
        adj_mat: a list of adjacency matrices
    """
    adj_mat = []
    
    # Check the graph type specified in the Config and perform the corresponding operations
    if Config["graph"]["type"] in ("VG", "Dual_VG"):
        VGConfig = Config["graph"]["VG"]
        
        # Create an instance of the visibility graph class based on the edge type specified
        if VGConfig["edge_type"] == "natural":
            g = NaturalVG(weighted=VGConfig["distance"])
        elif VGConfig["edge_type"] == "horizontal":
            g = HorizontalVG(weighted=VGConfig["distance"])

        # Build the visibility graph using the provided time series
        g.build(X_current)

        adj_mat_vis = np.zeros([len(X_current), len(X_current)], dtype='float')
        
        # Iterate over the edges of the visibility graph and assign weights to the corresponding positions in the adjacency matrix
        for x, y, q in g.edges:
            adj_mat_vis[x, y] = q
            if VGConfig["edge_dir"] == "undirected":
                adj_mat_vis[y, x] = q
        
        adj_mat.append(adj_mat_vis)
        
    elif Config["graph"]["type"] == "MTF":
        n_bins = Config["graph"]["MTF"]["num_bins"]
        if n_bins == "auto":
            n_bins = len(X_current)
        #is used to ignore some warnings as  the instance of MTF is being computed
        warnings.filterwarnings("ignore")
        
        # Create and compute an instance of the Markov Transition Field class
        MTF = MarkovTransitionField(n_bins=n_bins)
        X_gaf_MTF_temp = MTF.fit_transform(X_current.reshape(1, -1))
        adj_mat.append(X_gaf_MTF_temp[0])
    
    return adj_mat
    
def adjToEdgidx(adj_mat):
    """
    This function creates edge indexes and weights for a given matrix
    
    Args:
        adj_mat: a 2D array

    Returns:
        edge_index: a 2D torch array that indicates the connected values
        edge_weight: a 1D array of weights that represent the absolute distance between connected nodes or values in the time series
    """
    edge_index = torch.from_numpy(adj_mat[0]).nonzero().t().contiguous()
    row, col = edge_index
    edge_weight = adj_mat[0][row, col]
    return edge_index, edge_weight

def adjToEdgidx_Dual_VG(X_current):
    """
    Creates a dual visibility graph by first creating a directed VG from one side and then flipping and running the get_matrix function again.
    By doing this, we join these two graphs and obtain a dual VG.

    Args:
        X_current: 1D array usually containing time series values

    Returns:
        edge_index: 2D torch array that defines the connected values
        edge_weight: 2D array of weights that represent the absolute distance between every node or value in the time series
    """
    pos_adj_mat_vis = get_matrix(X_current)[0]
    neg_adj_mat_vis = get_matrix(-X_current)[0]
    edge_index = torch.from_numpy(pos_adj_mat_vis + neg_adj_mat_vis).nonzero().t().contiguous()

    # Join two edge_weight arrays
    row, col = edge_index
    edge_weight = np.zeros([len(row), 2], dtype='float')
    edge_weight[:, 0] = pos_adj_mat_vis[row, col]
    edge_weight[:, 1] = neg_adj_mat_vis[row, col]

    return edge_index, edge_weight
    
def create_graph(output, X, mask, Y):
    """
    Creates a graph in the torch geometric Data format, containing the node values x, mask values for training, testing, and validation, edge indexes, and edge attributes.

    Affected by:
        Config["graph"]["type"]
        Config["graph"]["classif"]
    
    Args:
        output: Dataset of multiple graphs (optional). New graph will be appended to this dataset.
        X: Node values (integer).
        mask: 1D array representing the mask.

    Returns:
        output: Updated dataset of multiple or singular graph.
    """
    if Config["graph"]["type"] in ("VG", "MTF"):
        edge_index, edge_weight = adjToEdgidx(get_matrix(X))
    elif Config["graph"]["type"] == "Dual_VG":
        edge_index, edge_weight = adjToEdgidx_Dual_VG(X)
    
    
    
    x = torch.unsqueeze(torch.tensor(X, dtype=torch.double), 1).clone().detach()
    edge_index = edge_index.clone().detach().to(torch.int64)
    edge_attr = torch.unsqueeze(torch.tensor(edge_weight, dtype=torch.double), 1).clone().detach()
    
    if Config["graph"]["classif"] == "graph": # for graph classification
        y = torch.tensor(Y, dtype=torch.long)
    elif Config["graph"]["classif"] == "node":# for node classification 
        y = torch.unsqueeze(torch.tensor(mask, dtype=torch.double),1)


    output.append(Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=y))
    return output

In [31]:
from sklearn.utils import class_weight
def Rutgers_graph():
    """
    Performs a pipeline of operations to process the Rutgers dataset for a given utility.

    Affected by:
        Config["graph"]

    Returns:
        output: A list containing the processed graph data.
        class_weights: Torch tensor containing the computed class weights.
    """
    # creates X, mask of lables and Y as a graph lable
    X, mask, Y = get_Rutgers()
    
    # here graphs are appended to the dataset
    dataset = []
    for i in range(len(X)):
        dataset = create_graph(dataset, X[i], mask[i],Y[i])
        
    # join all lables into a 1D array
    all_x = np.reshape(np.concatenate([obj.y for obj in dataset]), (-1,))
    
    class_weights = torch.tensor(class_weight.compute_class_weight(class_weight='balanced',
                                                                    classes=np.unique(all_x),
                                                                    y=all_x))
    if Config["graph"]["classif"] == "node":
        class_weights =torch.tensor([class_weights[1]/class_weights[0]])

    return dataset, class_weights

In [9]:
import pytorch_lightning as pl
import torch.nn.functional as F

from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.callbacks import ModelCheckpoint

from dvclive.lightning import DVCLiveLogger

from torch.nn import Sequential, BatchNorm1d, ReLU
from torch.nn import Linear, CrossEntropyLoss, BCEWithLogitsLoss

from torch_geometric.loader import DataLoader
from torch_geometric.nn import GINEConv, GATConv
from torch_geometric.nn import global_max_pool

from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

from torch import nn

"""
This is the definition of a model arhitecture created for graph classification using GINEConv layers
""" 
class GINE(nn.Module):
    def __init__(self):
        super().__init__()
        
        if Config["graph"]["type"] in ("MTF", "VG"):
            edge_dim = 1
        elif Config["graph"]["type"] in ("dual_VG"):
            edge_dim = 2

        dim_h = 32
        # Define GINEConv and Linear layers
        self.conv1 = GINEConv(
            Sequential(Linear(dim_h, dim_h),
                       BatchNorm1d(dim_h), ReLU(),
                       Linear(dim_h, dim_h), ReLU()), edge_dim=edge_dim)

        self.conv2 = GINEConv(
            Sequential(Linear(dim_h, dim_h), BatchNorm1d(dim_h), ReLU(),
                       Linear(dim_h, dim_h), ReLU()), edge_dim=edge_dim)

        self.conv3 = GINEConv(
            Sequential(Linear(dim_h, dim_h), BatchNorm1d(dim_h), ReLU(),
                       Linear(dim_h, dim_h), ReLU()), edge_dim=edge_dim)

        self.conv4 = GINEConv(
            Sequential(Linear(dim_h, dim_h), BatchNorm1d(dim_h), ReLU(),
                       Linear(dim_h, dim_h), ReLU()), edge_dim=edge_dim)

        self.conv5 = GINEConv(
            Sequential(Linear(dim_h, dim_h), BatchNorm1d(dim_h), ReLU(),
                       Linear(dim_h, dim_h), ReLU()), edge_dim=edge_dim)


        self.lin1 = Linear(dim_h*5, dim_h*5)
        self.lin2 = Linear(dim_h*5, 5)

    def forward(self, data):
        x, edge_index, edge_weight, batch = data.x, data.edge_index, data.edge_attr, data.batch

        # Node embeddings 
        h1 = self.conv1(x, edge_index, edge_attr=edge_weight)
        h2 = self.conv2(h1, edge_index, edge_attr=edge_weight)
        h3 = self.conv3(h2, edge_index, edge_attr=edge_weight)
        h4 = self.conv4(h3, edge_index, edge_attr=edge_weight)
        h5 = self.conv5(h4, edge_index, edge_attr=edge_weight)

        # Graph-level readout
        h1 = global_max_pool(h1, batch)
        h2 = global_max_pool(h2, batch)
        h3 = global_max_pool(h3, batch)
        h4 = global_max_pool(h4, batch)
        h5 = global_max_pool(h5, batch)

        # Concatenate graph embeddings
        h = torch.cat((h1, h2, h3, h4, h5), dim=1)

        # Classifier
        h = self.lin1(h)
        h = h.relu()
        h = F.dropout(h, p=0.5, training=self.training)
        h = self.lin2(h)

        return h

"""
This is the definition of a model arhitecture created for node classification using GATConv and Linear layers
""" 
class GAT(nn.Module):
    def __init__(self):
        super().__init__()
        # Define GATConv and Linear layers
        self.conv1 = GATConv(1, 32, heads=4)
        self.lin1 = torch.nn.Linear(1, 4 * 32)
        self.conv2 = GATConv(4 * 32, 32, heads=4)
        self.lin2 = torch.nn.Linear(4 * 32, 4 * 32)
        self.conv3 = GATConv(4 * 32, 1, heads=6,concat=False)
        self.lin3 = torch.nn.Linear(4 * 32, 1)

    def forward(self, data):
        x, edge_index, edge_weight = data.x, data.edge_index, data.edge_attr
        # Process input data through convolutional and linear layers
        x = F.elu(self.conv1(x, edge_index) + self.lin1(x))
        x = F.elu(self.conv2(x, edge_index) + self.lin2(x))
        x = self.conv3(x, edge_index) + self.lin3(x)
        return x
    
    
"""
class NGClassifier is a Pytorch Lightning class that is used to train, validate and test a model arhitecture for graph or node classificaiton
""" 
class NGClassifier(pl.LightningModule):
    def __init__(self, class_weights, model,Config):
        super().__init__()
        self.class_weights = class_weights
        self.model = model
        self.Config = Config
        
    def configure_optimizers(self):
        """
        Configures the optimizer for training the model.
        
        Affected by:
            Config["graph"]["learning_rate"]
        
        Returns:
            optimizer: The configured optimizer.
        """
        optimizer = torch.optim.Adam(self.parameters(), lr=self.Config["model"]["learning_rate"], weight_decay=5e-4)
        return optimizer
    
    def acc_pred(self, out, y):
        """
        Calculates the accuracy and predicted labels that the trained model has generated.

        Affected by:
            Config["graph"]["classif"]

        Args:
            out: Output values from a trained model.
            y: True labels.

        Returns:
            accuracy: Accuracy of the model's predictions.
            pred: Predicted labels.
        """
        if self.Config["graph"]["classif"] == "graph":
            pred = out.argmax(-1)
            accuracy = (pred == y).sum() / pred.shape[0]
            
        elif self.Config["graph"]["classif"] == "node":
            preds = []
            preds.append((out > 0).float().cpu())     
            pred = torch.cat(preds, dim=0)
            accuracy = (pred == y.cpu()).sum() / pred.shape[0]
        
        return accuracy, pred
    
    def loss_function_selection(self):
        """
        Choses a loss function depending on what type of classification is happening (graph or node classification)

        Affected by:
            Config["graph"]["classif"]

        Returns:
            a loss funciton 
        """
        if self.Config["graph"]["classif"] == "graph":
            return CrossEntropyLoss(weight=self.class_weights).to(device)
        elif self.Config["graph"]["classif"] == "node":
            return BCEWithLogitsLoss(weight=self.class_weights).to(device)
            
    def training_step(self, train_batch, batch_idx):
        """
        Performs a single training step on the given batch of train data.

        Args:
            train_batch: Input data for the training step.
            batch_idx: Index of the current batch.

        Returns:
            train_loss: Loss value for the training step.
        """
        out = self.model(train_batch)
        loss_function = self.loss_function_selection()
        train_loss = loss_function(out, train_batch.y)
        
        return train_loss
    
    def validation_step(self, val_batch, batch_idx):
        """
        Performs a single validation step on the given batch of validation data. It logs val_loss and accuracy for early stoping and saving the best trained models
        
        Args:
            val_batch: Test data for the current batch.
            batch_idx: Index of the current batch.
        """
        out = self.model(val_batch)
        loss_function = self.loss_function_selection()
        val_loss = loss_function(out, val_batch.y)
        
        accuracy, pred = self.acc_pred(out, val_batch.y)
        
        self.log("val_loss", val_loss)
        self.log("val_acc", accuracy)        
    
    def test_step(self, test_batch, batch_idx):
        """
        Performs a forward pass on the model to obtain predictions for the test data. It logs accuracy, and collects the true labels and predicted labels for later evaluation.

        Args:
            test_batch: Test data for the current batch.
            batch_idx: Index of the current batch.

        Returns:
            pred: Predicted labels for the test data.
            y: True labels for the test data.
        """
        out = self.model(test_batch)
        loss_function = self.loss_function_selection()
        test_loss = loss_function(out, test_batch.y)
        
        accuracy, pred = self.acc_pred(out, test_batch.y)
        
        self.log("test_acc", accuracy)
        return pred, test_batch.y

        
    def test_epoch_end(self, outputs):
        """
        This function receives accumulated predicted and true labels from the test_step and uses them on the confusion matrix and classification report that are than printed.
        
        Args:
            outputs: Contains an array of predicted and an array of true lables

        """
        global true_array, true_array
        true_array=[outputs[i][1].cpu().numpy() for i in range(len(outputs))]
        pred_array = [outputs[i][0].cpu().numpy() for i in range(len(outputs))]    
        pred_array = np.array(pred_array).reshape(-1, 1)
        true_array = np.array(true_array).reshape(-1, 1)
        print(confusion_matrix(true_array, pred_array))
        print(classification_report(true_array, pred_array))
        
        
def main():

    """
    Main function for training.
    
    Affected by:
        Config["graph"]["classif"]
        Config["model"]
    """
    
    global device
    # initiate callback functions, DVC, Seed and device
    early_stop = EarlyStopping(monitor='val_acc',patience=Config["model"]["patience"], strict=False,verbose=False, mode='max')
    val_checkpoint_best_loss = ModelCheckpoint(filename="best_loss", monitor = "val_loss", mode="max")
    logger = DVCLiveLogger() # the bonus of using DVCLiveLogger() is that you can visualise the validation accuracy live in dvclive/report.html
    
    torch.manual_seed(Config["model"]["SEED"])
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # creates dataset containing graphs and the overall class_weights
    output, class_weights = Rutgers_graph() 

    # sets the train, validation and test sizes and atributes a number of time series coresponding to those sizes to every DataLoader
    train_size = int(Config["model"]["train/val/test"]["train"] * len(output))
    val_size = int(Config["model"]["train/val/test"]["val"]*len(output))
    test_size = len(output) - (val_size + train_size) 
    train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(output, [train_size, val_size, test_size])

    train_loader = DataLoader(train_dataset, batch_size=Config["model"]["batch_size"], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=int(Config["model"]["batch_size"]/2), shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    # initializes graph or node model
    if Config["graph"]["classif"] == "graph":
        model = NGClassifier(class_weights,GINE(),Config).double() #GINE is for graph classification
    elif Config["graph"]["classif"] == "node":
        model = NGClassifier(class_weights, GAT(),Config).double() #GAT is for node classification          
    
    #traines the model arhitecture
    trainer = pl.Trainer(logger=logger, max_epochs = Config["model"]["range_epoch"], callbacks=[val_checkpoint_best_loss,early_stop],accelerator='gpu',devices=1)
    trainer.fit(model, train_loader, val_loader)
    
    #tests the model arhitecture and prints the results
    tester = pl.Trainer(accelerator='gpu',devices=1)
    tester.test(model, test_loader)
    

In [10]:
def report(csv_name,true_array, pred_array):
    """
    Generates a classification report based on the true and predicted arrays for a given utility.

    Args:
        csv_name: Name of the .csv file.
        true_array: Array of true labels.
        pred_array: Array of predicted labels.

    Returns:
        Saves the report as a CSV file.
    """
    print(csv_name)
    report = classification_report(true_array, pred_array, output_dict=True)
    df = pd.DataFrame(report).transpose()
    file_name = 'results_rutgers_VG_8-2_'+str(Config["model"]["SEED"])+'_Without_linear/'+csv_name
    
    # Create the directory if it doesn't exist
    directory = os.path.dirname(file_name)
    if not os.path.exists(directory):
        os.makedirs(directory)
    
    df.to_csv(file_name +".csv")
    shutil.make_archive(file_name, 'zip', file_name)

In [11]:
"""
This list contains configurations for generating and training a graph from a Rutgers time series
"""
Config = {
    "graph": {
        "data_path" : "datasets/Rutgers/", # path to datasets
        "folder_name" : "dataset_uncut.csv", #"dataset_uncut.csv", "dataset_cut.csv", "dataset_rss.npz", paths used for cut/uncut/random dataset
        "properties_name" : "dataset_properties.npz",  # path to properties used for random 
        "mask_name" : "dataset_mask.npz", # path to mask dataset used for random
        "classif": "node",  # Type of classification (node or graph)
        "len_type" : "un/cut", #"un/cut", "random", the shape of data used in later paths 
        "type": "VG",  # Type of graph (MTF, VG, Dual_VG)
        "MTF": {
            "num_bins": "auto"  # Number of bins for MTF graph (integer or "auto")
        },
        "VG": {
            "edge_type": "natural",  # Type of edge calculation for VG graph (natural or horizontal)
            "distance": 'distance',  # Type of distance metric for VG graph
                                    # (slope, abs_slope, distance, h_distance, v_distance, abs_v_distance)
            "edge_dir": "directed"  # Directionality of edges in VG graph (undirected or directed)
        }
    },
    
    "model": {
        "SEED": 280,  # Random seed for reproducibility
        "learning_rate": 0.005,  # Learning rate for training
        "batch_size": 64,  # Batch size for training
        "range_epoch": 200,  # Number of training epochs
        "save_file": "test_test",  # File name for saving trained model
        "name_of_save": "test_u-time",  # Name of the save (e.g., checkpoint name)
        "patience": 500,  # Patience for early stopping
        "train/val/test": {
            "train": 0.8,  # Percentage of data used for training
            "val": 0.04,  # Percentage of data used for validation
            "test": 0.16  # Percentage of data used for testing
        }
    }
}

In [None]:
"""
This for loop runs the main function for graph generation, training and saving the testing results into a csv file.
The trained model can be found in DvcLiveLogger/dvclive_run/checkpoints.
"""
#Most common Config parameters
Config["graph"]["classif"] = "node"
Config["graph"]["type"] = "VG"
Config["model"]["SEED"] = 280

#For generating, training and testing the graph
main()
# Generate a classification report for the utility
# report(get_versions_TSSB()[utility][:-4],true_array, pred_array,2)