<a href="https://colab.research.google.com/github/rachida-saroui/twi-ksvd/blob/main/code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Import

In [2]:
import numpy as np
from scipy.io import loadmat
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy.linalg import svd

In [1]:
!wget -O character_trajectories.zip "https://github.com/rachida-saroui/twi-ksvd/raw/main/Datasets/character%2Btrajectories.zip"

# Unzip
!unzip -o character_trajectories.zip -d character_trajectories

--2024-12-17 09:33:33--  https://github.com/rachida-saroui/twi-ksvd/raw/main/Datasets/character%2Btrajectories.zip
Resolving github.com (github.com)... 20.27.177.113
Connecting to github.com (github.com)|20.27.177.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/rachida-saroui/twi-ksvd/main/Datasets/character%2Btrajectories.zip [following]
--2024-12-17 09:33:34--  https://raw.githubusercontent.com/rachida-saroui/twi-ksvd/main/Datasets/character%2Btrajectories.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7915950 (7.5M) [application/zip]
Saving to: ‘character_trajectories.zip’


2024-12-17 09:33:35 (18.2 MB/s) - ‘character_trajectories.zip’ saved [7915950/7915950]

Archive:  character_t

## Usefull functions

In [3]:
def preprocess_character_data(data_path, target_char='a'):
    """
    Preprocess the multivariate character trajectory data for a specific character.

    """
    mat_data = loadmat(data_path)
    trajectories = mat_data['mixout'][0]
    charlabels = mat_data['consts']['charlabels'][0][0].flatten()
    key = mat_data['consts']['key'][0][0][0]  # Additional [0] to get the inner array

    # Find the label for target character
    target_char = target_char.upper()
    char_idx = None

    # Print first few characters for debugging
    print("Available characters:")
    print([k[0] for k in key[:5]])

    # Find matching character
    for i, k in enumerate(key):
        if k[0].upper() == target_char:
            char_idx = i + 1  # Labels are 1-based
            break

    if char_idx is None:
        raise ValueError(f"Character '{target_char}' not found in dataset")

    print(f"Found character {target_char} at index {char_idx}")

    # Select trajectories for target character
    selected_trajectories = []
    sequence_lengths = []

    for traj, label in zip(trajectories, charlabels):
        if label == char_idx:
            # Each trajectory is 3xT where T is the sequence length
            selected_trajectories.append(traj)
            sequence_lengths.append(traj.shape[1])

    treated_select_trajectories = []
    treated_sequence_lengths = []

    for i in range(len(selected_trajectories)):
        traj = selected_trajectories[i]
        treat_traj = []
        for j in range(traj.shape[1]):

            if np.all(traj[:,j] == 0) == False:
                treat_traj.append(traj[:,j])

        treated_sequence_lengths.append(len(treat_traj))
        treated_select_trajectories.append(np.transpose(np.array(treat_traj)))



    print(f"Selected {len(selected_trajectories)} trajectories")
    if len(selected_trajectories) == 0:
        raise ValueError(f"No trajectories found for character '{target_char}'")

    return treated_select_trajectories, treated_sequence_lengths

## TWI OMP

In [4]:
def f(inner, normx, normy):

    return inner/np.sqrt(normx*normy)

def costw(x, y):

    Tx = x.shape[0]
    Ty = y.shape[0]


    M = np.zeros((Tx,Ty), dtype=tuple)
    #M[i,j]  = (<x_i,y_j>, norm(x_i)^2, norm(y_j)^2)

    path=[]

    #Init the matrix to compute the costw
    M[0,0]=(x[0]*y[0], x[0]**2, y[0]**2)

    for i in range(1,Tx):

        inner, nx, ny = M[i-1,0]
        M[i,0] = (inner + x[i]*y[0], nx+x[i]**2, ny+y[0]**2)

    for j in range(1,Ty):

        inner, nx, ny = M[0,j-1]
        M[0,j] = (inner + x[0]*y[j], nx+x[0]**2, ny+y[j]**2)

    for i in range(1, Tx):
        for j in range(1,Ty):

            inner_j1, normx_j1, normy_j1 = M[i,j-1]
            inner_i1, normx_i1, normy_i1 = M[i-1,j]
            inner_ij1, normx_ij1, normy_ij1 = M[i-1,j-1]

            #possible choices for the i,j case of M
            tuple_choice = [(inner_j1 + x[i]*y[j], normx_j1 + x[i]**2, normy_j1 + y[j]**2),
                            (inner_i1 + x[i]*y[j], normx_i1 + x[i]**2, normy_i1 + y[j]**2),
                            (inner_ij1 + x[i]*y[j], normx_ij1 + x[i]**2, normy_ij1 + y[j]**2)]

            #Maximise the costw
            index_choose = np.argmax([f(*tuple_choice[0]),f(*tuple_choice[1]),f(*tuple_choice[2])])

            M[i,j]=tuple_choice[index_choose]

    #distance between the 2 series
    costw_dist = f(*M[Tx-1,Ty-1])

    i = Tx - 1
    j = Ty - 1


    #Determine the path by maximize the cost
    path.append((i,j))

    while i>0 or j>0:

        if i==0:
            j-=1
        elif j==0:
            i-=1
        else :
            choices = [f(*M[i,j-1]),f(*M[i-1,j]),f(*M[i-1,j-1])]
            index = np.argmax(choices)

            if index==0:
                j-=1
            elif index==1:
                i-=1
            else:
                i-=1
                j-=1
        path.append((i,j))

    path.reverse()

    return costw_dist, path

