In [1]:
import click
import numpy as np
import time
from gurobipy import *
import collections

class fluid_model():
    def p(self, s, a, sprime):
        if a == 0:
            return s == sprime
        if a == 1:
            tmp_a, tmp_b = s[0], s[1]
            tmp_c, tmp_d = sprime[0], sprime[1]
            if tmp_a == tmp_c and tmp_b + 1 == tmp_d:
                return 1 - (tmp_a + 1)/(tmp_a + tmp_b + 2)
            if tmp_a + 1 == tmp_c and tmp_b == tmp_d:
                return (tmp_a + 1)/(tmp_a + tmp_b + 2)
            return 0
    
    def reward(self, s, a):
        if a == 0:
            return 0.0
        else:
            return (s[0] + 1)/(s[0] + s[1] + 2)
    def __init__(self, T, alphas):
        self.alphas = alphas
        self.T = T
        self.m, self.x, self.y, self.z = None, {}, {}, {}
        self.duals = None
        self.A, self.B, self.C = set(), set(), set()

    def __LP_sol(self, T, alphas):
        
        time = [t for t in range(T)]
        win = [i for i in range(T)]
        los = [j for j in range(T)]
        d_var, reward = multidict({(t, i, j): (i+1)/(i+j+2) \
                                    for t in time \
                                        for i in win \
                                            for j in los})
        m = Model("MAB")
        n = m.addVars(time, win, los, name = "n")
        x = m.addVars(time, win, los, name = "x")
        
        # set ojective
        m.setObjective(sum(x[t, i, j] * reward[(t, i, j)] \
                            for t in time for i in win \
                                for j in los), GRB.MAXIMIZE)
        
        # add resource constraint
        m.addConstrs(sum(x[t, i, j] for i in win for j in los) \
                        == alphas[t] for t in time)
        
        # add initial constraint
        m.addConstr(n[0, 0, 0] == 1)
        m.addConstr(sum(n[0, i, j] for i in win for j in los) == 1)
        
        # add x constraint
        m.addConstrs(x[t, i, j] <= n[t, i, j] for t in time for i in win for j in los)
        m.addConstrs(x[t, i, j] >= 0 for t in time for i in win for j in los)
        
        # add fluid balance
        m.addConstrs(n[t, i, j] - x[t-1, i-1, j]*reward[(t-1, i-1, j)] - x[t-1, i, j-1]*(1-reward[(t-1, i, j-1)]) - (n[t-1, i, j] - x[t-1, i, j]) == 0 
                    for t in range(1, T) for i in range(1, T) for j in range(1, T))
        m.addConstrs(n[t, 0, j] - x[t-1, 0, j-1]*(1-reward[(t-1, 0, j-1)]) - (n[t-1, 0, j] - x[t-1, 0, j])== 0 
                    for t in range(1, T) for j in range(1, T))
        m.addConstrs(n[t, i, 0] - x[t-1, i-1, 0]*reward[(t-1, i-1, 0)] - (n[t-1, i, 0] - x[t-1, i, 0]) == 0
                    for t in range(1, T) for i in range(1, T))
        m.addConstrs(n[t, 0, 0] - (n[t-1, 0, 0] - x[t-1, 0, 0]) == 0 for t in range(1, T))
        m.setParam('OutputFlag', False)
        m.optimize()
        print('Obj: %g' % m.objVal)
        self.objVal = m.objVal
        
        return m

    def __solve(self, method=1):
        setParam("Method", method)
        T = self.T
        m = self.__LP_sol(self.T, self.alphas)
        self.m = m
        self.duals = m.PI[0: T]
        
        self.sorted = {t: [(a, b) for a in range(T) for b in range(T) if a + b <= t] for t in range(T)}
        v = {}
        for t in range(T - 1, -1, -1):
            advantage = {(a, b): 0 for a in range(T) for b in range(T) if a + b <= t}
            for a in range(T):
                for b in range(T):
                    if a + b <= t:
                        if t == T - 1:
                            r_pull = self.reward((a, b), 1) - self.duals[t]
                            r_idle = 0
                        else:
                            r_pull = self.reward((a, b), 1) - self.duals[t] + \
                                     self.p((a, b), 1, (a + 1, b)) * v[t + 1, (a + 1, b)] + \
                                     self.p((a, b), 1, (a, b + 1)) * v[t + 1, (a, b + 1)]
                            r_idle = v[t + 1, (a, b)]
                        advantage[(a, b)] = r_pull - r_idle
                        if r_pull < r_idle:
                            v[(t, (a, b))] = r_idle
                            continue
                        v[(t, (a, b))] = r_pull
            self.sorted[t].sort(reverse=True, key=lambda x: advantage[x])
        
        self.z[(0, (0, 0))] = 1
        self.pull_reward = 0
        for t in range(T):
            resource = self.alphas[t]
            for a, b in self.sorted[t]:
                self.x[(t, (a, b))] = min(self.z[(t, (a, b))], resource)
                self.pull_reward += self.x[(t, (a, b))] * self.reward((a, b), 1)
                self.y[(t, (a, b))] = self.z[(t, (a, b))] - self.x[(t, (a, b))]
                resource = resource - self.x[(t, (a, b))]
            if t == T - 1:
                continue
            for a, b in self.sorted[t + 1]:
                self.z[(t + 1, (a, b))] = (self.y[(t, (a, b))] if a + b <= t else 0) + \
                                          a / (a + b + 1) * (self.x[(t, (a - 1, b))] if a >= 1 else 0) + \
                                          b / (a + b + 1) * (self.x[(t, (a, b - 1))] if b >= 1 else 0)
    
    def calculate_occupation_measure_and_classify_state(self, epsilon=10**(-6)):
        self.__solve()
        m, sol, T = self.m, dict(), self.T

        for key in self.z:
            if self.x[key] > epsilon and self.y[key] < epsilon:
                self.A.add(key)
                continue
            if self.x[key] > epsilon and self.y[key] > epsilon:
                self.B.add(key)
                continue
            if self.x[key] < epsilon and self.y[key] > epsilon:
                self.C.add(key)
                continue
        return
    
    def check_degeneracy(self):
        if sorted([key[0] for key in self.B]) == [i for i in range(self.T)] and \
                    abs(self.objVal - self.pull_reward) < 10**(-13):
            print(f"""
            Model objective: {self.pull_reward}, different from {self.objVal} (solution from LP) less than 10e-15.
            Model is non-degenerate.
            """)
        else:
            raise Exception("Model is degenerate.\n")

    def calculate_diffusion_index(self, epsilon=10**(-8)):
        self.calculate_occupation_measure_and_classify_state()
        x, y, z, T = self.x, self.y, self.z, self.T
        v = {(t, (a, b)): 0 for t in range(T + 1) for a in range(t + 1) for b in range(t + 1) if a + b <= t}
        diffusion_index = {(t, (a, b)): 0 for t in range(T) for a in range(t + 1) for b in range(t + 1) if a + b <= t}

        for t in range(T - 1, -1, -1):
            state_t = [(a, b) for a in range(t + 1) for b in range(t + 1) if a + b <= t]
            for s in state_t:
                s_w = (s[0] + 1, s[1]) # state after win
                s_l = (s[0], s[1] + 1) # state after loss
                diffusion_index[(t, s)] = self.reward(s, 1) + self.p(s, 1, s_w)*v[(t + 1, s_w)] \
                                          + self.p(s, 1, s_l)*v[(t + 1, s_l)] \
                                          - self.reward(s, 0) - self.p(s, 0, s)*v[(t + 1, s)]
            
            l_y_g_0 = [(diffusion_index[(t, s)], s) for s in state_t if self.x[(t, s)] > epsilon]

            _, sbar = min(l_y_g_0)
            for s in state_t:
                if (t, s) in self.A:
                    v[(t, s)]= diffusion_index[(t, s)] - diffusion_index[(t, sbar)] + v[(t + 1, s)]
                else:
                    v[(t, s)] = self.reward(s, 0) + v[(t + 1, s)]
        self.diffusion_index = diffusion_index

