# fastcdc-py - optimization study

In [1]:
import ctypes
import os
from codetiming import Timer
import humanize
import cython
import random
from array import array
from concurrent.futures import ThreadPoolExecutor
%load_ext cython

## Utility functions

In [2]:
def get_block_size_win():
    """Check storage cluster/sector sizes"""
    sectorsPerCluster = ctypes.c_ulonglong(0)
    bytesPerSector = ctypes.c_ulonglong(0)
    rootPathName = ctypes.c_wchar_p(u"C:\\")

    ctypes.windll.kernel32.GetDiskFreeSpaceW(rootPathName,
        ctypes.pointer(sectorsPerCluster),
        ctypes.pointer(bytesPerSector),
        None,
        None,
    )
    return sectorsPerCluster.value, bytesPerSector.value
get_block_size_win()

(8, 512)

In [3]:
DUMMY="dummy.bin"

def create_dummy_file(size=int(1e+9)):
    """Create a Random Data File"""
    print(f'Generate {humanize.naturalsize(size)} {DUMMY}')
    with open(DUMMY, "wb") as outf:
        outf.write(os.urandom(size))

create_dummy_file()

Generate 1.0 GB dummy.bin


## File Read

In [4]:
def benchmark_read(func, chunk_size):
    func_name = func.__name__
    num_bytes = os.path.getsize(DUMMY)
    file_size = humanize.naturalsize(num_bytes)
    t = Timer(logger=None)
    t.start()
    result = func(chunk_size)
    t.stop()
    print(f"{file_size} with {humanize.naturalsize(chunk_size)} chunk size with {func_name} : {humanize.naturalsize(num_bytes / t.last)}/s (result = {result})")

In [5]:
def read_chunked(chunk_size=1024*32):
    with open(DUMMY, 'rb') as f:
        data = f.read(chunk_size)
        x = None
        while data:
            x = data
            data = f.read(chunk_size)
    return x[-1]

In [6]:
benchmark_read(read_chunked, chunk_size=1024)
benchmark_read(read_chunked, chunk_size=1024*8)
benchmark_read(read_chunked, chunk_size=1024*16)
benchmark_read(read_chunked, chunk_size=1024*32)
benchmark_read(read_chunked, chunk_size=1024*64)
benchmark_read(read_chunked, chunk_size=1024*128)
benchmark_read(read_chunked, chunk_size=1024*256)
benchmark_read(read_chunked, chunk_size=1024*512)
benchmark_read(read_chunked, chunk_size=1024*1024)

1.0 GB with 1.0 kB chunk size with read_chunked : 1.5 GB/s (result = 46)
1.0 GB with 8.2 kB chunk size with read_chunked : 1.9 GB/s (result = 46)
1.0 GB with 16.4 kB chunk size with read_chunked : 2.9 GB/s (result = 46)
1.0 GB with 32.8 kB chunk size with read_chunked : 4.3 GB/s (result = 46)
1.0 GB with 65.5 kB chunk size with read_chunked : 5.3 GB/s (result = 46)
1.0 GB with 131.1 kB chunk size with read_chunked : 6.1 GB/s (result = 46)
1.0 GB with 262.1 kB chunk size with read_chunked : 6.8 GB/s (result = 46)
1.0 GB with 524.3 kB chunk size with read_chunked : 7.1 GB/s (result = 46)
1.0 GB with 1.0 MB chunk size with read_chunked : 2.7 GB/s (result = 46)


## Chunking Operations 

In [7]:
DATA = os.urandom(1024*512*100)
GEAR = [random.getrandbits(32) for _ in range(256)]
GEAR_A = array('L', GEAR)

In [8]:
def benchmark_chunker(func, data, gear):
    func_name = func.__name__
    num_bytes = len(data)
    data_size = humanize.naturalsize(num_bytes)
    t = Timer(logger=None)
    t.start()
    result = func(data, gear)
    t.stop()
    print(f"{data_size} with {func_name} : {humanize.naturalsize(num_bytes / t.last)}/s (total={t.last:.4f}s result={result})")

In [9]:
def chunker_simple(data, gear):
    pattern = 0
    mask = 32767
    c = 0
    for b in data:
        pattern = (pattern >> 1) + gear[b]
        if not pattern & mask:
            c+=1
    return c

In [10]:
%%cython
cimport cython
from libc.stdint cimport uint32_t, uint8_t

@cython.boundscheck(False)
@cython.wraparound(False)
def chunker_cython(const uint8_t[:] data, const uint32_t[:] gear):
    cdef uint32_t pattern, mask, c, i, length
    pattern = 0
    mask = 32767
    c = 0
    length = data.shape[0]
    with nogil:
        for i in range(length):
            pattern = (pattern >> 1) + gear[data[i]]
            if not pattern & mask:
                c += 1
    return c

In [11]:
from numba import jit

@jit(nopython=True)
def chunker_numba(data, gear):
    pattern = 0
    mask = 32767
    c = 0
    for b in data:
        pattern = (pattern >> 1) + gear[b]
        if not pattern & mask:
            c+=1
    return c

In [22]:
benchmark_chunker(chunker_simple, DATA, GEAR)
benchmark_chunker(chunker_cython, DATA, GEAR_A)
benchmark_chunker(chunker_numba, DATA, GEAR_A)

52.4 MB with chunker_simple : 7.6 MB/s (total=6.9281s result=1656)
52.4 MB with chunker_cython : 1.5 GB/s (total=0.0352s result=1622)
52.4 MB with chunker_numba : 1.7 GB/s (total=0.0313s result=1656)


## Parallel Operations

In [20]:
def benchmark_multi_chunker(func):
    func_name = func.__name__
    num_bytes = os.path.getsize(DUMMY)    
    data_size = humanize.naturalsize(num_bytes)
    t = Timer(logger=None)
    t.start()
    result = func()
    t.stop()
    print(f"{data_size} with {func_name} : {humanize.naturalsize(num_bytes / t.last)}/s (total={t.last:.4f}s result={result[:3]}...{result[-3:]})")

def fixed_chunks(chunk_size=1024*512):
    with open(DUMMY, 'rb') as f:
        data = f.read(chunk_size)
        while data:
            yield data
            data = f.read(chunk_size)
            
def serial_chunker():
    result = []
    for chunk in fixed_chunks():
        result.append(chunker_cython(chunk, GEAR_A))
    return result

def parallel_chunker():
    with ThreadPoolExecutor(max_workers=8) as ex:
        jobs = [ex.submit(chunker_cython, chunk, GEAR_A) for chunk in fixed_chunks()]
    return [job.result() for job in jobs]

In [21]:
benchmark_multi_chunker(serial_chunker)
benchmark_multi_chunker(parallel_chunker)

1.0 GB with serial_chunker : 1.2 GB/s (total=0.8270s result=[15, 15, 19]...[13, 17, 4])
1.0 GB with parallel_chunker : 4.3 GB/s (total=0.2345s result=[15, 15, 19]...[13, 17, 4])
