From e4d78171a6d52cc5f18431982b67fbc4f2d4c542 Mon Sep 17 00:00:00 2001
From: Kevin German
Date: Wed, 2 Feb 2022 16:01:33 -0800
Subject: [PATCH] rebase dask-sql

Make the query lists configurable through the DASK_QNUMS and BSQL_QNUMS
environment variables (defaulting to queries 1-30), shuffle the Dask
query order, load the pure-Dask query modules when dask-sql queries are
not included, and optionally wrap run_query with RMM resource and Dask
task logging controlled by the benchmark_runner_log_rmm and
benchmark_runner_log_tasks config keys.
---
 gpu_bdb/benchmark_runner.py | 34 ++++++++++++++++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)

diff --git a/gpu_bdb/benchmark_runner.py b/gpu_bdb/benchmark_runner.py
index fe055016..6c383ab5 100755
--- a/gpu_bdb/benchmark_runner.py
+++ b/gpu_bdb/benchmark_runner.py
@@ -21,9 +21,11 @@ def load_query(qnum, fn):
     return mod.main
 
 
-dask_qnums = [str(i).zfill(2) for i in range(1, 31)]
-sql_qnums = [str(i).zfill(2) for i in range(1, 31)]
+dask_qnums = [str(i).zfill(2) for i in map(int, os.getenv("DASK_QNUMS", " ".join(map(str, range(1, 31)))).split())]
+sql_qnums = [str(i).zfill(2) for i in map(int, os.getenv("BSQL_QNUMS", " ".join(map(str, range(1, 31)))).split())]
 
+from random import shuffle
+shuffle(dask_qnums)
 
 if __name__ == "__main__":
     from bdb_tools.cluster_startup import attach_to_cluster, import_query_libs
@@ -32,6 +34,7 @@ def load_query(qnum, fn):
     import_query_libs()
     config = gpubdb_argparser()
     config["run_id"] = uuid.uuid4().hex
+
     include_sql = config.get("benchmark_runner_include_sql")
 
     dask_queries = {
@@ -44,6 +47,11 @@ def load_query(qnum, fn):
             qnum: load_query(qnum, f"queries/q{qnum}/gpu_bdb_query_{qnum}_dask_sql.py")
             for qnum in sql_qnums
         }
+    else:
+        dask_queries = {
+            qnum: load_query(qnum, f"queries/q{qnum}/gpu_bdb_query_{qnum}.py")
+            for qnum in dask_qnums
+        }
 
     client, c = attach_to_cluster(config, create_sql_context=include_sql)
     # Preload required libraries for queries on all workers
@@ -51,6 +59,26 @@ def load_query(qnum, fn):
 
     base_path = os.getcwd()
 
+    if config.get('benchmark_runner_log_rmm', False) or config.get('benchmark_runner_log_tasks', False):
+
+        from bdb_tools import RMMResourceMonitor
+        from bdb_tools import DaskTaskLogger
+
+        rmm_analyzer = RMMResourceMonitor(client=client,
+                                          outputdir=os.getenv('OUTPUT_DIR', '/tmp'))
+        dasktasklog = DaskTaskLogger(client=client,
+                                     outputdir=os.getenv('OUTPUT_DIR', '/tmp'))
+
+        orig_run_query = run_query
+        def logged_run_query(*args, **kwargs):
+            rmm_analyzer.begin_logging(prefix=f"rmmlog{qnum}")
+            dasktasklog.mark_begin()
+            orig_run_query(*args, **kwargs)
+            rmm_analyzer.stop_logging()
+            dasktasklog.save_tasks(prefix=f"dasktasklog{qnum}")
+
+        run_query = logged_run_query
+
     # Run Dask SQL Queries
     if include_sql and len(sql_qnums) > 0:
         print("Dask SQL Queries")
@@ -95,3 +123,5 @@ def load_query(qnum, fn):
     client.run_on_scheduler(gc.collect)
     gc.collect()
     time.sleep(3)
+
+