In [None]:
import random
from collections import deque
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Any

import matplotlib.pyplot as plt
import numpy as np

import ipywidgets as widgets
from ipywidgets import IntSlider, FloatSlider, Button, VBox, HBox, FloatText, HTML, Image
from IPython.display import display, clear_output

Process dataclass

In [None]:
@dataclass(order=True)
class Process:
    """A single schedulable job plus the bookkeeping the simulators fill in.

    Only ``pid``, ``arrival`` and ``burst`` take part in the generated
    equality and ordering methods (in that order); every other field is
    declared with ``compare=False``.
    """
    pid: int  # identifier; also the label stored in Gantt segments
    arrival: int  # time at which the process becomes available to run
    burst: int  # total CPU time the process needs
    priority: int = field(default=0, compare=False)  # the priority scheduler runs the smallest value first
    remaining: int = field(init=False, compare=False)  # CPU time still owed; set in __post_init__
    start_time: int = field(default=-1, compare=False)  # first time the process got the CPU; -1 until scheduled
    finish_time: int = field(default=-1, compare=False)  # completion time; -1 until the process finishes

    def __post_init__(self) -> None:
        """Start a freshly created process with its whole burst still to run."""
        self.remaining = self.burst

Metrics utilities

In [127]:
def compute_metrics_from_processes(processes: "List[Process]", gantt: List[Tuple[int,int,int]]):
    """Summarise a finished schedule.

    Args:
        processes: processes whose ``finish_time`` has already been set by a
            scheduler; ``arrival`` and ``burst`` are read as well.
        gantt: ``(pid, start, end)`` segments in execution order.

    Returns:
        A dict with:
          * ``avg_waiting_time`` / ``avg_turnaround_time`` - means, rounded to 3 decimals
            (0.0 when there are no processes);
          * ``fairness_std`` - population standard deviation of the waiting times,
            rounded to 3 decimals;
          * ``num_context_switches`` - number of adjacent Gantt segments whose pid differs;
          * ``waiting_times`` / ``turnaround_times`` - raw per-process values in input order.
    """
    turnaround_times = []
    waiting_times = []
    for p in processes:
        turnaround = p.finish_time - p.arrival
        turnaround_times.append(turnaround)
        # Waiting time is whatever part of the turnaround was not spent running.
        waiting_times.append(turnaround - p.burst)

    if waiting_times:
        avg_wait = float(np.mean(waiting_times))
        avg_turn = float(np.mean(turnaround_times))
        fairness = float(np.std(waiting_times))
    else:
        avg_wait = avg_turn = fairness = 0.0

    # A context switch is any boundary between two consecutive segments that
    # belong to different processes.
    pids = [segment[0] for segment in gantt]
    ctx_switches = sum(1 for prev, cur in zip(pids, pids[1:]) if prev != cur)

    return {
        "avg_waiting_time": round(avg_wait, 3),
        "avg_turnaround_time": round(avg_turn, 3),
        "fairness_std": round(fairness, 3),
        "num_context_switches": ctx_switches,
        "waiting_times": waiting_times,
        "turnaround_times": turnaround_times
    }

def plot_gantt(gantt: List[Tuple[int, int, int]], title: str):
    """Draw a schedule as horizontal bars, one lane per pid, and show the figure.

    Each ``(pid, start, end)`` segment becomes a bar labelled ``P<pid>``.
    When ``gantt`` is empty a short notice is printed and nothing is drawn.
    """
    if not gantt:
        print("No schedule to show.")
        return

    lane_pitch, bar_height = 10, 8  # vertical spacing between lanes / thickness of a bar
    label_style = dict(ha='center', va='center', color='white', fontsize=8, fontweight='bold')

    _, axes = plt.subplots(figsize=(10, 2))
    palette = plt.cm.tab20.colors
    for segment in gantt:
        pid, begin, finish = segment
        lane_bottom = pid * lane_pitch
        axes.broken_barh(
            [(begin, finish - begin)],
            (lane_bottom, bar_height),
            facecolors=[palette[pid % len(palette)]],
            edgecolor='none',  # no outline around the bars
        )
        # Label sits in the middle of the bar, both horizontally and vertically.
        axes.text(
            (begin + finish) / 2,
            lane_bottom + bar_height // 2,
            f"P{pid}",
            **label_style,
        )

    axes.set_xlabel("Time")
    axes.set_yticks([])
    axes.set_title(title)
    plt.tight_layout()
    plt.show()


Scheduler implementations <br>
Each scheduler returns `(processes_with_finish_times, gantt)`. <br>
`gantt` is a list of `(pid, start, end)` segments; back-to-back segments of the same process are merged.



