In [1]:
from dask.distributed import Client, progress, get_worker
import os
# Connect to the dask scheduler. The address can be overridden through the
# DASK_SCHEDULER_ADDRESS environment variable; the original hard-coded
# address is kept as the default so existing setups behave the same.
client = Client(os.environ.get("DASK_SCHEDULER_ADDRESS", "tcp://10.10.24.86:8786"))

In [2]:
# Input file (read on the client) that will be split and compressed.
FILE_TO_BE_COMPRESSED = "enwik8"

# Scratch files written on each worker: the raw chunk and its compressed form.
# Both live in the same scratch directory.
_WORKER_SCRATCH_DIR = "_worker"
TEMP_FILE_ON_WORKER = f"{_WORKER_SCRATCH_DIR}/temp.txt"
TEMP_COMPRESSED_FILE_ON_WORKER = f"{_WORKER_SCRATCH_DIR}/temp.gz"

# Merged output written on the client once all chunks are gathered.
FINAL_COMPRESSED_FILE = "compressed.gz"

In [3]:
def run_on_worker(file_bytes, index):
    """Compress one chunk of the input on a dask worker using xfZlib.

    The chunk is written to ``TEMP_FILE_ON_WORKER``, compressed into
    ``TEMP_COMPRESSED_FILE_ON_WORKER`` with ``xfZlibWrapper.compress_file``,
    and the compressed file is read back and returned.

    Args:
        file_bytes: Raw bytes of the chunk to compress.
        index: Position of this chunk in the original file. It is returned
            unchanged so the caller can restore the chunk order.

    Returns:
        A dict with keys ``'index'`` (the ``index`` argument) and ``'data'``
        (the bytes of the compressed file).
    """
    from xfZlibWrapper import xfZlibWrapper

    # Avoid reinstalling the xclbin: the wrapper is cached as an attribute on
    # the worker object and reused by later tasks on the same worker.
    worker = get_worker()
    if not hasattr(worker, '_xclbin'):
        worker._xclbin = xfZlibWrapper(b"/home/nimbix/compress_decompress.xclbin")
    xfZlib = worker._xclbin

    # Make sure the directories holding the scratch files exist.
    for scratch_path in (TEMP_FILE_ON_WORKER, TEMP_COMPRESSED_FILE_ON_WORKER):
        scratch_dir = os.path.dirname(scratch_path)
        if scratch_dir:
            os.makedirs(scratch_dir, exist_ok=True)

    # NOTE(review): the scratch paths are fixed, so two tasks running at the
    # same time in one worker process would overwrite each other's files --
    # presumably each worker runs a single task at a time; confirm.
    with open(TEMP_FILE_ON_WORKER, 'wb') as chunk_file:
        chunk_file.write(file_bytes)

    size = xfZlib.compress_file(TEMP_FILE_ON_WORKER, TEMP_COMPRESSED_FILE_ON_WORKER)
    print('Compressed from ', os.path.getsize(TEMP_FILE_ON_WORKER),' to ', size, ' bytes')

    with open(TEMP_COMPRESSED_FILE_ON_WORKER, "rb") as compressed_file:
        compressed_bytes = compressed_file.read()

    return {
        'index': index,
        'data': compressed_bytes
    }

In [5]:
import time
t0 = time.time()

num_of_workers = len(client.scheduler_info()["workers"])
if num_of_workers == 0:
    # Without this guard the chunk computation below would divide by zero.
    raise RuntimeError("No dask workers are connected to the scheduler")

# Split the file into one chunk per available dask worker. Boundaries are
# computed with integer arithmetic so that chunk sizes differ by at most one
# byte and no trailing bytes are dropped when the file size is not an exact
# multiple of the worker count.
print("Splitting input file into", num_of_workers, "chunk(s)")
with open(FILE_TO_BE_COMPRESSED, "rb") as ifile:
    total = ifile.read()
boundaries = [len(total) * i // num_of_workers for i in range(num_of_workers + 1)]
data_split = [total[lo:hi] for lo, hi in zip(boundaries, boundaries[1:])]

# Scatter the data to the workers before calling run_on_worker on the workers
distributed_data = client.scatter(data_split)
futures = client.map(run_on_worker, distributed_data, range(num_of_workers))
print(futures)
gathered = client.gather(futures)
print("Received data from workers")

# Reorder the responses by chunk index and keep only the compressed bytes.
results = [r['data'] for r in sorted(gathered, key=lambda r: r['index'])]

gzip_header_size = 10 + len(TEMP_FILE_ON_WORKER) + 1 # standard header + file name + "\0"
gzip_footer_size = 8 + 5 # Standard footer + 5 bytes padding added by xfZlib

print("Merging the compressed files received")
# Layout of the merged file: header of the first result, then the payload of
# every result (header and footer stripped), then the footer of the first
# result. The footer slice must use gzip_footer_size, not gzip_header_size.
# NOTE(review): the footer is copied from the first chunk, so any CRC32/ISIZE
# it carries describes only that chunk, not the whole input -- confirm that
# the decompressor tolerates this when more than one worker is used.
concatenated_gzip_bytes = b"".join(
    [results[0][:gzip_header_size]]
    + [result[gzip_header_size:-gzip_footer_size] for result in results]
    + [results[0][-gzip_footer_size:]]
)

print("Writing combined (compressed) data to " + FINAL_COMPRESSED_FILE)
with open(FINAL_COMPRESSED_FILE, "wb") as f:
    f.write(concatenated_gzip_bytes)
t1 = time.time()
print("TOTAL EXECUTION TIME (in s): ", t1 - t0)



Splitting input file into 1 chunk(s)
[<Future: cancelled, key: run_on_worker-12942954877001ae012fc1506a5140f6>]


CancelledError: 

In [None]:

import filecmp
import subprocess

# Decompress the merged file with the system gzip and compare the result
# byte-for-byte against the original input.
FILE_COPY = FILE_TO_BE_COMPRESSED + ".copy"
COMMAND_TO_RUN = "gzip -dc " + FINAL_COMPRESSED_FILE + " > " + FILE_COPY
print("Extracting", FINAL_COMPRESSED_FILE, "using command: ")
print(COMMAND_TO_RUN)

# Run gzip with an argument list (no shell) and redirect its stdout into the
# copy file ourselves; this is equivalent to the printed command but is not
# sensitive to spaces or shell metacharacters in the file names.
with open(FILE_COPY, "wb") as copy_file:
    try:
        gzip_status = subprocess.run(
            ["gzip", "-dc", FINAL_COMPRESSED_FILE], stdout=copy_file
        ).returncode
    except OSError as err:
        # gzip missing or not executable: report it and let the comparison
        # below fail instead of aborting the cell.
        print("Could not run gzip:", err)
        gzip_status = None
if gzip_status not in (0, None):
    print("gzip exited with status", gzip_status)

print("Comparing", FILE_COPY, "to", FILE_TO_BE_COMPRESSED)
# shallow=False forces a content comparison rather than a stat() comparison.
if filecmp.cmp(FILE_TO_BE_COMPRESSED, FILE_COPY, shallow=False):
    print("Validation succeeded !!")
else:
    print("Validation failed !!")