# APPENDIX

## Event.py

In [None]:
from dataclasses import dataclass
from enum import Enum, auto
from math import factorial
from matplotlib import pyplot as plt
import numpy as np
from scipy import stats
import seaborn as sns


class State(Enum):
    INCOMING = auto()
    IN_SERVICE = auto()
    SERVICED = auto()
    BLOCKED = auto()

@dataclass
class Event:
    state: State
    arrival_time: int
    departure_time: int


class EventList:

    events: 'list[Event]'
    in_service: 'list[Event]'

    def __init__(
        self, arrival_time_distribution: stats.rv_continuous, 
        service_time_distribution: stats.rv_continuous, 
        number_of_events
    ):
        self._arr_dist = arrival_time_distribution
        self._serv_dist = service_time_distribution
        self.events = self.generate_events(number_of_events)
        self.time_line = {}
        self.in_service = []
    
    @property
    def states(self):
        return [event.state for event in self.events]
    
    def update_in_service(self, time):
        for event in self.in_service:
            if event.departure_time <= time:
                event.state = State.SERVICED
        
        self.in_service = [event for event in self.events if event.state == State.IN_SERVICE]

    def generate_events(self, number_of_events: int) -> 'list[Event]':
        arr_times = self._arr_dist.rvs(size=number_of_events)
        arr_times = np.cumsum(arr_times)

        serv_times = self._serv_dist.rvs(size=number_of_events)
        dep_times = arr_times + serv_times
        return [Event(State.INCOMING, arr, dep) for arr, dep in zip(arr_times, dep_times)]

    def update_timeline(self, time):
        self.time_line[time] = self.states
    
    def __str__(self):
        str = ''
        for iter, (time, states) in enumerate(self.time_line.items()):
            if iter < len(self.events) - 10:
                continue
            str += f'TIME: {time:.2f}\n'
            for i, state in enumerate(states):
                if state != State.INCOMING:
                    str += f'Obs. {i}: {state.name}\n'
            str += '\n'
        return str

class BlockingEventSimulation:

    def __init__(
            self, arrival_time_distribution: stats.rv_continuous, 
            service_time_distribution: stats.rv_continuous
    ):
        self.arrival_dist = arrival_time_distribution
        self.service_dist = service_time_distribution

    def simulate(self, max_events: int, service_units: int):
        blocked_count = 0
        event_list = EventList(self.arrival_dist, self.service_dist, max_events)
        for event in event_list.events:
            time = event.arrival_time
            event_list.update_in_service(time)
            if len(event_list.in_service) < service_units:
                event.state = State.IN_SERVICE
                event_list.update_in_service(time)
            else:
                event.state = State.BLOCKED
                blocked_count += 1

    
            event_list.update_timeline(time)
        
        return blocked_count / max_events


def calculate_theoretical_block_pct(m, a):
    return (a**10/factorial(m))/ sum([a**i / factorial(int(i)) for i in range(m+1)])

if __name__ == '__main__':
    pass

## gen.py

In [None]:
import itertools
from re import I
from typing import Callable, Protocol
import numpy as np
from scipy.stats import uniform

def lcg(a, c, M, n, x=0):
    for _ in range(int(n)):
        x = (a*x + c) % M
        yield x / M

def geometric(p, size):
    u = uniform.rvs(size=size)
    return np.log(u) // np.log(1-p) + 1

def exponential(lmbda, size):
    u = uniform.rvs(size=size)
    return - np.log(u) / lmbda

def pareto(k , beta, size, loc=0):
    u = uniform.rvs(size=size)
    return beta*(u**(-1/k) - loc)

def norm_box_mueller(size):
    u1 = uniform.rvs(size=size)
    r = np.sqrt(-2*np.log(u1))
    return r*sin_cos(size)

def sin_cos(size):
    sin, cos = [], []
    while len(cos) < size / 2:
        v1, v2 = uniform.rvs(loc=-1, scale=2, size=2)
        r2 = v1**2 + v2**2
        if r2 <= 1:
            cos.append(v1/np.sqrt(r2))
            sin.append(v2/np.sqrt(r2))
    
    return np.array(sin + cos)


