In [1]:
# Initialization
import pyhdk 
import pandas
import time
import pyarrow as pa
import pyarrow.csv
import os, sys

# Build the HDK config with the heterogeneous-execution flags and the debug timer switched on.
config = pyhdk.buildConfig(enable_heterogeneous=True,
                           force_heterogeneous_distribution=True,
                           enable_multifrag_heterogeneous=True,
                           enable_debug_timer=True,
                           )
pyhdk.initLogger(log_severity="INFO")
# NOTE(review): the meaning of the ArrowStorage(1) argument is not visible here
# (presumably a schema/database id) - confirm against the pyhdk API.
storage = pyhdk.storage.ArrowStorage(1)
data_mgr = pyhdk.storage.DataMgr(config)
# The storage must be registered with the data manager before the executor is created from it.
data_mgr.registerDataProvider(storage)

# Module-level handles shared by the helper functions of this notebook.
calcite = pyhdk.sql.Calcite(storage, config)
executor = pyhdk.Executor(data_mgr, config)

In [2]:
# Helper Functions
# Defaults shared by the benchmark helpers:
#   default_step  - increment of the GPU-proportion sweep (the sweep runs over range(0, 101, step))
#   default_iters - number of timed executions per GPU proportion
default_step = 50
default_iters = 3

def get_rel_alg(sql):
    """
    Pass `sql` through the module-level `calcite` instance and return what
    `calcite.process` yields (the value later handed to RelAlgExecutor).
    """
    rel_alg = calcite.process(sql)
    return rel_alg

def run_query(sql):
    """
    Execute `sql` and return the execution result.

    The statement is first translated with get_rel_alg; a new RelAlgExecutor is
    then built for this single query and executed with default settings.
    """
    plan = get_rel_alg(sql)
    # RelAlgExecutor instances are not reused between queries.
    return pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, plan).execute()


def import_hdk_pyarrow(storage, arrow_table, hdk_table_name, fragment_size, overwrite=True):
    """
    Imports a pyarrow table to HDK with the given fragment size.

        storage: storage object providing importArrowTable / dropTable.
        arrow_table: the pyarrow table to import.
        hdk_table_name: name of the table inside HDK.
        fragment_size: fragment size passed to pyhdk.storage.TableOptions.
        overwrite: when the first import attempt fails and this is True (default),
            the table is dropped and the import is retried once.

    Prints the elapsed wall-clock time (including a possible drop and retry).
    Raises RuntimeError, chained to the original failure, when the import fails
    and overwrite is False.
    """
    opt = pyhdk.storage.TableOptions(fragment_size)
    start_timer = time.perf_counter()
    try:
        storage.importArrowTable(arrow_table, hdk_table_name, opt)
    except Exception as import_error:
        # Only regular errors are handled here; KeyboardInterrupt/SystemExit must
        # propagate instead of triggering a table drop.
        if not overwrite:
            raise RuntimeError(
                f"Cannot overwrite table {hdk_table_name}, overwrite={overwrite}"
            ) from import_error
        # NOTE(review): any import failure is treated as "table already exists" - confirm
        # whether pyhdk exposes a more specific error to check for.
        storage.dropTable(hdk_table_name)
        storage.importArrowTable(arrow_table, hdk_table_name, opt)
    print(f"[PyHDK] Importing pyarrow table: {(time.perf_counter()-start_timer):.4f}s")


