In [2]:
# ============================================================
# ChiEAC Secure Client Database (SCD) – Full Colab Project
# Zero-cost stack: Google Colab + Google Sheets + Gradio
# Features: AES-256-GCM client-side encryption, RBAC, ADE logic,
#           risk keyword locks, anonymized notes, audit logs
# ============================================================

# -----------------------------
# 0) Install dependencies
# -----------------------------
!pip -q install gspread google-auth cryptography gradio==4.44.0 pandas

# -----------------------------
# 1) Imports & Google auth
# -----------------------------
import os, json, base64, hashlib, secrets, time, re
from datetime import datetime
import pandas as pd

import gspread
from google.colab import auth
from google.auth import default as google_auth_default

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes

import gradio as gr

# Authenticate this Colab session to your Google account
auth.authenticate_user()
creds, _ = google_auth_default()
gc = gspread.authorize(creds)
print("✅ Google authentication complete.")

# -----------------------------
# 2) Configuration
# -----------------------------
# If you already created a Google Sheet for this project, paste its URL here.
# Leave blank to auto-create a new one.
SHEET_URL = ""  # e.g., "https://docs.google.com/spreadsheets/d/xxxxx/edit"

ORG_NAME = "ChiEAC"
BOOTSTRAP_TABS = ["RBAC", "Clients", "Families", "CaseNotes", "AccessLogs", "Locks", "Config"]

# ADE risk keywords – edit in-sheet later (Config tab) if you prefer
DEFAULT_SENSITIVE_KEYWORDS = ["violence", "eviction", "suicidal", "abuse", "threat", "danger", "weapon", "self-harm"]

# Redaction (ADE) – volunteers see less PII
REDACT_FIELDS_FOR_ROLES = {
    "peer_volunteer": ["contact_phone", "address", "exact_dob", "legal_name"],
    "staff": [],
    "supervisor": []
}

# Notes anonymized for these roles
ANONYMIZE_NOTES_FOR_ROLES = ["peer_volunteer"]

# Key management:
#   Option A (recommended): set a strong passphrase (will derive a stable AES key)
KEY_DERIVATION_PASSPHRASE = "CHANGE_ME_TO_A_LONG_RANDOM_PASSPHRASE"

#   Option B: if left empty, a random key is generated and stored as a file in Colab VM
KEY_FILENAME = f"{ORG_NAME}_SCD_master_key.bin"  # Download & store safely if using Option B.

# -----------------------------
# 3) Open or create Google Sheet
# -----------------------------
def open_or_create_sheet(url: str):
    if url.strip():
        try:
            sh = gc.open_by_url(url.strip())
            print("✅ Opened existing sheet.")
            return sh
        except Exception as e:
            raise RuntimeError(f"Could not open provided sheet URL: {e}")
    # Create new
    title = f"{ORG_NAME} SCD Data"
    sh = gc.create(title)
    print(f"✅ Created new Google Sheet: {title}")
    print("🔗 Sheet URL:", sh.url)
    print("➡️  Share this sheet with collaborators (Editor for maintainers, Viewer for others).")
    return sh

sh = open_or_create_sheet(SHEET_URL)

# Ensure bootstrap tabs exist
existing = {w.title for w in sh.worksheets()}
for tab in BOOTSTRAP_TABS:
    if tab not in existing:
        sh.add_worksheet(tab, rows=200, cols=26)
        print(f"➕ Created tab: {tab}")

print("✅ Sheet is ready.")

# -----------------------------
# 4) Crypto: key mgmt + helpers
# -----------------------------
from cryptography.hazmat.primitives import hashes

def derive_key_from_passphrase(passphrase: str, salt: bytes) -> bytes:
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=200_000,
    )
    return kdf.derive(passphrase.encode("utf-8"))

