# MLOps Sentiment Analysis - Sistema di Monitoraggio Completo

Questo notebook implementa il sistema completo di monitoraggio e retraining per il modello di sentiment analysis. Include:
- ✅ Download del dataset
- ✅ Valutazione del modello su test set (Accuracy, F1-Score)
- ✅ Logging delle predizioni con timestamp
- ✅ Tracking delle metriche (distribuzione sentiment, confidence media)
- ✅ Rilevazione del drift confrontando con baseline
- ✅ Trigger di retraining basato su performance
- ✅ Visualizzazioni dei risultati
- ✅ Test unitari della funzione principale

In [1]:
# 1. Import required libraries
import sys
import os
import numpy as np
import pandas as pd
import json
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
from datetime import datetime, timedelta
from collections import Counter
import warnings
# NOTE(review): this silences every warning category for the whole kernel
# session, including deprecation notices from the ML stack — consider
# narrowing the filter.
warnings.filterwarnings('ignore')

# Visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Scikit-learn metrics
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import classification_report, confusion_matrix

# Datasets
from datasets import load_dataset

# Local project modules.
# The project root can be overridden with the MLOPS_PROJECT_ROOT environment
# variable so the notebook also runs outside the default workspace; when the
# variable is unset the original location is used.
PROJECT_ROOT = Path(os.environ.get("MLOPS_PROJECT_ROOT", "/workspaces/mlops-ex"))
_project_root_entry = str(PROJECT_ROOT)
# Keep the root first on sys.path, but do not stack a duplicate entry every
# time this cell is re-executed.
if sys.path[:1] != [_project_root_entry]:
    sys.path.insert(0, _project_root_entry)
from src.sentiment_model import analyze_sentiment
from src.monitoring import PredictionLogger, PredictionLog
from src.metrics import MetricsTracker, SentimentMetrics
from src.drift_detection import DriftDetector, DriftReport
from src.retraining import RetrainingManager, RetrainingTrigger

# Plot styling shared by every figure in the notebook
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 6)

print("✅ Tutte le librerie importate correttamente")
print(f"Python version: {sys.version}")

Loading weights: 100%|██████████| 201/201 [00:00<00:00, 649.45it/s, Materializing param=roberta.encoder.layer.11.output.dense.weight]              
RobertaForSequenceClassification LOAD REPORT from: cardiffnlp/twitter-roberta-base-sentiment-latest
Key                             | Status     |  | 
--------------------------------+------------+--+-
roberta.pooler.dense.bias       | UNEXPECTED |  | 
roberta.embeddings.position_ids | UNEXPECTED |  | 
roberta.pooler.dense.weight     | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


ImportError: cannot import name 'wasserstein_distance' from 'scipy.spatial.distance' (/workspaces/mlops-ex/.venv/lib/python3.12/site-packages/scipy/spatial/distance.py)

In [None]:
# 2. Configure the logging system

# Directory that holds every monitoring artifact written by this notebook.
logs_dir: Path = Path("logs")
logs_dir.mkdir(exist_ok=True)

# The components below are given plain string paths, all derived from
# logs_dir so the directory name is spelled out only once.
_log_dir = str(logs_dir)
_baseline_path = (logs_dir / "baseline_distribution.json").as_posix()

# Instantiate the monitoring components
logger: PredictionLogger = PredictionLogger(log_dir=_log_dir)
metrics_tracker: MetricsTracker = MetricsTracker(metrics_dir=_log_dir)
drift_detector: DriftDetector = DriftDetector(
    baseline_file=_baseline_path,
    drift_threshold=0.15,
    metrics_dir=_log_dir,
)
retraining_manager: RetrainingManager = RetrainingManager(
    min_samples_for_retraining=50,
    confidence_threshold=0.70,
    drift_detector=drift_detector,
    metrics_dir=_log_dir,
)

# Start from a clean slate (optional): reset each store, in the same order
# in which the components were created.
for _reset in (
    logger.clear_logs,
    metrics_tracker.clear_metrics,
    drift_detector.clear_drift_reports,
    retraining_manager.clear_triggers,
):
    _reset()

print("✅ Sistema di logging configurato")
print(f"Directory dei log: {logs_dir.absolute()}")

In [None]:
# 3. Download and load the dataset
dataset = load_dataset("tweet_eval", "sentiment")

# Integer class id -> sentiment name used throughout the notebook
label_mapping: Dict[int, str] = {0: "Negativo", 1: "Neutro", 2: "Positivo"}

