# 滑动窗口最大值

### 思路

使用单调递减队列（队头最大），保证队列内元素从大到小排列。

每次加入新元素时，将队尾所有比它小的元素弹出，保持单调性。

当窗口大小达到 k 时，队头就是当前窗口最大值，加入结果。

每次窗口右移，如果窗口移出的元素等于队头，则将其从队头弹出。

每个元素最多进队一次出队一次，整体复杂度 O(n)。

### 代码

In [5]:
from collections import deque

class MonotonicQueue:
    def __init__(self):
        self.dq = deque()
    
    def push(self, x):
        # 维持递减队列：比新元素小的全部弹掉
        while self.dq and self.dq[-1] < x:
            self.dq.pop()
        self.dq.append(x)
    
    def pop(self, x):
        # 如果要移出的元素是队头，就弹出队头
        if self.dq and self.dq[0] == x:
            self.dq.popleft()
    
    def max(self):
        return self.dq[0]


def maxSlidingWindow(nums, k):
    mq = MonotonicQueue()
    res = []
    
    for i in range(len(nums)):
        mq.push(nums[i])
        
        # 当窗口大小到达 k，开始记录结果
        if i >= k - 1:
            res.append(mq.max())
            mq.pop(nums[i - k + 1])
    
    return res


# -----------------------------
# Example Test
# -----------------------------
nums = [1,3,-1,-3,5,3,6,7]
k = 3
print("滑动窗口最大值：", maxSlidingWindow(nums, k))


滑动窗口最大值： [3, 3, 5, 5, 6, 7]


### 类似题目（480. 滑动窗口中位数）

### 思路

使用两个堆：最大堆 small 保存窗口较小的一半元素；最小堆 large 保存窗口较大的一半元素。

插入新元素时，按大小放入对应堆，再调用 rebalance() 保持两堆大小差 ≤ 1。

窗口满 k 个后，中位数 =

small 的堆顶（奇数）

small 堆顶和 large 堆顶平均（偶数）。

当窗口右移，需要把被移出的旧元素“延迟删除”：标记后在堆顶出现时才实际弹出。

每次插入 / 删除后都 rebalance 保持堆结构平衡，然后继续计算中位数。

### 代码

In [11]:
import heapq
from collections import defaultdict

class DualHeap:
    def __init__(self, k):
        self.small = []              # 最大堆（存放较小的一半） → 取反数实现
        self.large = []              # 最小堆（存放较大的一半）
        self.delayed = defaultdict(int)   # 延迟删除
        self.k = k
        self.small_size = 0
        self.large_size = 0

    def prune(self, heap):
        # 把堆顶所有需要删除的元素真正弹出去
        while heap:
            num = heap[0]
            if heap is self.small:
                num = -num
            if self.delayed[num] > 0:
                # 把这个数真正删除
                if heap is self.small:
                    heapq.heappop(self.small)
                else:
                    heapq.heappop(self.large)
                self.delayed[num] -= 1
            else:
                break

    def rebalance(self):
        # 保持 small >= large，且差值不超过 1
        if self.small_size > self.large_size + 1:
            # small（最大堆）往 large（最小堆）移动堆顶
            x = -heapq.heappop(self.small)
            heapq.heappush(self.large, x)
            self.small_size -= 1
            self.large_size += 1
            self.prune(self.small)

        elif self.small_size < self.large_size:
            x = heapq.heappop(self.large)
            heapq.heappush(self.small, -x)
            self.small_size += 1
            self.large_size -= 1
            self.prune(self.large)

    def insert(self, num):
        if not self.small or num <= -self.small[0]:
            heapq.heappush(self.small, -num)
            self.small_size += 1
        else:
            heapq.heappush(self.large, num)
            self.large_size += 1
        self.rebalance()

    def erase(self, num):
        self.delayed[num] += 1
        # 判断在哪个堆
        if num <= -self.small[0]:
            self.small_size -= 1
            if num == -self.small[0]:
                self.prune(self.small)
        else:
            self.large_size -= 1
            if self.large and num == self.large[0]:
                self.prune(self.large)
        self.rebalance()

    def getMedian(self):
        if self.k % 2 == 1:
            return float(-self.small[0])
        else:
            return (-self.small[0] + self.large[0]) / 2


def medianSlidingWindow(nums, k):
    dh = DualHeap(k)
    res = []

    # 前 k 个元素先加入
    for i in range(k):
        dh.insert(nums[i])
    res.append(dh.getMedian())

    # 后续滑动
    for i in range(k, len(nums)):
        dh.insert(nums[i])
        dh.erase(nums[i - k])
        res.append(dh.getMedian())

    return res


# -----------------------
# Example Test in Notebook
# -----------------------
nums = [1,3,-1,-3,5,3,6,7]
k = 3
print(medianSlidingWindow(nums, k))


[1.0, -1.0, -1.0, 3.0, 5.0, 6.0]