def load_or_create_key():
    if KEY_DERIVATION_PASSPHRASE:
        salt_path = KEY_FILENAME + ".salt"
        if os.path.exists(salt_path):
            with open(salt_path, "rb") as f:
                salt = f.read()
        else:
            salt = secrets.token_bytes(16)
            with open(salt_path, "wb") as f:
                f.write(salt)
        key = derive_key_from_passphrase(KEY_DERIVATION_PASSPHRASE, salt)
        print("🔐 Derived AES key from passphrase.")
        return key

    # Random key per session unless you download the file for reuse
    if os.path.exists(KEY_FILENAME):
        with open(KEY_FILENAME, "rb") as f:
            key = f.read()
        print("🔑 Loaded existing AES key from file.")
        return key
    key = AESGCM.generate_key(bit_length=256)
    with open(KEY_FILENAME, "wb") as f:
        f.write(key)
    print("🆕 Generated new AES key. Download & store securely to read old data later.")
    return key

AES_KEY = load_or_create_key()

def aes_gcm_encrypt(plaintext: str, key: bytes) -> str:
    if plaintext is None or plaintext == "":
        return ""
    aesgcm = AESGCM(key)
    nonce = secrets.token_bytes(12)
    ct = aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None)
    return base64.b64encode(nonce + ct).decode("utf-8")

def aes_gcm_decrypt(b64_cipher: str, key: bytes) -> str:
    if not b64_cipher:
        return ""
    raw = base64.b64decode(b64_cipher)
    nonce, ct = raw[:12], raw[12:]
    aesgcm = AESGCM(key)
    pt = aesgcm.decrypt(nonce, ct, None)
    return pt.decode("utf-8")

def pseudonymize(value: str, salt: str) -> str:
    return hashlib.sha256((salt + "|" + value).encode("utf-8")).hexdigest()

print("✅ Crypto ready.")

# -----------------------------
# 5) Sheets helpers
# -----------------------------
def get_ws(name: str):
    try:
        return sh.worksheet(name)
    except gspread.WorksheetNotFound:
        sh.add_worksheet(name, rows=200, cols=26)
        return sh.worksheet(name)

def ensure_headers(ws, headers):
    existing = ws.row_values(1)
    if existing != headers:
        ws.clear()
        ws.update("A1", [headers])

def append_row(ws, row):
    ws.append_row(row, value_input_option="RAW")

def read_all(ws) -> pd.DataFrame:
    data = ws.get_all_records()
    return pd.DataFrame(data) if data else pd.DataFrame()

# -----------------------------
# 6) Bootstrap tabs with headers
# -----------------------------
# RBAC: email, role, approved
rbac = get_ws("RBAC")
ensure_headers(rbac, ["email", "role", "approved"])

# Clients: PII are encrypted
clients = get_ws("Clients")
ensure_headers(clients, [
    "client_id", "legal_name", "preferred_name", "dob", "contact_phone", "address",
    "created_at", "created_by", "family_id"
])

# Families
families = get_ws("Families")
ensure_headers(families, [
    "family_id", "display_name", "locked", "lock_reason", "locked_by", "locked_at"
])

# Case notes
notes = get_ws("CaseNotes")
ensure_headers(notes, [
    "note_id", "family_id", "author_email", "role", "created_at",
    "note_text", "is_anonymized", "risk_flagged", "risk_hits"
])

# Access logs
logs = get_ws("AccessLogs")
ensure_headers(logs, ["ts", "email", "role", "action", "resource", "resource_id", "details"])

# Locks history
locks = get_ws("Locks")
ensure_headers(locks, ["family_id", "status", "reason", "set_by", "ts"])

# Config with org salt and keywords
config_ws = get_ws("Config")
cfg_df = read_all(config_ws)
if cfg_df.empty:
    salt = base64.b64encode(secrets.token_bytes(16)).decode("utf-8")
    config_ws.update("A1", [["org_name","salt","keywords_json"]])
    config_ws.update("A2", [[ORG_NAME, salt, json.dumps(DEFAULT_SENSITIVE_KEYWORDS)]])
    print("🧩 Wrote default Config.")
else:
    print("🔧 Config already present.")

print("✅ Bootstrap complete.")

# -----------------------------
# 7) RBAC & ADE logic
# -----------------------------
def load_config():
    df = read_all(get_ws("Config"))
    if df.empty:
        raise RuntimeError("Missing Config row")
    row = df.iloc[0]
    return {
        "org_name": row["org_name"],
        "salt": row["salt"],
        "keywords": json.loads(row["keywords_json"])
    }