def discrete_crude(p, size):
    u =  uniform.rvs(size=size)
    probs = np.concatenate([[0], np.cumsum(p), [np.inf]], axis=0)
    x = np.zeros_like(u)
    for i, p in enumerate(probs):
        if 0 < i:
            x += ((probs[i-1] < u) & (u <= p)) * i

    return x

def discrete_rejection(p, size):
    c = max(p)
    k = len(p)
    I = []
    while len(I) < size:
        u1, u2 = uniform.rvs(size=2)
        i = int(np.floor(k*u1)) +1
        if u2 <= p[i-1]/c:
            I.append(i)
    
    return I

def discrete_alias(p, size):
    k = len(p)
    p = np.array(p)
    L = list(range(1,k+1))
    F = k*p
    G, S = np.where(F>=1)[0] + 1, np.where(F<=1)[0] + 1
    while len(S) > 0:
        i, j = int(G[0]), int(S[0])
        L[j-1] = i
        F[i-1] = F[i-1] - (1-F[j-1])
        if F[i-1] < 1:
            G = np.delete(G, 0)
            print(S)
            S = np.append(S, i)
            print(S)
        S = np.delete(S, 0)
        
    
    result = []
    while len(result) < size:
        u1, u2 = uniform.rvs(size=2)
        i = int(np.floor(k*u1)) + 1
        if u2 <= F[i-1]:
            result.append(i)
        else:
            result.append(L[i-1])
        
    return result

## mcmc.py

In [None]:
from itertools import product
from math import factorial, pi
import numpy as np
from scipy.stats import binom, uniform, multivariate_normal, norm
import random

def h_1(x, y, m=10):
    return .5 if abs(x-y) % (m-1) == 1 else 0

def step_1(x, m=10):
    dx = 1 if binom.rvs(1, .5) == 1 else -1
    return (x + dx) % (m+1)

def g_1(x, a=8, m=10):
    return a**x / factorial(x)

def mcmc_1(x0, g, h, step, a=8, m=10, size = 10_000, burn_in = 100):
    x = x0
    for i in range(burn_in):
        y = step_1(x, m)
        cond = (g(y) * h(y,x)) / (g(x)*h(x,y))
        if uniform.rvs() <= cond:
            x = y
    
    states = []
    for i in range(size):
        y = step_1(x, m)
        cond = (g(y) * h(y,x)) / (g(x)*h(x,y))
        if uniform.rvs() <= cond:
            x = y
        states.append(x)

    return states

def set_of_valid_points(m=10):
    point_in_set = lambda i,j: 0 <= i + j <= m \
        and i >= 0 \
        and j >=0
    return {(i,j) for i,j in product(range(m+1), repeat=2) if point_in_set(i,j)}

def nearby_points(x, m=10):
    for i,j in product([-1, 0, 1], repeat=2):
        if i == j:
            continue
        new_point = (x[0] + i, x[1] + j)
        if new_point in set_of_valid_points(m):
            yield new_point

def cardinal_points(x, m=10):
    for i,j in product([-1, 0, 1], repeat=2):
        if i == j:
            continue
        if i!=0 and j!=0:
            continue
        
        new_point = (x[0] + i, x[1] + j)
        if new_point in set_of_valid_points(m):
            yield new_point

def g2(x, m=10, a1=4, a2=4):
    return a1**x[0]* a2**x[1] / (factorial(x[0])*factorial(x[1]))


def h2a(x, y, m=10):
    if y[0] + y[1] > m:
        return 0
    valid_count = 0
    for p in nearby_points(x, m):
        valid_count+=1
    
    
    return 1/valid_count


def step2a(x, m):
    return random.choice([p for p in nearby_points(x, m)])
    

def h2b(x, y, m=10):
    if y[0] + y[1] > m:
        return 0
    
    valid_count = 0
    for p in cardinal_points(x, m):
        valid_count += 1

    return 1/valid_count


def step2b(x, m):
    return random.choice([p for p in cardinal_points(x, m)])



