In [3]:
import csv
import json
from datetime import datetime, timezone
from pathlib import Path

# ----------------------------
# CREATE SAMPLE DATASETS
# ----------------------------
def _write_rows_csv(path, rows):
    """Write a non-empty list of same-keyed dicts to ``path`` as UTF-8 CSV with a header row."""
    # newline="" is required by the csv module so it controls line endings itself.
    with open(path, "w", newline="", encoding="utf-8") as f:
        # Column order follows the key order of the first row.
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)


def create_demo_datasets(out_dir="."):
    """Write two small sample CSVs that exhibit schema drift.

    ``dataset_v1.csv`` has a numeric ``revenue`` column and an ``industry_group``
    column; ``dataset_v2.csv`` holds categorical text in ``revenue`` and replaces
    ``industry_group`` with ``region_code``.

    Args:
        out_dir: Existing directory to write both CSVs into. Defaults to the
            current working directory, which is where the rest of the notebook
            reads them from.
    """
    v1 = [
        {"id": "1", "customer": "Alice", "revenue": "1200.50", "industry_group": "Retail"},
        {"id": "2", "customer": "Bob", "revenue": "800.25", "industry_group": "Finance"},
    ]

    v2 = [
        {"id": "1", "customer": "Alice", "revenue": "high", "region_code": "IN"},
        {"id": "2", "customer": "Bob", "revenue": "low", "region_code": "US"},
    ]

    out = Path(out_dir)
    _write_rows_csv(out / "dataset_v1.csv", v1)
    _write_rows_csv(out / "dataset_v2.csv", v2)


In [4]:
# ----------------------------
# SCHEMA INFERENCE
# ----------------------------
def infer_type(value: str) -> str:
    """Classify one raw CSV cell as ``null``, ``bool``, ``int``, ``float`` or ``string``.

    Checks run from most to least specific, so ``"1"`` is an int rather than a
    float, and ``"true"`` is a bool rather than a string.

    Args:
        value: Raw cell text. ``None`` is accepted because ``csv.DictReader``
            fills missing trailing fields of short rows with ``None``.

    Returns:
        One of ``"null"``, ``"bool"``, ``"int"``, ``"float"``, ``"string"``.
        ``None``, empty and whitespace-only cells are ``"null"``.
    """
    # Test for None before calling any str method on the value.
    if value is None:
        return "null"
    v = value.strip()
    # Whitespace-only cells carry no data; without this they fell through to "string".
    if not v:
        return "null"
    if v.lower() in ("true", "false"):
        return "bool"
    try:
        int(v)
        return "int"
    except ValueError:
        pass
    try:
        # Note: float() also accepts "nan"/"inf"/"1e3", which are therefore "float".
        float(v)
        return "float"
    except ValueError:
        pass
    return "string"


def infer_schema(csv_path: str, sample_rows: int = 1000):
    """Infer a ``{column: type}`` mapping from the first ``sample_rows`` data rows of a CSV.

    Every column named in the header appears in the result. A column whose
    sampled cells are all empty is typed ``"null"`` rather than omitted, so an
    empty-but-present column is not mistaken for a removed or added one when
    two schemas are compared.

    Per-cell types come from ``infer_type`` and are merged per column:
    only ints -> ``"int"``; ints and/or floats -> ``"float"``; only bools ->
    ``"bool"``; no non-null cells -> ``"null"``; any other mix -> ``"string"``.

    Args:
        csv_path: Path to a UTF-8 CSV file with a header row.
        sample_rows: Maximum number of data rows (after the header) to inspect.

    Returns:
        dict mapping column name to its inferred type, in header order.
    """
    with open(csv_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        # Seed from the header so all-empty columns are still reported.
        # fieldnames is None for a completely empty file.
        seen_types = {col: set() for col in (reader.fieldnames or [])}
        for i, row in enumerate(reader):
            if i >= sample_rows:
                break
            for col, val in row.items():
                # DictReader stores surplus cells of over-long rows as a list under
                # the key None; they belong to no column and would crash infer_type.
                if col is None:
                    continue
                t = infer_type(val)
                if t != "null":
                    seen_types[col].add(t)

    schema = {}
    for col, types in seen_types.items():
        if not types:
            schema[col] = "null"
        elif types == {"int"}:
            schema[col] = "int"
        elif types <= {"int", "float"}:
            schema[col] = "float"
        elif types == {"bool"}:
            schema[col] = "bool"
        else:
            schema[col] = "string"

    return schema

In [6]:
# ----------------------------
# SCHEMA COMPARISON
# ----------------------------
def compare_schemas(schema1, schema2):
    """Diff two ``{column: type}`` schemas.

    Args:
        schema1: Baseline schema (e.g. the older dataset version).
        schema2: Schema to compare against the baseline.

    Returns:
        dict with:
          - ``"new_fields"``: sorted columns present only in ``schema2``;
          - ``"removed_fields"``: sorted columns present only in ``schema1``;
          - ``"type_mismatches"``: ``{column: [type_in_schema1, type_in_schema2]}``
            for shared columns whose types differ, keyed in sorted column order.
    """
    cols1, cols2 = set(schema1), set(schema2)

    # Iterate shared columns in sorted order: iterating a raw set of strings
    # makes key order depend on hash randomization, so the serialized report
    # would differ between runs on identical inputs.
    type_mismatches = {
        col: [schema1[col], schema2[col]]
        for col in sorted(cols1 & cols2)
        if schema1[col] != schema2[col]
    }

    return {
        "new_fields": sorted(cols2 - cols1),
        "removed_fields": sorted(cols1 - cols2),
        "type_mismatches": type_mismatches,
    }


# ----------------------------
# EXECUTE LOGIC
# ----------------------------
create_demo_datasets()

schema_v1 = infer_schema("dataset_v1.csv")
schema_v2 = infer_schema("dataset_v2.csv")

drift_report = compare_schemas(schema_v1, schema_v2)
print(json.dumps(drift_report, indent=2))

# Store report locally
REPORT_DIR = "schema_drift_reports"
Path(REPORT_DIR).mkdir(exist_ok=True)
# datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now" formats
# identically with this pattern, so file names are unchanged.
# NOTE: second resolution — two runs within the same second write the same file.
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
file_path = f"{REPORT_DIR}/drift_{timestamp}.json"

with open(file_path, "w", encoding="utf-8") as f:
    json.dump({
        "timestamp": timestamp,
        "dataset_v1_schema": schema_v1,
        "dataset_v2_schema": schema_v2,
        "drift": drift_report
    }, f, indent=2)

print(f"\n Drift report saved to: {file_path}")

{
  "new_fields": [
    "region_code"
  ],
  "removed_fields": [
    "industry_group"
  ],
  "type_mismatches": {
    "revenue": [
      "float",
      "string"
    ]
  }
}

Drift report saved to: schema_drift_reports/drift_20251122T174405Z.json
