<a href="https://colab.research.google.com/github/techmehedi/CSC36000-Project-Mehedi-Aidan/blob/main/demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
# Imports + Task model

import numpy as np
import random
from dataclasses import dataclass, field
from typing import List, Dict, Callable

# Fix seeds so runs are reproducible in class
random.seed(42)
np.random.seed(42)

@dataclass
class Task:
    """
    One learning job:
      - arrival, deadline: time window
      - eps: target loss to "finish"
      - true_a, true_b: hidden loss curve params (only used to simulate reality)
      - max_rate: max samples this task can process per time unit
      - progress: how many samples it has processed so far
      - history_s, history_l: what the scheduler has observed for curve fitting
    """
    tid: int
    arrival: int
    deadline: int
    eps: float
    true_a: float
    true_b: float
    max_rate: float
    progress: float = 0.0
    done: bool = False
    missed: bool = False
    history_s: List[float] = field(default_factory=list)
    history_l: List[float] = field(default_factory=list)

    def true_loss(self) -> float:
        """
        Simulated ground truth loss at current progress.
        The scheduler does not see the formula. It only sees noisy observations.
        """
        s = max(self.progress, 1.0)
        noise = np.random.normal(loc=0.0, scale=0.02)
        return max(self.true_a * (s ** (-self.true_b)) + noise, 1e-4)

    def observe(self):
        """
        One observation step:
          - query the current loss
          - store (samples_so_far, loss) into history for curve fitting
        """
        loss = self.true_loss()
        self.history_s.append(max(self.progress, 1.0))
        self.history_l.append(loss)
        return loss


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [5]:
# Creates a small task bundle

def create_tasks(num_tasks: int = 6) -> List[Task]:
    """
    Create a small set of synthetic tasks.
    Each task:
      - has its own loss curve difficulty (a, b)
      - has a deadline and a target loss eps
      - has a max_rate which caps how fast it can consume samples
    """
    tasks = []
    t0 = 0
    for i in range(num_tasks):
        # Hidden loss curve parameters for the simulator
        a = np.random.uniform(0.8, 1.5)
        b = np.random.uniform(0.2, 0.7)

        # Different deadlines and difficulty levels
        deadline = t0 + np.random.randint(20, 45)
        eps = np.random.uniform(0.12, 0.22)

        # At most this many samples per time step for this task
        max_rate = np.random.uniform(30, 60)

        task = Task(
            tid=i,
            arrival=t0,
            deadline=deadline,
            eps=eps,
            true_a=a,
            true_b=b,
            max_rate=max_rate,
        )
        tasks.append(task)
    return tasks


In [6]:
# Progress prediction with curve fitting

def estimate_remaining_samples(task: Task, gamma: float = 0.9) -> float:
    """
    Estimate how many more samples this task needs to reach its target loss.
    Idea:
      - Fit log(loss) = theta0 * log(s) + theta1 with weighted least squares
      - This corresponds to loss = a * s^{-b}
      - Then solve for s such that loss(s) = eps
    """
    s_hist = np.array(task.history_s, dtype=float)
    l_hist = np.array(task.history_l, dtype=float)

    # If we do not have enough history, return a very large number
    if len(s_hist) < 5:
        return 1e9

    # Filter out non positive values to avoid log issues
    mask = (s_hist > 0) & (l_hist > 0)
    s_hist = s_hist[mask]
    l_hist = l_hist[mask]
    if len(s_hist) < 5:
        return 1e9

    x = np.log(s_hist)
    y = np.log(l_hist)

    n = len(x)
    # More weight on recent observations
    weights = gamma ** (np.arange(n)[::-1])
    W = np.diag(weights)

    X = np.vstack([x, np.ones_like(x)]).T
    try:
        XtWX = X.T @ W @ X
        XtWy = X.T @ W @ y
        theta = np.linalg.solve(XtWX, XtWy)

        b_hat = -theta[0]
        a_hat = np.exp(theta[1])

        # If we get a weird slope, fall back to "very hard"
        if b_hat <= 0:
            return 1e9

        # a_hat * s^{-b_hat} = eps  =>  s = (a_hat / eps)^{1 / b_hat}
        s_target = (a_hat / task.eps) ** (1.0 / b_hat)
        remaining = max(s_target - task.progress, 0.0)

        if np.isnan(remaining) or remaining < 0:
            return 1e9

        return float(remaining)

    except np.linalg.LinAlgError:
        # If the system is ill conditioned, treat as very large remaining
        return 1e9


In [7]:
# Three schedulers

TOTAL_SERVER_POWER = 120.0   # Total samples per time unit across all tasks
EXPLORATION_SAMPLES = 300.0  # Per task exploration budget for LARA style

def uniform_scheduler(t, tasks, active_ids):
    """
    Simple baseline:
      - Split server power equally across all active tasks.
    """
    if not active_ids:
        return {}
    frac = 1.0 / len(active_ids)
    return {tid: frac for tid in active_ids}

def shortest_deadline_first_scheduler(t, tasks, active_ids):
    """
    Greedy time based baseline:
      - Always give all power to the task whose deadline is closest.
    """
    if not active_ids:
        return {}
    best = min(active_ids, key=lambda i: tasks[i].deadline)
    return {best: 1.0}

