In [1]:
from functools import partial
from pathlib import Path
from pprint import pprint
from time import sleep

In [2]:
import psycopg2
from psycopg2.extras import RealDictCursor

In [3]:
from ingestion import (
    DBNAME,
    HOST,
    PASSWORD,
    PORT,
    USER,
    ingest_data_files,
    reset_db_structures,
)

## Reset DB Structures and Ingest Sample Data

In [4]:
# Single place to change if the data directory moves (e.g. when running
# outside the dev container); the CSV lists below are derived from it.
DATA_DIR = Path("/workspaces/data-playground/data")

input_csvs_small = [DATA_DIR / "user_events.small.csv"]
# NOTE(review): the large inputs are not ingested anywhere in this notebook.
input_csvs_large = [
    DATA_DIR / "user_events.large.part01.csv",
    DATA_DIR / "user_events.large.part02.csv",
]

In [5]:
# Destructive: drops and recreates the database and t_user_events, then loads
# the small CSV sample (the printed log reports each step and the row count).
reset_db_structures()
ingest_data_files(input_csvs_small)

Dropping db 'db_user_events'
Creating db 'db_user_events'
Creating table 't_user_events'
Copying data from /workspaces/data-playground/data/user_events.small.csv to table 't_user_events'
Count of 't_user_events': 885_129


## Utility Functions

In [6]:
pprint = partial(pprint, sort_dicts=False, underscore_numbers=True)

In [7]:
def execute_trans_query(query: str, *, fetch_size: int = 0) -> list[dict]:
    with (
        psycopg2.connect(
            host=HOST,
            port=PORT,
            user=USER,
            password=PASSWORD,
            database=DBNAME,
            cursor_factory=RealDictCursor,
        ) as conn,
        conn.cursor() as cur,
    ):
        cur.execute(query)
        match fetch_size:
            case 0:
                return None
            case 1:
                return dict(cur.fetchone())
        return [dict(res) for res in cur.fetchmany(fetch_size)]

In [8]:
def execute_non_trans_query(query: str, *, fetch_size: int = 0) -> list[dict]:
    conn = psycopg2.connect(host=HOST, user=USER, password=PASSWORD, dbname=DBNAME)
    conn.autocommit = True
    cur = conn.cursor()
    try:
        cur.execute(query)
        match fetch_size:
            case 0:
                return None
            case 1:
                return dict(cur.fetchone())
        return [dict(res) for res in cur.fetchmany(fetch_size)]
    except Exception:
        cur.close()
        conn.close()
        raise
    finally:
        cur.close()
        conn.close()

In [9]:
def print_count(table_name="t_user_events"):
    """Print the number of rows in ``table_name`` using ``_`` digit separators.

    ``table_name`` is interpolated directly into the SQL text, so only pass
    trusted identifiers.
    """
    row = execute_trans_query(f"SELECT count(1) FROM {table_name}", fetch_size=1)
    n_rows = row["count"]
    print(f"Count: {n_rows:_}")

In [10]:
def print_current_hypertables(fetch_size=3):
    """Pretty-print rows from ``timescaledb_information.hypertables``.

    Args:
        fetch_size: Maximum number of hypertables to fetch (default 3). It is
            passed straight to ``execute_trans_query``, so ``0`` fetches
            nothing and ``1`` yields a single dict instead of a list.
    """
    query = """
        SELECT hypertable_name, num_dimensions, num_chunks, compression_enabled
        FROM timescaledb_information.hypertables;"""
    res = execute_trans_query(query, fetch_size=fetch_size)
    print("Current hypertables:")
    pprint(res)

In [11]:
def print_chunk_info(table_name="t_user_events", order_by_stmt="", fetch_size=3):
    """Pretty-print chunk metadata (name, compression flag, time range) for one hypertable.

    Args:
        table_name: Hypertable whose chunks are listed. It is interpolated
            verbatim into a SQL string literal, so only pass trusted names.
        order_by_stmt: Optional raw SQL clause appended after the WHERE clause
            (e.g. ``"ORDER BY range_end ASC"``). It is also interpolated
            verbatim; when empty, row order is not guaranteed.
        fetch_size: Number of rows to fetch, forwarded to ``execute_trans_query``.
    """
    chunk_rows = execute_trans_query(
        f"""
        SELECT chunk_name, is_compressed, range_start, range_end
        FROM timescaledb_information.chunks
        WHERE hypertable_name = '{table_name}'
        {order_by_stmt};""",
        fetch_size=fetch_size,
    )
    print("Current chunks info:")
    pprint(chunk_rows)