def load_rbac():
    df = read_all(get_ws("RBAC"))
    if df.empty:
        return {}
    rules = {}
    for _, r in df.iterrows():
        rules[str(r["email"]).strip().lower()] = {
            "role": str(r["role"]).strip(),
            "approved": str(r["approved"]).strip().lower() in ("true","yes","1")
        }
    return rules

def has_access(email: str, rbac: dict):
    user = rbac.get(email.lower())
    return bool(user and user["approved"])

def user_role(email: str, rbac: dict, default="peer_volunteer"):
    return rbac.get(email.lower(), {}).get("role", default)

def should_redact(field: str, role: str):
    return field in REDACT_FIELDS_FOR_ROLES.get(role, [])

def detect_risk(text: str, keywords: list):
    found = [kw for kw in keywords if re.search(rf"\b{re.escape(kw)}\b", text, flags=re.IGNORECASE)]
    return (len(found) > 0, found)

def audit(email, role, action, resource, resource_id, details=""):
    ws = get_ws("AccessLogs")
    append_row(ws, [datetime.utcnow().isoformat(), email, role, action, resource, resource_id, details])

# -----------------------------
# 8) CRUD + Locking
# -----------------------------
def upsert_family(display_name: str, requested_by: str):
    cfg = load_config()
    fam_id = pseudonymize(display_name.lower().strip(), cfg["salt"])[:16]
    ws = get_ws("Families")
    df = read_all(ws)
    exists = False
    if not df.empty and fam_id in df["family_id"].values:
        exists = True
    if not exists:
        append_row(ws, [fam_id, display_name, "False", "", "", ""])
        audit(requested_by, user_role(requested_by, load_rbac()), "CREATE", "Families", fam_id, f"display_name={display_name}")
    return fam_id

def create_client(creator_email: str, legal_name: str, preferred_name: str, dob: str, phone: str, address: str, family_display: str):
    rbac = load_rbac()
    if not has_access(creator_email, rbac):
        raise PermissionError("User not approved in RBAC. Add your email to the RBAC sheet and set approved=TRUE.")
    role = user_role(creator_email, rbac)
    cfg = load_config()
    fam_id = upsert_family(family_display, creator_email)
    client_id = pseudonymize(legal_name.lower().strip() + "|" + dob, cfg["salt"])[:16]

    ws = get_ws("Clients")
    enc_legal = aes_gcm_encrypt(legal_name, AES_KEY)
    enc_dob = aes_gcm_encrypt(dob, AES_KEY)
    enc_phone = aes_gcm_encrypt(phone, AES_KEY)
    enc_addr = aes_gcm_encrypt(address, AES_KEY)
    append_row(ws, [client_id, enc_legal, preferred_name, enc_dob, enc_phone, enc_addr,
                    datetime.utcnow().isoformat(), creator_email, fam_id])
    audit(creator_email, role, "CREATE", "Clients", client_id, f"family_id={fam_id}")
    return client_id, fam_id

def family_locked(family_id: str) -> bool:
    ws = get_ws("Families")
    df = read_all(ws)
    if df.empty: return False
    m = df[df["family_id"] == family_id]
    if m.empty: return False
    val = str(m.iloc[0]["locked"]).strip().lower()
    return val in ("true","yes","1")

def set_family_lock(family_id: str, status: bool, reason: str, by_email: str):
    wsF = get_ws("Families")
    df = read_all(wsF)
    if df.empty: return False
    idx = df.index[df["family_id"] == family_id]
    if len(idx) == 0: return False
    i = idx[0]
    df.loc[i, "locked"] = "True" if status else "False"
    df.loc[i, "lock_reason"] = reason
    df.loc[i, "locked_by"] = by_email
    df.loc[i, "locked_at"] = datetime.utcnow().isoformat()
    # Write back
    wsF.clear()
    wsF.update("A1", [df.columns.tolist()] + df.fillna("").values.tolist())

    wsL = get_ws("Locks")
    append_row(wsL, [family_id, "locked" if status else "unlocked", reason, by_email, datetime.utcnow().isoformat()])
    audit(by_email, user_role(by_email, load_rbac()), "UPDATE", "Families", family_id, f"lock={status} reason={reason}")
    return True

