LCR算法：解决了领导者选举问题，系统中有n个进程，每个进程有一个唯一ID，网络是一个单向环，没有初始领导者，最终恰好有一个进程宣布自己是领导者，且所有进程都知道它是谁

模型假设：
1. 拓扑结构：所有进程构成一个逻辑环；每个进程只知道它的后继节点
2. 通信模型：只能沿一个方向发送消息；消息可靠、不丢失、不篡改；不要求同步
3. 进程特性：每个进程有一个唯一 ID；所有进程执行同一套算法；没有全局视图

算法流程：
1. 初始化：每个进程$p_i$，知道自己的ID：$id_i$，向后继结点发送一条消息：$⟨id_i⟩$
2. 接收与处理：当某个进程$p$收到一条消息$⟨id_j⟩$：

$\ \ \ \ $(1)$id_i\geq id_p$: 转发$id_j$

$\ \ \ \ $(2)$id_i\leq id_p$: 丢弃$id_j$

$\ \ \ \ $(3)$id_i=id_p$: 声称自己是leader

时间复杂度：$O(n)$ $ \ \ \ \ \ $ 消息复杂度：$O(n^2)$

In [7]:
from collections import deque

class Process:
    def __init__(self, pid):
        self.pid=pid
        self.inbox=deque()
        self.active=True
        self.is_leader=False


    def receive(self, msg):
        self.inbox.append(msg)

    def step(self):
        if not self.inbox:
            return None
        msg=self.inbox.popleft()
        if msg > self.pid:
            return msg
        elif msg<self.pid:
            return None
        else:
            self.is_leader=True
            return None

def run_lcr(pids):
    n=len(pids)
    processes=[Process(pid) for pid in pids]
    successors={i: (i+1)%n for i in range(n)}
    # 初始阶段：每个进程向其后继发送自己的PID
    for i, proc in enumerate(processes):
        processes[successors[i]].receive(proc.pid)
    round_num=0
    while True:
        round_num+=1
        outgoing=[]
        # 第一阶段：所有进程执行一步操作
        for i, proc in enumerate(processes):
            # 每个进程执行一步算法，可能返回要转发的消息
            msg=proc.step()
            if msg is not None:
                outgoing.append((i, msg)) # 记录发送者索引和消息
        # 第二阶段：转发消息给后继进程
        for i, msg in outgoing:
            # 进程i将消息msg发送给它的后继进程
            processes[successors[i]].receive(msg)
        # 检查是否有进程被选举为领导者
        leaders = [p.pid for p in processes if p.is_leader]
        if leaders:
            print(f"Leader elected in round {round_num}: {leaders[0]}")
            return leaders[0]

if __name__ == '__main__' :
    pids=[3,7,2,9,5]
    leader=run_lcr(pids)

Leader elected in round 5: 9


Hs算法：分阶段、指数扩张、双向试探，只让足够强的ID才能传播得更远

模型假设
1. 拓扑结构：双向环
2. 每个进程有唯一ID，知道左右邻居
3. 通信点对点，可靠，可同步也可异步

Phased-based竞争：在第k个阶段，每个仍然活跃的进程向左右两方向个发送一个探测消息，探测距离为$2^k$
1. 发起探测：每个active的进程p，向左发送$OUT(pid, 2^k)$，向右发送$OUT(pid, 2^k)$
2. 当某个进程q收到$OUT(id, dist)$：

$\ \ \ \ $(1)$id\leq q.id$: 丢弃$id$

$\ \ \ \ $(2)$id\geq q.id$: 若$dist\geq 1$: 转发$OUT(id, dist-1)$。若$dist==1$: 探测到达边界，向反方向发送$IN(id)$

3. 当$IN(id)$一路返回到发起者：表示在该方向上，$2^k$距离范围内，没有比id更大的进程

4. 进程p在phase k存活，当且仅当左右方向的$IN(pid)$均返回，否则p被淘汰

当某个进程p在某个phase k中，探测距离$2^k\geq n$，且左右方向的$IN$都返回，则该进程是整个环中ID最大的进程，它宣布自己是leader

时间复杂度：$O(n)$ $ \ \ \ \ \ $ 消息复杂度：$O(nlogn)$

In [8]:
class Message:
    def __init__(self, msg_type, pid ,dist, direction, origin):
        self.msg_type = msg_type
        self.pid = pid
        self.dist = dist
        self.direction = direction
        self.origin = origin