In [128]:
def fcfs(procs_in: List[Process]):
    """First-come-first-served: run every process to completion in arrival order.

    Works on fresh copies of the input. Returns ``(procs, gantt)`` where
    ``procs`` are the copies sorted by arrival (ties keep input order) with
    ``start_time``/``finish_time`` set, and ``gantt`` holds one
    ``(pid, start, end)`` segment per process.
    """
    by_arrival = sorted(procs_in, key=lambda src: src.arrival)
    procs = [Process(src.pid, src.arrival, src.burst, src.priority) for src in by_arrival]
    if len(procs) == 0:
        return [], []

    gantt = []
    clock = procs[0].arrival
    for job in procs:
        # The CPU sits idle until the next job shows up.
        if job.arrival > clock:
            clock = job.arrival
        job.start_time = clock
        clock += job.burst
        job.finish_time = clock
        gantt.append((job.pid, job.start_time, job.finish_time))
    return procs, gantt

def sjf(procs_in: List[Process]):
    """Non-preemptive shortest-job-first.

    Whenever the CPU is free, the arrived process with the smallest burst runs
    to completion; ties go to the process that appears first in ``procs_in``.

    Args:
        procs_in: processes to schedule; they are copied, never mutated.

    Returns:
        ``(procs, gantt)`` - the copies (in input order) with
        ``start_time``/``finish_time`` set, and one ``(pid, start, end)``
        segment per process in execution order.
    """
    procs = [Process(p.pid, p.arrival, p.burst, p.priority) for p in procs_in]
    if not procs:
        return [], []
    time = min(p.arrival for p in procs)
    # Track unfinished work by object identity. Looking processes up with
    # list.index() would use the dataclass __eq__ (pid/arrival/burst), so two
    # equal processes would be confused with each other and the loop would
    # never finish.
    pending = list(procs)
    gantt = []
    while pending:
        ready = [p for p in pending if p.arrival <= time]
        if not ready:
            # CPU idle: jump straight to the next arrival instead of ticking.
            time = min(p.arrival for p in pending)
            continue
        p = min(ready, key=lambda x: x.burst)
        pending = [q for q in pending if q is not p]
        p.start_time = time
        time += p.burst
        p.finish_time = time
        gantt.append((p.pid, p.start_time, p.finish_time))
    return procs, gantt

def srtf(procs_in: List[Process]):
    """Shortest-remaining-time-first (preemptive SJF), simulated one time unit at a time.

    At every tick the arrived process with the least remaining work runs; ties
    go to the process that appears first in ``procs_in``.

    Args:
        procs_in: processes to schedule; they are copied, never mutated.

    Returns:
        ``(procs, gantt)`` - the copies (in input order) with
        ``start_time``/``finish_time`` set, and ``(pid, start, end)`` segments
        where consecutive ticks of the same process are merged. A process
        whose burst is not positive gets no segment and is recorded as
        finishing at its arrival time.
    """
    procs = [Process(p.pid, p.arrival, p.burst, p.priority) for p in procs_in]
    if not procs:
        return [], []
    # A process with nothing to run never enters the ready set, so without
    # this its finish_time would stay at the -1 sentinel and poison the metrics.
    for p in procs:
        if p.remaining <= 0:
            p.start_time = p.finish_time = p.arrival
    time = min(p.arrival for p in procs)
    gantt = []
    # Loop on "work left" rather than "every counter is exactly zero" so a
    # negative burst cannot keep the simulation spinning forever.
    while any(p.remaining > 0 for p in procs):
        ready = [p for p in procs if p.arrival <= time and p.remaining > 0]
        if not ready:
            # CPU idle: jump straight to the next arrival instead of ticking.
            time = min(p.arrival for p in procs if p.remaining > 0)
            continue
        p = min(ready, key=lambda x: x.remaining)
        if p.start_time == -1:
            p.start_time = time
        # Extend the previous segment only when it is the same process AND it
        # ended exactly now (same rule as the RR / MLFQ simulators).
        if gantt and gantt[-1][0] == p.pid and gantt[-1][2] == time:
            pid, s, e = gantt[-1]
            gantt[-1] = (pid, s, e + 1)
        else:
            gantt.append((p.pid, time, time + 1))
        p.remaining -= 1
        time += 1
        if p.remaining == 0:
            p.finish_time = time
    return procs, gantt

