# Connect to an existing Dask CUDA cluster

In [None]:
from dask.distributed import Client


# IP address of the running Dask scheduler; replace the placeholder before running.
cluster_ip = 'YOUR_DASK_SCHEDULER_IP'

# The scheduler address uses the UCX protocol on port 8786.
scheduler_address = f'ucx://{cluster_ip}:8786'
client = Client(scheduler_address)

# Bare expression: in a notebook cell this displays the client summary.
client

In [None]:
import dask_cudf

# Location of the TPCx-BB repository; replace the placeholder before running.
tpcx_bb_home = 'YOUR_TPCX_BB_REPO_LOCATION'
# Directory containing the Spark table schema files (note the trailing slash).
spark_schema_dir = f'{tpcx_bb_home}/tpcx_bb/spark_table_schemas/'

# Spark uses different names for column types, and RAPIDS doesn't yet support Decimal types.
def get_schema(table):
    """Read the Spark schema file of ``table`` and return column names and dtypes.

    The file ``{spark_schema_dir}{table}.schema`` is read line by line. Commas
    are dropped, the first whitespace-separated token of a line is the column
    name and the second one is the Spark type. Blank lines (for example the one
    produced by a trailing newline) are skipped.

    Spark type names are translated as follows: ``bigint`` becomes ``int``,
    ``string`` becomes ``str``, anything from the first ``(`` on is dropped,
    and ``decimal`` becomes ``float``.

    Returns:
        A ``(names, types)`` tuple of two lists of strings of equal length.

    Raises:
        IndexError: if a non-blank line holds fewer than two tokens.
    """
    names = []
    types = []
    with open(f'{spark_schema_dir}{table}.schema') as fp:
        for line in fp:
            fields = line.replace(',', '').split()
            if not fields:
                # Nothing to parse on an empty or whitespace-only line.
                continue
            names.append(fields[0])
            spark_type = fields[1].replace('bigint', 'int').replace('string', 'str')
            types.append(spark_type.split('(')[0].replace('decimal', 'float'))
    return names, types

def _nonempty_files(directory, skip_audit=False):
    """Return the paths of all non-empty entries of ``directory``.

    When ``skip_audit`` is true, entries whose name contains ``audit`` are
    left out.
    """
    return [
        f'{directory}/{fn}'
        for fn in os.listdir(directory)
        if not (skip_audit and 'audit' in fn)
        and os.path.getsize(f'{directory}/{fn}') > 0
    ]


def read_csv_table(table, chunksize='256 MiB'):
    """Load the pipe-separated CSV files of ``table`` as a dask_cudf DataFrame.

    Column names and dtypes come from ``get_schema(table)``. Files are taken
    from ``{data_dir}/data/{table}`` and, for ``item_marketprices`` and every
    table listed in ``refresh_tables``, also from
    ``{data_dir}/data_refresh/{table}``. Zero-byte files are skipped.

    Args:
        table: Name of the table to read.
        chunksize: Forwarded unchanged to ``dask_cudf.read_csv``.

    Returns:
        The DataFrame returned by ``dask_cudf.read_csv``.
    """
    # build dict of dtypes to use when reading CSV
    names, types = get_schema(table)
    dtype = dict(zip(names, types))

    # item_marketprices has "audit" files that should be excluded
    skip_audit = table == 'item_marketprices'

    paths = _nonempty_files(f'{data_dir}/data/{table}', skip_audit)
    # item_marketprices always has refresh data; other tables only if listed.
    if skip_audit or table in refresh_tables:
        paths += _nonempty_files(f'{data_dir}/data_refresh/{table}', skip_audit)

    # The same name -> dtype mapping is used for every table.
    return dask_cudf.read_csv(paths, sep='|', names=names, dtype=dtype, chunksize=chunksize, quoting=3)

In [None]:
import os, subprocess, math


def multiplier(unit):
    """Return the factor that converts a size given in ``unit`` to GB.

    ``'G'`` maps to 1 and ``'T'`` to 1000; every other unit maps to 0.
    """
    for suffix, factor in (('G', 1), ('T', 1000)):
        if unit == suffix:
            return factor
    return 0