def add_case_note(author_email: str, family_id: str, note_text: str):
    rbac = load_rbac()
    if not has_access(author_email, rbac):
        raise PermissionError("User not approved in RBAC.")
    role = user_role(author_email, rbac)
    if family_locked(family_id) and role != "supervisor":
        raise PermissionError("Family record is locked. Supervisor required.")

    cfg = load_config()
    risk, hits = detect_risk(note_text, cfg["keywords"])
    if risk:
        set_family_lock(family_id, True, f"Risk keywords: {', '.join(hits)}", author_email)
    is_anon = role in ANONYMIZE_NOTES_FOR_ROLES or risk

    ws = get_ws("CaseNotes")
    note_id = pseudonymize(author_email + "|" + datetime.utcnow().isoformat(), cfg["salt"])[:16]
    enc_text = aes_gcm_encrypt(note_text, AES_KEY)
    append_row(ws, [note_id, family_id, author_email, role, datetime.utcnow().isoformat(),
                    enc_text, str(is_anon), str(risk), ",".join(hits)])
    audit(author_email, role, "CREATE", "CaseNotes", note_id, f"family_id={family_id}; risk={risk}; hits={hits}")
    return note_id, is_anon, risk, hits

def get_family_view(requestor_email: str, family_id: str):
    rbac = load_rbac()
    if not has_access(requestor_email, rbac):
        raise PermissionError("User not approved in RBAC.")
    role = user_role(requestor_email, rbac)

    # Clients
    wsC = get_ws("Clients")
    dfC = read_all(wsC)
    subsetC = dfC[dfC["family_id"] == family_id].copy()

    display_rows = []
    for _, r in subsetC.iterrows():
        row = dict(r)
        # Decrypt with ADE redaction
        try:
            row["legal_name"]    = "" if should_redact("legal_name", role)    else aes_gcm_decrypt(r["legal_name"], AES_KEY)
            row["dob"]           = "" if should_redact("exact_dob", role)     else aes_gcm_decrypt(r["dob"], AES_KEY)
            row["contact_phone"] = "" if should_redact("contact_phone", role) else aes_gcm_decrypt(r["contact_phone"], AES_KEY)
            row["address"]       = "" if should_redact("address", role)       else aes_gcm_decrypt(r["address"], AES_KEY)
        except Exception:
            pass
        display_rows.append(row)

    # Notes
    wsN = get_ws("CaseNotes")
    dfN = read_all(wsN)
    subsetN = dfN[dfN["family_id"] == family_id].copy()
    notes_out = []
    for _, n in subsetN.iterrows():
        original = ""
        try:
            original = aes_gcm_decrypt(n["note_text"], AES_KEY)
        except Exception:
            original = ""
        show_text = original
        if n["is_anonymized"] == "True" and role in ANONYMIZE_NOTES_FOR_ROLES:
            show_text = "[ANONYMIZED]"
        notes_out.append({
            "note_id": n["note_id"],
            "author_email": n["author_email"],
            "role": n["role"],
            "created_at": n["created_at"],
            "text": show_text,
            "risk_flagged": n["risk_flagged"],
            "risk_hits": n["risk_hits"]
        })

    audit(requestor_email, role, "READ", "Families", family_id, f"rows={len(display_rows)} notes={len(notes_out)}")
    return display_rows, notes_out, family_locked(family_id)

def supervisor_unlock(supervisor_email: str, family_id: str, reason: str = "Supervisor override"):
    rbac = load_rbac()
    role = user_role(supervisor_email, rbac)
    if role != "supervisor":
        raise PermissionError("Only supervisors can unlock records.")
    return set_family_lock(family_id, False, reason, supervisor_email)

# -----------------------------
# 9) Gradio UI (Intake / Notes / View / Unlock)
# -----------------------------
def ui_create_client(creator_email, legal_name, preferred_name, dob, phone, address, family_display):
    try:
        cid, fid = create_client(creator_email, legal_name, preferred_name, dob, phone, address, family_display)
        return f"✅ Client created. client_id={cid}, family_id={fid}"
    except Exception as e:
        return f"❌ {e}"