def mcmc(x0, g, h, step, a=8, m=10, size = 10_000, burn_in = 100):
    x = x0
    for _ in range(burn_in):
        y = step(x, m)
        cond = (g(y) * h(y,x)) / (g(x)*h(x,y))
        if uniform.rvs() <= cond:
            x = y
    
    states = []
    for _ in range(size):
        y = step(x, m)
        cond = (g(y) * h(y,x)) / (g(x)*h(x,y))
        if uniform.rvs() <= cond:
            x = y
        states.append(x)

    return states


def p2(i, j, m=10):
    return g2((i,j)) / sum([g2((i,j)) for i,j in set_of_valid_points(m=10)])

def get_marginal_g2(i, x, m=10):
    j = x[(i+1) % 2]
    return [p2(i,j) / sum(p2(k,j) for k in range(m-j+1)) for i in range(m-j+1)]


def gibbs2c(x0, m = 10, size=10_000, burn=100):
    x = x0
    res = []
    for iter in range(size+burn):
        for i, x_i in enumerate(x):
            dist = get_marginal_g2(i, x, m)
            x[i] = np.random.choice(len(dist), p = dist)
        if iter >= burn:
            res.append(x.copy())
        
        if iter % 1000 == 0:
            print(iter)
    
    return res


def gen_xi_gamma(size=1):
    return multivariate_normal([0, 0], np.array([[1, .5],[.5, 1]])).rvs(size=size)

def gen_theta_psi(size=1):
    return np.exp(gen_xi_gamma(size=size))

def gen_observations(size=1):
    mean, var = gen_theta_psi()
    return norm(mean, np.sqrt(var)).rvs(size=size), (mean, var)

def norm_step(x):
    dx = norm(loc = 0, scale=1e-1).rvs(2)
    return x + dx

def g3(x, obs):
    ln_pdf = 1/(2*pi*x[0]*x[1]*np.sqrt(1 - .5**2))\
        *np.exp(- (np.log(x[0])**2 - np.log(x[0])*np.log(x[1]) + np.log(x[1])**2) \
            / 2*(1-.5**2))
    return np.exp(sum(norm(loc=x[0], scale=np.sqrt(x[1])).logpdf(obs))) * ln_pdf


def mcmc_continuous(x0, obs, g, step, burn_in=100, size=10_000):
    x = x0
    for _ in range(burn_in):

        y = step(x)
        cond = (g(np.exp(y), obs) / (g(np.exp(x), obs)))
        if uniform.rvs() <= cond:
            x = y
    
    states = []
    for i in range(size):
        y = step(x)
        cond = g(np.exp(y), obs) / g(np.exp(x), obs)
        if uniform.rvs() <= cond:
            x = y
        states.append(np.exp(x))
        if i % 1000 == 0:
            print(i)

    return states
    

if __name__ == '__main__':
    obs = gen_observations(10)
    print(mcmc_continuous([0,0], obs, g3, norm_step, size=10_000))



## tests.py

In [None]:
from typing import Iterable, Tuple
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import scipy.stats as stats
import scipy.optimize as opt


def group_count(p: Iterable, c: int) -> np.ndarray:
    split = 1 / c
    splits = np.zeros(c)
    for n in p:
        for i in range(c):
            if n <= split*(i+1):
                splits[i] += 1
                break
    return splits
            

def chi2(obs: np.ndarray, exp: np.ndarray, df=None):
    if df is None:
        df = len(obs) - 1
    T = sum((obs - exp)**2 / exp)
    p = 1 - stats.chi2.cdf(x=T, df=df)
    return p

def group_chi_test(obs, n_groups):
    splits = group_count(obs, n_groups)
    p = chi2(splits, np.ones_like(splits)*len(obs)/n_groups)
    return p

def emperical_dist(x: float, obs: np.ndarray) -> np.ndarray:
    return 1/len(obs) * sum(obs <= x)

def kolmogorov(obs, dist=stats.uniform, range=(0,1)):
    n = len(obs)
    obj = lambda x: - emperical_dist(x, obs) + dist.cdf(x)
    d_n = opt.minimize_scalar(obj, bounds = range, method='bounded').x
    return (np.sqrt(n) + 0.12 + 0.11/np.sqrt(n))*d_n, d_n

