# Mock Location Data to ClickHouse

This notebook gives a stripped-down illustration of STARLAKE's ETL work. The diagram below shows the high-level architecture of this example:

![Architecture diagram](diagram.jpg)

We can start by installing all dependencies required to run this notebook:

In [None]:
# Install the Python packages listed in requirements.txt (IPython shell escape).
! pip install -r requirements.txt

## 0. Import all libraries

In [None]:
from clickhouse_driver import Client
from confluent_kafka import Producer
from datetime import datetime, timedelta
import random
import time
import json
import requests

## 1. Connect to ClickHouse DB

In [None]:
# ClickHouse connection settings. clickhouse_driver speaks ClickHouse's native
# TCP protocol, hence port 9000.
clickhouse_host = "clickhouse"
clickhouse_port = 9000
clickhouse_user = "default"
clickhouse_pass = "plschangeme"
clickhouse_db   = "testdb"

# Connect to ClickHouse
client_ch = Client(
    host=clickhouse_host,
    port=clickhouse_port,
    user=clickhouse_user,
    password=clickhouse_pass,
    database=clickhouse_db
)
# Client only opens the connection on the first query. Run a trivial query so
# that an unreachable server or bad credentials fail here, not in a later cell.
client_ch.execute("SELECT 1")
print("Connected to ClickHouse.")

## 2. Create Table in ClickHouse

We assume that the schemas of all sources have already been unified into the following:


| Column Name | Data Type | Description |
| :--- | :--- | :--- |
| **id** | `UInt32` | Unique identifier. Users can use this ID to join with source-specific columns stored in other tables. |
| **vesselname** | `String` | Name of the vessel. |
| **latitude** | `Float64` | Vessel's latitude, typically within the range $[-90, 90]$. |
| **longitude** | `Float64` | Vessel's longitude, typically within the range $[-180, 180]$. |
| **speed** | `Float64` | Vessel's speed, often in **knots** or another unit. |
| **course** | `Float64` | Vessel's **Course Over Ground (COG)**, measured in degrees ($0^{\circ}$ to $360^{\circ}$). |
| **captureddatetime** | `UInt64` | The time the data was captured, represented as an **epoch in milliseconds** ($\text{epochms}$) in the UTC+0 timezone. |


In [None]:
# ClickHouse: drop and recreate the target table so this cell can be re-run.
# The dropped table must be the same one created below; otherwise a second run
# fails because raw_data_mock already exists.
client_ch.execute("DROP TABLE IF EXISTS raw_data_mock")

create_table_ch = """
CREATE TABLE raw_data_mock (
    id                UInt32,
    vesselname        String,
    latitude          Float64,        -- Latitude [-90, 90]
    longitude         Float64,        -- Longitude [-180, 180]
    speed             Float64,        -- Speed in knots or other unit
    course            Float64,        -- Course over ground (degrees)
    captureddatetime  UInt64          -- epochms UTC+0 timezome
) ENGINE = MergeTree()
ORDER BY (vesselname, captureddatetime)

"""

client_ch.execute(create_table_ch)
print("Created ClickHouse raw_data_mock table.")

## 3. Mock Data Generator and Kafka Producer Setup

In [None]:
# Avro schema for one mock AIS record; field names mirror the columns of the
# ClickHouse table created above.
ais_avro_schema = """
{
  "type": "record",
  "name": "AISData",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "vesselname", "type": "string"},
    {"name": "latitude", "type": "double"},
    {"name": "longitude", "type": "double"},
    {"name": "speed", "type": "double"},
    {"name": "course", "type": "double"},
    {"name": "captureddatetime", "type": "long"}
  ]
}
"""
# Pool of vessel names that mock records are drawn from.
vessel_names = [
    "The Sovereign Tide",
    "Trident's Reach",
    "The Albatross Crest",
    "The Valiant Star",
    "Kingston Trader",
    "The Meridian Queen",
    "The Iron Duke",
    "MV Apex Voyager",
    "Oceanic Transporter 9",
    "CS Zenith Horizon",
    "Pacific Hauler",
    "Orion Global",
    "Stratton Logistics",
    "The Titanus",
    "The Nomad's Quest",
    "Arctic Tern"
]


def generate_ais_message(record_id, rng=None, now=None):
    """Generate one random mock AIS record as a dict matching ``ais_avro_schema``.

    Args:
        record_id: Value stored in the ``id`` field.
        rng: Optional ``random.Random`` instance for reproducible output.
            Defaults to the global ``random`` module.
        now: Optional reference time. Defaults to ``datetime.now()``. The
            capture time is drawn from up to about 8 days before it
            (0-7 days plus 0-86400 seconds).

    Returns:
        A dict with keys ``id``, ``vesselname``, ``latitude`` ([-90, 90]),
        ``longitude`` ([-180, 180]), ``speed`` ([0, 30]), ``course``
        ([0, 359.99]) and ``captureddatetime`` (epoch milliseconds, ``int``).
    """
    if rng is None:
        rng = random
    if now is None:
        # Naive local time. datetime.timestamp() interprets naive values as
        # local time, so the resulting epoch value is still UTC-based.
        now = datetime.now()

    # Draw in a fixed order so a seeded rng always yields the same record.
    vesselname = rng.choice(vessel_names)
    latitude = rng.uniform(-90, 90)
    longitude = rng.uniform(-180, 180)
    speed = rng.uniform(0, 30)
    course = rng.uniform(0, 359.99)
    random_time = now - timedelta(days=rng.random() * 7, seconds=rng.randint(0, 86400))

    return {
        "id": record_id,
        "vesselname": vesselname,
        "latitude": latitude,
        "longitude": longitude,
        "speed": speed,
        "course": course,
        "captureddatetime": int(random_time.timestamp() * 1000)  # epoch ms
    }

from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json
from datetime import datetime, timedelta
import random
import time


# --- Kafka and Schema Registry configuration ---
kafka_bootstrap_servers = "broker:29092"
schema_registry_url = "http://schema-registry:8081"
kafka_topic = "raw_data_mock"

# Kafka producer: acks=all (full acknowledgement) and snappy compression.
producer_conf = {
    "bootstrap.servers": kafka_bootstrap_servers,
    "acks": "all",
    "compression.type": "snappy",
}
producer = Producer(producer_conf)

# Schema Registry client plus an Avro serializer bound to ais_avro_schema.
# The serializer turns record dicts into Avro-encoded message values via
# the registry.
schema_registry_conf = {"url": schema_registry_url}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_serializer = AvroSerializer(
    schema_registry_client=schema_registry_client,
    schema_str=ais_avro_schema,
)

def delivery_report(err, msg):
    """Delivery callback passed to ``producer.produce``.

    Prints one line for every message whose delivery failed. Successful
    deliveries are silent, which keeps output manageable when producing
    many messages.

    Args:
        err: The delivery error, or ``None`` on success.
        msg: The delivered (or failed) message; not used here.
    """
    if err is not None:
        print(f'Message delivery failed: {err}')


## 4. Insert into Kafka
Run the loop below to write mock messages to the Kafka broker; adjust `total_messages` to change how many messages are produced.


In [None]:
# --- 4. Main production loop ---
total_messages = 1000000
start_time = time.perf_counter()  # monotonic clock for interval timing

for i in range(total_messages):
    message = generate_ais_message(i + 1)
    # Serialize once, outside the retry loop, so a retry re-sends the same bytes.
    value = avro_serializer(message, SerializationContext(kafka_topic, MessageField.VALUE))

    while True:
        try:
            producer.produce(
                topic=kafka_topic,
                value=value,
                callback=delivery_report
            )
            break
        except BufferError:
            # produce() raises BufferError when the local producer queue is full.
            # Serve delivery callbacks for up to 1 s to free space, then retry
            # instead of aborting the whole run.
            producer.poll(1)

    # Serve delivery callbacks for already-acknowledged messages without blocking.
    producer.poll(0)

# Wait for any outstanding messages to be delivered.
producer.flush()
end_time = time.perf_counter()
print(f"Produced {total_messages} Avro messages to Kafka in {end_time - start_time:.2f} seconds.")

## 5. Create Sink Connector to write to ClickHouse Table

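Sink data into ClickHouse with the ClickHouse Kafka Connect Sink connector (`com.clickhouse.kafka.connect.ClickHouseSinkConnector`), which writes to ClickHouse over its HTTP interface. Ensure that the connector plugin is installed on the Kafka Connect worker. You can read more on the [ClickHouse website](https://clickhouse.com/docs/integrations/kafka/cloud/confluent/http).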

In [10]:
import json
import requests

# Base URL of the Kafka Connect REST API's connector resources.
connect_url_base = "http://connect:8083/connectors"
# Every request to the REST API sends a JSON body.
headers = {"Content-Type": "application/json"}


In [None]:
# --- Create or update the ClickHouse sink connector ---
# PUT /connectors/<name>/config creates the connector if it does not exist
# (HTTP 201) or replaces its configuration if it does (HTTP 200).
connector_name_ch = "ais-data-ch-sink-connector-new"
url_ch = f"{connect_url_base}/{connector_name_ch}/config"

data_ch = {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    # By default the connector writes each topic into the ClickHouse table of
    # the same name, so the topic must match the table created earlier.
    "topics": kafka_topic,
    # Reuse the connection settings defined for the ClickHouse client.
    "hostname": clickhouse_host,
    # The connector talks to ClickHouse over HTTP (8123), not the native port.
    "port": "8123",
    # The connector enables SSL by default; plain-HTTP port 8123 requires it off.
    "ssl": "false",
    "username": clickhouse_user,
    "password": clickhouse_pass,
    "database": clickhouse_db,
    # Decode the Avro values produced above via the same Schema Registry.
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": schema_registry_url,
}

response_ch = requests.put(url_ch, headers=headers, json=data_ch, timeout=30)
if response_ch.status_code in (200, 201):
    print("ClickHouse Sink Connector updated successfully!")
else:
    print(f"Failed to update ClickHouse connector: {response_ch.status_code}")
    print(response_ch.text)

## 6. Sample Benchmark Query: count records within a bounding box and time window

In [None]:
num_runs = 10

# Query 1: count records inside the latitude/longitude box [0, 10] x [0, 10]
# whose capture time lies in the given epoch-millisecond window.
query1 = '''
SELECT count(*) FROM raw_data_mock WHERE latitude BETWEEN 0 AND 10 AND longitude BETWEEN 0 AND 10
    AND captureddatetime > 1503475512418
    AND captureddatetime < 1855475512418
'''
time_ch_q1 = 0.0

# Count returned by the last run, reported below.
count_ch = 0

for _ in range(num_runs):
    # perf_counter is monotonic and high-resolution, unlike time.time().
    start = time.perf_counter()
    result_ch = client_ch.execute(query1)
    time_ch_q1 += time.perf_counter() - start
    # execute() returns a list of row tuples; the single row holds the count.
    count_ch = result_ch[0][0]

print(f"Average time for ClickHouse: {time_ch_q1/num_runs:.4f} seconds")
print("-" * 50)
print(f"Count for ClickHouse: {count_ch}")