In [1]:
import evidently
evidently.__version__

'0.7.0'

In [3]:
import os, time, json
import pandas as pd, numpy as np
from pathlib import Path
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, PlainTextResponse
from fastapi.staticfiles import StaticFiles

from evidently import Report, Regression, Dataset, DataDefinition
from evidently.tests import lte, gte, lt, gt, is_in, not_in, eq, not_eq
from evidently.presets import DataDriftPreset, RegressionPreset
from evidently.metrics import ValueDrift, DriftedColumnsCount

# # Get the API token from environment
# EVIDENTLY_API_TOKEN = os.getenv("EVIDENTLY_AI")
# print(f"EVIDENTLY_API_TOKEN: {EVIDENTLY_API_TOKEN}")
# ws = CloudWorkspace(token=EVIDENTLY_API_TOKEN, url="https://app.evidently.cloud")

# Paths are relative to the working directory. DATA_DIR, REPORTS_DIR and MODELS_DIR
# can be overridden through the environment (e.g. to match docker compose mounts).
DATA_DIR = os.getenv("DATA_DIR", "data_clean")
REPORTS_DIR = os.getenv("REPORTS_DIR", "reports")

# Gold-layer inputs, all rooted at DATA_DIR/gold.
GOLD = os.path.join(DATA_DIR, "gold")
GOLD_MKT = os.path.join(GOLD, "market", "features", "spx500_features.parquet")
GOLD_NEWS = os.path.join(GOLD, "news", "signals", "spx500_trading_signals.parquet")
GOLD_LABELS = os.path.join(GOLD, "market", "labels", "spx500_labels_30min.parquet")  # columns read here: time, target_regression

# Batch predictions joined to the labels on "time".
PREDS = os.path.join(DATA_DIR, "predictions", "spx500_batch_scores.parquet")  # columns read here: time, predicted_regression

# Directory holding current_features.json (the list of features the model uses).
MODELS = os.getenv("MODELS_DIR", os.path.join("models", "production"))

# Output locations of the two generated HTML reports.
REPORT_FEATURES_HTML = os.path.join(REPORTS_DIR, "features_latest_report.html")
REPORT_DRIFT_HTML = os.path.join(REPORTS_DIR, "features_drift_report.html")

# Inclusive bounds of the reference (training) window; rows with time > END_TRAIN
# are treated as the current / out-of-time data.
START_TRAIN = "2023-10-13"
END_TRAIN = "2025-08-30"

# Normalise the path constants for the current platform. os.path.join already uses
# the native separator; normpath additionally converts any "/" that arrived via the
# DATA_DIR / REPORTS_DIR environment values on Windows and leaves separators alone
# on POSIX, so the same code stays valid inside Linux containers.
(GOLD_MKT, GOLD_NEWS, GOLD_LABELS, PREDS, REPORT_FEATURES_HTML, REPORT_DRIFT_HTML) = (
    os.path.normpath(p)
    for p in (GOLD_MKT, GOLD_NEWS, GOLD_LABELS, PREDS, REPORT_FEATURES_HTML, REPORT_DRIFT_HTML)
)

# Show the Evidently version that is actually installed rather than a hard-coded one.
app = FastAPI(title=f"Evidently {evidently.__version__} Monitor")

# Ensure the reports dir exists before mounting it as static content under /reports
os.makedirs(REPORTS_DIR, exist_ok=True)
app.mount("/reports", StaticFiles(directory=REPORTS_DIR), name="reports")

# Simulate predictions
def simulate_predictions(seed=None):
    """Write a synthetic predictions file derived from the gold labels.

    Reads GOLD_LABELS, adds a ``predicted_regression`` column equal to
    ``target_regression`` plus uniform noise in (-0.05, 0.05], and writes the
    resulting frame (all label columns plus the new one) to PREDS, creating the
    parent directory if needed.

    Args:
        seed: Optional seed for the noise generator. ``None`` (the default)
            gives a different perturbation on every call.
    """
    labels_df = pd.read_parquet(GOLD_LABELS)

    # Perturb the label and store it as the simulated model output.
    label = "target_regression"
    rng = np.random.default_rng(seed)
    noise = 0.1 * (0.5 - rng.random(len(labels_df)))
    labels_df["predicted_regression"] = labels_df[label] + noise

    # Single write, using the path as-is: no separator rewriting, so it works on
    # both Windows and POSIX.
    os.makedirs(os.path.dirname(PREDS) or ".", exist_ok=True)
    labels_df.to_parquet(PREDS, index=False)
    print("Simulated predictions saved.")