def run_query_het_all_props(sql, query_name="", prop_step=default_step, n_iters=default_iters, clear_memory_devices=[]):
    """
    Runs a SQL query `n_iters` times at each forced GPU proportion and records the wall-clock times.

    Proportions are swept over range(0, 101, prop_step); feel free to experiment
    with the order of the two loops.

        sql: SQL text, translated once via get_rel_alg and reused for every run.
        query_name: label embedded in the name of the timing column.
        prop_step: positive step of the GPU-proportion sweep.
        n_iters: positive number of timed executions per proportion.
        clear_memory_devices: device names ("CPU", "GPU") whose memory is cleared
            after every execution.

    Returns [df_prop_time, df_output]: the timings DataFrame (columns "GPU_prop" and
    "QueryT_<query_name>_<size>KB") and the pandas output of the last execution.
    Raises ValueError for a non-positive prop_step or n_iters and KeyError for an
    unknown device name.
    """
    if prop_step < 1:
        raise ValueError(f"prop_step must be a positive integer, got {prop_step}")
    if n_iters < 1:
        raise ValueError(f"n_iters must be a positive integer, got {n_iters}")

    cython_enum_dict = {"CPU":1, "GPU":2} # May move up to cython for easier interface
    # Resolve device names up front so a typo fails before any query has been run.
    device_ids = [cython_enum_dict[device] for device in clear_memory_devices]

    ra = get_rel_alg(sql)
    col_names = ["GPU_prop", f"QueryT_{query_name}"]
    prop_time = {col_names[0] : [], col_names[1]: []}
    result = None
    # Walking over proportions
    for gpu_proportion in range(0, 101, prop_step):
        # Multiple iterations
        for _ in range(n_iters):
            rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)
            # Only execute() is timed; executor construction is excluded.
            query_start = time.perf_counter()
            result = rel_alg_executor.execute(forced_gpu_proportion=gpu_proportion)
            query_finish = time.perf_counter()
            prop_time[col_names[0]].append(gpu_proportion)
            prop_time[col_names[1]].append(query_finish - query_start)
            for device_id in device_ids:
                executor.clearMemory(data_mgr, device_id)

    # Built once, after all measurements have been collected.
    df_prop_time = pandas.DataFrame(prop_time, columns=col_names)
    # Some metadata to get idea about the output cardinality
    df_output = result.to_arrow().to_pandas()
    output_size_KB = df_output.memory_usage(index=True).sum() // (1024)
    df_prop_time = df_prop_time.rename(columns={col_names[1]:f"{col_names[1]}_{output_size_KB}KB"})
    return [df_prop_time, df_output]

def run_queries_all_props(query_dict, step=default_step, n_iters=default_iters, clear_memory_devices=[]):
    """
    Benchmarks every query of `query_dict` (query name -> SQL string) with run_query_het_all_props.

        step: GPU-proportion step forwarded as prop_step.
        n_iters: number of timed executions per proportion.
        clear_memory_devices: device names ("CPU", "GPU") whose memory is cleared after each execution.

    Returns a dict mapping each query name to its timings DataFrame, indexed by "GPU_prop".
    The query outputs themselves are discarded.
    """
    timings_by_query = {}
    for name, sql_text in query_dict.items():
        timings, _query_output = run_query_het_all_props(
            sql_text,
            query_name=name,
            prop_step=step,
            n_iters=n_iters,
            clear_memory_devices=clear_memory_devices,
        )
        timings_by_query[name] = timings.set_index("GPU_prop")
    return timings_by_query

def fragment_size_calc(num_rows):
    """
    Default fragment size heuristic (taken from Modin, you can experiment with it).

    Splits `num_rows` evenly across the CPUs reported by os.cpu_count() and clamps
    the result to the range [2**18, 2**25]. Returns None when the CPU count is unknown.
    """
    n_cpus = os.cpu_count()
    if n_cpus is None:
        return None
    lower_bound, upper_bound = 2**18, 2**25
    rows_per_cpu = num_rows // n_cpus
    return max(min(rows_per_cpu, upper_bound), lower_bound)

def fragment_size_test_range(num_rows):
    """
    Build the list of fragment sizes to benchmark for a table with `num_rows` rows.

    With x = fragment_size_calc(num_rows), the list holds the power-of-two
    multiples [x, x*2, x*4, x*8, x*16] followed by a fixed 32000000 entry.
    Prints the default fragment size.
    Raises RuntimeError when no default fragment size can be determined.
    """
    default_fragment_size = fragment_size_calc(num_rows)
    if default_fragment_size is None:
        # Without a default size the multiplications below would fail with an opaque TypeError.
        raise RuntimeError("Cannot determine the default fragment size (fragment_size_calc returned None)")
    print(f"Default fragment_size={default_fragment_size}")
    power_two_steps = 4
    res_range = [default_fragment_size * 2**power for power in range(power_two_steps + 1)]
    # Fixed extra data point, appended regardless of the default size.
    res_range.append(32000000)
    return res_range

