In [186]:
from dask.distributed import Client, progress, get_worker
import numpy as np

# Connect this notebook to an already-running Dask scheduler.
# Insert the scheduler IP below (available after running the "dask-scheduler" command).
# NOTE(review): the address is hard-coded — consider reading it from an
# environment variable or a config cell so the notebook runs on other clusters.
client = Client("tcp://131.180.106.138:8786")
# Leaving `client` as the cell's last expression makes the notebook display
# the scheduler/cluster summary for this connection.
client

0,1
Client  Scheduler: tcp://xx.xx.xx.xx:8786,Cluster  Workers: 2  Cores: 2  Memory: 5.05 GB


In [187]:
def run_on_worker(data):
    """Run the PYNQ ``vadd_1`` kernel on this worker for one chunk of input.

    The overlay is driven from a freshly forked child process because it
    cannot be run in a non-main thread; the child sends its outcome back
    through a multiprocessing queue.

    The target device is selected by name from the ``DEVICE`` environment
    variable of the worker process.

    Parameters
    ----------
    data : tuple
        ``(input1, input2)`` — two arrays of the same shape. Their contents
        are copied into ``uint32`` device buffers before the kernel call.

    Returns
    -------
    numpy.ndarray
        A host-side copy of the kernel's ``uint32`` output buffer, with the
        same shape as ``input1``.

    Raises
    ------
    RuntimeError
        If the child process fails (e.g. ``DEVICE`` unset, no matching
        device, overlay/kernel error) or exits without delivering a result.
        Previously these cases left the caller blocked forever on the queue.
    """
    print(f"Received task from scheduler with data of type: {type(data)}")
    import multiprocessing
    import time
    import traceback
    from queue import Empty
    import numpy as np
    t0 = time.time()

    def add_on_device(data):
        """Select the device, run the kernel and return a host copy of the output."""
        import pynq
        from pynq import Device
        import os

        # Select right device
        device_name = os.environ['DEVICE']
        device = [i for i in Device.devices if i.name == device_name]
        if len(device) == 1:
            print(f"Selecting device: {device[0].name}")
            Device.active_device = device[0]
        else:
            print(f"No device found with name: {device_name}")
            raise RuntimeError(f"No device found with name: {device_name}")

        ol = pynq.Overlay("pynq-notebooks/1-introduction/intro.xclbin")
        # Pre-bind the buffer names so the cleanup below is valid even when
        # an allocation or the kernel call fails part-way through.
        in1_vadd = in2_vadd = out = None
        try:
            vadd = ol.vadd_1

            # allocate buffers
            (input1, input2) = data
            shape = input1.shape
            print(f"Adding arrays of shape: {shape}")
            # Total element count; equals shape[0]*shape[1] for 2-D input
            # but does not assume a particular rank.
            size = input1.size
            in1_vadd = pynq.allocate(shape, np.uint32)
            in2_vadd = pynq.allocate(shape, np.uint32)
            out = pynq.allocate(shape, np.uint32)

            # initialize input
            in1_vadd[:] = input1
            in2_vadd[:] = input2

            # send data to the device
            in1_vadd.sync_to_device()
            in2_vadd.sync_to_device()

            # call kernel
            vadd.call(in1_vadd, in2_vadd, out, size)

            # get data from the device
            out.sync_from_device()

            # Copy to ordinary host memory *before* the device buffers and
            # the overlay are released.
            return np.copy(out)
        finally:
            # clean up: drop the device buffers first, then free the overlay
            del in1_vadd, in2_vadd, out
            ol.free()

    def child_process(queue, data):
        """Child entry point: always put exactly one (status, payload) tuple on the queue."""
        try:
            outcome = ("ok", add_on_device(data))
        except Exception:
            # Send the formatted traceback (a plain string) so the parent can
            # report the failure without having to unpickle exception objects.
            outcome = ("error", traceback.format_exc())
        queue.put(outcome)

    # We need to run the Pynq overlay in a new forked process since it cannot be run in a non-Main thread.
    # The "fork" start method is requested explicitly: the nested target function
    # cannot be pickled, which the "spawn"/"forkserver" start methods would require.
    ctx = multiprocessing.get_context("fork")
    result_queue = ctx.Queue()
    p = ctx.Process(target=child_process, args=(result_queue, data))
    p.start()

    # Read the result before joining (a child with queued data cannot exit
    # until it is consumed), but poll so a crashed child cannot block us forever.
    outcome = None
    while outcome is None:
        try:
            outcome = result_queue.get(timeout=0.5)
        except Empty:
            if not p.is_alive():
                # The child is gone; anything it flushed before exiting is
                # already in the pipe, so one last read is sufficient.
                try:
                    outcome = result_queue.get(timeout=0.5)
                except Empty:
                    break
    p.join()

    if outcome is None:
        raise RuntimeError(
            f"PYNQ child process exited with code {p.exitcode} without returning a result"
        )
    status, payload = outcome
    if status != "ok":
        raise RuntimeError(f"PYNQ child process failed:\n{payload}")

    t1 = time.time()
    print("EXECUTION TIME ON THIS WORKER: ", t1 - t0)
    return payload

In [188]:
import time

t0 = time.time()

# Initialise input data. Both operands are created as uint32, the dtype of the
# device buffers used in run_on_worker, so no wider integers are scattered.
input_shape = (4096,4096)
input1 = np.random.randint(low=0, high=100, size=input_shape, dtype=np.uint32)
input2 = np.full(input_shape, 200, dtype=np.uint32)


# Split input data row-wise based on no. of workers
num_of_workers = len(client.scheduler_info()["workers"])
if num_of_workers == 0:
    raise RuntimeError("No Dask workers are connected to the scheduler")
# np.array_split keeps every row even when the row count is not a multiple of
# the worker count (a fixed integer chunk size would drop the remainder rows).
data_split = list(zip(np.array_split(input1, num_of_workers),
                      np.array_split(input2, num_of_workers)))
print(f"Split image data into {num_of_workers} chunk(s)")


# Scatter the data to the workers before calling run_on_worker on the workers
distributed_data = client.scatter(data_split)
futures = client.map(run_on_worker, distributed_data)

# Print the output returned by the workers
results = client.gather(futures)
print("Received from workers: ", results)

t1 = time.time()


# check results: chunks come back in submission order, so concatenating along
# the row axis rebuilds the full array for comparison with the host-side sum
mergedResult = np.concatenate(results, axis=0)
msg = "SUCCESS!" if np.array_equal(mergedResult, input1 + input2) else "FAILURE!"
print(msg)


print("TOTAL EXECUTION TIME: ", t1 - t0)

Split image data into 2 chunk(s)
Received from workers:  [array([[266, 264, 265, ..., 278, 215, 291],
       [221, 231, 202, ..., 260, 209, 246],
       [280, 222, 208, ..., 283, 220, 276],
       ...,
       [223, 278, 203, ..., 260, 278, 287],
       [219, 237, 221, ..., 240, 207, 244],
       [269, 289, 259, ..., 299, 299, 294]], dtype=uint32), array([[268, 221, 299, ..., 211, 223, 234],
       [246, 284, 276, ..., 254, 276, 285],
       [215, 202, 254, ..., 232, 226, 281],
       ...,
       [261, 215, 232, ..., 231, 256, 296],
       [274, 241, 283, ..., 249, 218, 273],
       [248, 202, 243, ..., 262, 235, 262]], dtype=uint32)]
SUCCESS!
TOTAL EXECUTION TIME:  3.2790794372558594
