In [1]:
# Install the client libraries for this notebook into the running kernel's environment.
# NOTE(review): versions are unpinned, so a fresh environment may resolve different releases — consider pinning.
%pip install psycopg2-binary pandas pyarrow minio numpy

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import os
import random
import sqlite3
import time

import psycopg2
from minio import Minio

In [18]:
# Open every connection used by the replication tests in the cells below.
#
# Secrets are read from environment variables so real credentials never have to be
# written into the notebook; the fallbacks are the local development defaults, which
# keeps the behaviour unchanged when nothing is set.
SRC_PG_PASSWORD = os.environ.get('SRC_PG_PASSWORD', 'postgres')
DEST_PG_PASSWORD = os.environ.get('DEST_PG_PASSWORD', 'postgres')
MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY', 'minioadmin')
MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY', 'minioadmin')

# Local SQLite file holding sample rows; `scon` is a second name for the same connection.
sample_conn = scon = sqlite3.connect('sample.sqlite')

# Source and destination Postgres instances (distinguished by port and database name).
src_conn = psycopg2.connect(host='localhost', port=5433, user='postgres', password=SRC_PG_PASSWORD, dbname='postgres')
dest_conn = psycopg2.connect(host='localhost', port=5444, user='postgres', password=DEST_PG_PASSWORD, dbname='analytics')

# Plain-HTTP MinIO endpoint on localhost.
minio_client = Minio(
    'localhost:9000',
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False
)

# sqlite3 connections have no DSN attribute, so the first line shows the object repr.
print('Sample_conn dsn:', sample_conn)
print('src_conn dsn:', src_conn.dsn)
print('dest_conn dsn:', dest_conn.dsn)
print('Minio client:', minio_client)

Sample_conn dsn: <sqlite3.Connection object at 0x0000025618C33970>
src_conn dsn: user=postgres password=xxx dbname=postgres host=localhost port=5433
dest_conn dsn: user=postgres password=xxx dbname=analytics host=localhost port=5444
Minio client: <minio.api.Minio object at 0x0000025618BC75C0>


In [19]:
# Pick a random element from sample.sqlite and insert it into postgres_src,
# then check that it shows up in the destination Postgres and in the MinIO bucket.
print('=== INSERT TEST ===')

# Read one random row from the sample database. try/finally guarantees the cursor
# is closed even when the table is empty or a query fails.
sample_cur = sample_conn.cursor()
try:
    sample_cur.execute('SELECT COUNT(*) FROM meme_coins')
    row_count = sample_cur.fetchone()[0]
    if row_count == 0:
        raise RuntimeError('sample.sqlite has no rows in meme_coins')
    random_offset = random.randint(0, row_count - 1)

    sample_cur.execute('SELECT * FROM meme_coins LIMIT 1 OFFSET ?', (random_offset,))
    random_element = sample_cur.fetchone()
finally:
    sample_cur.close()

inserted_id = random_element[0]
# Object names are strings; coerce the id so the substring test cannot raise TypeError.
inserted_id_text = str(inserted_id)
print(f'Source element: {inserted_id}')

# Insert into source Postgres.
# `with src_conn` commits when the block succeeds and rolls back when it raises, so a
# failed INSERT (e.g. the same row picked twice) does not leave the connection stuck
# in an aborted transaction for the following cells.
with src_conn, src_conn.cursor() as cur:
    cur.execute('INSERT INTO meme_coins VALUES (%s, %s, %s, %s, %s, %s, %s, %s)', random_element)
print(f'Action: INSERT id={inserted_id}')

# Wait a few seconds and check destinations
print('Waiting 5 seconds for replication...')
time.sleep(5)

# postgres
# `with dest_conn` ends the transaction that psycopg2 opens for the SELECT, so the
# session is not left idle-in-transaction after the check.
with dest_conn, dest_conn.cursor() as cur:
    cur.execute('SELECT * FROM meme_coins WHERE id = %s', (inserted_id,))
    dest_row = cur.fetchone()