In [12]:
def print_compression_settings(fetch_size=10, table_name="t_user_events"):
    """Pretty-print the compression settings of one hypertable.

    Args:
        fetch_size: Maximum number of settings rows to fetch (one row per
            segmentby/orderby column). Forwarded to ``execute_trans_query``.
        table_name: Hypertable to inspect (default ``t_user_events``). It is
            interpolated verbatim into the SQL string literal, so only pass
            trusted names.
    """
    query = f"""
        SELECT *
        FROM timescaledb_information.compression_settings
        WHERE hypertable_name = '{table_name}';"""
    res = execute_trans_query(query, fetch_size=fetch_size)
    print("Compression settings:")
    pprint(res)

## Query Statements

### Setup

In [13]:
# Read the installed TimescaleDB extension version from pg_extension.
query = """
    SELECT extversion
    FROM pg_extension
    WHERE extname = 'timescaledb';"""
version = execute_trans_query(query, fetch_size=1)["extversion"]
print(f"TimescaleDB version: {version}")

TimescaleDB version: 2.18.0


In [14]:
# Baseline: hypertables before t_user_events is converted with create_hypertable.
print_current_hypertables()

Current hypertables:
[]


In [15]:
# Baseline: t_user_events has no chunks while it is still a plain table.
print_chunk_info()

Current chunks info:
[]


In [16]:
# Convert t_user_events into a hypertable partitioned on event_time with 1-day
# chunks. migrate_data moves the already-ingested rows into chunks, and
# if_not_exists avoids an error when the table is already a hypertable.
query = """
    SELECT create_hypertable(
                't_user_events',
                'event_time',
                chunk_time_interval => INTERVAL '1 day',
                migrate_data => TRUE,
                if_not_exists => TRUE);"""
res = execute_trans_query(query, fetch_size=0)

In [17]:
# After conversion, t_user_events is listed with its migrated chunks.
print_current_hypertables()

Current hypertables:
[{'hypertable_name': 't_user_events',
  'num_dimensions': 1,
  'num_chunks': 158,
  'compression_enabled': False}]


In [18]:
# First 3 chunks (no ORDER BY, so which 3 are returned is not guaranteed); each spans 1 day.
print_chunk_info()

Current chunks info:
[{'chunk_name': '_hyper_1_1_chunk',
  'is_compressed': False,
  'range_start': datetime.datetime(2020, 9, 24, 0, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2020, 9, 25, 0, 0, tzinfo=datetime.timezone.utc)},
 {'chunk_name': '_hyper_1_2_chunk',
  'is_compressed': False,
  'range_start': datetime.datetime(2020, 9, 25, 0, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2020, 9, 26, 0, 0, tzinfo=datetime.timezone.utc)},
 {'chunk_name': '_hyper_1_3_chunk',
  'is_compressed': False,
  'range_start': datetime.datetime(2020, 9, 26, 0, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2020, 9, 27, 0, 0, tzinfo=datetime.timezone.utc)}]


In [19]:
# Hourly event counts per event_type as a TimescaleDB continuous aggregate.
# Executed in autocommit mode because creating a continuous aggregate cannot
# run inside a transaction block. Not idempotent (no IF NOT EXISTS): re-running
# this cell without the DB reset at the top fails because the view exists.
query = """
    CREATE MATERIALIZED VIEW mvw_event_counts
    WITH (timescaledb.continuous) AS
    SELECT
        time_bucket('1 hour', event_time) AS bucket,
        event_type,
        COUNT(*) AS event_count
    FROM t_user_events
    GROUP BY bucket, event_type;"""
res = execute_non_trans_query(query, fetch_size=0)

