# A2B Streaming Demo (Refactored)
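This notebook demonstrates the streaming runner and the producer. Before running the cells below, ensure that a Kafka broker is reachable at `localhost:9092` (the address used throughout this notebook) and that the topic `moth_clickstream` exists.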

In [10]:
# Start producer (in another shell) using: bash scripts/run_producer.sh
# Start streaming job (in another shell) using: bash scripts/run_streaming.sh

from moth import kafka_producer, spark_utils, visualisation

# Reminder only: the producer and the streaming job are started from the
# shell scripts under scripts/, not from this cell.
print('Producer and streaming runner available via scripts/.')

# Pull a bounded sample from the topic and preview its first rows.
# NOTE(review): timeout_ms / max_messages presumably cap the wait time and the
# number of messages read — confirm against moth.visualisation.
df = visualisation.consume_to_dataframe(
    'localhost:9092',
    'moth_clickstream',
    timeout_ms=5000,
    max_messages=100,
)
display(df.head())


Producer and streaming runner available via scripts/.


Unnamed: 0,#,session_id,event_name,event_id,traffic_source,event_metadata,customer_id,ts
0,88615,4cecf5d9-ed48-4a0f-89b7-07cf3e32b2b3,HOMEPAGE,4936171a-2695-4409-9e52-04a86de7a63b,MOBILE,,98052,1758417535
1,88616,4cecf5d9-ed48-4a0f-89b7-07cf3e32b2b3,SCROLL,2d90a9eb-73c4-4320-892d-82085ff66110,MOBILE,,98052,1758417535
2,88617,4cecf5d9-ed48-4a0f-89b7-07cf3e32b2b3,SEARCH,60334ea4-c712-4d04-8c14-87f0dc607080,MOBILE,{'search_keywords': 'Sepatu Nike'},98052,1758417535
3,88618,4cecf5d9-ed48-4a0f-89b7-07cf3e32b2b3,HOMEPAGE,b6fff4ab-2268-400f-8422-a3f875a8e78d,MOBILE,,98052,1758417535
4,88619,4cecf5d9-ed48-4a0f-89b7-07cf3e32b2b3,ADD_TO_CART,7faa9d71-80e6-4b74-beb7-336a11bd11a6,MOBILE,"{'product_id': 32222, 'quantity': 1, 'item_pri...",98052,1758417535


In [6]:
from kafka import KafkaConsumer
import json
import threading
import time

class LiveKafkaConsumer:
    """Print JSON messages from a Kafka topic as they arrive.

    The consumer is created in ``__init__`` and closed when ``start`` returns,
    so an instance is single-use: create a new one to consume again.
    """

    def __init__(self, broker, topic, group_id=None, poll_timeout_ms=1000):
        """
        broker: Kafka broker address, e.g. 'localhost:9092'
        topic: name of the topic to subscribe to
        group_id: consumer group; with None there are no committed offsets,
            so (with auto_offset_reset='earliest') every run starts from the
            earliest available message
        poll_timeout_ms: how long a single poll may wait for messages; this
            bounds how quickly ``stop()`` takes effect on an idle topic
        """
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=[broker],
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.poll_timeout_ms = poll_timeout_ms
        self.running = False

    def _poll_records(self):
        """Run one bounded poll and return its records as a flat list."""
        batches = self.consumer.poll(timeout_ms=self.poll_timeout_ms)
        return [record for records in batches.values() for record in records]

    def start(self, max_messages=None):
        """
        Print messages as they arrive (blocking).

        max_messages: None means consume until ``stop()`` is called; otherwise
            stop after that many messages (0 consumes nothing).

        The consumer is always closed on exit, including on error.
        """
        self.running = True
        count = 0
        try:
            # Poll with a timeout instead of iterating the consumer directly:
            # plain iteration blocks until the next message, so the stop flag
            # would never be re-checked while the topic is idle.
            while self.running and (max_messages is None or count < max_messages):
                for msg in self._poll_records():
                    print(msg.value)
                    count += 1
                    limit_reached = max_messages is not None and count >= max_messages
                    if limit_reached or not self.running:
                        break
        finally:
            # Leave the flag consistent with reality once the loop has ended.
            self.running = False
            self.consumer.close()

    def stop(self):
        """Ask the consuming loop to stop; it exits after the current poll."""
        self.running = False

    def start_in_thread(self, max_messages=None):
        """Start consuming without blocking (for notebook use).

        Returns the started thread so the caller can ``join()`` it.
        """
        thread = threading.Thread(target=self.start, args=(max_messages,))
        thread.start()
        return thread


In [11]:
from kafka import KafkaConsumer
import json
import pandas as pd

def realtime_consume(broker, topic, max_messages=None, timeout_ms=None):
    """Print JSON messages published to ``topic`` from now on.

    The consumer uses ``auto_offset_reset='latest'``, so only messages that
    arrive after it has subscribed are shown.

    broker: Kafka broker address, e.g. 'localhost:9092'
    topic: name of the topic to subscribe to
    max_messages: None means no limit; otherwise stop after that many
        messages (0 or less returns immediately without connecting)
    timeout_ms: None means wait indefinitely for the next message; otherwise
        stop once no message has arrived for this many milliseconds

    Returns None. The consumer is always closed on exit.
    """
    # `max_messages=0` is falsy, so a plain truthiness test would treat it as
    # "unlimited"; handle the empty request explicitly instead.
    if max_messages is not None and max_messages <= 0:
        return

    extra_config = {}
    if timeout_ms is not None:
        # Ends message iteration after this long without a new message.
        extra_config['consumer_timeout_ms'] = timeout_ms

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[broker],
        auto_offset_reset='latest',  # only new messages
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        **extra_config
    )
    count = 0
    try:
        for msg in consumer:
            print(msg.value)
            count += 1
            if max_messages is not None and count >= max_messages:
                break
    finally:
        consumer.close()


In [12]:
# Print the next 20 events published to the topic. The consumer reads from
# 'latest' with no timeout, so this cell blocks until 20 new messages arrive
# (the producer must be running).
realtime_consume(
    broker='localhost:9092',
    topic='moth_clickstream',
    max_messages=20,
)


{'#': '32305', 'session_id': '1c1147ac-faf4-4f72-934d-ddd723551c8f', 'event_name': 'HOMEPAGE', 'event_id': 'ab321141-ef83-4f72-ac30-9ece698c0d19', 'traffic_source': 'MOBILE', 'event_metadata': '', 'customer_id': '86351', 'ts': 1758424771}
{'#': '32306', 'session_id': '1c1147ac-faf4-4f72-934d-ddd723551c8f', 'event_name': 'ADD_TO_CART', 'event_id': '04e4ae23-6a24-4835-956e-7158bd799bd2', 'traffic_source': 'MOBILE', 'event_metadata': "{'product_id': 57056, 'quantity': 1, 'item_price': 205696}", 'customer_id': '86351', 'ts': 1758424771}
{'#': '32307', 'session_id': '1c1147ac-faf4-4f72-934d-ddd723551c8f', 'event_name': 'SCROLL', 'event_id': 'd642aa2c-2d77-41db-ac47-0aaeb8c9fa41', 'traffic_source': 'MOBILE', 'event_metadata': '', 'customer_id': '86351', 'ts': 1758424771}
{'#': '32308', 'session_id': '1c1147ac-faf4-4f72-934d-ddd723551c8f', 'event_name': 'CLICK', 'event_id': '1e650bfc-0fe8-4e37-b06d-09355d3b8a0d', 'traffic_source': 'MOBILE', 'event_metadata': '', 'customer_id': '86351', 'ts': 