In [3]:
from multiprocessing import Pool

class State():
    count = {}
    number = None
    t = None

    def __init__(self, t):
            self.count = {(a, b): 0 for a in range(t + 1) \
                            for b in range(t + 1) if a + b <= t}
            self.number = 0
            self.t = t


def initialize(N):
    state = State(t=0)
    state.number = N
    state.count = {(0, 0): N}
    return state

def model_paras(model):
    alphas = model.alphas
    A = model.A
    B = model.B
    C = model.C
    diffusion_index = model.diffusion_index
    T = model.T
    paras = (alphas, A, B, C, diffusion_index, T)
    return paras
    

def pull(state, paras, use_A_B_C=True):
    alphas, A, B, C, diffusion_index, T = paras[0], paras[1], paras[2], paras[3], paras[4], paras[5]
    t = state.t
    
    budget = int(alphas[t] * state.number)
    pull_state = {s: 0 for s in state.count}
    
    if use_A_B_C:
        state_A = [s for s in state.count if (t, s) in A]
        state_B = [s for s in state.count if (t, s) in B]
        state_C = [s for s in state.count if (t, s) in C]

        state_A.sort(key=lambda s: diffusion_index[(t, s)], 
                     reverse=True)
        state_B.sort(key=lambda s: diffusion_index[(t, s)], 
                     reverse=True)
        state_C.sort(key=lambda s: diffusion_index[(t, s)], 
                     reverse=True)

        for H in [state_A, state_B, state_C]:
            for s in H:
                if state.count[s] <= budget:
                    pull_state[s] = state.count[s]
                    budget -= state.count[s]
                else:
                    pull_state[s] = budget
                    budget = 0
        return pull_state
    
    state_now = [s for s in state.count]
    state_now.sort(key=lambda state: diffusion_index[(t, state)], reverse=True)
    for s in state_now:
        if state.count[s] <= budget:
            pull_state[s] = state.count[s]
            budget -= state.count[s]
        else:
            pull_state[s] = budget
            budget = 0
    return pull_state


