## M4e — Replace A1 Panel (if you’re using this instead of M4c)



In [None]:
# ===============================
# Beast — Phase A (MSMD ULTRA)
# Cell Label: M4e — A1 Panel (Replacement)
# ===============================
# Purpose:
# A modern, single-cell replacement for the legacy A1 panel.
# It centralizes One‑shot Backups, ⭐ Known Good snapshots, and Retention (Archive/Delete)
# with clear dry‑runs and safe fallbacks. Designed to work standalone or alongside
# other Phase‑A cells (M2/M3/M4b/c/d, TZ patch).
#
# Highlights
# - Backup Now → zips workspace/config/logs/data into /backups
# - ⭐ Known Good → zips the current workspace into /backups/known_good and updates LATEST.txt
# - Retention → Keep last N backups; extra are Archived or Deleted (your choice)
# - Dry‑run everywhere for safety, verbose logs, Drive‑aware, and timezone‑friendly.
#
# PREREQ: Run M1 to mount Drive and create the BEAST root.
# SAFE: Never modifies source data in place; uses copies/moves and zip writes.
#
# MSMD — Monkey See, Monkey Do
# 1) Preflight → ensure paths
# 2) BACKUPS tab: set Kind (optional) → Backup Now (or List/Logs)
# 3) KNOWN GOOD tab: set Note (optional) → Make ⭐
# 4) RETENTION tab: choose Keep last N + Mode → Dry‑Run → Apply
# 5) Use M3 to restore any zip at any time

import os, json, shutil, zipfile
from datetime import datetime
from pathlib import Path

from IPython.display import display, clear_output
import ipywidgets as W

# Detect Colab / Drive (informational)
try:
    from google.colab import drive as gdrive
    IN_COLAB = True
except Exception:
    IN_COLAB = False

# -----------------
# App Config
# -----------------
APP = {
    "LABEL": "Beast — Phase A",
    "CELL_ID": "M4e",
    "ROOT": "/content/drive/MyDrive/BEAST",
    "BACKUPS": "backups",
    "ARCHIVE": "backups/Archive",
    "KNOWN_GOOD": "backups/known_good",
    "RESTORE_POINTS": "backups/restore_points",
    "WORKSPACE": "workspace",
    "LOG_FILE": "logs/beast_session.log",
    "CONFIG_FILE": "config/settings.json",
}

ROOT = Path(APP["ROOT"])  # Path object
BACKUPS = ROOT / APP["BACKUPS"]
ARCHIVE = ROOT / APP["ARCHIVE"]
KNOWN_GOOD = ROOT / APP["KNOWN_GOOD"]
RESTORE_POINTS = ROOT / APP["RESTORE_POINTS"]
WORKSPACE = ROOT / APP["WORKSPACE"]
LOGF = ROOT / APP["LOG_FILE"]
CFG  = ROOT / APP["CONFIG_FILE"]
FALLBACK_LOG = Path("/content/beast_session.log")

# -----------------
# Logging
# -----------------

def _now():
    return datetime.now().strftime("%H:%M:%S")

def _stamp():
    return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def log(msg: str):
    line = f"[{_now()}] {msg}"
    print(line)
    target = LOGF if ROOT.exists() else FALLBACK_LOG
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception:
        pass

# -----------------
# Helpers
# -----------------

def is_drive_mounted():
    if not IN_COLAB:
        return True
    return os.path.isdir("/content/drive") and (
        os.path.isdir("/content/drive/MyDrive") or os.path.isdir("/content/drive/My Drive")
    )


def ensure_paths():
    ROOT.mkdir(parents=True, exist_ok=True)
    (ROOT / "logs").mkdir(parents=True, exist_ok=True)
    BACKUPS.mkdir(parents=True, exist_ok=True)
    ARCHIVE.mkdir(parents=True, exist_ok=True)
    KNOWN_GOOD.mkdir(parents=True, exist_ok=True)
    RESTORE_POINTS.mkdir(parents=True, exist_ok=True)
    WORKSPACE.mkdir(parents=True, exist_ok=True)


def human_bytes(n):
    try:
        n = float(n)
    except Exception:
        return str(n)
    units = ["B","KB","MB","GB","TB"]
    i = 0
    while n >= 1024 and i < len(units)-1:
        n /= 1024.0
        i += 1
    return f"{n:.2f} {units[i]}"


def zip_dir(src_dir: Path, dst_zip: Path):
    with zipfile.ZipFile(dst_zip, 'w', zipfile.ZIP_DEFLATED) as z:
        for root, dirs, files in os.walk(src_dir):
            for name in files:
                full = Path(root) / name
                arc = full.relative_to(src_dir)
                z.write(full, arcname=str(arc))

# -----------------
# Backups — create & list
# -----------------

def list_backups_zip():
    if not BACKUPS.exists():
        return []
    items = []
    for p in BACKUPS.rglob("*.zip"):
        if any(base in p.parents for base in [ARCHIVE, KNOWN_GOOD, RESTORE_POINTS]):
            continue
        try:
            if p.is_file():
                items.append(p)
        except Exception:
            pass
    items = list(set(items))
    items.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    return items


def backup_now(kind: str = "a1_manual"):
    ensure_paths()
    dst = BACKUPS / f"backup_{kind}_{_stamp()}.zip"
    with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED) as z:
        for rel in ["workspace", "config", "logs", "data"]:
            p = ROOT / rel
            if not p.exists():
                continue
            if p.is_file():
                z.write(p, arcname=str(Path(rel)))
            else:
                for root, dirs, files in os.walk(p):
                    for name in files:
                        full = Path(root) / name
                        arc = full.relative_to(ROOT)
                        z.write(full, arcname=str(arc))
    log(f"BACKUP: wrote {dst.name} ({human_bytes(dst.stat().st_size)})")
    return dst

# -----------------
# Known Good — create & list
# -----------------

def list_known_good_zip():
    if not KNOWN_GOOD.exists():
        return []
    items = [p for p in KNOWN_GOOD.glob("*.zip") if p.is_file()]
    items.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return items


def known_good_snapshot(note: str = "star"):
    ensure_paths()
    dst = KNOWN_GOOD / f"known_good_{note}_{_stamp()}.zip"
    if WORKSPACE.exists() and any(WORKSPACE.iterdir()):
        zip_dir(WORKSPACE, dst)
        (KNOWN_GOOD / "LATEST.txt").write_text(str(dst), encoding="utf-8")
        log(f"⭐ Known Good: {dst.name} ({human_bytes(dst.stat().st_size)})")
    else:
        log("⭐ Known Good skipped — workspace is empty")
        return None
    return dst

# -----------------
# Retention — plan & apply
# -----------------

def retention_plan(keep_last: int = 25, mode: str = "archive"):
    """Return (keep, retire) lists of backup zip Paths (excluding KG/Archive/Restore)."""
    zips = list_backups_zip()
    keep = zips[:max(keep_last, 0)]
    retire = zips[max(keep_last, 0):]
    return keep, retire, mode


def retention_apply(keep_last: int = 25, mode: str = "archive"):
    ensure_paths()
    keep, retire, mode = retention_plan(keep_last, mode)
    if not retire:
        log("RETENTION: nothing to retire — already within threshold")
        return {"kept": [p.name for p in keep], "retired": []}
    retired = []
    for p in retire:
        try:
            if mode == "delete":
                p.unlink()
                log(f"RETENTION: deleted {p.name}")
            else:
                ARCHIVE.mkdir(parents=True, exist_ok=True)
                dst = ARCHIVE / p.name
                shutil.move(str(p), str(dst))
                log(f"RETENTION: archived {p.name}")
            retired.append(p.name)
        except Exception as e:
            log(f"RETENTION WARN: {p.name} → {e}")
    return {"kept": [p.name for p in keep], "retired": retired}

# -----------------
# Config helpers (optional)
# -----------------

def load_cfg():
    try:
        if CFG.exists():
            return json.loads(CFG.read_text(encoding='utf-8'))
    except Exception as e:
        log(f"CONFIG READ WARN: {e}")
    return {}


def persist_retention_to_cfg(keep_last: int, mode: str):
    data = load_cfg() or {}
    data.setdefault("features", {})
    data["keep_last_backups"] = int(keep_last)
    data["features"]["retention_mode"] = mode
    CFG.parent.mkdir(parents=True, exist_ok=True)
    CFG.write_text(json.dumps(data, indent=2), encoding='utf-8')
    log("CONFIG: retention settings saved")

# -----------------
# UI
# -----------------
H = W.HTML

banner = H(
    f"""
<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px solid #e5e7eb; border-radius:12px;'>
  <div style='font-size:18px; font-weight:700;'>🧩 {APP['LABEL']} — M4e A1 Panel Replacement <span style='opacity:.65'>(Cell {APP['CELL_ID']})</span></div>
  <div>Root: <code>{APP['ROOT']}</code> | Backups: <code>{APP['BACKUPS']}</code> | ⭐: <code>{APP['KNOWN_GOOD']}</code></div>
  <div>Flow: Preflight → (Backups | ⭐ Known Good | Retention)</div>
</div>
"""
)

# Common controls
btn_preflight = W.Button(description="Preflight", tooltip="Mount & ensure paths")
btn_logs      = W.Button(description="Open Logs", tooltip="Tail recent session log lines")

# --- Backups tab ---
kind_text   = W.Text(value="manual", description="Kind")
btn_backup  = W.Button(description="Backup Now", tooltip="Zip workspace/config/logs/data → backups/")
btn_list_bk = W.Button(description="List Recent", tooltip="Show top backups")

# --- Known Good tab ---
kg_note     = W.Text(value="star", description="Note")
btn_kg      = W.Button(description="Make ⭐ Known Good", tooltip="Snapshot workspace → known_good/")
btn_list_kg = W.Button(description="List Stars", tooltip="Show latest Known Good zips")

# --- Retention tab ---
keepn       = W.BoundedIntText(value=int(load_cfg().get("keep_last_backups", 25) or 25), min=1, max=500, description="Keep last N")
mode        = W.ToggleButtons(options=["archive","delete"], value=(load_cfg().get("features",{}).get("retention_mode","archive")), description="Mode")
btn_dry     = W.Button(description="Dry‑Run", tooltip="Show what would be archived/deleted")
btn_apply   = W.Button(description="Apply", tooltip="Apply retention now")
btn_savecfg = W.Button(description="Save as Default", tooltip="Persist keep+mode into config")

# Output box
out = W.Output(layout={"border":"1px solid #e5e7eb", "padding":"8px", "borderRadius":"8px"})

# Handlers

def on_preflight(_):
    with out:
        if not is_drive_mounted():
            log("WARNING: Drive not mounted — run M1 → Mount Drive first.")
        ensure_paths()
        for p in [WORKSPACE, BACKUPS, KNOWN_GOOD, ARCHIVE, RESTORE_POINTS]:
            log(f"PATH: {p} exists={p.exists()}")
        log("Preflight: ready ✅")


def on_backup(_):
    with out:
        ensure_paths()
        dst = backup_now(kind_text.value.strip() or "manual")
        if dst:
            log("TIP: Use M3 to restore any backup zip via Stage → Apply.")


def on_list_bk(_):
    with out:
        lst = list_backups_zip()
        if not lst:
            log("BACKUPS: none found")
            return
        for p in lst[:15]:
            log(f"BK → {p.name}  size={human_bytes(p.stat().st_size)}  mtime={datetime.fromtimestamp(p.stat().st_mtime)}")
        if len(lst) > 15:
            log(f"… and {len(lst)-15} more")


def on_kg(_):
    with out:
        dst = known_good_snapshot(kg_note.value.strip() or "star")
        if dst:
            log("⭐ TIP: Use M3 → Scan Sources → Known Good to restore this star.")


def on_list_kg(_):
    with out:
        lst = list_known_good_zip()
        if not lst:
            log("KNOWN GOOD: none yet — make one!")
            return
        for p in lst[:15]:
            log(f"⭐ → {p.name}  size={human_bytes(p.stat().st_size)}  mtime={datetime.fromtimestamp(p.stat().st_mtime)}")
        if len(lst) > 15:
            log(f"… and {len(lst)-15} more")


def on_dry(_):
    with out:
        keep, retire, m = retention_plan(int(keepn.value), mode.value)
        log(f"RETENTION DRY‑RUN: keep={len(keep)} retire={len(retire)} mode={m}")
        for p in retire[:20]:
            log(f"  RETIRE → {p.name}")
        if len(retire) > 20:
            log(f"  … and {len(retire)-20} more")


def on_apply(_):
    with out:
        res = retention_apply(int(keepn.value), mode.value)
        log(f"RETENTION DONE: kept={len(res['kept'])} retired={len(res['retired'])}")


def on_savecfg(_):
    with out:
        persist_retention_to_cfg(int(keepn.value), mode.value)


