Start Clickhouse in the `clickhouse` directory with `docker run -d --name clickhouse-server --ulimit nofile=262144:262144 -p 8123:8123 -p 9000:9000 --volume=storage:/var/lib/clickhouse yandex/clickhouse-server`

In [None]:
!pip install clickhouse-driver[lz4,zstd]

In [15]:
from clickhouse_driver import Client
client = Client(host='localhost', port=9001)
client.execute('SHOW DATABASES')

[('default',), ('ledger',), ('system',)]

In [17]:
client.execute('SELECT * FROM system.numbers LIMIT 5')

[]

Now let's read some stuff from Kafka!

In [2]:
topic = 'data'

In [7]:
grp=0
# as I can't be bothered to find out how to change from docker-compose the clickhouse config that
# asks it to read from the start of a topic, let's first setup the connection, then push data to Kafka

print(grp)
client.execute("DROP TABLE if exists queue;")
client.execute("DROP TABLE if exists kafka_view;")
client.execute("DROP TABLE if exists my_table;")

client.execute(f"""  
CREATE TABLE queue (
`Category1` String,
`Category2` String,
`Category3` String,
`Amount` Float64
) ENGINE = Kafka('kafka:9092', '{topic}', 'group{grp}', 'JSONEachRow');
""")

# https://altinity.com/blog/2019/3/27/low-cardinality
client.execute("""
 CREATE TABLE my_table (
   `Category1` LowCardinality(String),
    `Category2` LowCardinality(String),
    `Category3` LowCardinality(String),
    `Amount` Float64
  ) ENGINE = MergeTree() order by `Category1`;
""")

client.execute("""CREATE MATERIALIZED VIEW kafka_view TO my_table AS SELECT * FROM queue;""" )



0


[]

In [9]:
import json
import datetime
import random

from confluent_kafka import Producer

num_records = s3e8

producer = Producer({"bootstrap.servers": "kafka:9092"})

def delivery_report(err, decoded_message, original_message):
    if err is not None:
        print(err)

values = [f'value{x}' for x in range(100)]
subvalues = [f'subvalue{x}' for x in range(10)]
mapping = {v: random.choice(subvalues) for v in values}

def new_message():
    return json.dumps({'Category1': random.choice(values),
            'Category2': random.choice(values),
            'Category3': random.choice(subvalues),
            'Amount': random.uniform(0,2)}
           ).encode()
          
          
def confluent_producer_async():
    for i in range(int(num_records)):
        msg = new_message()
        producer.produce(
            topic,
            msg,
            callback=lambda err, decoded_message, original_message=msg: delivery_report(  # noqa
                err, decoded_message, original_message
            ),
        )
        if i % 100000 == 0:
            producer.flush()
            if i % 1000000 == 0:
                print(i, datetime.datetime.now())
        
confluent_producer_async()

print('done!', datetime.datetime.now())

0 2021-03-24 09:28:24.507622
done!


In [10]:
# Use this to check that messages actually exist
from kafka import KafkaConsumer
consumer = KafkaConsumer(topic, 
                         bootstrap_servers="kafka:9092", 
                         auto_offset_reset='earliest', 
                         enable_auto_commit=False,
                        consumer_timeout_ms=1000)
for message in consumer:
    print (message)
    break

ConsumerRecord(topic='data', partition=0, offset=0, timestamp=1616578031783, timestamp_type=0, key=None, value=b'{"Category1": "value7", "Category2": "value98", "Category3": "subvalue1", "Amount": 0.8826342291239988}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=103, serialized_header_size=-1)


In [11]:
client.execute("SELECT count(*) FROM my_table")

[(2000000,)]

In [12]:
%%time
client.execute("SELECT `Category1`, sum(`Amount`) from my_table where `Category2` = 'value1' group by `Category1`")

Wall time: 136 ms


[('value14', 207.5905536205164),
 ('value27', 224.91478197638577),
 ('value29', 222.979671888215),
 ('value26', 188.14472108291187),
 ('value83', 179.48815359037786),
 ('value43', 220.36624164422568),
 ('value9', 194.05700901875622),
 ('value19', 191.70593048532135),
 ('value76', 233.9466555345544),
 ('value89', 164.92161046257155),
 ('value88', 175.68549156671466),
 ('value98', 193.3670525014535),
 ('value21', 175.0714185435271),
 ('value95', 204.50444367258314),
 ('value17', 174.5596319350154),
 ('value60', 214.47154609003974),
 ('value46', 143.094695712548),
 ('value94', 225.3006863657615),
 ('value31', 185.91040229906267),
 ('value59', 219.96830339311705),
 ('value7', 189.9239971849509),
 ('value75', 221.532429741257),
 ('value92', 182.22757333713122),
 ('value97', 186.78013156879751),
 ('value24', 231.33347125818065),
 ('value56', 194.00550865134187),
 ('value38', 176.38935144709177),
 ('value66', 164.28733545971303),
 ('value34', 194.2675087739238),
 ('value10', 206.0664286878654

Now let's create a lookup dict that pretends it's a table, and use it for 'metadata' lookups

In [13]:
# -- prepare the dict

client.execute('drop table if exists mapping_table')
client.execute('drop dictionary if exists mapping_dict')

client.execute('''
CREATE TABLE mapping_table
(
    `Category1` String, 
    `SubCategory1` String
) 
ENGINE = Log;
''')
inner_sql = ','.join([f"('{key}', '{value}')" for key, value in mapping.items()])
insert_sql = f"INSERT INTO mapping_table VALUES {inner_sql}"
client.execute(insert_sql)

client.execute('''
CREATE DICTIONARY if not exists mapping_dict
(
    `Category1` String, 
    `SubCategory1` String
)
PRIMARY KEY `Category1`
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'mapping_table' PASSWORD '' DB 'default'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(COMPLEX_KEY_HASHED());
''')

# --- you can use it 'standalone'

client.execute('''select dictGet('default.mapping_dict', 'SubCategory1', tuple('value1'));''')


[('subvalue8',)]

In [14]:
%%time
client.execute("""SELECT `SubCategory1`, sum(`Amount`) 
                    from my_table as t
                    inner join mapping_dict as d 
                    on t.`Category1` = d.`Category1`
                    where `Category2` = 'value1' group by `SubCategory1`""")

Wall time: 139 ms


[('subvalue9', 1765.395300943433),
 ('subvalue8', 1795.6587273186844),
 ('subvalue1', 2796.5558449684017),
 ('subvalue6', 2107.4622153452915),
 ('subvalue7', 1807.8253260336737),
 ('subvalue0', 1593.4361883763227),
 ('subvalue4', 1822.801146415727),
 ('subvalue3', 1682.53974685345),
 ('subvalue2', 2204.19403358955),
 ('subvalue5', 2429.0012947754717)]