def round_robin(procs_in: List[Process], quantum:int=2):
    """Round-robin scheduling with a fixed time slice.

    Processes join a FIFO ready queue in arrival order (simultaneous arrivals
    keep their input order). The head of the queue runs for at most
    ``quantum`` time units; processes that arrived during that slice are
    queued ahead of the preempted process.

    Args:
        procs_in: processes to schedule; they are copied, never mutated.
        quantum: length of a time slice, at least 1.

    Returns:
        ``(procs, gantt)`` - the copies (in input order) with
        ``start_time``/``finish_time`` set, and ``(pid, start, end)`` segments
        where back-to-back slices of the same process are merged.

    Raises:
        ValueError: if ``quantum`` is smaller than 1 (the simulation could
            never make progress).
    """
    if quantum < 1:
        raise ValueError("quantum must be at least 1")
    procs = [Process(p.pid, p.arrival, p.burst, p.priority) for p in procs_in]
    if not procs:
        return [], []
    # sorted() is stable, so simultaneous arrivals keep their input order.
    arrivals = deque(sorted(procs, key=lambda p: p.arrival))
    queue = deque()
    gantt = []
    time = arrivals[0].arrival

    def admit(now):
        # Move every process that has arrived by `now` into the ready queue.
        while arrivals and arrivals[0].arrival <= now:
            queue.append(arrivals.popleft())

    admit(time)
    # Driving the loop from the queues (not from "remaining > 0") guarantees
    # every process is visited, including ones that need no CPU time.
    while queue or arrivals:
        if not queue:
            # CPU idle: jump straight to the next arrival.
            time = arrivals[0].arrival
            admit(time)
            continue
        p = queue.popleft()
        if p.start_time == -1:
            p.start_time = time
        run = min(quantum, p.remaining)
        if run > 0:
            if gantt and gantt[-1][0] == p.pid and gantt[-1][2] == time:
                pid, s, e = gantt[-1]
                gantt[-1] = (pid, s, e + run)
            else:
                gantt.append((p.pid, time, time + run))
            p.remaining -= run
            time += run
            # Newcomers are queued before the preempted process is re-queued.
            admit(time)
        if p.remaining > 0:
            queue.append(p)
        else:
            p.finish_time = time
    return procs, gantt

def priority_preemptive(procs_in: List[Process]):
    """Preemptive priority scheduling, simulated one time unit at a time.

    At every tick the arrived, unfinished process with the smallest
    ``priority`` value runs; ties go to the process that appears first in
    ``procs_in``.

    Args:
        procs_in: processes to schedule; they are copied, never mutated.

    Returns:
        ``(procs, gantt)`` - the copies (in input order) with
        ``start_time``/``finish_time`` set, and ``(pid, start, end)`` segments
        where consecutive ticks of the same process are merged. A process
        whose burst is not positive gets no segment and is recorded as
        finishing at its arrival time.
    """
    procs = [Process(p.pid, p.arrival, p.burst, p.priority) for p in procs_in]
    if not procs:
        return [], []
    # A process with nothing to run never enters the ready set, so without
    # this its finish_time would stay at the -1 sentinel and poison the metrics.
    for p in procs:
        if p.remaining <= 0:
            p.start_time = p.finish_time = p.arrival
    time = min(p.arrival for p in procs)
    gantt = []
    # Loop on "work left" rather than "every counter is exactly zero" so a
    # negative burst cannot keep the simulation spinning forever.
    while any(p.remaining > 0 for p in procs):
        ready = [p for p in procs if p.arrival <= time and p.remaining > 0]
        if not ready:
            # CPU idle: jump straight to the next arrival instead of ticking.
            time = min(p.arrival for p in procs if p.remaining > 0)
            continue
        p = min(ready, key=lambda x: x.priority)
        if p.start_time == -1:
            p.start_time = time
        # Extend the previous segment only when it is the same process AND it
        # ended exactly now (same rule as the RR / MLFQ simulators).
        if gantt and gantt[-1][0] == p.pid and gantt[-1][2] == time:
            pid, s, e = gantt[-1]
            gantt[-1] = (pid, s, e + 1)
        else:
            gantt.append((p.pid, time, time + 1))
        p.remaining -= 1
        time += 1
        if p.remaining == 0:
            p.finish_time = time
    return procs, gantt

