# Questions/Thoughts
- How to best structure the project? Code is growing fast
- How important is presenting results cleanly?
    - Could look around for better profiling and create graphs in matplotlib
    - Is it valuable to spend time automating profiling using decorators
- What's the bigger picture? How will this system fit into Pywren?
- Is there a benchmark? What kind of performance am I comparing against? Spark
- Is there a way to check correctness of the query results? Using Spark

In [None]:
import pandas as pd
import numpy as np
import queue
import cProfile
import multiprocessing as mp

In [8]:
NUM_THREADS = 4

In [3]:
# File I/O functions
def read_header(table_name):
    # TODO: read directly from schema
    with open("headers/" + table_name, "r") as f:
        result = f.read().split("\n")
        f.close()
        return result

def read_table(table_name):
    header = read_header(table_name)
    filename = "data/{table_name}.dat".format(table_name=table_name)
    table = pd.read_csv(filename, names=header, delimiter='|')
    table.tail()
    return table

def read_table_chunk(table_name, chunk_id, total_chunks):
    header = read_header(table_name)
    filename = "data/{table_name}_{chunk_id}_{total_chunks}.dat".format(table_name=table_name,
                                                                        chunk_id=chunk_id,
                                                                        total_chunks=total_chunks)
    table = pd.read_csv(filename, names=header, delimiter='|')
    table.tail()
    return table

def read_bucket(table_name, chunk_id, total_chunks, column, bucket_id, total_buckets):
    header = read_header(table_name)
    filename_template = "data/{name}_{chunk_id}_{total_chunks}_{column}_{bucket_id}_{total_buckets}.dat"
    filename = filename_template.format(name=table_name, chunk_id=chunk_id, total_chunks=total_chunks,
                                       column=column, bucket_id=bucket_id, total_buckets=total_buckets)
    table = pd.read_csv(filename, names=header, delimiter='|')
    table.tail()
    return table

def write_header(table_name, columns):
    with open("headers/" + table_name, "w") as f:
        f.write("\n".join(columns))
        f.close()

def write_table(table_name, table):
    write_header(table_name, table.columns)
    filename = "data/{table_name}.dat".format(table_name=table_name)
    table.to_csv(filename, sep='|', header=None)
    
def write_table_chunk(table_name, table, chunk_id, total_chunks):
    write_header(table_name, table.columns)
    filename = "data/{table_name}_{chunk_id}_{total_chunks}.dat".format(table_name=table_name,
                                                                        chunk_id=chunk_id,
                                                                        total_chunks=total_chunks)
    table.to_csv(filename, sep='|', header=None)
    
def write_buckets(buckets, table_name, chunk_id, total_chunks, column):
    total_buckets = len(buckets)
    assert total_buckets > 0, "There must be at least 1 bucket"
    write_header(table_name, buckets[0].columns)
    filename_template = "data/{name}_{chunk_id}_{total_chunks}_{column}_{bucket_id}_{total_buckets}.dat"
    for i, bucket in enumerate(buckets):
        filename = filename_template.format(name=table_name, chunk_id=chunk_id, column=column,
                                                 total_chunks=total_chunks, bucket_id=i + 1,
                                                 total_buckets=total_buckets)
        bucket.to_csv(filename, sep='|', header=None)

In [52]:
def join_on_column(A, B, a_column, b_column):
    """Merges 2 tables where a_column and b_column are equal.
    
    Assumes that a_column and b_column contain unique keys.
    This is *much* faster than filtered_cartesian product.
    However, it can be replaced with a map/reduce:
        1. map over rows of A
        2. select row with matching key in B
        3. return concatenation of row from A with row from B
        4. reduce list of concatenated rows by converting to dataframe
    
    Params:
        - A: a table (pandas dataframe object)
        - B: a table (pandas dataframe object)
        - a_column: string name of column of keys in A
        - b_column: string name of column of keys in B
    
    Returns:
        pandas dataframe
        
    Note:
        - The returned dataframe does not contain a column with the label
            b_column.
    """
    B_renamed  = B.rename(columns={b_column: a_column}, inplace=False)
    return pd.merge(A, B_renamed, on=a_column)

