# 堆 heapq

In [2]:
import heapq

## 基本操作

### 创建堆

In [21]:
h = [] # 直接对list使用堆函数操作即可

### 添加和弹出

In [36]:
h = list()

heapq.heappush(h, 2)
heapq.heappush(h, 3)
heapq.heappush(h, 1)

heapq.heappop(h) # 这里可以看出h是一个最小堆

1

### 最大值和最小值

In [24]:
h = list()

for i in range(10):
    heapq.heappush(h, i)

(heapq.nsmallest(1, h), heapq.nlargest(1, h)) # 这里返回的结果是list形式，而不是直接的数值

([0], [9])

## 高级操作

### 合并

In [30]:
h = list()
p = list()

for i in [2, 3, 4]:
    heapq.heappush(h, i)
for i in [5, 6, 7]:
    heapq.heappush(p, i)

l = heapq.merge(h, p)
list(l)

[2, 3, 4, 5, 6, 7]

### 最大的n个值和最小的n个值


In [31]:
h = list()

for i in range(10):
    heapq.heappush(h, i)

n = 3
(heapq.nsmallest(n, h), heapq.nlargest(n, h)) # 这里返回的结果是list形式，而不是直接的数值

([0, 1, 2], [9, 8, 7])

## 特性和用途


### 使用heapq建立的是最小堆，栈顶维最小元素


In [37]:
h = list()

for i in [3, 8, 2, 5, 6]:
    heapq.heappush(h, i)

2

In [None]:
heapq.heappop(h) # 返回最小值2，证明栈顶是最小元素

### 建立最大堆
* 使用负号来建立最大堆 (仅适用于堆内数值全部为正或全部为负时)

In [4]:
h = list()

for i in [3, 8, 2, 5, 6]:
    heapq.heappush(h, -i) # 这里使用负号取反，来让数字大小顺序逆序
top = heapq.heappop(h)
(top, -top) # 取出的元素也取反，变回原样

(-8, 8)

In [8]:
# 将最大堆封装成函数

def maxheap_heappush(h, num):
    return heapq.heappush(h, -num)

def maxheap_heappop(h):
    return -heapq.heappop(h)

# 测试最大堆函数
h = list()

for i in [3, 8, 2, 5, 6]:
    maxheap_heappush(h, i) # 这里使用负号取反，来让数字大小顺序逆序
maxheap_heappop(h) # 取出栈顶是最大元素

8

### 实现堆排序


In [39]:
def heapsort(l: list):
    ret = list()
    for item in l:
        heapq.heappush(ret, item)
    return list(ret)

heapsort(h)

[3, 5, 6, 8]

### 实现优先队列


### 计算数据流的中位数

In [54]:
from random import randint



def classic_medianfinder(l: list) -> float: # 使用传统方法计算中位数，作为测试基准
    l.sort()

    n = len(l)
    if n % 2 != 0:
        return l[n//2] / 1.0
    else:
        return (l[n//2-1] + l[n//2]) / 2.0

def test_medianfinder(medianfinder, test_count=1000): # 对比测试
    dataset = [randint(0, test_count) for _ in range(test_count)]

    true_count = 0
    for i in range(test_count):
        data = dataset[:i+1]
        correct = classic_medianfinder(data)
        answer = medianfinder(data)
        if correct == answer:
            true_count += 1

    return (true_count / test_count)

# 数据流的中位数算法类
class MedianFinder: 

    def __init__(self):
        self.queMin = list()
        self.queMax = list()

    def addNum(self, num: int) -> None:
        queMin_ = self.queMin
        queMax_ = self.queMax

        if not queMin_ or num <= -queMin_[0]:
            heapq.heappush(queMin_, -num)
            if len(queMax_) + 1 < len(queMin_):
                heapq.heappush(queMax_, -heapq.heappop(queMin_))
        else:
            heapq.heappush(queMax_, num)
            if len(queMax_) > len(queMin_):
                heapq.heappush(queMin_, -heapq.heappop(queMax_))
        
    def findMedian(self) -> float:
        queMin_ = self.queMin
        queMax_ = self.queMax

        if len(queMin_) > len(queMax_):
            return -queMin_[0]
        return (-queMin_[0] + queMax_[0]) / 2

# 将算法类包装成函数，方便测试函数调用
def target_medianfinder(l: list):
    mf = MedianFinder()
    for item in l:
        mf.addNum(item)
    return mf.findMedian()

# 进行测试
test_medianfinder(target_medianfinder)

1.0