In [None]:
%load_ext autoreload
%autoreload 2
# Do not run this cell unless running the code on a local notebook. This line is for auto-reloading the code in the notebook whenever you make changes to the code files. 
# If you're running this code in a Jupyter notebook/ Google Colab, you can uncomment the line to enable auto-reloading of your code files.

In [None]:
%pip install  torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# Use it only if you are sure GPU is available and compatible with CUDA 12.1+. Otherwise, you can install the CPU version or the appropriate CUDA version for your system.

In [1]:
# -----------------------------
# Environment configuration
# -----------------------------
gpu_available = True   # <-- change to True to enable GPU scheduling

# What to do with GPU tasks when gpu_available=False:
DOWNGRADE_GPU_TASKS_TO_CPU = True   # True = run them on CPU, False = reject them

# For clearer scheduling behavior in demos, cap CPU cores (optional)
CPU_CORES_CAP: int = 8                      # set None or large number to remove cap

# Run settings
DEFAULT_RUN_SECONDS: float = 3.0
SENSOR_PERIOD_S: float = 0.12               # sensor event rate (larger => fewer logs)
HEALTH_PERIOD_S: float = 1.0                # health monitor interval


In [2]:
# ============================
# Task 3: Defense AI System Simulation (CPU-only + logical GPUs)
# Multi-agent + Orchestrated pipelines + Priority scheduling + Fault tolerance
# ============================

import asyncio
import heapq
import os
import random
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional




In [3]:
# -------------------------------------------------------------------
# Logging to read later (writes to file AND notebook output)
# -------------------------------------------------------------------
class Tee:
    def __init__(self, *streams):
        self.streams = streams
    def write(self, data):
        for s in self.streams:
            s.write(data)
            s.flush()
    def flush(self):
        for s in self.streams:
            s.flush()

class LogContext:
    def __init__(self, log_file: str, fh, orig_out, orig_err):
        self.log_file = log_file
        self._fh = fh
        self._orig_out = orig_out
        self._orig_err = orig_err
    def close(self):
        # restore stdio
        sys.stdout = self._orig_out
        sys.stderr = self._orig_err
        try:
            self._fh.flush()
        finally:
            self._fh.close()

def log_writer() -> LogContext:
    log_dir = "logs"
    os.makedirs(log_dir, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_dir, f"task3_{ts}.log")

    if os.path.exists(log_file):
        os.remove(log_file)

    log_fh = open(log_file, "w", encoding="utf-8")

    # Keep originals so Jupyter still displays output
    original_stdout = sys.stdout
    original_stderr = sys.stderr

    sys.stdout = Tee(original_stdout, log_fh)
    sys.stderr = Tee(original_stderr, log_fh)

    print("=== Logging Enabled ===")
    print("Log file:", log_file)
    print("=======================\n")

    return LogContext(log_file, log_fh, original_stdout, original_stderr)

def log(msg: str):
    # Single place to emit output so it always hits both notebook and file (via Tee)
    print(msg)


In [4]:
# -------------------------------------------------------------------
# 2) LOGICAL RESOURCES
# -------------------------------------------------------------------
@dataclass
class LogicalCluster:
    cpu_cores: int
    gpus: int
    cpu_sem: asyncio.Semaphore = field(init=False)
    gpu_sem: asyncio.Semaphore = field(init=False)

    def __post_init__(self):
        self.cpu_sem = asyncio.Semaphore(self.cpu_cores)
        self.gpu_sem = asyncio.Semaphore(self.gpus)

async def acquire_n(sem: asyncio.Semaphore, n: int):
    for _ in range(max(0, n)):
        await sem.acquire()

def release_n(sem: asyncio.Semaphore, n: int):
    for _ in range(max(0, n)):
        sem.release()


In [5]:
# -------------------------------------------------------------------
# 3) EVENTS + BUS (Resource Allocation and Shutdown Function)
# -------------------------------------------------------------------
@dataclass
class SensorEvent:
    source: str
    ts: float
    data: Dict[str, Any]

STOP_EVENT = SensorEvent(source="__STOP__", ts=0.0, data={"stop": True})