class HSSimulator:
    def __init__(self, pids):
        self.n=len(pids)
        self.processes = [Process(pid) for pid in pids]
        self.left = lambda i: (i-1)%self.n
        self.right = lambda i: (i+1)%self.n

    def run(self):
        phase=0
        while True:
            print(f"\n=== Phase {phase} ===")
            radius = 2**phase
            inbox = [[] for _ in range(self.n)]

            #发起OUT探测
            for i, p in enumerate(self.processes):
                if not p.active:
                    continue
                inbox[self.left(i)].append(
                    Message('OUT', p.pid, radius, 'L', i)
                )
                inbox[self.right(i)].append(
                    Message('OUT', p.pid, radius, 'R', i)
                )
            #传播消息
            returned = {i: {'L': False, 'R': False} for i in range(self.n)}
            active_messages = True

            while active_messages:
                active_messages = False
                new_inbox = [[] for _ in range(self.n)]
                for i, msgs in enumerate(inbox):
                    for msg in msgs:
                        active_messages = True
                        self._handle_message(i, msg, new_inbox, returned)
                inbox = new_inbox
            #phase结束，淘汰失败者
            alive = 0
            for i, p in enumerate(self.processes):
                if not p.active:
                    continue
                if returned[i]['L'] and returned[i]['R']:
                    alive += 1
                else:
                    p.active = False
            #终止条件
            if alive == 1:
                leader = next(p for p in self.processes if p.active)
                leader.is_leader = True
                print(f"\nLeader elected: {leader.pid}")
                return leader.pid
            phase += 1

    def _handle_message(self, idx, msg, new_inbox, returned):
        p = self.processes[idx]
        if msg.msg_type == 'OUT':
            if msg.pid < p.pid:
                return
            if msg.dist > 1:
                nxt = self.left(idx) if msg.direction == 'L' else self.right(idx)
                new_inbox[nxt].append(
                    Message('OUT', msg.pid, msg.dist - 1, msg.direction, msg.origin)
                )
            else:
                nxt = self.right(idx) if msg.direction == 'L' else self.left(idx)
                new_inbox[nxt].append(
                    Message('IN', msg.pid, 0, msg.direction, msg.origin)
                )
        elif msg.msg_type == 'IN':
            if idx == msg.origin:
                returned[idx][msg.direction] = True
            else:
                nxt = self.right(idx) if msg.direction == 'L' else self.left(idx)
                new_inbox[nxt].append(msg)
if __name__ == "__main__":
    pids = [3, 7, 2, 9, 5]
    sim = HSSimulator(pids)
    sim.run()


=== Phase 0 ===

=== Phase 1 ===

Leader elected: 9


时间片算法：已知环的规模为n，在单核或有限核CPU中，如何让多个进程看起来像同时运行，并且做到公平，可预测。通过“强制抢占 + 循环调度”，在牺牲部分吞吐量的前提下，换取了系统级的公平性和可响应性。

模型假设：
1. 可抢占内核，内核可以在任意安全点中断当前进程
2. 定时器中断：硬件周期性产生中断
3. 就绪队列：所有可运行进程排队等待CPU
4. 上下文切换：保存/回复寄存器，程序计数器，栈等

算法流程：
1. 系统维护一个FIFO就绪队列
2. 每次调度：从队首取一个进程，分配一个固定长度的时间片q
3. 若进程在时间片内，完成或阻塞时立即让出CPU
4. 若时间片耗尽但进程未完成，则强制抢占并把进程放回队尾

时间复杂度：$O(n\cdot uid_{min})$ $ \ \ \ \ \ $ 消息复杂度：$O(n)$

In [4]:
class Process:
    def __init__(self, pid, arrival, burst):
        self.pid=pid
        self.arrival=arrival
        self.burst=burst
        self.remaining=burst
        self.start_time=None
        self.finish_time=None
from collections import deque
class RoundRobinScheduler:
    def __init__(self, processes, quantum):
        self.processes=processes
        self.quantum=quantum
        self.time=0
        self.ready_queue=deque()
        self.context_switches=0
        self.last_running=None
    def run(self):
        processes=sorted(self.processes, key=lambda p: p.arrival)
        n=len(processes)
        finished=0
        idx=0
        print("=== Round-Robin Scheduling Start ===")
        while finished<n:
            #新进程进入就绪队列
            while idx<n and processes[idx].arrival <= self.time:
                self.ready_queue.append(processes[idx])
                idx+=1
            if not self.ready_queue:
                self.time+=1
                continue
            #取对手进程
            current=self.ready_queue.popleft()
            #统计上下文切换
            if self.last_running is not None and self.last_running!=current.pid:
                self.context_switches+=1
            self.last_running=current.pid
            if current.start_time is None:
                current.start_time=self.time
            #执行一个时间片
            run_time=min(self.quantum, current.remaining)
            self.time+=run_time
            current.remaining-=run_time
            print(f"Time {self.time-run_time:2d} → {self.time:2d}: "f"P{current.pid} runs ({run_time})")
            #检查新到达进程
            while idx<n and processes[idx].arrival<=self.time:
                self.ready_queue.append(processes[idx])
                idx+=1
            #是否完成
            if current.remaining>0:
                self.ready_queue.append(current)
            else:
                current.finish_time=self.time
                finished+=1
        print("=== Scheduling Finished ===\n")
    def report(self):
        print("Process | Arrival | Burst | Finish | Turnaround")
        for p in self.processes:
            turnaround = p.finish_time-p.arrival
            print(f"P{p.pid:6d} | {p.arrival:7d} | {p.burst:5d} | "f"{p.finish_time:6d} | {turnaround:10d}")
        print(f"\nContext switches: {self.context_switches}")