In [53]:
def filter_chunked_table(table_name, filtered_name, num_chunks, filter_fun):
    """Filters table in chunks.
    
    Args:
        filter_fun: takes a table chunk and returns a filtered table chunk
    """
    for i in range(num_chunks):
        chunk = read_table_chunk(table_name, i+1, num_chunks)
        chunk_filtered = filter_fun(chunk)
        write_table_chunk(filtered_name, chunk_filtered, i+1, num_chunks)

In [54]:
def assign_bucket_hash(item, total_buckets):
    return hash(item) % total_buckets

def assign_bucket_range(item, total_buckets):
    """Assigns item to a bucket associated to a particular range.
    Note: bucket ranges must be consistent across chunks.
    """
    raise NotImplementedError

In [62]:
# Modified this for multithreading
def split_chunk_into_buckets(i, table_name, column, total_chunks, assign_bucket_fn, total_buckets):
    """Splits a chunk into buckets and writes them
    """
    chunk = read_table_chunk(table_name, i, total_chunks)
    buckets_list = [list() for _ in range(total_buckets)]
    
    for idx, row in chunk.iterrows():
        bucket = assign_bucket_fn(row[column], total_buckets)
        buckets_list[bucket].append(row)
    buckets = [pd.DataFrame(rows, columns=chunk.columns) for rows in buckets_list]
        
    write_buckets(buckets, table_name, i, total_chunks, column)
    return True

def reduce_buckets_to_chunk(bucket_id, table_name, column, total_chunks, total_buckets, new_table_name):
    bucket_list = []
    for chunk_id in range(1, total_chunks + 1):
        bucket = read_bucket(table_name, chunk_id, total_chunks, column, bucket_id, total_buckets)
        bucket_list.append(bucket)
    chunk = pd.concat(bucket_list, ignore_index=True)
    write_table_chunk(new_table_name, chunk, bucket_id, total_buckets)

In [74]:
def split_table_into_buckets_threaded(table_name, column, total_chunks, assign_bucket_fn,
                                      total_buckets, num_threads=NUM_THREADS):
    # Is this the proper way, or is it better to modify split_chunk_into_buckets?
    args = [(i + 1, table_name, column, total_chunks, assign_bucket_fn, total_buckets)
            for i in range(total_buckets)]
    with mp.Pool(num_threads) as p:
        return all(p.starmap(split_chunk_into_buckets, args))
        
def merge_buckets_threaded(table_name, column, total_chunks, total_buckets,
                           new_table_name="", num_threads=NUM_THREADS):
    if not new_table_name:
        new_table_name = table_name
    args = [(i + 1, table_name, column, total_chunks, total_buckets, new_table_name)
           for i in range(total_buckets)]
    with mp.Pool(num_threads) as p:
        return all(p.starmap(reduce_buckets_to_chunk, args))

In [75]:
def join_buckets_on_column(a_name, a_column, b_name, b_column, bucket_id, total_buckets, out_name):
    a_bucket = read_table_chunk(a_name, bucket_id, total_buckets)
    b_bucket = read_table_chunk(b_name, bucket_id, total_buckets)
    joined_bucket = join_on_column(a_bucket, b_bucket, a_column, b_column)
    write_table_chunk(out_name, joined_bucket, bucket_id, total_buckets)
    return True

In [82]:
def join_on_column_threaded(a_name, a_column, a_total_chunks,
                               b_name, b_column, b_total_chunks,
                               out_name, total_buckets, num_threads=NUM_THREADS):
    # Split into buckets by hash
    split_table_into_buckets_threaded(a_name, a_column, a_total_chunks,
                                      assign_bucket_hash, total_buckets, num_threads)
    split_table_into_buckets_threaded(b_name, b_column, b_total_chunks,
                                      assign_bucket_hash, total_buckets, num_threads)
    # Merge buckets
    a_temp_name = a_name + "_hash_grouped"
    b_temp_name = b_name + "_hash_grouped"
    merge_buckets_threaded(a_name, a_column, a_total_chunks, total_buckets, a_temp_name, num_threads)
    merge_buckets_threaded(b_name, b_column, b_total_chunks, total_buckets, b_temp_name, num_threads)
    args = [(a_temp_name, a_column, b_temp_name, b_column, i + 1, total_buckets, out_name)
            for i in range(total_buckets)]
    with mp.Pool(num_threads) as p:
        return all(p.starmap(join_buckets_on_column, args))