class EventBus:
    def __init__(self):
        self.q: asyncio.Queue[SensorEvent] = asyncio.Queue()

    async def publish(self, ev: SensorEvent):
        await self.q.put(ev)

    async def subscribe(self) -> SensorEvent:
        ev = await self.q.get()
        # important: prevents join-style hangs if you ever add them later
        self.q.task_done()
        return ev


In [6]:
# -------------------------------------------------------------------
# 4) TASK + DLQ (Task Handler and Dead Letter Queue)
# -------------------------------------------------------------------
@dataclass(order=True)
class ScheduledItem:
    priority: int
    created_at: float
    seq: int
    task: "Task" = field(compare=False)

@dataclass
class Task:
    task_id: str
    agent: str
    kind: str
    cpu_cores: int
    gpus: int
    est_s: float
    priority: int                   # lower = higher priority
    payload: Dict[str, Any] = field(default_factory=dict)

    max_retries: int = 2
    retry_backoff_s: float = 0.3
    fail_prob: float = 0.05
    submitted_at: float = field(default_factory=time.time)
    attempt: int = 0

@dataclass
class DeadLetter:
    task: Task
    error: str
    when: float = field(default_factory=time.time)




In [7]:
# -------------------------------------------------------------------
# 5) MULTI-AGENT SIMULATIONS ALLOCATION
# -------------------------------------------------------------------
def ingest_agent(task: Task):
    task.payload["ingested"] = True

def preprocess_agent(task: Task):
    task.payload["preprocessed"] = True

def fusion_agent(task: Task):
    task.payload["fused"] = True

def threat_detect_agent(task: Task):
    base = task.payload.get("signal_strength", 0.5)
    if task.payload.get("degraded"):
        base *= 0.9
    task.payload["threat_score"] = min(1.0, max(0.0, base + random.uniform(-0.1, 0.2)))

def decision_agent(task: Task):
    score = task.payload.get("threat_score", 0.0)
    task.payload["decision"] = "INTERCEPT" if score >= 0.75 else ("MONITOR" if score >= 0.5 else "IGNORE")

def comms_agent(task: Task):
    # In real systems: publish to C2, alerts, logs
    pass

def batch_analytics_agent(task: Task):
    task.payload["batch_report"] = "ok"

def model_update_agent(task: Task):
    task.payload["model_update"] = "staged"

AGENTS: Dict[str, Callable[[Task], Any]] = {
    "ingest": ingest_agent,
    "preprocess": preprocess_agent,
    "fusion": fusion_agent,
    "threat": threat_detect_agent,
    "decision": decision_agent,
    "comms": comms_agent,
    "batch": batch_analytics_agent,
    "model_update": model_update_agent,
}




