# Summary

Let $q$ denote the probability that $a < t$.  Then $q = \frac{t - x_{min}}{x_{max} - x_{min}}$ .  Note that $1 - q = \frac{x_{max} - t}{x_{max} - x_{min}}$

The probability that an agent will attend the bar should be

$$
p = \begin{cases}
 1 && (C_1 = C_2) \wedge (C_1 < 60)  && \text{all predictions equal} \\
 1 && (t < x_{min}) \wedge (C_1 < C_2) && \text{threshold outside window}\\
 1 && (t > x_{max}) \wedge (C_1 > C_2) && \text{threshold outside window}\\
 0 && (C_1 = C_2) \wedge (C_1 > 60) && \text{all predictions equal} \\
 0 && (t < x_{min}) \wedge (C_1 < C_2) && \text{threshold outside window}\\
 0 && (t > x_{max}) \wedge (C_1 < C_2) && \text{threshold outside window}\\
q && (C_2 = C_3 = C_4) \wedge (C_1 > C_2) && \text{error function flat}\\
1 - q && (C_2 = C_3 = C_4) \wedge (C_1 < C_2) &&\text{error function flat}\\
1 - q^s && t < r_\ell \wedge (C_1 < C_2)  &&\text{no symmetric region}\\
q^s && t < r_\ell \wedge (C_1 > C_2) &&\text{no symmetric region}\\
1 - (1 - q)^s && t > r_r \wedge C_1 > C_2 &&\text{no symmetric region}\\
(1 - q)^s&& t > r_r \wedge C_1 < C_2 &&\text{no symmetric region} \\
1 - \frac{1}{2} (1 - 2\frac{|t - a_0|}{x_{max} - x_{min}})^s - \frac{1}{2} \big( \frac{r_\ell - x_{min}} {x_{max} - x_{min}} \big)^s && 
    (x_{min} \leq r_\ell \leq t < a_0) \wedge (C_1 < C_2) && \text{left asym, bottom goes}\\
1 - \frac{1}{2} (1 - 2\frac{|t - a_0|}{x_{max} - x_{min}})^s + \frac{1}{2} \big( \frac{r_\ell - x_{min}} {x_{max} - x_{min}} \big)^s && 
    (x_{min} \leq r_\ell \leq t) \wedge (t > a_0) \wedge (C_1 > C_2) && \text{left asym, bottom and left go} \\ 
\frac{1}{2} (1 - 2\frac{|t - a_0|}{x_{max} - x_{min}})^s - \frac{1}{2} \big( \frac{r_\ell - x_{min}} {x_{max} - x_{min}} \big)^s && 
    (x_{min} \leq r_\ell \leq t) \wedge (t > a_0) \wedge (C_1 < C_2) && \text{left asym, only symmetric goes} \\
\frac{1}{2} (1 - 2\frac{|t - a_0|}{x_{max} - x_{min}})^s + \frac{1}{2} \big( \frac{r_\ell - x_{min}} {x_{max} - x_{min}} \big)^s && 
    (x_{min} \leq r_\ell \leq t < a_0) \wedge (C_1 > C_2) && \text{left asym, left goes} \\   
1 - \frac{1}{2} (1 - 2\frac{|t - a_0|}{x_{max} - x_{min}})^s - \frac{1}{2} \big( \frac{ x_{max} - r_r} {x_{max} - x_{min}} \big)^s && 
    (a_0 < t \leq r_r < x_{max}) \wedge (C_1 > C_2) && \text{right asym, bottom goes} \\
1 - \frac{1}{2} (1 - 2\frac{|t - a_0|}{x_{max} - x_{min}})^s + \frac{1}{2} \big( \frac{ x_{max} - r_r} {x_{max} - x_{min}} \big)^s && 
    (t \leq r_r < x_{max}) \wedge (t < a_0) \wedge (C_1 < C_2) && \text{right asym, bottom, right go} \\
\frac{1}{2} (1 - 2\frac{|t - a_0|}{x_{max} - x_{min}})^s - \frac{1}{2} \big( \frac{x_{max} - r_r} {x_{max} - x_{min}} \big)^s && 
    (a_0 < t \leq r_r < x_{max}) \wedge (C_1 > C_2) && \text{right asym, only symmetric goes} \\