def mlfq(procs_in: List[Process], q1:int=2, q2:int=4):
    """Simple two-level multi-level feedback queue.

    New arrivals enter the high-priority queue and get one slice of at most
    ``q1``. A process that is still unfinished after a slice (from either
    queue) goes to the back of the low-priority queue, which is served
    round-robin with slices of at most ``q2`` and only while the high queue is
    empty. A running slice is never cut short by a new arrival.

    Args:
        procs_in: processes to schedule; they are copied, never mutated.
        q1: time slice of the high-priority queue, at least 1.
        q2: time slice of the low-priority queue, at least 1.

    Returns:
        ``(procs, gantt)`` - the copies (in input order) with
        ``start_time``/``finish_time`` set, and ``(pid, start, end)`` segments
        where back-to-back slices of the same process are merged.

    Raises:
        ValueError: if ``q1`` or ``q2`` is smaller than 1 (the simulation
            could never make progress).
    """
    if q1 < 1 or q2 < 1:
        raise ValueError("q1 and q2 must be at least 1")
    procs = [Process(p.pid, p.arrival, p.burst, p.priority) for p in procs_in]
    if not procs:
        return [], []
    # sorted() is stable, so simultaneous arrivals keep their input order.
    arrivals = deque(sorted(procs, key=lambda p: p.arrival))
    q_high = deque()
    q_low = deque()
    gantt = []
    time = arrivals[0].arrival

    def admit(now):
        # Every process that has arrived by `now` starts in the high queue.
        while arrivals and arrivals[0].arrival <= now:
            q_high.append(arrivals.popleft())

    admit(time)
    while q_high or q_low or arrivals:
        # Pick the next process and the slice length of the queue it came from.
        if q_high:
            p, quantum = q_high.popleft(), q1
        elif q_low:
            p, quantum = q_low.popleft(), q2
        else:
            # CPU idle: jump straight to the next arrival.
            time = arrivals[0].arrival
            admit(time)
            continue
        if p.start_time == -1:
            p.start_time = time
        run = min(quantum, p.remaining)
        if run > 0:
            if gantt and gantt[-1][0] == p.pid and gantt[-1][2] == time:
                pid, s, e = gantt[-1]
                gantt[-1] = (pid, s, e + run)
            else:
                gantt.append((p.pid, time, time + run))
            p.remaining -= run
            time += run
            admit(time)
        if p.remaining > 0:
            # Used up its slice without finishing: demote (or keep) in the low queue.
            q_low.append(p)
        else:
            p.finish_time = time
    return procs, gantt

Scheduler map: index → (name, runner). The index is the value of the GA's scheduler gene; MLFQ is index 5.

In [None]:
# Registry of available schedulers, keyed 0..5 by position in the tuple below.
# Each value is (display name, runner); a runner takes (procs, params) and
# returns whatever the underlying scheduler returns. Do not reorder entries:
# the key is what the GA stores in its scheduler gene.
SCHEDULERS = dict(enumerate((
    ("FCFS", lambda workload, opts: fcfs(workload)),
    ("SJF",  lambda workload, opts: sjf(workload)),
    ("SRTF", lambda workload, opts: srtf(workload)),
    ("RR",   lambda workload, opts: round_robin(workload, quantum=int(opts.get('quantum',2)))),
    ("PRIO", lambda workload, opts: priority_preemptive(workload)),
    ("MLFQ", lambda workload, opts: mlfq(workload, q1=int(opts.get('q1',2)), q2=int(opts.get('q2',4)))),
)))

Genetic Algorithm (extended chromosome) <br>
 Chromosome encoding: <br>
 [ sched_idx (0..5), <br>
   rr_quantum (1..10), <br>
   mlfq_q1 (1..6), <br>
   mlfq_q2 (1..12), <br>
   ctx_weight (0.0..0.5), <br>
   fair_weight (0.0..0.5) <br>
 ]

In [129]:
def random_chromosome():
    """Draw a random six-gene chromosome.

    Gene order: scheduler index (0..5), RR quantum (1..10), MLFQ q1 (1..6),
    MLFQ q2 (1..12), context-switch weight and fairness weight (each
    0.0..0.5, rounded to 3 decimals).
    """
    # Genes are drawn in chromosome order so a seeded run stays reproducible.
    sched_idx = random.randint(0, 5)
    rr_quantum = random.randint(1, 10)
    mlfq_q1 = random.randint(1, 6)
    mlfq_q2 = random.randint(1, 12)
    ctx_weight = round(random.uniform(0.0, 0.5), 3)
    fair_weight = round(random.uniform(0.0, 0.5), 3)
    return [sched_idx, rr_quantum, mlfq_q1, mlfq_q2, ctx_weight, fair_weight]