In [8]:
# -------------------------------------------------------------------
# 6) PRIORITY SCHEDULER FOR RT(Real Time) + BATCH TASKS
# -------------------------------------------------------------------
class PriorityScheduler:
    def __init__(self, cluster: LogicalCluster):
        self.cluster = cluster
        self._rt_heap: List[ScheduledItem] = []
        self._batch_heap: List[ScheduledItem] = []
        self._seq = 0
        self.running: List[asyncio.Task] = []
        self.dead_letter: List[DeadLetter] = []
        self.stats = {
            "submitted": 0,
            "started": 0,
            "succeeded": 0,
            "failed": 0,
            "retried": 0,
            "dlq": 0,
        }

    def submit(self, task: Task):
        # Apply GPU availability policy
        if not gpu_available and task.gpus > 0:
            if DOWNGRADE_GPU_TASKS_TO_CPU:
                task.gpus = 0
                task.payload["degraded"] = True
            else:
                self.dead_letter.append(DeadLetter(task, "GPU not available (policy reject)"))
                self.stats["dlq"] += 1
                return

        self._seq += 1
        item = ScheduledItem(task.priority, time.time(), self._seq, task)
        if task.priority <= 4:
            heapq.heappush(self._rt_heap, item)
        else:
            heapq.heappush(self._batch_heap, item)
        self.stats["submitted"] += 1

    def _pick_next(self) -> Optional[Task]:
        if self._rt_heap:
            return heapq.heappop(self._rt_heap).task
        if self._batch_heap:
            return heapq.heappop(self._batch_heap).task
        return None

    def _cleanup_running(self):
        self.running = [t for t in self.running if not t.done()]

    async def _execute(self, task: Task, t0: float, stop_event: asyncio.Event):
        # Admission control
        if task.cpu_cores > self.cluster.cpu_cores or task.gpus > self.cluster.gpus:
            self.dead_letter.append(DeadLetter(task, "request exceeds cluster capacity"))
            self.stats["dlq"] += 1
            return

        # Acquire resources
        await acquire_n(self.cluster.cpu_sem, task.cpu_cores)
        if task.gpus > 0:
            await acquire_n(self.cluster.gpu_sem, task.gpus)

        place = "GPU" if task.gpus > 0 else "CPU"
        wait_s = time.time() - task.submitted_at
        degraded = " degraded" if task.payload.get("degraded") else ""

        need_retry = False
        retry_delay = 0.0
        err: Optional[str] = None

        try:
            self.stats["started"] += 1
            log(f"[{time.time()-t0:6.2f}s] START {task.task_id:16} "
                f"agent={task.agent:11} kind={task.kind:12} on={place}{degraded} "
                f"wait={wait_s:5.2f}s req(cpu={task.cpu_cores}, gpu={task.gpus}) attempt={task.attempt}")

            await asyncio.sleep(task.est_s)

            if random.random() < task.fail_prob:
                raise RuntimeError("simulated transient failure")

            fn = AGENTS.get(task.agent, lambda _t: None)
            fn(task)

            self.stats["succeeded"] += 1
            log(f"[{time.time()-t0:6.2f}s] DONE  {task.task_id:16} agent={task.agent:11} on={place}")

        except Exception as e:
            self.stats["failed"] += 1
            err = f"{type(e).__name__}: {e}"
            log(f"[{time.time()-t0:6.2f}s] FAIL  {task.task_id:16} agent={task.agent:11} err={err}")

            # Decide retry *now*, but perform the sleep/submission only AFTER releasing resources.
            if (task.attempt < task.max_retries) and (not stop_event.is_set()):
                task.attempt += 1
                self.stats["retried"] += 1
                need_retry = True
                retry_delay = task.retry_backoff_s * task.attempt
            else:
                self.dead_letter.append(DeadLetter(task, err))
                self.stats["dlq"] += 1

        finally:
            # Always release resources immediately
            if task.gpus > 0:
                release_n(self.cluster.gpu_sem, task.gpus)
            release_n(self.cluster.cpu_sem, task.cpu_cores)

        # Retry happens here (resources already released)
        if need_retry and (not stop_event.is_set()):
            await asyncio.sleep(retry_delay)
            # re-submit with updated submitted_at so wait times make sense
            task.submitted_at = time.time()
            self.submit(task)

    async def run_loop(self, t0: float, stop_event: asyncio.Event):
        while not stop_event.is_set():
            self._cleanup_running()
            task = self._pick_next()
            if task is None:
                await asyncio.sleep(0.02)
                continue
            self.running.append(asyncio.create_task(self._execute(task, t0, stop_event)))

        # On stop: cancel outstanding *queued* work by clearing heaps
        self._rt_heap.clear()
        self._batch_heap.clear()

        # Drain running tasks, but don't hang forever
        deadline = time.time() + 2.0
        while self.running and time.time() < deadline:
            self._cleanup_running()
            await asyncio.sleep(0.05)

        # If anything still running, cancel it
        for t in self.running:
            if not t.done():
                t.cancel()
        await asyncio.gather(*self.running, return_exceptions=True)
        self.running.clear()