# Summary banner followed by the size of each split
_rule = "=" * 60
print(_rule, "DATASET INFORMATION", _rule, sep="\n")
print(f"Train samples: {len(dataset['train'])}")
print(f"Test samples: {len(dataset['test'])}")
print(f"Label mapping: {label_mapping}")

In [None]:
# 4. Prediction Logger usage example
# Run inference on a random sample of the test split and log every prediction.

# Never request more rows than the split contains: choice(..., replace=False)
# raises ValueError when the sample is larger than the population.
sample_size: int = min(100, len(dataset['test']))

# A fixed seed makes "Restart Kernel -> Run All" log the same tweets each time.
indices = np.random.default_rng(seed=42).choice(
    len(dataset['test']), size=sample_size, replace=False
)

for i in indices:
    # int(i): the sampled index is a numpy integer; cast it to a plain int
    # before indexing the dataset.
    text = dataset['test'][int(i)]['text']

    # analyze_sentiment's result is handed to the logger unchanged.
    scores = analyze_sentiment(text)
    logger.log_prediction(text=text, sentiment_scores=scores)

print(f"✅ Loggate {logger.get_logs_count()} predizioni di esempio")

In [None]:
# 5. Compute aggregate metrics

# Read back everything logged so far, aggregate it and persist the result.
logs = logger.load_logs()
metrics = metrics_tracker.calculate_metrics(logs)
metrics_tracker.save_metrics(metrics)

print("Metriche aggregate:")
print(json.dumps(metrics.to_dict(), indent=2))

# Bar chart of the sentiment distribution: one bar per key of the mapping.
_bar_labels = list(metrics.sentiment_distribution.keys())
_bar_heights = list(metrics.sentiment_distribution.values())
sns.barplot(x=_bar_labels, y=_bar_heights)
plt.title("Distribuzione dei sentiment (campione)")
plt.xlabel("Sentiment")
plt.ylabel("Conteggio")
plt.show()

print(f"Confidenza media: {metrics.average_confidence:.4f}")

In [None]:
# 6. Drift detection

# NOTE(review): no baseline is set anywhere in the visible cells before this
# call — presumably the detector falls back to baseline_file or handles a
# missing baseline itself; confirm against DriftDetector.
logs = logger.load_logs()
report = drift_detector.detect_drift(logs)

# Persist the report before showing it.
drift_detector.save_drift_report(report)

print("Drift report:")
print(json.dumps(report.to_dict(), indent=2))

# One-line verdict for the reader.
_drift_message = (
    "⚠️ Drift rilevato — considerare il retraining"
    if report.drift_detected
    else "No drift rilevato"
)
print(_drift_message)

In [None]:
# 7. Retraining trigger

# Evaluate the logged predictions and persist the resulting decision.
logs = logger.load_logs()
trigger = retraining_manager.evaluate_retraining_need(logs)
retraining_manager.save_trigger(trigger)

print("Retraining trigger:")
print(json.dumps(trigger.to_dict(), indent=2))

# Report the outcome; the recommended action is only shown when triggered.
if not trigger.triggered:
    print("Nessun retraining necessario al momento")
else:
    print("✅ Trigger per retraining attivato: ", trigger.recommended_action)

In [None]:
# 8. Evaluate the model on the test set

from sklearn.metrics import ConfusionMatrixDisplay

# Evaluate on a random subset of the test split. The fixed seed makes the
# reported metrics reproducible across "Restart Kernel -> Run All".
sample_size = min(200, len(dataset['test']))
indices = np.random.default_rng(seed=42).choice(
    len(dataset['test']), size=sample_size, replace=False
)

preds = []   # predicted sentiment name per sample
trues = []   # ground-truth sentiment name per sample
confs = []   # score of the predicted sentiment per sample

for i in indices:
    sample = dataset['test'][int(i)]
    res = analyze_sentiment(sample['text'])

    # The prediction is the key with the highest score.
    pred_label = max(res, key=res.get)

    preds.append(pred_label)
    trues.append(label_mapping[sample['label']])
    confs.append(res[pred_label])

# Map sentiment names back to integer class ids.
# NOTE(review): assumes analyze_sentiment returns keys that match the values
# of label_mapping; any other key raises KeyError below — confirm.
label_to_idx = {v: k for k, v in label_mapping.items()}

y_true = [label_to_idx[t] for t in trues]
y_pred = [label_to_idx[p] for p in preds]

accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)

print("EVALUATION")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 (weighted): {f1:.4f}")
print(f"Precision (weighted): {precision:.4f}")
print(f"Recall (weighted): {recall:.4f}")