def decode_and_eval(chrom, processes: List[Process]):
    """Run the scheduler a chromosome encodes and score the resulting schedule.

    Genes 0-3 select the scheduler and its quantum parameters; genes 4 and 5
    are the weights applied to the context-switch count and the fairness
    std-dev in the score. Because those two weights come from the chromosome
    itself, scores of chromosomes with different weights are not on a common
    scale.

    Returns ``(score, scheduler_name, scheduled_processes, gantt, metrics)``;
    a lower score is better.
    """
    scheduler_key = int(chrom[0])
    tuning = {
        'quantum': int(chrom[1]),
        'q1': int(chrom[2]),
        'q2': int(chrom[3])
    }
    name, runner = SCHEDULERS[scheduler_key]

    # Hand the scheduler its own copies so the caller's workload stays pristine.
    workload = [Process(src.pid, src.arrival, src.burst, src.priority) for src in processes]
    scheduled, gantt = runner(workload, tuning)
    metrics = compute_metrics_from_processes(scheduled, gantt)

    ctx_w, fair_w = float(chrom[4]), float(chrom[5])
    # Accumulate the weighted terms left to right.
    score = 0.4 * metrics["avg_waiting_time"]
    score = score + 0.3 * metrics["avg_turnaround_time"]
    score = score + ctx_w * metrics["num_context_switches"]
    score = score + fair_w * metrics["fairness_std"]
    return score, name, scheduled, gantt, metrics

def tournament_selection(pop, scores, k=3):
    """Pick ``k`` random contestants and return the chromosome with the lowest score.

    Args:
        pop: list of chromosomes.
        scores: score of each chromosome, aligned with ``pop`` (lower is better).
        k: tournament size; clamped to ``1..len(pop)`` so small populations work.

    Returns:
        The winning chromosome (the same object that is stored in ``pop``).

    Raises:
        ValueError: if the population is empty.
    """
    contestants = list(zip(pop, scores))
    if not contestants:
        raise ValueError("cannot run a tournament on an empty population")
    # random.sample() raises when asked for more items than exist.
    size = max(1, min(k, len(contestants)))
    picked = random.sample(contestants, size)
    # min() returns the first of several equally good contestants, exactly as
    # a stable sort followed by [0] would.
    return min(picked, key=lambda pair: pair[1])[0]

def crossover(a, b, rate=0.9):
    """Recombine two six-gene chromosomes into two children.

    With probability ``rate`` the children are built from a fixed gene mask:
    each child keeps its own parent's scheduler gene (0), q2 gene (3) and
    fairness weight (5) and takes the other parent's RR quantum (1), q1 (2)
    and context-switch weight (4). Otherwise the children are plain copies of
    the parents.

    Args:
        a, b: parent chromosomes (not modified).
        rate: probability of recombining; defaults to 0.9.

    Returns:
        ``(child1, child2)`` as new lists.
    """
    if random.random() < rate:
        child1 = [a[0], b[1], b[2], a[3], b[4], a[5]]
        child2 = [b[0], a[1], a[2], b[3], a[4], b[5]]
    else:
        # Copy so that later changes to a child can never touch a parent.
        child1, child2 = list(a), list(b)
    return child1, child2

def mutate(chrom, mut_rate=0.2):
    """Return a mutated copy of ``chrom``; the input is left untouched.

    Each of the six genes is changed independently with probability
    ``mut_rate``. The scheduler gene is redrawn from 0..5; the other genes
    take a small random step and are clamped to the ranges
    ``random_chromosome`` draws from (RR quantum 1..10, MLFQ q1 1..6,
    MLFQ q2 1..12, weights 0.0..0.5), so mutation cannot push a gene outside
    the chromosome encoding.
    """
    quantum_max, q1_max, q2_max, weight_max = 10, 6, 12, 0.5
    c = list(chrom)
    # mutate scheduler
    if random.random() < mut_rate:
        c[0] = random.randint(0, 5)
    # mutate rr quantum
    if random.random() < mut_rate:
        c[1] = max(1, min(quantum_max, c[1] + random.choice([-2, -1, 1, 2])))
    # mutate mlfq q1
    if random.random() < mut_rate:
        c[2] = max(1, min(q1_max, c[2] + random.choice([-1, 1])))
    # mutate mlfq q2
    if random.random() < mut_rate:
        c[3] = max(1, min(q2_max, c[3] + random.choice([-2, -1, 1, 2])))
    # mutate weights
    if random.random() < mut_rate:
        c[4] = round(min(weight_max, max(0.0, c[4] + random.uniform(-0.05, 0.05))), 3)
    if random.random() < mut_rate:
        c[5] = round(min(weight_max, max(0.0, c[5] + random.uniform(-0.05, 0.05))), 3)
    return c