In [9]:
# -------------------------------------------------------------------
# 7) PIPELINE ORCHESTRATOR (RT + batch)
# -------------------------------------------------------------------
class PipelineOrchestrator:
    def __init__(self, scheduler: PriorityScheduler, bus: EventBus):
        self.scheduler = scheduler
        self.bus = bus
        self.counter = 0

    def _id(self, prefix: str) -> str:
        self.counter += 1
        return f"{prefix}_{self.counter:04d}"

    def submit_realtime_pipeline(self, ev: SensorEvent):
        mission_id = self._id("RT")
        payload = {
            "mission_id": mission_id,
            "source": ev.source,
            "signal_strength": ev.data.get("signal_strength", 0.5),
            "ts": ev.ts,
        }

        tasks = [
            Task(f"{mission_id}_ingest",   "ingest",     "ingest",       1, 0, 0.10, 0, payload, fail_prob=0.02),
            Task(f"{mission_id}_pre",      "preprocess", "preprocess",   1, 0, 0.12, 0, payload, fail_prob=0.03),
            Task(f"{mission_id}_fusion",   "fusion",     "fusion",       1, 0, 0.15, 0, payload, fail_prob=0.03),
            Task(f"{mission_id}_threat",   "threat",     "threat_detect",2, 1, 0.25, 0, payload, fail_prob=0.06),
            Task(f"{mission_id}_decision", "decision",   "decision",     1, 0, 0.08, 0, payload, fail_prob=0.02),
            Task(f"{mission_id}_comms",    "comms",      "comms",        1, 0, 0.06, 0, payload, fail_prob=0.01),
        ]
        for t in tasks:
            self.scheduler.submit(t)

    def submit_batch_pipeline(self):
        batch_id = self._id("BATCH")
        payload = {"batch_id": batch_id, "ts": time.time()}

        tasks = [
            Task(f"{batch_id}_analytics", "batch",       "batch_analytics", 4, 0, 1.20, 10, payload, fail_prob=0.08),
            Task(f"{batch_id}_model_upd", "model_update","model_update",    2, 1, 1.00, 10, payload, fail_prob=0.10),
        ]
        for t in tasks:
            self.scheduler.submit(t)

    async def event_listener(self, stop_event: asyncio.Event):
        # Clean shutdown: listener exits when it receives STOP_EVENT
        while True:
            ev = await self.bus.subscribe()
            if ev.data.get("stop"):
                break
            self.submit_realtime_pipeline(ev)




In [10]:
# -------------------------------------------------------------------
# 8) MONITORS + SIMULATORS
# -------------------------------------------------------------------
async def health_monitor(scheduler: PriorityScheduler, t0: float, stop_event: asyncio.Event):
    while not stop_event.is_set():
        await asyncio.sleep(HEALTH_PERIOD_S)
        s = scheduler.stats
        log(f"[{time.time()-t0:6.2f}s] HEALTH sub={s['submitted']} start={s['started']} "
            f"ok={s['succeeded']} fail={s['failed']} retry={s['retried']} dlq={s['dlq']} "
            f"dlq_items={len(scheduler.dead_letter)}")

async def sensor_simulator(bus: EventBus, stop_event: asyncio.Event):
    sources = ["radar", "eo_ir", "telemetry"]
    while not stop_event.is_set():
        src = random.choice(sources)
        ev = SensorEvent(source=src, ts=time.time(), data={"signal_strength": random.random()})
        await bus.publish(ev)
        await asyncio.sleep(SENSOR_PERIOD_S)

async def batch_submitter(orch: PipelineOrchestrator, stop_event: asyncio.Event):
    while not stop_event.is_set():
        await asyncio.sleep(2.0)
        orch.submit_batch_pipeline()


