# ClickHouse Warehouse Deep-Dive

This notebook is intended to be executed from the Dockerized Jupyter service that ships with the project.

It walks through:
- sanity-checking the runtime environment and network access
- building tabular and vector datasets inside the notebook
- loading those datasets with low-level ClickHouse operations
- validating CRUD flows and similarity search queries

Prerequisites:
1. Launch the stack with `docker compose -f compose/docker-compose.yml up -d`.
2. Populate `.env` with ClickHouse, S3, and embedding model values (run `python/scripts/download_models.py` if models are missing).
3. Execute cells sequentially. The tabular cells show their SQL inline; the vector and S3 cells call helpers from the `warehouse` package (`crud_vector`, `crud_s3`), so their SQL lives in the project source.
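A missing compose file or `.env` usually surfaces later as a confusing connection error, so it can help to check for them up front. The following is a minimal sketch: the helper name is hypothetical, the paths come from the steps above, and it assumes `.env` sits at the repository root.

```python
from pathlib import Path

# Paths named in the prerequisites, relative to the repository root.
# Assumption: `.env` lives at the repository root next to `compose/` and `python/`.
REQUIRED_PATHS = (
    "compose/docker-compose.yml",
    ".env",
    "python/scripts/download_models.py",
)


def missing_prerequisites(root: Path) -> list[str]:
    """Return the required paths that do not exist under `root` (hypothetical helper)."""
    return [rel for rel in REQUIRED_PATHS if not (root / rel).exists()]
```

Once cell 1 has defined `project_root`, `missing_prerequisites(project_root)` returning an empty list means the files are in place. It only checks that the files exist, not that the containers are running.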

In [1]:
"""Ensure project modules are importable, expose repo root, and toggle vector experimental mode."""
import os
import sys
from pathlib import Path

project_root = Path.cwd().resolve()
if not (project_root / "python").exists() and (project_root.parent / "python").exists():
    project_root = project_root.parent

python_src = project_root / "python"
if python_src.exists():
    sys.path.insert(0, str(python_src))
os.environ.setdefault("PYTHONPATH", str(python_src))
os.environ.setdefault("CLICKHOUSE_ENABLE_VECTOR_EXPERIMENTAL_TYPE", "1")
project_root

PosixPath('/home/jovyan/work')
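Cell 1 checks only the current directory and its immediate parent. If a notebook lives deeper in the tree, a more general approach walks every ancestor until it finds the `python` source directory. A minimal sketch; `find_project_root` and its `marker` parameter are illustrative, not part of the project.

```python
from pathlib import Path
from typing import Optional


def find_project_root(start: Path, marker: str = "python") -> Optional[Path]:
    """Return the nearest directory at or above `start` that contains a `marker` directory."""
    start = start.resolve()
    # Check `start` itself first, then each parent up to the filesystem root.
    for candidate in (start, *start.parents):
        if (candidate / marker).is_dir():
            return candidate
    return None  # no ancestor contains the marker


# Usage: find_project_root(Path.cwd()) or Path.cwd()
```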

In [2]:
"""Capture a quick snapshot of the runtime so we know what versions are executing."""
import platform
from importlib import metadata

env_snapshot = {
    "python_version": sys.version.split()[0],
    "platform": platform.platform(),
    "clickhouse_driver": metadata.version("clickhouse-driver"),
    "pandas": metadata.version("pandas"),
    "numpy": metadata.version("numpy"),
}
env_snapshot

{'python_version': '3.11.6',
 'platform': 'Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.35',
 'clickhouse_driver': '0.2.8',
 'pandas': '2.2.3',
 'numpy': '2.1.2'}

In [3]:
from warehouse import config, crud_tabular, crud_vector, crud_s3
from warehouse.clickhouse import client_session
from warehouse.datasets import load_vector_items
from warehouse.embeddings import embed_texts, embedding_dimension

cfg = config.load_config()
cfg

AppConfig(clickhouse=ClickHouseSettings(host='clickhouse-server', native_port=9000, http_port=8123, user='default', password='clickhouse', database='default'), s3=S3Settings(endpoint_url='http://seaweedfs:8333', region='us-east-1', access_key='s3admin', secret_key='s3secret', bucket='clickhouse-demo'), models=ModelSettings(primary='BAAI/bge-base-en-v1.5', secondary='Alibaba-NLP/gte-Qwen2-1.5B-instruct', active='BAAI/bge-base-en-v1.5'), paths=PathSettings(assets_dir=PosixPath('/home/jovyan/work/assets'), model_cache_dir=PosixPath('/home/jovyan/work/assets/models'), data_dir=PosixPath('/home/jovyan/work/assets/data')))

In [4]:
"""Confirm network reachability and inspect the active ClickHouse session."""
from pprint import pprint

with client_session(cfg) as client:
    server_version = client.execute("SELECT version() AS version, currentUser(), currentDatabase()")[0]
    settings = client.execute("SELECT name, value FROM system.settings WHERE name IN ('max_threads','async_insert') ORDER BY name")
pprint({"version": server_version, "selected_settings": settings})

{'selected_settings': [('async_insert', '0'), ('max_threads', "'auto(24)'")],
 'version': ('25.8.11.66', 'default', 'default')}
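`max_threads` is reported as the string `'auto(24)'`: the setting is on its automatic default, which resolved to 24 threads on this host. If you want that number, for example to size client-side work, a small parser is enough. This is a sketch; the helper is hypothetical and only handles the two forms seen here (`auto(N)` and a plain integer).

```python
import re

# Matches the automatic form reported by system.settings, e.g. auto(24).
_AUTO_PATTERN = re.compile(r"auto\((\d+)\)")


def parse_max_threads(raw: str) -> int:
    """Turn a system.settings value such as "'auto(24)'" or "16" into an int."""
    value = raw.strip().strip("'")  # the value may arrive wrapped in single quotes
    match = _AUTO_PATTERN.fullmatch(value)
    return int(match.group(1)) if match else int(value)


# Usage with the cell above:
# parse_max_threads(dict(settings)["max_threads"])
```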


In [5]:
"""Peek at existing tables and their schemas for situational awareness."""
with client_session(cfg) as client:
    existing_tables = client.execute("SHOW TABLES")
    # Querying system.columns gives column metadata; use the raw `type` string for compatibility.
    schema_preview = client.execute("""\
        SELECT
            table,
            type AS column_type,
            default_kind
        FROM system.columns
        WHERE database = currentDatabase()
        ORDER BY table, position
        LIMIT 20
    """)
existing_tables, schema_preview

([('events',), ('item_vectors',), ('s3_events',)],
 [('events', 'UInt32', ''),
  ('events', "DateTime('UTC')", ''),
  ('events', 'UInt32', ''),
  ('events', 'LowCardinality(String)', ''),
  ('events', 'Decimal(10, 2)', ''),
  ('item_vectors', 'UInt32', ''),
  ('item_vectors', 'LowCardinality(String)', ''),
  ('item_vectors', 'Array(Float32)', ''),
  ('s3_events', 'UInt32', ''),
  ('s3_events', "DateTime('UTC')", ''),
  ('s3_events', 'UInt32', ''),
  ('s3_events', 'String', ''),
  ('s3_events', 'Decimal(10, 2)', '')])
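The preview query does not select the `name` column, so the rows above show only types; adding `name` to the SELECT list is the simplest fix if you want column names. Grouping the rows per table also makes the output easier to scan and lets you assert, for example, that `item_vectors` stores `Array(Float32)`. A minimal sketch operating on the `schema_preview` tuples; the helper name is illustrative.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def column_types_by_table(rows: Iterable[Tuple[str, str, str]]) -> Dict[str, List[str]]:
    """Group (table, column_type, default_kind) rows into {table: [column_type, ...]}."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for table, column_type, _default_kind in rows:
        grouped[table].append(column_type)  # keeps the ORDER BY position order
    return dict(grouped)


# Usage: column_types_by_table(schema_preview)["item_vectors"]
# gives ['UInt32', 'LowCardinality(String)', 'Array(Float32)'] for the output above.
```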

## Tabular Event Pipeline

We'll generate a synthetic batch of customer events in-memory, inspect the dataset, and then load it into ClickHouse using raw SQL executed through the project helper `client_session`. This keeps the mechanics visible while reusing the connection plumbing.

In [6]:
"""Generate a reproducible synthetic batch of events."""
import numpy as np
import pandas as pd
from datetime import datetime, timezone, timedelta

rng = np.random.default_rng(7)
num_events = 32
base_time = datetime.now(tz=timezone.utc).replace(microsecond=0)
event_frame = pd.DataFrame({
    "event_id": np.arange(1, num_events + 1, dtype=np.int32),
    "event_time": base_time - pd.to_timedelta(rng.integers(0, 3600, size=num_events), unit="s"),
    "customer_id": rng.integers(1000, 5000, size=num_events, dtype=np.int32),
    "event_type": rng.choice(["view", "cart", "purchase", "wishlist"], size=num_events),
    "amount": np.round(rng.uniform(5, 250, size=num_events), 2),
}).sort_values("event_time").reset_index(drop=True)
event_frame.head()

|   | event_id | event_time | customer_id | event_type | amount |
|--:|--:|---|--:|---|--:|
| 0 | 27 | 2025-10-30 11:06:24+00:00 | 2765 | view | 37.25 |
| 1 | 1 | 2025-10-30 11:09:08+00:00 | 3037 | wishlist | 129.4 |
| 2 | 13 | 2025-10-30 11:11:04+00:00 | 4429 | purchase | 153.24 |
| 3 | 4 | 2025-10-30 11:12:00+00:00 | 4170 | view | 151.56 |
| 4 | 12 | 2025-10-30 11:13:25+00:00 | 1640 | view | 149.55 |


In [7]:
"""Convert each DataFrame row into a tuple of native Python values (int, datetime, str, float), the row format clickhouse-driver expects for INSERT ... VALUES."""
event_rows = [
    (
        int(row.event_id),
        row.event_time.to_pydatetime(),
        int(row.customer_id),
        str(row.event_type),
        float(row.amount),
    )
    for row in event_frame.itertuples(index=False)
]
event_rows[:3]

[(27,
  datetime.datetime(2025, 10, 30, 11, 6, 24, tzinfo=datetime.timezone.utc),
  2765,
  'view',
  37.25),
 (1,
  datetime.datetime(2025, 10, 30, 11, 9, 8, tzinfo=datetime.timezone.utc),
  3037,
  'wishlist',
  129.4),
 (13,
  datetime.datetime(2025, 10, 30, 11, 11, 4, tzinfo=datetime.timezone.utc),
  4429,
  'purchase',
  153.24)]
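The row-building cell passes `amount` as a Python float, while the column is `Decimal(10, 2)`. That works here, but if you prefer to fix the exact two-decimal value on the Python side, you can convert it to `decimal.Decimal` before inserting; clickhouse-driver accepts `Decimal` values for `Decimal` columns. A sketch with a hypothetical helper name:

```python
from decimal import ROUND_HALF_UP, Decimal

CENT = Decimal("0.01")  # scale of the Decimal(10, 2) `amount` column


def to_decimal_amount(value: float) -> Decimal:
    """Convert a float amount to a Decimal with exactly two decimal places.

    Going through str() uses the float's short representation (129.4 -> "129.4"),
    so binary floating-point noise never reaches the Decimal.
    """
    return Decimal(str(value)).quantize(CENT, rounding=ROUND_HALF_UP)
```

To apply it, replace `float(row.amount)` with `to_decimal_amount(row.amount)` in the list comprehension.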

In [8]:
"""Create the MergeTree table and load the generated batch."""
crud_tabular.ensure_table(config=cfg)
insert_sql = """\
INSERT INTO events (event_id, event_time, customer_id, event_type, amount) VALUES
"""
with client_session(cfg) as client:
    client.execute("TRUNCATE TABLE IF EXISTS events")
    client.execute(insert_sql, event_rows)
len(event_rows)

32
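The 32 rows above go to ClickHouse in a single `execute` call. For much larger loads, you may want to bound client memory by sending rows in blocks. A minimal sketch, assuming a hypothetical `batched_rows` helper and batch size:

```python
from itertools import islice
from typing import Any, Iterable, Iterator, List, Tuple

Row = Tuple[Any, ...]  # one VALUES row, e.g. an entry of `event_rows`


def batched_rows(rows: Iterable[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield consecutive lists of at most `batch_size` rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Usage with the notebook's session (commented out: it needs a live server):
# with client_session(cfg) as client:
#     for batch in batched_rows(event_rows, batch_size=100_000):
#         client.execute(insert_sql, batch)
```

Each `execute` call becomes a separate INSERT, and with `async_insert` off (as shown earlier) every INSERT creates at least one new data part, so keep batches large rather than inserting row by row. On Python 3.12+, `itertools.batched` does the same job (yielding tuples).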

In [9]:
"""Inspect the table contents and derive a quick aggregate for validation."""
with client_session(cfg) as client:
    sample = client.execute("""\
        SELECT event_id, event_time, customer_id, event_type, amount
        FROM events
        ORDER BY event_time DESC
        LIMIT 5
    """)
    totals = client.execute("""\
        SELECT event_type, count() AS events, round(sum(amount), 2) AS revenue
        FROM events
        GROUP BY event_type
        ORDER BY events DESC
    """)
sample, totals

([(14,
   datetime.datetime(2025, 10, 30, 12, 5, 31, tzinfo=<UTC>),
   3450,
   'wishlist',
   Decimal('161.31')),
  (9,
   datetime.datetime(2025, 10, 30, 12, 2, 30, tzinfo=<UTC>),
   2864,
   'purchase',
   Decimal('205')),
  (19,
   datetime.datetime(2025, 10, 30, 11, 58, 41, tzinfo=<UTC>),
   1566,
   'wishlist',
   Decimal('103.61')),
  (17,
   datetime.datetime(2025, 10, 30, 11, 57, 56, tzinfo=<UTC>),
   2779,
   'purchase',
   Decimal('112.88')),
  (8,
   datetime.datetime(2025, 10, 30, 11, 52, 19, tzinfo=<UTC>),
   4955,
   'view',
   Decimal('41.8'))],
 [('view', 10, Decimal('1347.25')),
  ('purchase', 10, Decimal('1438.57')),
  ('wishlist', 8, Decimal('774.85')),
  ('cart', 4, Decimal('642.95'))])
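The aggregate only validates the load if you compare it against something. Since `event_rows` is still in memory, you can recompute the same per-type counts and revenue in Python and check that they match what ClickHouse returned. A sketch; `expected_totals` is a hypothetical helper and uses `Decimal` so the comparison is not affected by float rounding.

```python
from collections import Counter, defaultdict
from decimal import ROUND_HALF_UP, Decimal
from typing import Dict, Iterable, Tuple

CENT = Decimal("0.01")


def expected_totals(rows: Iterable[Tuple]) -> Dict[str, Tuple[int, Decimal]]:
    """Recompute {event_type: (count, revenue)} from the tuples that were inserted."""
    counts: Counter = Counter()
    revenue: Dict[str, Decimal] = defaultdict(Decimal)
    for _event_id, _event_time, _customer_id, event_type, amount in rows:
        counts[event_type] += 1
        # str() keeps the float's short repr, so 129.4 becomes Decimal("129.4").
        revenue[event_type] += Decimal(str(amount))
    return {
        event_type: (counts[event_type], revenue[event_type].quantize(CENT, rounding=ROUND_HALF_UP))
        for event_type in counts
    }


# Usage in the notebook:
# clickhouse_totals = {event_type: (n, rev) for event_type, n, rev in totals}
# assert expected_totals(event_rows) == clickhouse_totals
```

If the driver stores every float amount with the same two-decimal value, the assertion holds; a mismatch would point at a conversion issue worth investigating.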

## Embedding Model Setup

If the embedding models are not yet cached under `assets/models`, run the helper script below. It downloads the primary and secondary models defined in `.env` and stores them for reuse. The step is idempotent, so it is safe to rerun.
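Because the download is idempotent, the practical question is which models are still missing. The download log reports the cache folder `BAAI__bge-base-en-v1.5`, which suggests the script replaces `/` in the model id with `__`. Assuming that convention, a minimal sketch that lists models still to fetch; the helper names are hypothetical.

```python
from pathlib import Path
from typing import Iterable, List


def model_cache_path(cache_dir: Path, model_id: str) -> Path:
    """Assumed layout: 'BAAI/bge-base-en-v1.5' -> <cache_dir>/BAAI__bge-base-en-v1.5."""
    return cache_dir / model_id.replace("/", "__")


def models_to_download(cache_dir: Path, model_ids: Iterable[str]) -> List[str]:
    """Return the model ids whose cache folder is missing or empty."""
    missing: List[str] = []
    for model_id in model_ids:
        folder = model_cache_path(cache_dir, model_id)
        if not folder.is_dir() or not any(folder.iterdir()):
            missing.append(model_id)
    return missing


# Usage with the loaded config:
# models_to_download(cfg.paths.model_cache_dir, [cfg.models.primary, cfg.models.secondary])
```

A non-empty folder does not prove the download completed, so treat this as a quick check rather than a substitute for running the script.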

In [21]:
"""Download the embedding models referenced by the configuration (idempotent)."""
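import subprocess
import sys
from textwrap import indent

# Run the script with the notebook's own interpreter so it sees the same installed packages.
download = subprocess.run(
    [sys.executable, "python/scripts/download_models.py"],
    cwd=project_root,
    capture_output=True,
    text=True,
    check=False,
)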
stdout = download.stdout.strip() or "<no stdout>"
stderr = download.stderr.strip()
print(indent(stdout, prefix="    "))
if stderr:
    print("stderr:\n" + indent(stderr, prefix="    "))
download.returncode

    Downloading BAAI/bge-base-en-v1.5 -> /home/jovyan/work/assets/models/BAAI__bge-base-en-v1.5

0

## Vector Embedding Pipeline

This section derives embeddings on the fly using the cached Hugging Face model, writes them to disk for reuse, and persists them inside ClickHouse along with an HNSW index.
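An HNSW index answers nearest-neighbour queries approximately. With only five items, an exact brute-force search is cheap and gives a reference to check the index's answers against. The sketch below assumes cosine distance (1 minus cosine similarity, which is what ClickHouse's `cosineDistance` computes); the metric `crud_vector` configures is not shown in this notebook, so switch to L2 distance if that is what the table uses. The function names are illustrative.

```python
import math
from typing import List, Sequence, Tuple


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Return 1 - cosine similarity between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0:
        raise ValueError("cosine distance is undefined for zero vectors")
    return 1.0 - dot / norm


def exact_top_k(
    query: Sequence[float],
    catalog: Sequence[Tuple[int, Sequence[float]]],
    k: int = 3,
) -> List[Tuple[int, float]]:
    """Score every (item_id, vector) pair and keep the k closest: exact, no index."""
    scored = [(item_id, cosine_distance(query, vector)) for item_id, vector in catalog]
    scored.sort(key=lambda pair: pair[1])
    return scored[:k]


# Usage once embeddings exist:
# exact_top_k(query_vector, [(r.item_id, r.vector) for r in vector_records], k=3)
```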

In [10]:
"""Generate embeddings and persist them as JSONL for reproducibility."""
import json
from pathlib import Path
from warehouse.datasets import VECTOR_DATASET_FILENAME, VectorRecord

dummy_items = [
    {"category": "electronics", "text": "Wireless noise-cancelling headphones with 40-hour battery life."},
    {"category": "apparel", "text": "Breathable running shoes designed for marathon training on city streets."},
    {"category": "home", "text": "Smart thermostat that learns household schedules to optimize heating and cooling."},
    {"category": "books", "text": "A science-fiction novel following explorers establishing the first colony on Mars."},
    {"category": "beauty", "text": "Vitamin C face serum targeting uneven skin tone and early-aging signs."},
]
texts = [item["text"] for item in dummy_items]
vectors = embed_texts(texts, config=cfg)
vector_records = [
    VectorRecord(item_id=index, category=item["category"], vector=vector, text=item["text"])
    for index, (item, vector) in enumerate(zip(dummy_items, vectors), start=1)
]
target_path = cfg.paths.data_dir / VECTOR_DATASET_FILENAME
target_path.parent.mkdir(parents=True, exist_ok=True)
with target_path.open("w", encoding="utf-8") as fh:
    for record in vector_records:
        fh.write(json.dumps({
            "item_id": record.item_id,
            "category": record.category,
            "text": record.text,
            "vector": record.vector,
            "model": cfg.models.active,
        }, ensure_ascii=False) + "\n")
target_path, vector_records[0].vector[:5]

(PosixPath('/home/jovyan/work/assets/data/vector_items.jsonl'),
 [0.05829149857163429,
  0.01668601855635643,
  0.0029277256689965725,
  -0.0031418483704328537,
  0.048713017255067825])
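Writing the JSONL file pays off only if it can be read back. A minimal reload sketch; `read_vector_jsonl` is a hypothetical helper, and the dimension and model checks are additions of this sketch that guard against mixing embeddings from different models (for example after changing the active model).

```python
import json
from pathlib import Path
from typing import Any, Dict, List


def read_vector_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Reload records written in the JSONL layout above (one JSON object per line)."""
    records: List[Dict[str, Any]] = []
    with path.open(encoding="utf-8") as fh:
        for line in fh:
            if line.strip():  # tolerate blank lines, e.g. a trailing newline
                records.append(json.loads(line))
    # Embeddings of different dimensions or from different models must not share an index.
    dimensions = {len(record["vector"]) for record in records}
    if len(dimensions) > 1:
        raise ValueError(f"inconsistent vector dimensions: {sorted(dimensions)}")
    models = {record.get("model") for record in records}
    if len(models) > 1:
        raise ValueError(f"records come from several models: {sorted(map(str, models))}")
    return records


# Usage: reloaded = read_vector_jsonl(target_path)
```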

In [11]:
"""Load the embeddings into ClickHouse and build the HNSW index."""
crud_vector.ensure_table(config=cfg, records=vector_records)
inserted = crud_vector.load_sample_vectors(config=cfg, records=vector_records)
inserted

5

In [22]:
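"""Build a catalog lookup table and a helper that runs ad-hoc phrase searches against the vector index."""
import pandas as pd
from typing import List

vector_catalog_df = pd.DataFrame(
    [
        {"item_id": rec.item_id, "category": rec.category, "text": rec.text}
        for rec in vector_records
    ]
)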

def search_catalog_by_phrase(phrase: str, *, limit: int = 3) -> pd.DataFrame:
    """Embed a custom phrase, query the ANN index, and return contextual matches."""
    cleaned = phrase.strip()
    if not cleaned:
        raise ValueError("Provide a non-empty phrase to search.")
    query_vector = list(embed_texts([cleaned], config=cfg)[0])
    matches = list(crud_vector.similarity_search(query_vector, limit=limit, config=cfg))
    results = pd.DataFrame(matches, columns=["item_id", "category", "distance"])
    results = results.merge(vector_catalog_df, on=["item_id", "category"], how="left")
    results.insert(0, "input_phrase", cleaned)
    return results[["input_phrase", "item_id", "category", "text", "distance"]]

TR_USE_CASE_PHRASES: List[str] = [
    "headphones",
    "thermostat",
    "exploration",
    "novel"
]

# Example: uncomment to try a custom phrase on demand
# search_catalog_by_phrase("Wireless earbuds with adaptive noise cancellation", limit=3)

In [23]:
"""Evaluate similarity matches for bundled TR use-case phrases (optional)."""
tr_eval_results = pd.concat(
    [search_catalog_by_phrase(phrase, limit=3) for phrase in TR_USE_CASE_PHRASES],
    ignore_index=True,
)
tr_eval_results

|   | input_phrase | item_id | category | text | distance |
|--:|---|--:|---|---|--:|
| 0 | headphones | 1 | electronics | Wireless noise-cancelling headphones with 40-h... | 0.266493 |
| 1 | headphones | 2 | apparel | Breathable running shoes designed for marathon... | 0.466185 |
| 2 | headphones | 5 | beauty | Vitamin C face serum targeting uneven skin ton... | 0.563068 |
| 3 | thermostat | 3 | home | Smart thermostat that learns household schedul... | 0.237356 |
| 4 | thermostat | 1 | electronics | Wireless noise-cancelling headphones with 40-h... | 0.513739 |
| 5 | thermostat | 5 | beauty | Vitamin C face serum targeting uneven skin ton... | 0.563206 |
| 6 | exploration | 4 | books | A science-fiction novel following explorers es... | 0.43478 |
| 7 | exploration | 3 | home | Smart thermostat that learns household schedul... | 0.495259 |
| 8 | exploration | 2 | apparel | Breathable running shoes designed for marathon... | 0.506289 |
| 9 | novel | 4 | books | A science-fiction novel following explorers es... | 0.39476 |
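To turn the table above into a pass/fail signal, you can label the item each phrase should retrieve and check whether it ranks first. The labels in the usage comment are hypothetical, chosen by reading the five catalog texts, and `top1_hits` is an illustrative helper.

```python
from typing import Dict, Iterable, Optional, Tuple


def top1_hits(
    results: Iterable[Tuple[str, int, float]],
    expected: Dict[str, int],
) -> Dict[str, bool]:
    """Report, per phrase, whether the closest match (smallest distance) is the expected item."""
    best: Dict[str, Tuple[int, float]] = {}
    for phrase, item_id, distance in results:
        if phrase not in best or distance < best[phrase][1]:
            best[phrase] = (item_id, distance)
    hits: Dict[str, bool] = {}
    for phrase, wanted in expected.items():
        top: Optional[Tuple[int, float]] = best.get(phrase)
        hits[phrase] = bool(top is not None and top[0] == wanted)  # no results counts as a miss
    return hits


# Usage (hypothetical labels):
# expected = {"headphones": 1, "thermostat": 3, "exploration": 4, "novel": 4}
# rows = tr_eval_results[["input_phrase", "item_id", "distance"]].itertuples(index=False, name=None)
# top1_hits(rows, expected)
```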


## S3 Table Function Bridge (Optional)

Run this section once your `.env` values point at a reachable S3-compatible endpoint (the loaded configuration targets `http://seaweedfs:8333`). The first cell uploads a sample CSV to the configured bucket; the second maps it into ClickHouse and reads it back through the `s3` table function.
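The `s3` table function reads objects addressed by URL; for an S3-compatible server such as the one in the config, a path-style URL `<endpoint>/<bucket>/<key>` is the usual form. A minimal sketch that assembles such a query from the config values. The helper names are hypothetical, the `CSVWithNames` format assumes the staged CSV has a header row, and `crud_s3` may build its SQL differently.

```python
def sql_string(value: str) -> str:
    """Quote a value as a ClickHouse string literal (escape backslashes, then quotes)."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def s3_select_sql(
    endpoint_url: str,
    bucket: str,
    key: str,
    access_key: str,
    secret_key: str,
    fmt: str = "CSVWithNames",  # assumption: the staged CSV has a header row
    limit: int = 5,
) -> str:
    """Build a path-style s3() table-function query: <endpoint>/<bucket>/<key>."""
    url = f"{endpoint_url.rstrip('/')}/{bucket}/{key.lstrip('/')}"
    args = ", ".join(sql_string(v) for v in (url, access_key, secret_key, fmt))
    return f"SELECT * FROM s3({args}) LIMIT {int(limit)}"


# Usage with the loaded config:
# sql = s3_select_sql(cfg.s3.endpoint_url, cfg.s3.bucket, "datasets/tabular_events.csv",
#                     cfg.s3.access_key, cfg.s3.secret_key)
# with client_session(cfg) as client:
#     client.execute(sql)
```

Inlining credentials puts them in the query text; ClickHouse named collections are one way to keep them in server configuration instead.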

In [13]:
"""Upload the sample CSV into the configured S3 bucket (safe to rerun)."""
uploaded_key = crud_s3.stage_sample_dataset(config=cfg)
uploaded_key

'datasets/tabular_events.csv'

In [14]:
"""Create the S3-mapped table and read the staged CSV rows back through ClickHouse to verify remote access."""
crud_s3.create_s3_mapped_table(config=cfg)
s3_rows = crud_s3.query_s3_dataset(config=cfg)
list(s3_rows)

[(1, '2025-01-01T10:00:00Z', 101, 'purchase', 249.99),
 (2, '2025-01-02T14:23:11Z', 102, 'purchase', 129.5),
 (3, '2025-01-03T09:54:35Z', 101, 'refund', -49.99),
 (4, '2025-01-05T18:05:02Z', 103, 'purchase', 349.0),
 (5, '2025-01-07T12:41:17Z', 104, 'purchase', 89.0)]
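These rows come back with `event_time` as an ISO 8601 string and `amount` as a float, whereas the local `events` table uses `DateTime('UTC')` and `Decimal(10, 2)`. To compare or re-insert them alongside the tabular pipeline, a small client-side normalization is enough (the conversion could also be done in SQL, for example with `parseDateTimeBestEffort`). A sketch with a hypothetical helper name:

```python
from datetime import datetime, timezone
from decimal import Decimal
from typing import Tuple

S3Row = Tuple[int, str, int, str, float]


def normalize_s3_row(row: S3Row) -> Tuple[int, datetime, int, str, Decimal]:
    """Coerce one row from the S3 query into the Python types used for the `events` table."""
    event_id, raw_time, customer_id, event_type, amount = row
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on,
    # so rewrite it as an explicit UTC offset for older interpreters.
    event_time = datetime.fromisoformat(raw_time.replace("Z", "+00:00")).astimezone(timezone.utc)
    # str() keeps the float's short repr (e.g. -49.99) before fixing the scale to 2 decimals.
    amount_dec = Decimal(str(amount)).quantize(Decimal("0.01"))
    return int(event_id), event_time, int(customer_id), str(event_type), amount_dec


# Usage (re-queries, in case the earlier result was a one-shot iterator):
# normalized = [normalize_s3_row(r) for r in crud_s3.query_s3_dataset(config=cfg)]
```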