## Register the Debezium Connector 

In [1]:
import requests
with open('config/debezium-source.json') as f:
    resp = requests.post('http://localhost:8083/connectors', headers={'Content-Type': 'application/json'}, data=f.read())
    print(resp.status_code, resp.text)


409 {"error_code":409,"message":"Connector thmanyah-postgres-connector already exists"}


## Generate Mock Data Into `content` and `engagement_events` Postgres tables 

In [None]:
%pip install -r ./data-generator/requirements.txt

In [37]:
import uuid
import random
import psycopg2
from datetime import datetime, timedelta
from faker import Faker
import json

fake = Faker()

# DB Connection
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="thmanyah_db",
    user="postgres",
    password="postgres"
)
cursor = conn.cursor()

# Constants
CONTENT_TYPES = ['podcast', 'newsletter', 'video']
EVENT_TYPES = ['play', 'pause', 'finish', 'click']
DEVICES = ['ios', 'android', 'web-chrome', 'web-safari']

NUM_CONTENT = 10
NUM_EVENTS = 50

# Step 1: Insert Content
content_ids = []
for _ in range(NUM_CONTENT):
    cid = uuid.uuid4()
    content_ids.append(cid)
    cursor.execute("""
        INSERT INTO content (id, slug, title, content_type, length_seconds, publish_ts)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO NOTHING;
    """, (
        str(cid),
        fake.slug(),
        fake.sentence(nb_words=5),
        random.choice(CONTENT_TYPES),
        random.randint(30, 900),  # 0.5 to 15 minutes
        datetime.now() - timedelta(days=random.randint(0, 30))
    ))

# Step 2: Insert Engagement Events
for _ in range(NUM_EVENTS):
    cid = random.choice(content_ids)
    uid = uuid.uuid4()
    event_type = random.choice(EVENT_TYPES)
    duration_ms = random.randint(1000, 300000) if event_type in ['play', 'finish'] else None
    payload = {
        "ip": fake.ipv4(),
        "user_agent": fake.user_agent()
    }

    cursor.execute("""
        INSERT INTO engagement_events (content_id, user_id, event_type, event_ts, duration_ms, device, raw_payload)
        VALUES (%s, %s, %s, %s, %s, %s, %s);
    """, (
        str(cid),
        str(uid),
        event_type,
        datetime.now(),
        duration_ms,
        random.choice(DEVICES),
        json.dumps(payload)
    ))

conn.commit()
cursor.close()
conn.close()

print(f"✅ Inserted {NUM_CONTENT} content records and {NUM_EVENTS} engagement events.")


✅ Inserted 10 content records and 50 engagement events.


## Flink Job (Depricated)

In [None]:
%pip install -r ./flink-job/requirements.txt

In [None]:
import json
import redis
import requests
from pyflink.common import Configuration
from pyflink.common import SimpleStringSchema, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream.connectors import FlinkKafkaProducer




# Redis Setup
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

# External System URL (placeholder) TODO 
EXTERNAL_URL = "http://external-system/api/ingest"


def enrich_event(event, content_lookup):
    try:
        content = content_lookup.get(event['content_id'])
        if not content:
            return None 

        length_seconds = content.get('length_seconds')
        duration_ms = event.get('duration_ms')

        engagement_seconds = round(duration_ms / 1000.0, 2) if duration_ms else None
        engagement_pct = round(engagement_seconds / length_seconds, 2) if (engagement_seconds and length_seconds) else None

        enriched = {
            **event,
            "content_type": content.get("content_type"),
            "length_seconds": length_seconds,
            "engagement_seconds": engagement_seconds,
            "engagement_pct": engagement_pct
        }
        return enriched
    except Exception as e:
        print(f"Enrichment error: {e}")
        return None


def fanout(enriched_event):
    json_str = json.dumps(enriched_event)

    # 1. Redis
    try:
        key = f"engagement:{enriched_event['content_id']}"
        redis_client.zadd("engagement_scores", {key: enriched_event['engagement_pct'] or 0})
    except Exception as e:
        print(f"Redis error: {e}")

    # 2. External HTTP system
    # try:
    #     requests.post(EXTERNAL_URL, json=enriched_event, timeout=2)
    # except Exception as e:
    #     print(f"External system error: {e}")

    # 3. BigQuery - here we simulate by printing or sending to a file
    print(f"[BigQuery] {json_str}")


def main():
    config = Configuration()
    config.set_string(
        "pipeline.jars",
        "file:///Users/youssefel-mahdy/Downloads/My%20Data/Thmanyah%20Assignment/thmanyah-streaming-assignment/jars/flink-connector-kafka-1.17.1.jar"
    )
    env = StreamExecutionEnvironment.get_execution_environment(configuration=config)
    env.set_parallelism(1)

    # Kafka Consumer Config
    kafka_props = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'flink-consumer',
        'auto.offset.reset': 'earliest'
    }

    kafka_source = FlinkKafkaConsumer(
        topics='thmanyah.public.engagement_events',
        deserialization_schema=SimpleStringSchema(),
        properties=kafka_props
    )

    ds = env.add_source(kafka_source).map(lambda x: json.loads(x), output_type=Types.MAP(Types.STRING(), Types.STRING()))

    # Mock content lookup (in real system, use broadcast state or external store)
    # For now, we’ll simulate content data
    content_lookup = {
        "11111111-1111-1111-1111-111111111111": {
            "content_type": "podcast",
            "length_seconds": 200
        }
        # ... add more as needed
    }

    ds = ds.map(lambda event: enrich_event(event, content_lookup)) \
           .filter(lambda x: x is not None) \
           .map(lambda enriched: fanout(enriched))

    env.execute("Thmanyah Engagement Event Pipeline")


