In [None]:
# realtime_pipeline_example.ipynb
# Esempio prototipale di pipeline in streaming con Kafka e modello ML pre-addestrato

!pip install kafka-python joblib

import json
import time
import joblib
import numpy as np
from kafka import KafkaConsumer

# Carica un modello pre-addestrato (ad es. un IsolationForest salvato)
model = joblib.load('isolation_forest_model.pkl')

# Consumer Kafka
consumer = KafkaConsumer(
    'cyber_logs_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group')

print("In ascolto sul topic 'cyber_logs_topic'...")

for message in consumer:
    # message.value è in formato bytes, convertiamo in json
    log_entry = json.loads(message.value.decode('utf-8'))
    # Esempio: log_entry = {"feature1": 0.5, "feature2": 102, ...}

    # Creiamo un vettore di feature
    feature_vector = np.array([[
        log_entry["feature1"],
        log_entry["feature2"],
        # ...
    ]])

    # Prediciamo
    pred = model.predict(feature_vector)  # -1 = anomalia, 1 = normale

    if pred[0] == -1:
        print("ALERT: Anomalia rilevata -", log_entry)
        # eventuale invio email / Slack ...
    else:
        print("OK: traffico regolare -", log_entry)

    time.sleep(0.1)  # per evitare spam in questa demo
