In [1]:
from clickhouse_driver import Client
import os
import pandas

In [2]:
# ClickHouse connection settings are read from the environment so no credentials live in
# the notebook. os.environ[...] raises a KeyError naming the missing variable right here;
# os.getenv would instead pass None to the driver, which would likely surface later as a
# less obvious connection/protocol error.
CLICKHOUSE_HOST = os.environ["CLICKHOUSE_HOST"]
CLICKHOUSE_USERNAME = os.environ["CLICKHOUSE_USERNAME"]
CLICKHOUSE_PASSWORD = os.environ["CLICKHOUSE_PASSWORD"]

# Native-protocol client shared by every cell below.
client = Client(CLICKHOUSE_HOST, user=CLICKHOUSE_USERNAME, password=CLICKHOUSE_PASSWORD)

# Create the distributed table for weather station observations

In [6]:
# Local storage table, created on every node of `cluster`. The ZooKeeper path contains only
# the {shard} macro, so replicas of the same shard share it and replicate to each other;
# {replica} distinguishes the replicas. Both macros come from each server's configuration.
# NOTE(review): ClickHouse's codec docs aim Delta at integer/timestamp-like series and
# Gorilla at floating-point series; consider Gorilla for the Float64 columns on a fresh
# deployment (IF NOT EXISTS leaves an already-created table unchanged).
SHARD_TABLE_DDL = """CREATE TABLE IF NOT EXISTS `weatherStationObservation_shard` ON CLUSTER `cluster`
(
    `station_id` Int64,
    `measurement_moment` DateTime64(6) CODEC(ZSTD),
    `temperature_ambient` Float64 CODEC(Delta, ZSTD),
    `temperature_ground` Float64 CODEC(Delta, ZSTD),
    `humidity` Float64 CODEC(Delta, ZSTD),
    `pressure` Float64 CODEC(Delta, ZSTD),
    `wind_speed` Float64 CODEC(Delta, ZSTD),
    `precipitation` Float64 CODEC(Delta, ZSTD),
    `irradiance` Float64 CODEC(Delta, ZSTD)
) 
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/weatherStationObservation_shard',
    '{replica}' )
ORDER BY (station_id, measurement_moment)
PARTITION BY toYYYYMM(measurement_moment) 
"""

# Cluster-wide facade over the shard tables: queries fan out to every shard, and inserts
# are routed by the station_id sharding key.
DISTRIBUTED_TABLE_DDL = """
CREATE TABLE IF NOT EXISTS `weatherStationObservation` ON CLUSTER `cluster` AS weatherStationObservation_shard
ENGINE = Distributed(cluster, default, weatherStationObservation_shard, station_id)
"""

client.execute(SHARD_TABLE_DDL)
# Left as the cell's final expression so the per-host ON CLUSTER status is displayed.
client.execute(DISTRIBUTED_TABLE_DDL)

[('chi-clickhouse-dataplatform-cluster-2-1', 9000, 0, '', 5, 0),
 ('chi-clickhouse-dataplatform-cluster-1-0', 9000, 0, '', 4, 0),
 ('chi-clickhouse-dataplatform-cluster-0-0', 9000, 0, '', 3, 0),
 ('chi-clickhouse-dataplatform-cluster-1-1', 9000, 0, '', 2, 0),
 ('chi-clickhouse-dataplatform-cluster-2-0', 9000, 0, '', 1, 0),
 ('chi-clickhouse-dataplatform-cluster-0-1', 9000, 0, '', 0, 0)]

# Ingest weather station observations from Kafka

In [None]:
# Fail fast on a missing broker host: with os.getenv the f-string below would bake the
# broker 'None:9092' into the table definition, and the problem would likely only show up
# later as consumer errors rather than here.
KAFKA_BOOTSTRAP_HOST = os.environ["KAFKA_BOOTSTRAP_HOST"]
# Port is configurable; the default keeps the previously hard-coded 9092.
KAFKA_BOOTSTRAP_PORT = os.getenv("KAFKA_BOOTSTRAP_PORT", "9092")

