In [1]:
from dask.distributed import Client, progress, get_worker
import os
import binascii

# Replace with IP address of the dask scheduler
client = Client("tcp://10.10.24.60:8786") 

In [10]:
#Constants
BATCH_SIZES = [100, 500, 1000, 1500, 2000]

XCLBIN_PATH = "a.xclbin"
PLATFORM = "alveo"

In [3]:
#Download the CIFAR 10 dataset 
!wget https://raw.githubusercontent.com/modestyachts/CIFAR-10.1/master/datasets/cifar10.1_v4_data.npy

--2020-11-12 13:59:59--  https://raw.githubusercontent.com/modestyachts/CIFAR-10.1/master/datasets/cifar10.1_v4_data.npy
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.36.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.36.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6208640 (5.9M) [application/octet-stream]
Saving to: ‘cifar10.1_v4_data.npy.3’


2020-11-12 14:00:00 (34.0 MB/s) - ‘cifar10.1_v4_data.npy.3’ saved [6208640/6208640]



In [17]:
def run_on_worker(ibuf_normal, index):
    print("Received ", len(ibuf_normal), "images for classification")
    from multiprocessing import Process,Queue
    import numpy as np
    import time
    
    def forked_process(queue, ibuf_normal):
        from driver import FINNAccelDriver
        from pynq.ps import Clocks
        
        batch_size = len(ibuf_normal)
        t0 = time.time()
        finnDriver = FINNAccelDriver(batch_size, XCLBIN_PATH, PLATFORM)
        ibuf_folded = finnDriver.fold_input(ibuf_normal)
#         ibuf_packed = finnDriver.pack_input(ibuf_folded)   Do not pack for performance reasons
        ibuf_packed = ibuf_folded
        finnDriver.copy_input_data_to_device(ibuf_packed)
        finnDriver.execute()
        obuf_packed = np.empty_like(finnDriver.obuf_packed_device)
        finnDriver.copy_output_data_from_device(obuf_packed)
        obuf_folded = finnDriver.unpack_output(obuf_packed)
        obuf_normal = finnDriver.unfold_output(obuf_folded)
        t1 = time.time()
        
        if PLATFORM != "alveo":
            fclk_mhz = Clocks.fclk0_mhz
        else:
            fclk_mhz = finnDriver.fclk_mhz
        runtime = t1-t0
        queue.put({
            'data': obuf_normal,
            'runtime': runtime,
            'index': index,
            'fclk_mhz': fclk_mhz,
            'throughput': batch_size/runtime,
            'bandwidth_in': np.prod(finnDriver.ishape_packed)*0.000001 / runtime,
            'bandwidth_out': np.prod(finnDriver.oshape_packed)*0.000001 / runtime,
            'N': batch_size
            
        })
    
    
    # We need to run the Pynq overlay in a new forked process since it cannot be run in a non-Main thread
    t0_total = time.time()
    queue = Queue()
    p = Process(target=forked_process, args=(queue, ibuf_normal))
    p.start()
    result = queue.get()
    p.join()
    t1_total = time.time()
    print("TOTAL EXECUTION TIME ON THIS WORKER (s): ", t1_total - t0_total)
    return result

In [18]:
import time
import numpy as np
import json

num_of_workers = len(client.scheduler_info()["workers"])
full_cifar = np.load('cifar10.1_v4_data.npy')


for BATCH_SIZE in BATCH_SIZES:
    print("BATCH_SIZE:", BATCH_SIZE)
    partial_cifar = full_cifar[:BATCH_SIZE]
    t0 = time.time()
    
    # Split up the file into equal sized chunks based on number of available dask workers
    data_split = []
    start = 0
    chunk_size = int(len(partial_cifar)/num_of_workers)
    for i in range(num_of_workers - 1):
        data_split.append(partial_cifar[start: start+chunk_size])
        start += chunk_size
    data_split.append(partial_cifar[start:]) #Last partition

    # Scatter the data to the workers before calling run_on_worker on the workers
    print("Sending data to workers, and triggering worker tasks...")
    
    distributed_data = client.scatter(data_split)
    futures = client.map(run_on_worker, distributed_data, range(num_of_workers))
    results = client.gather(futures)
    print("Received data from workers.")

    # Reorder the response based on original input order
    results.sort(key = lambda result: result['index'])  

    # Concatenate the result where each is an ndarray of the shape (BATCH_SIZE/num_of_workers, 1)
    merged_result = np.concatenate([r['data'] for r in results]) # FINAL RESULTS (CLASS LABELS)
    
    t1 = time.time()

    def avg(li):
        return sum(li)/len(li)

    print("TOTAL EXECUTION TIME:", t1-t0)
    print("Maximum FPGA runtime[s]:", max([r['runtime'] for r in results])) # Shown in the plot
    print("Average throughput[images/s]:", avg([r['throughput'] for r in results]))
    print("Average DRAM_in_bandwidth[Mb/s]:", avg([r['bandwidth_in'] for r in results])) 
    print("Average DRAM_out_bandwidth[Mb/s]:", avg([r['bandwidth_out'] for r in results]))    
    print("**************************")    

BATCH_SIZE: 100
Sending data to workers, and triggering worker tasks...
Received data from workers.
TOTAL EXECUTION TIME: 1.3151390552520752
Maximum FPGA runtime[s]: 0.2621912956237793
Average throughput[images/s]: 381.40091478662555
Average DRAM_in_bandwidth[Mb/s]: 1.1716636102245137
Average DRAM_out_bandwidth[Mb/s]: 0.0003814009147866255
**************************
BATCH_SIZE: 500
Sending data to workers, and triggering worker tasks...
Received data from workers.
TOTAL EXECUTION TIME: 1.4434535503387451
Maximum FPGA runtime[s]: 0.4252645969390869
Average throughput[images/s]: 1175.7385956856829
Average DRAM_in_bandwidth[Mb/s]: 3.611868965946418
Average DRAM_out_bandwidth[Mb/s]: 0.0011757385956856828
**************************
BATCH_SIZE: 1000
Sending data to workers, and triggering worker tasks...
Received data from workers.
TOTAL EXECUTION TIME: 1.6121728420257568
Maximum FPGA runtime[s]: 0.6024956703186035
Average throughput[images/s]: 1659.7629647217111
Average DRAM_in_bandwidth[Mb