# Character Trajectories dataset

Link to [Official repository](https://archive-beta.ics.uci.edu/dataset/175/character+trajectories) on UC Irvine


### Data description:

Multiple, labelled samples of pen tip trajectories recorded whilst writing individual characters. All samples are from the same writer, for the purposes of primitive extraction. Only characters with a single pen-down segment were considered.

Each character sample is a 3-dimensional pen tip velocity trajectory. This is contained in matrix format, with 3 rows and T columns where T is the length of the character sample.

The characters here were used for a PhD study on primitive extraction using HMM based models. The data consists of 2858 character samples, contained in the cell array 'mixout'. The struct variable 'consts' contains a field consts.charlabels which provides enumerated labels for the characters. consts.key provides the key for each label. The data was captured using a WACOM tablet. 3 Dimensions were kept - x, y, and pen tip force. The data has been numerically differentiated and Gaussian smoothed, with a sigma value of 2. Data was captured at 200Hz. The data was normalised with consts.datanorm. Only characters with a single 'PEN-DOWN' segment were considered. Character segmentation was performed using a pen tip force cut-off point. The characters have also been shifted so that their velocity profiles best match the mean of the set.

## Download database

Run the cell below to collect the archives in your folder.

Safe cell: it creates the folder `../trajectory_dataset` if needed and downloads the dataset files into it. Files that are already present are not downloaded again.

In [None]:
import os 
import requests

# Download the dataset files that are missing from ../trajectory_dataset.
# Each file is checked individually, so a folder that exists but is empty
# (e.g. after an interrupted first run) still gets populated.
data_dir = "../trajectory_dataset"
base_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/character-trajectories/"

os.makedirs(data_dir, exist_ok=True)

for file_name in ("mixoutALL_shifted.mat", "trajectories.names"):
    target = os.path.join(data_dir, file_name)
    if os.path.exists(target):
        # Already downloaded: leave the local copy untouched.
        continue

    r = requests.get(base_url + file_name, timeout=60)
    # Fail loudly instead of saving an HTML error page as if it were data.
    r.raise_for_status()

    with open(target, "wb") as f:
        f.write(r.content)
# ! wget -O ../trajectory_dataset/mixoutALL_shifted.mat https://archive.ics.uci.edu/ml/machine-learning-databases/character-trajectories/mixoutALL_shifted.mat
# ! wget -O ../trajectory_dataset/trajectories.names https://archive.ics.uci.edu/ml/machine-learning-databases/character-trajectories/trajectories.names

## Interacting with data

In [None]:
from scipy.io import loadmat
import numpy as np

# Load the MATLAB file into a Python dictionary (variable name -> array)
mat = loadmat('../trajectory_dataset/mixoutALL_shifted.mat')

# Display the names of the entries found in the file
mat.keys()

In [None]:
# Information about the dataset

n_samples = len(mat["mixout"][0])

print(f"Number of samples:\t {n_samples}")


# Loading samples labels
# Field 4 of `consts` holds one 1-based class index per sample
# (presumably consts.charlabels); shift it to 0-based.
keys = mat["consts"][0,0][4]
keys = [key - 1 for key in keys[0]]

# Field 3 of `consts` holds the character of each class (presumably consts.key).
labels = mat["consts"][0,0][3]
labels = np.array([label[0] for label in labels[0]])

# One character per sample. Index with the flat list directly: the former
# `labels[[keys]][0]` relied on how NumPy interprets a list-of-list index,
# which changed between NumPy versions.
samples_label = labels[keys]

label_unique, label_count = np.unique(samples_label, return_counts=True)

# Text table of the number of samples per character
print()
print("Data distribution:")
print(f"{len(label_unique)} different class")
cells = ["___" for _ in label_count]
line_u = "_______" + "_".join(cells) + "_"
print(line_u)
char = "|CHAR  | " + " | ".join(label_unique) + " |"
print(char)
line_d = "|______|" + "|".join(cells) + "|"
print(line_d)
count = "|COUNT |" + "|".join(str(c) for c in label_count) + "|"
print(count)
print(line_d)


# Array of samples; each sample is indexed by channel first (see the shape print below)
samples = mat["mixout"][0]

f_s = 200 #Hz sampling at 200Hz

print(f"\n\nN_features: \t\t{samples[0].shape[0]} ---> \t(v_x, v_y, pen_tip_force)")
print(f"Sampling frequency: \t{f_s} Hz")


In [None]:
import matplotlib.pyplot as plt

# Ten sample indices drawn at random (with replacement, so duplicates are possible);
# the same selection is reused for every figure of this cell.
rdm_sample_idx = np.random.randint(0, len(samples), 10)


def _positions_from_speed(speed):
    """Integrate a velocity channel into positions, starting at 0 and using the step 1/f_s."""
    return np.concatenate(([0.0], np.cumsum(np.asarray(speed) / f_s)))


def _plot_channel_grid(curve_of, suptitle):
    """Plot, on a 2x5 grid, the curve `curve_of(sample)` of every selected sample."""
    for i, i_sample in enumerate(rdm_sample_idx):
        curve = curve_of(samples[i_sample])

        plt.subplot(2, 5, i+1)
        plt.plot(np.arange(len(curve)), curve)
        plt.title(samples_label[i_sample])

    plt.suptitle(suptitle)
    plt.show()


# Figure 1: reconstructed pen trajectories, coloured by time index
for i, i_sample in enumerate(rdm_sample_idx):

    exsample = samples[i_sample]

    # Conversion to trajectory beginning at point (0,0)
    x = _positions_from_speed(exsample[0])
    y = _positions_from_speed(exsample[1])

    plt.subplot(2, 5, i+1)
    plt.scatter(x, y, c=np.arange(len(x)), s=5)

    # To modulate the marker size depending on the pen tip force, use instead:
    # plt.scatter(x[1:], y[1:], c=np.arange(len(x[1:])), s=2*np.abs(exsample[2]))

    plt.plot(x, y, alpha=0.5)

    plt.title(samples_label[i_sample])

    plt.axis('equal')
    plt.axis('off')

plt.suptitle("Characters samples (start in indigo, end in yellow)")
plt.show()

# Figures 2-4: raw channels (channel 2 = pen tip force, 0 = x speed, 1 = y speed)
_plot_channel_grid(lambda sample: sample[2], "Pen tip force curves")
# Titles fixed: the negative direction of x is left, of y is down.
_plot_channel_grid(lambda sample: sample[0], "X speed (+ = right, - = left)")
_plot_channel_grid(lambda sample: sample[1], "Y speed (+ = up, - = down)")

# Figures 5-6: integrated positions along each axis
_plot_channel_grid(lambda sample: _positions_from_speed(sample[0]), "X position")
_plot_channel_grid(lambda sample: _positions_from_speed(sample[1]), "Y position")

## TWI-kSVD on char-traj data

In [None]:
# Split every sample into its three channels (one list entry per sample).
x_speeds = [sample[0] for sample in samples]
y_speeds = [sample[1] for sample in samples]
pen_tip_forces = [sample[2] for sample in samples]

# Shortest and longest sample lengths, computed from the data itself rather
# than from a hard-coded starting bound (0 when there are no samples).
lengths = [len(x_speed) for x_speed in x_speeds]
max_len = max(lengths, default=0)
min_len = min(lengths, default=0)


In [None]:
# Report the shortest and longest sample lengths, one per line
print(
    f"Longueur minimale : {min_len}",
    f"Longueur maximale : {max_len}",
    sep="\n",
)

In [None]:
# Stack the x-speed curves of the first two samples, each titled with its label
for i, (x_speed, title) in enumerate(zip(x_speeds[:2], samples_label)):
    time_axis = np.arange(len(x_speed))

    plt.subplot(2, 1, i + 1)
    plt.plot(time_axis, x_speed)
    plt.title(title)

In [None]:
# Display the shape of the per-sample label array
samples_label.shape


In [None]:
from sklearn.model_selection import train_test_split
from twi_ksvd.ksvd import TWI_kSVD
from twi_ksvd.omp import TWI_OMP

from multiprocessing import Pool
from tqdm import tqdm


def eval_count(x, D_list, tau, r_window):
    """
    Encode x on D_list with TWI_OMP and return the mask of non-zero coefficients.

    The second value returned by TWI_OMP is ignored.
    """
    coefficients, _unused = TWI_OMP(x, D_list, tau=tau, r_window=r_window)
    used_mask = coefficients != 0
    return used_mask


def _train_channel_dico(signals, K, tau, max_iter, label, train_idx, test_idx, r_window):
    """
    Learn a TWI-kSVD dictionary for one channel of one label, then prune it.

    signals: per-sample list of 1-D signals (e.g. x_speeds or y_speeds).
    Returns the list of learned atoms that get a non-zero coefficient for
    at least one test signal of the label.
    """
    train_signals = [signals[i] for i in train_idx if samples_label[i] == label]
    test_signals = [signals[i] for i in test_idx if samples_label[i] == label]

    # A fresh model per channel
    model = TWI_kSVD(K, max_iter=max_iter)

    # Initial atoms: K training signals picked at random
    # (np.random.choice samples with replacement, so duplicates are possible).
    dico_idx = np.random.choice(np.arange(len(train_signals)), K)
    D_list = [train_signals[pick] for pick in dico_idx]

    _, D = model.fit(train_signals, D_list, tau, r_window=r_window)

    # Count, per atom, how many test signals use it
    counts_test = np.zeros(len(D), dtype=int)

    with Pool(processes=None) as pool:
        multiple_results = [
            pool.apply_async(eval_count, (signal, D, tau, r_window))
            for signal in test_signals
        ]

        for pending in tqdm(multiple_results):
            counts_test += pending.get()

    return [atom for atom, count in zip(D, counts_test) if count > 0]


def train_label(K, tau, max_iter, label, train_idx, test_idx, r_window):
    """
    Train a dictionary for the given label

    One dictionary is learned on the x speeds and one on the y speeds of the
    training samples carrying `label`. Each is fitted with its own model
    (the y dictionary used to be fitted with the x model).

    Returns (final_x_dico, final_y_dico): the atoms of each dictionary that
    are used by at least one test sample of the label.
    """
    # x training first, then y training (keeps the order of the random draws)
    final_x_dico = _train_channel_dico(
        x_speeds, K, tau, max_iter, label, train_idx, test_idx, r_window
    )
    final_y_dico = _train_channel_dico(
        y_speeds, K, tau, max_iter, label, train_idx, test_idx, r_window
    )

    return final_x_dico, final_y_dico




def train_dicos(K=8, tau=2, max_iter=30, labels=np.unique(samples_label), save_path = "../trajectory_dataset/dictionnary/", r_window=20):
    """
    Train a dictionary on training set for every letter

    For each label: learn the x and y dictionaries with train_label, plot
    their atoms side by side, and save every atom as
    <save_path>/<label>/{x_dico,y_dico}/atom_<i>.npy.

    K, tau, max_iter, r_window are forwarded to train_label.
    labels: labels to train on. NOTE: the default is evaluated once, when
    the function is defined, from the samples_label existing at that time.
    save_path: root folder of the saved dictionaries.
    """

    print("Labels : ", labels)
    # One stratified 70/30 split shared by all labels (not seeded)
    train_idx, test_idx = train_test_split(np.arange(len(x_speeds)), test_size=0.3, stratify=samples_label)

    for label in labels:

        print("Training for label " + label)

        final_x_dico, final_y_dico = train_label(K, tau, max_iter, label, train_idx, test_idx, r_window)

        print(f"For label {label}, {len(final_x_dico)} x atoms and {len(final_y_dico)} y atoms")

        # Left panel: x atoms, right panel: y atoms
        panels = (("X_speeds / label ", final_x_dico), ("Y_speeds / label ", final_y_dico))
        for position, (title, dico) in enumerate(panels, start=1):
            plt.subplot(1, 2, position)
            for atom in dico:
                plt.plot(np.arange(len(atom)), atom)
            plt.title(title + label)
        plt.show()

        # Create each target folder once, even when the dictionary is empty,
        # so the message below always points to an existing location.
        for sub_dir, dico in (("x_dico", final_x_dico), ("y_dico", final_y_dico)):
            out_dir = os.path.join(save_path, label, sub_dir)
            os.makedirs(out_dir, exist_ok=True)
            for i, atom in enumerate(dico):
                np.save(os.path.join(out_dir, "atom_" + str(i) + ".npy"), atom)

        print("Atoms saved in " + save_path + "/" + label)


In [None]:
#train_dicos(K=8, tau=2, max_iter=30, labels=np.unique(samples_label)[-1:], save_path="../trajectory_dataset/23_03_2023/", r_window=20)

In [None]:


# Stratified 70/30 split over all sample indices
train_idx, test_idx = train_test_split(
    np.arange(len(x_speeds)),
    test_size=0.3,
    stratify=samples_label,
)

# Indices of the samples labelled 'a' in each part of the split
train_a = [i for i in train_idx if samples_label[i] == 'a']
test_a = [i for i in test_idx if samples_label[i] == 'a']

# Keep every second point of each signal
x_train = [x_speeds[i][::2] for i in train_a]
x_test = [x_speeds[i][::2] for i in test_a]

y_train = [y_speeds[i][::2] for i in train_a]
y_test = [y_speeds[i][::2] for i in test_a]

In [None]:
from twi_ksvd.ksvd import TWI_kSVD
from scipy import signal

# K: number of atoms in the dictionary describing all the signals
K = 5
# tau: each signal is described by at most tau atoms
tau = 2

model_x = TWI_kSVD(K, max_iter=30)

# Initial dictionary: K training signals picked at random (with replacement).
# An earlier initialisation, now disabled, built Hamming-windowed cosines
# with random phase and frequency (scipy.signal.windows.hamming).
dico_idx = np.random.choice(np.arange(len(x_train)), K)
D_list = [x_train[pick] for pick in dico_idx]

In [None]:
# Overlay every initial atom on one figure, labelled by its index.
# NOTE(review): the title mentions cosine atoms, but D_list is built from
# training signals in the cell above — confirm the title.
for i, atom in enumerate(D_list):
    time_axis = np.arange(len(atom))
    plt.plot(time_axis, atom, label=str(i))

plt.title("Initial random cosine atoms")
plt.legend()
plt.show()
    

In [None]:
# Learn the x-speed dictionary; the pair returned by fit is unpacked as (Ax, Dx).
Ax,Dx = model_x.fit(x_train, D_list, tau, r_window=5)

In [None]:
# Read the coefficients and the dictionary from the model's attributes
# (useful to recover them without calling fit again).
Ax, Dx = model_x.alphas, model_x.D

In [None]:
# K: number of atoms in the dictionary describing all the signals
K = 10
# tau: each signal is described by at most tau atoms
tau = 2

model_y = TWI_kSVD(K, max_iter=30)

# Initial dictionary: K training signals picked at random (with replacement)
dico_idx = np.random.choice(np.arange(len(y_train)), K)
D_list = [y_train[pick] for pick in dico_idx]

In [None]:

# Learn the y-speed dictionary; the pair returned by fit is unpacked as (Ay, Dy).
Ay,Dy = model_y.fit(y_train, D_list, tau, r_window=20)

In [None]:
# Read the coefficients and the dictionary from the model's attributes
# (useful to recover them without calling fit again).
Ay, Dy = model_y.alphas, model_y.D

In [None]:
from multiprocessing import Pool
from tqdm import tqdm

def eval_count(x, D_list, tau, r_window):
    """
    Encode x on D_list with TWI_OMP and return the mask of non-zero coefficients.

    r_window is forwarded to TWI_OMP; it used to be ignored in favour of a
    hard-coded 20. The second value returned by TWI_OMP is discarded.
    """
    alphas, _ = TWI_OMP(x, D_list, tau=tau, r_window=r_window)

    return alphas != 0

# Per-atom usage of the x dictionary over the training signals
counts_train = np.zeros(len(Dx), dtype=int)

with Pool(processes=None) as pool:
    multiple_results = [pool.apply_async(eval_count, (x, Dx, tau, 20)) for x in x_train]

    for i, pending in enumerate(tqdm(multiple_results)):
        counts_train += pending.get()


# Per-atom usage of the x dictionary over the test signals
counts_test = np.zeros(len(Dx), dtype=int)

with Pool(processes=None) as pool:
    multiple_results = [pool.apply_async(eval_count, (x, Dx, tau, 20)) for x in x_test]

    for i, pending in enumerate(tqdm(multiple_results)):
        counts_test += pending.get()

total_counts = counts_test + counts_train
print(total_counts)
# Check: does the total number of used atoms equal tau per encoded signal?
np.sum(total_counts) == tau * (len(x_train) + len(x_test))

In [None]:
# Usage counts per atom: test signals first, then training signals
print(counts_test, counts_train, sep="\n")

In [None]:
# Plot only the x atoms that are used by at least one test signal
for i, atom in enumerate(Dx):
    if counts_test[i] <= 0:
        continue

    time_axis = np.arange(len(atom))
    plt.plot(time_axis, atom, label=f"Atom {i} (counts = {counts_test[i]})")

plt.title("Final random cosine atoms (x_speeds)")
plt.legend()
plt.show()


In [None]:
from multiprocessing import Pool
from tqdm import tqdm

def eval_count(x, D_list, tau, r_window):
    """
    Encode x on D_list with TWI_OMP and return the mask of non-zero coefficients.

    r_window is forwarded to TWI_OMP; it used to be ignored in favour of a
    hard-coded 20. The second value returned by TWI_OMP is discarded.
    """
    alphas, _ = TWI_OMP(x, D_list, tau=tau, r_window=r_window)

    return alphas != 0

# Per-atom usage of the y dictionary over the training signals
counts_train = np.zeros(len(Dy), dtype=int)

with Pool(processes=None) as pool:
    multiple_results = [pool.apply_async(eval_count, (y, Dy, tau, 20)) for y in y_train]

    for i, pending in enumerate(tqdm(multiple_results)):
        counts_train += pending.get()


# Per-atom usage of the y dictionary over the test signals
counts_test = np.zeros(len(Dy), dtype=int)

with Pool(processes=None) as pool:
    multiple_results = [pool.apply_async(eval_count, (y, Dy, tau, 20)) for y in y_test]

    for i, pending in enumerate(tqdm(multiple_results)):
        counts_test += pending.get()

total_counts = counts_test + counts_train
print(total_counts)
# Check: does the total number of used atoms equal tau per encoded signal?
np.sum(total_counts) == tau * (len(y_train) + len(y_test))

In [None]:
# Plot only the y atoms that are used by at least one test signal
for i, atom in enumerate(model_y.D):
    if counts_test[i] <= 0:
        continue

    time_axis = np.arange(len(atom))
    plt.plot(time_axis, atom, label=f"Atom {i} (counts = {counts_test[i]})")

plt.title("Final random cosine atoms (y_speeds)")
plt.legend()
plt.show()
    

In [None]:
from twi_ksvd.omp import TWI_OMP


# Two random training signals followed by two random test signals
train_picks = [x_train[i] for i in np.random.randint(0, len(x_train), 2)]
test_picks = [x_test[i] for i in np.random.randint(0, len(x_test), 2)]

for p, x in enumerate(train_picks + test_picks):
    alphas, deltas = TWI_OMP(x, Dx, tau=tau, r_window=20)

    reconstructed_x_signal = np.zeros_like(x)

    # Left column: the atoms selected for this signal
    plt.subplot(4, 2, 2*p + 1)
    for alpha, delta, atom in zip(alphas, deltas, Dx):
        if alpha == 0:
            continue
        # Add this atom's contribution, mapped through its delta operator
        reconstructed_x_signal += alpha * delta @ atom
        plt.plot(atom)

    # Right column: the signal and its reconstruction
    plt.subplot(4, 2, 2*p + 2)
    plt.plot(x)
    plt.plot(reconstructed_x_signal)

In [None]:
# Two random training signals followed by two random test signals
train_picks = [y_train[i] for i in np.random.randint(0, len(y_train), 2)]
test_picks = [y_test[i] for i in np.random.randint(0, len(y_test), 2)]

for p, y in enumerate(train_picks + test_picks):
    alphas, deltas = TWI_OMP(y, Dy, tau=tau, r_window=20)

    reconstructed_y_signal = np.zeros_like(y)

    # Left column: the atoms selected for this signal
    plt.subplot(4, 2, 2*p + 1)
    for alpha, delta, atom in zip(alphas, deltas, Dy):
        if alpha == 0:
            continue
        # Add this atom's contribution, mapped through its delta operator
        reconstructed_y_signal += alpha * delta @ atom
        plt.plot(atom)

    # Right column: the signal and its reconstruction
    plt.subplot(4, 2, 2*p + 2)
    plt.plot(y)
    plt.plot(reconstructed_y_signal)

In [None]:
def _reconstruct(signal, D):
    """
    Encode `signal` on dictionary D with TWI_OMP.

    Returns (reconstruction, first_atom): the sum of the contributions of
    all atoms with a non-zero coefficient, and the contribution of the atom
    with the largest coefficient alone.
    """
    alphas, deltas = TWI_OMP(signal, D, tau=tau, r_window=20)

    reconstruction = np.zeros_like(signal)
    for alpha, delta, atom in zip(alphas, deltas, D):
        if alpha != 0:
            reconstruction += alpha * delta @ atom

    # NOTE(review): argmax picks the largest signed coefficient, not the
    # largest magnitude — confirm this is the intended "first atom".
    idx = np.argmax(alphas)
    return reconstruction, alphas[idx] * deltas[idx] @ D[idx]


def _to_positions(speed):
    """Integrate a speed signal into positions, starting at 0 and using the step 1/f_s."""
    return np.concatenate(([0.0], np.cumsum(np.asarray(speed) / f_s)))


# Two random training pairs followed by two random test pairs (x and y of the same sample)
train_picks = np.random.randint(0, len(x_train), 2)
test_picks = np.random.randint(0, len(x_test), 2)
pairs = [(x_train[i], y_train[i]) for i in train_picks] + [(x_test[i], y_test[i]) for i in test_picks]

for p, (xs, ys) in enumerate(pairs):

    # x and y speeds reconstruction
    reconstructed_x_signal, first_atom_x = _reconstruct(xs, Dx)
    reconstructed_y_signal, first_atom_y = _reconstruct(ys, Dy)

    # One row per sample. The "Original" column is built from the encoded
    # signals themselves (xs, ys), so it always shows the sample that was
    # reconstructed.
    panels = (
        (reconstructed_x_signal, reconstructed_y_signal, "b", "Reconstruction"),
        (first_atom_x, first_atom_y, "r", "First atom"),
        (xs, ys, "g", "Original"),
    )

    for column, (speed_x, speed_y, color, title) in enumerate(panels, start=1):
        # Conversion to trajectory beginning at point (0,0)
        plt.subplot(4, 3, 3*p + column)
        plt.plot(_to_positions(speed_x), _to_positions(speed_y), color)
        plt.axis('equal')
        plt.axis('off')
        if column == 1:
            plt.ylabel(f"Sample {p}")
        if p == 0:
            plt.title(title)

In [None]:
# Save every atom of both dictionaries as an individual .npy file.
# Each target folder is created once, before its atoms are written.
for sub_dir, dico in (("x_dictionnary", Dx), ("y_dictionnary", Dy)):
    out_dir = "../trajectory_dataset/" + sub_dir
    os.makedirs(out_dir, exist_ok=True)

    for i, atom in enumerate(dico):
        np.save(out_dir + "/atom_" + str(i) + ".npy", atom)