In [2]:
# --- bootstrap the import path so `src/...` works from notebooks/ ---
import sys
from pathlib import Path

ROOT = Path.cwd()
# If this notebook lives in <root>/notebooks, hop up to project root
if (ROOT / "notebooks").exists() and not (ROOT / "src").exists():
    ROOT = ROOT.parent
assert (ROOT / "src").exists(), f"Couldn't find 'src' at {ROOT}"
sys.path.insert(0, str(ROOT))

# --- normal imports start here ---
from datetime import datetime, timedelta, timezone
from collections import defaultdict
from pprint import pprint
import os

from dotenv import load_dotenv; load_dotenv()
import matplotlib.pyplot as plt

from src.common.db import get_logs_collection

WINDOW_MIN = 60  # last N minutes to visualize
coll = get_logs_collection()
print("Connected to:", coll.full_name)


AssertionError: Couldn't find 'src' at /home/mh13/Codes/maveric-mcp-mini/notebooks

In [None]:
def fetch_docs(minutes=WINDOW_MIN):
    since = datetime.now(timezone.utc) - timedelta(minutes=minutes)
    # Ascending for time-bucketing
    return list(coll.find({"ts": {"$gte": since}}).sort("ts", 1))

def bucket_per_minute(docs):
    buckets = defaultdict(lambda: {"ON": 0, "OFF": 0})
    for d in docs:
        ts = d["ts"]
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        minute_ts = ts.replace(second=0, microsecond=0)
        buckets[minute_ts][d["status"]] += 1
    times = sorted(buckets.keys())
    on_counts  = [buckets[t]["ON"] for t in times]
    off_counts = [buckets[t]["OFF"] for t in times]
    return times, on_counts, off_counts

def group_by_cell(docs):
    by_cell = defaultdict(list)
    for d in docs:
        by_cell[int(d["cell_id"])].append(d)
    # ensure sorted
    for cid in by_cell:
        by_cell[cid].sort(key=lambda x: x["ts"])
    return dict(sorted(by_cell.items()))


In [None]:
docs = fetch_docs()
print(f"Fetched {len(docs)} docs from the last {WINDOW_MIN} min")
for d in docs[:3]:
    d = {**d, "_id": str(d["_id"])}
    pprint(d)

print("\nIndexes on cell_logs:")
for idx in coll.list_indexes():
    pprint(idx.document)


In [None]:
times, on_counts, off_counts = bucket_per_minute(docs)
if not times:
    print("No data in the selected window. Run the generator and re-run this cell.")
else:
    plt.figure()
    plt.plot(times, on_counts, label="ON")
    plt.plot(times, off_counts, label="OFF")
    plt.title(f"Cell events per minute (last {WINDOW_MIN} min)")
    plt.xlabel("Time")
    plt.ylabel("Events")
    plt.legend()
    plt.gcf().autofmt_xdate()
    plt.tight_layout()
    plt.show()


In [None]:
# Build last-known status per (cell, minute)
from math import ceil

by_cell = group_by_cell(docs)
if not by_cell:
    print("No data in the selected window.")
