Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Fix full table scans for objects, files, tables #1709

Merged
merged 4 commits into from
May 31, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 101 additions & 7 deletions weave/trace_server/clickhouse_trace_server_batched.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
# Clickhouse Trace Server

# A note on query structure:
# There are four major kinds of things that we query:
# - calls,
# - object_versions,
# - tables
# - files
#
# calls are identified by ID.
#
# object_versions, tables, and files are identified by digest. For these kinds of
# things, we dedupe at merge time using Clickhouse's ReplacingMergeTree, but we also
# need to dedupe at query time.
#
# Previously, we did query time deduping in *_deduped VIEWs. But it turns out
# clickhouse doesn't push down the project_id predicate into those views, so we were
# always scanning whole tables.
#
# Now, we've just written the what were views before into this file directly as
# subqueries, and put the project_id predicate in the innermost subquery, which fixes
# the problem.


from collections import defaultdict
import threading
from contextlib import contextmanager
Expand Down Expand Up @@ -307,9 +329,11 @@ def calls_query_stream(
# This order-by clause creation should probably be moved into this function
# and passed down as a processed object. It will follow the same patterns as
# the filters and queries.
order_by=None
if not req.sort_by
else [(s.field, s.direction) for s in req.sort_by],
order_by=(
None
if not req.sort_by
else [(s.field, s.direction) for s in req.sort_by]
),
)

# Yield the marshaled response
Expand Down Expand Up @@ -544,15 +568,38 @@ def _table_query(
conds.extend(conditions)

predicate = _combine_conditions(conds, "AND")
# Must dedupe table rows and tables.
# It might be more efficient to do deduplication of table rows
# in the outer query instead of the right side of the JOIN clause here,
# that hasn't been tested yet.
query = f"""
SELECT tr.digest, tr.val_dump
FROM (
SELECT project_id, row_digest
FROM tables_deduped
FROM (
SELECT *
FROM (
SELECT *,
row_number() OVER (PARTITION BY project_id, digest) AS rn
FROM tables
WHERE project_id = {{project_id:String}}
)
WHERE rn = 1
ORDER BY project_id, digest
)
ARRAY JOIN row_digests AS row_digest
WHERE digest = {{digest:String}}
) AS t
JOIN table_rows_deduped tr ON t.project_id = tr.project_id AND t.row_digest = tr.digest
JOIN (
SELECT project_id, digest, val_dump
FROM (
SELECT *,
row_number() OVER (PARTITION BY project_id, digest) AS rn
FROM table_rows
)
WHERE project_id = {{project_id:String}} AND rn = 1
ORDER BY project_id, digest
) tr ON t.project_id = tr.project_id AND t.row_digest = tr.digest
WHERE {predicate}
"""
if parameters is None:
Expand Down Expand Up @@ -806,8 +853,21 @@ def file_create(self, req: tsi.FileCreateReq) -> tsi.FileCreateRes:
return tsi.FileCreateRes(digest=digest)

def file_content_read(self, req: tsi.FileContentReadReq) -> tsi.FileContentReadRes:
# Must dedupe files
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still a todo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, sorry those are comments that are intended to say that we're performing deduping in the query. I could clarify those.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, no it is just a note as to why the query is so wild

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments are updated to be more clear now.

query_result = self.ch_client.query(
"SELECT n_chunks, val_bytes FROM files_deduped WHERE project_id = {project_id:String} AND digest = {digest:String}",
"""
SELECT n_chunks, val_bytes
FROM (
SELECT *
FROM (
SELECT *,
row_number() OVER (PARTITION BY project_id, digest, chunk_index) AS rn
FROM files
)
WHERE rn = 1 AND project_id = {project_id:String} AND digest = {digest:String}
ORDER BY project_id, digest, chunk_index
)
WHERE project_id = {project_id:String} AND digest = {digest:String}""",
parameters={"project_id": req.project_id, "digest": req.digest},
column_formats={"val_bytes": "bytes"},
)
Expand Down Expand Up @@ -1081,6 +1141,7 @@ def _select_objs_query(

if parameters is None:
parameters = {}
# must dedupe object_versions
query_result = self._query_stream(
f"""
SELECT
Expand All @@ -1097,7 +1158,39 @@ def _select_objs_query(
version_index,
version_count,
is_latest
FROM object_versions_deduped
FROM (
SELECT project_id,
object_id,
created_at,
kind,
base_object_class,
refs,
val_dump,
digest,
if (kind = 'op', 1, 0) AS is_op,
row_number() OVER (
PARTITION BY project_id,
kind,
object_id
ORDER BY created_at ASC
) AS _version_index_plus_1,
_version_index_plus_1 - 1 AS version_index,
count(*) OVER (PARTITION BY project_id, kind, object_id) as version_count,
if(_version_index_plus_1 = version_count, 1, 0) AS is_latest
FROM (
SELECT *,
row_number() OVER (
PARTITION BY project_id,
kind,
object_id,
digest
ORDER BY created_at ASC
) AS rn
FROM object_versions
WHERE project_id = {{project_id: String}}
)
WHERE rn = 1
)
WHERE project_id = {{project_id: String}} AND
{conditions_part}
{limit_part}
Expand Down Expand Up @@ -1703,6 +1796,7 @@ def _process_calls_query_to_conditions(
param_builder = param_builder or ParamBuilder()
conditions = []
raw_fields_used = set()

# This is the mongo-style query
def process_operation(operation: tsi_query.Operation) -> str:
cond = None
Expand Down
Loading