In [20]:
# Schedule a daily refresh of mvw_event_counts covering buckets from
# 100 years ago up to 1 hour ago.
query = """
    SELECT add_continuous_aggregate_policy(
                'mvw_event_counts',
                start_offset => INTERVAL '100 years',
                end_offset => INTERVAL '1 hour',
                schedule_interval => INTERVAL '1 day');"""
res = execute_trans_query(query, fetch_size=0)

In [21]:
# Earliest 6 rows of the continuous aggregate, ordered by bucket.
query = """
    SELECT *
    FROM mvw_event_counts
    ORDER BY bucket ASC;"""
res = execute_trans_query(query, fetch_size=6)
pprint(res)

[{'bucket': datetime.datetime(2020, 9, 24, 11, 0, tzinfo=datetime.timezone.utc),
  'event_type': 'view',
  'event_count': 13},
 {'bucket': datetime.datetime(2020, 9, 24, 12, 0, tzinfo=datetime.timezone.utc),
  'event_type': 'purchase',
  'event_count': 18},
 {'bucket': datetime.datetime(2020, 9, 24, 12, 0, tzinfo=datetime.timezone.utc),
  'event_type': 'view',
  'event_count': 238},
 {'bucket': datetime.datetime(2020, 9, 24, 12, 0, tzinfo=datetime.timezone.utc),
  'event_type': 'cart',
  'event_count': 14},
 {'bucket': datetime.datetime(2020, 9, 24, 13, 0, tzinfo=datetime.timezone.utc),
  'event_type': 'purchase',
  'event_count': 10},
 {'bucket': datetime.datetime(2020, 9, 24, 13, 0, tzinfo=datetime.timezone.utc),
  'event_type': 'view',
  'event_count': 255}]


In [22]:
# Query plan for the ordered scan; fetch_size=5 shows only the first 5 plan lines.
query = """
    EXPLAIN
    SELECT *
    FROM mvw_event_counts
    ORDER BY bucket ASC;"""
res = execute_trans_query(query, fetch_size=5)
pprint(res)

[{'QUERY PLAN': 'Custom Scan (ChunkAppend) on _materialized_hypertable_2  '
                '(cost=0.15..357.33 rows=5270 width=234)'},
 {'QUERY PLAN': '  Order: _materialized_hypertable_2.bucket'},
 {'QUERY PLAN': '  ->  Index Scan Backward using '
                '_hyper_2_175_chunk__materialized_hypertable_2_bucket_idx on '
                '_hyper_2_175_chunk  (cost=0.15..18.00 rows=310 width=234)'},
 {'QUERY PLAN': '  ->  Index Scan Backward using '
                '_hyper_2_170_chunk__materialized_hypertable_2_bucket_idx on '
                '_hyper_2_170_chunk  (cost=0.27..21.42 rows=310 width=234)'},
 {'QUERY PLAN': '  ->  Index Scan Backward using '
                '_hyper_2_161_chunk__materialized_hypertable_2_bucket_idx on '
                '_hyper_2_161_chunk  (cost=0.27..21.42 rows=310 width=234)'}]


In [23]:
# Query plan for a one-hour bucket range: the plan in the output scans a single
# materialized chunk, i.e. other chunks are excluded by the time predicate.
query = """
    EXPLAIN
    SELECT *
    FROM mvw_event_counts
    WHERE bucket >= '2020-09-24 11:00:00' AND bucket < '2020-09-24 12:00:00';"""
res = execute_trans_query(query, fetch_size=10)
pprint(res)

[{'QUERY PLAN': 'Bitmap Heap Scan on _hyper_2_175_chunk  (cost=1.27..3.41 '
                'rows=2 width=234)'},
 {'QUERY PLAN': "  Recheck Cond: ((bucket >= '2020-09-24 "
                "11:00:00+00'::timestamp with time zone) AND (bucket < "
                "'2020-09-24 12:00:00+00'::timestamp with time zone))"},
 {'QUERY PLAN': '  ->  Bitmap Index Scan on '
                '_hyper_2_175_chunk__materialized_hypertable_2_bucket_idx  '
                '(cost=0.00..1.27 rows=2 width=0)'},
 {'QUERY PLAN': "        Index Cond: ((bucket >= '2020-09-24 "
                "11:00:00+00'::timestamp with time zone) AND (bucket < "
                "'2020-09-24 12:00:00+00'::timestamp with time zone))"}]