if __name__ == "__main__":
    processes=[
        Process(1, arrival=0, burst=5),
        Process(2, arrival=1, burst=3),
        Process(3, arrival=2, burst=4),
    ]
    scheduler = RoundRobinScheduler(processes, quantum=2)
    scheduler.run()
    scheduler.report()

=== Round-Robin Scheduling Start ===
Time  0 →  2: P1 runs (2)
Time  2 →  4: P2 runs (2)
Time  4 →  6: P3 runs (2)
Time  6 →  8: P1 runs (2)
Time  8 →  9: P2 runs (1)
Time  9 → 11: P3 runs (2)
Time 11 → 12: P1 runs (1)
=== Scheduling Finished ===

Process | Arrival | Burst | Finish | Turnaround
P     1 |       0 |     5 |     12 |         12
P     2 |       1 |     3 |      9 |          8
P     3 |       2 |     4 |     11 |          9

Context switches: 6


变速算法：环的规模未知，根据进程行为动态改变其运行速度（优先级/时间片）的调度策略

标准形式：MLFQ多级反馈队列
1. 新进程从最高优先级开始：所有新创建的进程->Q0，假设它是交互型的
2. 用完整时间片：如果进程在某个队列，用完了整个时间片，说明它是CPU-bound的可能性大，移动到下一层队列
3. 主动阻塞/提前让出，则保持或升级加速：如果进程在时间片用完前，因I/O/等待而阻塞，说明可能是交互型，留在当前队列或者提升
4. 抢占规则；只要更高优先级队列变为非空，当前运行的低优先级进程立刻被抢占

In [6]:
from collections import deque

class Process:
    def __init__(self, pid, arrival_time, total_time):
        self.pid = pid
        self.arrival_time = arrival_time
        self.total_time = total_time
        self.remaining_time = total_time
        self.queue_level = 0

    def __repr__(self):
        return f"P{self.pid}(rem={self.remaining_time}, Q={self.queue_level})"
class MLFQScheduler:
    def __init__(self, time_slices):
        self.time_slices=time_slices
        self.num_queues=len(time_slices)
        self.queues=[deque() for _ in range(self.num_queues)]
        self.time=0
        self.finished=[]
    def add_process(self, process):
        self.queues[0].append(process)
    def schedule(self):
        for level in range(self.num_queues):
            if self.queues[level]:
                return level, self.queues[level].popleft()
        return None, None
    def run(self, processes):
        processes=sorted(processes, key=lambda p: p.arrival_time)
        i=0
        while i<len(processes) or any(self.queues):
            while i<len(processes) and processes[i].arrival_time<=self.time:
                self.add_process(processes[i])
                i+=1
            level, proc=self.schedule()
            if proc is None:
                self.time+=1
                continue
            slice_time = self.time_slices[level]
            exec_time=min(slice_time, proc.remaining_time)
            print(f"[t={self.time}] 执行 {proc} 时间片={exec_time}")
            self.time+=exec_time
            proc.remaining_time -= exec_time
            if proc.remaining_time == 0:
                print(f"[t={self.time}] 进程 P{proc.pid} 完成")
                self.finished.append(proc)
            else:
                # 用完时间片 → 降级
                if level < self.num_queues - 1:
                    proc.queue_level += 1
                self.queues[proc.queue_level].append(proc)

        print("\n调度结束")
if __name__ == "__main__":
    processes = [
        Process(1, arrival_time=0, total_time=10),
        Process(2, arrival_time=1, total_time=4),
        Process(3, arrival_time=2, total_time=6),
    ]

    scheduler = MLFQScheduler(time_slices=[2, 4, 8])
    scheduler.run(processes)

[t=0] 执行 P1(rem=10, Q=0) 时间片=2
[t=2] 执行 P2(rem=4, Q=0) 时间片=2
[t=4] 执行 P3(rem=6, Q=0) 时间片=2
[t=6] 执行 P1(rem=8, Q=1) 时间片=4
[t=10] 执行 P2(rem=2, Q=1) 时间片=2
[t=12] 进程 P2 完成
[t=12] 执行 P3(rem=4, Q=1) 时间片=4
[t=16] 进程 P3 完成
[t=16] 执行 P1(rem=4, Q=2) 时间片=4
[t=20] 进程 P1 完成

调度结束
