In [1]:
import json
import numpy as np
from kafka import KafkaConsumer
from datetime import datetime

# Fault-tolerant JSON deserializer: returns None instead of raising on malformed input
def safe_json_deserializer(v):
    """
    Decode a raw message payload as UTF-8 and parse it as JSON.

    :param v: raw payload; expected to expose ``.decode`` (e.g. ``bytes``)
    :return: the parsed JSON value, or None when decoding/parsing fails
             (the failure is reported on stdout rather than raised)
    """
    parsed = None
    try:
        parsed = json.loads(v.decode('utf-8'))
    except Exception as e:
        # Best effort: one bad payload must not stop the consumer loop.
        print(f"[ERROR] Unable to parse data: {v}，Wrong message: {e}")
    return parsed

# Compute VaR and ES
def compute_var_es(returns, confidence_level=0.95):
    """
    Compute Value at Risk (VaR) and Expected Shortfall (ES) from a sample of returns.

    VaR is reported as the ``(1 - confidence_level)`` quantile of the returns,
    i.e. a return threshold (typically negative), not a positive loss figure.
    ES is the mean of all returns at or below that threshold.

    :param returns: one-dimensional sequence or array of asset returns
    :param confidence_level: confidence level as a fraction (0.95 means 95%)
    :return: ``(var_threshold, es)``, or ``(None, None)`` when ``returns`` is empty
    """
    sample = np.asarray(returns, dtype=float)
    if sample.size == 0:
        return None, None  # nothing to estimate from an empty sample

    var_threshold = np.percentile(sample, (1 - confidence_level) * 100)
    # Boolean mask selects the loss tail in one vectorized step instead of a
    # Python-level loop over every element.
    tail = sample[sample <= var_threshold]
    es = np.mean(tail)
    return var_threshold, es

# Kafka consumer setup.
# The group id is suffixed with the current timestamp, so every run joins as a
# brand-new group; together with auto_offset_reset='earliest' and
# enable_auto_commit=False this is meant to replay the topic from the start.
consumer = KafkaConsumer(
    'stock_index_topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id=f'read-all-history-group-{datetime.now().timestamp()}',
    value_deserializer=safe_json_deserializer
)

# Every accepted price in arrival order (kept for any later cell that reads it).
prices = []
# Price history per stock code: returns are only meaningful between consecutive
# prices of the SAME instrument, so each code gets its own series.
prices_by_stock = {}

print("Consumer has started and is listening 'stock_index_topic' data...")

# Receive messages and recompute VaR / ES after each new price.
try:
    for message in consumer:
        # Deserialized JSON payload; None when the deserializer failed.
        data = message.value
        if not data:
            continue

        try:
            date_str = data.get("date")
            time_str = data.get("time")
            last_price = data.get("last_price")
            stock_code = data.get("stock_code")

            # Skip messages with a missing/empty date, time or price.
            if not (date_str and time_str and last_price):
                continue

            # Combine date and time, e.g. "2025-03-26 15:43:58".
            # NOTE(review): the result is a naive datetime; the name suggests
            # Singapore time - confirm against the producer.
            timestamp_str = f"{date_str} {time_str}"
            timestamp_sgt = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")

            # Validate BEFORE storing: a single non-numeric, non-finite or
            # non-positive value kept in the history would break (or make
            # meaningless) every later return computation.
            price = float(last_price)
            if not np.isfinite(price) or price <= 0:
                raise ValueError(f"invalid last_price: {last_price!r}")

            prices.append(price)
            history = prices_by_stock.setdefault(stock_code, [])
            history.append(price)

            if len(history) > 1:
                series = np.asarray(history, dtype=float)
                # Simple returns: (p[t] - p[t-1]) / p[t-1]
                returns = np.diff(series) / series[:-1]

                var, es = compute_var_es(returns, confidence_level=0.95)
                print(f"[VaR & ES] Stock Code: {stock_code} Time: {timestamp_sgt}, Latest VaR(95%): {var:.4f}, ES(95%): {es:.4f}")
        except Exception as e:
            # Best effort: report the bad message and keep consuming.
            print(f"[ERROR] Data parsing failed, Wrong message: {e}")
finally:
    # Also runs when the cell is interrupted (KeyboardInterrupt), so the
    # connection and group membership are released instead of leaking.
    consumer.close()


Consumer has started and is listening 'stock_index_topic' data...


KeyboardInterrupt: 

In [None]:
import json
from kafka import KafkaConsumer
from datetime import datetime

def safe_json_deserializer(v):
    """
    Decode a raw message payload as UTF-8 and parse it as JSON.

    :param v: raw payload; expected to expose ``.decode`` (e.g. ``bytes``)
    :return: the parsed JSON value, or None when decoding/parsing fails
             (the failure is reported on stdout rather than raised)
    """
    parsed = None
    try:
        parsed = json.loads(v.decode('utf-8'))
    except Exception as e:
        # Best effort: one bad payload must not stop the consumer loop.
        print(f"[ERROR] Unable to parse data: {v}，Wrong message: {e}")
    return parsed

# Stand-alone connectivity check: replay the topic and echo every payload.
# The timestamp-suffixed group id makes each run join as a new group, which
# together with auto_offset_reset='earliest' is meant to read from the start.
consumer = KafkaConsumer(
    'stock_index_topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id=f'read-all-history-group-{datetime.now().timestamp()}',
    value_deserializer=safe_json_deserializer
)

try:
    for message in consumer:
        print(f"[DEBUG] receive message: {message.value}")  # make sure Consumer is receiving messages
finally:
    # Also runs when the cell is interrupted (KeyboardInterrupt), so the
    # connection and group membership are released instead of leaking.
    consumer.close()

[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:43:58', 'last_price': 3967.87}
[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:44:01', 'last_price': 3967.6}
[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:44:04', 'last_price': 3967.75}
[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:44:07', 'last_price': 3967.73}
[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:44:10', 'last_price': 3967.76}
[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:44:13', 'last_price': 3967.92}
[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:44:16', 'last_price': 3966.79}
[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:44:19', 'last_price': 3966.76}
[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:44:22', 'last_price': 3967.39}
[DEBUG] 接收到消息: {'stock_code': 'STI', 'date': '2025-03-26', 'time': '15:44:26', 'las

KeyboardInterrupt: 