# 03 — Monitor + Govern (Access Signals + Retention Plan)

Simulates monitoring signals (exports, outliers) and applies a retention policy to create a governance plan.

In [16]:
import os, json, math, random, re, time
from datetime import datetime, timedelta, timezone
import pandas as pd

# Treat the parent of the current working directory as the project root and
# keep every generated artifact under <root>/outputs.
BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
OUT_DIR = os.path.join(BASE_DIR, "outputs")

# Create the output tree up front; exist_ok=True makes re-running this cell safe.
for _required_dir in (OUT_DIR, os.path.join(OUT_DIR, "audit_logs")):
    os.makedirs(_required_dir, exist_ok=True)

print(f"BASE_DIR: {BASE_DIR}")
print(f"OUT_DIR: {OUT_DIR}")

BASE_DIR: /
OUT_DIR: /outputs


In [17]:

# Read the classified telemetry snapshot from the outputs directory.
# NOTE(review): this file is presumably produced by an earlier notebook in the
# series — confirm it exists before running this cell on a fresh checkout.
classified_path = os.path.join(OUT_DIR, "classified_telemetry.parquet")
df = pd.read_parquet(classified_path)

# Preview the first five rows (bare last expression -> rich display).
df.head(5)


Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,tags,risk_score
0,-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936,66900.0,[STANDARD],1
1,-114.47,34.4,19.0,7650.0,1901.0,1129.0,463.0,1.82,80100.0,[STANDARD],1
2,-114.56,33.69,17.0,720.0,174.0,333.0,117.0,1.6509,85700.0,[STANDARD],1
3,-114.57,33.64,14.0,1501.0,337.0,515.0,226.0,3.1917,73400.0,[STANDARD],1
4,-114.57,33.57,20.0,1454.0,326.0,624.0,262.0,1.925,65500.0,[STANDARD],1


In [18]:
import pandas as pd
import numpy as np

# Load the housing sample CSV and rebind `df` to it.
# NOTE(review): the /content/sample_data location looks like the Google Colab
# default — adjust the path when running in another environment.
housing_csv_path = '/content/sample_data/california_housing_train.csv'
df = pd.read_csv(housing_csv_path)

# Recreate placeholder 'tags' and 'risk_score' columns.
# The previous context mentioned classification on 'median_income', so a simple
# income-tier rule is used here purely for demonstration.
# Note: this placeholder logic never emits a 'PII' tag; it only produces
# 'STANDARD', 'HIGH_VALUE' and 'VERY_HIGH_VALUE'.
def get_tags_and_risk(median_income):
    """Classify one ``median_income`` value into demo tags and a risk score.

    The tiers are mutually exclusive:

    * ``median_income > 7``  -> ``["HIGH_VALUE", "VERY_HIGH_VALUE"]``, risk 5
    * ``median_income > 5``  -> ``["HIGH_VALUE"]``, risk 3
    * anything else (including NaN, which fails both comparisons)
      -> ``["STANDARD"]``, risk 1

    Args:
        median_income: Numeric income value for a single row.

    Returns:
        dict with keys ``"tags"`` (list of str) and ``"risk_score"`` (int).
    """
    tags = []
    risk_score = 0
    if median_income > 5:
        tags.append("HIGH_VALUE")
        risk_score += 3
        # The top tier stacks on HIGH_VALUE rather than replacing it.
        if median_income > 7:
            tags.append("VERY_HIGH_VALUE")
            risk_score += 2
    else:
        # Only rows that are not high-value are STANDARD. Previously this
        # branch hung off the `> 7` test, so incomes in (5, 7] were tagged
        # both HIGH_VALUE and STANDARD.
        tags.append("STANDARD")
        risk_score += 1
    return {"tags": tags, "risk_score": risk_score}

# Classify every row once, then split the per-row result dicts into one
# DataFrame column per key.
classification_results = df['median_income'].apply(get_tags_and_risk)
for _column in ('tags', 'risk_score'):
    df[_column] = classification_results.map(
        lambda result, key=_column: result[key]
    )

# Fall back to the default output location when OUT_DIR has not been defined
# yet (e.g. this cell is run without the setup cell).
import os
if 'OUT_DIR' not in locals():
    BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
    OUT_DIR = os.path.join(BASE_DIR, "outputs")
    os.makedirs(OUT_DIR, exist_ok=True)

# Persist the recreated classification (overwrites any existing file at this
# path) and show the columns that drive it.
classified_path = os.path.join(OUT_DIR, "classified_telemetry.parquet")
df.to_parquet(classified_path, index=False)
print(f"Recreated and Wrote: {classified_path}")
display(df.loc[:, ['median_income', 'tags', 'risk_score']].head())

Recreated and Wrote: /outputs/classified_telemetry.parquet


Unnamed: 0,median_income,tags,risk_score
0,1.4936,[STANDARD],1
1,1.82,[STANDARD],1
2,1.6509,[STANDARD],1
3,3.1917,[STANDARD],1
4,1.925,[STANDARD],1


In [19]:

