-
Notifications
You must be signed in to change notification settings - Fork 0
[09] Репликация и удаление
Я работаю локально через VirtualBox на Linux CentOS 9. Процессы ClickHouse запускаются в Docker контейнерах.
В данном задании предлагается создать 2 реплики. Фактически нам необходимо повторить пример Replication for fault tolerance из документации ClickHouse:
Таким образом, нам необходимо создать два сервера ClickHouse и три экземпляра ClickHouse Keeper (для кворума).
Данная архитектура уже реализована в примере cluster_1S_2R
из репозитория ClickHouse/examples
.
Для запуска всех процессов я буду использовать уже готовый docker-compose.yaml
из этого примера, с небольшими модификациями (номера портов и т.п.).
docker-compose.yaml
version: '3.8'
services:
clickhouse-01:
image: "clickhouse/clickhouse-server:${CHVER:-latest}"
user: "0:0"
container_name: clickhouse-01
hostname: clickhouse-01
volumes:
- ${PWD}/fs/volumes/clickhouse-01/etc/clickhouse-server/config.d/config.xml:/etc/clickhouse-server/config.d/config.xml:Z
- ${PWD}/fs/volumes/clickhouse-01/etc/clickhouse-server/users.d/users.xml:/etc/clickhouse-server/users.d/users.xml:Z
ports:
- "0.0.0.0:18123:8123"
- "0.0.0.0:19000:9000"
depends_on:
- clickhouse-keeper-01
- clickhouse-keeper-02
- clickhouse-keeper-03
clickhouse-02:
image: "clickhouse/clickhouse-server:${CHVER:-latest}"
user: "0:0"
container_name: clickhouse-02
hostname: clickhouse-02
volumes:
- ${PWD}/fs/volumes/clickhouse-02/etc/clickhouse-server/config.d/config.xml:/etc/clickhouse-server/config.d/config.xml:Z
- ${PWD}/fs/volumes/clickhouse-02/etc/clickhouse-server/users.d/users.xml:/etc/clickhouse-server/users.d/users.xml:Z
ports:
- "0.0.0.0:28123:8123"
- "0.0.0.0:29000:9000"
depends_on:
- clickhouse-keeper-01
- clickhouse-keeper-02
- clickhouse-keeper-03
clickhouse-keeper-01:
image: "clickhouse/clickhouse-keeper:${CHKVER:-latest-alpine}"
user: "0:0"
container_name: clickhouse-keeper-01
hostname: clickhouse-keeper-01
volumes:
- ${PWD}/fs/volumes/clickhouse-keeper-01/etc/clickhouse-keeper/keeper_config.xml:/etc/clickhouse-keeper/keeper_config.xml:Z
ports:
- "0.0.0.0:19181:9181"
clickhouse-keeper-02:
image: "clickhouse/clickhouse-keeper:${CHKVER:-latest-alpine}"
user: "0:0"
container_name: clickhouse-keeper-02
hostname: clickhouse-keeper-02
volumes:
- ${PWD}/fs/volumes/clickhouse-keeper-02/etc/clickhouse-keeper/keeper_config.xml:/etc/clickhouse-keeper/keeper_config.xml:Z
ports:
- "0.0.0.0:29181:9181"
clickhouse-keeper-03:
image: "clickhouse/clickhouse-keeper:${CHKVER:-latest-alpine}"
user: "0:0"
container_name: clickhouse-keeper-03
hostname: clickhouse-keeper-03
volumes:
- ${PWD}/fs/volumes/clickhouse-keeper-03/etc/clickhouse-keeper/keeper_config.xml:/etc/clickhouse-keeper/keeper_config.xml:Z
ports:
- "0.0.0.0:39181:9181"
Запустим все контейнеры с нужными нам процессами с помощью команды docker compose up -d
.
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b481c2055d61 docker.io/clickhouse/clickhouse-keeper:latest-alpine 5 minutes ago Up 5 minutes 0.0.0.0:19181->9181/tcp, 2181/tcp, 10181/tcp, 44444/tcp clickhouse-keeper-01
9e0c5250194e docker.io/clickhouse/clickhouse-keeper:latest-alpine 5 minutes ago Up 5 minutes 0.0.0.0:29181->9181/tcp, 2181/tcp, 10181/tcp, 44444/tcp clickhouse-keeper-02
08ef67411b40 docker.io/clickhouse/clickhouse-keeper:latest-alpine 5 minutes ago Up 5 minutes 0.0.0.0:39181->9181/tcp, 2181/tcp, 10181/tcp, 44444/tcp clickhouse-keeper-03
85e25a04ccd8 docker.io/clickhouse/clickhouse-server:latest 5 minutes ago Up 5 minutes 0.0.0.0:18123->8123/tcp, 0.0.0.0:19000->9000/tcp, 9009/tcp clickhouse-01
a36a2c0c59cf docker.io/clickhouse/clickhouse-server:latest 5 minutes ago Up 5 minutes 0.0.0.0:28123->8123/tcp, 0.0.0.0:29000->9000/tcp, 9009/tcp clickhouse-02
Будем использовать демонстрационный датасет New York Taxi Data. Только внесём одно небольшое изменение: сделаем таблицу партицированной (по месяцу поездки).
Создадим таблицу на первой ноде clickhouse-01.
cluster_1S_2R node 1 :) CREATE DATABASE nyc_taxi;
0 rows in set. Elapsed: 0.010 sec.
cluster_1S_2R node 1 :) CREATE TABLE nyc_taxi.trips_small (
trip_id UInt32,
pickup_datetime DateTime,
dropoff_datetime DateTime,
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count UInt8,
trip_distance Float32,
fare_amount Float32,
extra Float32,
tip_amount Float32,
tolls_amount Float32,
total_amount Float32,
payment_type Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
pickup_ntaname LowCardinality(String),
dropoff_ntaname LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_datetime)
PRIMARY KEY (pickup_datetime, dropoff_datetime);
0 rows in set. Elapsed: 0.018 sec.
Заполним таблицу данными.
INSERT INTO nyc_taxi.trips_small SELECT
trip_id,
pickup_datetime,
dropoff_datetime,
pickup_longitude,
pickup_latitude,
dropoff_longitude,
dropoff_latitude,
passenger_count,
trip_distance,
fare_amount,
extra,
tip_amount,
tolls_amount,
total_amount,
payment_type,
pickup_ntaname,
dropoff_ntaname
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{0..2}.gz', 'TabSeparatedWithNames')
0 rows in set. Elapsed: 31.963 sec. Processed 3.00 million rows, 244.69 MB (93.87 thousand rows/s., 7.66 MB/s.)
Peak memory usage: 430.55 MiB.
Проверим, что данные были успешно загружены и распределены по партициям:
SELECT
database,
`table`,
partition_id,
sum(rows) AS rows
FROM system.parts
WHERE database = 'nyc_taxi'
GROUP BY
database,
`table`,
partition_id
ORDER BY
partition_id ASC,
`table` ASC
┌─database─┬─table───────┬─partition_id─┬────rows─┐
1. │ nyc_taxi │ trips_small │ 201507 │ 578443 │
2. │ nyc_taxi │ trips_small │ 201508 │ 1670635 │
3. │ nyc_taxi │ trips_small │ 201509 │ 751239 │
└──────────┴─────────────┴──────────────┴─────────┘
3 rows in set. Elapsed: 0.005 sec.
Фрагмент конфигурационного файла сервера clickhouse-01 с макросами:
<macros>
<shard>01</shard>
<replica>01</replica>
<cluster>cluster_1S_2R</cluster>
</macros>
Создадим временную таблицу.
CREATE TABLE nyc_taxi.trips_small_replicated AS nyc_taxi.trips_small
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/{shard}/{database}/trips_small', '{replica}')
0 rows in set. Elapsed: 0.114 sec.
Подключим партиции из основной таблицы.
ALTER TABLE nyc_taxi.trips_small_replicated
(ATTACH PARTITION '201507' FROM nyc_taxi.trips_small);
ALTER TABLE nyc_taxi.trips_small_replicated
(ATTACH PARTITION '201508' FROM nyc_taxi.trips_small);
ALTER TABLE nyc_taxi.trips_small_replicated
(ATTACH PARTITION '201509' FROM nyc_taxi.trips_small);
Проверим состояние партиций на текущий момент:
SELECT
database,
`table`,
partition_id,
sum(rows) AS rows
FROM system.parts
WHERE database = 'nyc_taxi'
GROUP BY
database,
`table`,
partition_id
ORDER BY
partition_id ASC,
`table` ASC
┌─database─┬─table──────────────────┬─partition_id─┬────rows─┐
1. │ nyc_taxi │ trips_small │ 201507 │ 578443 │
2. │ nyc_taxi │ trips_small_replicated │ 201507 │ 578443 │
3. │ nyc_taxi │ trips_small │ 201508 │ 1670635 │
4. │ nyc_taxi │ trips_small_replicated │ 201508 │ 1670635 │
5. │ nyc_taxi │ trips_small │ 201509 │ 751239 │
6. │ nyc_taxi │ trips_small_replicated │ 201509 │ 751239 │
└──────────┴────────────────────────┴──────────────┴─────────┘
6 rows in set. Elapsed: 0.005 sec.
Переключим имя основной таблицы на её реплицируемый вариант с помощью операции EXCHANGE
.
EXCHANGE TABLES nyc_taxi.trips_small_replicated AND nyc_taxi.trips_small
0 rows in set. Elapsed: 0.004 sec.
Проверим состояние реплицируемых таблиц на текущий момент:
SELECT
database,
`table`,
engine,
is_readonly,
replica_name,
replica_path,
total_replicas,
active_replicas,
queue_size,
replica_is_active
FROM system.replicas \G
Row 1:
──────
database: nyc_taxi
table: trips_small
engine: ReplicatedMergeTree
is_readonly: 0
replica_name: 01
replica_path: /clickhouse/cluster_1S_2R/01/nyc_taxi/trips_small/replicas/01
total_replicas: 1
active_replicas: 1
queue_size: 0
replica_is_active: {'01':1}
1 row in set. Elapsed: 0.005 sec.
Удалим временную таблицу.
cluster_1S_2R node 1 :) DROP TABLE nyc_taxi.trips_small_replicated
0 rows in set. Elapsed: 0.002 sec.
Фрагмент конфигурационного файла сервера clickhouse-02 с макросами:
<macros>
<shard>01</shard>
<replica>02</replica>
<cluster>cluster_1S_2R</cluster>
</macros>
Создадим реплику таблицы nyc_taxi.trips_small на второй ноде clickhouse-02. Сразу создаём таблицу как реплицируемую.
cluster_1S_2R node 2 :) CREATE DATABASE nyc_taxi;
0 rows in set. Elapsed: 0.027 sec.
cluster_1S_2R node 2 :) CREATE TABLE nyc_taxi.trips_small (
trip_id UInt32,
pickup_datetime DateTime,
dropoff_datetime DateTime,
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count UInt8,
trip_distance Float32,
fare_amount Float32,
extra Float32,
tip_amount Float32,
tolls_amount Float32,
total_amount Float32,
payment_type Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
pickup_ntaname LowCardinality(String),
dropoff_ntaname LowCardinality(String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/{shard}/{database}/{table}','{replica}')
PARTITION BY toYYYYMM(pickup_datetime)
PRIMARY KEY (pickup_datetime, dropoff_datetime)
0 rows in set. Elapsed: 0.105 sec.
Проверяем состояние партиций для таблицы на втором сервере:
SELECT
getMacro('replica') AS replica,
database,
`table`,
partition_id,
sum(rows) AS rows
FROM system.parts
WHERE (database = 'nyc_taxi') AND (`table` = 'trips_small')
GROUP BY
database,
`table`,
partition_id
ORDER BY partition_id ASC
┌─replica─┬─database─┬─table───────┬─partition_id─┬────rows─┐
1. │ 02 │ nyc_taxi │ trips_small │ 201507 │ 578443 │
2. │ 02 │ nyc_taxi │ trips_small │ 201508 │ 1670635 │
3. │ 02 │ nyc_taxi │ trips_small │ 201509 │ 751239 │
└─────────┴──────────┴─────────────┴──────────────┴─────────┘
3 rows in set. Elapsed: 0.077 sec.
Как можно видеть, данные были успешно скопированы (реплицированы) на второй сервер с первого.
Проверим состояние реплицируемой таблицы на втором сервере:
SELECT
database,
`table`,
engine,
is_readonly,
replica_name,
replica_path,
total_replicas,
active_replicas,
queue_size,
replica_is_active
FROM system.replicas
Row 1:
──────
database: nyc_taxi
table: trips_small
engine: ReplicatedMergeTree
is_readonly: 0
replica_name: 02
replica_path: /clickhouse/cluster_1S_2R/01/nyc_taxi/trips_small/replicas/02
total_replicas: 2
active_replicas: 2
queue_size: 0
replica_is_active: {'01':1,'02':1}
1 row in set. Elapsed: 0.006 sec.
Можно обратить внимание, что количество реплик (total_replicas) стало равно 2.
С помощью табличной функции remote()
выполним распределённый запрос, чтобы проверить состояние партиций таблицы на обеих репликах:
SELECT
getMacro('replica') AS replica,
database,
`table`,
partition_id,
active,
rows
FROM remote('clickhouse-01:9000,clickhouse-02:9000', system.parts)
WHERE database = 'nyc_taxi'
ORDER BY
replica ASC,
partition_id ASC
┌─replica─┬─database─┬─table───────┬─partition_id─┬─active─┬────rows─┐
1. │ 01 │ nyc_taxi │ trips_small │ 201507 │ 1 │ 578443 │
2. │ 01 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 533510 │
3. │ 01 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 1041612 │
4. │ 01 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 95513 │
5. │ 01 │ nyc_taxi │ trips_small │ 201509 │ 1 │ 70341 │
6. │ 01 │ nyc_taxi │ trips_small │ 201509 │ 1 │ 680898 │
7. │ 02 │ nyc_taxi │ trips_small │ 201507 │ 1 │ 578443 │
8. │ 02 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 533510 │
9. │ 02 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 1041612 │
10. │ 02 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 95513 │
11. │ 02 │ nyc_taxi │ trips_small │ 201509 │ 1 │ 70341 │
12. │ 02 │ nyc_taxi │ trips_small │ 201509 │ 1 │ 680898 │
└─────────┴──────────┴─────────────┴──────────────┴────────┴─────────┘
12 rows in set. Elapsed: 0.027 sec.
Повторим запрос к system.replicas теперь уже на первом сервере:
SELECT
database,
`table`,
engine,
is_readonly,
replica_name,
replica_path,
total_replicas,
active_replicas,
queue_size,
replica_is_active
FROM system.replicas \G
Row 1:
──────
database: nyc_taxi
table: trips_small
engine: ReplicatedMergeTree
is_readonly: 0
replica_name: 01
replica_path: /clickhouse/cluster_1S_2R/01/nyc_taxi/trips_small/replicas/01
total_replicas: 2
active_replicas: 2
queue_size: 0
replica_is_active: {'01':1,'02':1}
1 row in set. Elapsed: 0.006 sec.
Ожидаемо, результат соответствует аналогичному запросу на втором сервере (выше).
Применим TTL 7 дней по колонке pickup_datetime.
ALTER TABLE nyc_taxi.trips_small ON CLUSTER cluster_1S_2R
(MODIFY TTL toStartOfDay(pickup_datetime) + interval 7 days)
┌─host──────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
1. │ clickhouse-02 │ 9000 │ 0 │ │ 1 │ 0 │
2. │ clickhouse-01 │ 9000 │ 0 │ │ 0 │ 0 │
└───────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
2 rows in set. Elapsed: 0.833 sec.
Поскольку данные в демонстрационной таблице только за 2015г., естественно, они все сразу были удалены. Подтвердим это, повторив запрос к таблице system.parts на двух нодах:
SELECT
getMacro('replica') AS replica,
database,
`table`,
partition_id,
active,
rows
FROM remote('clickhouse-01:9000,clickhouse-02:9000', system.parts)
WHERE (database = 'nyc_taxi') AND active
ORDER BY
replica ASC,
partition_id ASC
┌─replica─┬─database─┬─table───────┬─partition_id─┬─active─┬─rows─┐
1. │ 01 │ nyc_taxi │ trips_small │ 201507 │ 1 │ 0 │
2. │ 01 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 0 │
3. │ 01 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 0 │
4. │ 01 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 0 │
5. │ 01 │ nyc_taxi │ trips_small │ 201509 │ 1 │ 0 │
6. │ 01 │ nyc_taxi │ trips_small │ 201509 │ 1 │ 0 │
7. │ 02 │ nyc_taxi │ trips_small │ 201507 │ 1 │ 0 │
8. │ 02 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 0 │
9. │ 02 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 0 │
10. │ 02 │ nyc_taxi │ trips_small │ 201508 │ 1 │ 0 │
11. │ 02 │ nyc_taxi │ trips_small │ 201509 │ 1 │ 0 │
12. │ 02 │ nyc_taxi │ trips_small │ 201509 │ 1 │ 0 │
└─────────┴──────────┴─────────────┴──────────────┴────────┴──────┘
12 rows in set. Elapsed: 0.027 sec.
Результат запроса SHOW CREATE TABLE
:
SHOW CREATE TABLE nyc_taxi.trips_small
FORMAT RAW
CREATE TABLE nyc_taxi.trips_small
(
`trip_id` UInt32,
`pickup_datetime` DateTime,
`dropoff_datetime` DateTime,
`pickup_longitude` Nullable(Float64),
`pickup_latitude` Nullable(Float64),
`dropoff_longitude` Nullable(Float64),
`dropoff_latitude` Nullable(Float64),
`passenger_count` UInt8,
`trip_distance` Float32,
`fare_amount` Float32,
`extra` Float32,
`tip_amount` Float32,
`tolls_amount` Float32,
`total_amount` Float32,
`payment_type` Enum8('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
`pickup_ntaname` LowCardinality(String),
`dropoff_ntaname` LowCardinality(String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/{shard}/nyc_taxi/trips_small', '{replica}')
PARTITION BY toYYYYMM(pickup_datetime)
PRIMARY KEY (pickup_datetime, dropoff_datetime)
ORDER BY (pickup_datetime, dropoff_datetime)
TTL toStartOfDay(pickup_datetime) + toIntervalDay(7)
SETTINGS index_granularity = 8192
1 row in set. Elapsed: 0.002 sec.