In [1]:
import os
import json
from datetime import datetime
import numpy as np

POLICY_FILE = "policy_rules.json"

DEFAULT_POLICY = {
    "min_mask_prob": 0.90,
    "allowed_classes": ["with_mask"],
    "hand_hygiene_required": False
}

def load_policy(path: str = POLICY_FILE) -> dict:
    if not os.path.exists(path):
        return DEFAULT_POLICY.copy()
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def save_policy(policy: dict, path: str = POLICY_FILE) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(policy, f, indent=2)
    print(f"[{datetime.now().isoformat()}] Policy saved: {policy}")

def expert_decision(prob_vector: np.ndarray,
                    class_names: list[str],
                    policy: dict | None = None) -> dict:
    if policy is None:
        policy = DEFAULT_POLICY

    prob_vector = np.asarray(prob_vector).astype(float)
    max_idx = int(np.argmax(prob_vector))
    max_class = class_names[max_idx]
    max_prob = float(prob_vector[max_idx])

    allowed_classes = policy.get("allowed_classes", [])
    threshold = float(policy.get("min_mask_prob", 0.9))

    mask_ok = (max_class in allowed_classes) and (max_prob >= threshold)
    decision = "COMPLIANT" if mask_ok else "NON_COMPLIANT"

    explanation = {
        "predicted_class": max_class,
        "predicted_prob": max_prob,
        "threshold": threshold,
        "allowed_classes": allowed_classes,
        "rule_evaluated": (
            "mask_ok = (predicted_class in allowed_classes) "
            "AND (predicted_prob >= threshold)"
        )
    }

    return {
        "decision": decision,
        "explanation": explanation
    }

LOG_FILE = "compliance_log.csv"

def log_event(decision_info: dict,
              person_id: str | int | None = None,
              location: str = "unknown",
              source: str = "camera_1",
              log_path: str = LOG_FILE) -> None:
    header = [
        "timestamp", "source", "location", "person_id",
        "decision", "predicted_class", "predicted_prob",
        "threshold"
    ]
    exists = os.path.exists(log_path)

    with open(log_path, "a", encoding="utf-8") as f:
        if not exists:
            f.write(",".join(header) + "\n")

        e = decision_info["explanation"]
        row = [
            datetime.now().isoformat(),
            source,
            location,
            str(person_id) if person_id is not None else "",
            decision_info["decision"],
            e["predicted_class"],
            f"{e['predicted_prob']:.4f}",
            f"{e['threshold']:.4f}"
        ]
        f.write(",".join(row) + "\n")

PRIVACY_CONFIG = {
    "save_raw_frames": False,
    "blur_faces_for_storage": True,
    "store_aggregated_counts": True
}

def privacy_safe_output(decision_info: dict) -> dict:
    e = decision_info["explanation"]
    return {
        "decision": decision_info["decision"],
        "predicted_class": e["predicted_class"],
        "predicted_prob": e["predicted_prob"],
        "policy_threshold": e["threshold"],
        "rule_description": e["rule_evaluated"]
    }

def explanation_to_text(decision_info: dict) -> str:
    e = decision_info["explanation"]
    if decision_info["decision"] == "COMPLIANT":
        return (
            f"Decision: COMPLIANT. The system predicted '{e['predicted_class']}' "
            f"with probability {e['predicted_prob']:.2f}, which is above the "
            f"policy threshold {e['threshold']:.2f} and allowed by current rules."
        )
    else:
        return (
            f"Decision: NON-COMPLIANT. The system predicted '{e['predicted_class']}' "
            f"with probability {e['predicted_prob']:.2f} which is below the "
            f"policy threshold {e['threshold']:.2f} or not in the allowed classes."
        )

def apply_expert_layer_to_batch(prob_matrix: np.ndarray,
                                class_names: list[str],
                                policy: dict | None = None):
    if policy is None:
        policy = DEFAULT_POLICY
    results = []
    for i in range(prob_matrix.shape[0]):
        info = expert_decision(prob_matrix[i], class_names, policy)
        results.append(info)
    return results


In [2]:
# FAKE DATA just for testing that the RQ code runs
# Replace later with your real model outputs

all_classes = ["without_mask", "with_mask"]       # 2 classes
n_samples = 50

np.random.seed(0)
y_prob_meta = np.random.rand(n_samples, len(all_classes))
y_prob_meta /= y_prob_meta.sum(axis=1, keepdims=True)  # normalize to probabilities
y_meta_test = np.random.randint(0, len(all_classes), size=(n_samples,))


In [3]:
# ---------- RQ1 ----------
policy = load_policy()
print("Current policy:", policy)

new_policy = {
    "min_mask_prob": 0.95,
    "allowed_classes": ["with_mask"],
    "hand_hygiene_required": True
}
save_policy(new_policy)

policy = load_policy()
print("Updated policy:", policy)

# ---------- RQ2 ----------
class_names = all_classes

decision_infos = apply_expert_layer_to_batch(y_prob_meta, class_names, policy)
expert_labels = np.array([1 if d["decision"] == "COMPLIANT" else 0 for d in decision_infos])

y_pred_classes = np.argmax(y_prob_meta, axis=1)
ml_labels = np.array([
    1 if class_names[idx] in policy["allowed_classes"] else 0
    for idx in y_pred_classes
])

expert_compliance_rate = expert_labels.mean()
ml_compliance_rate = ml_labels.mean()

print(f"Expert system compliance rate: {expert_compliance_rate:.3f}")
print(f"Pure ML compliance rate:       {ml_compliance_rate:.3f}")
print("First 10 expert vs ML:", list(zip(expert_labels[:10], ml_labels[:10])))

# ---------- RQ3 ----------
for i, info in enumerate(decision_infos):
    log_event(info, person_id=i, location="test_set", source="offline_meta_test")
    if i < 3:
        print(f"Sample {i}:")
        print(explanation_to_text(info))
        print("Privacy-safe:", privacy_safe_output(info))
        print("-" * 60)

print("Logged events to:", LOG_FILE)

# ---------- RQ4 ----------
print("Privacy configuration:", PRIVACY_CONFIG)
example_payload = privacy_safe_output(decision_infos[0])
print("Privacy-safe payload example:", example_payload)

# ---------- RQ5 ----------
for i in range(3):
    print(f"Sample {i} explanation:")
    print(explanation_to_text(decision_infos[i]))
    print()


Current policy: {'min_mask_prob': 0.95, 'allowed_classes': ['with_mask'], 'hand_hygiene_required': True}
[2025-12-28T22:50:22.209714] Policy saved: {'min_mask_prob': 0.95, 'allowed_classes': ['with_mask'], 'hand_hygiene_required': True}
Updated policy: {'min_mask_prob': 0.95, 'allowed_classes': ['with_mask'], 'hand_hygiene_required': True}
Expert system compliance rate: 0.040
Pure ML compliance rate:       0.460
First 10 expert vs ML: [(np.int64(0), np.int64(1)), (np.int64(0), np.int64(0)), (np.int64(0), np.int64(1)), (np.int64(0), np.int64(1)), (np.int64(0), np.int64(0)), (np.int64(0), np.int64(0)), (np.int64(0), np.int64(1)), (np.int64(0), np.int64(1)), (np.int64(1), np.int64(1)), (np.int64(0), np.int64(1))]
Sample 0:
Decision: NON-COMPLIANT. The system predicted 'with_mask' with probability 0.57 which is below the policy threshold 0.95 or not in the allowed classes.
Privacy-safe: {'decision': 'NON_COMPLIANT', 'predicted_class': 'with_mask', 'predicted_prob': 0.5658130872778954, 'pol