In [24]:
# Switch t_user_events to 1-hour chunks (executed with autocommit via
# execute_non_trans_query).
query = """
    SELECT set_chunk_time_interval('t_user_events', INTERVAL '1 hour');"""
res = execute_non_trans_query(query, fetch_size=0)
# NOTE: the new chunk interval applies only to chunks created after this call;
# existing chunks keep their 1-day ranges.

In [25]:
# Existing chunks still show 1-day ranges after set_chunk_time_interval.
print_chunk_info(fetch_size=3)

Current chunks info:
[{'chunk_name': '_hyper_1_1_chunk',
  'is_compressed': False,
  'range_start': datetime.datetime(2020, 9, 24, 0, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2020, 9, 25, 0, 0, tzinfo=datetime.timezone.utc)},
 {'chunk_name': '_hyper_1_2_chunk',
  'is_compressed': False,
  'range_start': datetime.datetime(2020, 9, 25, 0, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2020, 9, 26, 0, 0, tzinfo=datetime.timezone.utc)},
 {'chunk_name': '_hyper_1_3_chunk',
  'is_compressed': False,
  'range_start': datetime.datetime(2020, 9, 26, 0, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2020, 9, 27, 0, 0, tzinfo=datetime.timezone.utc)}]


In [26]:
# Duplicate every existing row shifted 10 years into the past. This doubles
# the row count, and the new rows land in chunks using the 1-hour interval.
query = """
    INSERT INTO t_user_events
    SELECT
        event_time - (interval '10 year'),
        event_type,
        product_id,
        category_id,
        category_code,
        brand,
        price,
        user_id,
        user_session
    FROM t_user_events;"""
res = execute_trans_query(query, fetch_size=0)

In [27]:
# Ordered by range_end, the earliest chunks hold the shifted 2010 rows and
# span 1 hour each.
print_chunk_info(
    table_name="t_user_events",
    order_by_stmt="ORDER BY range_end ASC",
    fetch_size=3,
)

Current chunks info:
[{'chunk_name': '_hyper_1_176_chunk',
  'is_compressed': False,
  'range_start': datetime.datetime(2010, 9, 24, 11, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2010, 9, 24, 12, 0, tzinfo=datetime.timezone.utc)},
 {'chunk_name': '_hyper_1_177_chunk',
  'is_compressed': False,
  'range_start': datetime.datetime(2010, 9, 24, 12, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2010, 9, 24, 13, 0, tzinfo=datetime.timezone.utc)},
 {'chunk_name': '_hyper_1_178_chunk',
  'is_compressed': False,
  'range_start': datetime.datetime(2010, 9, 24, 13, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2010, 9, 24, 14, 0, tzinfo=datetime.timezone.utc)}]


In [28]:
# Recent events for one user; the time predicate lets chunk exclusion skip the
# 2010 copies. NOTE(review): `NOW() - INTERVAL '10 years'` depends on the wall
# clock; once the current date passes 2030-09-24 it also excludes the 2020 rows,
# so this output is not reproducible indefinitely. With no ORDER BY, which 3
# rows are returned is not guaranteed.
query = """
    SELECT
        event_time,
        event_type,
        category_code,
        user_id
    FROM t_user_events
    WHERE user_id = 1515915625519380411
    AND event_time > NOW() - INTERVAL '10 years';"""
res = execute_trans_query(query, fetch_size=3)
pprint(res)

[{'event_time': datetime.datetime(2020, 9, 24, 11, 57, 26, tzinfo=datetime.timezone.utc),
  'event_type': 'view',
  'category_code': 'computers.components.cooler',
  'user_id': 1_515_915_625_519_380_411},
 {'event_time': datetime.datetime(2020, 9, 24, 12, 15, 11, tzinfo=datetime.timezone.utc),
  'event_type': 'view',
  'category_code': 'computers.components.power_supply',
  'user_id': 1_515_915_625_519_380_411},
 {'event_time': datetime.datetime(2020, 9, 24, 12, 19, 57, tzinfo=datetime.timezone.utc),
  'event_type': 'view',
  'category_code': 'computers.components.cooler',
  'user_id': 1_515_915_625_519_380_411}]


