perf: Fix full table scans for objects, files, tables #1709

Merged: 4 commits, May 31, 2024
110 changes: 103 additions & 7 deletions weave/trace_server/clickhouse_trace_server_batched.py
@@ -1,5 +1,27 @@
# Clickhouse Trace Server

# A note on query structure:
# There are four major kinds of things that we query:
# - calls
# - object_versions
# - tables
# - files
#
# calls are identified by ID.
#
# object_versions, tables, and files are identified by digest. For these kinds of
# things, we dedupe at merge time using ClickHouse's ReplacingMergeTree, but we
# also need to dedupe at query time.
#
# Previously, we did query-time deduping in *_deduped VIEWs. But it turns out
# ClickHouse doesn't push the project_id predicate down into those views, so we
# were always scanning whole tables.
#
# Now, we've inlined what were previously views directly into this file as
# subqueries, with the project_id predicate in the innermost subquery, which
# fixes the problem.


from collections import defaultdict
import threading
from contextlib import contextmanager
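To make the note above concrete, here is an illustrative before/after sketch of the query shape. The view-based "before" is an assumption inferred from the inlined subqueries in this PR, not the actual view DDL, and `'p1'` is a placeholder project ID:

```sql
-- Before (assumed view-based shape): ClickHouse does not push the
-- project_id predicate down into the view, so the whole table is scanned.
SELECT * FROM tables_deduped WHERE project_id = 'p1';

-- After (the pattern used in this PR): the dedupe window function is
-- inlined, and the predicate sits in the innermost subquery, so only the
-- matching project's data is read.
SELECT *
FROM (
    SELECT *,
        row_number() OVER (PARTITION BY project_id, digest) AS rn
    FROM tables
    WHERE project_id = 'p1' -- predicate on the innermost scan
)
WHERE rn = 1;
```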
@@ -307,9 +329,11 @@ def calls_query_stream(
# This order-by clause creation should probably be moved into this function
# and passed down as a processed object. It will follow the same patterns as
# the filters and queries.
order_by=None
if not req.sort_by
else [(s.field, s.direction) for s in req.sort_by],
order_by=(
None
if not req.sort_by
else [(s.field, s.direction) for s in req.sort_by]
),
)

# Yield the marshaled response
@@ -544,15 +568,39 @@ def _table_query(
conds.extend(conditions)

predicate = _combine_conditions(conds, "AND")
# The subqueries are for deduplication of table rows and tables by digest.
# It might be more efficient to dedupe table rows in the outer query
# instead of on the right side of the JOIN clause here, but that hasn't
# been tested yet.
query = f"""
SELECT tr.digest, tr.val_dump
FROM (
SELECT project_id, row_digest
FROM tables_deduped
FROM (
SELECT *
FROM (
SELECT *,
row_number() OVER (PARTITION BY project_id, digest) AS rn
FROM tables
WHERE project_id = {{project_id:String}} AND digest = {{digest:String}}
)
WHERE rn = 1
ORDER BY project_id, digest
)
ARRAY JOIN row_digests AS row_digest
WHERE digest = {{digest:String}}
) AS t
JOIN table_rows_deduped tr ON t.project_id = tr.project_id AND t.row_digest = tr.digest
JOIN (
SELECT project_id, digest, val_dump
FROM (
SELECT *,
row_number() OVER (PARTITION BY project_id, digest) AS rn
FROM table_rows
WHERE project_id = {{project_id:String}}
)
WHERE rn = 1
ORDER BY project_id, digest
) tr ON t.project_id = tr.project_id AND t.row_digest = tr.digest
WHERE {predicate}
"""
if parameters is None:
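For readers unfamiliar with ReplacingMergeTree semantics, the "dedupe at merge time, but also at query time" point from the header note can be reproduced with a minimal, hypothetical table (`demo_rows` and its columns are illustrative, not the repo's actual schema):

```sql
-- Duplicates survive until background merges collapse them, so queries
-- must dedupe defensively.
CREATE TABLE demo_rows
(
    project_id String,
    digest     String,
    val_dump   String
)
ENGINE = ReplacingMergeTree
ORDER BY (project_id, digest);

INSERT INTO demo_rows VALUES ('p1', 'd1', 'v1');
INSERT INTO demo_rows VALUES ('p1', 'd1', 'v1'); -- duplicate until parts merge

-- Query-time dedupe: keep exactly one row per (project_id, digest).
SELECT digest, val_dump
FROM (
    SELECT *, row_number() OVER (PARTITION BY project_id, digest) AS rn
    FROM demo_rows
    WHERE project_id = 'p1'
)
WHERE rn = 1;
```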
@@ -806,8 +854,22 @@ def file_create(self, req: tsi.FileCreateReq) -> tsi.FileCreateRes:
return tsi.FileCreateRes(digest=digest)

def file_content_read(self, req: tsi.FileContentReadReq) -> tsi.FileContentReadRes:
# The subquery deduplicates file chunks by digest and chunk index
query_result = self.ch_client.query(
"SELECT n_chunks, val_bytes FROM files_deduped WHERE project_id = {project_id:String} AND digest = {digest:String}",
"""
SELECT n_chunks, val_bytes
FROM (
SELECT *
FROM (
SELECT *,
row_number() OVER (PARTITION BY project_id, digest, chunk_index) AS rn
FROM files
WHERE project_id = {project_id:String} AND digest = {digest:String}
)
WHERE rn = 1
ORDER BY project_id, digest, chunk_index
)
WHERE project_id = {project_id:String} AND digest = {digest:String}""",
parameters={"project_id": req.project_id, "digest": req.digest},
column_formats={"val_bytes": "bytes"},
)
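The same `rn = 1` pattern applies per chunk here. A self-contained, hypothetical example of how deduped chunks come back ordered by `chunk_index`, ready to be concatenated into the file's bytes (the data and the outer `ORDER BY` are illustrative; the synthetic rows come from the `values` table function):

```sql
-- A two-chunk file with one duplicated pre-merge row; the rn = 1 filter
-- keeps a single copy per (project_id, digest, chunk_index).
SELECT chunk_index, val_bytes
FROM (
    SELECT *,
        row_number() OVER (PARTITION BY project_id, digest, chunk_index) AS rn
    FROM values(
        'project_id String, digest String, chunk_index UInt32, val_bytes String',
        ('p1', 'f1', 0, 'bytes-0'),
        ('p1', 'f1', 0, 'bytes-0'), -- duplicate until parts merge
        ('p1', 'f1', 1, 'bytes-1')
    )
    WHERE project_id = 'p1' AND digest = 'f1'
)
WHERE rn = 1
ORDER BY chunk_index; -- concatenate val_bytes in this order to rebuild the file
```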
@@ -1081,6 +1143,7 @@ def _select_objs_query(

if parameters is None:
parameters = {}
# The subquery is for deduplication of object versions by digest
query_result = self._query_stream(
f"""
SELECT
@@ -1097,7 +1160,39 @@ version_index,
version_index,
version_count,
is_latest
FROM object_versions_deduped
FROM (
SELECT project_id,
object_id,
created_at,
kind,
base_object_class,
refs,
val_dump,
digest,
if (kind = 'op', 1, 0) AS is_op,
row_number() OVER (
PARTITION BY project_id,
kind,
object_id
ORDER BY created_at ASC
) AS _version_index_plus_1,
_version_index_plus_1 - 1 AS version_index,
count(*) OVER (PARTITION BY project_id, kind, object_id) as version_count,
if(_version_index_plus_1 = version_count, 1, 0) AS is_latest
FROM (
SELECT *,
row_number() OVER (
PARTITION BY project_id,
kind,
object_id,
digest
ORDER BY created_at ASC
) AS rn
FROM object_versions
WHERE project_id = {{project_id: String}}
)
WHERE rn = 1
)
WHERE project_id = {{project_id: String}} AND
{conditions_part}
{limit_part}
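A hedged worked example of the version-numbering windows above: with three deduped versions of a single object, `row_number()` ordered by `created_at` yields 1..3, so `version_index` runs 0..2 and only the newest row gets `is_latest = 1`. The data is synthetic, built with the `values` table function:

```sql
SELECT
    digest,
    row_number() OVER (PARTITION BY object_id ORDER BY created_at) - 1 AS version_index,
    count(*) OVER (PARTITION BY object_id) AS version_count,
    if(version_index + 1 = version_count, 1, 0) AS is_latest
FROM values(
    'object_id String, digest String, created_at DateTime',
    ('obj', 'dA', '2024-05-01 00:00:00'),
    ('obj', 'dB', '2024-05-02 00:00:00'),
    ('obj', 'dC', '2024-05-03 00:00:00')
);
-- Returns: ('dA', 0, 3, 0), ('dB', 1, 3, 0), ('dC', 2, 3, 1)
```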
@@ -1703,6 +1798,7 @@ def _process_calls_query_to_conditions(
param_builder = param_builder or ParamBuilder()
conditions = []
raw_fields_used = set()

# This is the mongo-style query
def process_operation(operation: tsi_query.Operation) -> str:
cond = None