
Improve ClickHouse performance #302

Closed
kamilkisiela opened this issue Aug 18, 2022 · 9 comments · Fixed by #304

kamilkisiela commented Aug 18, 2022

What do we show currently?

  • total number of requests
  • requests per minute
  • number of unique operations
  • success and failure rates
  • p90, p95, p99 of latency
  • top 5 clients names (with number of requests)
  • top 5 client versions (with number of requests)
  • operations over time (total and failures)
  • RPM over time
  • latency over time
  • latency histogram (super heavy)
  • list of unique operations (with p90, p95, p99, number of requests, failure rate)

What filters do we have?

  • date range
  • operations

What filters do we want to have?

  • client names
  • date range
  • operations (if the number of selected operations is greater than half, let's use NOT IN (not-selected-list) instead of IN (selected-list); see the sketch after this list)
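
For illustration, a rough sketch of that operation filter (the hashes are hypothetical; the table is the new default.operations proposed below):

SELECT count() AS total
FROM default.operations
PREWHERE timestamp >= subtractDays(now(), 30)
WHERE hash IN ('hash1', 'hash2') -- fewer than half of the operations selected: pass the selected hashes

SELECT count() AS total
FROM default.operations
PREWHERE timestamp >= subtractDays(now(), 30)
WHERE hash NOT IN ('hash3') -- more than half selected: pass the shorter complement list instead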

What else do we want to show?

  • hide histogram
  • dedicated page for a single operation
@kamilkisiela kamilkisiela self-assigned this Aug 18, 2022

kamilkisiela commented Aug 19, 2022

CREATE TABLE IF NOT EXISTS default.operations_registry
  (
    target LowCardinality(String),
    hash String,
    name String,
    body String,
+   coordinates Array(String) CODEC(ZSTD(1)), # the hash is built from body, name, and coordinates, so coordinates can now be stored here - reduces storage size
    operation String,
    inserted_at DateTime('UTC') DEFAULT toDateTime(0)
  )
  ENGINE = ReplacingMergeTree(inserted_at)
  PARTITION BY target
  ORDER BY (target, hash)
  SETTINGS index_granularity = 8192
CREATE TABLE IF NOT EXISTS default.operations
  (
    target LowCardinality(String) CODEC(ZSTD(1)),
    timestamp DateTime('UTC'),
    expires_at DateTime('UTC'),
-   schema Array(String) CODEC(ZSTD(1)), # reduce the size (coordinates are now part of operations_registry)
    hash String CODEC(ZSTD(1)),
    ok UInt8 CODEC(ZSTD(1)),
    errors UInt16 CODEC(ZSTD(1)),
    duration UInt64 CODEC(ZSTD(1)),
    client_name LowCardinality(String) CODEC(ZSTD(1)),
    client_version String CODEC(ZSTD(1)),
-   INDEX idx_schema schema TYPE bloom_filter(0.01) GRANULARITY 3, # we don't use it
-   INDEX idx_ok ok TYPE minmax GRANULARITY 1, # we don't use it
-   INDEX idx_errors errors TYPE minmax GRANULARITY 1 # we don't use it
+   INDEX idx_client_name (client_name) TYPE set(0) GRANULARITY 1, # allows skipping rows when filtering by client_name
+   INDEX idx_hash (hash) TYPE set(0) GRANULARITY 1 # allows skipping rows when filtering by hash
  )
  ENGINE = MergeTree
  PARTITION BY toYYYYMMDD(timestamp)
- PRIMARY KEY (target, hash, timestamp)
+ PRIMARY KEY (target, hash)
  ORDER BY (target, hash, timestamp)
  TTL expires_at
  SETTINGS index_granularity = 8192
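
As a hedged example (the client name is hypothetical), the new set index on client_name should let a query like this skip granules instead of scanning the whole partition:

SELECT count() AS total
FROM default.operations
PREWHERE timestamp >= subtractDays(now(), 30)
WHERE client_name = 'Hive CLI' -- idx_client_name can skip granules that do not contain this value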

New table (daily aggregation)

CREATE MATERIALIZED VIEW IF NOT EXISTS default.operations_new_daily_mv
  (
      target LowCardinality(String) CODEC(ZSTD(1)),
      timestamp DateTime('UTC'),
      hash String CODEC(ZSTD(1)),
      total UInt32 CODEC(ZSTD(1)),
      total_ok UInt32 CODEC(ZSTD(1)),
      duration_avg AggregateFunction(avg, UInt64) CODEC(ZSTD(1)),
      duration_quantiles AggregateFunction(quantiles(0.75, 0.9, 0.95, 0.99), UInt64) CODEC(ZSTD(1))
  )
  ENGINE = SummingMergeTree
  PARTITION BY toYYYYMMDD(timestamp)
  PRIMARY KEY (target, hash)
  ORDER BY (target, hash, timestamp)
  SETTINGS index_granularity = 8192 AS
  SELECT
    target,
    toStartOfDay(timestamp) AS timestamp,
    hash,
    count() AS total,
    sum(ok) AS total_ok,
    avgState(duration) AS duration_avg,
    quantilesState(0.75, 0.9, 0.95, 0.99)(duration) AS duration_quantiles
  FROM default.operations
  GROUP BY
    target,
    hash,
    timestamp
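
Reading the aggregate states back is done with the -Merge combinators, roughly like this (a sketch; the date range is arbitrary):

SELECT
  hash,
  avgMerge(duration_avg) AS duration_avg,
  quantilesMerge(0.75, 0.9, 0.95, 0.99)(duration_quantiles) AS percentiles,
  sum(total) AS total,
  sum(total_ok) AS totalOk
FROM default.operations_new_daily_mv
PREWHERE timestamp >= subtractDays(now(), 30)
GROUP BY hash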

New table to show total number of requests grouped by client_name and client_version.

CREATE MATERIALIZED VIEW IF NOT EXISTS default.client_daily
  (
    target LowCardinality(String) CODEC(ZSTD(1)),
    client_name String CODEC(ZSTD(1)),
    client_version String CODEC(ZSTD(1)),
    hash String CODEC(ZSTD(1)), 
    timestamp DateTime('UTC'),
    total UInt32 CODEC(ZSTD(1)),
    INDEX idx_hash (hash) TYPE set(0) GRANULARITY 1
  )
  ENGINE = SummingMergeTree
  PARTITION BY toYYYYMMDD(timestamp)
  PRIMARY KEY (target, client_name, client_version)
  ORDER BY (target, client_name, client_version, timestamp)
  SETTINGS index_granularity = 8192
  AS
  SELECT
    target,
    client_name,
    client_version,
    hash,
    toStartOfDay(timestamp) AS timestamp,
    count() AS total
  FROM default.operations
  GROUP BY
    target,
    client_name,
    client_version,
    hash,
    timestamp


kamilkisiela commented Aug 19, 2022

Numbers for ~60M operations

SELECT 
  quantilesMerge(0.75, 0.90, 0.95, 0.99)(duration_quantiles) as percentiles
FROM operations_new_hourly_mv
PREWHERE timestamp >= toDateTime('2022-07-17 11:13:00', 'UTC') AND timestamp <= toDateTime('2022-08-17 11:13:00', 'UTC')

0.843 s -> 0.1 s


SELECT 
  multiply(
    toUnixTimestamp(
      toStartOfInterval(timestamp, INTERVAL 8 HOUR, 'UTC'),
    'UTC'),
  1000) as date,
  quantilesMerge(0.75, 0.90, 0.95, 0.99)(duration_quantiles) as percentiles,
  sum(total) as total,
  sum(total_ok) as totalOk
FROM operations_new_hourly_mv
PREWHERE timestamp >= subtractDays(now(), 30) 
GROUP BY date
ORDER BY date

1.003 s -> 0.1 s


SELECT 
  COUNT(*) as total,
  client_name,
  client_version
FROM operations_new
PREWHERE timestamp >= subtractDays(now(), 30) 
GROUP BY client_name, client_version

->

SELECT 
  sum(total) as total,
  client_name,
  client_version
FROM client_daily
PREWHERE timestamp >= subtractDays(now(), 30) 
GROUP BY client_name, client_version

2.1 s -> 0.1 s


kamilkisiela commented Aug 19, 2022

The overall storage size should be ~5x less than before.
The bytes per row metric shows an improvement:

  • ~22 -> ~4.5
  • ~860 -> ~455

Storage size and compression comparison (only for the two most meaningful tables - 2.5B rows):

Before     After
56.83 GB   12.25 GB
7.20 GB    1.24 GB

Sum: 64.03 GB -> 13.49 GB
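
For reference, per-table compressed size and bytes per row can be checked against system.parts, roughly like this (a sketch; the table list is illustrative):

SELECT
  table,
  formatReadableSize(sum(data_compressed_bytes)) AS compressed,
  sum(data_compressed_bytes) / sum(rows) AS bytes_per_row
FROM system.parts
WHERE active AND database = 'default' AND table IN ('operations', 'operations_registry')
GROUP BY table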

@kamilkisiela

The payload size of each insert statement will be smaller, which means we will be able to process more at a time.


kamilkisiela commented Aug 19, 2022

The migration plan (long but stable and without any data loss):

  1. Create tables
  2. Insert rows into the new tables (operations and operations_registry)
  3. Wait 31 days
  4. Switch to new tables
  5. Wait 31 days
  6. Drop old tables


kamilkisiela commented Aug 19, 2022

Numbers for ~2.5B operations

SELECT 
  quantilesMerge(0.75, 0.90, 0.95, 0.99)(duration_quantiles) as percentiles
FROM operations_new_hourly_mv
PREWHERE timestamp >= subtractDays(now(), 30)

4.173 s -> 0.832 s


SELECT 
  multiply(
    toUnixTimestamp(
      toStartOfInterval(timestamp, INTERVAL 8 HOUR, 'UTC'),
    'UTC'),
  1000) as date,
  quantilesMerge(0.75, 0.90, 0.95, 0.99)(duration_quantiles) as percentiles,
  sum(total) as total,
  sum(total_ok) as totalOk
FROM operations_new_hourly_mv
PREWHERE timestamp >= subtractDays(now(), 30) 
GROUP BY date
ORDER BY date

4.122 s -> 0.844 s


SELECT 
  COUNT(*) as total,
  client_name,
  client_version
FROM operations_new
PREWHERE timestamp >= subtractDays(now(), 30) 
GROUP BY client_name, client_version

->

SELECT 
  sum(total) as total,
  client_name,
  client_version
FROM client_daily
PREWHERE timestamp >= subtractDays(now(), 30) 
GROUP BY client_name, client_version

56 s -> 0.057 s


SELECT hash, quantilesMerge(0.75, 0.90, 0.95, 0.99)(duration_quantiles) as percentiles
FROM operations_new_hourly_mv WHERE timestamp >= subtractDays(now(), 30) GROUP BY hash

->

SELECT hash, quantilesMerge(0.75, 0.90, 0.95, 0.99)(duration_quantiles) as percentiles
FROM operations_new_v2_daily_mv WHERE timestamp >= subtractDays(now(), 30) GROUP BY hash

4.817 s -> 1.524 s


kamilkisiela commented Aug 19, 2022

Implement #193 as part of this task

  1. Add expires_at DateTime('UTC') to all MV tables
  2. TTL expires_at
  3. Include expires_at in the sorting key (order by, at the end)
  4. Add toStartOfHour(expires_at) AS expires_at to the selection set and expires_at to the GROUP BY clause (see the sketch after this list)
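
Applied to the daily operations MV above, that would look roughly like this (a sketch only; toStartOfDay instead of toStartOfHour since it is a daily aggregation):

CREATE MATERIALIZED VIEW IF NOT EXISTS default.operations_new_daily_mv
  (
      target LowCardinality(String) CODEC(ZSTD(1)),
      timestamp DateTime('UTC'),
      expires_at DateTime('UTC'),
      hash String CODEC(ZSTD(1)),
      total UInt32 CODEC(ZSTD(1)),
      total_ok UInt32 CODEC(ZSTD(1)),
      duration_avg AggregateFunction(avg, UInt64) CODEC(ZSTD(1)),
      duration_quantiles AggregateFunction(quantiles(0.75, 0.9, 0.95, 0.99), UInt64) CODEC(ZSTD(1))
  )
  ENGINE = SummingMergeTree
  PARTITION BY toYYYYMMDD(timestamp)
  PRIMARY KEY (target, hash)
  ORDER BY (target, hash, timestamp, expires_at)
  TTL expires_at
  SETTINGS index_granularity = 8192 AS
  SELECT
    target,
    toStartOfDay(timestamp) AS timestamp,
    toStartOfDay(expires_at) AS expires_at,
    hash,
    count() AS total,
    sum(ok) AS total_ok,
    avgState(duration) AS duration_avg,
    quantilesState(0.75, 0.9, 0.95, 0.99)(duration) AS duration_quantiles
  FROM default.operations
  GROUP BY
    target,
    hash,
    timestamp,
    expires_at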


kamilkisiela commented Aug 19, 2022

TODO:

  • write an updated version of OperationsReader
  • write an updated version of the usage-ingestor service

  • the structure of schema_coordinates_daily (with TTL and total)
  • schema coordinates in operations_registry + expires_at
CREATE TABLE IF NOT EXISTS default.operations_registry
  (
    target LowCardinality(String),
    hash String,
    name String,
    body String,
    operation_kind String,
    coordinates Array(String) CODEC(ZSTD(1)),
    total UInt32 CODEC(ZSTD(1)),
    timestamp DateTime('UTC'),
    expires_at DateTime('UTC'),
    INDEX idx_operation_kind (operation_kind) TYPE set(0) GRANULARITY 1
  )
  ENGINE = SummingMergeTree
  PARTITION BY toYYYYMMDD(timestamp)
  PRIMARY KEY (target, hash)
  ORDER BY (target, hash, timestamp, expires_at)
  TTL expires_at
  SETTINGS index_granularity = 8192;
CREATE MATERIALIZED VIEW IF NOT EXISTS default.schema_coordinates_daily
  (
    target LowCardinality(String) CODEC(ZSTD(1)),
    hash String CODEC(ZSTD(1)), 
    timestamp DateTime('UTC'),
    expires_at DateTime('UTC'),
    total UInt32 CODEC(ZSTD(1)),
    coordinate String CODEC(ZSTD(1))
  )
  ENGINE = SummingMergeTree
  PARTITION BY toYYYYMMDD(timestamp)
  PRIMARY KEY (target, coordinate, hash)
  ORDER BY (target, coordinate, hash, timestamp, expires_at)
  TTL expires_at
  SETTINGS index_granularity = 8192
  AS
  SELECT
    target,
    hash,
    toStartOfDay(timestamp) AS timestamp,
    toStartOfDay(expires_at) AS expires_at,
    sum(total) AS total,
    coordinate
  FROM default.operations_registry_test1
  ARRAY JOIN coordinates as coordinate
  GROUP BY
    target,
    coordinate,
    hash,
    timestamp,
    expires_at

An example insert

INSERT INTO operations_registry (
  target,
  hash,
  name,
  body,
  operation_kind,
  coordinates,
  total,
  timestamp,
  expires_at
) VALUES (
  'target1',
  'hash1',
  'name1',
  'body1',
  'query',
  array('coordinate1', 'coordinate2'),
  -- the number of operations in the report matching this row
  10, 
  now(),
  now() + INTERVAL 30 DAY
);

How to get data?

SELECT name, target, body, sum(total) AS total, coordinates
FROM operations_registry
GROUP BY name, body, target, hash, coordinates;

SELECT
    coordinate,
    sum(total) AS total,
    hash
FROM schema_coordinates_daily
GROUP BY
    coordinate,
    hash

@kamilkisiela

It helped us reduce the storage size by 55%.