def _du_size_gb(path):
    """Return the size of ``path`` as reported by ``du -sh``, in whole GB.

    The numeric part of the human-readable size is rounded up and scaled by
    ``multiplier(unit)``, where ``unit`` is the last character of the size.
    """
    human = subprocess.check_output(['du', '-sh', path]).split()[0].decode('utf-8')
    factor = multiplier(human[-1])
    if factor == 0:
        # The result is 0 regardless of the number; returning early also
        # avoids float('') when du prints a bare "0" without a unit.
        return 0
    return math.ceil(float(human[:-1])) * factor


# we use size of the CSV data on disk to determine number of Parquet partitions
def get_size_gb(table):
    """Return the on-disk size of the CSV data of ``table`` in whole GB.

    Measures ``<data_dir>/data/<table>`` and, when ``table`` is listed in
    ``refresh_tables``, adds the size of ``<data_dir>/data_refresh/<table>``.
    Paths are joined with ``os.path.join`` so ``data_dir`` works with or
    without a trailing slash.

    Raises:
        subprocess.CalledProcessError: if ``du`` exits with a non-zero status.
    """
    size = _du_size_gb(os.path.join(data_dir, 'data', table))

    if table in refresh_tables:
        size += _du_size_gb(os.path.join(data_dir, 'data_refresh', table))

    return size

In [None]:
def repartition(table, outdir, npartitions=None, chunksize=None, compression='snappy'):
    """Convert the CSV data of ``table`` into Parquet files at ``outdir + table``.

    Args:
        table: Name of the table to convert.
        outdir: Output prefix; the table name is appended to it directly.
        npartitions: Number of partitions to write. When ``None``, the table
            size in GB from ``get_size_gb`` is used, with a minimum of 1.
        chunksize: Forwarded to ``read_csv_table``.
        compression: Forwarded to ``to_parquet``.
    """
    size = get_size_gb(table)
    npartitions = max(1, size) if npartitions is None else npartitions
    print(f'Converting {table} of {size} GB to {npartitions} parquet files, chunksize: {chunksize}')

    csv_frame = read_csv_table(table, chunksize)
    resized = csv_frame.repartition(npartitions=npartitions)
    resized.to_parquet(outdir+table, compression=compression)

## Generate list of tables to convert

In [None]:
import os

# these tables have extra data produced by bigbench dataGen
refresh_tables = [
    'customer', 'customer_address',
    'inventory', 'item', 'item_marketprices',
    'product_reviews', 'store_returns', 'store_sales',
    'web_clickstreams', 'web_returns', 'web_sales',
]
# One table per `<name>.schema` file. Other directory entries (hidden files,
# editor or notebook leftovers) are ignored, since no `<name>.schema` file
# would exist for the name derived from them.
tables = [
    os.path.splitext(entry)[0]
    for entry in os.listdir(spark_schema_dir)
    if entry.endswith('.schema')
]

## Convert all tables to Parquet

In [None]:
import time

# Scale factor label used in the data paths below.
scale = 'sf10000'
# Target amount of CSV data (in GB) per Parquet partition.
part_size = 3
# Chunk size passed through to the CSV reader.
chunksize = '128 MiB'

# location of bigBench dataGen's CSV output
data_dir = f'/mnt/weka/tpcx-bb/{scale}/'
# location you want to write Parquet versions of the table data
outdir = f'/mnt/weka/tpcx-bb/{scale}/parquet_{part_size}gb/'

# Running sum of the conversion times, in seconds.
total = 0
for table in tables:
    size_gb = get_size_gb(table)
    # product_reviews has lengthy strings which exceed cudf's max number of characters per column
    # we use smaller partitions (1 GB each) to avoid overflowing this character limit
    gb_per_partition = 1 if table == 'product_reviews' else part_size
    npartitions = max(1, int(size_gb / gb_per_partition))

    started = time.time()
    repartition(table, outdir, npartitions, chunksize, compression='snappy')
    elapsed = time.time() - started

    total += elapsed
    print(f'{table} took {elapsed} of {total}\n')
print(f'{chunksize} took {total}s')