In [1]:
import numpy as np
import threading
import queue
import time
import matplotlib.pyplot as plt

In [14]:
def hpc_thread_function_w(a, w, h, cross_queues, out_queue, i, n_col):
    my_u = h @ h.T # calculate my piece of u
    u = h @ h.T # keep a running sum (not a reference to my_u!!)
    status = [0 for _ in cross_queues] # 0 means nothing done yet
    while sum(status) < 2*len(cross_queues): # 2 means done
    #while np.all(x == 2 or x == 3 for x in status):
        for j, (q, lock) in enumerate(cross_queues):
            with lock:
                if status[j] == 0 and q.empty():
                    q.put(my_u)
                    status[j] = 1 # 1 means in write-first mode
                elif status[j] == 0 and not q.empty():
                    other_u = q.get()
                    assert q.empty()
                    u += other_u
                    q.put(my_u)
                    q.put(None) # mark that content has been read and re-written by filling queue to 2
                    assert q.full()
                    status[j] = 2 # 2 means done
                elif status[j] == 1 and q.full(): # see that content has been read and re-written
                    other_u = q.get()
                    q.get() # clear the cross-queue
                    assert q.empty()
                    u += other_u
                    status[j] = 2.00001 # 2 means done
    out_queue.put(u)


def HPC_NMF(a, k, p_row, p_col, numIter):
    m, n = np.shape(a)
    if m % (p_row*p_col) > 0:
        raise TypeError('Input first dimension not divisible by number of threads')
    if n % (p_row*p_col) > 0:
        raise TypeError('Input second dimension not divisible by number of threads')
    w = np.random.rand(m, k)
    h = np.random.rand(k, n)

    a_pieces = [np.split(x, p_col, 1) for x in np.split(a, p_row, 0)] # cut a into p_row x p_col pieces of shape m/p_row x n/p_col
    assert np.shape(a_pieces[0][0]) == (int(m/p_row), int(n/p_col))

    for _ in range(numIter):
        u = np.zeros((k, k))
        h_pieces = np.split(h, p_row*p_col, 1) # cut h into p_row*p_col pieces of shape k x n/(p_row*p_col)
        assert np.shape(h_pieces[0]) == (k, int(n/(p_row*p_col)))

        threads_w = []
        cross_queues_w = []
        out_queue_w = queue.Queue(maxsize=p_row*p_col)
        for first in range(p_row*p_col):
            for second in range(first+1, p_row*p_col):
                cross_queues_w.append((first, second, queue.Queue(maxsize=2), threading.Lock())) # assign each pair a unique queue and a unique lock
        for i in range(p_row*p_col): # split into p threads to calculate updates for each piece
            newThread = threading.Thread(target = hpc_thread_function_w, args = 
                (a_pieces[i%p_row][int(i/p_row)],
                 w,
                 h_pieces[i],
                 [(q, l) for j, jj, q, l in cross_queues_w if i == j or i == jj],
                 out_queue_w,
                 i,
                 p_col))
            newThread.start()
            threads_w.append(newThread)
        for thread in threads_w: # wait for all threads to complete
            thread.join()
        assert [q.empty() for _, _, q, _ in cross_queues_w]
        while not out_queue_w.empty(): # reconstitute and update w
            np.all(np.isclose(h@h.T, out_queue_w.get()))
    return

HPC_NMF(np.identity(16)+1, 16, 4, 4, 1)