# Kafka consumer table on every node, reading JSONEachRow messages from the
# 'weather-station-observation' topic with consumer group 'clickhouse'.
client.execute(f"""CREATE TABLE IF NOT EXISTS default.weatherStationObservation_kafka ON CLUSTER `cluster`
(
    `station_id` Int64,
    `measurement_moment` DateTime64(6),
    `temperature_ambient` Float64,
    `temperature_ground` Float64,
    `humidity` Float64,
    `pressure` Float64,
    `wind_speed` Float64,
    `precipitation` Float64,
    `irradiance` Float64
)
ENGINE = Kafka('{KAFKA_BOOTSTRAP_HOST}:{KAFKA_BOOTSTRAP_PORT}', 'weather-station-observation', 'clickhouse', 'JSONEachRow') 
SETTINGS kafka_thread_per_consumer = 0, kafka_num_consumers = 1;
""")

# Materialized view that moves consumed rows into the node-local shard table.
# NOTE(review): because the target is the local *_shard table (not the Distributed table),
# rows land on whichever node consumed the message rather than being routed by station_id;
# confirm that distribution is intended. SELECT * relies on both tables declaring the
# same columns in the same order.
client.execute("""CREATE MATERIALIZED VIEW IF NOT EXISTS default.weatherStationObservation_kafka_mv ON CLUSTER `cluster` 
TO weatherStationObservation_shard AS
SELECT * FROM default.weatherStationObservation_kafka;
""")

# Expose weather station metadata from the PostgreSQL database

In [6]:
# Connection settings for the PostgreSQL source. os.environ[...] fails fast; with
# os.getenv a missing variable would be written into the table definition as the
# literal string 'None'.
POSTGRES_HOST = os.environ["POSTGRES_HOST_R"]
POSTGRES_USERNAME = os.environ["POSTGRES_USERNAME"]
POSTGRES_PASSWORD = os.environ["POSTGRES_PASSWORD"]

# PostgreSQL-engine table on every node that reads the app.weather_station table.
# The connection values are passed as driver parameters (%(name)s) instead of being
# spliced in with an f-string: clickhouse_driver quotes and escapes them, so a password
# containing ' or \ can no longer break or alter the DDL.
client.execute(
    """
CREATE TABLE IF NOT EXISTS WeatherStation_postgres ON CLUSTER `cluster`
(
    `id` Int64,
    `station_name` String,
    `lat` Float64,
    `lon` Float64
) ENGINE = PostgreSQL(%(host)s, 'app', 'weather_station', %(user)s, %(password)s);
""",
    {"host": POSTGRES_HOST, "user": POSTGRES_USERNAME, "password": POSTGRES_PASSWORD},
)

[('chi-clickhouse-dataplatform-cluster-1-1', 9000, 0, '', 5, 0),
 ('chi-clickhouse-dataplatform-cluster-0-0', 9000, 0, '', 4, 0),
 ('chi-clickhouse-dataplatform-cluster-2-1', 9000, 0, '', 3, 0),
 ('chi-clickhouse-dataplatform-cluster-1-0', 9000, 0, '', 2, 0),
 ('chi-clickhouse-dataplatform-cluster-2-0', 9000, 0, '', 1, 0),
 ('chi-clickhouse-dataplatform-cluster-0-1', 9000, 0, '', 0, 0)]

In [10]:
# Read the stations back through ClickHouse: this checks that the PostgreSQL-backed table
# can reach its source and shows the metadata it exposes.
# with_column_types=True makes execute return (rows, [(column_name, column_type), ...]).
data, cols = client.execute(
    "SELECT * FROM WeatherStation_postgres",
    with_column_types=True,
)
column_names = [name for name, _type in cols]
pandas.DataFrame(data, columns=column_names)

Unnamed: 0,id,station_name,lat,lon
0,1,Oslo,59.9133,10.7389
1,2,Bergen,60.3894,5.33
2,3,Stavanger,58.97,5.7314
3,4,Sandnes,58.8517,5.7361
4,5,Trondheim,63.4297,10.3933
5,6,Sandvika,59.8833,10.5167
6,7,Kristiansand,58.1472,7.9972
7,8,Drammen,59.7378,10.205
8,9,Asker,59.8331,10.4392
9,10,Tønsberg,59.2981,10.4236