\frac{1}{2} (1 - 2\frac{|t - a_0|}{x_{max} - x_{min}})^s + \frac{1}{2} \big( \frac{x_{max} - r_r} {x_{max} - x_{min}} \big)^s && 
    (t \leq r_r < x_{max}) \wedge (t < a_0) \wedge (C_1 < C_2) && \text{right asym, right goes} \\
\end{cases}
$$

In [8]:
import numpy as np
from matplotlib import pyplot as plt
from scipy.stats import binom


def err(hist, a):
    [c_4, c_3, c_2, c_1] = hist
    return (c_3 - c_1 + (c_2 - c_3) * a)**2 + (c_4 - c_2 + (c_3 - c_4) * a)**2


def pred(hist, a):
    [c_4, c_3, c_2, c_1] = hist
    return a * c_1 + (1 - a) * c_2


def plot_errs(hist):
    x = np.arange(-1, 1.01, 0.01)
    y = err(hist, x)
    going = pred(hist, x)

    mask = np.where(going < 60)
    plt.plot(x[mask],y[mask], color="blue")

    mask = np.where(going >= 60)
    plt.plot(x[mask],y[mask], color="orange")

    plt.xlabel("a")
    plt.ylabel("Error")
    plt.legend(["going", "not going"])
    _ = plt.title(hist)
    
    
def binom_p_value(k, n, p):
    if k < n * p:
        return binom.cdf(k, n, p)
    else:
        return binom.sf(k, n, p)

    
def assert_equal(a, b, msg):
    assert (np.abs(a - b) < 1e-6).all(), msg
    

def assert_less(a, b, msg):
    assert (a - b < 1e-6).all(), msg

    
def assert_geq(a,b,msg):
    assert (a - b > -1e-6).all(), msg