def runtest_above_below_median(obs: np.ndarray) -> int:
    T, n1, n2 = counts_above_and_below_median(obs)
    mean = (2*n1*n2)/(n1+n2) + 1
    var = (2*n1*n2*(2*n1*n2 - n1 - n2)) / ((n1 + n2)**2 * (n1 + n2 -1))
    print(mean, var, T)
    T = (T - mean)/np.sqrt(var)
    
    return 2* (1- stats.norm.cdf(abs(T)))

def counts_above_and_below_median(obs: np.ndarray) -> Tuple[int, int, int]:
    r_a = obs > np.median(obs)
    r_b = obs < np.median(obs)
    x, count = 0, 0
    for a, b in zip(r_a, r_b) :
        if a==1 and x != 1:
            count += 1
            x = 1
        elif b==1 and x != -1:
            count += 1
            x = -1 

    return count, sum(r_a), sum(r_b)


def runtest_up_down_lengths(obs: np.ndarray) -> int:
    n = len(obs)
    r = run_lengths_increasing_count(obs)
    a = np.array(
        [
        [4529.4, 9044.9, 13568, 18091, 22615, 27892],
        [9044.9, 18097, 27139, 36187, 45234, 55789],
        [13568, 27139, 40721, 54281, 67852, 83685],
        [18091, 36187, 54281, 72414, 90470, 111580],
        [22615, 45234, 67852, 90470, 113262, 139476],
        [27892, 55789, 83685, 111580, 139476, 172860]
        ]
    )
    b = np.array([1/6, 5/24, 11/120, 19/720, 29/5040, 1/840])
    z = 1/(n-6) * ((r - n*b).T @ a @ (r - n*b))

    return 1 - stats.chi2.cdf(z, df=6)

def run_lengths_increasing_count(obs: np.ndarray) -> np.ndarray:
    prev_x = -np.inf
    r = np.zeros(6)
    count = 0
    for x in obs:
        if x <= prev_x:
            count = min(6, count)
            r[count-1] += 1
            prev_x = x
            count = 1
        else:
            prev_x = x
            count += 1
    
    count = min(6, count)
    r[count-1] += 1
    
    return r


def runtest_increase_decrease(obs):
    t = run_count_increase_decrease(obs)
    n = len(obs)
    z = (t - (2*n - 1)/3) / np.sqrt((16*n - 29)/90)
    return 2 * (1- stats.norm.cdf(abs(z)))



def run_count_increase_decrease(obs: np.ndarray):
    x, count, prev= 0, 0, obs[0]
    for y in obs[1:]:
        if x != 1 and y > prev:
            count += 1
            x = 1
        elif x != -1 and y <= prev:
            count += 1
            x = -1
        prev = y
    return count
    

def corr_est(obs, max_lag) -> np.ndarray:
    n = len(obs)
    c = np.zeros(max_lag)
    for lag in range(max_lag):
        low = obs[:n-lag-1]
        upp = obs[lag+1:]
        c[lag] = 1/(n-lag) * low @ upp

    return c

def plot_corr(obs: np.ndarray, max_lag=5, conf=0.05) -> None:
    n = len(obs)
    corr_coef = (corr_est(obs, max_lag) - 0.25)
    x = np.arange(1, len(corr_coef)+1)
    conf = stats.norm.ppf(1 - conf/2) * np.sqrt((7/(144*n)))
    plt.plot(x, corr_coef, 'ob')
    plt.vlines(x, np.zeros_like(x), corr_coef)
    plt.hlines([conf, 0, -conf], 0, max_lag+1, linestyles=['dashed', 'solid', 'dashed'])
    plt.show()
    

def all_test(obs, groups=100, lag=5, plot=True):
    p_chi = group_chi_test(obs, 100)
    T_kol = kolmogorov(obs)
    p_ab_median = runtest_above_below_median(obs)
    p_ud = runtest_up_down_lengths(obs)
    p_inc_dec = runtest_increase_decrease(obs)

    print(f'____________Uniform Distribution Tests___________')
    print(f'Chi^2 test with {groups} groups:                p={p_chi:.2f}')
    print(f'Kolmogorov Smirnof:                        T={T_kol:.2f}')
    print(f'_______________Independence Tests________________')
    print(f'Run Test 1: Above/below Median:            p={p_ab_median:.2f}')
    print(f'Run Test 2: Up/Down length count Test:     p={p_ud:.2f}')
    print(f'Run Test 3: Up/Down run count Test:        p={p_inc_dec:.2f}')
    if plot:
        plt.plot(obs[1:], obs[0:-1], '.')
        plt.show()
        plot_corr(obs)

    return p_chi, T_kol, p_ab_median, p_ud, p_inc_dec