# MONITOR: Simulate access signals + drift indicators
# (Public-safe: no real identity provider; purely synthetic signals)
random.seed(7)  # fixed seed so the synthetic columns are reproducible
row_count = len(df)
actor_pool = ["svc-ingest", "svc-analytics", "user-ops", "user-dev"]
access_kinds = ["read", "write", "export"]

# The three columns are generated one after another; the order of the random
# draws matters for reproducibility, so do not interleave them.
df["access_actor"] = [random.choice(actor_pool) for _ in range(row_count)]
df["access_type"] = [random.choice(access_kinds) for _ in range(row_count)]

# Synthetic 'sensor_temp_c' readings, uniform in [10, 100], for demonstration.
df["sensor_temp_c"] = [random.uniform(10.0, 100.0) for _ in range(row_count)]

# Simple drift signal: flag readings above 85 as outliers (1) else 0.
df["temp_outlier"] = df["sensor_temp_c"].map(lambda temp_c: int(temp_c > 85))

# Aggregate the signals into plain ints so they serialise cleanly to JSON.
is_export = df["access_type"] == "export"
has_pii_tag = df["tags"].apply(lambda row_tags: "PII" in row_tags)
monitoring_summary = {
    "export_events": int(is_export.sum()),
    "pii_exports": int((is_export & has_pii_tag).sum()),
    "temp_outliers": int(df["temp_outlier"].sum()),
}

monitor_path = os.path.join(OUT_DIR, "monitoring_summary.json")
with open(monitor_path, "w") as f:
    f.write(json.dumps(monitoring_summary, indent=2))
print(f"Wrote: {monitor_path}")
monitoring_summary


Wrote: /outputs/monitoring_summary.json


{'export_events': 5739, 'pii_exports': 0, 'temp_outliers': 2881}

In [20]:

# GOVERN: Load retention policy and compute retention class per record
import yaml, os

# Make sure <BASE_DIR>/policies exists before touching the policy file.
policies_dir = os.path.join(BASE_DIR, "policies")
os.makedirs(policies_dir, exist_ok=True)

policy_file = os.path.join(policies_dir, "retention_policy.yaml")

# Seed a default retention policy only when no file is present, so an
# existing policy on disk is never overwritten.
if not os.path.exists(policy_file):
    sensitive_rule = {'tags': ['PII', 'HIGH_SENSITIVITY'], 'days': 7}
    standard_rule = {'tags': ['STANDARD'], 'days': 30}
    default_policy_content = {'retention_rules': [sensitive_rule, standard_rule]}
    with open(policy_file, "w") as f:
        yaml.dump(default_policy_content, f, indent=2)
    print(f"Created default retention policy file: {policy_file}")

with open(policy_file, "r") as f:
    policy = yaml.safe_load(f)

# Retention applied when none of the policy rules match a record's tags.
DEFAULT_RETENTION_DAYS = 30


def retention_days_for(tags, rules=None):
    """Return the retention period (in days) for a record with the given tags.

    Args:
        tags: Container of tag strings for one record (anything that supports
            the ``in`` operator).
        rules: Optional list of rule dicts shaped like the entries under
            ``retention_rules`` in the policy file: ``{"tags": [...], "days": N}``.
            When omitted, the built-in rule is used: records tagged ``PII`` or
            ``HIGH_SENSITIVITY`` are kept 7 days.

    Returns:
        The smallest ``days`` value among all rules sharing at least one tag
        with ``tags`` (the strictest matching rule wins), or
        ``DEFAULT_RETENTION_DAYS`` when no rule matches.
    """
    if rules is None:
        # Same behaviour as the original hard-coded check.
        rules = [{"tags": ["PII", "HIGH_SENSITIVITY"], "days": 7}]

    matched_days = []
    for rule in rules:
        rule_days = rule.get("days")
        if rule_days is None:
            continue  # a rule without a period cannot be applied
        if any(tag in tags for tag in (rule.get("tags") or [])):
            matched_days.append(int(rule_days))
    return min(matched_days, default=DEFAULT_RETENTION_DAYS)


# Drive retention from the loaded policy instead of ignoring it. If the file
# has no usable 'retention_rules' entry, fall back to the built-in rule.
retention_rules = policy.get("retention_rules") if isinstance(policy, dict) else None
df["retention_days"] = df["tags"].apply(
    lambda row_tags: retention_days_for(row_tags, retention_rules)
)

governance_path = os.path.join(OUT_DIR, "governance_plan.parquet")
df.to_parquet(governance_path, index=False)
print("Wrote:", governance_path)
df[["tags","risk_score","retention_days"]].head(10)


Wrote: /outputs/governance_plan.parquet


Unnamed: 0,tags,risk_score,retention_days
0,[STANDARD],1,30
1,[STANDARD],1,30
2,[STANDARD],1,30
3,[STANDARD],1,30
4,[STANDARD],1,30
5,[STANDARD],1,30
6,[STANDARD],1,30
7,[STANDARD],1,30
8,[STANDARD],1,30
9,[STANDARD],1,30
