<a href="https://colab.research.google.com/github/seabay/ml_practice/blob/master/weighted_random_sampling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import random
from collections import defaultdict, Counter
import time

In [None]:
def wsample(samples, n):

  ret=defaultdict(float)
  random.seed(time.time())
  for item in samples:
    ele=item[0]
    w=item[1]

    r=random.uniform(0,1)
    v=r**(1/w)

    ret[ele]=v
  return list(sorted(ret, key=ret.get, reverse=True))[0:n]



In [None]:
samples=[('a', 0.4), ('b', 0.3), ('c', 0.2), ('d', 0.05), ('e', 0.03), ('f', 0.02)]

In [None]:
ret=wsample(samples, 1)
ret

['b']

In [None]:
ret=[item for i in range(100000) for item in wsample(samples, 1) ]

In [None]:
ret[0:10]

['c', 'c', 'b', 'b', 'b', 'e', 'd', 'b', 'a', 'a']

In [None]:
ret=Counter(ret)

In [None]:
ret

Counter({'a': 40108, 'b': 29945, 'c': 19953, 'd': 5006, 'e': 2985, 'f': 2003})

有放回抽样

In [1]:
# https://guyuecanhui.github.io/2020/12/05/weighted-sample/

import numpy as np
import numpy.random as npr

class AliasMethodSampling:
    def __init__(self, p):
        self.prob, self.alias = self._setup_alias(p)
    
    def _setup_alias(self, p):
        small, large = [], []
        n = len(p)
        prob, alias = np.zeros(n), np.zeros(n, dtype=np.int)
        # init small and large array
        for i, pi in enumerate(p):
            prob[i] = pi * n
            small.append(i) if prob[i] < 1.0 else large.append(i)
        # fill a small element each iteration
        while small and large:
            il, ig = small.pop(), large.pop()
            alias[il] = ig
            prob[ig] = (prob[ig] + prob[il]) - 1
            small.append(ig) if prob[ig] < 1 else large.append(ig)
        # handle numerical instability
        while large:
            print('only large exists', prob, alias, large, small)
            prob[large.pop()] = 1
        while small:
            print('only small exists', prob, alias, large, small)
            prob[small.pop()] = 1
        return prob, alias
        
    def take_sample(self):
        i_rand = npr.randint(len(self.prob))
        return i_rand if npr.rand() < self.prob[i_rand] else self.alias[i_rand]

In [2]:
probs, cnt = [0.1, 0.5, 0.2, 0.05, 0.15], np.zeros(5)

In [3]:
alias = AliasMethodSampling(probs)

only large exists [0.5  1.   0.75 0.25 0.75] [1 0 1 1 2] [1] []


In [4]:
for i in range(1000000):
    cnt[alias.take_sample()] += 1
    
cnt / 1000000

array([0.09991 , 0.50091 , 0.19996 , 0.049804, 0.149416])

In [95]:
import random
import math
import heapq
import numpy as np


def a_expj_sample(prob, m):
    """ 根据 prob 数组无放回随机抽取 m 个元素 """
    topn = []
    for i, pi in enumerate(prob[:m]):
        heapq.heappush(topn, (random.random() ** (1/pi), i))
        
    thres, w_sum = topn[0][0], 0
    xw = math.log(random.random()) / math.log(thres)
    i = m
    for pi in prob[m:]:
        if w_sum + pi >= xw:
            tw = thres ** pi
            r2 = random.uniform(tw, 1)
            ki = r2 ** (1/pi)
            heapq.heappop(topn)
            heapq.heappush(topn, (ki, i))
            thres = topn[0][0]
            xw = math.log(random.random()) / math.log(thres)
            w_sum = 0
        else:
            w_sum += pi
        i += 1
    return [item[1] for item in topn]


probs = [0.1, 0.5, 0.2, 0.05, 0.15]

for k in range(1, 4):
    cnt = np.zeros(5)
    for i in range(1000000):
        for j in a_expj_sample(probs, k):
            cnt[j] += 1
    print(k, ':', cnt / 1000000 / k)

1 : [0.099579 0.50004  0.199411 0.050069 0.150901]
2 : [0.123667  0.3975605 0.2340275 0.0635825 0.1811625]
3 : [0.15280367 0.31326433 0.243917   0.081902   0.208113  ]