def twi_omp(x, D, tau):

    K = len(D)
    alpha = alpha = np.zeros(K) #Number of atom in D
    delta = [] #list of delta matrix with differents size
    index_atom_choosed = []
    alpha_inter = []
    representative_dictionary =[]

    residual = np.copy(x)

    #Create all delta matrix with zeros. It will be update during the algorithm
    for j in range(K):
        delta_j = np.zeros((len(x), D[j].shape[0]))
        delta.append(delta_j)

    for i in range(tau):

        best_costw = 0
        best_path = []
        index = 0
        #Determine the best atom to reconstruct the residual
        for j in range(K):

            if j not in index_atom_choosed:
                costw_dist, path = costw(residual,D[j])

                if costw_dist > best_costw:
                    best_costw = costw_dist
                    best_path = path
                    index = j

        #Set delta_j matrix to encode the path
        delta_j = np.zeros((len(x), len(D[index])))
        index_atom_choosed.append(index)

        for k in range(len(best_path)):
            (u,v)=best_path[k]
            delta_j[u,v] = 1

        #Update delta
        delta[index] = np.copy(delta_j)

        #Compute sibling atom
        ds_j = delta_j @ np.array(D[index])

        #Classic OMP
        representative_dictionary.append(ds_j)

        representative_dictionary_array = np.transpose(np.array(representative_dictionary))

        # alpha_inter = np.linalg.pinv(np.transpose(representative_dictionary_array)@representative_dictionary_array)@(np.transpose(representative_dictionary_array)@x)

        alpha_inter = np.linalg.pinv(representative_dictionary_array)@x

        residual = x - representative_dictionary_array@alpha_inter

    for i in range(tau):
        index = index_atom_choosed[i]
        alpha[index]= alpha_inter[i]

    return alpha, delta


## TWI kSVD

In [5]:
def rotation(a, b, c):
    if b.shape[0] != c.shape[0]:
        raise ValueError(f"Shape mismatch: b.shape = {b.shape}, c.shape = {c.shape}")

    b_norme = b / np.linalg.norm(b)
    c_norme = c / np.linalg.norm(c)
    u = b_norme
    v = c - np.dot(u, c) * u
    v = v / np.linalg.norm(v)
    cos_theta = np.dot(u, c_norme)
    sin_theta = np.sqrt(1 - cos_theta**2)
    R_2d = np.array([[cos_theta, -sin_theta], [sin_theta, cos_theta]])
    n = len(a)
    uv = np.transpose(np.vstack((u, v)))
    rotation_matrix = np.eye(n) - np.inner(u, u) - np.inner(v,v) + uv@R_2d@np.transpose(uv)
    a_r = rotation_matrix @ a
    return a_r

def compute_error_matrix(omega_k, X, D, A, Delta, k):

    K = len(D)
    ei_list = []
    E_k = []

    for n in range(len(omega_k)):
        i = omega_k[n]
        reconstruct = np.zeros((X[i].shape[0]))
        # delta = Delta[i]
        #recontstruct the sample i without the kth atom
        for j in range(K):
            if j != k:
                reconstruct += np.dot(Delta[i][j],D[j])*A[j,i]

        e_i = np.array(X[i] - reconstruct)
        delta_ik = Delta[i][k]

        dSi_k = np.dot(delta_ik,D[k])

        phi_ei = rotation(np.dot(np.transpose(delta_ik),e_i), np.dot(np.transpose(delta_ik),dSi_k), D[k])

        ei_list.append(e_i)
        E_k.append(phi_ei)

    return np.array(E_k).T , ei_list

def update_atom(E_k, ei_list, omega_k, Delta, D, k):

    #SVD on E_k to determine u1
    U, S, Vt = svd(E_k, full_matrices=False)
    u1 = U[:, 0]
    dk_new = u1
    d_k_si_update = []
    alpha_ki_update = []

    for n in range(len(omega_k)):

        i=omega_k[n]
        e_i = ei_list[n]

        delta_ik = Delta[i][k]
        dSi_k = np.dot(delta_ik, D[k])

        rotate_vector = rotation(u1, D[k], np.dot(np.transpose(delta_ik),dSi_k))

        gamma_i_u1 = np.dot(delta_ik,rotation(u1, D[k], np.dot(np.transpose(delta_ik),dSi_k)))

        d_k_si_update.append(gamma_i_u1)

        alpha_ki_update.append(np.dot(e_i,gamma_i_u1)/np.linalg.norm(gamma_i_u1))


    return dk_new, d_k_si_update, np.array(alpha_ki_update)