def run_ga(processes: List[Process], pop_size=30, gens=40, mut_rate=0.2, elite=2):
    """Search for a good scheduler configuration with a genetic algorithm.

    Every generation the population is scored with ``decode_and_eval``, the
    ``elite`` best chromosomes are carried over unchanged, and the rest of the
    next population is bred by tournament selection, crossover and mutation.
    Mutation is gated twice: a child is considered for mutation with
    probability ``mut_rate`` and, if so, each of its genes mutates with
    probability ``mut_rate``.

    Args:
        processes: workload to optimise for.
        pop_size: number of chromosomes per generation, at least 1.
        gens: number of generations to breed.
        mut_rate: mutation probability (see above).
        elite: how many top chromosomes survive unchanged; clamped to
            ``0..pop_size``.

    Returns:
        ``(best_eval, history)`` where ``best_eval`` is the
        ``decode_and_eval`` result of the best chromosome in the final
        population and ``history`` is a list of
        ``(generation, best_score, best_chromosome)``.

    Raises:
        ValueError: if ``pop_size`` is smaller than 1.
    """
    if pop_size < 1:
        raise ValueError("pop_size must be at least 1")
    elite = max(0, min(elite, pop_size))
    # A tournament cannot draw more contestants than the population holds.
    tournament_k = min(3, pop_size)

    pop = [random_chromosome() for _ in range(pop_size)]
    history = []
    for gen in range(gens):
        scores = [decode_and_eval(chrom, processes)[0] for chrom in pop]
        # Stable sort by score (lower is better).
        ranked = sorted(zip(pop, scores), key=lambda pair: pair[1])
        pop_sorted = [chrom for chrom, _ in ranked]
        scores_sorted = [score for _, score in ranked]
        history.append((gen, scores_sorted[0], pop_sorted[0]))
        # elitism + generate new population
        new_pop = pop_sorted[:elite]
        while len(new_pop) < pop_size:
            parent1 = tournament_selection(pop_sorted, scores_sorted, k=tournament_k)
            parent2 = tournament_selection(pop_sorted, scores_sorted, k=tournament_k)
            child1, child2 = crossover(parent1, parent2)
            if random.random() < mut_rate:
                child1 = mutate(child1, mut_rate)
            # The second child is only mutated when there is room left for it.
            if random.random() < mut_rate and len(new_pop)+1 < pop_size:
                child2 = mutate(child2, mut_rate)
            new_pop.append(child1)
            if len(new_pop) < pop_size:
                new_pop.append(child2)
        pop = new_pop
    # Final evaluation: the last population has not been scored yet.
    final_evals = [decode_and_eval(ch, processes) for ch in pop]
    best_eval = min(final_evals, key=lambda result: result[0])
    return best_eval, history

Baseline compare

In [None]:
def run_baseline(processes: List[Process]):
    """Run every registered scheduler with fixed settings on the same workload.

    RR is run twice (quantum 2 and 4) and MLFQ once (q1=2, q2=4); every other
    scheduler is run once with no parameters. Returns a dict mapping a display
    label to ``(gantt, metrics)``, in registry order.
    """
    # Schedulers listed here are run once per (label, params) pair; any
    # scheduler not listed is run once under its own name with empty params.
    variants = {
        "RR": [("RR(q=2)", {'quantum':2}), ("RR(q=4)", {'quantum':4})],
        "MLFQ": [("MLFQ(q1=2,q2=4)", {'q1':2, 'q2':4})],
    }
    results = {}
    for name, runner in SCHEDULERS.values():
        for label, params in variants.get(name, [(name, {})]):
            # Fresh copies for every run so one scheduler cannot affect the next.
            workload = [Process(src.pid, src.arrival, src.burst, src.priority) for src in processes]
            outcome = runner(workload, params)
            scheduled, gantt = outcome[0], outcome[1]
            results[label] = (gantt, compute_metrics_from_processes(scheduled, gantt))
    return results

UI & controls

In [None]:
# Dark colour scheme applied to matplotlib's global rcParams; it affects every
# figure created after this cell runs (until the defaults are restored).
plt.rcParams.update({
    'figure.facecolor': '#282A2C',     # dark gray figure background
    'axes.facecolor': '#40444A',       # slightly lighter gray for chart area
    'savefig.facecolor': '#2e2e2e',    # background used when a figure is saved
    'axes.edgecolor': '#aaaaaa',
    'axes.labelcolor': '#ffffff',
    'axes.titlecolor': '#ffffff',
    'xtick.color': '#ffffff',
    'ytick.color': '#ffffff',
    'text.color': '#ffffff',
    'grid.color': '#666666'
})

In [146]:
# Example workload used to pre-fill the per-process inputs.
# Positional arguments are Process(pid, arrival, burst, priority).
default_demo = [
    Process(1,0,5,3),
    Process(2,1,3,1),
    Process(3,2,8,4),
    Process(4,3,6,2)
]

