In [1]:
# Install the ClickHouse driver and pandas into the kernel's environment (-q keeps pip output quiet).
%pip install clickhouse_driver pandas -q

Note: you may need to restart the kernel to use updated packages.


In [10]:
import os

from helpers import Client

# Connection settings for the ClickHouse server used by every cell below.
# Each value can be overridden through an environment variable; the fallbacks
# are the literals this notebook used before, so running with no variables set
# behaves exactly as it did.
client = Client(
    host=os.environ.get('CLICKHOUSE_HOST', 'localhost'),
    port=os.environ.get('CLICKHOUSE_PORT', '9000'),  # passed as a string, as before
    user=os.environ.get('CLICKHOUSE_USER', 'default'),
    password=os.environ.get('CLICKHOUSE_PASSWORD', ''),
)

# Backtick-quoted, database-qualified identifiers interpolated into the SQL below:
# the raw change log, the ReplacingMergeTree target table, and the materialized
# view that feeds the target from the log.
LOG_TABLE='`test`.`orders_log`'
CDC_TABLE='`test`.`orders_cdc`'
CDC_MVIEW='`test`.`mview_orders_cdc`'

## Before modifications

In [11]:
# Print the DDL reported for the change-log table before any schema change.
show_log_table_sql = f"""
SHOW CREATE TABLE {LOG_TABLE};
"""
o = client.query_dataframe(show_log_table_sql)
statement_column = o['statement']
print(statement_column[0])

CREATE TABLE test.orders_log
(
    `before.id` Int64,
    `before.date` DateTime,
    `before.price` Int64,
    `before.cancelled` Bool,
    `after.id` Int64,
    `after.date` DateTime,
    `after.price` Int64,
    `after.cancelled` Bool,
    `source.lsn` UInt64,
    `op` String
)
ENGINE = MergeTree
PRIMARY KEY `source.lsn`
ORDER BY `source.lsn`
SETTINGS index_granularity = 8192


In [12]:
# Print the DDL reported for the CDC target table before any schema change.
show_cdc_table_sql = f"""
SHOW CREATE TABLE {CDC_TABLE};
"""
o = client.query_dataframe(show_cdc_table_sql)
statement_column = o['statement']
print(statement_column[0])

CREATE TABLE test.orders_cdc
(
    `id` Int64,
    `date` DateTime,
    `price` Int64,
    `cancelled` Bool,
    `version` UInt64,
    `deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192


In [13]:
# Print the DDL reported for the materialized view before any schema change.
show_cdc_mview_sql = f"""
SHOW CREATE TABLE {CDC_MVIEW};
"""
o = client.query_dataframe(show_cdc_mview_sql)
statement_column = o['statement']
print(statement_column[0])

CREATE MATERIALIZED VIEW test.mview_orders_cdc TO test.orders_cdc
(
    `id` Int64,
    `date` DateTime,
    `price` Int64,
    `cancelled` Bool,
    `version` UInt64,
    `deleted` UInt8
) AS
SELECT
    if(op = 'd', `before.id`, `after.id`) AS id,
    if(op = 'd', `before.date`, `after.date`) AS date,
    if(op = 'd', `before.price`, `after.price`) AS price,
    if(op = 'd', `before.cancelled`, `after.cancelled`) AS cancelled,
    `source.lsn` AS version,
    if(op = 'd', 1, 0) AS deleted
FROM test.orders_log
WHERE (op = 'c') OR (op = 'u') OR (op = 'd')


In [15]:
# Preview up to five rows from the change log (ordered by LSN) and from the
# CDC table (ordered by version). Both calls share the same display options.
preview_options = dict(skip=False, verbose=True, just_output=True, title="")

log_preview_sql = f"""
select *
from {LOG_TABLE}
order by `source.lsn`
limit 5;
"""
client.run(log_preview_sql, **preview_options)

cdc_preview_sql = f"""
select *
from {CDC_TABLE}
order by `version`
limit 5;
"""
client.run(cdc_preview_sql, **preview_options)

In [24]:
# Append one 'c' (create) event for order id 1 to the change log. The LSN is
# taken from the server clock via toInt64(now64()).
insert_create_event_sql = f"""
insert into {LOG_TABLE}
(
  `before.id`, `before.date`, `before.price`, `before.cancelled`,
  `after.id`, `after.date`, `after.price`, `after.cancelled`,
  `source.lsn`, op
)
values
(
  1, '2024-02-04 00:00:00', 10, False,
  1, '2024-02-04 00:00:00', 10, False,
  toInt64(now64()), 'c'
);
"""
client.run(
    insert_create_event_sql,
    skip=False,
    verbose=True,
    just_output=True,
    title="",
)

In [25]:
# Re-read both tables after the insert: the change log ordered by LSN, then
# the CDC table ordered by version, five rows at most from each.
preview_options = dict(skip=False, verbose=True, just_output=True, title="")

log_preview_sql = f"""
select *
from {LOG_TABLE}
order by `source.lsn`
limit 5;
"""
client.run(log_preview_sql, **preview_options)

cdc_preview_sql = f"""
select *
from {CDC_TABLE}
order by `version`
limit 5;
"""
client.run(cdc_preview_sql, **preview_options)

Output:


Unnamed: 0,before_id,before_date,before_price,before_cancelled,after_id,after_date,after_price,after_cancelled,source_lsn,op
0,1,2024-02-04,10,False,1,2024-02-04,10,False,1707228457,c
1,1,2024-02-04,10,False,1,2024-02-04,10,False,1707228627,c


Output:


Unnamed: 0,id,date,price,cancelled,version,deleted
0,1,2024-02-04,10,False,1707228457,0
1,1,2024-02-04,10,False,1707228627,0


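## After modifications

The modification adds a `note` column: `before.note` / `after.note` in the log table and `note` in the CDC table, as the query outputs below show. NOTE: the `SHOW CREATE` outputs in this section do not list these columns; their execution counts (20–22) are lower than those of the last cells in the previous section (24–25), so they were probably captured before the change. Re-run them to refresh the DDL.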

In [20]:
# Print the DDL currently reported for the change-log table.
show_log_table_sql = f"""
SHOW CREATE TABLE {LOG_TABLE};
"""
o = client.query_dataframe(show_log_table_sql)
statement_column = o['statement']
print(statement_column[0])

CREATE TABLE test.orders_log
(
    `before.id` Int64,
    `before.date` DateTime,
    `before.price` Int64,
    `before.cancelled` Bool,
    `after.id` Int64,
    `after.date` DateTime,
    `after.price` Int64,
    `after.cancelled` Bool,
    `source.lsn` UInt64,
    `op` String
)
ENGINE = MergeTree
PRIMARY KEY `source.lsn`
ORDER BY `source.lsn`
SETTINGS index_granularity = 8192


In [21]:
# Print the DDL currently reported for the CDC target table.
show_cdc_table_sql = f"""
SHOW CREATE TABLE {CDC_TABLE};
"""
o = client.query_dataframe(show_cdc_table_sql)
statement_column = o['statement']
print(statement_column[0])

CREATE TABLE test.orders_cdc
(
    `id` Int64,
    `date` DateTime,
    `price` Int64,
    `cancelled` Bool,
    `version` UInt64,
    `deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192


In [22]:
# Print the DDL currently reported for the materialized view.
show_cdc_mview_sql = f"""
SHOW CREATE TABLE {CDC_MVIEW};
"""
o = client.query_dataframe(show_cdc_mview_sql)
statement_column = o['statement']
print(statement_column[0])

CREATE MATERIALIZED VIEW test.mview_orders_cdc TO test.orders_cdc
(
    `id` Int64,
    `date` DateTime,
    `price` Int64,
    `cancelled` Bool,
    `version` UInt64,
    `deleted` UInt8
) AS
SELECT
    if(op = 'd', `before.id`, `after.id`) AS id,
    if(op = 'd', `before.date`, `after.date`) AS date,
    if(op = 'd', `before.price`, `after.price`) AS price,
    if(op = 'd', `before.cancelled`, `after.cancelled`) AS cancelled,
    `source.lsn` AS version,
    if(op = 'd', 1, 0) AS deleted
FROM test.orders_log
WHERE (op = 'c') OR (op = 'u') OR (op = 'd')


In [27]:
# Preview both tables after the schema modification: the change log ordered
# by LSN, then the CDC table ordered by version, five rows at most from each.
preview_options = dict(skip=False, verbose=True, just_output=True, title="")

log_preview_sql = f"""
select *
from {LOG_TABLE}
order by `source.lsn`
limit 5;
"""
client.run(log_preview_sql, **preview_options)

cdc_preview_sql = f"""
select *
from {CDC_TABLE}
order by `version`
limit 5;
"""
client.run(cdc_preview_sql, **preview_options)

Output:


Unnamed: 0,before_id,before_date,before_price,before_cancelled,after_id,after_date,after_price,after_cancelled,source_lsn,op,before_note,after_note
0,1,2024-02-04,10,False,1,2024-02-04,10,False,1707228457,c,,
1,1,2024-02-04,10,False,1,2024-02-04,10,False,1707228627,c,,


Output:


Unnamed: 0,id,date,price,cancelled,version,deleted,note
0,1,2024-02-04,10,False,1707228457,0,
1,1,2024-02-04,10,False,1707228627,0,


In [28]:
# Append one 'c' (create) event for order id 2 to the change log, this time
# also filling the `before.note` / `after.note` columns.
insert_noted_event_sql = f"""
insert into {LOG_TABLE}
(
  `before.id`, `before.date`, `before.price`, `before.cancelled`, `before.note`,
  `after.id`, `after.date`, `after.price`, `after.cancelled`, `after.note`,
  `source.lsn`, op
)
values
(
  2, '2024-02-04 00:01:00', 20, True, 'cool note',
  2, '2024-02-04 00:01:00', 20, True, 'cool note',
  toInt64(now64()), 'c'
);
"""
client.run(
    insert_noted_event_sql,
    skip=False,
    verbose=True,
    just_output=True,
    title="",
)

In [29]:
# Re-read both tables after inserting the event that carries a note: the
# change log ordered by LSN, then the CDC table ordered by version.
preview_options = dict(skip=False, verbose=True, just_output=True, title="")

log_preview_sql = f"""
select *
from {LOG_TABLE}
order by `source.lsn`
limit 5;
"""
client.run(log_preview_sql, **preview_options)

cdc_preview_sql = f"""
select *
from {CDC_TABLE}
order by `version`
limit 5;
"""
client.run(cdc_preview_sql, **preview_options)

Output:


Unnamed: 0,before_id,before_date,before_price,before_cancelled,after_id,after_date,after_price,after_cancelled,source_lsn,op,before_note,after_note
0,1,2024-02-04 00:00:00,10,False,1,2024-02-04 00:00:00,10,False,1707228457,c,,
1,1,2024-02-04 00:00:00,10,False,1,2024-02-04 00:00:00,10,False,1707228627,c,,
2,2,2024-02-04 00:01:00,20,True,2,2024-02-04 00:01:00,20,True,1707228782,c,cool note,cool note


Output:


Unnamed: 0,id,date,price,cancelled,version,deleted,note
0,1,2024-02-04 00:00:00,10,False,1707228457,0,
1,1,2024-02-04 00:00:00,10,False,1707228627,0,
2,2,2024-02-04 00:01:00,20,True,1707228782,0,cool note