class BarModel:
    def __init__(
        self,
        hist, 
        s, 
        x_min, 
        x_max
    ):
        try:
            self.predict(hist, s, x_min, x_max)
        except AssertionError as E:
            self.error = E
            print(E)
        
    def predict(self, hist, s, x_min, x_max):
        # write the errors as a quadratic eqation
        [c_4, c_3, c_2, c_1] = hist

        self.a_1 = (c_2 - c_3)**2 + (c_3 - c_4)**2
        self.a_2 = 2 * ((c_2 - c_3) * (c_3 - c_1) + (c_3 - c_4) * (c_4 - c_2))
        self.a_3 = (c_3 - c_1)**2 + (c_4 - c_2)**2

        # check work
        x = np.arange(-1, 1.01, 0.01)
        y = err(hist, x)
        y_2 = (self.a_1 * x**2 + self.a_2 * x + self.a_3)

        assert_equal( (y_2 - y).max(), 0, "Check your coefficients!")
        
        # check for degenerate  case
        # when prediction is a constant
        # equal to c_1
        if c_1 == c_2:
            self.p_go = 1 * (c_1 < 60)
            self.case = "0"
            return
        
        # threshold
        self.t = (60 - c_2) / (c_1 - c_2)

        # whether or not prediction is increasing
        # determines which regions of the map are going
        # this inequality follows from the predictor
        # for time t
        self.increasing = c_1 > c_2
        
        # threshold lower than entire window
        if self.t < x_min:
            self.p_go = 1 - self.increasing
            self.case = f"1_{self.increasing}"
            return
        
        # threshold above entire window
        if self.t >= x_max:
            self.p_go = 1 * self.increasing
            self.case = f"2_{self.increasing}"
            return
        
        # when err function is a constant
        # when a_1 is 0, a_2 should also be 0
        # so this is convenient
        if self.a_1 == 0:
            assert self.a_2 == 0
            
            if self.increasing:
                self.p_go = (self.t - x_min) / (x_max - x_min)
            else:
                self.p_go = (x_max - self.t) / (x_max - x_min)

            self.case = f"-1_{self.increasing}"
            return
            
        # optimal A is min of error curve
        self.opt_a = -self.a_2 / (2*self.a_1)

        assert_geq (y.min(), err(hist, self.opt_a), "Opt_a isn't minimum!")

        
        # is the bottom part going
        inc_going = self.increasing and self.t > self.opt_a
        dec_going = not self.increasing and self.t <= self.opt_a
        self.bottom_part_going = inc_going or dec_going
        
        # find coordinates of bottom region
        # reflection of threshold about *opt_a*
        self.t_ref  = 2 * self.opt_a - self.t

        assert_equal(self.opt_a - self.t_ref, self.t - self.opt_a, "reflected thresholds not symmetrical")
        
        # to get the bottom part we need the left-hand
        # and right-hand sides of the threshold
        self.t_l = min(self.t, self.t_ref)
        self.t_r = max(self.t, self.t_ref)

        assert_equal(self.opt_a - self.t_l, self.t_r - self.opt_a, "right and left thresholds not symmetrical")
        
        # reflection of LHS of window across threshold
        # this is the righthand boundary of the symmetric region
        self.ref_r = 2 * self.opt_a - x_min

        assert_equal(self.opt_a - x_min, self.ref_r - self.opt_a, "ref_r not symmetrical")

        # reflection of RHS of window across threshold
        # this is the lefthand boundary of the symmetric region
        self.ref_l = 2 * self.opt_a - x_max

        assert_equal(x_max - self.opt_a, self.opt_a - self.ref_l, "ref_l not symmetrical")

        
        # For the remaining cases we can now
        # assume that threshold is inside of
        # the window

        # threshold to the left of all points other 
        # interesting things.
        # agents will always pick a > t
        # when possible
        if self.t < self.ref_l:
            # the probability that an arbitrary a > t
            self.p_optimal_region = (x_max - self.t) / (x_max - x_min)
            self.p_any_optimal_region = 1 - (1 - self.p_optimal_region)**s
            self.p_go = self.p_any_optimal_region * (1 - self.increasing) + (1 - self.p_any_optimal_region) * self.increasing
            self.case = f"3_{self.increasing}"
            return
        
        # threshold within window
        # and asymmetric region on the left
        # (or perfectly symmetric)
        elif self.t >= self.ref_l and self.ref_l >= x_min:
            assert_geq( self.ref_r,  x_max, "ref_r < x_max")
            assert_less( self.opt_a, x_max, "opt_a >= x_max")
            
            # the probability that a is in the bottom region
            self.p_optimal_region = (self.t_r - self.t_l) / (x_max - x_min)
            self.p_any_optimal_region = 1 - (1 - self.p_optimal_region) ** s
            self.p_go_optimal = self.p_any_optimal_region * self.bottom_part_going
            
            # the probability that a is in the lefthand asymmetric region
            self.p_left_side = (self.ref_l - x_min) / (x_max - x_min)
            self.p_all_left_side = self.p_left_side ** s
            self.p_go_left_side = self.p_all_left_side * self.increasing
            
            self.p_sym = 1 - self.p_any_optimal_region - self.p_all_left_side
            assert_geq( self.p_sym, 0, "Negative probability.  Oh no!")
            self.p_go_sym = 0.5 * self.p_sym
            
            self.p_go = self.p_go_optimal + self.p_go_left_side + self.p_go_sym
            
            self.case = f"4_{self.increasing}"           
            if self.p_go_optimal > 0 and self.p_go_left_side == 0:
                assert_equal(self.p_go, 1 - 0.5 * (1 - 2* np.abs(self.t - self.opt_a) / (x_max - x_min))**s - 0.5 * ((self.ref_l - x_min)/(x_max - x_min))**s, "error in case 4.1")
            elif self.p_go_optimal > 0 and self.p_go_left_side > 0:
                assert_equal(self.p_go, 1 - 0.5 * (1 - 2* np.abs(self.t - self.opt_a) / (x_max - x_min))**s + 0.5 * ((self.ref_l - x_min)/(x_max - x_min))**s, "error in case 4.2" )
            elif self.p_go_optimal == 0 and self.p_go_left_side == 0:
                assert_equal(self.p_go, 0.5 * (1 - 2* np.abs(self.t - self.opt_a) / (x_max - x_min))**s - 0.5 * ((self.ref_l - x_min)/(x_max - x_min))**s, "error in case 4.3" )    
            elif self.p_go_optimal == 0 and self.p_go_left_side > 0:
                assert_equal(self.p_go, 0.5 * (1 - 2* np.abs(self.t - self.opt_a) / (x_max - x_min))**s + 0.5 * ((self.ref_l - x_min)/(x_max - x_min))**s, "error in case 4.4" )  
            else:
                assert False, "Extra case!"
            return
        
        # threshold within window
        # and asymmetric region on the right
        elif self.t < self.ref_r and self.ref_r < x_max:
            assert_less( self.ref_l, x_min, "ref_r >= x_min")
            assert_geq( self.opt_a, x_min, "opt_a < x_min")
            
            # the probability that a is in the bottom region
            self.p_optimal_region = (self.t_r - self.t_l) / (x_max - x_min)
            self.p_any_optimal_region = 1 - (1 - self.p_optimal_region) ** s
            self.p_go_optimal = self.p_any_optimal_region * self.bottom_part_going
            
            # the probability that a is in the righthand asymmetric region
            self.p_right_side = (x_max - self.ref_r) / (x_max - x_min)
            self.p_all_right_side = self.p_right_side ** s
            self.p_go_right_side = self.p_all_right_side * (1 - self.increasing)
            
            self.p_sym = 1 - self.p_any_optimal_region - self.p_all_right_side
            assert_geq( self.p_sym, 0, "Negative probability.  Oh no!")
            self.p_go_sym = 0.5 * self.p_sym
            
            self.p_go = self.p_go_optimal + self.p_go_right_side + self.p_go_sym
            self.case = f"5_{self.increasing}"
            if self.p_go_optimal > 0 and self.p_go_right_side == 0:
                assert_equal(self.p_go, 1 - 0.5 * (1 - 2* np.abs(self.t - self.opt_a) / (x_max - x_min))**s - 0.5 * ((x_max - self.ref_r)/(x_max - x_min))**s, "error in case 5.1")
            elif self.p_go_optimal > 0 and self.p_go_right_side > 0:
                assert_equal(self.p_go, 1 - 0.5 * (1 - 2* np.abs(self.t - self.opt_a) / (x_max - x_min))**s + 0.5 * ((x_max - self.ref_r)/(x_max - x_min))**s, "error in case 5.2")    
            elif self.p_go_optimal == 0 and self.p_go_right_side == 0:
                assert_equal(self.p_go, 0.5 * (1 - 2* np.abs(self.t - self.opt_a) / (x_max - x_min))**s - 0.5 * ((x_max - self.ref_r)/(x_max - x_min))**s, "error in case 5.3")        
            elif self.p_go_optimal == 0 and self.p_go_right_side > 0:
                assert_equal(self.p_go, 0.5 * (1 - 2* np.abs(self.t - self.opt_a) / (x_max - x_min))**s + 0.5 * ((x_max - self.ref_r)/(x_max - x_min))**s, "error in case 5.4")
            return
        
        # threshold within window
        # but to the right of all points other 
        # interesting things.
        # agents will always pick a <= t
        # when possible
        elif self.t >= self.ref_r:
            self.p_optimal_region = (self.t - x_min) / (x_max - x_min)
            self.p_any_optimal_region = 1 - (1 - self.p_optimal_region)**s
            self.p_go = self.p_any_optimal_region * self.increasing + (1 - self.p_any_optimal_region) * (1 - self.increasing) 
            self.case = f"6_{self.increasing}"
            return
        
        else:
            assert False, "Missing case!"