In [29]:
# Average price per brand over the NOW()-relative window (wall-clock dependent,
# as in the previous cell). There is no ORDER BY, so the 5 brands shown are
# an arbitrary subset.
query = """
    SELECT
        brand,
        avg(price) AS avg_price
    FROM t_user_events
    WHERE event_time > NOW() - INTERVAl '10 years'
    GROUP BY brand;"""
res = execute_trans_query(query, fetch_size=5)
pprint(res)

[{'brand': 'pro', 'avg_price': Decimal('33.1026666666666667')},
 {'brand': 'goip', 'avg_price': Decimal('220.6800000000000000')},
 {'brand': 'knipex', 'avg_price': Decimal('54.2400000000000000')},
 {'brand': 'zmi', 'avg_price': Decimal('61.2700000000000000')},
 {'brand': 'hyperx', 'avg_price': Decimal('90.0882915863840719')}]


In [30]:
# Top 5 most active users in the NOW()-relative window. Ties in event_count
# are not broken, so users with equal counts may appear in any order.
query = """
    SELECT
        user_id,
        count(*) AS event_count
    FROM t_user_events
    WHERE event_time > NOW() - INTERVAl '10 years'
    GROUP BY user_id
    ORDER BY count(*) DESC;"""
res = execute_trans_query(query, fetch_size=5)
pprint(res)

[{'user_id': 1_515_915_625_554_995_474, 'event_count': 572},
 {'user_id': 1_515_915_625_527_763_086, 'event_count': 424},
 {'user_id': 1_515_915_625_591_251_010, 'event_count': 363},
 {'user_id': 1_515_915_625_591_659_523, 'event_count': 339},
 {'user_id': 1_515_915_625_537_803_839, 'event_count': 329}]


## Destructive Statements (Dropping, Compressing, and Expiring Data)

### Setup

In [31]:
# Plain table meant to receive a copy of t_user_events. NOTE(review): the
# INSERT ... SELECT * below assumes t_user_events has these columns in this
# order (it matches the column list used in the 10-year-shift INSERT).
query = """
    CREATE TABLE t_user_events_month_chunk (
        event_time TIMESTAMPTZ,
        event_type VARCHAR(100),
        product_id INT,
        category_id BIGINT,
        category_code VARCHAR(100),
        brand VARCHAR(100),
        price NUMERIC,
        user_id BIGINT,
        user_session VARCHAR(100)
    );"""
res = execute_trans_query(query, fetch_size=0)

In [32]:
# The new table is not a hypertable yet, so it has no chunks.
print_chunk_info(table_name="t_user_events_month_chunk")

Current chunks info:
[]


In [33]:
# Make it a hypertable with 1-month chunks; the table is still empty, so no
# migrate_data is needed.
query = """
    SELECT create_hypertable(
                't_user_events_month_chunk',
                'event_time',
                chunk_time_interval => INTERVAL '1 month'
    );"""
res = execute_trans_query(query, fetch_size=0)

In [34]:
# Copy every row (2020 data and the 2010 shifted copy) into the month-chunk table.
query = """
    INSERT INTO t_user_events_month_chunk
    SELECT * FROM t_user_events;"""
res = execute_trans_query(query, fetch_size=0)

In [35]:
# Drop every chunk whose data is older than 6 months. All rows date from 2010
# or 2020, so this removes all of them (see the count of 0 below).
query = """
    SELECT drop_chunks(
            't_user_events_month_chunk',
            older_than => INTERVAL '6 months');"""
res = execute_trans_query(query, fetch_size=0)

In [36]:
# No wait needed: drop_chunks ran synchronously in the previous cell, and its
# transaction was committed when execute_trans_query returned.
print_count(table_name="t_user_events_month_chunk")

Count: 0


In [37]:
# All chunks of the month-chunk hypertable are gone.
print_chunk_info(table_name="t_user_events_month_chunk")

Current chunks info:
[]


