Commit

perf: Fix full table scans for objects, files, tables (#1709)
* perf: Fix full table scans for objects, files, tables

* Comments and query fix

* Move predicate to innermost

* One more inner predicate
shawnlewis committed May 31, 2024
1 parent ab219e8 commit 0e0616b
Showing 1 changed file with 103 additions and 7 deletions.
110 changes: 103 additions & 7 deletions weave/trace_server/clickhouse_trace_server_batched.py
@@ -1,5 +1,27 @@
# ClickHouse Trace Server

# A note on query structure:
# There are four major kinds of things that we query:
# - calls
# - object_versions
# - tables
# - files
#
# calls are identified by ID.
#
# object_versions, tables, and files are identified by digest. For these kinds of
# things, we dedupe at merge time using ClickHouse's ReplacingMergeTree, but we also
# need to dedupe at query time.
#
# Previously, we did query-time deduping in *_deduped VIEWs. But it turns out
# ClickHouse doesn't push the project_id predicate down into those views, so we were
# always scanning whole tables.
#
# Now we've written what were previously views directly into this file as
# subqueries, and put the project_id predicate in the innermost subquery, which
# fixes the problem.
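#
# For illustration, the query-time dedup pattern now looks roughly like this
# (a sketch of the shape, not a query used verbatim below):
#
#   SELECT *
#   FROM (
#       SELECT *,
#           row_number() OVER (PARTITION BY project_id, digest) AS rn
#       FROM files
#       WHERE project_id = {project_id:String}  -- innermost, so it gets pushed down
#   )
#   WHERE rn = 1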


from collections import defaultdict
import threading
from contextlib import contextmanager
@@ -307,9 +329,11 @@ def calls_query_stream(
# This order-by clause creation should probably be moved into this function
# and passed down as a processed object. It will follow the same patterns as
# the filters and queries.
order_by=None
if not req.sort_by
else [(s.field, s.direction) for s in req.sort_by],
order_by=(
None
if not req.sort_by
else [(s.field, s.direction) for s in req.sort_by]
),
)

# Yield the marshaled response
@@ -544,15 +568,39 @@ def _table_query(
conds.extend(conditions)

predicate = _combine_conditions(conds, "AND")
# The subqueries are for deduplication of table rows and tables by digest.
# It might be more efficient to do deduplication of table rows in the
# outer query instead of on the right side of the JOIN clause here, but
# that hasn't been tested yet.
query = f"""
SELECT tr.digest, tr.val_dump
FROM (
SELECT project_id, row_digest
FROM tables_deduped
FROM (
SELECT *
FROM (
SELECT *,
row_number() OVER (PARTITION BY project_id, digest) AS rn
FROM tables
WHERE project_id = {{project_id:String}} AND digest = {{digest:String}}
)
WHERE rn = 1
ORDER BY project_id, digest
)
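-- Expand each deduped table's row_digests array into one row per row digest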
ARRAY JOIN row_digests AS row_digest
WHERE digest = {{digest:String}}
) AS t
JOIN table_rows_deduped tr ON t.project_id = tr.project_id AND t.row_digest = tr.digest
JOIN (
SELECT project_id, digest, val_dump
FROM (
SELECT *,
row_number() OVER (PARTITION BY project_id, digest) AS rn
FROM table_rows
WHERE project_id = {{project_id:String}}
)
WHERE rn = 1
ORDER BY project_id, digest
) tr ON t.project_id = tr.project_id AND t.row_digest = tr.digest
WHERE {predicate}
"""
if parameters is None:
@@ -806,8 +854,22 @@ def file_create(self, req: tsi.FileCreateReq) -> tsi.FileCreateRes:
return tsi.FileCreateRes(digest=digest)

def file_content_read(self, req: tsi.FileContentReadReq) -> tsi.FileContentReadRes:
# The subquery is responsible for deduplication of file chunks by digest
query_result = self.ch_client.query(
"SELECT n_chunks, val_bytes FROM files_deduped WHERE project_id = {project_id:String} AND digest = {digest:String}",
"""
SELECT n_chunks, val_bytes
FROM (
SELECT *
FROM (
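-- Dedupe file chunks by (project_id, digest, chunk_index);
-- ReplacingMergeTree merges happen asynchronously, so duplicates may remain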
SELECT *,
row_number() OVER (PARTITION BY project_id, digest, chunk_index) AS rn
FROM files
WHERE project_id = {project_id:String} AND digest = {digest:String}
)
WHERE rn = 1
ORDER BY project_id, digest, chunk_index
)
WHERE project_id = {project_id:String} AND digest = {digest:String}""",
parameters={"project_id": req.project_id, "digest": req.digest},
column_formats={"val_bytes": "bytes"},
)
@@ -1081,6 +1143,7 @@ def _select_objs_query(

if parameters is None:
parameters = {}
# The subquery is for deduplication of object versions by digest
query_result = self._query_stream(
f"""
SELECT
@@ -1097,7 +1160,39 @@
version_index,
version_count,
is_latest
FROM object_versions_deduped
FROM (
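-- Compute version_index, version_count, and is_latest
-- per (project_id, kind, object_id) over the deduped rows below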
SELECT project_id,
object_id,
created_at,
kind,
base_object_class,
refs,
val_dump,
digest,
if(kind = 'op', 1, 0) AS is_op,
row_number() OVER (
PARTITION BY project_id,
kind,
object_id
ORDER BY created_at ASC
) AS _version_index_plus_1,
_version_index_plus_1 - 1 AS version_index,
count(*) OVER (PARTITION BY project_id, kind, object_id) as version_count,
if(_version_index_plus_1 = version_count, 1, 0) AS is_latest
FROM (
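-- Dedupe object versions by digest, keeping the earliest created_at row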
SELECT *,
row_number() OVER (
PARTITION BY project_id,
kind,
object_id,
digest
ORDER BY created_at ASC
) AS rn
FROM object_versions
WHERE project_id = {{project_id: String}}
)
WHERE rn = 1
)
WHERE project_id = {{project_id: String}} AND
{conditions_part}
{limit_part}
@@ -1703,6 +1798,7 @@ def _process_calls_query_to_conditions(
param_builder = param_builder or ParamBuilder()
conditions = []
raw_fields_used = set()

# This is the mongo-style query
def process_operation(operation: tsi_query.Operation) -> str:
cond = None