if __name__ == '__main__':
    main()


In [None]:
%%python -m pyflink.datastream.cli \
  --python flink-job/main.py \
  --jarfile "/Users/youssefel-mahdy/Downloads/My Data/Thmanyah Assignment/thmanyah-streaming-assignment/jars/flink-connector-kafka-1.17.1.jar"/Users/youssefel-mahdy/Downloads/My Data/Thmanyah Assignment/thmanyah-streaming-assignment/jars/flink-connector-kafka-1.17.1.jar"


In [None]:
%pip install faust redis requests

## Test BigQuery Connection 

In [None]:
from google.cloud import bigquery
bq_client = bigquery.Client.from_service_account_json("path.json")
# export GOOGLE_APPLICATION_CREDENTIALS="path.json"
# bq_client = bigquery.Client()

In [27]:
bq_client.list_datasets()

<google.api_core.page_iterator.HTTPIterator at 0x1156d4550>

In [28]:
for dataset in bq_client.list_datasets():
    print(dataset)

<google.cloud.bigquery.dataset.DatasetListItem object at 0x11244f250>


In [None]:
project_id = "xxx"
dataset_id = "xx"
table_name = "events_data"
table_id = f"{project_id}.{dataset_id}.{table_name}"

# Initialize BigQuery client
# bq_client = bigquery.Client()
bq_client.get_table(table_id)

Table(TableReference(DatasetReference('pioneering-rex-350017', 'thmanyah_streaming_data'), 'events_data'))

## Test Backfill Logic

In [39]:
import json
import psycopg2
from kafka import KafkaProducer

# Postgres connection config
pg_conn = psycopg2.connect(
    host="localhost",         # or 'postgres' if running inside Docker
    port="5432",
    database="thmanyah_db",
    user="postgres",
    password="postgres"
)

# Kafka config
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

KAFKA_TOPIC = 'thmanyah.public.engagement_events'

# Date filter (inclusive): change as needed
START_DATE = '2025-01-01'
END_DATE = '2025-09-01'


def fetch_all_events(start_date, end_date):
    with pg_conn.cursor() as cursor:
        query = """
            SELECT * FROM public.engagement_events
            WHERE event_ts >= %s AND event_ts <= %s
            ORDER BY event_ts;
        """
        cursor.execute(query, (start_date, end_date))
        columns = [desc[0] for desc in cursor.description]
        for row in cursor.fetchall():
            yield dict(zip(columns, row))


def main():
    print(f"[Backfill] Starting replay from {START_DATE} to {END_DATE}")

    for event in fetch_all_events(START_DATE, END_DATE):
        # Convert datetime fields to strings
        for key, value in event.items():
            if isinstance(value, datetime):
                event[key] = value.isoformat()
        # print(event)
        # You can optionally transform here (e.g., add raw_payload: {})
        event['raw_payload'] = {}  # Optional, to match schema
        producer.send(KAFKA_TOPIC, event)
        print(f"[Backfill] Sent event: {event['content_id']}")

    producer.flush()
    print("[Backfill] All events sent.")


if __name__ == '__main__':
    main()


[Backfill] Starting replay from 2025-01-01 to 2025-09-01
[Backfill] Sent event: 802b2de1-0ca7-4947-b6b6-bd6f5a879435
[Backfill] Sent event: 962d38fe-3b19-4a0a-b7b3-5c576882e38b
[Backfill] Sent event: 34a7e59e-c143-49b0-ab95-1b23ac4bb37d
[Backfill] Sent event: 35068e14-df20-4021-a1ff-60f2493436c8
[Backfill] Sent event: bcd45e34-d0dd-4241-a6c5-4b24a6ea4ed8
[Backfill] Sent event: f5c8df49-e4e8-4cb8-a7b7-7eae01470788
[Backfill] Sent event: 3a5b264c-5fae-41cd-8979-88b1b81b2fa2
[Backfill] Sent event: 802b2de1-0ca7-4947-b6b6-bd6f5a879435
[Backfill] Sent event: df88189f-021e-4feb-9e5d-e3062970b8c7
[Backfill] Sent event: f5c8df49-e4e8-4cb8-a7b7-7eae01470788
[Backfill] Sent event: bcd45e34-d0dd-4241-a6c5-4b24a6ea4ed8
[Backfill] Sent event: bcd45e34-d0dd-4241-a6c5-4b24a6ea4ed8
[Backfill] Sent event: 34a7e59e-c143-49b0-ab95-1b23ac4bb37d
[Backfill] Sent event: 34a7e59e-c143-49b0-ab95-1b23ac4bb37d
[Backfill] Sent event: df88189f-021e-4feb-9e5d-e3062970b8c7
[Backfill] Sent event: 802b2de1-0ca7-4947-b

## Test Environment Variables Accessability

In [40]:
pip install python-dotenv


Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Downloading python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv
[0mSuccessfully installed python-dotenv-1.1.1

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [41]:
from dotenv import load_dotenv
load_dotenv()

True

In [42]:
import os

# Kafka
KAFKA_BROKER_URL = os.getenv("KAFKA_BROKER_URL",None)
KAFKA_BROKER_URL

'kafka://localhost:9092'

In [44]:
REDIS_AGGREGATOR_TIME_WINDOW=600 # 10 minutes 
print(int(REDIS_AGGREGATOR_TIME_WINDOW/60))

10