def make_strategies(rng, strategies, memory):
    # weights should sum to 1
    # essentially, we are partitioning the [0,1] interval
    # and taking the size of each sub-interval
    # TODO: add negative weights?
    w = rng.uniform(-1, 1, size=(strategies, memory-1))
    w.sort(axis=1)
    offsets = np.hstack([w[:, :], np.ones(shape=(strategies,1))])
    return offsets - np.hstack([np.zeros(shape=(strategies,1)), w[:, :]])

def run_test(
    start,
    agents,
    strategies,
    trials,
    seed
):
    # number of weeks back in predictor function
    # AND number of weeks back to look when selecting a predictor
    memory = 2
    # number of rounds to run the simulation
    n_iter = 1
    threshold = 60
    
    x_min = -1
    x_max = 1
    
    model = BarModel(start, strategies, x_min, x_max)
    p = model.p_go
    print(f"Case: {model.case}")
    print(f"We expect agents to attend with probability {p}.")
    print(f"Total weekly attendance should have mean {agents * p} and variance {agents * p * (1-p)}")
    
    results = np.empty(trials)

    rng = np.random.default_rng(seed)

    for tr in range(trials):
        # each row is a strategy
        strats = [
            make_strategies(rng, strategies, memory) for _ in range(agents)
        ]

        # weekly attendance count
        # the first 2*memory weeks are randomly generated
        # to seed the strategies
        hist = np.hstack([start, np.zeros(n_iter)]).astype(int)

        # index of week
        # we need some starting history to begin making selections
        t = memory * 2

        # Record the index of the optimal strategy 
        # on each iteration.
        # each row corresponds to an agent
        # each column corresponds to a week
        best_strats = np.zeros((agents, len(hist)))

        # record each agent's prediction on each iteration
        pred_history = np.zeros((agents, len(hist)))

        # construct time windows for evaluating strategies

        # The columns begin at
        # t - m - 1 
        # t - m
        # ...
        # t - 1
        # t
        # as you go down the column you are looking back
        # to that week's history.
        # so the column beginning at *t - m - 1*
        # generates the prediction for week *t - m*
        # and the rightmost column generates a
        # prediction for next week.
        windows = np.vstack([
            hist[t-memory-i-1: t-i]
            for i in range(memory)
        ])

        for agent in range(agents):
            strat = strats[agent]
            # each row is a strategy
            # each column is predicted attendance
            # in increasing order.
            # the last column has the prediction for
            # next week
            predictions = strat.dot(windows)

            # these are the observations that we use to
            # to test our predictions.
            # note that the observation from column 0
            # is not used since its prediction would come
            # from a previous week's history.
            observations = windows[0, 1:]

            # calculate the absolute error of predictions
            # here, we discard the rightmost prediction as
            # this is the prediction for the future.
            # if we knew the correct answer for that,
            # we wouldn't need to predict it!
            errs = np.abs(predictions[:, :-1] - observations).sum(axis=1)

            best_strat = np.argmin(errs)
            best_strats[agent, t] = best_strat

            pred = strat[best_strat].dot(windows[:,-1])
            pred_history[agent, t] = pred

        hist[t] = (pred_history[:, t] < threshold).sum()
        results[tr] = hist[t]

    plt.figure()
    plt.hist(results)

    print(f"Average attendance: {results.mean()}")
    print(f"Variance of attendance: {results.var()}")
    
    # individual trials have attendance distributed
    # binom(agents, p)
    
    # attendance of the full experiment should be distributed
    # binom(trials * agents, p)
    
    n = trials * agents
    print(f"P-value: {binom_p_value(results.sum(), n, p)}")

In [9]:
# save an example of each case for testing
cases = dict()
strategies = 2
rng = np.random.default_rng(27)
x_min = -1
x_max = 1

n_iter = 0
while n_iter < 10000 or len(cases) < 15:
    n_iter += 1
    start = rng.choice(100, size=(4,))
    model = BarModel(start, strategies, x_min, x_max)
    
    if model.case not in cases:
        cases[model.case] = start
    
cases

{'2_True': array([ 0, 69, 21, 31]),
 '4_False': array([98, 12, 59, 32]),
 '5_True': array([43, 93, 19, 78]),
 '2_False': array([31, 29, 98, 94]),
 '5_False': array([29, 40, 64, 18]),
 '6_True': array([39, 86,  0, 90]),
 '3_False': array([45, 75, 65, 26]),
 '3_True': array([16, 11, 74, 89]),
 '1_False': array([24, 97, 22,  1]),
 '1_True': array([72,  0, 90, 99]),
 '0': array([ 7, 83, 95, 95]),
 '6_False': array([77, 68, 90, 29]),
 '4_True': array([37,  0, 51, 65]),
 '-1_True': array([ 9,  9,  9, 77]),
 '-1_False': array([71, 71, 71,  4])}