print("Current directory:", os.getcwd())
# All data/report/model paths are relative, so the working directory must be the
# project root. Take it from the PROJECT_ROOT environment variable instead of a
# machine-specific absolute path; when unset, stay in the current directory.
PROJECT_ROOT = os.getenv("PROJECT_ROOT", os.getcwd())
os.chdir(PROJECT_ROOT)
print("Changed directory to:", os.getcwd())

# When enabled, (re)generates the synthetic predictions file every time this cell runs.
RUN_SIMULATION = True
if RUN_SIMULATION:
    simulate_predictions()

Current directory: c:\Users\gabjj\Desktop\Education\MITB\CS611\project\fx-ml-pipeline
Changed directory to: c:\Users\gabjj\Desktop\Education\MITB\CS611\project\fx-ml-pipeline
Simulated predictions saved.


In [18]:
# Load market features, labels and predictions, then inner-join them on "time".
df_gold_mkt, df_gold_labels, df_pred = (
    pd.read_parquet(parquet_path) for parquet_path in (GOLD_MKT, GOLD_LABELS, PREDS)
)
df_gold = (
    df_gold_mkt
    .merge(df_gold_labels[["time", "target_regression"]], on=["time"], how="inner", suffixes=("_mkt", "_labels"))
    .merge(df_pred[["time", "predicted_regression"]], on=["time"], how="inner", suffixes=("", "_pred"))
)

# Feature list of the production model.
with open(os.path.join(MODELS, "current_features.json"), "r") as f:
    current_features = json.loads(f.read())

# Candidate features by dtype (float64/int64 -> numerical, object -> categorical),
# never counting the timestamp, target or prediction columns as features.
_non_feature_columns = ("time", "predicted_regression", "target_regression")
numerical_features = [
    name for name, dtype in df_gold.dtypes.items()
    if dtype in (np.float64, np.int64) and name not in _non_feature_columns
]
categorical_features = [
    name for name, dtype in df_gold.dtypes.items()
    if dtype == object and name not in _non_feature_columns
]

# Keep only the candidates the model actually uses.
numerical_features = [name for name in numerical_features if name in current_features["features"]]
categorical_features = [name for name in categorical_features if name in current_features["features"]]

# Column roles for Evidently: regression target/prediction plus typed feature columns.
data_definition = DataDefinition(
    regression=[Regression(target="target_regression", prediction="predicted_regression")],
    numerical_columns=numerical_features,
    categorical_columns=categorical_features,
)

# Reference window: START_TRAIN..END_TRAIN inclusive; out-of-time window: after END_TRAIN.
_selected_columns = numerical_features + categorical_features + ["time", "target_regression", "predicted_regression"]
_train_mask = (df_gold["time"] >= START_TRAIN) & (df_gold["time"] <= END_TRAIN)
_oot_mask = df_gold["time"] > END_TRAIN
features_train_df = df_gold[_train_mask][_selected_columns]
features_oot_df = df_gold[_oot_mask][_selected_columns]