In [86]:
def join_on_column_threaded_other(a_name, a_column, a_total_chunks,
                               b_name, b_column, b_total_chunks,
                               out_name, total_buckets, num_threads=NUM_THREADS):
    """Does not thread the joining of buckets. This is almost the same speed.
    Maybe due to I/O intensive operations and low numbers of buckets and data in join_buckets_on_column
    """
    # Split into buckets by hash
    split_table_into_buckets_threaded(a_name, a_column, a_total_chunks,
                                      assign_bucket_hash, total_buckets, num_threads)
    split_table_into_buckets_threaded(b_name, b_column, b_total_chunks,
                                      assign_bucket_hash, total_buckets, num_threads)
    # Merge buckets
    a_temp_name = a_name + "_hash_grouped"
    b_temp_name = b_name + "_hash_grouped"
    merge_buckets_threaded(a_name, a_column, a_total_chunks, total_buckets, a_temp_name, num_threads)
    merge_buckets_threaded(b_name, b_column, b_total_chunks, total_buckets, b_temp_name, num_threads)
    args = [(a_temp_name, a_column, b_temp_name, b_column, i + 1, total_buckets, out_name)
            for i in range(total_buckets)]
    for bucket_id in range(1, total_buckets + 1):
        # buckets are now chunks of the new table
        a_bucket = read_table_chunk(a_temp_name, bucket_id, total_buckets)
        b_bucket = read_table_chunk(b_temp_name, bucket_id, total_buckets)
        joined_bucket = join_on_column(a_bucket, b_bucket, a_column, b_column)
        write_table_chunk(out_name, joined_bucket, bucket_id, total_buckets)

In [22]:
# Filter date_dim
filter_chunked_table("date_dim", "date_dim_filtered", 3, lambda tbl: tbl[tbl["d_year"] == 2000])

In [92]:
# Using 2 threads
cProfile.run('join_on_column_threaded("date_dim_filtered", "d_date_sk", 3, '+
                          '"store_returns", "sr_returned_date_sk", 3,' +
                          '"ctr_joined", 3, 2)')

         4147 function calls in 43.788 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       45    0.000    0.000    0.000    0.000 <frozen importlib._bootstrap>:996(_handle_fromlist)
        2    0.000    0.000   35.309   17.654 <ipython-input-74-81de4cfdaee6>:1(split_table_into_buckets_threaded)
        2    0.000    0.000    0.000    0.000 <ipython-input-74-81de4cfdaee6>:13(<listcomp>)
        2    0.000    0.000    0.000    0.000 <ipython-input-74-81de4cfdaee6>:4(<listcomp>)
        2    0.000    0.000    5.853    2.927 <ipython-input-74-81de4cfdaee6>:9(merge_buckets_threaded)
        1    0.000    0.000   43.785   43.785 <ipython-input-82-5bd60ebc15df>:1(join_on_column_threaded)
        1    0.000    0.000    0.000    0.000 <ipython-input-82-5bd60ebc15df>:14(<listcomp>)
        1    0.001    0.001   43.787   43.787 <string>:1(<module>)
       25    0.000    0.000    0.000    0.000 _weakrefset.py:38(_remove)
       25 

