In [1]:
import numpy as np
from dtaidistance import dtw

In [2]:
trajs = np.load('stx/2017-01-01 10_00_00.npz', allow_pickle=True)['arr_0']

In [6]:
from multiprocess import Pool, Queue
from multiprocess import Pool

def fill_distance_matrix(first_indices, second_indices, trajs, axis):
    print(f"Worker: {first_indices}, {second_indices}")
    try:
        dist_matrix = np.zeros((len(first_indices), len(second_indices)))
        for i, first_index in enumerate(first_indices):
            for j, second_index in enumerate(second_indices):
                dist_matrix[i, j] = dtw.distance_fast(trajs[first_index, axis, :], trajs[second_index, axis, :])
        return (first_indices, second_indices, dist_matrix)
    except Exception as e:
        print(f"Error in worker: {e}")
        return (first_indices, second_indices, None)

def fill_distance_matrix_parallel(trajs, axis, n_jobs=2):
    
    indices = np.arange(trajs.shape[0])
    indices_split = np.array_split(indices, n_jobs)
    indices_set = [(indices_split[i], indices_split[j]) for i in range(n_jobs) for j in range(n_jobs) if j > i]

    print('Matrix partition complete')
    
    pool = Pool(n_jobs)
    results = pool.starmap(fill_distance_matrix, [(indices_pair[0], indices_pair[1], trajs, axis) for indices_pair in indices_set])
    pool.close()
    pool.join()
    
    dist_matrix = np.zeros((len(trajs), len(trajs)))
    for fi, si, dm in results:
        if dm is not None:
            for i, f_index in enumerate(fi):
                for j, s_index in enumerate(si):
                    dist_matrix[f_index, s_index] = dm[i, j]
    
    return dist_matrix

In [7]:
fill_distance_matrix_parallel(trajs.astype(np.float64), 0, n_jobs=2)

Matrix partition complete
Worker: [0 1], [2]


array([[ 0.        ,  0.        ,  6.27900182],
       [ 0.        ,  0.        , 15.89727094],
       [ 0.        ,  0.        ,  0.        ]])