def lara_style_scheduler(t, tasks, active_ids):
    """
    LARA style scheduler:
      - Phase 1: Exploration
        Give equal power to tasks that have not seen enough samples yet.
      - Phase 2: Exploitation
        Use remaining sample estimates and deadlines to decide how much
        rate fraction each task needs to finish on time and pack them
        greedily until server power is full.
    """
    if not active_ids:
        return {}

    # Exploration phase: make sure each task gets some initial budget
    explore_ids = []
    for tid in active_ids:
        task = tasks[tid]
        if task.progress < EXPLORATION_SAMPLES:
            explore_ids.append(tid)

    if explore_ids:
        frac = 1.0 / len(explore_ids)
        return {tid: frac for tid in explore_ids}

    # After exploration, use predicted remaining samples and deadlines
    req_rates = []
    for tid in active_ids:
        task = tasks[tid]
        remaining = estimate_remaining_samples(task)
        remaining_time = max(task.deadline - t + 1, 1)
        # Required fraction of total server power to finish on time
        req_rate = remaining / (TOTAL_SERVER_POWER * remaining_time)
        req_rates.append((tid, req_rate))

    # Drop impossible or invalid entries
    req_rates = [(tid, r) for tid, r in req_rates if r > 0]

    if not req_rates:
        return {}

    # Smaller required rate means "easier to finish"
    req_rates.sort(key=lambda x: x[1])

    chosen = []
    used = 0.0
    for tid, r in req_rates:
        if used + r <= 1.0:
            chosen.append((tid, r))
            used += r
        else:
            break

    alloc = {}
    for tid, r in chosen:
        alloc[tid] = r

    # If we have extra capacity, spread the remainder across chosen tasks
    if chosen and used < 1.0:
        extra = (1.0 - used) / len(chosen)
        for tid, _ in chosen:
            alloc[tid] += extra

    return alloc


In [8]:
# Simulation loop

def run_simulation(strategy_name: str,
                   scheduler: Callable,
                   horizon: int = 80,
                   verbose: bool = True):
    """
    Main simulation loop:
      - Create tasks
      - For each time step:
          * find active tasks
          * mark missed deadlines
          * observe losses for active tasks
          * ask scheduler for allocation fractions
          * advance progress and mark tasks as completed if loss <= eps
      - Report which tasks finished before their deadline
    """
    tasks = create_tasks()
    num_tasks = len(tasks)

    completed = []
    missed = []

    if verbose:
        print(f"\n=== Strategy: {strategy_name} ===")

    for t in range(horizon):
        # Tasks that are eligible at this time and not finished
        active_ids = [
            task.tid for task in tasks
            if (task.arrival <= t <= task.deadline)
            and (not task.done)
            and (not task.missed)
        ]

        # Mark tasks that have passed their deadline
        for task in tasks:
            if not task.done and not task.missed and t > task.deadline:
                task.missed = True
                missed.append(task.tid)
                if verbose:
                    print(f"t={t:2d}  ❌ Task {task.tid} missed deadline")

        if not active_ids:
            continue

        # Observe current loss and log for prediction
        for tid in active_ids:
            tasks[tid].observe()

        # Ask scheduler how to split server power across tasks
        allocation = scheduler(t, tasks, active_ids)

        # Apply progress for each scheduled task
        for tid, frac in allocation.items():
            task = tasks[tid]
            power = min(frac * TOTAL_SERVER_POWER, task.max_rate)
            task.progress += power

            # Check if task finished this step
            if not task.done:
                if task.true_loss() <= task.eps:
                    task.done = True
                    completed.append(task.tid)
                    if verbose:
                        print(f"t={t:2d}  ✅ Task {task.tid} completed "
                              f"(progress={task.progress:.1f}, loss≈{task.true_loss():.3f})")

    success_rate = len(completed) / len(tasks) * 100.0

    if verbose:
        print("\n--- Results ---")
        print("Completed:", sorted(completed))
        print("Missed   :", sorted([i for i in range(num_tasks) if i not in completed]))
        print(f"Success rate: {success_rate:.1f} %")

    return {
        "completed": completed,
        "tasks": tasks,
        "success_rate": success_rate,
    }


In [9]:
# Runs all three strategies


# Run Uniform, Shortest Deadline First, and LARA style
res_uniform = run_simulation("Uniform", uniform_scheduler, verbose=True)
res_sdf     = run_simulation("Shortest Deadline First", shortest_deadline_first_scheduler, verbose=True)
res_lara    = run_simulation("LARA style", lara_style_scheduler, verbose=True)

print("\n================ SUMMARY ================")
print(f"Uniform success rate: {res_uniform['success_rate']:.1f} %")
print(f"SDF success rate    : {res_sdf['success_rate']:.1f} %")
print(f"LARA style rate     : {res_lara['success_rate']:.1f} %")



=== Strategy: Uniform ===
t= 0  ✅ Task 0 completed (progress=20.0, loss≈0.118)
t= 0  ✅ Task 3 completed (progress=20.0, loss≈0.076)
t=12  ✅ Task 4 completed (progress=380.0, loss≈0.171)
t=20  ✅ Task 1 completed (progress=700.0, loss≈0.194)
t=22  ❌ Task 2 missed deadline
t=23  ✅ Task 5 completed (progress=878.5, loss≈0.228)

--- Results ---
Completed: [0, 1, 3, 4, 5]
Missed   : [2]
Success rate: 83.3 %

=== Strategy: Shortest Deadline First ===
t= 0  ✅ Task 1 completed (progress=44.8, loss≈0.102)
t= 1  ✅ Task 0 completed (progress=56.8, loss≈0.098)
t= 2  ✅ Task 3 completed (progress=41.1, loss≈0.048)
t= 3  ✅ Task 4 completed (progress=47.2, loss≈0.044)
t= 4  ✅ Task 2 completed (progress=53.1, loss≈0.061)
t=41  ❌ Task 5 missed deadline

--- Results ---
Completed: [0, 1, 2, 3, 4]
Missed   : [5]
Success rate: 83.3 %

=== Strategy: LARA style ===
t= 0  ✅ Task 1 completed (progress=20.0, loss≈0.182)
t= 0  ✅ Task 2 completed (progress=20.0, loss≈0.227)
t= 1  ✅ Task 0 completed (progress=50.0