In [93]:
# Using 4 threads
cProfile.run('join_on_column_threaded("date_dim_filtered", "d_date_sk", 3, '+
                          '"store_returns", "sr_returned_date_sk", 3,' +
                          '"ctr_joined", 3, 4)')

         5426 function calls in 32.952 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       55    0.000    0.000    0.001    0.000 <frozen importlib._bootstrap>:996(_handle_fromlist)
        2    0.000    0.000   26.429   13.215 <ipython-input-74-81de4cfdaee6>:1(split_table_into_buckets_threaded)
        2    0.000    0.000    0.000    0.000 <ipython-input-74-81de4cfdaee6>:13(<listcomp>)
        2    0.000    0.000    0.000    0.000 <ipython-input-74-81de4cfdaee6>:4(<listcomp>)
        2    0.000    0.000    4.482    2.241 <ipython-input-74-81de4cfdaee6>:9(merge_buckets_threaded)
        1    0.000    0.000   32.952   32.952 <ipython-input-82-5bd60ebc15df>:1(join_on_column_threaded)
        1    0.000    0.000    0.000    0.000 <ipython-input-82-5bd60ebc15df>:14(<listcomp>)
        1    0.000    0.000   32.952   32.952 <string>:1(<module>)
       32    0.000    0.000    0.000    0.000 _weakrefset.py:38(_remove)
       35 

In [94]:
# Using 6 threads
cProfile.run('join_on_column_threaded("date_dim_filtered", "d_date_sk", 3, '+
                          '"store_returns", "sr_returned_date_sk", 3,' +
                          '"ctr_joined", 3, 6)')

         6789 function calls in 34.137 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       65    0.000    0.000    0.001    0.000 <frozen importlib._bootstrap>:996(_handle_fromlist)
        2    0.000    0.000   26.663   13.332 <ipython-input-74-81de4cfdaee6>:1(split_table_into_buckets_threaded)
        2    0.000    0.000    0.000    0.000 <ipython-input-74-81de4cfdaee6>:13(<listcomp>)
        2    0.000    0.000    0.000    0.000 <ipython-input-74-81de4cfdaee6>:4(<listcomp>)
        2    0.000    0.000    5.009    2.505 <ipython-input-74-81de4cfdaee6>:9(merge_buckets_threaded)
        1    0.000    0.000   34.137   34.137 <ipython-input-82-5bd60ebc15df>:1(join_on_column_threaded)
        1    0.000    0.000    0.000    0.000 <ipython-input-82-5bd60ebc15df>:14(<listcomp>)
        1    0.000    0.000   34.137   34.137 <string>:1(<module>)
       43    0.000    0.000    0.000    0.000 _weakrefset.py:38(_remove)
       45 

In [95]:
# Using 8 threads
cProfile.run('join_on_column_threaded("date_dim_filtered", "d_date_sk", 3, '+
                          '"store_returns", "sr_returned_date_sk", 3,' +
                          '"ctr_joined", 3, 8)')

         8237 function calls in 32.415 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       75    0.000    0.000    0.001    0.000 <frozen importlib._bootstrap>:996(_handle_fromlist)
        2    0.000    0.000   25.909   12.955 <ipython-input-74-81de4cfdaee6>:1(split_table_into_buckets_threaded)
        2    0.000    0.000    0.000    0.000 <ipython-input-74-81de4cfdaee6>:13(<listcomp>)
        2    0.000    0.000    0.000    0.000 <ipython-input-74-81de4cfdaee6>:4(<listcomp>)
        2    0.000    0.000    4.427    2.214 <ipython-input-74-81de4cfdaee6>:9(merge_buckets_threaded)
        1    0.000    0.000   32.415   32.415 <ipython-input-82-5bd60ebc15df>:1(join_on_column_threaded)
        1    0.000    0.000    0.000    0.000 <ipython-input-82-5bd60ebc15df>:14(<listcomp>)
        1    0.000    0.000   32.415   32.415 <string>:1(<module>)
       53    0.000    0.000    0.000    0.000 _weakrefset.py:38(_remove)
       55 

# Analysis
Laptop CPU (Asus UX305FA) has 2 CPUs, 4 total hyper threads. Therefore, there's a natural speedup up to 4 threads. However, these results aren't reliable because we split into 3 buckets. Need bigger data and more chunks/buckets for more accurate results.