else:
    # All minute buckets present in the window
    start = (datetime.now(timezone.utc) - timedelta(minutes=WINDOW_MIN)).replace(second=0, microsecond=0)
    end   = datetime.now(timezone.utc).replace(second=0, microsecond=0)
    all_minutes = []
    t = start
    while t <= end:
        all_minutes.append(t)
        t += timedelta(minutes=1)

    # For each cell, compute last status seen in each minute
    matrix = []
    cell_ids = sorted(by_cell.keys())
    for cid in cell_ids:
        row = []
        # walk through logs once per cell
        logs = by_cell[cid]
        j = 0
        last_status = "OFF"
        for m in all_minutes:
            # advance through logs up to this minute
            while j < len(logs) and logs[j]["ts"].replace(second=0, microsecond=0, tzinfo=timezone.utc) <= m:
                last_status = logs[j]["status"]
                j += 1
            row.append(1 if last_status == "ON" else 0)
        matrix.append(row)

    plt.figure()
    im = plt.imshow(matrix, aspect="auto", interpolation="nearest")
    plt.title(f"Cell status by minute (1=ON, 0=OFF) — last {WINDOW_MIN} min")
    plt.xlabel("Time (minute buckets)")
    plt.ylabel("Cell ID")
    # Y ticks as cell IDs
    plt.yticks(range(len(cell_ids)), cell_ids)
    # X ticks: at most ~10 labels
    max_ticks = 10
    step = max(1, len(all_minutes)//max_ticks)
    tick_positions = list(range(0, len(all_minutes), step))
    tick_labels = [all_minutes[i].strftime("%H:%M") for i in tick_positions]
    plt.xticks(tick_positions, tick_labels, rotation=45, ha="right")
    cbar = plt.colorbar(im)
    cbar.set_ticks([0, 1]); cbar.set_ticklabels(["OFF", "ON"])
    plt.tight_layout(); plt.show()


In [None]:
if not by_cell:
    print("No data in the selected window.")
else:
    uptimes = []
    for cid, logs in by_cell.items():
        on = sum(1 for d in logs if d["status"] == "ON")
        off = sum(1 for d in logs if d["status"] == "OFF")
        total = on + off or 1
        uptimes.append((cid, 100.0 * on / total))
    uptimes.sort(key=lambda x: x[0])

    plt.figure()
    xs = [cid for cid, _ in uptimes]
    ys = [u for _, u in uptimes]
    plt.bar(xs, ys)
    plt.title(f"Uptime (%) per cell — last {WINDOW_MIN} min")
    plt.xlabel("Cell ID"); plt.ylabel("Uptime %")
    plt.ylim(0, 100)
    plt.tight_layout(); plt.show()


In [None]:
def count_flips(sorted_logs):
    if not sorted_logs: return 0
    flips = 0
    prev = sorted_logs[0]["status"]
    for d in sorted_logs[1:]:
        if d["status"] != prev:
            flips += 1
            prev = d["status"]
    return flips

if not by_cell:
    print("No data in the selected window.")
else:
    flip_counts = [(cid, count_flips(logs)) for cid, logs in by_cell.items()]
    flip_counts.sort(key=lambda x: x[1], reverse=True)

    plt.figure()
    xs = [cid for cid,_ in flip_counts]
    ys = [fc for _,fc in flip_counts]
    plt.bar(xs, ys)
    plt.title(f"Flip count per cell — last {WINDOW_MIN} min")
    plt.xlabel("Cell ID"); plt.ylabel("# flips")
    plt.tight_layout(); plt.show()


In [None]:
def longest_off_streak_seconds(sorted_logs):
    if not sorted_logs: return 0.0
    longest = 0.0
    cur_start = None
    cur_status = None
    # Walk edges between logs to measure OFF durations
    for i, d in enumerate(sorted_logs):
        ts = d["ts"].astimezone(timezone.utc)
        s  = d["status"]
        if cur_status is None:
            cur_status = s
            if s == "OFF":
                cur_start = ts
        else:
            # status transition at this timestamp
            if cur_status == "OFF" and s == "ON" and cur_start is not None:
                longest = max(longest, (ts - cur_start).total_seconds())
                cur_start = None
            if cur_status != s and s == "OFF":
                cur_start = ts
            cur_status = s
    # if ends OFF, measure up to now
    if cur_status == "OFF" and cur_start is not None:
        longest = max(longest, (datetime.now(timezone.utc) - cur_start).total_seconds())
    return longest

if not by_cell:
    print("No data in the selected window.")
else:
    streaks = [(cid, int(longest_off_streak_seconds(logs))) for cid, logs in by_cell.items()]
    streaks.sort(key=lambda x: x[1], reverse=True)
    print("Top 5 longest OFF streaks (sec):")
    for cid, secs in streaks[:5]:
        print(f"  cell {cid}: {secs}s")

    plt.figure()
    xs = [cid for cid,_ in streaks]
    ys = [secs for _,secs in streaks]
    plt.bar(xs, ys)
    plt.title(f"Longest OFF streak (sec) — last {WINDOW_MIN} min")
    plt.xlabel("Cell ID"); plt.ylabel("Seconds")
    plt.tight_layout(); plt.show()


In [None]:
times, _, off_counts = bucket_per_minute(docs)
if not times:
    print("No data in the selected window.")
else:
    import statistics as stats
    mu = stats.mean(off_counts)
    sigma = stats.pstdev(off_counts) or 1.0
    zs = [(c - mu)/sigma for c in off_counts]
    # anomalies where z > 2.5
    anomalies = [(t, c, z) for t,c,z in zip(times, off_counts, zs) if z > 2.5]
    print("Anomalous OFF spikes:", [(t.strftime("%H:%M"), c) for t,c,_ in anomalies])

    plt.figure()
    plt.plot(times, off_counts, label="OFF/min")
    for t, c, z in anomalies:
        plt.scatter([t], [c], marker="x")
    plt.title(f"OFF events/min with anomaly markers — last {WINDOW_MIN} min")
    plt.xlabel("Time"); plt.ylabel("OFF count")
    plt.gcf().autofmt_xdate()
    plt.tight_layout(); plt.show()


In [None]:
import time
from IPython.display import clear_output, display

def tail(seconds=20, refresh=2):
    loops = seconds // refresh
    for i in range(int(loops)):
        clear_output(wait=True)
        d = fetch_docs(minutes=5)
        times, on_c, off_c = bucket_per_minute(d)
        print(f"Live tail: last 5 minutes (refresh {refresh}s) — {len(d)} docs")
        if times:
            plt.figure()
            plt.plot(times, on_c, label="ON")
            plt.plot(times, off_c, label="OFF")
            plt.title("Live ON/OFF per minute")
            plt.legend(); plt.gcf().autofmt_xdate(); plt.tight_layout()
            display(plt.gcf()); plt.close()
        time.sleep(refresh)

# tail()  # uncomment to run
