WIP: log dask task stream and rmm events into results dir #225

Open. Wants to merge 62 commits into base: main.
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
46eefa1  Add dask-sql query files (Aug 12, 2021)
b14335a  Fixes (Aug 12, 2021)
2de0db5  Specify web_clickstreams columns (Aug 16, 2021)
0f20be6  Set persist=False when creating tables (Aug 17, 2021)
c216a3f  Remove debugging code (Aug 17, 2021)
7b96544  Cleanup query 5 (Aug 24, 2021)
5c1eeb8  Add DISTRIBUTE BY Operator (Sep 7, 2021)
0ba6115  Reset index on train/test df's after sorting (ayushdg, Sep 8, 2021)
41054eb  Merge pull request #1 from ayushdg/dask-sql (ChrisJar, Sep 8, 2021)
9a2eace  Fix query 18 (Sep 9, 2021)
0b5eb2c  Merge std_dev aggregation to query2, and remove persist's to reduce m… (ayushdg, Sep 10, 2021)
2101d1e  Merge pull request #2 from ayushdg/dask-sql (ChrisJar, Sep 10, 2021)
ddf0c9f  Fix duplicate index for 5, 8, and 26 (Sep 15, 2021)
73ee7a9  Updating bdb_tools & test dask-sql queries for shared Context (randerzander, Sep 20, 2021)
e726156  Merge pull request #3 from randerzander/224_2 (ChrisJar, Sep 20, 2021)
ed7b137  added split_out to q29 (VibhuJawa, Sep 20, 2021)
a2a52e2  Added persist to prevent duplicate computation (VibhuJawa, Sep 20, 2021)
c5b40f0  fixed comment (VibhuJawa, Sep 20, 2021)
0dc6440  fixed comment (VibhuJawa, Sep 20, 2021)
c0d7471  Merge pull request #4 from VibhuJawa/q29_sql (ChrisJar, Sep 21, 2021)
c44374e  Update remaining queries to use shared context (Sep 21, 2021)
382c7c1  Remove extra dask-sql imports (Sep 23, 2021)
b71cb50  Fix q22 errros by casting date column to int (ayushdg, Sep 23, 2021)
a1ec59e  Merge pull request #5 from ayushdg/dask-sql (ChrisJar, Sep 28, 2021)
3a26c91  added Query-25 dask-sql alternate implimentation (VibhuJawa, Sep 29, 2021)
558c5de  fixed comment (VibhuJawa, Sep 29, 2021)
a51521e  Merge pull request #6 from VibhuJawa/q25_fix (ChrisJar, Sep 30, 2021)
fbaa648  removed not useful order bys (VibhuJawa, Oct 6, 2021)
c453417  remove persist from query-02 (VibhuJawa, Oct 6, 2021)
a0334a2  q03 removed persist (Oct 8, 2021)
5b20078  Revert incorrect cluster change (VibhuJawa, Oct 8, 2021)
002f168  Merge pull request #7 from VibhuJawa/fix_order_distribute_by (ChrisJar, Oct 8, 2021)
824d5dd  Fix query 3 numba issue (Nov 3, 2021)
0e058a6  Fix module load errors (Nov 9, 2021)
65d2ea4  Variable renaming: bsql->dask-sql (randerzander, Dec 8, 2021)
db45969  Merge pull request #9 from randerzander/224_9 (ChrisJar, Dec 14, 2021)
d3ca1b1  Update copyrights (Jan 4, 2022)
f5303bb  Cleanup (Jan 5, 2022)
19a271d  Address reviews (Jan 6, 2022)
6637e11  Remove load_q03 (Jan 6, 2022)
0720a66  Share code between sql and Dask queries (Jan 11, 2022)
999818b  Remove lock files (Jan 11, 2022)
10c71d1  Remove category codes casts (Jan 19, 2022)
8d9db0d  Refactor read_tables and constants into shared files (Jan 20, 2022)
6470303  Update copyrights (Jan 20, 2022)
3e9d688  Remove unused imports (Jan 20, 2022)
a55eddc  Cleanup remaining repeated code (Jan 20, 2022)
d73b1c0  Add dask-sql environment file (Jan 25, 2022)
a9a0833  Update dask-sql version (Jan 25, 2022)
b60d2f1  fix query 22 (VibhuJawa, Jan 25, 2022)
3cf17b5  Query 03 fi (VibhuJawa, Jan 25, 2022)
e3a94c1  small style fix (VibhuJawa, Jan 25, 2022)
5fe4e41  Merge pull request #10 from VibhuJawa/q03_drop_na_fix (ChrisJar, Jan 25, 2022)
ca49940  replace deprecated df.one_hot_encoder with cudf.get_dummies (ayushdg, Jan 26, 2022)
cac2144  Merge pull request #12 from ayushdg/replace-one-hot-encoder (ChrisJar, Jan 26, 2022)
1dcc3f5  Q22 result verfied (VibhuJawa, Jan 26, 2022)
7b1c16c  Merge pull request #11 from VibhuJawa/q22_fix (ChrisJar, Jan 26, 2022)
99e5f30  log dask task stream and rmm events into results dir (kevingerman, Dec 20, 2021)
cbd632e  unneccesary (kevingerman, Jan 6, 2022)
d38e827  rmm logs named for each worker by pid (kevingerman, Jan 13, 2022)
db13a58  gate logging with config options (kevingerman, Jan 25, 2022)
e4d7817  rebase dask-sql (kevingerman, Feb 3, 2022)
34 changes: 34 additions & 0 deletions conda/rapids-gpu-bdb-dask-sql.yml
@@ -0,0 +1,34 @@
channels:
- rapidsai-nightly
- nvidia
- conda-forge

dependencies:
- python=3.8
- cudatoolkit=11.2
- cudf
- rmm
- dask-cuda
- dask-cudf
- cuml
- dask
- distributed
- ucx-py
- ucx-proc=*=gpu
- dask-sql>=2022.1
- numba=0.54.*
- scipy
- scikit-learn
- cupy
- spacy=2.3
- oauth2client
- asyncssh
- psutil
- ipykernel
- jupyterlab
- gspread
- oauth2client
- pytest
- pip
- pip:
- jupyter-server-proxy
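
Assuming the standard conda workflow (not part of this diff), this environment would typically be created with `conda env create -f conda/rapids-gpu-bdb-dask-sql.yml`; note that the hunk shown here contains no `name:` field, so the environment name to activate is not determined by this file alone.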
3 changes: 3 additions & 0 deletions gpu_bdb/bdb_tools/__init__.py
@@ -1 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

from .rmm_monitor import RMMResourceMonitor
from .dasktasklogger import DaskTaskLogger
59 changes: 8 additions & 51 deletions gpu_bdb/bdb_tools/cluster_startup.py
@@ -24,43 +24,13 @@
from dask.utils import parse_bytes


def get_bsql_config_options():
"""Loads configuration environment variables.
In case it is not previously set, returns a default value for each one.

Returns a dictionary object.
For more info: https://docs.blazingdb.com/docs/config_options
"""
config_options = {}
config_options['JOIN_PARTITION_SIZE_THRESHOLD'] = os.environ.get("JOIN_PARTITION_SIZE_THRESHOLD", 300000000)
config_options['MAX_DATA_LOAD_CONCAT_CACHE_BYTE_SIZE'] = os.environ.get("MAX_DATA_LOAD_CONCAT_CACHE_BYTE_SIZE", 400000000)
config_options['BLAZING_DEVICE_MEM_CONSUMPTION_THRESHOLD'] = os.environ.get("BLAZING_DEVICE_MEM_CONSUMPTION_THRESHOLD", 0.6)
config_options['BLAZ_HOST_MEM_CONSUMPTION_THRESHOLD'] = os.environ.get("BLAZ_HOST_MEM_CONSUMPTION_THRESHOLD", 0.6)
config_options['MAX_KERNEL_RUN_THREADS'] = os.environ.get("MAX_KERNEL_RUN_THREADS", 3)
config_options['TABLE_SCAN_KERNEL_NUM_THREADS'] = os.environ.get("TABLE_SCAN_KERNEL_NUM_THREADS", 1)
config_options['MAX_NUM_ORDER_BY_PARTITIONS_PER_NODE'] = os.environ.get("MAX_NUM_ORDER_BY_PARTITIONS_PER_NODE", 20)
config_options['NUM_BYTES_PER_ORDER_BY_PARTITION'] = os.environ.get("NUM_BYTES_PER_ORDER_BY_PARTITION", 400000000)
config_options['MAX_ORDER_BY_SAMPLES_PER_NODE'] = os.environ.get("MAX_ORDER_BY_SAMPLES_PER_NODE", 10000)
config_options['MAX_SEND_MESSAGE_THREADS'] = os.environ.get("MAX_SEND_MESSAGE_THREADS", 20)
config_options['MEMORY_MONITOR_PERIOD'] = os.environ.get("MEMORY_MONITOR_PERIOD", 50)
config_options['TRANSPORT_BUFFER_BYTE_SIZE'] = os.environ.get("TRANSPORT_BUFFER_BYTE_SIZE", 1048576) # 1 MBs
config_options['TRANSPORT_POOL_NUM_BUFFERS'] = os.environ.get("TRANSPORT_POOL_NUM_BUFFERS", 1000)
config_options['BLAZING_LOGGING_DIRECTORY'] = os.environ.get("BLAZING_LOGGING_DIRECTORY", 'blazing_log')
config_options['BLAZING_CACHE_DIRECTORY'] = os.environ.get("BLAZING_CACHE_DIRECTORY", '/tmp/')
config_options['LOGGING_LEVEL'] = os.environ.get("LOGGING_LEVEL", "trace")
config_options['MAX_JOIN_SCATTER_MEM_OVERHEAD'] = os.environ.get("MAX_JOIN_SCATTER_MEM_OVERHEAD", 500000000)
config_options['PROTOCOL'] = os.environ.get("PROTOCOL", "AUTO")

return config_options


def attach_to_cluster(config, create_blazing_context=False):
def attach_to_cluster(config, create_sql_context=False):
"""Attaches to an existing cluster if available.
By default, tries to attach to a cluster running on localhost:8786 (dask's default).

This is currently hardcoded to assume the dashboard is running on port 8787.

Optionally, this will also create a BlazingContext.
Optionally, this will also create a Dask-SQL Context.
"""
scheduler_file = config.get("scheduler_file_path")
host = config.get("cluster_host")
@@ -131,19 +101,12 @@ def maybe_create_worker_directories(dask_worker):
config["40GB_workers"] = worker_counts.get("40GB", 0)
config["80GB_workers"] = worker_counts.get("80GB", 0)

bc = None
if create_blazing_context:
from blazingsql import BlazingContext
bc = BlazingContext(
dask_client=client,
pool=os.environ.get("BLAZING_POOL", False),
network_interface=os.environ.get("INTERFACE", "ib0"),
config_options=get_bsql_config_options(),
allocator=os.environ.get("BLAZING_ALLOCATOR_MODE", "existing"),
initial_pool_size=os.environ.get("BLAZING_INITIAL_POOL_SIZE", None)
)
c = None
if create_sql_context:
from dask_sql import Context
c = Context()

return client, bc
return client, c


def worker_count_info(client):
@@ -173,7 +136,7 @@ def _get_ucx_config():
Get a subset of ucx config variables relevant for benchmarking
"""
relevant_configs = ["infiniband", "nvlink"]
ucx_config = dask.config.get("ucx")
ucx_config = dask.config.get("distributed.comm.ucx")
# Doing this since when relevant configs are not enabled the value is `None` instead of `False`
filtered_ucx_config = {
config: ucx_config.get(config) if ucx_config.get(config) else False
@@ -196,11 +159,5 @@ def import_query_libs():
"spacy",
]

# optionally include blazingsql
# this is brittle, but it resolves breaking change
# issues as we can control the environment
if os.environ.get("RUNNER_INCLUDE_BSQL"):
library_list.append("blazingsql")

for lib in library_list:
importlib.import_module(lib)
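
For orientation, here is a minimal sketch of how a benchmark script might call the new `attach_to_cluster` signature. The config values and the toy table/query are illustrative assumptions, not taken from this PR.

```python
# Minimal sketch, assuming a Dask scheduler is already running on localhost:8786.
from bdb_tools.cluster_startup import attach_to_cluster

config = {
    "scheduler_file_path": None,   # or a path to a scheduler JSON file
    "cluster_host": "localhost",   # assumed value for illustration
}

# With create_sql_context=True the second return value is a dask_sql.Context
# rather than the old BlazingContext.
client, c = attach_to_cluster(config, create_sql_context=True)

if c is not None:
    import cudf
    df = cudf.DataFrame({"x": [1, 2, 3]})
    c.create_table("demo", df)  # register a toy table, as the read_tables helpers do
    print(c.sql("SELECT SUM(x) AS total FROM demo").compute())
```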
20 changes: 20 additions & 0 deletions gpu_bdb/bdb_tools/dasktasklogger.py
@@ -0,0 +1,20 @@
import re
import os
import json
import numpy as np

class DaskTaskLogger():
key_expr=re.compile( '([\w-]+)-([0-9a-f-]{32,36})' )

def __init__(self, client, outputdir='/tmp'):
self._client=client
self._outputdir=outputdir

def mark_begin( self ):
self._client.get_task_stream()

def save_tasks( self, prefix='dask' ):
plotfname=os.path.join(self._outputdir, f"{prefix}_plot.html")
pdata, pfigure = self._client.get_task_stream(plot='save', filename=plotfname)
with open( os.path.join(self._outputdir, f"{prefix}_tasks.json"), 'w') as outf:
json.dump([{k:t[k] for k in filter( lambda x: type(t[x]) != bytes().__class__, t)} for t in pdata],outf)
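
A plausible way to use this helper around a query run is sketched below; the scheduler address and output directory are placeholders, and note that the class itself does not create the output directory.

```python
# Sketch only: wrap a benchmark query with task-stream capture.
import os
from dask.distributed import Client
from bdb_tools.dasktasklogger import DaskTaskLogger

client = Client("localhost:8786")     # assumed scheduler address

outdir = "results/q01"                # placeholder results directory
os.makedirs(outdir, exist_ok=True)    # DaskTaskLogger does not create it

logger = DaskTaskLogger(client, outputdir=outdir)
logger.mark_begin()                   # presumably primes the scheduler's task-stream plugin

# ... run the benchmark query here ...

# Writes <outdir>/dask_plot.html (task-stream plot) and
# <outdir>/dask_tasks.json (task records with bytes-valued fields dropped).
logger.save_tasks(prefix="dask")
```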
47 changes: 47 additions & 0 deletions gpu_bdb/bdb_tools/q01_utils.py
@@ -0,0 +1,47 @@
#
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


from bdb_tools.readers import build_reader

# -------- Q1 -----------
q01_i_category_id_IN = 1, 2, 3
# -- sf1 -> 11 stores, 90k sales in 820k lines
q01_ss_store_sk_IN = 10, 20, 33, 40, 50
q01_viewed_together_count = 50
q01_limit = 100


item_cols = ["i_item_sk", "i_category_id"]
ss_cols = ["ss_item_sk", "ss_store_sk", "ss_ticket_number"]


def read_tables(config, c=None):
table_reader = build_reader(
data_format=config["file_format"],
basepath=config["data_dir"],
split_row_groups=config["split_row_groups"],
)

item_df = table_reader.read("item", relevant_cols=item_cols)
ss_df = table_reader.read("store_sales", relevant_cols=ss_cols)

if c:
c.create_table("item", item_df, persist=False)
c.create_table("store_sales", ss_df, persist=False)

return item_df, ss_df
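
As a sketch of how these per-query helpers plug into the shared dask-sql Context: the config values and the SQL text below are illustrative only, not the actual Q1 implementation.

```python
# Sketch only: in the benchmark the Context normally comes from
# attach_to_cluster(config, create_sql_context=True).
from dask_sql import Context
from bdb_tools.q01_utils import read_tables, q01_i_category_id_IN

config = {
    "file_format": "parquet",             # assumed value
    "data_dir": "/path/to/gpu_bdb_data",  # placeholder
    "split_row_groups": False,
}

c = Context()
item_df, ss_df = read_tables(config, c)   # also registers "item" and "store_sales"

# Illustrative query against the registered tables, not the real Q1 SQL.
query = f"""
SELECT i_item_sk, i_category_id
FROM item
WHERE i_category_id IN {q01_i_category_id_IN}
"""
result = c.sql(query)                     # lazy, cuDF-backed Dask DataFrame
print(result.head())
```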

38 changes: 38 additions & 0 deletions gpu_bdb/bdb_tools/q02_utils.py
@@ -0,0 +1,38 @@
#
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from bdb_tools.readers import build_reader

q02_item_sk = 10001
q02_limit = 30
q02_session_timeout_inSec = 3600
q02_MAX_ITEMS_PER_BASKET = 5000000


def read_tables(config, c=None):
table_reader = build_reader(
data_format=config["file_format"],
basepath=config["data_dir"],
split_row_groups=config["split_row_groups"],
)
wcs_cols = ["wcs_user_sk", "wcs_item_sk", "wcs_click_date_sk", "wcs_click_time_sk"]
wcs_df = table_reader.read("web_clickstreams", relevant_cols=wcs_cols)

if c:
c.create_table("web_clickstreams", wcs_df, persist=False)

return wcs_df

138 changes: 138 additions & 0 deletions gpu_bdb/bdb_tools/q03_utils.py
@@ -0,0 +1,138 @@
#
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import cudf

from numba import cuda

from bdb_tools.readers import build_reader

q03_days_in_sec_before_purchase = 864000
q03_views_before_purchase = 5
q03_purchased_item_IN = 10001
q03_purchased_item_category_IN = 2, 3
q03_limit = 100

def read_tables(config, c=None):
table_reader = build_reader(
data_format=config["file_format"],
basepath=config["data_dir"],
split_row_groups=config["split_row_groups"],
)

item_cols = ["i_category_id", "i_item_sk"]
wcs_cols = [
"wcs_user_sk",
"wcs_click_time_sk",
"wcs_click_date_sk",
"wcs_item_sk",
"wcs_sales_sk",
]

item_df = table_reader.read("item", relevant_cols=item_cols)
wcs_df = table_reader.read("web_clickstreams", relevant_cols=wcs_cols)

if c:
c.create_table("web_clickstreams", wcs_df, persist=False)
c.create_table("item", item_df, persist=False)

return item_df


@cuda.jit
def find_items_viewed_before_purchase_kernel(
relevant_idx_col, user_col, timestamp_col, item_col, out_col, N
):
"""
Find the past N items viewed after a relevant purchase was made,
as defined by the configuration of this query.
"""
i = cuda.grid(1)

if i < (relevant_idx_col.size): # boundary guard
# every relevant row gets N rows in the output, so we need to map the indexes
# back into their position in the original array
orig_idx = relevant_idx_col[i]
current_user = user_col[orig_idx]

# look at the previous N clicks (assume sorted descending)
rows_to_check = N
remaining_rows = user_col.size - orig_idx

if remaining_rows <= rows_to_check:
rows_to_check = remaining_rows - 1

for k in range(1, rows_to_check + 1):
if current_user != user_col[orig_idx + k]:
out_col[i * N + k - 1] = 0

# only checking relevant purchases via the relevant_idx_col
elif (timestamp_col[orig_idx + k] <= timestamp_col[orig_idx]) & (
timestamp_col[orig_idx + k]
>= (timestamp_col[orig_idx] - q03_days_in_sec_before_purchase)
):
out_col[i * N + k - 1] = item_col[orig_idx + k]
else:
out_col[i * N + k - 1] = 0


def apply_find_items_viewed(df, item_mappings):

# need to sort descending to ensure that the
# next N rows are the previous N clicks
df = df.sort_values(
by=["wcs_user_sk", "tstamp", "wcs_sales_sk", "wcs_item_sk"],
ascending=[False, False, False, False],
)
df.reset_index(drop=True, inplace=True)
df["relevant_flag"] = (df.wcs_sales_sk != 0) & (
df.wcs_item_sk == q03_purchased_item_IN
)
df["relevant_idx_pos"] = df.index.to_series()
df.reset_index(drop=True, inplace=True)
# only allocate output for the relevant rows
sample = df.loc[df.relevant_flag == True]
sample.reset_index(drop=True, inplace=True)

N = q03_views_before_purchase
size = len(sample)

# we know this can be int32, since it's going to contain item_sks
out_arr = cuda.device_array(size * N, dtype=df["wcs_item_sk"].dtype)

find_items_viewed_before_purchase_kernel.forall(size)(
sample["relevant_idx_pos"],
df["wcs_user_sk"],
df["tstamp"],
df["wcs_item_sk"],
out_arr,
N,
)

result = cudf.DataFrame({"prior_item_viewed": out_arr})

del out_arr
del df
del sample

filtered = result.merge(
item_mappings,
how="inner",
left_on=["prior_item_viewed"],
right_on=["i_item_sk"],
)
return filtered
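
To clarify the output layout of the kernel above, here is a small host-side reference (pure NumPy, illustrative only, not part of the PR) that re-states the same indexing: relevant row i owns output slots [i*N, (i+1)*N).

```python
import numpy as np

def prior_items_reference(relevant_idx, user, tstamp, item, N, window):
    """CPU re-statement of find_items_viewed_before_purchase_kernel.

    Rows are assumed sorted descending, so the "previous N clicks" of row
    orig_idx are the next rows in the arrays. Slot i*N + k - 1 holds the
    item_sk of the k-th prior click by the same user within `window`
    seconds of the purchase, and 0 otherwise.
    """
    out = np.zeros(len(relevant_idx) * N, dtype=item.dtype)
    for i, orig_idx in enumerate(relevant_idx):
        # Same bound as the kernel: never read past the end of the arrays.
        rows_to_check = min(N, len(user) - orig_idx - 1)
        for k in range(1, rows_to_check + 1):
            same_user = user[orig_idx + k] == user[orig_idx]
            in_window = (tstamp[orig_idx] - window) <= tstamp[orig_idx + k] <= tstamp[orig_idx]
            out[i * N + k - 1] = item[orig_idx + k] if (same_user and in_window) else 0
    return out
```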