def test_groups_fragment_sizes(storage, pyarrow_tbl, table_name, get_q_dict_callback, step, n_iters, clear_memory_devices=[]):
    """
    Benchmarks a query set against refragmented views of `table_name`, one view per
    fragment size returned by fragment_size_test_range(pyarrow_tbl.num_rows).

        storage: storage object providing createRefragmentedView / dropTable.
        pyarrow_tbl: source pyarrow table; only num_rows and nbytes are read.
        table_name: name of the already imported HDK table.
        get_q_dict_callback: callable mapping a table name to a dict of query name -> SQL string.
        step, n_iters, clear_memory_devices: forwarded to run_queries_all_props.

    Produces the following result grouping: fragment_size{query_name{timings_df}},
    keyed by "Tbl_size_<MB>MB_frag_size_<fragment size>".
    Each temporary view is dropped even when the benchmark raises.
    """
    part_group_timings_dict = dict()
    # The table size does not depend on the fragment size, so compute it once.
    table_size_MB = pyarrow_tbl.nbytes // (1024*1024)
    for frag_size in fragment_size_test_range(pyarrow_tbl.num_rows):
        print(f"Testing {table_size_MB}MB Table with Frag.size={frag_size}")
        refragmented_view_name = f"{table_name}_{frag_size}"
        storage.createRefragmentedView(table_name, refragmented_view_name, frag_size)
        try:
            part_group_timings_dict[f"Tbl_size_{table_size_MB}MB_frag_size_{frag_size}"] = run_queries_all_props(
                get_q_dict_callback(refragmented_view_name), step, n_iters, clear_memory_devices)
        finally:
            # Never leave the temporary view behind, otherwise a re-run fails on the existing name.
            storage.dropTable(refragmented_view_name)
    return part_group_timings_dict

In [3]:
# Read data
import pyarrow.parquet

table_name = "taxi"
# Parquet file with the NY Taxi trips; adjust to your local dataset location.
dataset_dir1 = "~/datasets/taxi_parq/trips_xaa.parquet"
# Upper bound on the number of rows taken from the file.
max_rows = 10000000

pyarrow_tbl = pa.parquet.read_table(dataset_dir1).slice(0, max_rows)

# Alternative inputs:
#   * CSV (if the CSV does not have a header, please provide the column names):
#       pyarrow_tbl = pa.csv.read_csv("../omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header.csv")
#   * Several parquet files combined into one table:
#       pyarrow_tbl2 = pa.parquet.read_table("~/datasets/taxi_parq/trips_xab.parquet")
#       pyarrow_tbl = pa.concat_tables([pyarrow_tbl, pyarrow_tbl2])

SyntaxError: invalid syntax (2340901355.py, line 13)

In [None]:
# Queries (NY Taxi example)
# Plain projections of the whole table: one unfiltered, one filtered on rate_code_id.
select_queries = {
    "simple_select" : f"SELECT * FROM {table_name};",
    "select_less" : f"SELECT * FROM {table_name} WHERE rate_code_id > 1;",
}

# COUNT(*) aggregations grouped by rate_code_id and by total_amount.
groupby_queries = {
    "simple_groupby" : f"SELECT Count(*) FROM {table_name} GROUP BY rate_code_id;",
    "group_by_larger" : f"SELECT total_amount, COUNT(*) FROM {table_name} GROUP BY total_amount;",
}

