# Empirical validation of property 2

This notebook is dedicated to an empirical validation of the following property from the MAD paper:

Property 2

Let $X$ and $X^\prime$ be datasets each composed of $n$ time series, and let us assume uniform weights, i.e. $w=w^\prime=(1/n, \cdots, 1/n)$.
There exists a transport plan solution to the MAD (resp. $|\mathcal{C}|$-MAD) problem that is a one-to-one matching, i.e. each sample from $X$ is matched to exactly one sample in $X^\prime$ (and conversely).

Emprical validation:

Because $X$ and $X^\prime$ are composed of the same number of series $n$, property 2 implies that the transport plan has only $n$ non-zero elements. Moreover, because of the uniform weights, each series must have at least one matching.

In [1]:
import numpy.random as npr
import numpy as np
import ot
import tslearn.metrics as tslm
import matplotlib.pyplot as plt


2023-03-31 14:15:27.805529: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


Numpy (and abriege) version of MAD

In [2]:
# The MAD algorithm

class MAD:
    def __init__(self, X, Y):

        self.X = X
        self.Y = Y
        self.shapeX = X.shape
        self.shapeY = Y.shape
        self.classe = np.zeros((self.shapeX[0], 1), dtype=int)
        self.classe_unique = np.unique(self.classe)
        self.Xa_one = np.ones(self.shapeX[0]) / self.shapeX[0]
        self.Ya_one = np.ones(self.shapeY[0]) / self.shapeY[0]
        self.OT_tilde = self.init_OT_matrix()

        self.tab_idx = []
        self.dist_OT = []
        self.pi_DTW_idx = []
        self.pi_DTW_path_idx = []
        self.Xsquared = []
        self.Xsquared_sum = []
        self.Ysquared = self.Y ** 2
        for cl in self.classe_unique:
            self.tab_idx.append(np.where(self.classe == cl)[0])
            self.pi_DTW_idx.append(self.init_DTW_matrix())
            X2 = self.X[self.tab_idx[cl]] ** 2
            X2_sum = np.dot(self.Xa_one[self.tab_idx[cl]], X2.transpose(1, 0, -1)).sum(-1)
            self.Xsquared.append(X2)
            self.Xsquared_sum.append(X2_sum[:, None])

    def init_OT_matrix(self):
        cost_OT = np.ones((self.shapeX[0], self.shapeY[0]))
        OT_tilde = ot.emd(self.Xa_one, self.Ya_one, cost_OT, numItermax=10000000)
        return OT_tilde

    def init_DTW_matrix(self):
        DTW_matrix = np.zeros((self.shapeX[1], self.shapeY[1]))
        ts = [0, 0]
        indices_table = [[1, 0], [0, 1], [1, 1]]
        while (ts[0] != self.shapeX[1] - 1) or (ts[1] != self.shapeY[1] - 1):
            DTW_matrix[ts[0], ts[1]] = 1
            if ts[0] == self.shapeX[1] - 1:
                indice_moving = 1
            elif ts[1] == self.shapeY[1] - 1:
                indice_moving = 0
            else:
                indice_moving = npr.randint(3)
            ts[0] = ts[0] + indices_table[indice_moving][0]
            ts[1] = ts[1] + indices_table[indice_moving][1]
        DTW_matrix[-1, -1] = 1
        return DTW_matrix

    def mat_cost_OT(self):
        mat_cost = np.zeros(shape=(self.shapeX[0], self.shapeY[0]))

        for cl in self.classe_unique:
            pi_DTW = self.pi_DTW_idx[cl]
            C1 = np.dot(self.Xsquared[cl].transpose(0, -1, 1), np.sum(pi_DTW, axis=1)).sum(-1)
            C2 = np.dot(self.Ysquared.transpose(0, -1, 1), np.sum(pi_DTW.T, axis=1)).sum(-1)
            C3 = np.tensordot(np.dot(self.X[self.tab_idx[cl]].transpose(0, -1, 1), pi_DTW), self.Y,
                                axes=([1, 2], [2, 1]))
            res = C1[:, None] + C2[None, :] - 2 * C3
            mat_cost[self.tab_idx[cl]] = res
        mat_cost /= (self.shapeX[1] + self.shapeY[1]) / 2
        return mat_cost

    def mat_dist_DTW(self, classe_it=None):
        if classe_it is None:
            OTc = self.OT_tilde
            Xc = self.X
        else:
            OTc = self.OT_tilde[self.tab_idx[classe_it]]
            Xc = self.X[self.tab_idx[classe_it]]
        C2 = np.dot(OTc.sum(axis=0), self.Ysquared.transpose(1, 0, -1)).sum(-1)
        C31 = np.dot(Xc.T, OTc)
        C32 = np.tensordot(C31, self.Y, axes=([0, 2], [2, 0]))
        res = self.Xsquared_sum[classe_it] + C2[None, :] - 2 * C32
        res /= (self.shapeX[1] + self.shapeY[1]) / 2
        return res

    def path2mat(self, path):
        pi_DTW = np.zeros((self.shapeX[1], self.shapeY[1]))
        for i, j in path:
            pi_DTW[i, j] = 1
        return pi_DTW

    def stopping_criterion(self, last_pi_DTW):
        stop = True
        for cl in self.classe_unique:
            pi_DTW = self.pi_DTW_idx[cl]
            last_DTW = last_pi_DTW[cl]
            if (pi_DTW != last_DTW).any():
                stop = False
        return stop

    def main_training(self, max_init=100, first_step_DTW=True):
        cost = {"Cost": []}
        stop = False
        current_init = 0
        # Begin training
        while stop is not True and current_init < max_init:
            if (current_init != 0) or (first_step_DTW is False):
                Cost_OT = self.mat_cost_OT()
                self.OT_tilde = ot.emd(self.Xa_one, self.Ya_one, Cost_OT, numItermax=1000000)
                score_OT = np.sum(self.OT_tilde * Cost_OT)
                cost["Cost"].append(score_OT)

            dtw_score = 0
            self.pi_DTW_path_idx = []
            total_cost_DTW = []
            for cl in self.classe_unique:
                mat_dist = self.mat_dist_DTW(cl)
                total_cost_DTW.append(mat_dist)
                Pi_DTW_path, dtw_score_prov = tslm.dtw_path_from_metric(mat_dist, metric="precomputed")
                self.pi_DTW_path_idx.append(Pi_DTW_path)
                Pi_DTW_prov = self.path2mat(Pi_DTW_path)
                self.pi_DTW_idx[cl] = Pi_DTW_prov
                dtw_score += dtw_score_prov
            cost["Cost"].append(dtw_score)
            if current_init != 0:
                stop = self.stopping_criterion(last_pi_DTW)
            last_pi_DTW = self.pi_DTW_idx.copy()
            current_init = current_init + 1
        else:
            return self.OT_tilde, self.pi_DTW_idx, Cost_OT, score_OT



Simple test : we count the number of occurences where the property does not hold. Hopefully, the count stays at 0.

In [3]:
npr.seed(10)

different_than_expected = 0
for n in range(2, 400):
    nb_series = n
    nb_times = 20
    nb_feature = 2

    data_X = npr.normal(size=(nb_series, nb_times, nb_feature))
    data_Y = npr.normal(size=(nb_series, nb_times, nb_feature))

    mad = MAD(data_X, data_Y)
    OT, DTW, score, _ = mad.main_training()
    non_zero = np.count_nonzero(OT)
    if non_zero != nb_series:
        different_than_expected += 1
print("There are", different_than_expected, "datasets for which the property is not verified")


There are 0 datasets for which the property is not verified