# Number of process rows shown in the form.
num_proc_widget = IntSlider(value=4, min=1, max=8, step=1, description='Processes:')
# GA settings, read when the "Run GA Optimizer" button is clicked.
ga_pop = IntSlider(value=30, min=6, max=100, step=1, description='GA Pop:')
ga_gens = IntSlider(value=40, min=5, max=200, step=1, description='Generations:')
ga_mut = FloatSlider(value=0.2, min=0.0, max=0.6, step=0.01, description='Mutation:')
# Action buttons; their click handlers are attached further down.
run_ga_btn = Button(description="Run GA Optimizer", button_style='success')
compare_btn = Button(description="Compare Baselines", button_style='info')
# Shared output area that both button callbacks render into.
output = widgets.Output(layout={'border': '0.5px solid black'})

def make_boxes(n, initial=default_demo):
    """Build ``n`` input rows, each an HBox of arrival / burst / priority fields.

    Row ``i`` is pre-filled from ``initial[i]`` when that entry exists;
    otherwise it defaults to arrival 0, burst 1, priority 1.
    """
    rows = []
    for idx in range(n):
        if idx < len(initial):
            preset = initial[idx]
            arrival, burst, priority = preset.arrival, preset.burst, preset.priority
        else:
            arrival, burst, priority = 0, 1, 1
        arrival_field = FloatText(value=arrival, description=f"P{idx+1} Arrival:")
        burst_field = FloatText(value=burst, description=f"P{idx+1} Burst:")
        priority_field = FloatText(value=priority, description=f"P{idx+1} Priority:")
        rows.append(HBox([arrival_field, burst_field, priority_field]))
    return rows

# Container holding one input row per process.
proc_box = VBox(make_boxes(num_proc_widget.value))

def refresh_proc_boxes(change=None):
    """Resize the process form to match the slider without losing user input.

    Rows that already exist are kept as they are (including anything the user
    typed); new default rows are appended when the count grows and surplus
    rows are dropped from the end when it shrinks.
    """
    wanted = num_proc_widget.value
    kept = list(proc_box.children)[:wanted]
    if len(kept) < wanted:
        # make_boxes() always builds rows 0..wanted-1; only the tail is new.
        kept = kept + make_boxes(wanted)[len(kept):]
    proc_box.children = kept

num_proc_widget.observe(refresh_proc_boxes, names='value')

def read_processes():
    """Turn the current form contents into a list of ``Process`` objects.

    Values are truncated to integers. Arrival is clamped to be non-negative
    and burst to be at least 1, because the simulators assume every process
    has some work to do. Whenever a value had to be adjusted, the widget is
    updated so the form shows exactly what will be simulated.

    Returns:
        One ``Process`` per form row, with pids 1..n in row order.
    """
    ps = []
    for i, box in enumerate(proc_box.children):
        arr_field, bur_field, pri_field = box.children[0], box.children[1], box.children[2]
        arr = max(0, int(arr_field.value))
        bur = max(1, int(bur_field.value))
        pri = int(pri_field.value)
        # Write normalised values back only when they differ from the input.
        for widget, value in ((arr_field, arr), (bur_field, bur), (pri_field, pri)):
            if widget.value != value:
                widget.value = value
        ps.append(Process(i+1, arr, bur, pri))
    return ps

@output.capture(clear_output=True)
def on_compare(b):
    """Button handler: run every baseline scheduler on the form's workload.

    Prints a table sorted by average waiting time, draws a bar chart of
    average waiting / turnaround time, and shows the Gantt chart of the
    baseline with the lowest average waiting time.
    """
    workload = read_processes()
    print("Baseline comparison for the current workload:\n")
    results = run_baseline(workload)

    # One summary row per baseline, best (lowest) average wait first.
    rows = sorted(
        (
            (label, m['avg_waiting_time'], m['avg_turnaround_time'], m['num_context_switches'], m['fairness_std'])
            for label, (_gantt, m) in results.items()
        ),
        key=lambda row: row[1],
    )
    for label, wait, turn, ctx, fair in rows:
        print(f"{label:<20} AvgWait={wait:>6}  AvgTurn={turn:>6}  CtxSwitch={ctx:>3}  Fairness={fair:>5}")

    # bar chart: avg wait and avg turn
    names = [row[0] for row in rows]
    waits = [row[1] for row in rows]
    turns = [row[2] for row in rows]
    positions = np.arange(len(names))
    bar_width = 0.35
    _, axes = plt.subplots(figsize=(9,4))
    axes.bar(positions - bar_width/2, waits, bar_width, label='AvgWaiting', color='#1384ad')
    axes.bar(positions + bar_width/2, turns, bar_width, label='AvgTurnaround', color='#ad4f1f')
    axes.set_xticks(positions)
    axes.set_xticklabels(names, rotation=45)
    axes.set_ylabel("Time")
    axes.legend()
    plt.show()

    # show Gantt of best by waiting
    best = rows[0][0]
    print(f"\nDisplaying Gantt chart for best baseline ({best}):")
    best_gantt = results[best][0]
    plot_gantt(best_gantt, f"Baseline Best: {best}")

