# Accelerating cluster assignment for SeqClu

SeqClu is a real-time sequence clustering algorithm based on online K-medoids. This notebook introduces a new algorithm, with three variants, that improves the cluster assignment step.

Make sure to install the requirements from requirements.txt before executing this notebook.

Jump to the [Execution](#execution) section to set the parameters and execute the notebook

## Table of Contents:
* [Helper classes](#helper_classes)
* [Loading Datasets](#loading_datasets)
    * [Toy Dataset](#toy_dataset)
    * [Handwritten Dataset](#handwritten_dataset)
    * [Synthetic Control Dataset](#sc_dataset)
* [Shared Implementation](#shared_implementation)
* [SeqClu base implementation](#base)
* [SeqClu improved cluster assignment algorithm](#new)
    * [Variant 1](#variant1)
    * [Variant 2](#variant2)
    * [Variant 3](#variant3)
* [The experiment](#experiment)
    * [Hypothesis testing](#hypothesis_testing)
    * [Plotting results](#plotting)
* [Execution](#execution)

In [None]:
import copy
import math
import operator
import os
import random
import re
import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
from abc import ABC, abstractmethod
from io import TextIOWrapper
from statistics import mean, stdev
from typing import Any, Dict, List, Tuple
from zipfile import ZipFile

In [None]:
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
from scipy.stats import wilcoxon
from sklearn.metrics import f1_score
from tqdm.auto import tqdm
from tslearn.clustering import silhouette_score

## Helper classes <a class="anchor" id="helper_classes"></a>

These helper classes are needed for running SeqClu and carrying out the experiment.

In [None]:
class Dataset:
    """Container for a dataset: its sequences, their ids and their labels.
    """

    def __init__(self, name: str, points: List, ids: List, labels: List, nclasses: int = None) -> None:
        """The constructor for the Dataset class.
        Args:
            name (str): The given name of the dataset. Used to distinguish which dataset is loaded in the Dataset object.
            points (List): The list of sequences.
            ids (List): The list of ids. [0....n].
            labels (List): The list of labels.
            nclasses (int, optional): The number of classes in the dataset. When None it is derived
                from the number of distinct labels. Defaults to None.
        """
        self.name = name
        self.points = points
        self.ids = ids
        self.labels = labels
        self.nclasses = nclasses if nclasses is not None else len(set(self.labels))

    def shuffle(self, nclasses: int, nprototypes: int) -> None:
        """Shuffle the data in place while keeping the first n * p sequences where they are.

        Points, ids and labels are shuffled together so they stay aligned.
        Args:
            nclasses (int): The number of classes used (n).
            nprototypes (int): The number of prototypes used (p).
        """
        split = nclasses * nprototypes
        tail = list(zip(self.points[split:], self.ids[split:], self.labels[split:]))
        if not tail:
            # Nothing follows the fixed prefix, so there is nothing to shuffle.
            # (Unpacking zip(*[]) below would otherwise raise a ValueError.)
            return
        random.shuffle(tail)
        tail_points, tail_ids, tail_labels = zip(*tail)
        self.points = self.points[:split] + list(tail_points)
        self.ids = self.ids[:split] + list(tail_ids)
        self.labels = self.labels[:split] + list(tail_labels)

In [None]:
class Measurements:
    """Holds the samples collected for one metric together with their summary statistics.
    """

    def __init__(self, metric: str, samples: List) -> None:
        """Store the samples and compute their mean and sample standard deviation.
        Args:
            metric (str): The name of the metric.
            samples (List): The list of samples.
        """
        self.metric = metric
        self.samples = samples
        # Summary statistics are computed once, at construction time.
        self.mean = mean(samples)
        self.std = stdev(samples)

    def __str__(self) -> str:
        """Render the measurements as ``<metric>{mean=<mean>, std=<std>}``.
        Returns:
            str: The String representation of the measurements class.
        """
        summary = f"{{mean={self.mean!s}, std={self.std!s}}}"
        return self.metric + summary

In [None]:
class Variant:
    """Identifies one implementation (its name plus, when relevant, its phi value).
    """

    def __init__(self, variant: str, phi: float = -1.0) -> None:
        """The constructor for the Variant class.
        Args:
            variant (str): The name of the Variant (Original, Variant 1, Variant 2, Variant 3).
            phi (float, optional): The phi value for the variant. Leave empty if variant is Original. Defaults to -1.0.
        """
        self.variant = variant
        self.phi = phi

    def __str__(self) -> str:
        """Return the variant name, followed by its phi value unless phi is the -1.0 placeholder.
        Returns:
            str: The String representation of the variant class.
        """
        text = self.variant
        if not (self.phi == -1.0):
            text = text + " with \u03A6 = " + str(self.phi)
        return text

    def __eq__(self, other: Any) -> bool:
        """Two variants are equal when both their name and their phi value match.
        Args:
            other (Any): The other variant.
        Returns:
            bool: Whether the two objects represent the same variant.
        """
        if not isinstance(other, Variant):
            return False
        return bool(other.variant == self.variant and other.phi == self.phi)

    def __hash__(self):
        """Hash on the same fields that ``__eq__`` compares.
        Returns:
            int: The resultant hash.
        """
        key = (self.variant, self.phi)
        return hash(key)

In [None]:
class Stats:
    """Groups the measurements of several metrics under one variant name.
    """

    def __init__(self, name: str, measurements: Dict[str, "Measurements"]) -> None:
        """The constructor for the Stats class.
        Args:
            name (str): The name of the variant (Original, Variant 1, Variant 2, Variant 3).
            measurements (Dict[str, Measurements]): The metrics with their measurements.
        """
        self.name = name
        self.measurements = measurements

    def __str__(self) -> str:
        """Render the name followed by one tab-indented line per measurement, wrapped in braces.
        Returns:
            str: The String representation of the Stats class.
        """
        if not self.measurements:
            # Without this guard an empty collection would leave the brace unclosed.
            return self.name + " {\n}"
        body = ",\n\t".join(str(measurement) for measurement in self.measurements.values())
        return self.name + " {\n\t" + body + "\n}"

## Loading Datasets <a class="anchor" id="loading_datasets"></a>

There are multiple datasets that can be used to experiment on SeqClu; they can be found below.

### Toy data-set <a class="anchor" id="toy_dataset"></a>

In [None]:
def generateCurve(n: float, _freq: Tuple[float, float], samplingrate: int, err: float, phase: int) -> List[np.ndarray]:
    """Generate noisy sine curves for the toy dataset.

    Each curve is sampled at x = 1, 1 + samplingrate, ... (below 101), uses a frequency drawn
    uniformly from ``_freq`` and gets per-sample noise drawn uniformly from [0, err).
    Args:
        n (float): The number of curves to generate (truncated to an int).
        _freq (Tuple[float, float]): The lower and upper bound of the frequency.
        samplingrate (int): The step between consecutive x positions.
        err (float): The scale of the random noise added to every sample.
        phase (int): The phase shift of the sine.
    Returns:
        List[np.ndarray]: The generated curves.
    """
    curves = []
    for _ in range(int(n)):
        # The frequency is drawn before the noise; this keeps the random stream order fixed.
        frequency = random.uniform(_freq[0], _freq[1])
        xs = np.arange(1, 101, samplingrate)
        noise = [random.random() * err for _ in range(len(xs))]
        curves.append(np.sin((frequency * xs) + phase) + noise)
    return curves

In [None]:
def generateToyDataset(nclasses: int, nprototypes: int, n_samples: int = 50, samplingrate: int = 1) -> Dataset:
    """Build the toy dataset out of three families of noisy sine curves.

    The first ``nprototypes`` curves of each family come first (in family order); the
    remaining curves follow in random order.
    Args:
        nclasses (int): The number of classes. Three curve families are always generated,
            so values below 3 raise an IndexError when the labels are looked up.
        nprototypes (int): The number of prototypes.
        n_samples (int, optional): The number of points. Defaults to 50.
        samplingrate (int, optional): The sampling rate. Defaults to 1.
    Returns:
        Dataset: The Dataset object that contains the data and the labels.
    """
    per_class = n_samples / nclasses
    # One entry per curve family: (frequency range, noise scale, phase).
    curve_settings = (
        ((0.1, 0.12), 0.2, 5),
        ((0.2, 0.22), 0.4, 12),
        ((0.2, 0.22), 0.4, -10),
    )
    families = [generateCurve(per_class, freq, samplingrate, err, phase)
                for freq, err, phase in curve_settings]
    classes = list(range(nclasses))

    leading = []
    for position, family in enumerate(families):
        label = classes[position]
        leading.extend((curve, label) for curve in family[0:nprototypes])

    remainder = []
    for position, family in enumerate(families):
        label = classes[position]
        remainder.extend((curve, label) for curve in family[nprototypes:])
    random.shuffle(remainder)

    ordered = leading + remainder
    data = [curve for curve, _ in ordered]
    ids = list(range(len(data)))
    labels = [label for _, label in ordered]
    return Dataset("toy dataset", data, ids, labels, nclasses = nclasses)

### UCI Hand-written character dataset <a class="anchor" id="handwritten_dataset"></a>

The UCI hand-written character dataset contains the data of handwritten characters. You can choose which characters you want to use. More info about the dataset can be found [here](https://archive.ics.uci.edu/ml/datasets/optical+recognition+of+handwritten+digits).

In [None]:
def parseFile(lines: List[str]) -> Dict[str, List[List[Tuple[int, int]]]]:
    """Parse one file of the handwritten dataset into per-character point lists.

    The first line is skipped. A ``.COMMENT ... Class [X]`` line (without a ``#``) starts a
    new character, coordinate lines between ``.PEN_DOWN`` and ``.PEN_UP`` are collected, and
    the collected list is stored under the current class on every ``.PEN_UP``.
    Args:
        lines (List[str]): The lines read from the file.
    Returns:
        Dict[str, List[List[Tuple[int, int]]]]: For every character class, the list of
            recorded point sequences, each a list of (x, y) integer pairs.
    Raises:
        IndexError: If a class header or coordinate line does not match the expected layout.
    """
    # Raw strings keep the backslashes for the regex engine and avoid invalid-escape warnings.
    # Compiling once also hoists the pattern lookup out of the loop.
    class_pattern = re.compile(r'.*?\.COMMENT\s+Class\s+\[(.*?)\]')
    coord_pattern = re.compile(r'.*?(\d+)\s+([-\d]+).*')

    points: Dict[str, List[List[Tuple[int, int]]]] = dict()
    newchar = False
    cont = False
    point = []
    cclass = None
    for line in lines[1:]:
        if '.COMMENT' in line and 'Class' in line and '[' in line and '#' not in line:
            cclass = class_pattern.findall(line)[0]
            newchar = True
            point = []
            continue
        if '.PEN_UP' in line:
            cont = False
            # NOTE(review): the same `point` list object is appended on every PEN_UP, so a
            # character written with several strokes is stored once per stroke - confirm intended.
            points.setdefault(cclass, []).append(point)
        if '.PEN_DOWN' in line:
            cont = True
            continue
        if newchar and cont:
            x_text, y_text = coord_pattern.findall(line)[0]
            point.append((int(x_text), int(y_text)))
    return points

In [None]:
def generateHandWrittenDataset(nprototypes: int, classes: List[str] = None) -> Dataset:
    """Generates the handwritten dataset from ``handwriting.zip``.

    The first ``nprototypes`` sequences of every requested character come first (in the
    order of ``classes``); all remaining sequences follow in random order. Character labels
    are converted to integers in order of first appearance.
    Args:
        nprototypes (int): The number of prototypes.
        classes (List[str], optional): The characters to be used. Defaults to ['O', '2', '9'].
            The original notes list 'C', 'U', 'V', 'W', 'S', 'O', '1', '2', '3', '5', '6', '8', '9'
            as candidates - presumably the characters present in the archive; verify before use.
    Returns:
        Dataset: The Dataset object that contains the data and the labels.
    Raises:
        KeyError: If one of the requested characters does not occur in the archive.
    """
    if classes is None:
        # Resolved here rather than in the signature to avoid a shared mutable default.
        classes = ['O', '2', '9']

    segments = dict()
    # The context managers close the archive and each member even if parsing fails.
    with ZipFile('handwriting.zip', 'r') as archive:
        for member in archive.namelist():
            with TextIOWrapper(archive.open(member), encoding="utf-8") as handle:
                lines = handle.readlines()
            for cclass, segment in parseFile(lines).items():
                if cclass not in classes:
                    continue
                segments.setdefault(cclass, []).extend(segment)
    selected_segments = [segments[c] for c in classes]

    # Preparing input sequence data (Exp with different settings, e.g. randomize, inverted, etc)
    trajectory = []
    randlist = []
    for label, class_segments in zip(classes, selected_segments):
        trajectory.extend((sequence, label) for sequence in class_segments[0:nprototypes])
        # randomize incoming sequences so they belong to random classes
        randlist.extend((sequence, label) for sequence in class_segments[nprototypes:])
    random.shuffle(randlist)
    trajectory.extend(randlist)

    # The first nprototypes * len(classes) points are the prototypes.
    X2 = [sequence for sequence, _ in trajectory]  # Data
    ann2 = list(range(len(X2)))  # IDs
    labs2 = [label for _, label in trajectory]  # classes

    # Map every character to an integer id, numbered by first appearance.
    conversion = {}
    adjusted_labels = []
    for label in labs2:
        if label not in conversion:
            conversion[label] = len(conversion)
        adjusted_labels.append(conversion[label])
    return Dataset("handwritten dataset", X2, ann2, adjusted_labels, nclasses = len(classes))

### Synthetic Control Dataset <a class="anchor" id="sc_dataset"></a>

The synthetic control dataset contains 6 classes each with 50 objects. The sequences have a length of 60. The dataset has been sorted and stored as npy file for efficient retrieval and initialization. The dataset was obtained from [here](https://www.cs.ucr.edu/~eamonn/time_series_data_2018/).

In [None]:
def generateSynthicControlDS(nprototypes: int) -> Dataset:
    """Load the synthetic control dataset from ``sc_data.npy`` and order it for SeqClu.

    The file is expected to hold 300 sequences sorted by label (6 labels, 50 sequences
    each). The first ``nprototypes`` sequences of every label come first; the rest follow
    in random order.
    Args:
        nprototypes (int): The number of prototypes
    Returns:
        Dataset: The Dataset object that contains the data and the labels.
    """
    data = np.load("sc_data.npy").tolist()
    assert len(data) == 300
    # The dataset is sorted by label, so each block of 50 rows is one label.
    groups = [data[start:start + 50] for start in range(0, 300, 50)]

    # The head is used for initialization; the tail is shuffled and used for clustering.
    head_points, head_labels = [], []
    tail_points, tail_labels = [], []
    for label, group in enumerate(groups):
        head_points.extend(group[0:nprototypes])
        head_labels.extend([label] * nprototypes)
        tail_points.extend(group[nprototypes:])
        tail_labels.extend([label] * (50 - nprototypes))

    tail = list(zip(tail_points, tail_labels))
    random.shuffle(tail)
    tail_points, tail_labels = zip(*tail)

    ids = list(range(len(data)))
    output = Dataset("synthetic control dataset", head_points + list(tail_points), ids,
                     head_labels + list(tail_labels), nclasses = 6)
    assert len(output.labels) == 300 and len(output.points) == 300
    return output

## Shared Implementation <a class="anchor" id="shared_implementation"></a>

This section contains the shared implementation that is used by the different versions of the algorithm. The SeqClu class has all the functionality needed to implement every version of SeqClu.

In [None]:
class SeqClu(ABC):
    """The shared (abstract) SeqClu class that's used for all implementations.

    It stores the dataset, seeds the clusters with the first ``nclasses * nprototypes``
    points and exposes the evaluation metrics (time, DTW counts, silhouette score, f-score).
    Subclasses implement ``clustering`` and are expected to fill in ``total_time``,
    ``assignment_DTW``, ``extra_DTW`` and ``clustered``.
    Args:
        ABC ([type]): Declared as an abstract class.
    """

    def __init__(self, dataset: "Dataset", nclasses: int, nprototypes: int) -> None:
        """The Constructor for the SeqClu class.
        Args:
            dataset (Dataset): The dataset that will be used for clustering.
            nclasses (int): The number of classes/clusters.
            nprototypes (int): The number of prototypes.
        """
        assert dataset is not None
        self.dataset = dataset
        self.nclasses = nclasses
        self.nprototypes = nprototypes
        # Initialize the clusters: consecutive chunks of `nprototypes` points seed one cluster each.
        # The range must stop at nprototypes * nclasses; stopping one earlier drops the last
        # cluster whenever nprototypes == 1.
        self.init_clusters = [self.dataset.points[i:i + nprototypes] for i in
                              range(0, nprototypes * nclasses, nprototypes)]
        # The seed points are labelled with the cluster they initialise.
        self.assigned_clusters = [cluster for cluster in range(nclasses) for _ in range(nprototypes)]
        assert len(self.assigned_clusters) == nclasses * nprototypes
        self.clustered = False
        # Metrics below are filled in by subclasses or computed lazily by the getters.
        self.total_time = None
        self.assignment_DTW = None
        self.extra_DTW = None
        self.silhouette_coefficient = None
        self.f_measure = None
        self.average_DTW = None

    @abstractmethod
    def clustering(self) -> None:
        """Abstract method which is used for clustering.
        """
        pass

    def calculateSilhouetteScore(self) -> None:
        """Method to calculate the silhouette score (DTW metric) of the assigned clusters.
        """
        self.silhouette_coefficient = silhouette_score(self.dataset.points, self.assigned_clusters, metric="dtw",
                                                       n_jobs=-1)

    def calculateFmeasure(self) -> None:
        """Method to calculate the micro-averaged f-score of the assigned clusters against the labels.
        """
        self.f_measure = f1_score(self.dataset.labels, self.assigned_clusters, average='micro')

    def calculateAverageDTW(self) -> None:
        """Calculates the average assignment DTW operations per point clustered.

        An int ``assignment_DTW`` means every point is compared with every prototype, so the
        average is ``nclasses * nprototypes``; otherwise the per-point counts are averaged.
        """
        if isinstance(self.assignment_DTW, int):
            self.average_DTW = self.nclasses * self.nprototypes
        else:
            self.average_DTW = mean(self.assignment_DTW)

    def getTotalTime(self) -> float:
        """Gets the total time to cluster all the points (excluding initialization).
        Raises:
            Exception: An exception is raised if the clustering has not been completed.
        Returns:
            float: The total time.
        """
        if not self.clustered:
            raise Exception("Clustering hasn't been initiated")
        return self.total_time

    def getSilhoutteScore(self) -> float:
        """Gets the silhouette score, computing it on first use.
        Raises:
            Exception: An exception is raised if the clustering has not been completed.
        Returns:
            float: The silhouette score.
        """
        if not self.clustered:
            raise Exception("Clustering hasn't been initiated")
        if self.silhouette_coefficient is None:
            self.calculateSilhouetteScore()
        return self.silhouette_coefficient

    def getFmeasure(self) -> float:
        """Gets the f-score, computing it on first use.
        Raises:
            Exception: An exception is raised if the clustering has not been completed.
        Returns:
            float: The f-score.
        """
        if not self.clustered:
            raise Exception("Clustering hasn't been initiated")
        if self.f_measure is None:
            self.calculateFmeasure()
        return self.f_measure

    def getAssignmentDTW(self) -> int:
        """Gets the total DTW operations during clustering.
        Raises:
            Exception: An exception is raised if the clustering has not been completed, or if
                ``assignment_DTW`` is neither an int nor a list.
        Returns:
            int: The total DTW operations during clustering.
        """
        if not self.clustered:
            raise Exception("Clustering hasn't been initiated")
        elif isinstance(self.assignment_DTW, int):
            return self.assignment_DTW
        elif isinstance(self.assignment_DTW, list):
            return sum(self.assignment_DTW)
        # Report the type, not the value: that is what the message promises.
        raise Exception("assignment_DTW initialized with wrong type. Actual type: "
                        + type(self.assignment_DTW).__name__)

    def getExtraDTW(self) -> int:
        """Gets the extra DTW operations needed (during initialization). Always 0 for base and variant 1.
        Raises:
            Exception: An exception is raised if the clustering has not been completed.
        Returns:
            int: The extra DTW operations during initialization.
        """
        if not self.clustered:
            raise Exception("Clustering hasn't been initiated")
        return self.extra_DTW

    def getAverageDTW(self) -> float:
        """Gets the average DTW operations per clustered point, computing it on first use.
        Raises:
            Exception: An exception is raised if the clustering has not been completed.
        Returns:
            float: The average DTW operations per clustered point.
        """
        if not self.clustered:
            raise Exception("Clustering hasn't been initiated")
        if self.average_DTW is None:
            self.calculateAverageDTW()
        return self.average_DTW

def calculateDTW(p1: Any, p2: Any) -> float:
    """Return the distance that ``fastdtw`` reports between two sequences.

    ``fastdtw`` is called with the Euclidean distance; only the first element of its
    result is returned.
    Args:
        p1 (Any): The first sequence.
        p2 (Any): The second sequence.
    Returns:
        float: The first element of the ``fastdtw`` result.
    """
    result = fastdtw(p1, p2, dist=euclidean)
    return result[0]

In [None]:
class Cluster:
    """Holds the prototypes of a single cluster. Used for Base and Variant 1.
    """

    def __init__(self, nprototypes: int, initial_prototypes: List) -> None:
        """Create a cluster from its initial prototypes.
        Args:
            nprototypes (int): The number of prototypes.
            initial_prototypes (List): The initial prototypes; must contain exactly nprototypes items.
        """
        assert len(initial_prototypes) == nprototypes
        self.prototypes = initial_prototypes
        self.nprototypes = nprototypes

    def shufflePrototypes(self) -> None:
        """Randomly reorder the prototypes in place.
        """
        random.shuffle(self.prototypes)

    def clusterUpdate(self, point: Any) -> None:
        """Replace the prototype that is farthest (by DTW) from the new point with that point.

        On ties the first such prototype is replaced.
        Args:
            point (Any): The clustered point.
        """
        farthest_index = -1
        farthest_distance = -1
        for index in range(self.nprototypes):
            current = calculateDTW(self.prototypes[index], point)
            # Strict comparison keeps the earliest prototype when distances tie.
            if current > farthest_distance:
                farthest_distance = current
                farthest_index = index
        self.prototypes[farthest_index] = point

## SeqClu base implementation <a class="anchor" id="base"></a>

This is the original implementation of SeqClu.

In [None]:
class SeqCluBase(SeqClu):
    """The original SeqClu implementation class.

    Every incoming point is compared with every prototype of every cluster and assigned to
    the cluster with the smallest average DTW distance.
    Args:
        SeqClu ([type]): Inherits from the SeqClu abstract class.
    """

    def __init__(self, dataset: Dataset, nclasses: int, nprototypes: int) -> None:
        """The Constructor for the SeqCluBase class. Clustering runs immediately.
        Args:
            dataset (Dataset): The dataset that will be used for clustering.
            nclasses (int): The number of classes/clusters.
            nprototypes (int): The number of prototypes.
        """
        SeqClu.__init__(self, dataset, nclasses, nprototypes)
        self.clusters = [Cluster(self.nprototypes, cluster_prototypes)
                         for cluster_prototypes in self.init_clusters]

        # Remove this attribute to free up memory since it will no longer be used.
        del self.init_clusters

        # The number of DTW calculations is known beforehand: every clustered point is
        # compared with all prototypes. The seed points are never clustered, so they
        # must not be counted.
        comparisons_per_point = self.nclasses * self.nprototypes
        clustered_points = len(self.dataset.points) - comparisons_per_point
        self.assignment_DTW = comparisons_per_point * clustered_points
        self.extra_DTW = 0
        self.clustering()

    def clustering(self) -> None:
        """Clusters the points according to the original algorithm.
        Raises:
            Exception: An exception is raised if the clustering has already been done.
        """
        if self.clustered:
            raise Exception("Points have already been clustered")
        start_time = time.time()
        for point in self.dataset.points[self.nprototypes * self.nclasses:]:

            # Cluster assignment phase: assigning cluster to a point
            minimum_distance = math.inf
            minimum_idx = -1
            for cidx, cluster in enumerate(self.clusters):
                # Accumulate over all prototypes; overwriting would make the "average"
                # depend on the last prototype only.
                total_distance = 0
                for prototype in cluster.prototypes:
                    total_distance += calculateDTW(prototype, point)
                distance = total_distance / cluster.nprototypes
                if distance < minimum_distance:
                    minimum_distance = distance
                    minimum_idx = cidx

            # Ensure that every point has been clustered
            assert minimum_idx != -1
            # Cluster update phase
            self.clusters[minimum_idx].clusterUpdate(point)
            self.assigned_clusters.append(minimum_idx)
        self.total_time = time.time() - start_time
        assert len(self.dataset.points) == len(self.assigned_clusters)
        self.clustered = True

## SeqClu new cluster assignment algorithm <a class="anchor" id="new"></a>

This section contains the new algorithm for cluster assignment. There are three variants of this algorithm. These variants differ in how they select the prototypes at each iteration.

In [None]:
class SeqCluImproved(SeqClu):
    """The new improved SeqClu implementation class.

    For every point it selects prototypes round by round and, after each round, discards
    the clusters whose relative distance to the closest cluster exceeds ``phi``.
    Args:
        SeqClu ([type]): Inherits from the SeqClu abstract class.
    """

    def __init__(self, dataset: Dataset, nclasses: int, nprototypes: int, phi: float) -> None:
        """The Constructor for the SeqCluImproved class.
        Args:
            dataset (Dataset): The dataset that will be used for clustering.
            nclasses (int): The number of classes/clusters.
            nprototypes (int): The number of prototypes.
            phi (float): The phi value.
        """
        SeqClu.__init__(self, dataset, nclasses, nprototypes)
        self.phi = phi
        self.extra_DTW = 0
        # One entry per clustered point: the number of DTW operations its assignment needed.
        self.assignment_DTW = []

    def clustering(self) -> None:
        """Clusters the points according to the new improved algorithm.
        Raises:
            Exception: An exception is raised if the clustering has already been done.
        """
        if self.clustered:
            raise Exception("Points have already been clustered")
        start_time = time.time()
        first_unclustered = self.nprototypes * self.nclasses
        for point in self.dataset.points[first_unclustered:]:
            # Cluster assignment phase: assigning cluster to a point
            rounds = self.nprototypes
            dtw_count = 0
            chosen_cluster = -1

            # Shuffle the prototypes for each cluster
            # No shuffling is done for variants 2 & 3
            self.shuffleClusterPrototypes()
            indices = {}
            distances = {}
            for round_number in range(rounds):
                dtw_count += self.selectPrototype(point, round_number, distances, indices)

                if round_number == rounds - 1:
                    # Last round: take the remaining cluster with the smallest distance
                    # (the first one on ties).
                    candidates = list(distances.items())
                    chosen_cluster, best_distance = candidates[0]
                    for cluster_id, distance in candidates[1:]:
                        if distance < best_distance:
                            best_distance = distance
                            chosen_cluster = cluster_id
                    continue

                # Get the cluster id with the shortest distance to the incoming point
                closest_cluster = self.getClosestCluster(distances)
                # Filter the clusters that are too far from the closest cluster
                kept_distances = {}
                kept_indices = {}
                for cluster_id, distance in distances.items():
                    if cluster_id != closest_cluster:
                        reference = distances[closest_cluster]
                        # In case the distance is zero, take a tiny float as smallest distance
                        if reference == 0.0:
                            reference = 1.0e-200
                        within_phi = (distance - reference) / reference * 1.0 <= self.phi
                        if not within_phi:
                            continue
                    kept_distances[cluster_id] = distance
                    kept_indices[cluster_id] = indices[cluster_id]

                # If one cluster remains, then assign to that cluster.
                if len(kept_distances) == 1:
                    chosen_cluster = next(iter(kept_distances))
                    break
                # Otherwise continue with filtered clusters
                distances = kept_distances
                indices = kept_indices

            # Ensure that every point has been clustered
            assert chosen_cluster != -1
            self.assignment_DTW.append(dtw_count)
            # Cluster update phase
            self.clusters[chosen_cluster].clusterUpdate(point)
            self.assigned_clusters.append(chosen_cluster)
        assert len(self.dataset.points) == len(self.assigned_clusters)
        self.total_time = time.time() - start_time
        self.clustered = True

    @abstractmethod
    def selectPrototype(self, point: Any, iteration: int, selected_prototype_distances: Dict[int, float],
                        selected_prototype_indices: Dict[int, List[int]]) -> int:
        """Selects the next prototype for each iteration.
        Args:
            point (Any): The current point.
            iteration (int): The current iteration
            selected_prototype_distances (Dict[int, float]): The distance to the closest prototype for each cluster.
            selected_prototype_indices (Dict[int, List[int]]): The indices of the selected prototypes of each cluster.
        Returns:
            int: The number of DTW operations done.
        """
        pass

    def shuffleClusterPrototypes(self) -> None:
        """Shuffles the prototypes of every cluster. Triggers only for Variant 1.
        """
        for cluster in self.clusters:
            cluster.shufflePrototypes()

    def getClosestCluster(self, selected_prototypes: Dict[int, float]) -> int:
        """Retrieves the cluster with the closest prototype.
        Args:
            selected_prototypes (Dict[int, float]): The distance to the closest prototype for each cluster.
        Returns:
            int: The closest cluster (the first one on ties), or -1 when the mapping is empty.
        """
        closest_id = -1
        smallest = None
        for position, (cluster_id, distance) in enumerate(selected_prototypes.items()):
            if position == 0 or distance < smallest:
                smallest = distance
                closest_id = cluster_id
        return closest_id

### SeqClu Variant 1 <a class="anchor" id="variant1"></a>

Variant 1 starts with selecting a random prototype from each cluster and continues selecting random prototypes in subsequent iterations.

In [None]:
class SeqCluV1(SeqCluImproved):
    """Improved SeqClu that walks through each cluster's prototypes in stored order (variant 1).

    The clusters are plain Cluster objects; prototype number `iteration` is the one
    compared against the point in a given iteration.
    Args:
        SeqCluImproved ([type]): Inherits from the SeqCluImproved class.
    """

    def __init__(self, dataset: Dataset, nclasses: int, nprototypes: int, phi: float) -> None:
        """Builds the clusters from the initial prototypes and immediately runs the clustering.
        Args:
            dataset (Dataset): The dataset that will be used for clustering.
            nclasses (int): The number of classes/clusters.
            nprototypes (int): The number of prototypes.
            phi (float): The phi value.
        """
        super().__init__(dataset, nclasses, nprototypes, phi)
        self.clusters = [Cluster(self.nprototypes, seed_prototypes) for seed_prototypes in self.init_clusters]

        # The initial prototypes now live inside the clusters; drop the extra reference to free memory.
        del self.init_clusters
        self.clustering()

    def clustering(self) -> None:
        """Runs the clustering by delegating to the shared SeqCluImproved implementation.
        Raises:
            Exception: An exception is raised if the clustering has already been done.
        """
        super().clustering()

    def selectPrototype(self, point: Any, iteration: int, selected_prototype_distances: Dict[int, float],
                        selected_prototype_indices: Dict[int, List[int]]) -> int:
        """Compares the point with prototype number `iteration` of the clusters still under consideration.
        Args:
            point (Any): The current point.
            iteration (int): The current iteration
            selected_prototype_distances (Dict[int, float]): The distance to the closest prototype for each cluster.
            selected_prototype_indices (Dict[int, List[int]]): The indices of the selected prototypes of each cluster.
        Returns:
            int: The number of DTW operations done.
        """
        # The prototype order is taken as-is, so the iteration number doubles as the prototype index.
        is_first_iteration = iteration == 0
        dtw_count = 0
        for cidx, cluster in enumerate(self.clusters):
            if is_first_iteration:
                # First iteration: every cluster gets an entry, whatever the distance is.
                selected_prototype_distances[cidx] = calculateDTW(cluster.prototypes[iteration], point)
                selected_prototype_indices[cidx] = [iteration]
                dtw_count += 1
                continue
            if cidx not in selected_prototype_distances:
                # Clusters without an entry are no longer candidates for this point.
                continue
            candidate_distance = calculateDTW(cluster.prototypes[iteration], point)
            dtw_count += 1
            # Later iterations only record a prototype when it improves on the current best.
            if candidate_distance < selected_prototype_distances[cidx]:
                selected_prototype_distances[cidx] = candidate_distance
                selected_prototype_indices[cidx].append(iteration)
        return dtw_count

### SeqClu Variant 2 <a class="anchor" id="variant2"></a>

Variant 2 starts by selecting a random prototype from each cluster; in each subsequent iteration it selects the not-yet-selected prototype that is nearest to the last selected one.

In [None]:
class ClusterV2(Cluster):
    """Cluster that additionally stores the pairwise DTW distances between its prototypes. Used for Variant 2.
    """

    def __init__(self, nprototypes: int, initial_prototypes: List) -> None:
        """The constructor for the ClusterV2 class.
        Args:
            nprototypes (int): The number of prototypes.
            initial_prototypes (List): The initial prototypes.
        """
        Cluster.__init__(self, nprototypes, initial_prototypes)
        # prototype_distances[i][j] is the DTW distance between prototypes i and j.
        # Only pairs with i < j are computed (one DTW call per unordered pair); the
        # entries with i > j are mirrored from rows that were built earlier.
        self.prototype_distances = []
        for i in range(self.nprototypes):
            distances = []
            for j in range(self.nprototypes):
                if i == j:
                    distances.append(0.0)
                elif i < j:
                    distances.append(calculateDTW(self.prototypes[i], self.prototypes[j]))
                else:
                    distances.append(self.prototype_distances[j][i])
            self.prototype_distances.append(distances)

    def shufflePrototypes(self) -> None:
        """No shuffling is needed for ClusterV2.
        """
        pass

    def clusterUpdate(self, point: Any) -> None:
        """Replaces the prototype farthest from the point with the point itself.

        The distance matrix is updated with the distances measured during the search,
        so no additional DTW calls are needed.
        Args:
            point (Any): The clustered point.
        """
        maximum_distance = -1
        maximum_idx = -1
        # Distance from the point to every current prototype, indexed like self.prototypes.
        point_distances = []
        for i in range(self.nprototypes):
            distance = calculateDTW(self.prototypes[i], point)
            point_distances.append(distance)
            if distance > maximum_distance:
                maximum_idx = i
                maximum_distance = distance
        self.prototypes[maximum_idx] = point
        # The point now occupies slot maximum_idx: refresh its row and column. The diagonal stays 0.0.
        for i in range(self.nprototypes):
            if i == maximum_idx:
                continue
            self.prototype_distances[maximum_idx][i] = point_distances[i]
            self.prototype_distances[i][maximum_idx] = point_distances[i]

    def getPrototypeNeighbor(self, selected_prototypes: List[int]) -> int:
        """Returns the not-yet-selected prototype that is nearest to the last selected one.
        Args:
            selected_prototypes (List[int]): The prototypes selected so far; the last entry is the reference.
        Returns:
            int: The id of the nearest not selected prototype, or -1 if every prototype is already selected.
        """
        minimum_distance = math.inf
        minimum_idx = -1
        prototype_id = selected_prototypes[-1]
        for i in range(self.nprototypes):
            # Membership test: skip prototypes that were already selected. An identity
            # comparison against the list would never exclude anything, and the reference
            # prototype itself (distance 0.0) would always win.
            if i not in selected_prototypes:
                distance = self.prototype_distances[prototype_id][i]
                if distance < minimum_distance:
                    minimum_distance = distance
                    minimum_idx = i
        return minimum_idx

In [None]:
class SeqCluV2(SeqCluImproved):
    """Improved SeqClu that starts from a random prototype and then follows prototype neighbors (variant 2).

    Every cluster is a ClusterV2, which keeps the prototype-to-prototype distances
    that the neighbor lookup relies on.
    Args:
        SeqCluImproved ([type]): Inherits from the SeqCluImproved class.
    """

    def __init__(self, dataset: Dataset, nclasses: int, nprototypes: int, phi: float) -> None:
        """Builds the ClusterV2 objects, records the DTW cost of doing so and runs the clustering.
        Args:
            dataset (Dataset): The dataset that will be used for clustering.
            nclasses (int): The number of classes/clusters.
            nprototypes (int): The number of prototypes.
            phi (float): The phi value.
        """
        super().__init__(dataset, nclasses, nprototypes, phi)
        self.clusters = [ClusterV2(self.nprototypes, seed_prototypes) for seed_prototypes in self.init_clusters]

        # The initial prototypes now live inside the clusters; drop the extra reference to free memory.
        del self.init_clusters

        # Each ClusterV2 computes one DTW distance per unordered prototype pair when it is created.
        pairs_per_cluster = math.comb(self.nprototypes, 2)
        self.extra_DTW = self.nclasses * pairs_per_cluster
        self.clustering()

    def clustering(self) -> None:
        """Runs the clustering by delegating to the shared SeqCluImproved implementation.
        Raises:
            Exception: An exception is raised if the clustering has already been done.
        """
        super().clustering()

    def selectPrototype(self, point: Any, iteration: int, selected_prototype_distances: Dict[int, float],
                        selected_prototype_indices: Dict[int, List[int]]) -> int:
        """Picks a random prototype per cluster first, then the neighbor of the last selected prototype.
        Args:
            point (Any): The current point.
            iteration (int): The current iteration
            selected_prototype_distances (Dict[int, float]): The distance to the closest prototype for each cluster.
            selected_prototype_indices (Dict[int, List[int]]): The indices of the selected prototypes of each cluster.
        Returns:
            int: The number of DTW operations done.
        """
        dtw_count = 0
        if iteration == 0:
            # First iteration: every cluster starts from a uniformly random prototype.
            for cidx, cluster in enumerate(self.clusters):
                start_id = random.randint(0, self.nprototypes - 1)
                start_distance = calculateDTW(cluster.prototypes[start_id], point)
                dtw_count += 1
                selected_prototype_distances[cidx] = start_distance
                selected_prototype_indices[cidx] = [start_id]
            return dtw_count

        for cidx, cluster in enumerate(self.clusters):
            if cidx not in selected_prototype_distances:
                # Clusters without an entry are no longer candidates for this point.
                continue
            # Ask the cluster for the neighbor of the last selected prototype.
            neighbor_id = cluster.getPrototypeNeighbor(selected_prototype_indices[cidx])
            neighbor_distance = calculateDTW(cluster.prototypes[neighbor_id], point)
            dtw_count += 1
            # NOTE(review): a neighbor that is not closer is never recorded, so the next iteration
            # passes the same list to getPrototypeNeighbor again - confirm this is intended.
            if neighbor_distance < selected_prototype_distances[cidx]:
                selected_prototype_distances[cidx] = neighbor_distance
                selected_prototype_indices[cidx].append(neighbor_id)
        return dtw_count

### SeqClu Variant 3 <a class="anchor" id="variant3"></a>

Variant 3 selects prototypes based on how central they are in each cluster.

In [None]:
class ClusterV3(Cluster):
    """Cluster that keeps its prototypes ordered from most to least central. Used for Variant 3.

    Centrality is the average DTW distance from a prototype to the other prototypes,
    read from a pairwise distance matrix that is kept aligned with the prototype order.
    """

    def __init__(self, nprototypes: int, initial_prototypes: List) -> None:
        """The constructor for the ClusterV3 class.
        Args:
            nprototypes (int): The number of prototypes.
            initial_prototypes (List): The initial prototypes.
        """
        Cluster.__init__(self, nprototypes, initial_prototypes)
        # prototype_distances[i][j] is the DTW distance between prototypes i and j.
        # Only pairs with i < j are computed (one DTW call per unordered pair); the
        # entries with i > j are mirrored from rows that were built earlier.
        self.prototype_distances = []
        for i in range(self.nprototypes):
            distances = []
            for j in range(self.nprototypes):
                if i == j:
                    distances.append(0.0)
                elif i < j:
                    distances.append(calculateDTW(self.prototypes[i], self.prototypes[j]))
                else:
                    distances.append(self.prototype_distances[j][i])
            self.prototype_distances.append(distances)
        # Sort the prototypes (and the matrix with them).
        self.sortPrototypes()

    def shufflePrototypes(self) -> None:
        """No shuffling is needed for ClusterV3.
        """
        pass

    def sortPrototypes(self) -> None:
        """Sorts the prototypes by ascending average distance to the other prototypes.

        The distance matrix is permuted with the same order. Without that, row i would no
        longer describe prototype i after the first sort, and later updates and sorts
        would work on mismatched data.
        """
        # With a single prototype there are no "other" prototypes; avoid dividing by zero.
        others = max(self.nprototypes - 1, 1)
        prototype_average_distance = [
            (i, sum(row) / others) for i, row in enumerate(self.prototype_distances)
        ]
        prototype_average_distance.sort(key=operator.itemgetter(1))
        new_order = [entry[0] for entry in prototype_average_distance]
        self.prototypes = [self.prototypes[i] for i in new_order]
        # Apply the same permutation to both the rows and the columns of the matrix.
        self.prototype_distances = [
            [self.prototype_distances[i][j] for j in new_order] for i in new_order
        ]

    def clusterUpdate(self, point: Any) -> None:
        """Replaces the prototype farthest from the point with the point itself, then re-sorts.
        Args:
            point (Any): The clustered point.
        """
        maximum_distance = -1
        maximum_idx = -1
        # Distance from the point to every current prototype, indexed like self.prototypes.
        point_distances = []
        for i in range(self.nprototypes):
            distance = calculateDTW(self.prototypes[i], point)
            point_distances.append(distance)
            if distance > maximum_distance:
                maximum_idx = i
                maximum_distance = distance
        self.prototypes[maximum_idx] = point
        # The point now occupies slot maximum_idx: refresh its row and column. The diagonal stays 0.0.
        for i in range(self.nprototypes):
            if i == maximum_idx:
                continue
            self.prototype_distances[maximum_idx][i] = point_distances[i]
            self.prototype_distances[i][maximum_idx] = point_distances[i]
        # The new prototype changes the average distances, so sort again.
        self.sortPrototypes()

In [None]:
class SeqCluV3(SeqCluImproved):
    """Improved SeqClu that visits the prototypes in the order kept by ClusterV3 (variant 3).

    Args:
        SeqCluImproved ([type]): Inherits from the SeqCluImproved class.
    """

    def __init__(self, dataset: Dataset, nclasses: int, nprototypes: int, phi: float) -> None:
        """Builds the ClusterV3 objects, records the DTW cost of doing so and runs the clustering.
        Args:
            dataset (Dataset): The dataset that will be used for clustering.
            nclasses (int): The number of classes/clusters.
            nprototypes (int): The number of prototypes.
            phi (float): The phi value.
        """
        super().__init__(dataset, nclasses, nprototypes, phi)
        self.clusters = [ClusterV3(self.nprototypes, seed_prototypes) for seed_prototypes in self.init_clusters]

        # The initial prototypes now live inside the clusters; drop the extra reference to free memory.
        del self.init_clusters
        # Each ClusterV3 computes one DTW distance per unordered prototype pair when it is created.
        pairs_per_cluster = math.comb(self.nprototypes, 2)
        self.extra_DTW = self.nclasses * pairs_per_cluster
        self.clustering()

    def clustering(self) -> None:
        """Runs the clustering by delegating to the shared SeqCluImproved implementation.
        Raises:
            Exception: An exception is raised if the clustering has already been done.
        """
        super().clustering()

    def selectPrototype(self, point: Any, iteration: int, selected_prototype_distances: Dict[int, float],
                        selected_prototype_indices: Dict[int, List[int]]) -> int:
        """Compares the point with prototype number `iteration` of the clusters still under consideration.
        Args:
            point (Any): The current point.
            iteration (int): The current iteration
            selected_prototype_distances (Dict[int, float]): The distance to the closest prototype for each cluster.
            selected_prototype_indices (Dict[int, List[int]]): The indices of the selected prototypes of each cluster.
        Returns:
            int: The number of DTW operations done.
        """
        dtw_count = 0
        # The clusters keep their prototypes sorted, so the iteration number is used as the prototype index.
        if iteration == 0:
            # First iteration: every cluster gets an entry for its first prototype.
            for cidx, cluster in enumerate(self.clusters):
                first_distance = calculateDTW(cluster.prototypes[iteration], point)
                dtw_count += 1
                selected_prototype_distances[cidx] = first_distance
                selected_prototype_indices[cidx] = [iteration]
            return dtw_count

        for cidx, cluster in enumerate(self.clusters):
            if cidx not in selected_prototype_distances:
                # Clusters without an entry are no longer candidates for this point.
                continue
            candidate_distance = calculateDTW(cluster.prototypes[iteration], point)
            dtw_count += 1
            # Only record the prototype when it is closer than the best one so far.
            if candidate_distance < selected_prototype_distances[cidx]:
                selected_prototype_distances[cidx] = candidate_distance
                selected_prototype_indices[cidx].append(iteration)
        return dtw_count

## The experiment <a class="anchor" id="experiment"></a>

The Experiment class receives the variants that will be used for the experiment and runs each of them for multiple trials, using a freshly shuffled copy of the dataset for every trial.

In [None]:
class Experiment:
    """Runs several SeqClu variants for multiple trials on reshuffled data and collects the results.
    """

    def __init__(self, dataset: Dataset, nclasses: int, nprototypes: int, variants: List[Variant]) -> None:
        """The constructor for the Experiment class.
        Args:
            dataset (Dataset): The dataset to be used for the experiment.
            nclasses (int): The number of classes.
            nprototypes (int): The number of prototypes.
            variants (List[Variant]): The list of variants for the experiments.
        """
        # Ensure that the dataset is loaded.
        assert dataset is not None
        self.dataset = dataset
        self.nclasses = nclasses
        self.nprototypes = nprototypes
        self.variants = variants
        # One list of finished clustering runs per variant, filled by run_experiment.
        self.results: Dict[Variant, List[SeqClu]] = {variant: [] for variant in self.variants}
        self.trials = None
        self.completed = False

    def run_experiment(self, trials: int) -> None:
        """Runs the experiment for multiple trials with the preselected variants.

        Every trial works on a freshly shuffled deep copy of the dataset; all variants
        of the same trial receive that same copy.
        Args:
            trials (int): The number of trials. Must be at least 2.
        """
        assert trials >= 2
        # Check for a finished experiment before touching self.trials, so that a rejected
        # second run cannot overwrite the trial count of the stored results.
        assert not self.completed
        self.trials = trials
        pbar = tqdm(total=trials * len(self.results), position=0, leave=True)
        try:
            for _ in range(trials):
                shuffled_dataset = copy.deepcopy(self.dataset)
                shuffled_dataset.shuffle(self.nclasses, self.nprototypes)
                for k, runs in self.results.items():
                    # The constructors run the clustering themselves, so building the object is the trial.
                    if k.variant == "Original":
                        runs.append(SeqCluBase(shuffled_dataset, self.nclasses, self.nprototypes))
                    elif k.variant == "Variant 1":
                        runs.append(SeqCluV1(shuffled_dataset, self.nclasses, self.nprototypes, k.phi))
                    elif k.variant == "Variant 2":
                        runs.append(SeqCluV2(shuffled_dataset, self.nclasses, self.nprototypes, k.phi))
                    elif k.variant == "Variant 3":
                        runs.append(SeqCluV3(shuffled_dataset, self.nclasses, self.nprototypes, k.phi))
                    pbar.update(1)
        finally:
            # Close the progress bar even when a run fails.
            pbar.close()
        self.completed = True

    def get_stats(self, metrics: List[str] = ["Total DTW", "F score"], to_print: bool = False,
                  save_csv: bool = False) -> Dict[Variant, Stats]:
        """Collects the samples of each requested metric per variant and wraps them in Stats objects.
        Args:
            metrics (List[str], optional): The list of metrics to generate stats for. Defaults to ["Total DTW", "F score"].
            to_print (bool, optional): Option to print the stats. Defaults to False.
            save_csv (bool, optional): Option to save stats to csv. Defaults to False.
        Raises:
            Exception: Exception is raised if the experiment wasn't run before.
        Returns:
            Dict[Variant, Stats]: The variants with their stats.
        """
        if not self.completed:
            raise Exception("Run the experiment before executing this methode")
        # Maps a metric name to the function that reads it from one finished run.
        extractors = {
            "Total time": lambda run: run.getTotalTime(),
            "Total DTW": lambda run: run.getAssignmentDTW() + run.getExtraDTW(),
            "Silhouette": lambda run: run.getSilhoutteScore(),
            "F score": lambda run: run.getFmeasure(),
            "Accuracy": lambda run: run.getAccuracy(),
            "Average DTW": lambda run: run.getAverageDTW(),
            "Assignment DTW": lambda run: run.getAssignmentDTW(),
            "Extra DTW": lambda run: run.getExtraDTW(),
        }
        stats = {}
        for k, v in self.results.items():
            measurements = {}
            for m in metrics:
                extract = extractors.get(m)
                # An unknown metric name still yields None samples, as it always did;
                # how that is handled is up to Measurements.
                measure = [extract(run) for run in v] if extract is not None else None
                measurements[m] = Measurements(m, measure)
            stats[k] = Stats(str(k), measurements)
        if to_print:
            for v in stats.values():
                print(str(v) + "\n")
        if save_csv:
            self.save_csv(stats, metrics)
        return stats

    def save_csv(self, stats: Dict[Variant, Stats], metrics: List[str]) -> None:
        """Saves the mean and stdev of every metric, one row per variant, to a csv file.

        The file is written to the "output" directory, which is created when missing.
        Args:
            stats (Dict[Variant, Stats]): The stats for each variant.
            metrics (List[str]): The metrics.
        """
        labels = ["Implementation"]
        for m in metrics:
            labels.append("Mean " + m)
            labels.append("Stdev " + m)
        output = []
        for v in stats.values():
            row = [v.name]
            for m in metrics:
                row.append(v.measurements[m].mean)
                row.append(v.measurements[m].std)
            output.append(row)
        df = pd.DataFrame(output, columns=labels)
        # Build the path with os.path.join: a hard-coded backslash is only a separator on Windows.
        output_dir = "output"
        os.makedirs(output_dir, exist_ok=True)
        filename = self.dataset.name + "_" + str(self.nclasses) + "C_" + str(self.nprototypes) + "P.csv"
        df.to_csv(os.path.join(output_dir, filename), index=False)

### Hypothesis Testing <a class="anchor" id="hypothesis_testing"></a>

A Wilcoxon signed-rank test is carried out to check whether there is a significant difference between two variants for different metrics.

In [None]:
def test_metric(variant1: Variant, variant2: Variant, metric: str, stats: Dict[Variant, Stats]) -> None:
    """Runs the Wilcoxon test on one metric for a pair of variants and prints the outcome.

    The printed verdict is H0 when p is greater than 0.05 and H1 otherwise.
    Args:
        variant1 (Variant): The first variant.
        variant2 (Variant): The second variant.
        metric (str): The metric.
        stats (Dict[Variant, Stats]): The stats for each variant.
    """
    # Both variants need stats before they can be compared.
    if not (variant1 in stats and variant2 in stats):
        print("Trial not found. Run the experiment again")
        return

    first_samples = stats[variant1].measurements[metric].samples
    second_samples = stats[variant2].measurements[metric].samples
    stat, p = wilcoxon(first_samples, second_samples)

    separator = "---------------------------------------------------------"
    if p > 0.05:
        verdict = "H0: There is no significant difference in "
    else:
        verdict = "H1: There is significant difference in "

    print(separator)
    print('stat=%.3f, p=%.3f' % (stat, p))
    print(verdict + metric + " between " + str(variant1) + " and " + str(variant2))
    print(separator)

In [None]:
def hypothesis_testing(exp: Experiment, combs: List[Tuple[Variant, Variant]],
                       metrics: List[str] = ["Total time", "Total DTW", "Average DTW", "F score"]) -> None:
    """Runs the Wilcoxon test for every requested pair of variants on every metric.

    The stats are computed once and shared by all tests; pairs are processed in the
    given order, and for each pair the metrics are processed in the given order.
    Args:
        exp (Experiment): The experiment object.
        combs (List[Tuple[Variant, Variant]]): The list of pairs of variants to test.
        metrics (List[str], optional): The list of metrics to test for. Defaults to ["Total time", "Total DTW", "Average DTW", "F score"].
    """
    stats = exp.get_stats(metrics=metrics)
    comparisons = ((first, second, name) for first, second in combs for name in metrics)
    for first, second, name in comparisons:
        test_metric(first, second, name, stats)

### Plotting results <a class="anchor" id="plotting"></a>

The results of the experiment are finally plotted as a bar chart.

In [None]:
def plot_graph(exp: Experiment, combs: List[List[Variant]],
               metrics: List[str] = ["Total time", "Total DTW", "Average DTW", "F score"]) -> None:
    """Plots one bar chart per metric for each list of variants and saves every chart as a PNG.

    "Total DTW" is drawn as a stacked bar (Assignment DTW with Extra DTW on top); every
    other metric is a plain bar with the standard deviation as error bar. The charts are
    written to the "output" directory as 0.png, 1.png, ... in plotting order.
    Args:
        exp (Experiment): The experiment object.
        combs (List[List[Variant]]): The variants to be plotted. Each list is plotted separately.
        metrics (List[str], optional): The list of metrics. Defaults to ["Total time", "Total DTW", "Average DTW", "F score"].
    """
    stats = exp.get_stats(metrics=metrics + ["Assignment DTW", "Extra DTW"])
    # Build the path with os.path.join: a hard-coded backslash is only a separator on Windows.
    output_dir = "output"
    os.makedirs(output_dir, exist_ok=True)
    plt.style.use("ggplot")
    # The same font and figure size apply to every chart, so they are set once up front.
    plt.rcParams.update({"font.size": 25, "font.weight": "bold", "figure.figsize": (12, 10)})
    width = 0.35
    i = 0
    for c in combs:
        labels = [str(sc) for sc in c]
        for m in metrics:
            fig, ax = plt.subplots()
            # Total DTW is plotted differently (uses Assignment DTW + Extra DTW)
            if m == "Total DTW":
                assignment = [stats[sc].measurements["Assignment DTW"] for sc in c]
                extra = [stats[sc].measurements["Extra DTW"] for sc in c]
                assignment_means = [round(a.mean, 2) for a in assignment]
                assignment_std = [round(a.std, 2) for a in assignment]
                extra_means = [round(e.mean, 2) for e in extra]

                p1 = ax.bar(labels, assignment_means, width, yerr=assignment_std, label="Assignment DTW",
                            align='center', ecolor='black', capsize=3)
                # The extra calculations are stacked on top of the assignment calculations.
                p2 = ax.bar(labels, extra_means, width, bottom=assignment_means, label="Extra DTW", align='center',
                            ecolor='black', capsize=3)
                ax.set_ylabel("Mean " + m + " calculations")
                ax.set_title("Total DTW calculations for " + exp.dataset.name + "\n #samples=" + str(
                    exp.trials) + ", #clusters=" + str(exp.nclasses) + ", #prototypes=" + str(exp.nprototypes), pad=85)
                ax.legend(loc='best', handletextpad=1.75)
                ax.bar_label(p1, label_type='center', padding=5, rotation=90)
                ax.bar_label(p2, padding=10, rotation=90)
            else:
                means = [round(stats[sc].measurements[m].mean, 2) for sc in c]
                std = [round(stats[sc].measurements[m].std, 2) for sc in c]

                p1 = ax.bar(labels, means, width, yerr=std, clip_on=False, align='center', ecolor='black', capsize=3)
                ax.bar_label(p1, label_type='center', padding=5, rotation=90)
                ax.set_ylabel("Mean " + m)
                ax.set_title(m + " for " + exp.dataset.name + "\n #samples=" + str(exp.trials) + ", #clusters=" + str(
                    exp.nclasses) + ", #prototypes=" + str(exp.nprototypes), pad=50)

            ax.yaxis.grid(True)
            plt.xticks(rotation=90)
            # Save before showing: some backends discard the figure once it has been shown.
            fig.savefig(os.path.join(output_dir, str(i) + ".png"), dpi=fig.dpi, bbox_inches="tight")
            plt.show()
            # pyplot keeps every figure alive until it is closed; dropping the name is not enough.
            plt.close(fig)
            i += 1

## Execution <a class="anchor" id="execution"></a>

This is the default list of variants that will be used for the experiments. It consists of the original SeqClu and the three variants of the new algorithm each using three values for phi.

In [None]:
# The baseline, followed by every variant of the new algorithm with each of the three phi values.
org = Variant("Original")
variants = [org] + [Variant(name, phi)
                    for name in ("Variant 1", "Variant 2", "Variant 3")
                    for phi in (0.75, 1.0, 1.25)]

# Alternative combination used for testing
# variants = [org, Variant("Variant 1", 1.0), Variant("Variant 2", 1.0), Variant("Variant 3", 1.0)]

Set the parameters correctly. The fish dataset always has 7 classes. The number of classes for the handwritten dataset depends on the characters you choose.

The default implementation of SeqClu uses 5 prototypes.

Uncomment the relevant line for the dataset that you want.

In [None]:
# Experiment configuration: choose exactly one dataset by leaving its line uncommented.
nprototypes = 5 # Number of prototypes; passed to the dataset generator and to the Experiment.
# Option 1: the toy dataset.
# dataset = generateToyDataset(3, nprototypes)
# Option 2: the handwritten dataset (presumably `classes` lists the characters to load - verify in the loader).
# dataset = generateHandWrittenDataset(nprototypes, classes = ['C', 'W', 'S', 'O', '1', '2', '6'])
# Option 3 (active): the synthetic control dataset.
dataset = generateSynthicControlDS(nprototypes)
nclasses = dataset.nclasses # Number of classes, read from the loaded dataset.
trials = 50 # Number of trials of the experiment; run_experiment asserts that it is at least 2.
exp = Experiment(dataset, nclasses, nprototypes, variants)

In [None]:
# Run every configured variant for the requested number of trials; progress is shown with a tqdm bar.
exp.run_experiment(trials)

In [None]:
# Print the default stats ("Total DTW" and "F score") and save them to csv.
exp.get_stats(to_print=True, save_csv=True)
# Plot all variants together as a single group, using plot_graph's default metrics.
plot_graph(exp, [variants])

In [None]:
# Pairwise Wilcoxon tests at phi = 1.0: the original against each variant first,
# then the three variants against each other.
hypothesis_testing(exp, [(org, Variant("Variant 1", 1.0)), (org, Variant("Variant 2", 1.0)),
                         (org, Variant("Variant 3", 1.0)), (Variant("Variant 1", 1.0), Variant("Variant 2", 1.0)),
                         (Variant("Variant 1", 1.0), Variant("Variant 3", 1.0)),
                         (Variant("Variant 2", 1.0), Variant("Variant 3", 1.0))])