def getTaxiQ_for_table(tbl_name):
    """
    Return the four taxi benchmark queries (keys "Q1" to "Q4") as SQL strings that read from `tbl_name`.

    All four are GROUP BY aggregations; Q4 additionally orders its result.
    The backslash line continuations sit inside the string literals, so the
    indentation of the continued lines is part of each SQL string.
    """
    return {
    "Q1": f"SELECT cab_type, count(*)\
            FROM {tbl_name}\
            GROUP BY cab_type;",
    "Q2": f"SELECT passenger_count, avg(total_amount)\
            FROM {tbl_name}\
            GROUP BY passenger_count;",
    "Q3": f"SELECT passenger_count, extract(year from pickup_datetime) as pickup_year, count(*)\
            FROM {tbl_name}\
            GROUP BY passenger_count, extract(year from pickup_datetime);",
    "Q4": f"SELECT passenger_count,\
                extract(year from pickup_datetime) as pickup_year,\
                cast(trip_distance as int) AS distance,\
                count(*) AS the_count\
            FROM {tbl_name}\
            GROUP BY passenger_count,\
                    pickup_year,\
                    distance\
            ORDER BY passenger_count, pickup_year, distance, the_count;"
}

In [None]:
# Run Queries (kernel may crash, HDK-side issues)
# NOTE(review): default_fragment_size is computed here, but the import below uses a
# fixed fragment size of 500000 - confirm which of the two is intended.
default_fragment_size = fragment_size_calc(pyarrow_tbl.num_rows)
import_hdk_pyarrow(storage, pyarrow_tbl, table_name, 500000)

# Optional benchmark runs, disabled by default. The plotting cells read
# groupby_queries_timings and taxi_queries_timings, so uncomment these lines to produce them.
# select_queries_timings = run_queries_all_props(select_queries,20,4) # on large tables can take quite some time on GPU
# groupby_queries_timings = run_queries_all_props(groupby_queries,10,10,clear_memory_devices=["CPU", "GPU"])
# taxi_queries_timings = run_queries_all_props(getTaxiQ_for_table(table_name),10,3)

[PyHDK] Importing pyarrow table: 6.0417s


In [8]:
# Single ad-hoc aggregation executed with forced_gpu_proportion=100;
# the result is converted to a pandas DataFrame and printed.
ra = get_rel_alg(f"SELECT passenger_count, SUM(total_amount) FROM {table_name} GROUP BY passenger_count;")
rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)
result = rel_alg_executor.execute(forced_gpu_proportion=100)
df_output = result.to_arrow().to_pandas()
print(df_output)

   passenger_count        EXPR$1
0                0  1.829956e+04
1                1  1.175629e+08
2                2  1.132688e+07
3                3  3.484196e+06
4                4  1.277596e+06
5                5  8.148773e+06
6                6  2.533440e+06
7                7  4.047260e+03
8                8  3.229640e+03
9                9  1.706650e+03


In [None]:
# Benchmark the taxi queries on refragmented views of the table
# (GPU-proportion step 10, 4 timed iterations per proportion).
taxi_frags = test_groups_fragment_sizes(storage, pyarrow_tbl, table_name, getTaxiQ_for_table, 10, 4)

In [None]:
# Fail early with a clear message when matplotlib is not installed.
import importlib.util
if importlib.util.find_spec("matplotlib") is None:
    raise Exception("Please install matplotlib")

import matplotlib.pyplot as plt
%matplotlib inline
# Default figure size in inches (width, height).
plt.rcParams["figure.figsize"] = (8,4)
# Line/marker format strings; plotTimingsFrags picks one per subplot by index.
styles = ['s-','o-','^-','+-','*-',',-']

def plotTimings(dict_of_df_timings, plot_name="Time vs GPU proportion"):
    """
    Plot the median query time per GPU proportion, one line per timings DataFrame.

        dict_of_df_timings: dict of query name -> timings DataFrame that can be
            grouped by "GPU_prop" (as produced by run_queries_all_props).
        plot_name: title of the plot.

    The per-query medians are merged on "GPU_prop" (inner join, so only proportions
    present for every query are drawn) and plotted on a single axes.
    Raises ValueError when `dict_of_df_timings` is empty.
    """
    if not dict_of_df_timings:
        raise ValueError("plotTimings needs at least one timings DataFrame to plot")

    ylab = "Time (s)"
    xlab = "Data proportion on GPU (%)"
    df_accumulator = None
    for df_timings in dict_of_df_timings.values():
        # Collapse the repeated iterations of each proportion into their median.
        df_agg = df_timings.groupby(["GPU_prop"]).median()
        if df_accumulator is None:
            df_accumulator = df_agg
        else:
            df_accumulator = df_accumulator.merge(df_agg, on=["GPU_prop"])
    df_accumulator.plot(xlabel=xlab, ylabel=ylab, title=plot_name)
    # Keep the legend outside of the plotting area.
    plt.legend(bbox_to_anchor=(1.01, 1), loc="upper left")
    plt.tight_layout()