if dest_row:
    print('✓ Postgres: Found row')
else:
    print('✗ Postgres: Row not found')

# minio
# Best-effort check: any failure talking to MinIO is reported instead of aborting the cell.
try:
    minio_objects = list(minio_client.list_objects('meme-parquet', recursive=True))
    matching_objects = [obj for obj in minio_objects if inserted_id_text in obj.object_name]
    if matching_objects:
        print(f'✓ MinIO: Found {len(matching_objects)} parquet file(s)')
        for obj in matching_objects:
            print(f'  - {obj.object_name}')
    else:
        print('✗ MinIO: No parquet files found')
except Exception as e:
    print(f'✗ MinIO: Error - {e}')

=== INSERT TEST ===
Source element: 96dbdc37-88fe-4dec-a3c2-ccd8e3c21718
Action: INSERT id=96dbdc37-88fe-4dec-a3c2-ccd8e3c21718
Waiting 5 seconds for replication...
✓ Postgres: Found row
✓ MinIO: Found 1 parquet file(s)
  - id=96dbdc37-88fe-4dec-a3c2-ccd8e3c21718_20251209T014425_106166acf6ad4d029e610c5f91dd7001.parquet
✓ Postgres: Found row
✓ MinIO: Found 1 parquet file(s)
  - id=96dbdc37-88fe-4dec-a3c2-ccd8e3c21718_20251209T014425_106166acf6ad4d029e610c5f91dd7001.parquet


In [20]:
# Update a random element in source Postgres and verify it replicates to destinations
print('=== UPDATE TEST ===')

new_price = '99999999999999999999999999999999999999999999999999999999999999999.99'

# Choose the row to update. `with src_conn` ends the transaction on exit, so the
# connection is not left idle-in-transaction when the table turns out to be empty.
with src_conn, src_conn.cursor() as cur:
    cur.execute("SELECT id FROM meme_coins ORDER BY RANDOM() LIMIT 1")
    row = cur.fetchone()

if not row:
    # Nothing to update, so there is nothing to wait for or verify.
    print('✗ No rows found in source to update')
else:
    rid = row[0]
    # Object names are strings; coerce the id so the substring test cannot raise TypeError.
    rid_text = str(rid)

    # Remember which parquet objects already mention this id. Files written by earlier
    # changes to the same row would otherwise make the MinIO check pass even if this
    # update never replicated.
    try:
        names_before = {
            obj.object_name
            for obj in minio_client.list_objects('meme-parquet', recursive=True)
            if rid_text in obj.object_name
        }
    except Exception:
        # Baseline unavailable: fall back to the presence-only check below; a persistent
        # MinIO problem is still reported by the post-update listing.
        names_before = None

    # Commit on success, roll back on failure (psycopg2 connection context manager).
    with src_conn, src_conn.cursor() as cur:
        cur.execute('UPDATE meme_coins SET price = %s WHERE id = %s', (new_price, rid))
    print(f'Source element: {rid}')
    print('Action: UPDATE price')

    # Wait a few seconds and check destinations
    print('Waiting 5 seconds for replication...')
    time.sleep(5)

    # postgres
    with dest_conn, dest_conn.cursor() as cur:
        cur.execute('SELECT id, price FROM meme_coins WHERE id = %s', (rid,))
        dest_row = cur.fetchone()

    # NOTE(review): this compares against a str, which presumes the destination returns
    # price as text — confirm; a NUMERIC column would come back as Decimal and never match.
    if dest_row and dest_row[1] == new_price:
        print('✓ Postgres: Price updated')
    else:
        print('✗ Postgres: Price not updated')

    # minio
    # Best-effort check: any failure talking to MinIO is reported instead of aborting the cell.
    try:
        minio_objects = list(minio_client.list_objects('meme-parquet', recursive=True))
        matching_objects = [obj for obj in minio_objects if rid_text in obj.object_name]
        new_objects = [
            obj for obj in matching_objects
            if names_before is None or obj.object_name not in names_before
        ]
        if not matching_objects:
            print('✗ MinIO: No parquet files found')
        else:
            if new_objects:
                print(f'✓ MinIO: Found {len(matching_objects)} parquet file(s)')
            else:
                print(f'✗ MinIO: Found {len(matching_objects)} parquet file(s), but none written after the update')
            for obj in matching_objects:
                print(f'  - {obj.object_name}')
    except Exception as e:
        print(f'✗ MinIO: Error - {e}')