def norm_residual(A, D, X, Delta):
    """
    Compute the norm of each sample reconstruction and mean them for TWI-KSVD algorithm.
    """
    residual_norm_sample = np.zeros(len(X))

    for i in range(len(X)):
        x = X[i]
        x_reconstructed = np.zeros(x.shape[0])

        for j in range(len(D)):
            x_reconstructed += np.dot(Delta[i][j],D[j])*A[j,i]

        error = x - x_reconstructed
        residual_norm_sample[i] = np.linalg.norm(error)

    return np.linalg.norm(residual_norm_sample)




def twi_ksvd(X, D, tau, max_iter=100, threashold=1):

    N = len(X)
    K = len(D)
    A = np.zeros((K, N))
    Delta = np.zeros(N, dtype=np.ndarray)

    residual_norms = []


    for _ in tqdm(range(max_iter), desc="TWI-KSVD Iterations"):
        for i in tqdm(range(N), desc="Sparse Coding", leave=False):
            A[:, i], Delta[i] = twi_omp(X[i], D, tau)

        for k in tqdm(range(K), desc="Dictionary Update", leave=False):
            omega_k = np.nonzero(A[k, :])[0]
            if len(omega_k) == 0:
                continue
            E_k, ei_list = compute_error_matrix(omega_k, X, D, A, Delta, k)
            dk_new, dSi_k_update, alpha_ki_update = update_atom(E_k, ei_list, omega_k, Delta, D, k)
            D[k] = np.copy(dk_new)

            for n in range(len(omega_k)):

                i=omega_k[n]
                A[k,i] = alpha_ki_update[n]

        residual_norms.append(norm_residual(A, D, X, Delta))
        print(residual_norms)

        if residual_norms[-1] < threashold:
            break


    return A, Delta, D, residual_norms

## Main

In [6]:
data_path = 'character_trajectories/mixoutALL_shifted.mat'

# Load and preprocess data
trajectories, sequence_lengths = preprocess_character_data(data_path, 'O')
print(f"Processed {len(trajectories)} samples of letter 'O'")

# Print shape information for debugging
print(f"First trajectory shape: {trajectories[0].shape}")

X_traj = []
Y_traj = []


#Create the list of trajectory for x_positions and y_positions
for i in range(len(trajectories)-1):

    X_traj.append(trajectories[i][0,:])
    Y_traj.append(trajectories[i][1,:])


# Initialize dictionary with random multivariate atoms
K = 50

q = 50 #length of atoms (can be different)

#Dictionnaries for each coordinates
D_x = []
D_y = []

# Initialize the 2 dictionnaries
for j in range(K):

    x_atom = np.random.randn(q)
    norm_x = np.linalg.norm(x_atom)

    y_atom = np.random.randn(q)
    norm_y = np.linalg.norm(y_atom)

    D_x.append(x_atom/norm_x)
    D_y.append(y_atom/norm_y)

# TWI-KSVD parameters
tau = 15
max_iter = 20


A_x, Delta_x, D_x, residual_norms_x = twi_ksvd(X_traj[0:15], D_x, tau, max_iter)

# x_traj_test = trajectories[-1][0:]

# alpha_test, delta_test = twi_omp(x_traj_test, , tau)

x = X_traj[0]
# Reconstruct trajectory
reconstructed = np.zeros_like(x)
for j in range(K):
    if A_x[j,0] != 0:
        reconstructed += np.dot(Delta_x[0][j],D_x[j])*A_x[j,0]

plt.plot(x)
plt.plot(reconstructed)

Available characters:
['a', 'b', 'c', 'd', 'e']
Found character O at index 11
Selected 141 trajectories
Processed 141 samples of letter 'O'
First trajectory shape: (3, 95)


TWI-KSVD Iterations:   0%|          | 0/20 [00:00<?, ?it/s]
Sparse Coding:   0%|          | 0/15 [00:00<?, ?it/s][A
Sparse Coding:   7%|▋         | 1/15 [00:50<11:53, 50.95s/it][A
Sparse Coding:  13%|█▎        | 2/15 [01:46<11:39, 53.79s/it][A
Sparse Coding:  20%|██        | 3/15 [02:38<10:34, 52.84s/it][A
TWI-KSVD Iterations:   0%|          | 0/20 [03:07<?, ?it/s]


KeyboardInterrupt: 