In [38]:
# Baseline: no compression settings exist for t_user_events yet.
print_compression_settings()

Compression settings:
[]


In [39]:
# Enable native compression on t_user_events: segment by user_id and
# product_id, and order rows within each segment by event_time descending.
query = """
    ALTER TABLE t_user_events SET (
        timescaledb.compress,
        timescaledb.compress_orderby = 'event_time DESC',
        timescaledb.compress_segmentby = 'user_id, product_id');"""
res = execute_trans_query(query, fetch_size=0)

In [40]:
# Register a policy that compresses chunks older than 30 days. It runs as a
# background job, which is why a later cell sleeps before checking chunks.
query = """
    SELECT add_compression_policy(
        't_user_events',
        INTERVAL '30 days');"""
res = execute_trans_query(query, fetch_size=0)

In [41]:
# Expect segmentby rows for user_id and product_id and an orderby row for event_time.
print_compression_settings()

Compression settings:
[{'hypertable_schema': 'public',
  'hypertable_name': 't_user_events',
  'attname': 'user_id',
  'segmentby_column_index': 1,
  'orderby_column_index': None,
  'orderby_asc': None,
  'orderby_nullsfirst': None},
 {'hypertable_schema': 'public',
  'hypertable_name': 't_user_events',
  'attname': 'product_id',
  'segmentby_column_index': 2,
  'orderby_column_index': None,
  'orderby_asc': None,
  'orderby_nullsfirst': None},
 {'hypertable_schema': 'public',
  'hypertable_name': 't_user_events',
  'attname': 'event_time',
  'segmentby_column_index': None,
  'orderby_column_index': 1,
  'orderby_asc': False,
  'orderby_nullsfirst': True}]


In [42]:
# Give the background compression job time to run, then check chunk status.
# NOTE(review): a fixed 60 s sleep assumes the job has finished by then; this
# is timing-dependent.
sleep(60)
print_chunk_info(fetch_size=3)

Current chunks info:
[{'chunk_name': '_hyper_1_1_chunk',
  'is_compressed': True,
  'range_start': datetime.datetime(2020, 9, 24, 0, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2020, 9, 25, 0, 0, tzinfo=datetime.timezone.utc)},
 {'chunk_name': '_hyper_1_2_chunk',
  'is_compressed': True,
  'range_start': datetime.datetime(2020, 9, 25, 0, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2020, 9, 26, 0, 0, tzinfo=datetime.timezone.utc)},
 {'chunk_name': '_hyper_1_3_chunk',
  'is_compressed': True,
  'range_start': datetime.datetime(2020, 9, 26, 0, 0, tzinfo=datetime.timezone.utc),
  'range_end': datetime.datetime(2020, 9, 27, 0, 0, tzinfo=datetime.timezone.utc)}]


In [43]:
# Compression does not change the visible row count (885_129 rows x 2 after the 10-year copy).
print_count()

Count: 1_770_258


In [44]:
# Retention policy that drops chunks older than 100 years; no data here is that old.
query = """
    SELECT add_retention_policy(
                't_user_events',
                INTERVAL '100 years',
                if_not_exists => TRUE);"""
res = execute_trans_query(query, fetch_size=0)

In [45]:
# Count is unchanged: the oldest rows (the 2010 copy) are far younger than 100 years.
print_count()

Count: 1_770_258


In [46]:
# Remove the 100-year policy before adding a stricter one. Presumably this is
# needed because add_retention_policy with if_not_exists => TRUE would keep the
# existing policy rather than replace it.
query = """
    SELECT remove_retention_policy('t_user_events');"""
res = execute_trans_query(query, fetch_size=0)

In [47]:
# Retention policy that drops chunks older than 1 hour, i.e. all of the data.
query = """
    SELECT add_retention_policy(
                't_user_events',
                INTERVAL '1 hour',
                if_not_exists => TRUE);"""
res = execute_trans_query(query, fetch_size=0)

In [48]:
# Give the background retention job time to run, then confirm every row is gone.
# NOTE(review): a fixed 60 s sleep assumes the job has run by then; this is
# timing-dependent.
sleep(60)
print_count()

Count: 0