In [None]:
def build_report() -> tuple[str, str]:
    """Build the regression-quality and feature-drift HTML reports.

    Joins gold market features, labels and predictions on ``time``, keeps the
    float64/int64/object columns that are listed under ``"features"`` in
    ``current_features.json``, and splits the rows into a reference window
    (``START_TRAIN <= time <= END_TRAIN``) and a current window
    (``time > END_TRAIN``). Both reports compare current against reference.

    Returns:
        ``(REPORT_FEATURES_HTML, REPORT_DRIFT_HTML)``: the paths of the
        regression report and the drift report that were written.

    Raises:
        FileNotFoundError: if one of the input parquet files is missing.
        ValueError: if the reference or the current window contains no rows.
    """
    # Check the exact files that are read below and name the missing ones.
    missing_inputs = [p for p in (GOLD_MKT, GOLD_LABELS, PREDS) if not os.path.exists(p)]
    if missing_inputs:
        raise FileNotFoundError("Missing input file(s): " + ", ".join(missing_inputs))

    df_gold_mkt = pd.read_parquet(GOLD_MKT)
    df_gold_labels = pd.read_parquet(GOLD_LABELS)
    df_pred = pd.read_parquet(PREDS)
    df_gold = df_gold_mkt.merge(df_gold_labels[['time', 'target_regression']], on=["time"], how="inner", suffixes=("_mkt", "_labels"))
    df_gold = df_gold.merge(df_pred[['time', 'predicted_regression']], on=["time"], how="inner", suffixes=("", "_pred"))

    with open(os.path.join(MODELS, "current_features.json"), "r") as f:
        current_features = json.load(f)['features']

    # Candidate features by dtype, excluding timestamp / target / prediction,
    # then restricted to the features the production model uses.
    non_feature_columns = ["time", "predicted_regression", "target_regression"]
    numerical_features = [d for d in df_gold.columns if df_gold[d].dtype in [np.float64, np.int64] and d not in non_feature_columns]
    categorical_features = [d for d in df_gold.columns if df_gold[d].dtype == object and d not in non_feature_columns]
    numerical_features = [f for f in numerical_features if f in current_features]
    categorical_features = [f for f in categorical_features if f in current_features]

    # Create dataset
    data_definition = DataDefinition(
        regression=[Regression(target="target_regression", prediction="predicted_regression")],
        numerical_columns=numerical_features,
        categorical_columns=categorical_features
    )

    report_columns = numerical_features + categorical_features + ['time', 'target_regression', 'predicted_regression']
    features_train_df = df_gold[(df_gold['time'] >= START_TRAIN) & (df_gold['time'] <= END_TRAIN)][report_columns]
    features_oot_df = df_gold[df_gold['time'] > END_TRAIN][report_columns]

    # Fail with a clear message instead of handing an empty frame to Evidently.
    if features_train_df.empty:
        raise ValueError(f"No rows in reference window {START_TRAIN}..{END_TRAIN}")
    if features_oot_df.empty:
        raise ValueError(f"No rows after {END_TRAIN} to use as current data")

    reference_dataset = Dataset.from_pandas(features_train_df, data_definition=data_definition)
    current_dataset = Dataset.from_pandas(features_oot_df, data_definition=data_definition)

    regression_preset = Report(metrics=[
        RegressionPreset(
            # Optional thresholds, currently disabled:
            # mae_tests=[lt(0.3)],
            # mean_error_tests=[gt(-0.2), lt(0.2)],
            # rmse_tests=[lt(0.3)],
            # r2score_tests=[gt(0.5)],
        )
    ])

    # Compare out-of-time performance against the training window (the reference),
    # not against the current data itself.
    regression_snapshot_with_reference = regression_preset.run(current_data=current_dataset, reference_data=reference_dataset)

    # Drift report: overall drifted-column count plus a PSI value-drift metric
    # for every numerical feature.
    value_drift_columns = [ValueDrift(column=col, method="psi", threshold=0.05) for col in numerical_features]
    drift_report = Report([
        DriftedColumnsCount(
            cat_stattest="psi", num_stattest="wasserstein",
            per_column_method={"target_regression": "psi", "predicted_regression": "psi"}, drift_share=0.8
        )
    ] + value_drift_columns, include_tests=False)

    drift_snapshot = drift_report.run(current_data=current_dataset, reference_data=reference_dataset)

    os.makedirs(REPORTS_DIR, exist_ok=True)
    regression_snapshot_with_reference.save_html(REPORT_FEATURES_HTML)
    drift_snapshot.save_html(REPORT_DRIFT_HTML)

    return REPORT_FEATURES_HTML, REPORT_DRIFT_HTML

@app.get("/ping", response_class=PlainTextResponse)
def ping():
    """Liveness probe: respond with a fixed plain-text line."""
    liveness_message = "evidently up\n"
    return liveness_message

@app.post("/generate")
def generate():
    """Rebuild the reports and return what ``build_report`` produced.

    Any exception raised while building is converted into an HTTP 400 whose
    detail is the exception text.
    """
    try:
        report_paths = build_report()
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    else:
        return {"status": "ok", "report": report_paths}

@app.get("/")
def index():
    """Serve the regression (features) report, generating the reports if needed.

    The reports are rebuilt when either HTML file is missing, so the file
    returned here always exists after a successful build. A failed build is
    reported as HTTP 400 with the error text in the detail.
    """
    reports_present = os.path.exists(REPORT_FEATURES_HTML) and os.path.exists(REPORT_DRIFT_HTML)
    if not reports_present:
        try:
            build_report()
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Generate failed: {e}") from e
    return FileResponse(REPORT_FEATURES_HTML, media_type="text/html")