def ui_add_note(author_email, family_id, note_text):
    try:
        nid, anon, risk, hits = add_case_note(author_email, family_id, note_text)
        msg = f"✅ Note saved. note_id={nid}."
        if risk:
            msg += f" ⚠️ Risk detected: {', '.join(hits)}. Family locked."
        if anon:
            msg += " (Anonymized for some roles)"
        return msg
    except Exception as e:
        return f"❌ {e}"

def ui_view_family(requestor_email, family_id):
    try:
        rows, notes_list, locked = get_family_view(requestor_email, family_id)
        return {"locked": locked, "clients": rows, "notes": notes_list}
    except Exception as e:
        return {"error": str(e)}

def ui_unlock(supervisor_email, family_id, reason):
    try:
        ok = supervisor_unlock(supervisor_email, family_id, reason)
        return "✅ Unlocked." if ok else "❌ Family not found."
    except Exception as e:
        return f"❌ {e}"

with gr.Blocks() as demo:
    gr.Markdown("# ChiEAC Secure Client Database (SCD)")
    gr.Markdown("**Zero-cost, privacy-first** – Sheets backend, AES-256-GCM encryption, RBAC, ADE, audit logs.")

    with gr.Tab("Intake Client"):
        creator = gr.Textbox(label="Your email")
        legal = gr.Textbox(label="Legal name (encrypted)")
        pref = gr.Textbox(label="Preferred name")
        dob = gr.Textbox(label="Date of birth (YYYY-MM-DD) (encrypted)")
        phone = gr.Textbox(label="Phone (encrypted)")
        addr = gr.Textbox(label="Address (encrypted)")
        fam = gr.Textbox(label="Family display name (e.g., 'Gonzalez Family')")
        out1 = gr.Textbox(label="Result")
        gr.Button("Create Client").click(ui_create_client, [creator, legal, pref, dob, phone, addr, fam], out1)

    with gr.Tab("Add Case Note"):
        author = gr.Textbox(label="Your email")
        fam2 = gr.Textbox(label="Family ID")
        note = gr.Textbox(label="Note text", lines=6)
        out2 = gr.Textbox(label="Result")
        gr.Button("Save Note").click(ui_add_note, [author, fam2, note], out2)

    with gr.Tab("View Family"):
        req = gr.Textbox(label="Your email")
        fam3 = gr.Textbox(label="Family ID")
        out3 = gr.JSON(label="Family View")
        gr.Button("Load").click(ui_view_family, [req, fam3], out3)

    with gr.Tab("Supervisor Unlock"):
        sup = gr.Textbox(label="Supervisor email")
        fam4 = gr.Textbox(label="Family ID")
        reason = gr.Textbox(label="Reason")
        out4 = gr.Textbox(label="Result")
        gr.Button("Unlock").click(ui_unlock, [sup, fam4, reason], out4)

print("✅ All set. Launching app...")
demo.launch(share=False)


✅ Google authentication complete.
✅ Created new Google Sheet: ChiEAC SCD Data
🔗 Sheet URL: https://docs.google.com/spreadsheets/d/1uTLn_4laC3mkQl7NOiuYxms4BZ8MmLBPnEHKmRhlDe0
➡️  Share this sheet with collaborators (Editor for maintainers, Viewer for others).
➕ Created tab: RBAC
➕ Created tab: Clients
➕ Created tab: Families
➕ Created tab: CaseNotes
➕ Created tab: AccessLogs
➕ Created tab: Locks
➕ Created tab: Config
✅ Sheet is ready.
🔐 Derived AES key from passphrase.
✅ Crypto ready.


  ws.update("A1", [headers])
  config_ws.update("A1", [["org_name","salt","keywords_json"]])
  config_ws.update("A2", [[ORG_NAME, salt, json.dumps(DEFAULT_SENSITIVE_KEYWORDS)]])


🧩 Wrote default Config.
✅ Bootstrap complete.
✅ All set. Launching app...
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Note: opening Chrome Inspector may crash demo inside Colab notebooks.

To create a public link, set `share=True` in `launch()`.


--------


<IPython.core.display.Javascript object>