if __name__ == '__main__':
    obs = stats.uniform.rvs(size=10_000)
    all_test(obs)

## sales.py

In [None]:
import numpy as np
from scipy import stats
import random

def gen_stations(n, min = 0, max = 200):
    stations = np.zeros((2,n))
    for i in range(n):
        stations[0][i]=random.randint(min,max)
        stations[1][i]=random.randint(min,max)
    return stations

def euclDist(a,b):
    dist=np.sqrt(np.power(b[0]-a[0],2)+np.power(b[1]-a[1],2))
    return dist

## eventBis.py

In [None]:
from dataclasses import dataclass
from enum import Enum, auto
from math import factorial
from matplotlib import pyplot as plt
import numpy as np
from scipy import stats
import seaborn as sns


class State(Enum):
    INCOMING = auto()
    IN_SERVICE = auto()
    SERVICED = auto()
    BLOCKED = auto()

@dataclass
class Event:
    state: State
    arrival_time: int
    departure_time: int


class EventList:

    events: 'list[Event]'
    in_service: 'list[Event]'

    def __init__(self, arrival_time_distribution: stats.rv_continuous, service_time_distribution: stats.rv_continuous, number_of_events):
        self._arr_dist = arrival_time_distribution
        self._serv_dist = service_time_distribution
        self.events = self.generate_events(number_of_events)
        self.time_line = {}
        self.in_service = []
    
    @property
    def states(self):
        return [event.state for event in self.events]
    
    def update_in_service(self, time):
        for event in self.in_service:
            if event.departure_time <= time:
                event.state = State.SERVICED
        
        self.in_service = [event for event in self.events if event.state == State.IN_SERVICE]

    def generate_events(self, number_of_events: int) -> 'list[Event]':
        arr_times = self._arr_dist.rvs(size=number_of_events)
        arr_times = np.cumsum(arr_times)

        serv_times = self._serv_dist.rvs(size=number_of_events)
        dep_times = arr_times + serv_times
        return [Event(State.INCOMING, arr, dep) for arr, dep in zip(arr_times, dep_times)]

    def update_timeline(self, time):
        self.time_line[time] = self.states
    
    def __str__(self):
        str = ''
        for iter, (time, states) in enumerate(self.time_line.items()):
            if iter < len(self.events) - 10:
                continue
            str += f'TIME: {time:.2f}\n'
            for i, state in enumerate(states):
                if state != State.INCOMING:
                    str += f'Obs. {i}: {state.name}\n'
            str += '\n'
        return str

class BlockingEventSimulation:

    def __init__(self, arrival_time_distribution: stats.rv_continuous, service_time_distribution: stats.rv_continuous):
        self.arrival_dist = arrival_time_distribution
        self.service_dist = service_time_distribution

    def simulate(self, max_events: int, service_units: int):
        blocked_count = 0
        event_list = EventList(self.arrival_dist, self.service_dist, max_events)
        for event in event_list.events:
            time = event.arrival_time
            event_list.update_in_service(time)
            if len(event_list.in_service) < service_units:
                event.state = State.IN_SERVICE
                event_list.update_in_service(time)
            else:
                event.state = State.BLOCKED
                blocked_count += 1

    
            event_list.update_timeline(time)
        
        return blocked_count / max_events


def calculate_theoretical_block_pct(m, a):
    return (a**10/factorial(m))/ sum([a**i / factorial(int(i)) for i in range(m+1)])

if __name__ == '__main__':
    arr = stats.expon()
    serv = stats.expon(scale=8)
    sim = BlockingEventSimulation(arr, serv)
    event, block_pct = sim.simulate(10_000, 10)
    a=8
    print((a**10/factorial(10))/ sum([a**i / factorial(int(i)) for i in range(10+1)]), block_pct)

   