def proceed(state, pull_state):
    next_state = State(state.t + 1)
    next_state.number = state.number
    for s in pull_state:
            a, b = s[0], s[1]
            pull_s = pull_state[s]
            total_s = state.count[s]
            theta = (a + 1) / (a + b + 2)
            success = np.random.binomial(pull_s, theta)
            failure = pull_s - success
            idle = total_s - pull_s
            next_state.count[(a + 1, b)] += success
            next_state.count[(a, b)] += idle
            next_state.count[(a, b + 1)] += failure
    return next_state


def reward_generate(pull_state):
    tmp = [pull_state[s] * (s[0] + 1)/(s[0] + s[1] + 2) for s in pull_state]
    return np.sum(tmp)

def simulate(N, paras, use_A_B_C=True):
    alphas, A, B, C, diffusion_index, T = paras[0], paras[1], paras[2], paras[3], paras[4], paras[5]
    state = initialize(N)
    total_reward = 0
    for _ in range(T):
        pull_state = pull(state, paras, use_A_B_C=use_A_B_C)
        total_reward += reward_generate(pull_state)
        state = proceed(state, pull_state)
    return total_reward

def batch_simulate(N, paras, use_A_B_C, rounds):
    start = time.time()
    rewards = [simulate(N, paras, use_A_B_C) for _ in range(rounds)]
    end = time.time()
    mean = np.mean(rewards)
    std = np.std(rewards)
    comp_time = end - start
    return N, rounds, mean, std, comp_time

def wrapper(args):
    return batch_simulate(*args)

def parallel_simulation(N, model, n_proc, use_A_B_C, rounds):
    args = (N, model_paras(model), use_A_B_C)
    with Pool(n_proc) as p:
        res = p.map(wrapper, [(*args, rounds) for _ in range(n_proc)])
    m = rounds * n_proc
    mean = np.mean([item[2] for item in res])
    std = np.sqrt(np.sum([rounds * item[3] ** 2 for item in res])) / (n_proc * rounds)
    comp_time = np.max([item[-1] for item in res])
    opt_gap = model.pull_reward * N - mean
    return opt_gap, N, m, mean, std, comp_time

def fluid_priority_simulation(N, model, use_A_B_C, rounds=1000):
    n_proc = int(N * 50 / rounds) + 1
    return parallel_simulation(N, model, n_proc, use_A_B_C, rounds)
    

def DI_simulate(model, number_of_arm=12, rounds=1000, use_A_B_C=True):
    np.random.seed(101)
    start = time.time()
    rewards = [simulate(N=number_of_arm, model=model, use_A_B_C=use_A_B_C) for _ in range(rounds)]
    end = time.time()
    mean = np.mean(rewards)
    std = np.std(rewards)
    comp_time = end - start
    print(f'mean of reward:{mean}\n'
          f'std of reward: {std}\n'
          f'std of mean: {std/np.sqrt(rounds)}\n'
          f'computation time: {comp_time} seconds')
    return mean, std / np.sqrt(rounds), model.m.objVal * number_of_arm - mean


In [6]:
import pandas as pd
import os


def create_file(model, file_name, start=30, step=30, end=750):
    if os.path.exists(file_name):
        return
    
    
    df = pd.DataFrame(index=["opt-gap", "N", "M", "expect-reward", "std", "time"])
    df.to_csv(file_name, index=True)
    for N in range(start, end, step):
        # res = [opt_gap, N, m, mean, std, comp_time]
        res = fluid_priority_simulation(N, model, use_A_B_C=False, rounds=1000)
        df[N] = res
        df.to_csv(file_name, index=True)
        print(f"N: {N} finished.")
    return
    
    

In [7]:
import matplotlib.pyplot as plt
number_of_arms = []
horizon = 20
gaps, stds = [], []
model = fluid_model(T=horizon, alphas=[1/3]*horizon)
model.calculate_diffusion_index()
model.check_degeneracy()

create_file(model, f"fp-{horizon}")

Parameter Method unchanged
   Value: 1  Min: -1  Max: 5  Default: -1
Obj: 4.81431

            Model objective: 4.814311826391183, different from 4.814311826391183 (solution from LP) less than 10e-15.
            Model is non-degenerate.
            
N: 30 finished.
N: 60 finished.
N: 90 finished.
N: 120 finished.


Process ForkPoolWorker-25:
Process ForkPoolWorker-22:
Process ForkPoolWorker-19:
Process ForkPoolWorker-26:
Process ForkPoolWorker-23:
Process ForkPoolWorker-24:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/o

KeyboardInterrupt: 