def swap_dict_levels(dict_frag_to_q):
    """
    Regroup fragment-size benchmark results by query.

    Input grouping:
        fragment_size{query_name{timings_df}},
    output grouping:
        query_name{wide_timings_df},
    where the wide DataFrame holds the timings of all fragment sizes side by side
    (concatenated along the columns). The last column of every input DataFrame gets
    the suffix "_<fragment_size key>" so the columns stay distinguishable.
    The input DataFrames are not modified.
    Feel free to change the query grouping structure.
    """
    frames_by_query = dict()
    for frag_size, q_values in dict_frag_to_q.items():
        for q_name, timings_df in q_values.items():
            last_column = timings_df.columns[-1]
            tagged_df = timings_df.rename(columns={last_column: f"{last_column}_{frag_size}"})
            frames_by_query.setdefault(q_name, []).append(tagged_df)
    # Concatenate once per query instead of growing a DataFrame inside the loop.
    return {q_name: pandas.concat(frames, axis=1) for q_name, frames in frames_by_query.items()}

def plotTimingsFrags(dict_of_df_timings, plot_name="Time vs GPU proportion"):
    """
    Draw one subplot per query with the median time per GPU proportion.

        dict_of_df_timings: dict of query name -> timings DataFrame that can be
            grouped by "GPU_prop" (e.g. the result of swap_dict_levels).
        plot_name: accepted for symmetry with plotTimings; currently not used.

    Column names are shortened to their last three "_"-separated parts for the
    legend, and every subplot is titled with its query name.
    Raises ValueError when `dict_of_df_timings` is empty.
    """
    if not dict_of_df_timings:
        raise ValueError("plotTimingsFrags needs at least one timings DataFrame to plot")

    ylab = "Time (s)"
    xlab = "Data proportion on GPU (%)"
    # squeeze=False keeps `axes` an array even for a single query, so indexing always works.
    fig, axes = plt.subplots(len(dict_of_df_timings), 1, squeeze=False)
    axes = axes[:, 0]
    fig.set_size_inches(7,9)
    for enum, (q_name, df_timings) in enumerate(dict_of_df_timings.items()):
        df_agg = df_timings.groupby(["GPU_prop"]).median()
        # Cut redundant query info
        df_agg = df_agg.rename(columns=lambda x: '_'.join(x.split('_')[-3:]))
        # Cycle through the styles so more queries than styles does not raise IndexError.
        subplot_style = styles[enum % len(styles)]
        df_agg.plot(ax=axes[enum], xlabel=xlab, ylabel=ylab, title=q_name, style=subplot_style)
        axes[enum].legend(bbox_to_anchor=(1.01, 1.02), loc="upper left")

    fig.tight_layout()
    # pyplot.show() is used instead of Figure.show(), which is meant for GUI backends.
    plt.show()

In [None]:
# Plot only the timings that have actually been produced: the run_queries_all_props(...)
# calls defining these variables are commented out by default, which would otherwise
# end in a NameError on a fresh kernel.
for timings_var, plot_title in (("groupby_queries_timings", "Time vs GPU proportion"),
                                ("taxi_queries_timings", "Taxi queries")):
    timings = globals().get(timings_var)
    if timings is None:
        print(f"Skipping plot: {timings_var} is not defined, run the matching benchmark first.")
    else:
        plotTimings(timings, plot_title)

In [None]:
# Regroup the fragment-size results by query and draw one subplot per query.
plotTimingsFrags(swap_dict_levels(taxi_frags))

In [None]:
# HDK Cleanup
# Drop the table that was imported with import_hdk_pyarrow.
storage.dropTable(table_name)