def on_logs(_):
    with out:
        log_path = LOGF if ROOT.exists() else FALLBACK_LOG
        if not log_path.exists():
            log(f"No log yet at {log_path}")
            return
        tail = 150
        log(f"Showing last {tail} lines of {log_path}")
        with open(log_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()[-tail:]
        print("\n".join(lines).rstrip())

# Bind
btn_preflight.on_click(on_preflight)
btn_logs.on_click(on_logs)
btn_backup.on_click(on_backup)
btn_list_bk.on_click(on_list_bk)
btn_kg.on_click(on_kg)
btn_list_kg.on_click(on_list_kg)
btn_dry.on_click(on_dry)
btn_apply.on_click(on_apply)
btn_savecfg.on_click(on_savecfg)

# Layout — three tabs
backups_box = W.VBox([
    W.HBox([kind_text, btn_backup, btn_list_bk]),
])

kg_box = W.VBox([
    W.HBox([kg_note, btn_kg, btn_list_kg]),
])

ret_box = W.VBox([
    W.HBox([keepn, mode, btn_dry, btn_apply, btn_savecfg]),
])

tabs = W.Tab(children=[backups_box, kg_box, ret_box])
tabs.set_title(0, 'Backups')
tabs.set_title(1, 'Known Good ⭐')
tabs.set_title(2, 'Retention')

# Render
clear_output()
display(banner)
display(W.HBox([btn_preflight, btn_logs]))
display(tabs)
display(out)

log("M4e ready. Use Backups / ⭐ Known Good / Retention. Preflight first for best results.")


HTML(value="\n<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px sol…

HBox(children=(Button(description='Preflight', style=ButtonStyle(), tooltip='Mount & ensure paths'), Button(de…

Tab(children=(VBox(children=(HBox(children=(Text(value='manual', description='Kind'), Button(description='Back…

Output(layout=Layout(border='1px solid #e5e7eb', padding='8px'))

[13:24:02] M4e ready. Use Backups / ⭐ Known Good / Retention. Preflight first for best results.


# Beast — Phase A (MSMD ULTRA)
## M1 — Fresh Start & Mount


In [None]:
## M0b — Timestamp Helper (utils)
# @title M0b — Timestamp Helper (utils)
# Purpose:
# - Provide a single helper `now_ts(tz='America/Regina')` for timezone-aware timestamps.
# - Prefer Python’s zoneinfo; fall back to pytz if needed.
# - Idempotent and safe; ToC-visible under Phase A.

from datetime import datetime

try:
    from zoneinfo import ZoneInfo  # Python 3.9+
except Exception:
    ZoneInfo = None

try:
    import pytz  # Fallback if zoneinfo unavailable
except Exception:
    pytz = None

def now_ts(tz: str = "America/Regina") -> datetime:
    """Return a timezone-aware datetime 'now' in the given tz."""
    if ZoneInfo is not None:
        try:
            return datetime.now(ZoneInfo(tz))
        except Exception:
            pass
    if pytz is not None:
        try:
            return datetime.now(pytz.timezone(tz))
        except Exception:
            pass
    # Final fallback: naive datetime
    return datetime.now()

print("[M0b] Timestamp helper ready: now_ts(tz='America/Regina').")


[M0b] Timestamp helper ready: now_ts(tz='America/Regina').


In [None]:
## M1a — Safe Unmount & Clean Mountpoint (temp)
# @title M1a — Safe Unmount & Clean Mountpoint (temp)
# Purpose:
# - Unmount Google Drive (flush), force-umount if needed, and wipe the local /content/drive mountpoint (VM only)
#   so M1 can mount cleanly. Does NOT delete any files from Google Drive; cleans only the VM mount folder.
# Placement:
# - Put directly ABOVE "M1 — Fresh Start & Mount (Enhanced v3)". Run this first when you need a clean remount.
# Notes:
# - Idempotent and safe to re-run. Skips destructive actions if still mounted.

import os, shutil, time, subprocess, sys

_MOUNTPOINT = "/content/drive"

def _is_mounted(path: str = _MOUNTPOINT) -> bool:
    try:
        with open("/proc/mounts", "r") as f:
            for line in f:
                parts = line.split()
                if len(parts) >= 2 and parts[1] == path:
                    return True
    except Exception:
        pass
    return False

def _run(cmd):
    try:
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False)
        return proc.returncode, proc.stdout.strip()
    except Exception as e:
        return -1, f"exec error: {e}"

def _try_flush_and_unmount():
    try:
        from google.colab import drive  # type: ignore
        print("[M1a] Using drive.flush_and_unmount()...")
        drive.flush_and_unmount()
        return True
    except Exception as e:
        print(f"[M1a] flush_and_unmount not available or failed: {e}")
        return False

def _force_unmount():
    if not _is_mounted():
        return True
    print("[M1a] Trying fusermount -u ...")
    rc, out = _run(["fusermount", "-u", _MOUNTPOINT])
    print(f"[M1a] fusermount rc={rc} out={out[:200]}")
    if _is_mounted():
        print("[M1a] Trying umount -l ...")
        rc, out = _run(["umount", "-l", _MOUNTPOINT])
        print(f"[M1a] umount -l rc={rc} out={out[:200]}")
    # give kernel a moment to update mounts
    for _ in range(10):
        if not _is_mounted():
            return True
        time.sleep(0.2)
    return not _is_mounted()

def _wipe_mountpoint(path: str = _MOUNTPOINT):
    if _is_mounted(path):
        print("[M1a] Skipping wipe: still mounted.")
        return False
    if not os.path.isdir(path):
        os.makedirs(path, exist_ok=True)
        print("[M1a] Mountpoint created (was missing).")
        return True
    # Remove contents but keep the mountpoint directory itself
    removed = 0
    for name in os.listdir(path):
        p = os.path.join(path, name)
        try:
            if os.path.islink(p) or os.path.isfile(p):
                os.unlink(p)
                removed += 1
            elif os.path.isdir(p):
                shutil.rmtree(p, ignore_errors=True)
                removed += 1
        except Exception as e:
            print(f"[M1a] Remove failed for {p}: {e}")
    print(f"[M1a] Wipe complete: removed {removed} item(s) from {_MOUNTPOINT}.")
    return True

def safe_unmount_and_clean():
    print("[M1a] Starting safe unmount & cleanup for /content/drive ...")
    mounted_before = _is_mounted()
    print(f"[M1a] Mounted before: {mounted_before}")

    ok = False
    if mounted_before:
        # Try Colab API first
        ok = _try_flush_and_unmount()
        # If still mounted, force unmount
        if _is_mounted():
            ok = _force_unmount()
    else:
        ok = True  # Already unmounted

    print(f"[M1a] Unmounted: {not _is_mounted()}")

    # Clean mountpoint only if unmounted
    if not _is_mounted():
        _wipe_mountpoint()
        print("[M1a] Ready for a clean mount by M1.")
        return True
    else:
        print("[M1a] Could not unmount; mountpoint not cleaned.")
        return False

# Auto-run for convenience (this is a temp utility cell)
_result = safe_unmount_and_clean()
print(f"[M1a] Done — status: {'OK' if _result else 'FAILED'}")


[M1a] Starting safe unmount & cleanup for /content/drive ...
[M1a] Mounted before: False
[M1a] Unmounted: True
[M1a] Wipe complete: removed 1 item(s) from /content/drive.
[M1a] Ready for a clean mount by M1.
[M1a] Done — status: OK


In [None]:
## M1 — Fresh Start & Mount (Enhanced v3)
# @title M1 — Fresh Start & Mount (Enhanced v3)
# Purpose:
# - Clean initialization of Beast Phase A.
# - Mounts Google Drive, sets paths, prepares environment.
# - Replaces all older M1 variants.
# Notes:
# - Delete original M1 before pasting this.
# - Idempotent: safe to re-run.

import os, datetime, pytz
from google.colab import drive

print("[M1] 🚀 Fresh start initializing...")

# Mount Google Drive
drive.mount('/content/drive', force_remount=True)

# Base paths
BEAST_ROOT = "/content/drive/MyDrive/Beast_PhaseA"
os.makedirs(BEAST_ROOT, exist_ok=True)

print(f"[M1] 📂 Mounted and ready at {BEAST_ROOT}")

# Timestamp check
tz = pytz.timezone("America/Regina")
now = datetime.datetime.now(tz)
print(f"[M1] 🕒 Timestamp: {now.strftime('%Y-%m-%d %H:%M:%S %Z')}")


[M1] 🚀 Fresh start initializing...
Mounted at /content/drive
[M1] 📂 Mounted and ready at /content/drive/MyDrive/Beast_PhaseA
[M1] 🕒 Timestamp: 2025-08-19 23:13:42 CST


In [None]:
## M1b1 — Drive Explorer (Mini, PhaseA-safe)
# @title M1b1 — Drive Explorer (Mini, PhaseA-safe)
# Purpose:
# - Lightweight, read-only explorer for BEAST_ROOT (/Beast_PhaseA) paths.
# - Clone-style of M1b but pinned to Phase A workspace.
# - ToC-friendly single cell. Run AFTER M1 (so BEAST_ROOT is defined).
# Notes:
# - Read-only: no delete/move operations.
# - Path traversal is blocked (cannot leave BEAST_ROOT).
# - Idempotent and safe to re-run.

import os, io, datetime, html
from typing import List, Tuple
from IPython.display import display, HTML

# Optional UI widgets
try:
    from ipywidgets import Button, Select, IntSlider, HBox, VBox, Output, Label
    _WIDGETS_OK = True
except Exception:
    _WIDGETS_OK = False

# ---- Config & helpers --------------------------------------------------------

_BEAST_ROOT = os.path.abspath(globals().get("BEAST_ROOT", "/content/drive/MyDrive/Beast_PhaseA"))
os.makedirs(_BEAST_ROOT, exist_ok=True)

_ALLOWED_PREVIEW_EXT = {".txt", ".log", ".json", ".jsonl", ".md", ".csv", ".py", ".cfg", ".ini", ".yml", ".yaml"}
_MAX_PREVIEW_BYTES = 1_000_000  # 1 MB cap for preview

def _human_bytes(n: int) -> str:
    for unit in ["B","KB","MB","GB","TB"]:
        if n < 1024:
            return f"{n:.0f} {unit}"
        n /= 1024
    return f"{n:.0f} PB"

def _ts_str(ts: float) -> str:
    return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")

def _safe_path(rel_or_abs: str) -> str:
    """Resolve within BEAST_ROOT; block traversal."""
    p = rel_or_abs
    if not os.path.isabs(p):
        p = os.path.join(_BEAST_ROOT, p)
    rp = os.path.abspath(p)
    if not rp.startswith(_BEAST_ROOT):
        raise PermissionError("Path traversal blocked: outside BEAST_ROOT")
    return rp

def _list_dir(path: str) -> List[Tuple[str, bool, int, float]]:
    try:
        items = []
        for name in os.listdir(path):
            p = os.path.join(path, name)
            try:
                st = os.stat(p)
                items.append((name, os.path.isdir(p), st.st_size, st.st_mtime))
            except OSError:
                items.append((name, os.path.isdir(p), 0, 0.0))
        # Folders first, then files; both sorted by name
        items.sort(key=lambda x: (not x[1], x[0].lower()))
        return items
    except FileNotFoundError:
        return []

def _is_text_like(path: str) -> bool:
    ext = os.path.splitext(path)[1].lower()
    return ext in _ALLOWED_PREVIEW_EXT

def _preview_head(path: str, n_lines: int = 80) -> str:
    try:
        if os.path.getsize(path) > _MAX_PREVIEW_BYTES:
            note = f"[Note] File is large ({_human_bytes(os.path.getsize(path))}); showing first {n_lines} lines.\n"
        else:
            note = ""
        with io.open(path, "r", encoding="utf-8", errors="replace") as f:
            lines = []
            for i, line in enumerate(f, 1):
                if i > n_lines:
                    break
                lines.append(line)
        return note + "".join(lines)
    except Exception as e:
        return f"[Preview error] {e}"

def _preview_tail(path: str, n_lines: int = 80) -> str:
    try:
        # Simple tail without loading entire file
        from collections import deque
        with io.open(path, "r", encoding="utf-8", errors="replace") as f:
            dq = deque(f, maxlen=max(1, n_lines))
        return "".join(dq)
    except Exception as e:
        return f"[Tail error] {e}"

# ---- UI mode ----------------------------------------------------------------

def _render_header(path: str):
    rel = os.path.relpath(path, _BEAST_ROOT)
    if rel == ".":
        rel = "/"
    display(HTML(
        f"""
        <div style="padding:8px 10px;border-radius:10px;background:#1f1f1f;color:#eaeaea;
                    font-family:ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace;
                    display:block;margin-bottom:8px;">
          <div style="font-weight:600;">Beast — Drive Explorer (Phase A)</div>
          <div><b>Root:</b> {_BEAST_ROOT}</div>
          <div><b>Current:</b> {html.escape(rel)}</div>
        </div>
        """
    ))

def drive_explorer(start_path: str = None):
    """Render the PhaseA-safe explorer UI (read-only)."""
    base = _BEAST_ROOT
    cur = _safe_path(start_path or base)

    if not _WIDGETS_OK:
        # Fallback: print listing
        _render_header(cur)
        items = _list_dir(cur)
        if not items:
            print("[Explorer] Directory is empty.")
            return
        print("Folders:")
        for name, is_dir, size, mt in items:
            if is_dir:
                print(f"  [DIR] {name}  (modified { _ts_str(mt) })")
        print("\nFiles:")
        for name, is_dir, size, mt in items:
            if not is_dir:
                print(f"  {name}  {_human_bytes(size)}  (modified { _ts_str(mt) })")
        print("\nTip: Re-run after installing ipywidgets for interactive mode.")
        return

    # Widgets
    out = Output()
    lines = IntSlider(value=80, min=20, max=400, step=10, description="Lines")
    btn_up = Button(description="Up")
    btn_open = Button(description="Open")
    btn_refresh = Button(description="Refresh")
    btn_head = Button(description="View Head")
    btn_tail = Button(description="View Tail")

    entries = _list_dir(cur)
    # Format entries: prefix [DIR] for folders
    options = []
    for name, is_dir, size, mt in entries:
        label = f"[DIR] {name}" if is_dir else f"{name} — {_human_bytes(size)} — {_ts_str(mt)}"
        options.append(label)
    sel = Select(options=options, rows=12, layout={"width": "100%"})

    state = {"cur": cur, "entries": entries}

    def _refresh_listing():
        with out:
            out.clear_output()
            _render_header(state["cur"])
            entries = _list_dir(state["cur"])
            state["entries"] = entries
            sel.options = [
                (f"[DIR] {n}" if d else f"{n} — {_human_bytes(sz)} — {_ts_str(mt)}")
                for (n, d, sz, mt) in entries
            ]
            print(f"[Explorer] {len(entries)} item(s). Read-only mode.")

    def _selected_path():
        idx = sel.index
        if idx is None or idx < 0 or idx >= len(state["entries"]):
            return None, None
        name, is_dir, _, _ = state["entries"][idx]
        path = _safe_path(os.path.join(state["cur"], name))
        return path, is_dir

    def _on_up(_):
        try:
            parent = os.path.dirname(state["cur"])
            # Do not go above root
            if os.path.abspath(parent).startswith(_BEAST_ROOT):
                state["cur"] = parent
            _refresh_listing()
        except Exception as e:
            with out:
                print(f"[Explorer] Up error: {e}")

    def _on_open(_):
        path, is_dir = _selected_path()
        if not path:
            return
        if is_dir:
            state["cur"] = path
            _refresh_listing()
        else:
            with out:
                _render_header(state["cur"])
                print(f"[Open] {path}")
                if _is_text_like(path):
                    print(_preview_head(path, lines.value))
                else:
                    print("[Open] Non-text file. Preview skipped.")

    def _on_refresh(_):
        _refresh_listing()

    def _on_head(_):
        path, is_dir = _selected_path()
        if not path or is_dir:
            return
        with out:
            _render_header(state["cur"])
            print(f"[Head] {path}  (first {lines.value} lines)")
            if _is_text_like(path):
                print(_preview_head(path, lines.value))
            else:
                print("[Head] Non-text file. Preview skipped.")

    def _on_tail(_):
        path, is_dir = _selected_path()
        if not path or is_dir:
            return
        with out:
            _render_header(state["cur"])
            print(f"[Tail] {path}  (last {lines.value} lines)")
            if _is_text_like(path):
                print(_preview_tail(path, lines.value))
            else:
                print("[Tail] Non-text file. Tail skipped.")

    btn_up.on_click(_on_up)
    btn_open.on_click(_on_open)
    btn_refresh.on_click(_on_refresh)
    btn_head.on_click(_on_head)
    btn_tail.on_click(_on_tail)

    # Initial render
    _refresh_listing()

    controls = HBox([btn_up, btn_open, btn_refresh, btn_head, btn_tail, lines])
    display(VBox([controls, sel, out]))

# Auto-display on load for convenience
drive_explorer()

print("[M1b1] Drive Explorer (Mini, PhaseA-safe) ready.")


VBox(children=(HBox(children=(Button(description='Up', style=ButtonStyle()), Button(description='Open', style=…

[M1b1] Drive Explorer (Mini, PhaseA-safe) ready.


In [None]:
## M1b2 — Logs Quick Tail (temp)
# @title M1b2 — Logs Quick Tail (temp)
# Purpose:
# - Quick, disposable viewer to tail the latest Beast logs.
# - Finds recent *.log/*.txt/*.jsonl under BEAST_ROOT and common subdirs.
# - Optional UI (dropdown + lines slider + buttons). Safe to delete after use.
# Notes:
# - Run AFTER M1 so BEAST_ROOT is defined.
# - Idempotent: re-run anytime.

import os, glob, datetime, io
from collections import deque
from IPython.display import display, clear_output
try:
    from ipywidgets import Dropdown, IntSlider, Button, HBox, VBox, Output
    _WIDGETS_OK = True
except Exception:
    _WIDGETS_OK = False

# ---- Resolve base paths ----
_BEAST_ROOT = globals().get("BEAST_ROOT", "/content/drive/MyDrive/Beast_PhaseA")
_CANDIDATE_DIRS = [
    _BEAST_ROOT,
    os.path.join(_BEAST_ROOT, "logs"),
    os.path.join(_BEAST_ROOT, "out"),
    os.path.join(_BEAST_ROOT, "runs"),
    os.path.join(_BEAST_ROOT, "artifacts"),
]

_PATTERNS = ("*.log", "*.txt", "*.jsonl", "*.out")

def _human_bytes(n: int) -> str:
    for unit in ["B","KB","MB","GB","TB"]:
        if n < 1024:
            return f"{n:.0f} {unit}"
        n /= 1024
    return f"{n:.0f} PB"

def _list_candidate_logs(max_files: int = 50):
    files = []
    seen = set()
    for d in _CANDIDATE_DIRS:
        if not os.path.isdir(d):
            continue
        for pat in _PATTERNS:
            for p in glob.glob(os.path.join(d, "**", pat), recursive=True):
                if p in seen:
                    continue
                seen.add(p)
                try:
                    mtime = os.path.getmtime(p)
                    files.append((p, mtime))
                except OSError:
                    pass
    files.sort(key=lambda x: x[1], reverse=True)
    return files[:max_files]

def _tail_lines(path: str, n: int = 80) -> str:
    try:
        with io.open(path, "r", encoding="utf-8", errors="replace") as f:
            dq = deque(f, maxlen=max(1, n))
        return "".join(dq)
    except Exception as e:
        return f"[M1b2] ERROR reading file: {e}"

def quick_tail(file_path: str = None, n: int = 80):
    """Programmatic tail: prints the last n lines from file_path or newest log."""
    files = _list_candidate_logs()
    if not files:
        print(f"[M1b2] No candidate logs under: {_BEAST_ROOT}")
        return
    target = file_path or files[0][0]
    if not os.path.exists(target):
        print(f"[M1b2] File not found: {target}")
        return
    ts = datetime.datetime.fromtimestamp(os.path.getmtime(target))
    size = os.path.getsize(target)
    header = (
        f"=== Tail — {os.path.basename(target)} ===\n"
        f"Path: {target}\n"
        f"Modified: {ts:%Y-%m-%d %H:%M:%S}\n"
        f"Size: {_human_bytes(size)}\n"
        f"Lines: last {n}\n"
        f"===============================\n"
    )
    print(header + _tail_lines(target, n))

# ---- Optional UI ----
if _WIDGETS_OK:
    _files = _list_candidate_logs()
    if _files:
        options = []
        for p, mt in _files:
            ts = datetime.datetime.fromtimestamp(mt).strftime("%m-%d %H:%M:%S")
            size = _human_bytes(os.path.getsize(p))
            label = f"{os.path.basename(p)} — {ts} — {size}"
            options.append((label, p))
        dd = Dropdown(options=options, description="Log:", layout={"width": "80%"})
    else:
        dd = Dropdown(options=[("No logs found", "")], description="Log:", layout={"width": "80%"})

    lines = IntSlider(value=80, min=20, max=400, step=10, description="Lines")
    btn_tail = Button(description="Tail Now", tooltip="Show last N lines")
    btn_refresh = Button(description="Refresh List", tooltip="Rescan for logs")
    out = Output()

    def _on_tail(_):
        with out:
            clear_output()
            if not dd.value:
                print(f"[M1b2] No log selected.")
                return
            quick_tail(dd.value, lines.value)

    def _on_refresh(_):
        with out:
            clear_output()
            found = _list_candidate_logs()
            if not found:
                dd.options = [("No logs found", "")]
                print(f"[M1b2] No candidate logs under: {_BEAST_ROOT}")
                return
            options = []
            for p, mt in found:
                ts = datetime.datetime.fromtimestamp(mt).strftime("%m-%d %H:%M:%S")
                size = _human_bytes(os.path.getsize(p))
                label = f"{os.path.basename(p)} — {ts} — {size}"
                options.append((label, p))
            dd.options = options
            print(f"[M1b2] Refreshed. Found {len(options)} log file(s).")

    btn_tail.on_click(_on_tail)
    btn_refresh.on_click(_on_refresh)
    display(VBox([HBox([dd, btn_refresh, btn_tail]), lines, out]))
else:
    print("[M1b2] ipywidgets not available — using programmatic API only.")
    print("Usage: quick_tail(file_path=None, n=80)  # picks newest log if file_path=None")

print("[M1b2] Logs Quick Tail ready.")


VBox(children=(HBox(children=(Dropdown(description='Log:', layout=Layout(width='80%'), options=(('scheduled_jo…

[M1b2] Logs Quick Tail ready.


In [None]:
## M1c — Run Protocol Helper (Guided Execution)
# @title M1c — Run Protocol Helper (Guided Execution)
# Purpose:
# - Provides explicit Run/Click protocol instructions for M1 execution.
# - Enforces SCREEN READBACK discipline (last 2–3 output lines).

def run_m1_protocol():
    print("=== Run/Click Protocol — M1 Execution ===")
    print("1) CELL NAME: M1 — Fresh Start & Mount (Enhanced v3)")
    print("2) ACTION: RUN")
    print("3) RUN FIRST: Yes")
    print("4) DEPENDENCIES: None")
    print("5) STEPS:")
    print("   - Open ToC, locate: M1 — Fresh Start & Mount (Enhanced v3)")
    print("   - Click inside the cell body")
    print("   - Press ▶ Run")
    print("6) EXPECTED RESULT (examples):")
    print("   - [M1] 🚀 Fresh start initializing...")
    print("   - Mounted at /content/drive")
    print("   - [M1] 📂 Mounted and ready at /content/drive/MyDrive/Beast_PhaseA")
    print("   - [M1] 🕒 Timestamp: YYYY-MM-DD HH:MM:SS CST/CDT")
    print("7) IF NOT:")
    print("   - Re-run M1 once more.")
    print("   - If still failing → capture SCREEN READBACK (last 2–3 lines) and stop.")
    print("⚠️ SCREEN READBACK REQUIRED after success (paste last 2–3 lines).")
    print("==========================================")

# Auto-display protocol on load for quick reference:
run_m1_protocol()


=== Run/Click Protocol — M1 Execution ===
1) CELL NAME: M1 — Fresh Start & Mount (Enhanced v3)
2) ACTION: RUN
3) RUN FIRST: Yes
4) DEPENDENCIES: None
5) STEPS:
   - Open ToC, locate: M1 — Fresh Start & Mount (Enhanced v3)
   - Click inside the cell body
   - Press ▶ Run
6) EXPECTED RESULT (examples):
   - [M1] 🚀 Fresh start initializing...
   - Mounted at /content/drive
   - [M1] 📂 Mounted and ready at /content/drive/MyDrive/Beast_PhaseA
   - [M1] 🕒 Timestamp: YYYY-MM-DD HH:MM:SS CST/CDT
7) IF NOT:
   - Re-run M1 once more.
   - If still failing → capture SCREEN READBACK (last 2–3 lines) and stop.
⚠️ SCREEN READBACK REQUIRED after success (paste last 2–3 lines).


In [None]:
!ls -lt /content/drive/MyDrive/BEAST/backups | head -15


total 93
drwx------ 2 root root  4096 Aug 19 13:31 known_good
-rw------- 1 root root 36069 Aug 19 13:28 backup_manual_2025-08-19_13-28-40.zip
-rw------- 1 root root 36010 Aug 19 13:28 backup_manual_2025-08-19_13-27-58.zip
drwx------ 2 root root  4096 Aug 16 00:29 restore_points
-rw------- 1 root root  2684 Aug 12 16:22 backup_one_shot_2025-08-12_16-22-38.zip
-rw------- 1 root root  2551 Aug 12 16:07 backup_one_shot_2025-08-12_16-07-17.zip
-rw------- 1 root root   998 Aug 12 13:09 backup_scheduled_2025-08-12_19-09-36.zip
-rw------- 1 root root   976 Aug 12 13:08 backup_scheduled_2025-08-12_19-08-26.zip
-rw------- 1 root root   954 Aug 12 13:07 backup_scheduled_2025-08-12_19-07-16.zip
-rw------- 1 root root   908 Aug 12 13:06 backup_scheduled_2025-08-12_19-06-06.zip
drwx------ 2 root root  4096 Aug 12 12:57 Archive


In [None]:
!tail -n 20 /content/drive/MyDrive/BEAST/logs/beast_session.log


[19:14:09] M2 ready. Scan → set Keep last N → Dry-Run → Apply. Use Archive mode to keep history clean without deleting. Known Good snapshots are never touched here.
[19:15:07] M3 ready. Scan → select zip → Stage → Dry-Run → Apply. Rollback available via the latest restore point.
[13:16:04] TZ active: America/Regina
[13:16:50] CONFIG: loaded
[13:16:50] M4 ready. Load → Edit → Validate → Save (backup) → Diagnostics. Root path is view-only here; ask to run a migration tool if needed.
[13:24:02] M4e ready. Use Backups / ⭐ Known Good / Retention. Preflight first for best results.
[13:27:01] PATH: /content/drive/MyDrive/BEAST/workspace exists=True
[13:27:01] PATH: /content/drive/MyDrive/BEAST/backups exists=True
[13:27:01] PATH: /content/drive/MyDrive/BEAST/backups/known_good exists=True
[13:27:01] PATH: /content/drive/MyDrive/BEAST/backups/Archive exists=True
[13:27:01] PATH: /content/drive/MyDrive/BEAST/backups/restore_points exists=True
[13:27:01] Preflight: ready ✅
[13:28:19] BACKUP: wro

In [None]:
ls -lt /content/drive/MyDrive/BEAST/logs | head -15


total 47
-rw------- 1 root root 41990 Aug 19 13:33 beast_session.log
drwx------ 2 root root  4096 Aug 19 04:12 [0m[01;34msuggestions[0m/
-rw------- 1 root root    39 Aug 18 22:50 m1_probe.txt
-rw------- 1 root root    69 Aug 12 13:16 checks.log


In [None]:
!ls -R /content/drive/MyDrive/BEAST | head -20


/content/drive/MyDrive/BEAST:
backups
Backups
config
data
logs
workspace
workspace__after_failed_2025-08-16_06-34-19

/content/drive/MyDrive/BEAST/backups:
Archive
backup_manual_2025-08-19_13-27-58.zip
backup_manual_2025-08-19_13-28-40.zip
backup_one_shot_2025-08-12_16-07-17.zip
backup_one_shot_2025-08-12_16-22-38.zip
backup_scheduled_2025-08-12_19-06-06.zip
backup_scheduled_2025-08-12_19-07-16.zip
backup_scheduled_2025-08-12_19-08-26.zip
backup_scheduled_2025-08-12_19-09-36.zip
known_good



## M2 — Retention & Archive Control

In [None]:
# ===============================
# Beast — Phase A (MSMD ULTRA)
# Cell Label: M2 — Backup Retention & Archive Manager
# ===============================
# Purpose: Manage how many rolling backups to keep, and what to do with older ones
# (archive or delete). Also provides quick status, listing, and safe preview of a
# backup's contents (no restore yet).
#
# Prereq: Run M1 first (Fresh Start & Mount) so /content/drive is mounted and the
# base structure exists, then A1 if you want auto-backups running.
#
# MSMD — Monkey See, Monkey Do Steps
# 1) Click "Scan" to see counts & sizes of backups and archives
# 2) Set "Keep last N" (default 25) and choose Action Mode: Archive old / Delete old
# 3) Click "Dry-Run Retention" to preview what will move/delete
# 4) Click "Apply Retention" to execute
# 5) (Optional) "List Last 10" or "Preview Selected" to inspect a backup zip
# 6) (Optional) "Open Logs" to view recent actions

import os, json, shutil, zipfile
from datetime import datetime
from pathlib import Path

from IPython.display import display, clear_output
import ipywidgets as W

# Detect Colab / Drive
try:
    from google.colab import drive as gdrive
    IN_COLAB = True
except Exception:
    IN_COLAB = False

# -----------------
# App Config
# -----------------
APP = {
    "LABEL": "Beast — Phase A",
    "CELL_ID": "M2",
    "ROOT": "/content/drive/MyDrive/BEAST",
    "BACKUPS": "backups",
    "ARCHIVE": "backups/Archive",
    "KNOWN_GOOD": "backups/known_good",
    "LOG_FILE": "logs/beast_session.log",
    "KEEP_DEFAULT": 25,
}

ROOT = Path(APP["ROOT"])  # Path object
BACKUPS = ROOT / APP["BACKUPS"]
ARCHIVE = ROOT / APP["ARCHIVE"]
KNOWN_GOOD = ROOT / APP["KNOWN_GOOD"]
LOG_PATH_FALLBACK = Path("/content/beast_session.log")

# -----------------
# Logging
# -----------------

def _now():
    return datetime.now().strftime("%H:%M:%S")

def log(msg):
    line = f"[{_now()}] {msg}"
    print(line)
    target = (ROOT / APP["LOG_FILE"]) if ROOT.exists() else LOG_PATH_FALLBACK
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception:
        pass

# -----------------
# Helpers
# -----------------

def is_drive_mounted():
    if not IN_COLAB:
        return True
    return os.path.isdir("/content/drive") and (
        os.path.isdir("/content/drive/MyDrive") or os.path.isdir("/content/drive/My Drive")
    )


def ensure_paths():
    # Minimal ensure to avoid errors when scanning
    (ROOT).mkdir(parents=True, exist_ok=True)
    (ROOT / "logs").mkdir(parents=True, exist_ok=True)
    BACKUPS.mkdir(parents=True, exist_ok=True)
    ARCHIVE.mkdir(parents=True, exist_ok=True)
    KNOWN_GOOD.mkdir(parents=True, exist_ok=True)


def human_bytes(n):
    try:
        n = float(n)
    except Exception:
        return str(n)
    units = ["B","KB","MB","GB","TB"]
    i = 0
    while n >= 1024 and i < len(units)-1:
        n /= 1024.0
        i += 1
    return f"{n:.2f} {units[i]}"


def list_backups():
    """List backup zips in backups/ (excluding known_good and Archive). Sorted new→old."""
    if not BACKUPS.exists():
        return []
    items = []
    for p in BACKUPS.glob("*.zip"):
        # safety: if someone placed a zip at the root, include; skip folders handled separately
        try:
            if p.is_file():
                items.append(p)
        except Exception:
            pass
    # Also include nested non-Archive, non-known_good zips under backups (rare)
    for p in BACKUPS.rglob("*.zip"):
        if KNOWN_GOOD in p.parents or ARCHIVE in p.parents:
            continue
        if p.parent == BACKUPS:
            continue  # already included above
        try:
            if p.is_file():
                items.append(p)
        except Exception:
            pass
    items = list(set(items))
    items.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    return items


def list_archive():
    if not ARCHIVE.exists():
        return []
    items = [p for p in ARCHIVE.glob("*.zip") if p.is_file()]
    items.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    return items


def scan_status():
    ensure_paths()
    bk = list_backups()
    ar = list_archive()
    kg = []
    if KNOWN_GOOD.exists():
        kg = [p for p in KNOWN_GOOD.glob("*.zip") if p.is_file()]
    size_bk = sum(p.stat().st_size for p in bk) if bk else 0
    size_ar = sum(p.stat().st_size for p in ar) if ar else 0
    size_kg = sum(p.stat().st_size for p in kg) if kg else 0
    log(f"STATUS: backups={len(bk)} ({human_bytes(size_bk)}), archive={len(ar)} ({human_bytes(size_ar)}), known_good={len(kg)} ({human_bytes(size_kg)})")
    if bk:
        log(f"Newest backup: {bk[0].name}  |  Oldest backup: {bk[-1].name}")
    else:
        log("No backups found in backups/ yet — run A1 One-shot or Scheduler.")
    return bk, ar, kg


def plan_retention(retain_n: int):
    bk, _, _ = scan_status()
    keep = bk[:max(retain_n, 0)]
    old = bk[max(retain_n, 0):]
    return keep, old


def do_archive(paths):
    ARCHIVE.mkdir(parents=True, exist_ok=True)
    for p in paths:
        dest = ARCHIVE / p.name
        # If exists, add a suffix to avoid overwrite
        if dest.exists():
            base = dest.stem
            suf = datetime.now().strftime("%Y%m%d_%H%M%S")
            dest = dest.with_name(f"{base}__arc_{suf}.zip")
        try:
            shutil.move(str(p), str(dest))
            log(f"ARCHIVE: moved {p.name} → Archive/{dest.name}")
        except Exception as e:
            log(f"ARCHIVE ERROR: {p.name} — {e}")


def do_delete(paths):
    for p in paths:
        try:
            size = p.stat().st_size
            p.unlink()
            log(f"DELETE: removed {p.name} ({human_bytes(size)})")
        except Exception as e:
            log(f"DELETE ERROR: {p.name} — {e}")


def preview_zip_contents(path: Path, max_items=50):
    try:
        with zipfile.ZipFile(path, 'r') as z:
            names = z.namelist()
            count = len(names)
            head = names[:max_items]
            for n in head:
                log(f"ZIP: {n}")
            if count > max_items:
                log(f"… and {count - max_items} more items")
    except Exception as e:
        log(f"ZIP PREVIEW ERROR: {e}")

# -----------------
# UI
# -----------------
H = W.HTML

banner = H(
    f"""
<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px solid #e5e7eb; border-radius:12px;'>
  <div style='font-size:18px; font-weight:700;'>🗄️ {APP['LABEL']} — M2 Retention & Archive <span style='opacity:.65'>(Cell {APP['CELL_ID']})</span></div>
  <ol style='margin:6px 0 8px 18px; padding:0;'>
    <li>Scan</li>
    <li>Set retention & mode</li>
    <li>Dry-Run Retention</li>
    <li>Apply Retention</li>
    <li>(Optional) List & Preview</li>
    <li>(Optional) Open Logs</li>
  </ol>
  <div>Root: <code>{APP['ROOT']}</code> | Backups: <code>{APP['BACKUPS']}</code> | Archive: <code>{APP['ARCHIVE']}</code></div>
</div>
"""
)

btn_scan   = W.Button(description="Scan", tooltip="Show counts & sizes of backups/archives")
keep_n     = W.BoundedIntText(description="Keep last N", min=1, max=500, value=APP["KEEP_DEFAULT"], tooltip="How many newest backups to keep in backups/")
mode       = W.ToggleButtons(options=["Archive old", "Delete old"], value="Archive old", description="Action")
btn_dry    = W.Button(description="Dry-Run Retention", tooltip="Preview which backups will be archived/deleted")
btn_apply  = W.Button(description="Apply Retention", tooltip="Execute retention action")
btn_list   = W.Button(description="List Last 10", tooltip="List newest 10 backups")

# Dynamic dropdown for preview
bk_dropdown = W.Dropdown(options=[], description="Select backup", layout=W.Layout(width='60%'))
btn_preview = W.Button(description="Preview Selected", tooltip="List entries inside the chosen zip (safe)")

btn_logs   = W.Button(description="Open Logs", tooltip="Tail last lines of the session log")

out = W.Output(layout={"border":"1px solid #e5e7eb", "padding":"8px", "borderRadius":"8px"})

# Handlers

def on_scan(_):
    with out:
        if not is_drive_mounted():
            log("WARNING: Drive not mounted — run M1 → Mount Drive first.")
        ensure_paths()
        bk, ar, kg = scan_status()
        # update dropdown
        bk_dropdown.options = [(p.name, str(p)) for p in bk]


def on_dry(_):
    with out:
        ensure_paths()
        keep, old = plan_retention(keep_n.value)
        log(f"RETENTION PLAN (dry-run): keep={len(keep)} old={len(old)} mode={mode.value}")
        for p in old[:20]:
            log(f"  OLD → {p.name}")
        if len(old) > 20:
            log(f"  … and {len(old)-20} more")


def on_apply(_):
    with out:
        ensure_paths()
        keep, old = plan_retention(keep_n.value)
        if not old:
            log("Nothing to do — already within retention.")
            return
        if mode.value == "Archive old":
            do_archive(old)
        else:
            do_delete(old)
        scan_status()


def on_list(_):
    with out:
        ensure_paths()
        bk = list_backups()
        for p in bk[:10]:
            log(f"BK: {p.name}  ({human_bytes(p.stat().st_size)})  mtime={datetime.fromtimestamp(p.stat().st_mtime)}")
        if len(bk) == 0:
            log("No backups in backups/ yet.")


def on_preview(_):
    with out:
        val = bk_dropdown.value
        if not val:
            log("Select a backup first from the dropdown.")
            return
        preview_zip_contents(Path(val))


def on_logs(_):
    with out:
        log_path = (ROOT / APP["LOG_FILE"]) if ROOT.exists() else LOG_PATH_FALLBACK
        if not log_path.exists():
            log(f"No log yet at {log_path}")
            return
        tail = 120
        log(f"Showing last {tail} lines of {log_path}")
        with open(log_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()[-tail:]
        print("\n".join(lines).rstrip())

# Bind
btn_scan.on_click(on_scan)
btn_dry.on_click(on_dry)
btn_apply.on_click(on_apply)
btn_list.on_click(on_list)
btn_preview.on_click(on_preview)
btn_logs.on_click(on_logs)

# Layout & render
row1 = W.HBox([btn_scan, keep_n, mode])
row2 = W.HBox([btn_dry, btn_apply])
row3 = W.HBox([btn_list, bk_dropdown, btn_preview, btn_logs])

clear_output()
display(banner)
display(row1)
display(row2)
display(row3)
display(out)

log("M2 ready. Scan → set Keep last N → Dry-Run → Apply. Use Archive mode to keep history clean without deleting. Known Good snapshots are never touched here.")


HTML(value="\n<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px sol…

HBox(children=(Button(description='Scan', style=ButtonStyle(), tooltip='Show counts & sizes of backups/archive…

HBox(children=(Button(description='Dry-Run Retention', style=ButtonStyle(), tooltip='Preview which backups wil…

HBox(children=(Button(description='List Last 10', style=ButtonStyle(), tooltip='List newest 10 backups'), Drop…

Output(layout=Layout(border='1px solid #e5e7eb', padding='8px'))

[00:28:46] M2 ready. Scan → set Keep last N → Dry-Run → Apply. Use Archive mode to keep history clean without deleting. Known Good snapshots are never touched here.


## M3 — Restore Manager

In [None]:
# ===============================
# Beast — Phase A (MSMD ULTRA)
# Cell Label: M3 — Backup Browser & Restore Manager
# ===============================
# Purpose: Safely restore your workspace from any backup zip or Known Good snapshot.
# - Browse & preview zip contents
# - Stage a restore (extract to a staging area)
# - Dry-run apply with a full diff plan
# - Apply restore in two modes: Merge (no deletes) or Replace workspace
# - Auto-create a rollback point before applying; one-click Rollback support
#
# PREREQ: Run M1 to mount Drive and create base folders. Use A1 to make backups.
# SAFETY: M3 never touches backup zips; it only reads them. Applies include a
#         pre-restore snapshot saved to backups/restore_points for quick rollback.
#
# MSMD — Monkey See, Monkey Do
# 1) Scan Sources → choose source type (Backups / Known Good)
# 2) Select a zip → Preview Selected (optional)
# 3) Stage Restore → unpacks into staging folder
# 4) Choose Mode: Merge (no deletes) / Replace workspace
# 5) Dry-Run Apply → review planned actions
# 6) Apply Restore → confirm in output logs
# 7) (Optional) Rollback Last Restore if needed

import os, json, shutil, zipfile
from datetime import datetime
from pathlib import Path

from IPython.display import display, clear_output
import ipywidgets as W

# Detect Colab / Drive
try:
    from google.colab import drive as gdrive
    IN_COLAB = True
except Exception:
    IN_COLAB = False

# -----------------
# App Config
# -----------------
APP = {
    "LABEL": "Beast — Phase A",
    "CELL_ID": "M3",
    "ROOT": "/content/drive/MyDrive/BEAST",
    "BACKUPS": "backups",
    "ARCHIVE": "backups/Archive",
    "KNOWN_GOOD": "backups/known_good",
    "RESTORE_POINTS": "backups/restore_points",
    "STAGING": "workspace_restore_staging",
    "WORKSPACE": "workspace",
    "LOG_FILE": "logs/beast_session.log",
}

ROOT = Path(APP["ROOT"])  # Path object
BACKUPS = ROOT / APP["BACKUPS"]
ARCHIVE = ROOT / APP["ARCHIVE"]
KNOWN_GOOD = ROOT / APP["KNOWN_GOOD"]
RESTORE_POINTS = ROOT / APP["RESTORE_POINTS"]
STAGING = ROOT / APP["STAGING"]
WORKSPACE = ROOT / APP["WORKSPACE"]
LOG_PATH_FALLBACK = Path("/content/beast_session.log")

# -----------------
# Logging
# -----------------

def _now():
    return datetime.now().strftime("%H:%M:%S")

def _stamp():
    return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def log(msg):
    line = f"[{_now()}] {msg}"
    print(line)
    target = (ROOT / APP["LOG_FILE"]) if ROOT.exists() else LOG_PATH_FALLBACK
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception:
        pass

# -----------------
# Helpers
# -----------------

def is_drive_mounted():
    if not IN_COLAB:
        return True
    return os.path.isdir("/content/drive") and (
        os.path.isdir("/content/drive/MyDrive") or os.path.isdir("/content/drive/My Drive")
    )


def ensure_paths():
    (ROOT).mkdir(parents=True, exist_ok=True)
    (ROOT / "logs").mkdir(parents=True, exist_ok=True)
    BACKUPS.mkdir(parents=True, exist_ok=True)
    KNOWN_GOOD.mkdir(parents=True, exist_ok=True)
    ARCHIVE.mkdir(parents=True, exist_ok=True)
    RESTORE_POINTS.mkdir(parents=True, exist_ok=True)
    # staging not created until needed


def human_bytes(n):
    try:
        n = float(n)
    except Exception:
        return str(n)
    units = ["B","KB","MB","GB","TB"]
    i = 0
    while n >= 1024 and i < len(units)-1:
        n /= 1024.0
        i += 1
    return f"{n:.2f} {units[i]}"


def list_backups_zip():
    """List zip files in backups/ excluding Archive, known_good, and restore_points. New→old."""
    if not BACKUPS.exists():
        return []
    items = []
    for p in BACKUPS.rglob("*.zip"):
        if any(base in p.parents for base in [ARCHIVE, KNOWN_GOOD, RESTORE_POINTS]):
            continue
        try:
            if p.is_file():
                items.append(p)
        except Exception:
            pass
    items = list(set(items))
    items.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    return items


def list_known_good_zip():
    if not KNOWN_GOOD.exists():
        return []
    items = [p for p in KNOWN_GOOD.glob("*.zip") if p.is_file()]
    items.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    return items


def list_restore_points():
    if not RESTORE_POINTS.exists():
        return []
    items = [p for p in RESTORE_POINTS.glob("*.zip") if p.is_file()]
    items.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    return items

# Zip safety and utilities

def safe_extract(zip_path: Path, dest: Path):
    with zipfile.ZipFile(zip_path, 'r') as z:
        for m in z.infolist():
            # prevent zip-slip
            outp = (dest / m.filename).resolve()
            if not str(outp).startswith(str(dest.resolve())):
                raise RuntimeError(f"Unsafe member path: {m.filename}")
        z.extractall(dest)


def zip_dir(src_dir: Path, dst_zip: Path):
    with zipfile.ZipFile(dst_zip, 'w', zipfile.ZIP_DEFLATED) as z:
        for root, dirs, files in os.walk(src_dir):
            for name in files:
                full = Path(root) / name
                arc = full.relative_to(src_dir)
                z.write(full, arcname=str(arc))


def create_restore_point(label="pre_restore"):
    ensure_paths()
    stamp = _stamp()
    rp = RESTORE_POINTS / f"{label}_{stamp}.zip"
    if WORKSPACE.exists() and any(WORKSPACE.iterdir()):
        zip_dir(WORKSPACE, rp)
        (RESTORE_POINTS / "ROLLBACK_LATEST.txt").write_text(str(rp), encoding="utf-8")
        log(f"RESTORE-POINT: created {rp.name}")
    else:
        # still write pointer but note empty
        (RESTORE_POINTS / "ROLLBACK_LATEST.txt").write_text("", encoding="utf-8")
        log("RESTORE-POINT: workspace empty — nothing to snapshot")
    return rp

# Diff utilities

def walk_files(base: Path):
    if not base.exists():
        return set()
    return set(str(p.relative_to(base)) for p in base.rglob('*') if p.is_file())


def file_changed(a: Path, b: Path):
    try:
        sa, sb = a.stat().st_size, b.stat().st_size
        if sa != sb:
            return True
        # fall back to mtime check
        return int(a.stat().st_mtime) != int(b.stat().st_mtime)
    except Exception:
        return True


def plan_apply(mode_replace: bool):
    """Return dict with new/changed/deleted for applying staging → workspace."""
    ws = walk_files(WORKSPACE)
    st = walk_files(STAGING)
    new_only = st - ws
    deleted_only = ws - st
    common = st & ws
    changed = set()
    for rel in list(common)[:5000]:  # cap for huge trees
        a = WORKSPACE / rel
        b = STAGING / rel
        if file_changed(a, b):
            changed.add(rel)
    plan = {
        "new": sorted(new_only),
        "changed": sorted(changed),
        "deleted": sorted(deleted_only) if mode_replace else [],
    }
    return plan

# Apply operations

def clean_staging():
    if STAGING.exists():
        shutil.rmtree(STAGING, ignore_errors=True)
    STAGING.mkdir(parents=True, exist_ok=True)


def stage_from_zip(zip_path: Path):
    ensure_paths()
    clean_staging()
    log(f"STAGE: extracting {zip_path.name} → {STAGING}")
    safe_extract(zip_path, STAGING)
    # if backup contains top-level directories, keep structure as-is
    # report a short listing
    head = list(STAGING.rglob('*'))[:20]
    for p in head:
        rel = p.relative_to(STAGING)
        log(f"STAGE ITEM: {rel}{'/' if p.is_dir() else ''}")
    if len(head) == 0:
        log("STAGE: (empty!) — check the zip content")


def apply_merge():
    # copy files from staging into workspace (overwrite), no deletes
    for src in STAGING.rglob('*'):
        if src.is_file():
            rel = src.relative_to(STAGING)
            dst = WORKSPACE / rel
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dst)


def replace_workspace_with_staging():
    # safest: rename current workspace away, move staging in, archive old
    stamp = _stamp()
    old_ws = WORKSPACE.parent / f"workspace__old_{stamp}"
    if WORKSPACE.exists():
        WORKSPACE.rename(old_ws)
    shutil.move(str(STAGING), str(WORKSPACE))
    # archive old_ws to restore_points then delete the folder
    if old_ws.exists():
        rp = RESTORE_POINTS / f"pre_replace_{stamp}.zip"
        zip_dir(old_ws, rp)
        shutil.rmtree(old_ws, ignore_errors=True)
        (RESTORE_POINTS / "ROLLBACK_LATEST.txt").write_text(str(rp), encoding="utf-8")
        log(f"ARCHIVED OLD WORKSPACE → {rp.name}")


def apply_restore(mode_replace: bool):
    if not STAGING.exists() or not any(STAGING.rglob('*')):
        log("APPLY: staging is empty — run 'Stage Restore' first.")
        return
    create_restore_point("pre_apply")
    if mode_replace:
        log("APPLY: Replace workspace mode — existing extras will be removed.")
        replace_workspace_with_staging()
    else:
        log("APPLY: Merge mode — copying files, no deletes.")
        apply_merge()
        # clear staging after merge to avoid confusion
        shutil.rmtree(STAGING, ignore_errors=True)
    log("APPLY: restore complete ✅")


def rollback_latest():
    ensure_paths()
    ptr = RESTORE_POINTS / "ROLLBACK_LATEST.txt"
    if not ptr.exists():
        log("ROLLBACK: no pointer found")
        return
    txt = ptr.read_text(encoding="utf-8").strip()
    if not txt:
        log("ROLLBACK: pointer empty — no snapshot recorded")
        return
    zp = Path(txt)
    if not zp.exists():
        log(f"ROLLBACK: snapshot missing → {zp}")
        return
    # Extract rollback to a temp staging then replace
    tmp = ROOT / "rollback_staging"
    if tmp.exists():
        shutil.rmtree(tmp, ignore_errors=True)
    tmp.mkdir(parents=True, exist_ok=True)
    safe_extract(zp, tmp)
    # replace workspace with tmp
    stamp = _stamp()
    bad_ws = WORKSPACE.parent / f"workspace__after_failed_{stamp}"
    if WORKSPACE.exists():
        WORKSPACE.rename(bad_ws)
    shutil.move(str(tmp), str(WORKSPACE))
    log("ROLLBACK: restored workspace from latest snapshot ✅")

# -----------------
# UI
# -----------------
H = W.HTML

banner = H(
    f"""
<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px solid #e5e7eb; border-radius:12px;'>
  <div style='font-size:18px; font-weight:700;'>🧯 {APP['LABEL']} — M3 Restore Manager <span style='opacity:.65'>(Cell {APP['CELL_ID']})</span></div>
  <ol style='margin:6px 0 8px 18px; padding:0;'>
    <li>Scan Sources → select zip</li>
    <li>Preview (optional)</li>
    <li>Stage Restore</li>
    <li>Choose mode: Merge / Replace</li>
    <li>Dry-Run Apply</li>
    <li>Apply Restore</li>
    <li>(Optional) Rollback Latest</li>
  </ol>
  <div>Root: <code>{APP['ROOT']}</code> | Workspace: <code>{APP['WORKSPACE']}</code> | Staging: <code>{APP['STAGING']}</code></div>
</div>
"""
)

btn_scan   = W.Button(description="Scan Sources", tooltip="List available backups & known good zips")
source     = W.ToggleButtons(options=["Backups", "Known Good"], value="Backups", description="Source")

bk_dropdown = W.Dropdown(options=[], description="Select zip", layout=W.Layout(width='60%'))
btn_preview = W.Button(description="Preview Selected", tooltip="List the first entries of the zip (safe)")
btn_stage   = W.Button(description="Stage Restore", tooltip="Extract the selected zip into the staging area")

mode       = W.ToggleButtons(options=["Merge (no deletes)", "Replace workspace"], value="Merge (no deletes)", description="Mode")
btn_dry    = W.Button(description="Dry-Run Apply", tooltip="Show planned changes (new/changed/deleted)")
btn_apply  = W.Button(description="Apply Restore", tooltip="Execute the restore using the chosen mode")

btn_clean  = W.Button(description="Clean Staging", tooltip="Remove staging folder contents")
btn_wslist = W.Button(description="List Workspace Head", tooltip="Show a short listing from workspace")
btn_rollback = W.Button(description="Rollback Last Restore", tooltip="Restore from the last pre-apply snapshot")
btn_logs   = W.Button(description="Open Logs", tooltip="Tail last lines of the session log")

out = W.Output(layout={"border":"1px solid #e5e7eb", "padding":"8px", "borderRadius":"8px"})

# Handlers

def on_scan(_):
    with out:
        if not is_drive_mounted():
            log("WARNING: Drive not mounted — run M1 → Mount Drive first.")
        ensure_paths()
        if source.value == "Backups":
            zips = list_backups_zip()
            log(f"SCAN: found {len(zips)} backup zip(s)")
        else:
            zips = list_known_good_zip()
            log(f"SCAN: found {len(zips)} known good zip(s)")
        bk_dropdown.options = [(p.name, str(p)) for p in zips]
        if not zips:
            log("SCAN: none available — create one via A1 One-shot/⭐ Known Good")


def on_preview(_):
    with out:
        val = bk_dropdown.value
        if not val:
            log("Preview: select a zip first.")
            return
        try:
            with zipfile.ZipFile(val, 'r') as z:
                names = z.namelist()
                for n in names[:80]:
                    log(f"ZIP: {n}")
                if len(names) > 80:
                    log(f"… and {len(names)-80} more items")
        except Exception as e:
            log(f"PREVIEW ERROR: {e}")


def on_stage(_):
    with out:
        val = bk_dropdown.value
        if not val:
            log("Stage: select a zip first.")
            return
        stage_from_zip(Path(val))


def on_dry(_):
    with out:
        mode_replace = (mode.value == "Replace workspace")
        plan = plan_apply(mode_replace)
        log(f"DRY-RUN: new={len(plan['new'])} changed={len(plan['changed'])} deleted={len(plan['deleted'])}")
        for label, items in [("NEW", plan['new'][:20]), ("CHANGED", plan['changed'][:20]), ("DELETED", plan['deleted'][:20])]:
            for rel in items:
                log(f"  {label} → {rel}")
        for label in ("new", "changed", "deleted"):
            extra = len(plan[label]) - 20
            if extra > 0:
                log(f"  … and {extra} more {label}")


def on_apply(_):
    with out:
        mode_replace = (mode.value == "Replace workspace")
        apply_restore(mode_replace)


def on_clean(_):
    with out:
        clean_staging()
        log("STAGING: cleaned")


def on_wslist(_):
    with out:
        if not WORKSPACE.exists():
            log("WORKSPACE: missing")
            return
        head = list(WORKSPACE.rglob('*'))[:40]
        for p in head:
            rel = p.relative_to(WORKSPACE)
            log(f"WS: {rel}{'/' if p.is_dir() else ''}")
        if len(head) == 0:
            log("WORKSPACE: (empty)")


def on_rollback(_):
    with out:
        rollback_latest()


def on_logs(_):
    with out:
        log_path = (ROOT / APP["LOG_FILE"]) if ROOT.exists() else LOG_PATH_FALLBACK
        if not log_path.exists():
            log(f"No log yet at {log_path}")
            return
        tail = 150
        log(f"Showing last {tail} lines of {log_path}")
        with open(log_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()[-tail:]
        print("\n".join(lines).rstrip())

# Bind
btn_scan.on_click(on_scan)
btn_preview.on_click(on_preview)
btn_stage.on_click(on_stage)
btn_dry.on_click(on_dry)
btn_apply.on_click(on_apply)
btn_clean.on_click(on_clean)
btn_wslist.on_click(on_wslist)
btn_rollback.on_click(on_rollback)
btn_logs.on_click(on_logs)

# Layout & render
row0 = W.HBox([btn_scan, source])
row1 = W.HBox([bk_dropdown, btn_preview])
row2 = W.HBox([btn_stage, mode])
row3 = W.HBox([btn_dry, btn_apply])
row4 = W.HBox([btn_clean, btn_wslist, btn_rollback, btn_logs])

clear_output()
display(banner)
display(row0)
display(row1)
display(row2)
display(row3)
display(row4)
display(out)

log("M3 ready. Scan → select zip → Stage → Dry-Run → Apply. Rollback available via the latest restore point.")


HTML(value="\n<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px sol…

HBox(children=(Button(description='Scan Sources', style=ButtonStyle(), tooltip='List available backups & known…

HBox(children=(Dropdown(description='Select zip', layout=Layout(width='60%'), options=(), value=None), Button(…

HBox(children=(Button(description='Stage Restore', style=ButtonStyle(), tooltip='Extract the selected zip into…

HBox(children=(Button(description='Dry-Run Apply', style=ButtonStyle(), tooltip='Show planned changes (new/cha…

HBox(children=(Button(description='Clean Staging', style=ButtonStyle(), tooltip='Remove staging folder content…

Output(layout=Layout(border='1px solid #e5e7eb', padding='8px'))

[00:29:35] M3 ready. Scan → select zip → Stage → Dry-Run → Apply. Rollback available via the latest restore point.


## M3b — Timezone Patch (UTC + Local)

In [None]:
# ===============================
# Beast — Phase A (MSMD ULTRA)
# Cell Label: TZ — Timezone Patch & Clock Sync
# ===============================
# Purpose: Make all Beast timestamps show in your local time (default: America/Regina),
# even on Colab where the runtime clock is UTC by default. This cell:
#  - Applies TZ via environment (os.environ['TZ']) + time.tzset() on Unix
#  - Persists your choice to BEAST/config/settings.json
#  - Shows a clear UTC vs Local clock comparison + offset
#  - Exposes helpers: beast_now() / beast_utc() (timezone-aware)
#
# PREREQ: Run M1 first so /content/drive is mounted and BEAST root exists.
# SAFE: No system files modified; process-level only.
#
# MSMD — Monkey See, Monkey Do
# 1) Click "Apply America/Regina" (or enter a custom zone and click Apply)
# 2) Click "Save to config" so it sticks between sessions
# 3) Click "Show Clocks" to verify offset and labels
# 4) (Optional) "Load from config" on a fresh runtime to re-apply automatically

import os, json, time
from datetime import datetime, timezone
from pathlib import Path

from IPython.display import display, clear_output
import ipywidgets as W

# Python 3.9+ standard library timezone database
try:
    from zoneinfo import ZoneInfo
except Exception:
    ZoneInfo = None  # Fallback handled in validation

# Detect Colab / Drive (informational)
try:
    from google.colab import drive as gdrive
    IN_COLAB = True
except Exception:
    IN_COLAB = False

# -----------------
# App Config
# -----------------
APP = {
    "LABEL": "Beast — Phase A",
    "CELL_ID": "TZ",
    "ROOT": "/content/drive/MyDrive/BEAST",
    "CONFIG_FILE": "config/settings.json",
    "LOG_FILE": "logs/beast_session.log",
    "DEFAULT_TZ": "America/Regina",
}

ROOT = Path(APP["ROOT"])  # Path object
CFG  = ROOT / APP["CONFIG_FILE"]
LOGF = ROOT / APP["LOG_FILE"]

# This global will be referenced by other cells if they wish
BEAST_TZ = APP["DEFAULT_TZ"]

# -----------------
# Utilities
# -----------------

def log_local(msg: str):
    line = f"[{datetime.now().strftime('%H:%M:%S')}] {msg}"
    print(line)
    target = LOGF if ROOT.exists() else Path("/content/beast_session.log")
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception:
        pass


def _valid_zone(tzname: str) -> bool:
    if not tzname or not isinstance(tzname, str):
        return False
    if ZoneInfo is None:
        # Minimal check if zoneinfo missing (rare). Accept common patterns.
        return "/" in tzname and tzname[0].isalpha()
    try:
        ZoneInfo(tzname)
        return True
    except Exception:
        return False


def _apply_tz_env(tzname: str):
    """Apply timezone at the process level. On Unix, time.tzset() updates libc."""
    global BEAST_TZ
    os.environ['TZ'] = tzname
    try:
        time.tzset()  # available on Unix (Linux/Colab)
    except Exception:
        pass
    BEAST_TZ = tzname


def load_config_timezone(default_to_env=True):
    tz = None
    try:
        if CFG.exists():
            with open(CFG, 'r', encoding='utf-8') as f:
                data = json.load(f)
            tz = data.get('timezone')
    except Exception as e:
        log_local(f"CONFIG READ WARN: {e}")
    if tz and _valid_zone(tz):
        _apply_tz_env(tz)
        return tz
    if default_to_env:
        # keep current env or default
        tz_env = os.environ.get('TZ') or APP['DEFAULT_TZ']
        if _valid_zone(tz_env):
            _apply_tz_env(tz_env)
            return tz_env
        _apply_tz_env(APP['DEFAULT_TZ'])
        return APP['DEFAULT_TZ']
    return None


def save_config_timezone(tzname: str):
    ROOT.mkdir(parents=True, exist_ok=True)
    CFG.parent.mkdir(parents=True, exist_ok=True)
    data = {}
    if CFG.exists():
        try:
            data = json.loads(CFG.read_text(encoding='utf-8'))
        except Exception:
            data = {}
    data['timezone'] = tzname
    CFG.write_text(json.dumps(data, indent=2), encoding='utf-8')
    log_local(f"CONFIG: timezone saved → {tzname}")


def beast_now(fmt: str = '%Y-%m-%d %H:%M:%S %Z%z') -> str:
    tz = ZoneInfo(BEAST_TZ) if ZoneInfo else None
    if tz:
        return datetime.now(tz).strftime(fmt)
    # Fallback: localtime based on TZ env
    return datetime.now().strftime(fmt)


def beast_utc(fmt: str = '%Y-%m-%d %H:%M:%S %Z%z') -> str:
    return datetime.now(timezone.utc).strftime(fmt)


def offset_hours() -> float:
    if ZoneInfo:
        loc = datetime.now(ZoneInfo(BEAST_TZ))
        off = loc.utcoffset() or (loc - loc.replace(tzinfo=timezone.utc))
    else:
        # naive approximation using localtime/UTC timestamps
        off = datetime.now() - datetime.utcnow()
    return round(off.total_seconds() / 3600.0, 2)

# -----------------
# UI
# -----------------
H = W.HTML

banner = H(
    f"""
<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px solid #e5e7eb; border-radius:12px;'>
  <div style='font-size:18px; font-weight:700;'>🕒 {APP['LABEL']} — TZ Patch <span style='opacity:.65'>(Cell {APP['CELL_ID']})</span></div>
  <div>Root: <code>{APP['ROOT']}</code> | Config: <code>{APP['CONFIG_FILE']}</code></div>
  <div>Goal: make logs & backups reflect your local timezone.</div>
</div>
"""
)

btn_apply_regina = W.Button(description="Apply America/Regina", tooltip="Set TZ to America/Regina and apply now")
custom_tz = W.Text(value=APP['DEFAULT_TZ'], description="Custom TZ", placeholder="Area/City e.g. Europe/Dublin")
btn_apply_custom = W.Button(description="Apply Custom", tooltip="Apply the timezone typed above")

btn_save = W.Button(description="Save to config", tooltip="Persist current TZ to BEAST config")
btn_load = W.Button(description="Load from config", tooltip="Read timezone from config and apply")
btn_show = W.Button(description="Show Clocks", tooltip="Display UTC vs Local and offset")
btn_revert = W.Button(description="Revert to UTC", tooltip="Set TZ=UTC for this process")

out = W.Output(layout={"border":"1px solid #e5e7eb", "padding":"8px", "borderRadius":"8px"})

# Handlers

def _show():
    with out:
        clear_output(wait=True)
        try:
            tzlabel = BEAST_TZ
            print("=== CLOCKS ===")
            print(f"UTC   → {beast_utc()}")
            print(f"Local → {beast_now()}  (TZ='{tzlabel}', offset {offset_hours()}h)")
            print("")
            print("Hints:")
            print("- Save to config so future sessions auto-apply.")
            print("- Other cells using datetime.now() will now reflect this TZ (process-level).")
        except Exception as e:
            print(f"SHOW ERROR: {e}")


def on_apply_regina(_):
    with out:
        _apply_tz_env("America/Regina")
        log_local("TZ applied: America/Regina")
        _show()


def on_apply_custom(_):
    with out:
        tz = (custom_tz.value or "").strip()
        if not _valid_zone(tz):
            print(f"Invalid timezone: '{tz}'. Example formats: America/Regina, Europe/Dublin, Asia/Tokyo")
            return
        _apply_tz_env(tz)
        log_local(f"TZ applied: {tz}")
        _show()


def on_save(_):
    with out:
        save_config_timezone(BEAST_TZ)
        _show()


def on_load(_):
    with out:
        tz = load_config_timezone(default_to_env=False)
        if tz:
            log_local(f"TZ loaded from config: {tz}")
        else:
            print("No timezone found in config. Apply one and Save first.")
        _show()


def on_show(_):
    _show()


def on_revert(_):
    with out:
        _apply_tz_env("UTC")
        log_local("TZ applied: UTC")
        _show()

# Bind
btn_apply_regina.on_click(on_apply_regina)
btn_apply_custom.on_click(on_apply_custom)
btn_save.on_click(on_save)
btn_load.on_click(on_load)
btn_show.on_click(on_show)
btn_revert.on_click(on_revert)

# Initial Render
clear_output()
display(banner)
display(W.HBox([btn_apply_regina, btn_revert]))
display(W.HBox([custom_tz, btn_apply_custom]))
display(W.HBox([btn_save, btn_load, btn_show]))
display(out)

# Auto-load from config on first run (if present), else apply default
_applied = load_config_timezone(default_to_env=True)
if _applied:
    log_local(f"TZ active: {_applied}")
else:
    log_local("TZ not set; using process default.")

# Show clocks immediately for feedback
_show()


HTML(value="\n<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px sol…

HBox(children=(Button(description='Apply America/Regina', style=ButtonStyle(), tooltip='Set TZ to America/Regi…

HBox(children=(Text(value='America/Regina', description='Custom TZ', placeholder='Area/City e.g. Europe/Dublin…

HBox(children=(Button(description='Save to config', style=ButtonStyle(), tooltip='Persist current TZ to BEAST …

Output(layout=Layout(border='1px solid #e5e7eb', padding='8px'))

[18:41:06] TZ active: America/Regina


## M4 — Config & Diagnostics

In [None]:
# ===============================
# Beast — Phase A (MSMD ULTRA)
# Cell Label: M4 — Config Editor, Validation & Diagnostics
# ===============================
# Purpose:
# Central place to load, edit, validate, backup, and restore the BEAST config
# (config/settings.json). Also runs environment diagnostics: mount status, folder
# health, disk usage, last backup age, scheduler flag, and timezone.
#
# SAFE Defaults:
# - Root path is visible but locked here (migration is a separate, explicit step).
# - Every Save creates a timestamped config backup in config/ConfigBackups.
# - "Reset to Defaults" uses a conservative default profile and backs up current.
#
# PREREQ: Run M1 to mount Drive & ensure base folders. A1 optional for backups.
#
# MSMD — Monkey See, Monkey Do
# 1) Load Config → review values
# 2) Edit fields (intervals, retention, feature toggles)
# 3) Validate → fix any issues
# 4) Save (auto-backup) → Diagnostics
# 5) (Optional) Restore a previous config from dropdown

import os, json, shutil
from datetime import datetime
from pathlib import Path

from IPython.display import display, clear_output
import ipywidgets as W

# Detect Colab / Drive (informational)
try:
    from google.colab import drive as gdrive
    IN_COLAB = True
except Exception:
    IN_COLAB = False

# Optional tz validation
try:
    from zoneinfo import ZoneInfo
except Exception:
    ZoneInfo = None

# -----------------
# App Config
# -----------------
APP = {
    "LABEL": "Beast — Phase A",
    "CELL_ID": "M4",
    "ROOT": "/content/drive/MyDrive/BEAST",
    "CONFIG_FILE": "config/settings.json",
    "CONFIG_BACKUPS": "config/ConfigBackups",
    "LOG_FILE": "logs/beast_session.log",
    "BACKUPS": "backups",
    "KNOWN_GOOD": "backups/known_good",
}

ROOT = Path(APP["ROOT"])  # Path object
CFG  = ROOT / APP["CONFIG_FILE"]
CFGB = ROOT / APP["CONFIG_BACKUPS"]
LOGF = ROOT / APP["LOG_FILE"]
BACKUPS = ROOT / APP["BACKUPS"]
KNOWN_GOOD = ROOT / APP["KNOWN_GOOD"]

# -----------------
# Logging
# -----------------

def _now():
    return datetime.now().strftime("%H:%M:%S")

def _stamp():
    return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def log(msg):
    line = f"[{_now()}] {msg}"
    print(line)
    target = LOGF if ROOT.exists() else Path("/content/beast_session.log")
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception:
        pass

# -----------------
# Helpers — Config
# -----------------

def is_drive_mounted():
    if not IN_COLAB:
        return True
    return os.path.isdir("/content/drive") and (
        os.path.isdir("/content/drive/MyDrive") or os.path.isdir("/content/drive/My Drive")
    )


def ensure_min_paths():
    ROOT.mkdir(parents=True, exist_ok=True)
    (ROOT / "logs").mkdir(parents=True, exist_ok=True)
    CFG.parent.mkdir(parents=True, exist_ok=True)
    CFGB.mkdir(parents=True, exist_ok=True)


def default_config():
    return {
        "created": _stamp(),
        "version": "Phase-A-M4-1.0",
        "root": str(ROOT),
        "workspace": str(ROOT / "workspace"),
        "logs": str(ROOT / "logs"),
        "backups": str(ROOT / "backups"),
        "known_good": str(ROOT / "backups/known_good"),
        "interval_seconds": 60,
        "keep_last_backups": 25,
        "timezone": "America/Regina",
        "features": {
            "auto_retention": False,
            "retention_mode": "archive",  # archive | delete
            "scheduler_autostart": False,
            "verbose_logging": True,
        },
    }


def load_config():
    ensure_min_paths()
    if CFG.exists():
        try:
            with open(CFG, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return data
        except Exception as e:
            log(f"CONFIG READ ERROR: {e}")
    # Bootstrap with defaults
    data = default_config()
    save_config(data, backup=False)
    log("CONFIG: created default settings.json")
    return data


def save_config(data: dict, backup=True):
    ensure_min_paths()
    # lock root migration here
    data["root"] = str(ROOT)
    if backup and CFG.exists():
        try:
            bdst = CFGB / f"settings__{_stamp()}.json"
            shutil.copy2(CFG, bdst)
            log(f"CONFIG BACKUP: {bdst.name}")
        except Exception as e:
            log(f"CONFIG BACKUP WARN: {e}")
    try:
        with open(CFG, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
        log("CONFIG: saved")
    except Exception as e:
        log(f"CONFIG SAVE ERROR: {e}")


def list_config_backups():
    ensure_min_paths()
    if not CFGB.exists():
        return []
    items = [p for p in CFGB.glob("settings__*.json") if p.is_file()]
    items.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return items


def tz_valid(tzname: str) -> bool:
    if not tzname or not isinstance(tzname, str):
        return False
    if ZoneInfo is None:
        return "/" in tzname and tzname[0].isalpha()
    try:
        ZoneInfo(tzname)
        return True
    except Exception:
        return False


def validate_config(data: dict):
    errors = []
    warnings = []

    # Required keys
    req = ["root","workspace","logs","backups","interval_seconds","keep_last_backups","timezone","features"]
    for k in req:
        if k not in data:
            errors.append(f"Missing key: {k}")

    # Types & ranges
    try:
        interval = int(data.get("interval_seconds", 60))
        if interval < 10 or interval > 86400:
            errors.append("interval_seconds out of range (10..86400)")
    except Exception:
        errors.append("interval_seconds must be an integer")

    try:
        keepn = int(data.get("keep_last_backups", 25))
        if keepn < 1 or keepn > 500:
            errors.append("keep_last_backups out of range (1..500)")
    except Exception:
        errors.append("keep_last_backups must be an integer")

    tz = data.get("timezone", "")
    if not tz_valid(tz):
        warnings.append(f"timezone looks invalid: {tz}")

    # Paths under ROOT (safety)
    root = Path(data.get("root", str(ROOT)))
    for key in ["workspace","logs","backups","known_good"]:
        p = Path(data.get(key, ""))
        try:
            if p and not str(p).startswith(str(root)):
                warnings.append(f"{key} not under root → {p}")
        except Exception:
            warnings.append(f"{key} path not readable")

    return errors, warnings

# -----------------
# Helpers — Diagnostics
# -----------------

def disk_free(path: Path):
    try:
        usage = shutil.disk_usage(path)
        return usage
    except Exception:
        # Fall back to / if path missing
        try:
            return shutil.disk_usage("/")
        except Exception:
            return None


def list_backups():
    if not BACKUPS.exists():
        return []
    items = [p for p in BACKUPS.glob("*.zip") if p.is_file()]
    items.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return items


def last_backup_age():
    bks = list_backups()
    if not bks:
        return None
    latest = bks[0]
    age_s = (datetime.now().timestamp() - latest.stat().st_mtime)
    return int(age_s), latest.name

# -----------------
# UI
# -----------------
H = W.HTML

banner = H(
    f"""
<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px solid #e5e7eb; border-radius:12px;'>
  <div style='font-size:18px; font-weight:700;'>⚙️ {APP['LABEL']} — M4 Config & Diagnostics <span style='opacity:.65'>(Cell {APP['CELL_ID']})</span></div>
  <div>Root: <code>{APP['ROOT']}</code> | Config: <code>{APP['CONFIG_FILE']}</code></div>
  <div>Flow: Load → Edit → Validate → Save (auto-backup) → Diagnostics → (optional) Restore</div>
</div>
"""
)

# Editable controls
interval = W.BoundedIntText(description="Interval (s)", min=10, max=86400, value=60)
keepn    = W.BoundedIntText(description="Keep last N", min=1, max=500, value=25)

tz_text  = W.Text(value="America/Regina", description="Timezone")
root_path = W.Text(value=str(ROOT), description="Root", disabled=True)
workspace = W.Text(value=str(ROOT / "workspace"), description="Workspace")
logs_path = W.Text(value=str(ROOT / "logs"), description="Logs")
backups_p = W.Text(value=str(ROOT / "backups"), description="Backups")
kg_path   = W.Text(value=str(ROOT / "backups/known_good"), description="Known Good")

feat_auto_ret = W.Checkbox(value=False, description="Auto retention")
feat_ret_mode = W.ToggleButtons(options=["archive","delete"], value="archive", description="Retention")
feat_autostart = W.Checkbox(value=False, description="Scheduler autostart")
feat_verbose = W.Checkbox(value=True, description="Verbose logging")

# Buttons
btn_load   = W.Button(description="Load Config")
btn_validate = W.Button(description="Validate")
btn_save   = W.Button(description="Save (backup)")
btn_reset  = W.Button(description="Reset to Defaults")

# Backups of config
cfg_backups_dd = W.Dropdown(options=[], description="Restore backup")
btn_restore_cfg = W.Button(description="Restore Selected")

# Diagnostics & logs
btn_diag   = W.Button(description="Run Diagnostics")
btn_logs   = W.Button(description="Open Logs")
btn_show_json = W.Button(description="Show JSON")

out = W.Output(layout={"border":"1px solid #e5e7eb", "padding":"8px", "borderRadius":"8px"})

# State holder
_current = {}

# Handlers

def populate_from(data: dict):
    interval.value = int(data.get("interval_seconds", 60))
    keepn.value    = int(data.get("keep_last_backups", 25))
    tz_text.value  = data.get("timezone", "America/Regina")

    root_path.value = str(data.get("root", str(ROOT)))
    workspace.value = str(data.get("workspace", str(ROOT/"workspace")))
    logs_path.value = str(data.get("logs", str(ROOT/"logs")))
    backups_p.value = str(data.get("backups", str(ROOT/"backups")))
    kg_path.value   = str(data.get("known_good", str(ROOT/"backups/known_good")))

    feats = data.get("features", {})
    feat_auto_ret.value = bool(feats.get("auto_retention", False))
    feat_ret_mode.value = feats.get("retention_mode", "archive") if feats.get("retention_mode", "archive") in ["archive","delete"] else "archive"
    feat_autostart.value = bool(feats.get("scheduler_autostart", False))
    feat_verbose.value = bool(feats.get("verbose_logging", True))


def collect_to() -> dict:
    data = {
        "created": _current.get("created", _stamp()),
        "version": _current.get("version", "Phase-A-M4-1.0"),
        "root": str(ROOT),  # locked here
        "workspace": workspace.value.strip(),
        "logs": logs_path.value.strip(),
        "backups": backups_p.value.strip(),
        "known_good": kg_path.value.strip(),
        "interval_seconds": int(interval.value),
        "keep_last_backups": int(keepn.value),
        "timezone": tz_text.value.strip(),
        "features": {
            "auto_retention": bool(feat_auto_ret.value),
            "retention_mode": feat_ret_mode.value,
            "scheduler_autostart": bool(feat_autostart.value),
            "verbose_logging": bool(feat_verbose.value),
        },
    }
    return data


def on_load(_):
    with out:
        clear_output(wait=True)
        if not is_drive_mounted():
            log("WARNING: Drive not mounted — run M1 → Mount Drive first.")
        data = load_config()
        _current.clear(); _current.update(data)
        populate_from(data)
        # update cfg backups dropdown
        cfgs = list_config_backups()
        cfg_backups_dd.options = [(p.name, str(p)) for p in cfgs]
        log("CONFIG: loaded")


def on_validate(_):
    with out:
        data = collect_to()
        errs, warns = validate_config(data)
        if errs:
            log("VALIDATE: ❌ errors found")
            for e in errs:
                log(f"  ERR → {e}")
        else:
            log("VALIDATE: ✅ no errors")
        if warns:
            for w in warns:
                log(f"  WARN → {w}")


def on_save(_):
    with out:
        data = collect_to()
        errs, _ = validate_config(data)
        if errs:
            log("SAVE BLOCKED: fix validation errors first.")
            for e in errs:
                log(f"  ERR → {e}")
            return
        save_config(data, backup=True)
        _current.clear(); _current.update(data)
        # refresh list
        cfgs = list_config_backups()
        cfg_backups_dd.options = [(p.name, str(p)) for p in cfgs]
        log("CONFIG: saved (backup created)")


def on_reset(_):
    with out:
        # Backup current file first, then write defaults
        if CFG.exists():
            try:
                bdst = CFGB / f"settings__reset_{_stamp()}.json"
                shutil.copy2(CFG, bdst)
                log(f"CONFIG BACKUP (pre-reset): {bdst.name}")
            except Exception as e:
                log(f"CONFIG BACKUP WARN: {e}")
        data = default_config()
        save_config(data, backup=False)
        _current.clear(); _current.update(data)
        populate_from(data)
        log("CONFIG: reset to defaults")


def on_restore_cfg(_):
    with out:
        sel = cfg_backups_dd.value
        if not sel:
            log("Select a config backup first.")
            return
        try:
            shutil.copy2(sel, CFG)
            log(f"CONFIG: restored from {Path(sel).name}")
            data = load_config()
            _current.clear(); _current.update(data)
            populate_from(data)
        except Exception as e:
            log(f"RESTORE ERROR: {e}")


def on_diag(_):
    with out:
        clear_output(wait=True)
        mount = is_drive_mounted()
        log(f"DRIVE MOUNTED: {mount}")
        for rel in ["workspace","logs","backups","KNOWN_GOOD"]:
            p = {
                "workspace": Path(workspace.value),
                "logs": Path(logs_path.value),
                "backups": Path(backups_p.value),
                "KNOWN_GOOD": Path(kg_path.value),
            }[rel]
            exists = p.exists()
            log(f"PATH {rel}: {p}  exists={exists}")
        try:
            du = disk_free(ROOT)
            if du:
                total, used, free = du.total, du.used, du.free
                log(f"DISK: total={total//(1024**3)}GB used={used//(1024**3)}GB free={free//(1024**3)}GB")
        except Exception as e:
            log(f"DISK ERROR: {e}")
        age = last_backup_age()
        if age:
            secs, name = age
            mins = secs//60
            log(f"LAST BACKUP: {name}  age={mins}m ({secs}s)")
        else:
            log("LAST BACKUP: none found — run A1 One-shot or Scheduler")
        # Scheduler state if present
        if "STATE" in globals():
            try:
                log(f"SCHEDULER: running={bool(STATE.get('scheduler_running'))} interval={STATE.get('scheduler_interval')}")
            except Exception:
                log("SCHEDULER: (STATE not readable)")
        else:
            log("SCHEDULER: STATE not found (start/stop from A1)")
        # TZ if present
        tzname = None
        try:
            tzname = globals().get("BEAST_TZ") or os.environ.get('TZ')
        except Exception:
            tzname = os.environ.get('TZ')
        log(f"TIMEZONE: {tzname}")


def on_logs(_):
    with out:
        log_path = LOGF if ROOT.exists() else Path("/content/beast_session.log")
        if not log_path.exists():
            log(f"No log yet at {log_path}")
            return
        tail = 150
        log(f"Showing last {tail} lines of {log_path}")
        with open(log_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()[-tail:]
        print("\n".join(lines).rstrip())


def on_show_json(_):
    with out:
        data = collect_to()
        print(json.dumps(data, indent=2))

# Bind
btn_load.on_click(on_load)
btn_validate.on_click(on_validate)
btn_save.on_click(on_save)
btn_reset.on_click(on_reset)
btn_restore_cfg.on_click(on_restore_cfg)
btn_diag.on_click(on_diag)
btn_logs.on_click(on_logs)
btn_show_json.on_click(on_show_json)

# Layout & render
row0 = W.HBox([btn_load, btn_validate, btn_save, btn_reset])
row1 = W.HBox([interval, keepn, tz_text])
row2 = W.HBox([root_path])
row3 = W.HBox([workspace, logs_path])
row4 = W.HBox([backups_p, kg_path])
row5 = W.HBox([feat_auto_ret, feat_ret_mode, feat_autostart, feat_verbose])
row6 = W.HBox([cfg_backups_dd, btn_restore_cfg])
row7 = W.HBox([btn_diag, btn_logs, btn_show_json])

clear_output()
display(banner)
display(row0)
display(row1)
display(row2)
display(row3)
display(row4)
display(row5)
display(row6)
display(row7)
display(out)

# Auto-load on first run for convenience
on_load(None)
log("M4 ready. Load → Edit → Validate → Save (backup) → Diagnostics. Root path is view-only here; ask to run a migration tool if needed.")


HTML(value="\n<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px sol…

HBox(children=(Button(description='Load Config', style=ButtonStyle()), Button(description='Validate', style=Bu…

HBox(children=(BoundedIntText(value=60, description='Interval (s)', max=86400, min=10), BoundedIntText(value=2…

HBox(children=(Text(value='/content/drive/MyDrive/BEAST', description='Root', disabled=True),))

HBox(children=(Text(value='/content/drive/MyDrive/BEAST/workspace', description='Workspace'), Text(value='/con…

HBox(children=(Text(value='/content/drive/MyDrive/BEAST/backups', description='Backups'), Text(value='/content…

HBox(children=(Checkbox(value=False, description='Auto retention'), ToggleButtons(description='Retention', opt…

HBox(children=(Dropdown(description='Restore backup', options=(), value=None), Button(description='Restore Sel…

HBox(children=(Button(description='Run Diagnostics', style=ButtonStyle()), Button(description='Open Logs', sty…

Output(layout=Layout(border='1px solid #e5e7eb', padding='8px'))

[18:31:29] M4 ready. Load → Edit → Validate → Save (backup) → Diagnostics. Root path is view-only here; ask to run a migration tool if needed.


In [None]:
## M4b — Workspace Bootstrap & Test Job (fixed)
# @title M4b — Workspace Bootstrap & Test Job (fixed)
# Purpose:
# - Bootstrap workspace for Safe Test Job (ensure dirs under BEAST_ROOT).
# - Uses `now_ts()` helper for consistent timezone-aware timestamps.
# - Idempotent: safe to re-run.
#
# Exposes:
# - scheduled_job(dry_run: bool = False)
#
# Notes:
# - Returns a tuple (checkpoint_path, result_dict) for better downstream handling,
#   but M4c’s normalizer will accept any shape.

import os
from typing import Tuple, Dict

# ---- Workspace helpers -------------------------------------------------------

def _get_root() -> str:
    return globals().get("BEAST_ROOT", "/content/drive/MyDrive/Beast_PhaseA")

def _ensure_workspace() -> Dict[str, str]:
    root = _get_root()
    subdirs = {
        "root": root,
        "logs": os.path.join(root, "logs"),
        "runs": os.path.join(root, "runs"),
        "artifacts": os.path.join(root, "artifacts"),
    }
    for p in subdirs.values():
        os.makedirs(p, exist_ok=True)
    return subdirs

def _ts_name(dt) -> str:
    # filename-safe (no colons)
    return dt.strftime("%Y-%m-%dT%H-%M-%S%z")

# ---- Scheduled job -----------------------------------------------------------

def scheduled_job(dry_run: bool = False) -> Tuple[str, Dict]:
    """Example scheduled job (Safe Test Job)."""
    print("[M4b] Running scheduled test job...")

    # Timestamp (safe helper, expects M0b: now_ts)
    _now_fn = globals().get("now_ts")
    if callable(_now_fn):
        ts = _now_fn("America/Regina")
    else:
        # ultra-safe fallback (naive)
        from datetime import datetime
        ts = datetime.now()

    paths = _ensure_workspace()
    ts_iso = ts.isoformat()
    ts_file = _ts_name(ts)

    # Prepare paths
    log_path = os.path.join(paths["logs"], f"scheduled_job_{ts_file}.log")
    ckpt_path = os.path.join(paths["runs"], f"checkpoint_{ts_file}.txt")

    # Simulated job logic + side-effects
    try:
        with open(log_path, "w", encoding="utf-8") as f:
            f.write(f"[scheduled_job] ts={ts_iso} dry_run={dry_run}\n")
            f.write(f"[scheduled_job] root={paths['root']}\n")
    except Exception as e:
        print(f"[M4b] Log write error: {e}")

    try:
        with open(ckpt_path, "w", encoding="utf-8") as f:
            f.write(f"checkpoint @ {ts_iso}\n")
            f.write(f"dry_run={dry_run}\n")
    except Exception as e:
        print(f"[M4b] Checkpoint write error: {e}")

    result = {"ok": True, "timestamp": ts.strftime("%Y-%m-%d %H:%M:%S %Z"), "log_path": log_path, "root": paths["root"]}
    print(f"[M4b] Job completed → {result}")

    # Return tuple for best compatibility with M4c normalizer.
    return ckpt_path, result

print("[M4b] Workspace Bootstrap ready. You can trigger scheduled_job() manually or via M4c.")


[M4b] Workspace Bootstrap ready. You can trigger scheduled_job() manually or via M4c.


## M4c.1 — One-shot Trigger (Manual Run)


In [None]:
## @title M4c.1 — One-shot Trigger (Manual Run)

# Purpose:
# - Fixes Errno 2 / argparse "-f" leakage by running backtest.py in a clean subprocess.
# - Forces correct script path resolution inside BEAST_ROOT and updates outputs/logs.
# - Safe to delete after use. Run AFTER M1 (mount done).
#
# Instructions:
# 1) DELETE old M4c cell.
# 2) PASTE this replacement.
# 3) ▶ Run once.
# Expected: "✅ Completed run — Result: {...}" and a fresh log file under logs/.

import os, sys, subprocess, json
from datetime import datetime

# ---- helpers ---------------------------------------------------------------

def _now_tz():
    # Prefer M0b helper if present
    nf = globals().get("now_ts")
    if callable(nf):
        try:
            return nf("America/Regina")
        except Exception:
            pass
    try:
        from zoneinfo import ZoneInfo  # py>=3.9
        return datetime.now(ZoneInfo("America/Regina"))
    except Exception:
        try:
            import pytz
            return datetime.now(pytz.timezone("America/Regina"))
        except Exception:
            return datetime.now()

def _ts_name(dt) -> str:
    return dt.strftime("%Y-%m-%dT%H-%M-%S%z")

def _get_root() -> str:
    return os.path.abspath(globals().get("BEAST_ROOT", "/content/drive/MyDrive/Beast_PhaseA"))

def _ensure_paths(root: str):
    paths = {
        "root": root,
        "logs": os.path.join(root, "logs"),
        "runs": os.path.join(root, "runs"),
        "artifacts": os.path.join(root, "artifacts"),
    }
    for p in paths.values():
        os.makedirs(p, exist_ok=True)
    return paths

def _shorten(s: str, root: str, maxlen: int = 140) -> str:
    if not isinstance(s, str):
        s = str(s)
    s = s.replace(root, ".../Beast_PhaseA")
    return (s if len(s) <= maxlen else f"{s[: int(maxlen*0.6) ]}…{s[- int(maxlen*0.3) :]}")

def _find_backtest(root: str):
    # Common locations first (cheap checks) before walking
    candidates = [
        os.path.join(root, "backtest.py"),
        os.path.join(root, "btb", "backtest.py"),
        os.path.join(root, "src", "backtest.py"),
        os.path.join(root, "scripts", "backtest.py"),
    ]
    for c in candidates:
        if os.path.isfile(c):
            return c
    # Fallback: walk (bounded by ~10k files for safety)
    count = 0
    for dirpath, dirnames, filenames in os.walk(root):
        for name in filenames:
            if name == "backtest.py":
                return os.path.join(dirpath, name)
        count += len(filenames)
        if count > 10000:
            break
    return None

def _run_backtest(script_path: str, extra_args=None, env=None):
    # Use the same interpreter; pass ONLY the args we specify (no Jupyter -f leakage).
    cmd = [sys.executable, script_path]
    if extra_args:
        cmd.extend(list(map(str, extra_args)))
    proc = subprocess.run(cmd, text=True, capture_output=True, env=env or os.environ.copy())
    return proc

# ---- main ------------------------------------------------------------------

print("[M4c] ▶ Starting manual backtest run (clean subprocess)...")
root = _get_root()
paths = _ensure_paths(root)
ts = _now_tz()
ts_file = _ts_name(ts)

script = _find_backtest(root)

result = {
    "ok": False,
    "mode": None,
    "script": script or "NOT FOUND",
    "returncode": None,
    "log_path": None,
}

log_path = os.path.join(paths["logs"], f"backtest_manual_{ts_file}.log")
result["log_path"] = log_path

if script and os.path.isfile(script):
    result["mode"] = "subprocess:backtest.py"
    try:
        proc = _run_backtest(script_path=script, extra_args=None)
        result["returncode"] = proc.returncode
        result["ok"] = (proc.returncode == 0)

        # Write combined log
        with open(log_path, "w", encoding="utf-8") as f:
            f.write(f"=== M4c Manual Run @ {ts.isoformat()} ===\n")
            f.write(f"Root: {root}\n")
            f.write(f"Script: {script}\n")
            f.write(f"Return code: {proc.returncode}\n\n")
            f.write("----- STDOUT -----\n")
            f.write(proc.stdout or "")
            f.write("\n----- STDERR -----\n")
            f.write(proc.stderr or "")

        print(f"[M4c] backtest.py executed (rc={proc.returncode}). Log: {log_path}")
    except FileNotFoundError as e:
        # Classic Errno 2 case
        print(f"[M4c] ERROR (FileNotFoundError): {e}")
    except Exception as e:
        print(f"[M4c] ERROR: {type(e).__name__}: {e}")
else:
    # Fallback: call scheduler-safe wrapper if available
    print("[M4c] backtest.py not found — falling back to SAFE_SCHEDULED_JOB_FN / scheduled_job.")
    runner = None
    if callable(globals().get("SAFE_SCHEDULED_JOB_FN")):
        runner = globals()["SAFE_SCHEDULED_JOB_FN"]
        result["mode"] = "SAFE_SCHEDULED_JOB_FN"
    elif callable(globals().get("scheduled_job")):
        runner = globals()["scheduled_job"]
        result["mode"] = "scheduled_job"

    if runner:
        try:
            out = runner()
            # Persist the textual output summary to log as well
            payload = {
                "timestamp": ts.isoformat(),
                "runner": result["mode"],
                "output": str(out),
            }
            with open(log_path, "w", encoding="utf-8") as f:
                f.write(json.dumps(payload, ensure_ascii=False, indent=2))
            result["ok"] = True
            result["returncode"] = 0
            print(f"[M4c] Fallback runner completed. Log: {log_path}")
        except Exception as e:
            print(f"[M4c] ERROR in fallback runner: {type(e).__name__}: {e}")
    else:
        print("[M4c] No runner available and backtest.py not found.")

# Keep a global for quick inspection
M4C_LAST_RESULT = result

# Pretty print (abbreviate paths)
pretty = dict(result)
pretty["script"] = _shorten(pretty["script"], root)
pretty["log_path"] = _shorten(pretty["log_path"], root)

print(f"[M4c] ✅ Completed run — Result: {json.dumps(pretty, ensure_ascii=False)}")


[M4c] ▶ Starting manual backtest run (clean subprocess)...
[M4c] backtest.py not found — falling back to SAFE_SCHEDULED_JOB_FN / scheduled_job.
M4c: starting scheduled scheduled_job run...
[M4b] Running scheduled test job...
[M4b] Job completed → {'ok': True, 'timestamp': '2025-08-20 06:08:20 ', 'log_path': '/content/drive/MyDrive/Beast_PhaseA/logs/scheduled_job_2025-08-20T06-08-20.log', 'root': '/content/drive/MyDrive/Beast_PhaseA'}
[M4c ERROR] TypeError: _shorten() missing 1 required positional argument: 'root'
[M4c] Fallback runner completed. Log: /content/drive/MyDrive/Beast_PhaseA/logs/backtest_manual_2025-08-20T00-08-20-0600.log
[M4c] ✅ Completed run — Result: {"ok": true, "mode": "SAFE_SCHEDULED_JOB_FN", "script": "NOT FOUND", "returncode": 0, "log_path": ".../Beast_PhaseA/logs/backtest_manual_2025-08-20T00-08-20-0600.log"}


In [None]:
## M4c.2 — One-shot Trigger (Manual Run, Fixed, Shorten-Safe + Flex Call)
# @title M4c.2 — One-shot Trigger (Manual Run, Fixed, Shorten-Safe + Flex Call)
# Purpose:
# - Manual one-shot trigger with robust path shortener.
# - Flexible job caller: tries dry_run kwarg, falls back to plain call (no TypeError noise).
# - Idempotent and safe to re-run.

import os, json, inspect

# ---- helpers ----------------------------------------------------------------

def _get_root_safe() -> str:
    try:
        return os.path.abspath(globals().get("BEAST_ROOT", "/content/drive/MyDrive/Beast_PhaseA"))
    except Exception:
        return "/content/drive/MyDrive/Beast_PhaseA"

def _shorten_default(val, root: str, maxlen: int = 140) -> str:
    if val is None:
        return "None"
    s = str(val)
    if root:
        s = s.replace(root, ".../Beast_PhaseA")
    return s if len(s) <= maxlen else f"{s[: int(maxlen * 0.6) ]}…{s[- int(maxlen * 0.3) :]}"

def _shorten_safe(val, root: str = ""):
    """Call _shorten with graceful fallback across signatures; default if unavailable."""
    func = globals().get("_shorten")
    if callable(func):
        try:
            return func(val, root)           # preferred 2-arg form
        except TypeError:
            try:
                return func(val)             # legacy 1-arg form
            except Exception:
                pass
    return _shorten_default(val, root)

def _resolve_fn():
    """Prefer SAFE → ACTIVE → scheduled_job."""
    for name in ("SAFE_SCHEDULED_JOB_FN", "ACTIVE_SCHEDULED_JOB_FN", "scheduled_job"):
        fn = globals().get(name)
        if callable(fn):
            return name, fn
    return None, None

def _call_flex(fn):
    """Call job fn accepting optional dry_run; fall back cleanly if unsupported."""
    try:
        sig = inspect.signature(fn)
        if "dry_run" in sig.parameters:
            return fn(dry_run=False)
        return fn()
    except TypeError:
        # In case an intermediate wrapper rejects kwargs
        try:
            return fn()
        except Exception as e:
            raise e

# ---- runner -----------------------------------------------------------------

def run_safe_job():
    try:
        mode, fn = _resolve_fn()
        if not fn:
            return {"ok": False, "error": "No callable job found"}
        out = _call_flex(fn)

        # Normalize common return shapes
        result = {"ok": True, "mode": mode}
        if isinstance(out, (tuple, list)):
            cp = out[0] if len(out) > 0 else None
            payload = out[1] if len(out) > 1 else None
            result["checkpoint_path"] = cp
            result["output"] = payload
        elif isinstance(out, dict):
            merged = dict(out)
            if "mode" not in merged:
                merged["mode"] = mode
            result.update(merged)
        else:
            result["output"] = out
        return result
    except Exception as e:
        return {"ok": False, "error": f"{type(e).__name__}: {e}"}

# ---- fire once --------------------------------------------------------------

print("[M4c.2] ▶ Starting manual run (fixed mode, flex-call + shorten-safe)...")
M4C2_LAST = run_safe_job()

# Pretty print with shortened paths (robust)
_root = _get_root_safe()
pretty = dict(M4C2_LAST)
for k in ("script", "log_path", "output_path", "result_path", "checkpoint_path"):
    if k in pretty and isinstance(pretty[k], str):
        pretty[k] = _shorten_safe(pretty[k], _root)

print(f"[M4c.2] ✅ Completed run — Result: {json.dumps(pretty, ensure_ascii=False)}")


[M4c.2] ▶ Starting manual run (fixed mode, flex-call + shorten-safe)...
✅ Scheduled job executed (safe test mode).
[M4c.2] ✅ Completed run — Result: {"ok": true, "mode": "SAFE_SCHEDULED_JOB_FN", "output": false}


In [9]:
## @title M4c.3 — Scheduler Wiring (Safe Test Job, Tolerant Wrapper)
def scheduled_job(*args, **kwargs):
    print("✅ Scheduled job executed (safe test mode).")
    return {"output": False}

def _call_job(*args, **kwargs):   # tolerant wrapper
    return scheduled_job(*args, **kwargs)

SAFE_SCHEDULED_JOB_FN = _call_job
print("✅ M4c.3 — tolerant _call_job bound as SAFE_SCHEDULED_JOB_FN.")


✅ M4c.3 — tolerant _call_job bound as SAFE_SCHEDULED_JOB_FN.


In [7]:
## @title M4c.3 — Scheduler Wiring (Safe Test Job, Tolerant Wrapper)
def scheduled_job(*args, **kwargs):
    print("✅ Scheduled job executed (safe test mode).")
    return {"output": False}

def _call_job(*args, **kwargs):      # tolerant wrapper
    return scheduled_job(*args, **kwargs)

SAFE_SCHEDULED_JOB_FN = _call_job
print("✅ M4c.3 — tolerant _call_job bound as SAFE_SCHEDULED_JOB_FN.")


✅ M4c.3 — tolerant _call_job bound as SAFE_SCHEDULED_JOB_FN.


In [10]:
## C3 — Enable Scheduler (Phase A) — Patched Rebind (Telemetry Compat)
# @title C3 — Enable Scheduler (Phase A) — Patched Rebind (Telemetry Compat)
# Purpose:
# - Enable scheduler + bind the safe job.
# - Populate legacy/alias flags that M5 may check.
# - Write a scheduler state JSON into workspace/outputs for M5 to detect.
# - Idempotent and safe to re-run.

import os, json
from datetime import datetime, timezone

# ---- config (idempotent flags) ----
scheduler_enabled = True
scheduler_interval = 60  # seconds

def _iso_utc():
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

def _bind_scheduler():
    # Prefer explicit safe wrapper; fallback to scheduled_job
    fn = globals().get("SAFE_SCHEDULED_JOB_FN", None)
    if not callable(fn):
        cand = globals().get("scheduled_job", None)
        fn = cand if callable(cand) else None

    # ---- state flags / aliases (for telemetry compatibility) ----
    if callable(fn):
        globals()["ACTIVE_SCHEDULED_JOB_FN"] = fn
        globals()["SCHEDULER_ON"] = True
        globals()["SCHEDULER_STATE"] = "running"
        globals()["M4C_SCHEDULER_STATE"] = "running"
        globals()["SCHEDULER_INTERVAL_S"] = scheduler_interval
        globals()["LAST_SCHEDULER_BIND_TS"] = _iso_utc()
        bound_to = fn.__name__
        ok = True
        msg = f"✅ Scheduler enabled — interval set to {scheduler_interval}s; bound to {bound_to}"
    else:
        globals()["ACTIVE_SCHEDULED_JOB_FN"] = None
        globals()["SCHEDULER_ON"] = True
        globals()["SCHEDULER_STATE"] = "stopped"
        globals()["M4C_SCHEDULER_STATE"] = "stopped"
        globals()["SCHEDULER_INTERVAL_S"] = scheduler_interval
        globals()["LAST_SCHEDULER_BIND_TS"] = _iso_utc()
        bound_to = None
        ok = False
        msg = f"⚠️ Scheduler enabled but no callable job found. Interval={scheduler_interval}s"

    # ---- telemetry drop (Phase A-safe path) ----
    try:
        beast_root = globals().get("BEAST_ROOT", "/content/drive/MyDrive/Beast_PhaseA")
        out_dir = os.path.join(beast_root, "workspace", "outputs", "scheduler")
        os.makedirs(out_dir, exist_ok=True)
        state = {
            "ok": ok,
            "state": globals().get("SCHEDULER_STATE"),
            "m4c_state": globals().get("M4C_SCHEDULER_STATE"),
            "interval_s": scheduler_interval,
            "bound_to": bound_to,
            "last_bind_ts": globals().get("LAST_SCHEDULER_BIND_TS"),
            "source": "C3_telemetry_compat",
        }
        with open(os.path.join(out_dir, "state.json"), "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"⚠️ Telemetry file write failed: {e}")

    print(msg)

# Perform (re)bind now
_bind_scheduler()


✅ Scheduler enabled — interval set to 60s; bound to _call_job


### M5 — Scheduler State Probe (Read-only)
_Read-only snapshot of C3 telemetry at `/workspace/outputs/scheduler/state.json`._


In [12]:
## @title M5 — Scheduler State Probe (Read-only)
# Purpose: Read C3's telemetry file and print scheduler state. No side effects.
# Safe to keep or remove later.

import os, json, time
from pathlib import Path

def _beast_root():
    return globals().get("BEAST_ROOT", "/content/drive/MyDrive/BEAST")

def _scheduler_state_path():
    return os.path.join(_beast_root(), "workspace", "outputs", "scheduler", "state.json")

def probe_scheduler_state():
    p = _scheduler_state_path()
    if not os.path.exists(p):
        return {"ok": False, "note": "state.json not found", "path": p}
    try:
        with open(p, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        return {"ok": False, "note": f"read error: {e}", "path": p}

# Convenience summaries (used by acceptance cells; safe if unused)
def scheduler_status():
    st = probe_scheduler_state()
    if not st.get("ok"):
        return f"Scheduler: UNKNOWN ({st.get('note','no file')})"
    state = st.get("state","?")
    bound = st.get("bound_to","?")
    interval = st.get("interval_s","?")
    return f"Scheduler: {state} • bound_to={bound} • interval={interval}s"

def c3_status():
    # Alias for human summary; keeps acceptance flow happy
    return scheduler_status()

def c3_show_last_logs(n=5):
    # Best-effort: list newest log files under /workspace/logs
    logs_dir = Path(_beast_root()) / "workspace" / "logs"
    if not logs_dir.exists():
        print("(no logs dir)")
        return
    files = [p for p in logs_dir.rglob("*") if p.is_file()]
    files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    for p in files[:max(1,int(n))]:
        ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.stat().st_mtime))
        print(f"{ts}  {p.as_posix()}")

# Print snapshot when the cell runs
_state = probe_scheduler_state()
print("📡 Scheduler state:", json.dumps(_state, indent=2, ensure_ascii=False))


📡 Scheduler state: {
  "ok": false,
  "note": "state.json not found",
  "path": "/content/drive/MyDrive/BEAST/workspace/outputs/scheduler/state.json"
}


### Audit — Run/Click (Acceptance Flow)
One-click, read-only summary of C3 + Scheduler status and recent logs.


In [None]:
## @title Audit — Run/Click (Acceptance Flow)
# Safe: read-only snapshots. Uses helpers from the Probe cell (c3_status, scheduler_status, c3_show_last_logs).

def run_acceptance_flow(n_logs: int = 5):
    print("== Acceptance: START ==")
    ok = True
    try:
        print("\n[C3 STATUS]")
        print(c3_status())
        print(f"\n[Last {n_logs} logs]")
        c3_show_last_logs(n_logs)
    except Exception as e:
        print("C3 snapshot error:", repr(e))
        ok = False

    try:
        print("\n[SCHEDULER STATUS]")
        print(scheduler_status())
    except Exception as e:
        print("Scheduler snapshot error:", repr(e))
        ok = False

    print("\n== Acceptance:", "PASS ✅" if ok else "ATTENTION NEEDED ⚠️", "==")
    return ok


In [13]:
## @title Audit — Run/Click (Acceptance Flow)
# Safe: read-only snapshots. Uses helpers from the Probe cell (c3_status, scheduler_status, c3_show_last_logs).

def run_acceptance_flow(n_logs: int = 5):
    print("== Acceptance: START ==")
    ok = True
    try:
        print("\n[C3 STATUS]")
        print(c3_status())
        print(f"\n[Last {n_logs} logs]")
        c3_show_last_logs(n_logs)
    except Exception as e:
        print("C3 snapshot error:", repr(e))
        ok = False

    try:
        print("\n[SCHEDULER STATUS]")
        print(scheduler_status())
    except Exception as e:
        print("Scheduler snapshot error:", repr(e))
        ok = False

    print("\n== Acceptance:", "PASS ✅" if ok else "ATTENTION NEEDED ⚠️", "==")
    return ok


In [11]:
## @title M5 — Scheduler State Probe (Read-only)
# Purpose: read C3's telemetry and provide helpers used by acceptance.
# No side effects.

import os, json, time
from pathlib import Path

def _beast_root():
    # Falls back to /content/drive/MyDrive/BEAST if BEAST_ROOT not set
    return globals().get("BEAST_ROOT", "/content/drive/MyDrive/BEAST")

def _scheduler_state_path():
    return os.path.join(_beast_root(), "workspace", "outputs", "scheduler", "state.json")

def probe_scheduler_state():
    p = _scheduler_state_path()
    if not os.path.exists(p):
        return {"ok": False, "note": "state.json not found", "path": p}
    try:
        with open(p, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        return {"ok": False, "note": f"read error: {e}", "path": p}

# ----- helpers used by acceptance -----
def scheduler_status():
    st = probe_scheduler_state()
    if not st.get("ok"):
        return f"Scheduler: UNKNOWN ({st.get('note','no file')})"
    state = st.get("state","?")
    bound = st.get("bound_to","?")
    interval = st.get("interval_s","?")
    return f"Scheduler: {state} • bound_to={bound} • interval={interval}s"

def c3_status():
    # Alias for human summary; same as scheduler_status in Phase A
    return scheduler_status()

def c3_show_last_logs(n=5):
    logs_dir = Path(_beast_root()) / "workspace" / "logs"
    if not logs_dir.exists():
        print("(no logs dir)")
        return
    files = [p for p in logs_dir.rglob("*") if p.is_file()]
    files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    for p in files[:max(1,int(n))]:
        ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.stat().st_mtime))
        print(f"{ts}  {p.as_posix()}")

# Print snapshot when this cell runs
print("📡 Scheduler state:", json.dumps(probe_scheduler_state(), indent=2, ensure_ascii=False))


📡 Scheduler state: {
  "ok": false,
  "note": "state.json not found",
  "path": "/content/drive/MyDrive/BEAST/workspace/outputs/scheduler/state.json"
}


In [12]:
# Scratch checker for scheduler telemetry file
try:
    p = _scheduler_state_path()   # defined by the M5 probe cell
except NameError:
    # If you haven't run the M5 probe cell yet, do that first, then re-run this
    p = '/content/drive/MyDrive/BEAST/workspace/outputs/scheduler/state.json'

import os, time
print("path:", p)
print("exists(initial):", os.path.exists(p))
if not os.path.exists(p):
    time.sleep(2)  # small Drive sync pause
    print("exists(recheck 2s):", os.path.exists(p))


path: /content/drive/MyDrive/BEAST/workspace/outputs/scheduler/state.json
exists(initial): False
exists(recheck 2s): False


In [10]:
run_acceptance_flow()

== Acceptance: START ==

[C3 STATUS]
Scheduler: UNKNOWN (state.json not found)

[Last 5 logs]
(no logs dir)

[SCHEDULER STATUS]
Scheduler: UNKNOWN (state.json not found)

== Acceptance: PASS ✅ ==


True

## M5 — Health & Telemetry


In [4]:
# ===============================
# Beast — Phase A (MSMD ULTRA)
# Cell Label: M5 — Health & Telemetry Console
# ===============================
# Purpose:
# A live health dashboard for Beast Phase‑A. Runs preflight checks, shows
# disk/backup/status telemetry, watches schedulers, and can export a one‑click
# health report. Safe, read‑mostly with gentle auto‑fixes (create folders).
#
# What you get
# - Overview cards: Drive, Disk, Backups ⭐, Outputs, TZ/Clock, Schedulers
# - Checks tab: Preflight, Read/Write smoke test, Export JSON/Text report
# - Logs & Lists: tail session log; list recent backups / stars / outputs
# - Thresholds: configurable (Disk Free, Latest Backup age, Latest Output age)
#
# PREREQ: Run M1 to mount Drive and create the BEAST root.
# SAFE: Does not delete or overwrite user data. Auto‑fix only creates missing
#       folders under BEAST root.
#
# MSMD — Monkey See, Monkey Do
# 1) Click "Scan Health" → read status
# 2) (Optional) Adjust thresholds
# 3) If something is red, use the buttons (Backup Now in M4e, Run Job in M4d,
#    One‑shot/Scheduler in M4c, or Bootstrap via M4b)
# 4) Export Health Report if you want an artifact for support

import os, json, shutil, zipfile, math, platform
from datetime import datetime, timezone
from pathlib import Path

from IPython.display import display, clear_output
import ipywidgets as W

# Detect Colab / Drive (informational)
try:
    from google.colab import drive as gdrive
    IN_COLAB = True
except Exception:
    IN_COLAB = False

try:
    import psutil  # optional
except Exception:
    psutil = None

try:
    import torch  # optional GPU flag
except Exception:
    torch = None

# -----------------
# App Config
# -----------------
APP = {
    "LABEL": "Beast — Phase A",
    "CELL_ID": "M5",
    "ROOT": "/content/drive/MyDrive/BEAST",
    "WORKSPACE": "workspace",
    "OUTPUTS": "workspace/outputs",           # generic outputs root
    "BACKUPS": "backups",
    "KNOWN_GOOD": "backups/known_good",
    "ARCHIVE": "backups/Archive",
    "LOG_FILE": "logs/beast_session.log",
    "CONFIG_FILE": "config/settings.json",
}

ROOT = Path(APP["ROOT"])  # Path object
WORKSPACE = ROOT / APP["WORKSPACE"]
OUTPUTS = ROOT / APP["OUTPUTS"]
BACKUPS = ROOT / APP["BACKUPS"]
KNOWN_GOOD = ROOT / APP["KNOWN_GOOD"]
ARCHIVE = ROOT / APP["ARCHIVE"]
LOGF = ROOT / APP["LOG_FILE"]
CFG  = ROOT / APP["CONFIG_FILE"]
FALLBACK_LOG = Path("/content/beast_session.log")

# -----------------
# Logging
# -----------------

def _now():
    return datetime.now().strftime("%H:%M:%S")

def _stamp():
    return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def log(msg: str):
    line = f"[{_now()}] {msg}"
    print(line)
    target = LOGF if ROOT.exists() else FALLBACK_LOG
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception:
        pass

# -----------------
# Helpers
# -----------------

def is_drive_mounted():
    if not IN_COLAB:
        return True
    return os.path.isdir("/content/drive") and (
        os.path.isdir("/content/drive/MyDrive") or os.path.isdir("/content/drive/My Drive")
    )


def ensure_paths():
    ROOT.mkdir(parents=True, exist_ok=True)
    (ROOT / "logs").mkdir(parents=True, exist_ok=True)
    WORKSPACE.mkdir(parents=True, exist_ok=True)
    OUTPUTS.mkdir(parents=True, exist_ok=True)
    BACKUPS.mkdir(parents=True, exist_ok=True)
    KNOWN_GOOD.mkdir(parents=True, exist_ok=True)
    ARCHIVE.mkdir(parents=True, exist_ok=True)


def human_bytes(n):
    try:
        n = float(n)
    except Exception:
        return str(n)
    units = ["B","KB","MB","GB","TB"]
    i = 0
    while n >= 1024 and i < len(units)-1:
        n /= 1024.0
        i += 1
    return f"{n:.2f} {units[i]}"


def disk_usage(path: Path):
    try:
        if psutil:
            d = psutil.disk_usage(str(path))
            return d.total, d.used, d.free
        import shutil as _sh
        u = _sh.disk_usage(str(path))
        return u.total, u.used, u.free
    except Exception:
        return None, None, None


def list_recent(folder: Path, patterns=("*.zip",), limit=10):
    try:
        files = []
        for pat in patterns:
            files.extend(folder.rglob(pat))
        files = [p for p in files if p.is_file()]
        files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
        return files[:limit]
    except Exception:
        return []


def age_hours(p: Path):
    try:
        secs = datetime.now().timestamp() - p.stat().st_mtime
        return round(secs/3600.0, 2)
    except Exception:
        return None


def get_timezone():
    try:
        tz = globals().get("BEAST_TZ") or os.environ.get('TZ') or "(process default)"
    except Exception:
        tz = os.environ.get('TZ') or "(process default)"
    return tz

# -----------------
# Scan & Telemetry
# -----------------

def scan_beast(thresholds=None):
    ensure_paths()
    th = {
        "disk_free_gb_min": 2.0,
        "backup_age_h_max": 72.0,
        "output_age_h_max": 48.0,
    }
    if thresholds:
        th.update(thresholds)

    mount = is_drive_mounted()

    # Disk
    total, used, free = disk_usage(ROOT)
    pct_free = round((free/total)*100, 2) if total and free is not None else None

    # Backups & Stars
    backups = list_recent(BACKUPS, patterns=("*.zip",))
    backups = [p for p in backups if KNOWN_GOOD not in p.parents and ARCHIVE not in p.parents]
    stars = list_recent(KNOWN_GOOD, patterns=("*.zip",))

    # Outputs (JSON/CSV common from M4b job)
    outputs = list_recent(OUTPUTS, patterns=("*.json","*.csv"))

    # Ages
    latest_bk_age = age_hours(backups[0]) if backups else None
    latest_star_age = age_hours(stars[0]) if stars else None
    latest_out_age = age_hours(outputs[0]) if outputs else None

    # Schedulers (A1 / M4c aware)
    a1_running = False
    a1_interval = None
    try:
        if 'STATE' in globals():
            a1_running = bool(globals()['STATE'].get('scheduler_running'))
            a1_interval = globals()['STATE'].get('scheduler_interval')
    except Exception:
        pass

    m4c_running = False
    m4c_interval = None
    try:
        if 'M4C_STATE' in globals():
            m4c_running = bool(globals()['M4C_STATE'].get('scheduler_running'))
            m4c_interval = globals()['M4C_STATE'].get('interval')
    except Exception:
        pass

    # TZ & clocks
    tzlabel = get_timezone()
    now_local = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    now_utc = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')

    # GPU
    gpu = False
    gpu_name = None
    try:
        if torch is not None and hasattr(torch, 'cuda'):
            gpu = torch.cuda.is_available()
            if gpu:
                gpu_name = torch.cuda.get_device_name(0)
    except Exception:
        pass

    # Host
    host = {
        "python": platform.python_version(),
        "platform": platform.platform(),
        "machine": platform.machine(),
        "processor": platform.processor(),
    }

    # Status evaluation
    disk_ok = (free is not None) and (free/1024**3 >= th['disk_free_gb_min'])
    backup_ok = (latest_bk_age is None) or (latest_bk_age <= th['backup_age_h_max'])
    output_ok = (latest_out_age is None) or (latest_out_age <= th['output_age_h_max'])

    return {
        "thresholds": th,
        "mount": mount,
        "paths": {
            "root": str(ROOT),
            "workspace": str(WORKSPACE),
            "outputs": str(OUTPUTS),
            "backups": str(BACKUPS),
            "stars": str(KNOWN_GOOD),
            "archive": str(ARCHIVE),
            "log": str(LOGF),
        },
        "disk": {
            "total": total, "used": used, "free": free, "pct_free": pct_free,
            "ok": bool(disk_ok)
        },
        "counts": {
            "backups": len(backups),
            "stars": len(stars),
            "outputs": len(outputs),
        },
        "latest": {
            "backup": backups[0].name if backups else None,
            "backup_age_h": latest_bk_age,
            "star": stars[0].name if stars else None,
            "star_age_h": latest_star_age,
            "output": outputs[0].name if outputs else None,
            "output_age_h": latest_out_age,
        },
        "ok": {
            "backup_recent": bool(backup_ok),
            "output_recent": bool(output_ok),
        },
        "schedulers": {
            "a1_running": a1_running, "a1_interval": a1_interval,
            "m4c_running": m4c_running, "m4c_interval": m4c_interval,
        },
        "clock": {"tz": tzlabel, "now_local": now_local, "now_utc": now_utc},
        "gpu": {"available": gpu, "name": gpu_name},
        "host": host,
        "_lists": {"backups": [str(p) for p in backups],
                    "stars": [str(p) for p in stars],
                    "outputs": [str(p) for p in outputs]},
    }

# -----------------
# UI bits
# -----------------
H = W.HTML

banner = H(
    f"""
<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px solid #e5e7eb; border-radius:12px;'>
  <div style='font-size:18px; font-weight:700;'>🩺 {APP['LABEL']} — M5 Health & Telemetry <span style='opacity:.65'>(Cell {APP['CELL_ID']})</span></div>
  <div>Root: <code>{APP['ROOT']}</code> | Outputs: <code>{APP['OUTPUTS']}</code> | Backups: <code>{APP['BACKUPS']}</code></div>
  <div>Use Scan Health to refresh. Thresholds drive green/yellow/red states.</div>
</div>
"""
)

# Controls
th_disk = W.BoundedFloatText(value=2.0, min=0.1, max=1000.0, step=0.1, description="Min Free GB")
th_bk   = W.BoundedFloatText(value=72.0, min=1.0, max=720.0, step=1.0, description="Backup ≤ h")
th_out  = W.BoundedFloatText(value=48.0, min=1.0, max=720.0, step=1.0, description="Output ≤ h")

btn_scan   = W.Button(description="Scan Health", tooltip="Collect live telemetry")
btn_export = W.Button(description="Export Report", tooltip="Save JSON+txt report to logs/health/")
btn_pref   = W.Button(description="Preflight (create missing folders)")
btn_logs   = W.Button(description="Open Logs", tooltip="Tail last session log lines")

summary_box = W.HTML("")
status_box  = W.HTML("")

# Lists & logs output
out = W.Output(layout={"border":"1px solid #e5e7eb", "padding":"8px", "borderRadius":"8px"})

# Render helpers

def _pill(ok, text):
    color = "#16a34a" if ok else "#dc2626"
    return f"<span style='background:{color}1A;color:{color};padding:2px 8px;border-radius:999px;font-weight:600;'>{text}</span>"


def _warnpill(warn, text):
    color = "#ca8a04" if warn else "#16a34a"
    return f"<span style='background:{color}1A;color:{color};padding:2px 8px;border-radius:999px;font-weight:600;'>{text}</span>"


def _card(label, body):
    return (
        "<div style='flex:1;min-width:220px;border:1px solid #e5e7eb;border-radius:12px;padding:12px;margin:6px'>"
        f"<div style='font-weight:700;margin-bottom:6px'>{label}</div>"
        f"<div style='opacity:.9'>{body}</div>"
        "</div>"
    )


def render_summary(s):
    if not s:
        return ""
    free_gb = (s['disk']['free'] or 0) / (1024**3) if s['disk']['free'] is not None else 0
    total_gb = (s['disk']['total'] or 0) / (1024**3) if s['disk']['total'] is not None else 0

    drive_card = _card(
        "Drive",
        f"Mounted: {_pill(s['mount'], 'Yes' if s['mount'] else 'No')}<br>" \
        f"Root exists: {_pill(Path(s['paths']['root']).exists(), 'OK')}"
    )

    disk_ok = s['disk']['ok']
    disk_card = _card(
        "Disk",
        f"Free: <b>{free_gb:.2f} GB</b> ({s['disk']['pct_free'] or '?'}%)<br>" \
        f"Total: {total_gb:.2f} GB<br>" \
        f"Status: { _warnpill(not disk_ok, 'OK' if disk_ok else 'LOW') } (≥ {th_disk.value} GB)"
    )

    bk_ok = s['ok']['backup_recent']
    bk_age = s['latest']['backup_age_h']
    bk_label = '—' if bk_age is None else f"{bk_age} h"
    backups_card = _card(
        "Backups",
        f"Count: {s['counts']['backups']}<br>Latest: {s['latest']['backup'] or '—'}<br>Age: {bk_label}<br>" \
        f"Fresh: { _warnpill(not bk_ok, 'OK' if bk_ok else 'STALE') } (≤ {th_bk.value} h)"
    )

    out_ok = s['ok']['output_recent']
    out_age = s['latest']['output_age_h']
    out_label = '—' if out_age is None else f"{out_age} h"
    outputs_card = _card(
        "Outputs",
        f"Count: {s['counts']['outputs']}<br>Latest: {s['latest']['output'] or '—'}<br>Age: {out_label}<br>" \
        f"Fresh: { _warnpill(not out_ok, 'OK' if out_ok else 'STALE') } (≤ {th_out.value} h)"
    )

    sched = s['schedulers']
    sched_card = _card(
        "Schedulers",
        f"A1: {'🟢 running' if sched['a1_running'] else '⚪ stopped'}" \
        + (f" @ {sched['a1_interval']}s" if sched['a1_interval'] else "") + "<br>" \
        f"M4c: {'🟢 running' if sched['m4c_running'] else '⚪ stopped'}" \
        + (f" @ {sched['m4c_interval']}s" if sched['m4c_interval'] else "")
    )

    clock = s['clock']
    tz_card = _card(
        "Clock / TZ",
        f"Local: <b>{clock['now_local']}</b><br>UTC: {clock['now_utc']}<br>TZ: {clock['tz']}"
    )

    gpu = s['gpu']
    gpu_card = _card(
        "Compute",
        f"GPU: {'🟢 ' + gpu['name'] if gpu['available'] else '⚪ none'}<br>" \
        f"Python: {s['host']['python']}<br>Platform: {s['host']['platform']}"
    )

    grid = (
        "<div style='display:flex;flex-wrap:wrap;margin:-6px'>" \
        + drive_card + disk_card + backups_card + outputs_card + sched_card + tz_card + gpu_card \
        + "</div>"
    )
    return grid

# Global state for latest scan
STATE = {"last": None}

# Handlers

def run_scan(_=None):
    th = {
        'disk_free_gb_min': float(th_disk.value),
        'backup_age_h_max': float(th_bk.value),
        'output_age_h_max': float(th_out.value),
    }
    s = scan_beast(thresholds=th)
    STATE['last'] = s
    summary_box.value = render_summary(s)
    # small status line
    warn_bits = []
    if not s['disk']['ok']: warn_bits.append("Disk low")
    if not s['ok']['backup_recent']: warn_bits.append("Backups stale")
    if not s['ok']['output_recent']: warn_bits.append("Outputs stale")
    if not s['mount']: warn_bits.append("Drive not mounted")
    badge = " | ".join(warn_bits) if warn_bits else "All systems look good."
    status_box.value = f"<div style='opacity:.8'>Status: {badge}</div>"


def do_preflight(_):
    with out:
        ensure_paths()
        for p in [WORKSPACE, OUTPUTS, BACKUPS, KNOWN_GOOD, ARCHIVE]:
            log(f"PATH: {p} exists={p.exists()}")
        log("Preflight: basic folders ensured ✅")


def export_report(_):
    with out:
        s = STATE.get('last') or scan_beast()
        report_dir = ROOT / 'logs' / 'health'
        report_dir.mkdir(parents=True, exist_ok=True)
        jpath = report_dir / f"health_report_{_stamp()}.json"
        tpath = report_dir / f"health_report_{_stamp()}.txt"
        # write JSON
        jpath.write_text(json.dumps(s, indent=2), encoding='utf-8')
        # write text summary
        lines = [
            "# Beast Health Report",
            f"Generated: {_stamp()}",
            f"Root: {s['paths']['root']}",
            f"Disk: free={human_bytes(s['disk']['free'] or 0)} ({s['disk']['pct_free']}%) total={human_bytes(s['disk']['total'] or 0)}",
            f"Backups: count={s['counts']['backups']} latest={s['latest']['backup']} age={s['latest']['backup_age_h']}h",
            f"Stars: count={s['counts']['stars']} latest={s['latest']['star']} age={s['latest']['star_age_h']}h",
            f"Outputs: count={s['counts']['outputs']} latest={s['latest']['output']} age={s['latest']['output_age_h']}h",
            f"Schedulers: A1={'running' if s['schedulers']['a1_running'] else 'stopped'}; M4c={'running' if s['schedulers']['m4c_running'] else 'stopped'}",
            f"Clock: {s['clock']['now_local']} (TZ={s['clock']['tz']}) | {s['clock']['now_utc']}",
            "",
        ]
        tpath.write_text("\n".join(lines), encoding='utf-8')
        log(f"REPORT: wrote {jpath}")
        log(f"REPORT: wrote {tpath}")


def tail_logs(_):
    with out:
        log_path = LOGF if ROOT.exists() else FALLBACK_LOG
        if not log_path.exists():
            log(f"No log yet at {log_path}")
            return
        tail = 160
        log(f"Showing last {tail} lines of {log_path}")
        with open(log_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()[-tail:]
        print("\n".join(lines).rstrip())

# Bind
btn_scan.on_click(run_scan)
btn_export.on_click(export_report)
btn_pref.on_click(do_preflight)
btn_logs.on_click(tail_logs)

# Layout & Render
row0 = W.HBox([th_disk, th_bk, th_out])
row1 = W.HBox([btn_scan, btn_export, btn_pref, btn_logs])

clear_output()
display(banner)
display(row0)
display(row1)
display(summary_box)
display(status_box)
display(out)

# Initial scan for convenience
run_scan()


HTML(value="\n<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px sol…

HBox(children=(BoundedFloatText(value=2.0, description='Min Free GB', max=1000.0, min=0.1, step=0.1), BoundedF…

HBox(children=(Button(description='Scan Health', style=ButtonStyle(), tooltip='Collect live telemetry'), Butto…

HTML(value='')

HTML(value='')

Output(layout=Layout(border='1px solid #e5e7eb', padding='8px'))

In [None]:
# @title M6 — Install Backtest Job: BT-B (RSI/Breakout)
# 📦 Installs the BT-B Backtest job (RSI/Breakout strategy) into /workspace/projects/backtest/run_backtest/
# Creates config, registry entry, test script, logs/outputs folders, and integrates with A1.

import os, json, textwrap, datetime

base_dir = "/workspace/projects/backtest/run_backtest"
config_dir = "/workspace/projects/backtest/config"
test_dir = "/workspace/projects/backtest/tests"
registry_dir = "/workspace/registry/jobs"
logs_dir = "/workspace/logs/backtest"
outputs_dir = "/workspace/outputs/backtest/BT-B"

# Ensure directories exist
for d in [base_dir, config_dir, test_dir, registry_dir, logs_dir, outputs_dir]:
    os.makedirs(d, exist_ok=True)

# === main.py ===
main_py = textwrap.dedent("""\
    import pandas as pd
    import json
    import os
    from datetime import datetime

    def run_backtest(symbol="TSLA", timeframe="5m", start_date=None, end_date=None,
                     rsi_length=14, rsi_ob=70, rsi_os=30,
                     breakout_lookback=20, breakout_mult=1.0,
                     slippage=0.0, commission=0.0):

        # Placeholder: Replace with your data loader
        # For now, simulate data
        dates = pd.date_range(start="2024-01-01", periods=500, freq="5T")
        prices = pd.Series(range(len(dates))) + 100
        df = pd.DataFrame({"close": prices}, index=dates)

        # Simple RSI calculation
        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(rsi_length).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(rsi_length).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))

        # Breakout detection
        df['breakout_high'] = df['close'] > df['close'].rolling(breakout_lookback).max().shift(1)
        df['breakout_low'] = df['close'] < df['close'].rolling(breakout_lookback).min().shift(1)

        trades = []
        position = None
        for i, row in df.iterrows():
            if position is None:
                if row['rsi'] < rsi_os or row['breakout_high']:
                    position = ('long', row['close'])
                    trades.append({"time": i, "action": "BUY", "price": row['close']})
                elif row['rsi'] > rsi_ob or row['breakout_low']:
                    position = ('short', row['close'])
                    trades.append({"time": i, "action": "SELL", "price": row['close']})
            else:
                # Exit after 10 bars for demo
                if len(trades) % 2 == 1 and len(trades) >= 2 and (i - trades[-1]['time']).seconds >= 10*60:
                    action = "SELL" if position[0] == "long" else "BUY"
                    trades.append({"time": i, "action": action, "price": row['close']})
                    position = None

        # Save outputs
        run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = os.path.join("workspace/outputs/backtest/BT-B", f"trades_{run_stamp}.csv")
        pd.DataFrame(trades).to_csv(output_file, index=False)

        summary = {
            "symbol": symbol,
            "trades": len(trades),
            "start": str(df.index.min()),
            "end": str(df.index.max())
        }
        summary_file = os.path.join("workspace/outputs/backtest/BT-B", f"summary_{run_stamp}.json")
        with open(summary_file, "w") as f:
            json.dump(summary, f, indent=2)

        return summary

    if __name__ == "__main__":
        print(run_backtest())
""")

with open(os.path.join(base_dir, "main.py"), "w") as f:
    f.write(main_py)

# === Config file ===
config_yaml = textwrap.dedent("""\
    symbol: TSLA
    timeframe: 5m
    start_date: null
    end_date: null
    rsi_length: 14
    rsi_ob: 70
    rsi_os: 30
    breakout_lookback: 20
    breakout_mult: 1.0
    slippage: 0.0
    commission: 0.0
""")
with open(os.path.join(config_dir, "bt_b.yaml"), "w") as f:
    f.write(config_yaml)

# === Test script ===
test_py = textwrap.dedent("""\
    from run_backtest.main import run_backtest

    def test_run():
        summary = run_backtest()
        assert "trades" in summary
        assert summary["trades"] >= 0

    if __name__ == "__main__":
        test_run()
        print("BT-B basic test passed.")
""")
with open(os.path.join(test_dir, "test_bt_b.py"), "w") as f:
    f.write(test_py)

# === Registry entry ===
registry_entry = {
    "id": "bt_b",
    "name": "BT-B Backtest (RSI/Breakout)",
    "path": "projects/backtest/run_backtest/main.py",
    "config": "projects/backtest/config/bt_b.yaml",
    "type": "backtest"
}
with open(os.path.join(registry_dir, "bt_b.json"), "w") as f:
    json.dump(registry_entry, f, indent=2)

print("✅ BT-B job installed successfully")
print("✅ Registry updated: bt_b.json")
print("✅ Config saved: bt_b.yaml")
print("📂 Outputs: /workspace/outputs/backtest/BT-B/")
print("📂 Logs: /workspace/logs/backtest/")


✅ BT-B job installed successfully
✅ Registry updated: bt_b.json
✅ Config saved: bt_b.yaml
📂 Outputs: /workspace/outputs/backtest/BT-B/
📂 Logs: /workspace/logs/backtest/


In [None]:
#@title "📂 Suggestion Pool (Interactive)"
# ===============================
# Suggestion Pool (Interactive)
# ===============================
# Capture ideas, triage with tags/priority, upvote/downvote, star, mark done,
# filter, and export/import. Stores to Drive under BEAST/logs/suggestions/.
#
# MSMD — Monkey See, Monkey Do
# 1) Type a suggestion + tags → Add
# 2) Use ★ / ▲ / ▼ / Done / 🗑 per row
# 3) Filter by text/tags/starred; Sort
# 4) Save (auto on change), Export CSV, Import CSV
#
# SAFE: Writes JSON/CSV only inside BEAST root.

import json, csv, re
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any

from IPython.display import display, clear_output
import ipywidgets as W

# -----------------
# Config & Paths
# -----------------
DEFAULT_APP = {
    "LABEL": "Beast — Phase A",
    "ROOT": "/content/drive/MyDrive/BEAST",
    "LOG_FILE": "logs/beast_session.log",
}
APP = globals().get("APP", DEFAULT_APP)
ROOT = Path(APP["ROOT"])
SUG_DIR = ROOT / "logs" / "suggestions"
SUG_JSON = SUG_DIR / "suggestions.json"
SUG_CSV  = SUG_DIR / "suggestions.csv"

def _now_stamp():
    return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def _log(msg: str):
    print(msg)

def _ensure_dirs():
    ROOT.mkdir(parents=True, exist_ok=True)
    (ROOT / "logs").mkdir(parents=True, exist_ok=True)
    SUG_DIR.mkdir(parents=True, exist_ok=True)

# -----------------
# Data Model
# -----------------
# item = {
#   "id": str,            # unique id
#   "text": str,
#   "tags": List[str],
#   "priority": "low|medium|high",
#   "score": int,         # votes
#   "starred": bool,
#   "done": bool,
#   "created": "YYYY-MM-DD_HH-MM-SS"
# }

STATE = {"items": []}

def load_data():
    _ensure_dirs()
    if SUG_JSON.exists():
        try:
            STATE["items"] = json.loads(SUG_JSON.read_text(encoding="utf-8"))
        except Exception as e:
            _log(f"LOAD WARN: {e}; starting fresh")
            STATE["items"] = []
    else:
        STATE["items"] = []

def save_data():
    _ensure_dirs()
    SUG_JSON.write_text(json.dumps(STATE["items"], indent=2), encoding="utf-8")
    _log(f"Saved {len(STATE['items'])} item(s) → {SUG_JSON}")

def export_csv():
    _ensure_dirs()
    with open(SUG_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["id","text","tags","priority","score","starred","done","created"])
        for it in STATE["items"]:
            w.writerow([
                it.get("id",""),
                it.get("text",""),
                ",".join(it.get("tags",[])),
                it.get("priority",""),
                it.get("score",0),
                "1" if it.get("starred") else "0",
                "1" if it.get("done") else "0",
                it.get("created",""),
            ])
    _log(f"Exported CSV → {SUG_CSV}")

def import_csv(file_bytes: bytes):
    try:
        rows = file_bytes.decode("utf-8").splitlines()
        r = csv.DictReader(rows)
        imported = []
        for row in r:
            imported.append({
                "id": row.get("id") or f"imp-{_now_stamp()}",
                "text": row.get("text","").strip(),
                "tags": [t.strip() for t in (row.get("tags","") or "").split(",") if t.strip()],
                "priority": (row.get("priority","") or "medium").lower(),
                "score": int(row.get("score") or 0),
                "starred": (row.get("starred") in ("1","true","True")),
                "done": (row.get("done") in ("1","true","True")),
                "created": row.get("created") or _now_stamp(),
            })
        # simple merge: append; skip exact text duplicates
        existing_texts = set(it["text"].strip().lower() for it in STATE["items"])
        new_items = [it for it in imported if it["text"].strip().lower() not in existing_texts and it["text"].strip()]
        STATE["items"].extend(new_items)
        save_data()
        _log(f"Imported {len(new_items)} new item(s).")
        return len(new_items)
    except Exception as e:
        _log(f"IMPORT ERROR: {e}")
        return 0

# -----------------
# UI Widgets
# -----------------
H = W.HTML

banner = H(f"""
<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px solid #e5e7eb; border-radius:12px;'>
  <div style='font-size:18px; font-weight:700;'>📂 Suggestion Pool (Interactive)</div>
  <div>Root: <code>{APP['ROOT']}</code> | Storage: <code>logs/suggestions/</code></div>
  <div>Flow: Add → Curate (★/▲/▼/Done) → Filter/Sort → Save/Export/Import</div>
</div>
""")

# Add form
txt_new = W.Textarea(value="", description="New", placeholder="Type a suggestion…", layout=W.Layout(width="55%"))
txt_tags = W.Text(value="", description="Tags", placeholder="comma,separated")
prio    = W.Dropdown(options=[("Low","low"),("Medium","medium"),("High","high")], value="medium", description="Priority")
btn_add = W.Button(description="Add", tooltip="Add suggestion")
btn_save= W.Button(description="Save Now", tooltip="Save to JSON")

# Filters
flt_text  = W.Text(value="", description="Filter")
flt_tags  = W.Text(value="", description="Tags")
flt_star  = W.ToggleButton(value=False, description="Starred only")
sort_by   = W.Dropdown(options=[("Score (desc)","score"),("Newest","newest"),("Oldest","oldest")], value="score", description="Sort")

# Actions
btn_export = W.Button(description="Export CSV")
uploader   = W.FileUpload(accept=".csv", multiple=False, description="Import CSV")
btn_clear  = W.Button(description="Clear All", tooltip="Remove all (asks to confirm)", button_style="warning")

# List container
list_box = W.VBox([])
out = W.Output(layout={"border":"1px solid #e5e7eb","padding":"8px","borderRadius":"8px"})

# -----------------
# Helpers (UI)
# -----------------
def _norm_tags(s: str) -> List[str]:
    return [t.strip() for t in (s or "").split(",") if t.strip()]

def add_item(text: str, tags: List[str], priority: str):
    text = (text or "").strip()
    if not text:
        with out: print("Nothing to add.")
        return
    item = {
        "id": f"sug-{_now_stamp()}",
        "text": text,
        "tags": tags,
        "priority": priority,
        "score": 0,
        "starred": False,
        "done": False,
        "created": _now_stamp(),
    }
    STATE["items"].append(item)
    save_data()
    refresh_list()

def delete_item(item_id: str):
    STATE["items"] = [it for it in STATE["items"] if it["id"] != item_id]
    save_data()
    refresh_list()

def update_item(item_id: str, **patch):
    for it in STATE["items"]:
        if it["id"] == item_id:
            it.update(patch)
            break
    save_data()
    refresh_list()

def _match_filter(it: Dict[str,Any]) -> bool:
    txt = flt_text.value.strip().lower()
    tgs = set(_norm_tags(flt_tags.value.lower()))
    if txt and (txt not in it["text"].lower()):
        return False
    if tgs and not tgs.issubset(set([t.lower() for t in it.get("tags",[])])):
        return False
    if flt_star.value and not it.get("starred"):
        return False
    return True

def _sort_key(it):
    if sort_by.value == "newest":
        return (-datetime.strptime(it["created"], "%Y-%m-%d_%H-%M-%S").timestamp(), it["text"])
    if sort_by.value == "oldest":
        return (datetime.strptime(it["created"], "%Y-%m-%d_%H-%M-%S").timestamp(), it["text"])
    # score desc default
    return (-int(it.get("score",0)), it["text"])

def _pill(text, color):
    return H(f"<span style='background:{color}1A;color:{color};padding:2px 8px;border-radius:999px;font-weight:600;margin-right:6px'>{text}</span>")

def row_widget(it: Dict[str,Any]):
    # Controls
    btn_up   = W.Button(description="▲", layout=W.Layout(width="34px"))
    btn_down = W.Button(description="▼", layout=W.Layout(width="34px"))
    btn_star = W.ToggleButton(value=bool(it.get("starred")), description="★")
    btn_done = W.ToggleButton(value=bool(it.get("done")), description="Done")
    btn_del  = W.Button(description="🗑", layout=W.Layout(width="40px"))
    score_lab= W.Label(value=str(it.get("score",0)))

    # Text + tags line
    tags_html = ""
    for t in it.get("tags", []):
        tags_html += f"<span style='background:#6366f11A;color:#6366f1;padding:0 6px;border-radius:999px;margin-left:4px'>{t}</span>"
    pri_color = {"low":"#16a34a","medium":"#ca8a04","high":"#dc2626"}.get(it.get("priority","medium"), "#ca8a04")
    text_html = f"<div><b>{it['text']}</b> {tags_html}</div><div style='opacity:.75'>priority: {_pill(it.get('priority','medium'), pri_color)._repr_html_()}</div>"
    if it.get("done"):
        text_html = f"<div style='opacity:.55;text-decoration:line-through'>{it['text']}</div>" + f"<div style='opacity:.6'>priority: {_pill(it.get('priority','medium'), pri_color)._repr_html_()}</div>"
    txt = H(text_html)

    # Handlers
    def up(_):
        it["score"] = int(it.get("score",0)) + 1
        save_data(); refresh_list()
    def down(_):
        it["score"] = int(it.get("score",0)) - 1
        save_data(); refresh_list()
    def star(_):
        it["starred"] = bool(btn_star.value)
        save_data(); refresh_list()
    def done(_):
        it["done"] = bool(btn_done.value)
        save_data(); refresh_list()
    def delete(_):
        delete_item(it["id"])

    btn_up.on_click(up)
    btn_down.on_click(down)
    btn_star.observe(star, names="value")
    btn_done.observe(done, names="value")
    btn_del.on_click(delete)

    left  = W.HBox([btn_up, score_lab, btn_down, btn_star, btn_done, btn_del])
    row = W.HBox([left, txt])
    return row

def refresh_list(*_):
    # build filtered/sorted view
    items = [it for it in STATE["items"] if _match_filter(it)]
    items.sort(key=_sort_key)
    # cap render for big lists
    rows = [row_widget(it) for it in items[:200]]
    list_box.children = tuple(rows)
    # status
    with out:
        out.clear_output()
        print(f"Showing {len(rows)} of {len(items)} filtered; total {len(STATE['items'])}. Saved: {SUG_JSON}")

# -----------------
# Bindings
# -----------------
def on_add(_):
    add_item(txt_new.value, _norm_tags(txt_tags.value), prio.value)
    txt_new.value = ""
    # keep tags/priority as-is for quick multiple adds

def on_save(_):
    save_data()
    refresh_list()

def on_export(_):
    export_csv()
    refresh_list()

def on_upload_change(change):
    if not uploader.value:
        return
    # read only the latest uploaded
    key = list(uploader.value.keys())[-1]
    file_bytes = uploader.value[key]["content"]
    n = import_csv(file_bytes)
    uploader.value.clear()  # reset
    with out:
        print(f"Import added {n} item(s).")

def on_clear(_):
    def really_clear(_x):
        STATE["items"].clear()
        save_data()
        refresh_list()
        dlg.close()
    dlg = W.Popup(
        title="Confirm Clear All",
        children=[W.HTML("<b>Delete ALL suggestions?</b>"), W.Button(description="Confirm", button_style="danger")],
    ) if hasattr(W, "Popup") else None
    if dlg:
        # wire the confirm button (2nd child)
        confirm_btn = dlg.children[1]
        confirm_btn.on_click(really_clear)
        display(dlg)
    else:
        # Fallback simple confirm
        with out:
            print("Clearing all suggestions…")
        STATE["items"].clear()
        save_data()
        refresh_list()

btn_add.on_click(on_add)
btn_save.on_click(on_save)
btn_export.on_click(on_export)
uploader.observe(on_upload_change, names="value")
btn_clear.on_click(on_clear)

for w in (flt_text, flt_tags, flt_star, sort_by):
    if hasattr(w, "observe"):
        w.observe(lambda *_: refresh_list(), names="value")

# -----------------
# Render
# -----------------
clear_output()
display(banner)
display(W.HBox([txt_new, W.VBox([txt_tags, prio, W.HBox([btn_add, btn_save])])]))
display(W.HBox([flt_text, flt_tags, flt_star, sort_by]))
display(W.HBox([btn_export, uploader, btn_clear]))
display(list_box)
display(out)

# Init
_ensure_dirs()
load_data()
refresh_list()


HTML(value="\n<div style='font-family:system-ui,Segoe UI,Arial; line-height:1.35; padding:12px; border:1px sol…

HBox(children=(Textarea(value='', description='New', layout=Layout(width='55%'), placeholder='Type a suggestio…

HBox(children=(Text(value='', description='Filter'), Text(value='', description='Tags'), ToggleButton(value=Fa…

HBox(children=(Button(description='Export CSV', style=ButtonStyle()), FileUpload(value={}, accept='.csv', desc…

VBox()

Output(layout=Layout(border='1px solid #e5e7eb', padding='8px'))