=== UPDATE TEST ===
Source element: 48688918-016c-47bf-9461-73c146d6bfa4
Action: UPDATE price
Waiting 5 seconds for replication...
✓ Postgres: Price updated
✓ MinIO: Found 2 parquet file(s)
  - id=48688918-016c-47bf-9461-73c146d6bfa4_20251209T013542_d51535ac92704451a67bd9c0af436d5a.parquet
  - id=48688918-016c-47bf-9461-73c146d6bfa4_20251209T014430_90e7c1d3900d4148934554366de9c168.parquet
✓ Postgres: Price updated
✓ MinIO: Found 2 parquet file(s)
  - id=48688918-016c-47bf-9461-73c146d6bfa4_20251209T013542_d51535ac92704451a67bd9c0af436d5a.parquet
  - id=48688918-016c-47bf-9461-73c146d6bfa4_20251209T014430_90e7c1d3900d4148934554366de9c168.parquet


In [21]:
# Delete a random element from source Postgres and verify deletion on destinations
print('=== DELETE TEST ===')

# Select and delete inside one transaction. `with src_conn` commits when the block
# succeeds and rolls back when it raises, and it also ends the transaction when no
# row was found (otherwise the SELECT alone would leave the session idle-in-transaction).
with src_conn, src_conn.cursor() as cur:
    cur.execute("SELECT id FROM meme_coins ORDER BY RANDOM() LIMIT 1")
    row = cur.fetchone()
    if row:
        rid = row[0]
        cur.execute('DELETE FROM meme_coins WHERE id = %s', (rid,))

if not row:
    # Nothing was deleted, so there is nothing to wait for or verify.
    print('✗ No rows found in source to delete')
else:
    # Object names are strings; coerce the id so the substring test cannot raise TypeError.
    rid_text = str(rid)
    print(f'Source element: {rid}')
    print('Action: DELETE')

    # Wait a few seconds and check destinations
    print('Waiting 5 seconds for replication...')
    time.sleep(5)

    # postgres
    # `with dest_conn` closes the transaction psycopg2 opens for the SELECT.
    with dest_conn, dest_conn.cursor() as cur:
        cur.execute('SELECT * FROM meme_coins WHERE id = %s', (rid,))
        dest_row = cur.fetchone()

    if dest_row is None:
        print('✓ Postgres: Row deleted')
    else:
        print('✗ Postgres: Row still exists')

    # minio
    # Best-effort check: any failure talking to MinIO is reported instead of aborting the cell.
    try:
        minio_objects = list(minio_client.list_objects('meme-parquet', recursive=True))
        matching_objects = [obj for obj in minio_objects if rid_text in obj.object_name]
        if matching_objects:
            # Remaining files are reported as informational, not as a failure.
            print(f'ℹ MinIO: Found {len(matching_objects)} parquet file(s) (delete marker may be present)')
            for obj in matching_objects:
                print(f'  - {obj.object_name}')
        else:
            print('✓ MinIO: No parquet files found (delete processed)')
    except Exception as e:
        print(f'✗ MinIO: Error - {e}')


=== DELETE TEST ===
Source element: 48688918-016c-47bf-9461-73c146d6bfa4
Action: DELETE
Waiting 5 seconds for replication...
✓ Postgres: Row deleted
✓ MinIO: No parquet files found (delete processed)
✓ Postgres: Row deleted
✓ MinIO: No parquet files found (delete processed)
