Skip to content

Commit

Permalink
rebase dask-sql
Browse files Browse the repository at this point in the history
  • Loading branch information
kevingerman committed Feb 3, 2022
1 parent db13a58 commit e4d7817
Showing 1 changed file with 32 additions and 2 deletions.
34 changes: 32 additions & 2 deletions gpu_bdb/benchmark_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ def load_query(qnum, fn):
return mod.main


dask_qnums = [str(i).zfill(2) for i in range(1, 31)]
sql_qnums = [str(i).zfill(2) for i in range(1, 31)]
dask_qnums = [str(i).zfill(2) for i in map(int,os.getenv("DASK_QNUMS"," ".join(map(str,range(1, 31)))).split())]
sql_qnums = [str(i).zfill(2) for i in map(int,os.getenv("BSQL_QNUMS"," ".join(map(str,range(1, 31)))).split())]

from random import shuffle
shuffle(dask_qnums)

if __name__ == "__main__":
from bdb_tools.cluster_startup import attach_to_cluster, import_query_libs
Expand All @@ -32,6 +34,7 @@ def load_query(qnum, fn):
import_query_libs()
config = gpubdb_argparser()
config["run_id"] = uuid.uuid4().hex

include_sql = config.get("benchmark_runner_include_sql")

dask_queries = {
Expand All @@ -44,13 +47,38 @@ def load_query(qnum, fn):
qnum: load_query(qnum, f"queries/q{qnum}/gpu_bdb_query_{qnum}_dask_sql.py")
for qnum in sql_qnums
}
else:
dask_queries = {
qnum: load_query(qnum, f"queries/q{qnum}/gpu_bdb_query_{qnum}.py")
for qnum in dask_qnums
}

client, c = attach_to_cluster(config, create_sql_context=include_sql)
# Preload required libraries for queries on all workers
client.run(import_query_libs)

base_path = os.getcwd()

if config.get('benchmark_runner_log_rmm', False) or config.get('benchmark_runner_log_tasks', False):

from bdb_tools import RMMResourceMonitor
from bdb_tools import DaskTaskLogger

rmm_analyzer=RMMResourceMonitor(client=client,
outputdir=os.getenv('OUTPUT_DIR', '/tmp'))
dasktasklog=DaskTaskLogger( client=client,
outputdir=os.getenv('OUTPUT_DIR', '/tmp'))

orig_run_query=run_query
def logged_run_query( *args, **kwargs ):
rmm_analyzer.begin_logging( prefix=f"rmmlog{qnum}")
dasktasklog.mark_begin()
orig_run_query( *args, **kwargs )
rmm_analyzer.stop_logging()
dasktasklog.save_tasks( prefix=f"dasktasklog{qnum}")

run_query=logged_run_query

# Run Dask SQL Queries
if include_sql and len(sql_qnums) > 0:
print("Dask SQL Queries")
Expand Down Expand Up @@ -95,3 +123,5 @@ def load_query(qnum, fn):
client.run_on_scheduler(gc.collect)
gc.collect()
time.sleep(3)


0 comments on commit e4d7817

Please sign in to comment.