@output.capture(clear_output=True)
def on_run_ga(b):
    """Button handler: run the GA on the form's workload and report the result.

    Shows the winning scheduler with its metrics and Gantt chart, plots the
    best score per generation, then lists the baselines and flags those whose
    average waiting time is worse than the GA's result.
    """
    workload = read_processes()
    ga_settings = dict(pop_size=ga_pop.value, gens=ga_gens.value, mut_rate=ga_mut.value, elite=2)
    print("Running GA optimizer... (this may take a moment depending on settings)\n")
    best_eval, hist = run_ga(workload, **ga_settings)
    score, name, scheduled, gantt, metrics = best_eval
    print(f"GA Best -> Scheduler: {name}  Score={score:.3f}")
    print(f"Metrics: AvgWait={metrics['avg_waiting_time']}  AvgTurn={metrics['avg_turnaround_time']}  CtxSwitch={metrics['num_context_switches']}  FairnessStd={metrics['fairness_std']}")
    plot_gantt(gantt, f"GA Best: {name}")

    # show convergence
    generations = [entry[0] for entry in hist]
    best_scores = [entry[1] for entry in hist]
    _, axes = plt.subplots(figsize=(7,3))
    axes.plot(generations, best_scores, marker='o')
    axes.set_xlabel("Generation")
    axes.set_ylabel("Best Score (lower better)")
    axes.set_title("GA Convergence (best score per generation)")
    plt.show()

    # compare to baselines
    print("\nBaseline summary (same workload):")
    baselines = sorted(
        (
            (label, m['avg_waiting_time'], m['avg_turnaround_time'], m['num_context_switches'], m['fairness_std'])
            for label, (_g, m) in run_baseline(workload).items()
        ),
        key=lambda row: row[1],
    )
    ga_wait = metrics['avg_waiting_time']
    for label, wait, turn, ctx, fair in baselines:
        if wait > ga_wait:
            marker = "<-- GA better"
        else:
            marker = ""
        print(f"{label:<20} AvgWait={wait:>6}  AvgTurn={turn:>6}  CtxSwitch={ctx:>3}  Fairness={fair:>5} {marker}")

# Attach the click handlers to the two action buttons.
run_ga_btn.on_click(on_run_ga)
compare_btn.on_click(on_compare)

# The logo is purely decorative: if the file cannot be read, build the UI
# without it instead of aborting the whole cell.
try:
    with open("opticore_logo.png", "rb") as f:
        logo = f.read()
except OSError:
    logo = None

logo_widget = Image(value=logo, format='png', width=140) if logo is not None else None

# Title block. The font is declared as `font-family:monospace;` because a bare
# `monospace;` token is not a valid CSS declaration.
title_widget = HTML("""
    <div style='text-align:center; line-height:1.2; margin-top:-10px;'>
        <h2 style='margin-top:4px; margin-bottom:4px; font-family:monospace; font-size:18px; color:#ffff;'>
            OptiCore
        </h2>
        <p style='margin-top:0; font-size:13px; color:#555; font-style:italic;'>
            Optimizing CPU scheduling through intelligent algorithms.
        </p>
    </div>
""")

# Header shows the logo (when available) above the title.
header = VBox(
    [part for part in (logo_widget, title_widget) if part is not None],
    layout={'align_items': 'center', 'padding': '0', 'margin': '0'}
)

# Full control panel: header, GA sliders, per-process form, buttons, output.
controls = VBox([
    header,
    HBox([num_proc_widget, ga_pop, ga_gens, ga_mut]),
    HTML("<b>Per-process inputs (Arrival, Burst, Priority):</b>"),
    proc_box,
    HBox([run_ga_btn, compare_btn]),
    output
])

display(controls)

VBox(children=(VBox(children=(Image(value=b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x01\xf4\x00\x00\x01\xf…

In [None]:
# Restore matplotlib's default rcParams and style so figures created after
# this cell no longer use the dark theme set earlier in the notebook.
plt.rcdefaults()
plt.style.use('default')