In [11]:
# -------------------------------------------------------------------
# 9) MAIN ENTRY POINT
# -------------------------------------------------------------------
async def run_sim(run_seconds: float = DEFAULT_RUN_SECONDS,
                  enable_logging: bool = True) -> Dict[str, Any]:
    """
    Run the simulation for run_seconds and return summary dict.
    Enforces a hard time bound: will stop & shut down within run_seconds (+ small grace).
    """
    log_ctx = log_writer() if enable_logging else None

    t0 = time.time()
    random.seed(7)

    cpu_cores = os.cpu_count() or 1
    if CPU_CORES_CAP is not None:
        cpu_cores = min(cpu_cores, CPU_CORES_CAP)

    gpus_used = 1 if gpu_available else 0

    log("=== On-Prem Logical Deployment ===")
    log(f"CPU logical cores (used): {cpu_cores}")
    log(f"gpu_available flag: {gpu_available}")
    log(f"DOWNGRADE_GPU_TASKS_TO_CPU: {DOWNGRADE_GPU_TASKS_TO_CPU}")
    log(f"Logical GPU tokens: {gpus_used}")
    if log_ctx:
        log(f"Log file: {log_ctx.log_file}")
    log("")

    cluster = LogicalCluster(cpu_cores=cpu_cores, gpus=gpus_used)
    scheduler = PriorityScheduler(cluster)
    bus = EventBus()
    orch = PipelineOrchestrator(scheduler, bus)
    stop_event = asyncio.Event()

    bg_tasks = [
        asyncio.create_task(sensor_simulator(bus, stop_event)),
        asyncio.create_task(orch.event_listener(stop_event)),
        asyncio.create_task(scheduler.run_loop(t0, stop_event)),
        asyncio.create_task(health_monitor(scheduler, t0, stop_event)),
        asyncio.create_task(batch_submitter(orch, stop_event)),
    ]

    async def _run_window():
        await asyncio.sleep(run_seconds)

    # Hard time bound + small grace for shutdown
    try:
        await asyncio.wait_for(_run_window(), timeout=run_seconds + 0.1)
    finally:
        stop_event.set()
        # Wake event listener so it can exit immediately
        await bus.publish(STOP_EVENT)

        # Give background tasks a small grace period to exit cleanly
        try:
            await asyncio.wait_for(asyncio.gather(*bg_tasks, return_exceptions=True), timeout=2.5)
        except asyncio.TimeoutError:
            # Cancel anything still alive
            for t in bg_tasks:
                if not t.done():
                    t.cancel()
            await asyncio.gather(*bg_tasks, return_exceptions=True)

    log("\n=== FINAL STATS ===")
    log(str(scheduler.stats))
    log(f"DLQ items: {len(scheduler.dead_letter)}")
    if scheduler.dead_letter:
        dlq0 = scheduler.dead_letter[0]
        log(f"Sample DLQ: {dlq0.task.task_id} | {dlq0.error}")
    if log_ctx:
        log(f"\nLog saved to: {log_ctx.log_file}")
        # restore stdout/stderr at the end
        log_ctx.close()

    return {
        "stats": scheduler.stats,
        "dlq_count": len(scheduler.dead_letter),
        "log_file": (log_ctx.log_file if log_ctx else None),
    }


In [12]:
# -------------------------------------------------------------------
# 10) Run Script
# -------------------------------------------------------------------
def run_sim_script():
    asyncio.run(run_sim(DEFAULT_RUN_SECONDS, enable_logging=True))
await run_sim(30, enable_logging=True)


=== Logging Enabled ===
Log file: logs/task3_20260204_230517.log

=== On-Prem Logical Deployment ===
CPU logical cores (used): 8
gpu_available flag: True
DOWNGRADE_GPU_TASKS_TO_CPU: True
Logical GPU tokens: 1
Log file: logs/task3_20260204_230517.log

[  0.01s] START RT_0001_ingest   agent=ingest      kind=ingest       on=CPU wait= 0.00s req(cpu=1, gpu=0) attempt=0
[  0.01s] START RT_0001_pre      agent=preprocess  kind=preprocess   on=CPU wait= 0.00s req(cpu=1, gpu=0) attempt=0
[  0.01s] START RT_0001_fusion   agent=fusion      kind=fusion       on=CPU wait= 0.00s req(cpu=1, gpu=0) attempt=0
[  0.01s] START RT_0001_threat   agent=threat      kind=threat_detect on=GPU wait= 0.00s req(cpu=2, gpu=1) attempt=0
[  0.01s] START RT_0001_decision agent=decision    kind=decision     on=CPU wait= 0.01s req(cpu=1, gpu=0) attempt=0
[  0.02s] START RT_0001_comms    agent=comms       kind=comms        on=CPU wait= 0.01s req(cpu=1, gpu=0) attempt=0
[  0.08s] DONE  RT_0001_comms    agent=comms       o

{'stats': {'submitted': 1516,
  'started': 97,
  'succeeded': 93,
  'failed': 4,
  'retried': 4,
  'dlq': 0},
 'dlq_count': 0,
 'log_file': 'logs/task3_20260204_230517.log'}