# Confusion matrix.
# Pass the full class list explicitly: without `labels`, a class that is
# absent from both y_true and y_pred is dropped from the matrix, which then
# no longer matches the three display labels and the plot fails.
class_ids = sorted(label_mapping)
class_names = [label_mapping[class_id] for class_id in class_ids]
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(cmap='Blues')
plt.title('Confusion Matrix')
plt.show()

# Distribution of the confidence scores of the predicted class
sns.histplot(confs, bins=20)
plt.title('Distribuzione dei confidence score')
plt.xlabel('Confidence')
plt.show()

In [None]:
# 9. Visualize the sentiment distribution over time

# Aggregate the logged predictions into one-hour windows.
metrics_over_time = metrics_tracker.get_metrics_over_time(logger.load_logs(), window_hours=1)

if not metrics_over_time:
    print('Non ci sono abbastanza dati per mostrare metriche nel tempo')
else:
    # Each entry unpacks into (timestamp, metrics); split it into one x-axis
    # series and one y-series per sentiment (0 when a sentiment is absent).
    timestamps = [stamp for stamp, _ in metrics_over_time]
    positives = [window.sentiment_distribution.get('Positivo', 0) for _, window in metrics_over_time]
    neutrals = [window.sentiment_distribution.get('Neutro', 0) for _, window in metrics_over_time]
    negatives = [window.sentiment_distribution.get('Negativo', 0) for _, window in metrics_over_time]

    # Same drawing order as the legend: Positivo, Neutro, Negativo.
    for _series, _series_label in (
        (positives, 'Positivo'),
        (neutrals, 'Neutro'),
        (negatives, 'Negativo'),
    ):
        plt.plot(timestamps, _series, label=_series_label)

    plt.xticks(rotation=45)
    plt.legend()
    plt.title('Evoluzione distribuzione sentiment nel tempo (finestre orarie)')
    plt.show()


In [None]:
# 10. Quick unit tests of the main components (examples)
# Smoke-tests logger, metrics, drift detection and the retraining trigger.

import tempfile


def run_quick_tests() -> None:
    """Run the quick checks against a throw-away directory.

    Everything happens inside a temporary directory and inside this
    function's scope, so the checks neither clear the real prediction logs
    under ``logs/`` nor overwrite notebook variables such as ``logs``,
    ``metrics``, ``report`` and ``trigger``.

    Raises:
        AssertionError: if one of the checks fails.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Test 1: a logged prediction is counted
        pl = PredictionLogger(log_dir=tmp_dir)
        pl.clear_logs()
        res = {'Positivo': 0.9, 'Neutro': 0.09, 'Negativo': 0.01}
        pl.log_prediction('Test logging', res)
        assert pl.get_logs_count() >= 1
        print('Test 1 passed: logging corretto')

        # Test 2: metrics aggregation sees the logged prediction
        mt = MetricsTracker(metrics_dir=tmp_dir)
        metrics = mt.calculate_metrics(pl.load_logs())
        assert metrics.total_predictions >= 1
        print('Test 2 passed: metrics aggregation')

        # Test 3: drift detection on artificial, identical distributions
        fake_logs = [
            PredictionLog(
                timestamp=datetime.now().isoformat(),
                text='t1',
                sentiment='Positivo',
                confidence=0.9,
                scores={'Positivo': 0.9, 'Neutro': 0.08, 'Negativo': 0.02},
            )
            for _ in range(60)
        ]

        det = DriftDetector(
            baseline_file=str(Path(tmp_dir) / 'baseline_test.json'),
            metrics_dir=tmp_dir,
            drift_threshold=0.1,
        )
        det.set_baseline(fake_logs[:30])
        report = det.detect_drift(fake_logs[30:])
        # Baseline and current window are built from identical records, so
        # no drift is expected. If this fires, inspect DriftDetector.
        assert not report.drift_detected, 'identical distributions flagged as drift'
        print('Test 3 passed: drift detection eseguito', report.drift_detected)

        # Test 4: retraining trigger evaluation runs end to end
        # NOTE(review): the outcome is only printed, not asserted — the
        # expected value depends on RetrainingManager semantics that are not
        # visible in this notebook.
        rm = RetrainingManager(
            min_samples_for_retraining=10,
            confidence_threshold=0.95,
            drift_detector=det,
            metrics_dir=tmp_dir,
        )
        trigger = rm.evaluate_retraining_need(fake_logs)
        print('Test 4 passed: retraining evaluated', trigger.triggered)

    print('✅ Tutti i test rapidi sono stati eseguiti')


run_quick_tests()