Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clickhouse): Init Clickhouse container on startup #4365

Merged
merged 6 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
186 changes: 145 additions & 41 deletions crates/analytics/docs/clickhouse/scripts/api_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ CREATE TABLE api_events_queue (
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String),
`url_path` String,
`url_path` Nullable(String),
`dispute_id` Nullable(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-api-log-events',
kafka_group_name = 'hyper-c1',
kafka_group_name = 'hyper',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';


CREATE TABLE api_events_dist (
`merchant_id` String,
CREATE TABLE api_events (
`merchant_id` LowCardinality(String),
`payment_id` Nullable(String),
`refund_id` Nullable(String),
`payment_method_id` Nullable(String),
Expand All @@ -51,27 +50,85 @@ CREATE TABLE api_events_dist (
`error` Nullable(String),
`authentication_data` Nullable(String),
`status_code` UInt32,
`created_at_timestamp` DateTime64(3),
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128,
`user_agent` String,
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String),
`url_path` String,
`url_path` Nullable(String),
`dispute_id` Nullable(String),
`masked_response` Nullable(String),
INDEX flowIndex flow_type TYPE bloom_filter GRANULARITY 1,
INDEX apiIndex api_flow TYPE bloom_filter GRANULARITY 1,
INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1
) ENGINE = MergeTree PARTITION BY toStartOfDay(created_at)
ORDER BY
(
created_at,
merchant_id,
flow_type,
status_code,
api_flow
) TTL inserted_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;

CREATE TABLE api_events_audit (
`merchant_id` LowCardinality(String),
`payment_id` String,
`refund_id` Nullable(String),
`payment_method_id` Nullable(String),
`payment_method` Nullable(String),
`payment_method_type` Nullable(String),
`customer_id` Nullable(String),
`user_id` Nullable(String),
`connector` Nullable(String),
`request_id` String,
`flow_type` LowCardinality(String),
`api_flow` LowCardinality(String),
`api_auth_type` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`authentication_data` Nullable(String),
`status_code` UInt32,
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128,
`user_agent` String,
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(Nullable(String)),
`url_path` Nullable(String),
`dispute_id` Nullable(String),
`masked_response` Nullable(String)
) ENGINE = MergeTree PARTITION BY merchant_id
ORDER BY
(merchant_id, payment_id) TTL inserted_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;

CREATE MATERIALIZED VIEW api_events_parse_errors (
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
) ENGINE = MergeTree
PARTITION BY toStartOfDay(created_at)
ORDER BY
(created_at, merchant_id, flow_type, status_code, api_flow)
TTL created_at + toIntervalMonth(6)
;
(topic, partition, offset) SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM
api_events_queue
WHERE
length(_error) > 0;

CREATE MATERIALIZED VIEW api_events_mv TO api_events_dist (
CREATE MATERIALIZED VIEW api_events_audit_mv TO api_events_audit (
`merchant_id` String,
`payment_id` Nullable(String),
`payment_id` String,
`refund_id` Nullable(String),
`payment_method_id` Nullable(String),
`payment_method` Nullable(String),
Expand All @@ -88,14 +145,16 @@ CREATE MATERIALIZED VIEW api_events_mv TO api_events_dist (
`error` Nullable(String),
`authentication_data` Nullable(String),
`status_code` UInt32,
`created_at_timestamp` DateTime64(3),
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128,
`user_agent` String,
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String),
`url_path` String,
`dispute_id` Nullable(String)
`http_method` LowCardinality(Nullable(String)),
`url_path` Nullable(String),
`dispute_id` Nullable(String),
`masked_response` Nullable(String)
) AS
SELECT
merchant_id,
Expand All @@ -116,37 +175,82 @@ SELECT
error,
authentication_data,
status_code,
created_at_timestamp,
now() as inserted_at,
created_at_timestamp AS created_at,
now() AS inserted_at,
latency,
user_agent,
ip_addr,
hs_latency,
http_method,
url_path,
dispute_id
dispute_id,
response AS masked_response
FROM
api_events_queue
where length(_error) = 0;

WHERE
(length(_error) = 0)
AND (payment_id IS NOT NULL);

CREATE MATERIALIZED VIEW api_events_parse_errors
(
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
CREATE MATERIALIZED VIEW api_events_mv TO api_events (
`merchant_id` String,
`payment_id` Nullable(String),
`refund_id` Nullable(String),
`payment_method_id` Nullable(String),
`payment_method` Nullable(String),
`payment_method_type` Nullable(String),
`customer_id` Nullable(String),
`user_id` Nullable(String),
`connector` Nullable(String),
`request_id` String,
`flow_type` LowCardinality(String),
`api_flow` LowCardinality(String),
`api_auth_type` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`authentication_data` Nullable(String),
`status_code` UInt32,
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128,
`user_agent` String,
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(Nullable(String)),
`url_path` Nullable(String),
`dispute_id` Nullable(String),
`masked_response` Nullable(String)
) AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM api_events_queue
WHERE length(_error) > 0
;
merchant_id,
payment_id,
refund_id,
payment_method_id,
payment_method,
payment_method_type,
customer_id,
user_id,
connector,
request_id,
flow_type,
api_flow,
api_auth_type,
request,
response,
error,
authentication_data,
status_code,
created_at_timestamp AS created_at,
now() AS inserted_at,
latency,
user_agent,
ip_addr,
hs_latency,
http_method,
url_path,
dispute_id,
response AS masked_response
FROM
api_events_queue
WHERE
length(_error) = 0;