## Offline trajectory generalization

In [1]:
import numpy as np
from pathlib import Path


def UCB(theta_oracled, k, n_steps=None):
    """Simulate one UCB trajectory on a k-armed Bernoulli bandit.

    Each of the first ``k`` steps pulls arm ``t`` once (initialisation).
    Afterwards the arm maximising ``mean + sqrt(2 * log(t) / count)`` is
    pulled, where ``mean`` and ``count`` are the running empirical mean
    reward and pull count of each arm.

    Args:
        theta_oracled: Indexable of true success probabilities, one per arm.
        k: Number of arms.
        n_steps: Number of steps to simulate. When ``None`` (the default),
            the module-level ``horizon`` is used, as before.

    Returns:
        Tuple ``(action, reward)`` of float arrays of length ``n_steps``:
        the index of the arm pulled and the 0/1 reward observed at each step.
    """
    if n_steps is None:
        # Backward-compatible default: read the module-level setting.
        n_steps = horizon

    count = np.zeros(k)
    theta = np.zeros(k)
    action = np.zeros(n_steps)
    reward = np.zeros(n_steps)

    for t in range(n_steps):
        if t < k:
            # Initialisation: pull every arm once so that count > 0 below.
            selected_arm = t
        else:
            bonus = np.sqrt(2 * np.log(t) / count)
            selected_arm = np.argmax(theta + bonus)

        # r_i ~ Bern(theta_oracled[selected_arm])
        r_i = np.random.binomial(1, theta_oracled[selected_arm])

        # Incremental update of the empirical mean of the pulled arm.
        count[selected_arm] += 1
        theta[selected_arm] += (r_i - theta[selected_arm]) / count[selected_arm]

        action[t] = selected_arm
        reward[t] = r_i

    return action, reward


def TS(theta_oracled, k, n_steps=None):
    """Simulate one Thompson-sampling trajectory on a k-armed Bernoulli bandit.

    Every arm starts with a Beta(1, 1) posterior. At each step one value is
    sampled from every arm's posterior, the arm with the largest sample is
    pulled, and its posterior is updated with the observed 0/1 reward.

    Args:
        theta_oracled: Indexable of true success probabilities, one per arm.
        k: Number of arms.
        n_steps: Number of steps to simulate. When ``None`` (the default),
            the module-level ``horizon`` is used, as before.

    Returns:
        Tuple ``(action, reward)`` of float arrays of length ``n_steps``:
        the index of the arm pulled and the 0/1 reward observed at each step.
    """
    if n_steps is None:
        # Backward-compatible default: read the module-level setting.
        n_steps = horizon

    # Beta posterior parameters per arm, starting from Beta(1, 1).
    alpha = np.ones(k)
    beta = np.ones(k)
    action = np.zeros(n_steps)
    reward = np.zeros(n_steps)

    for t in range(n_steps):
        # One posterior sample per arm; pull the arm with the largest sample.
        sampled = np.random.beta(alpha, beta)
        selected_arm = np.argmax(sampled)

        # r_i ~ Bern(theta_oracled[selected_arm])
        r_i = np.random.binomial(1, theta_oracled[selected_arm])

        # Posterior update: a success increments alpha, a failure beta.
        alpha[selected_arm] += r_i
        beta[selected_arm] += 1 - r_i

        action[t] = selected_arm
        reward[t] = r_i

    return action, reward


n_traj: int = 100                 # number of trajectories
horizon: int = 50                 # steps per trajectory


def generate_dataset(n_traj: int, horizon: int, k) -> np.ndarray:
    """Generate Thompson-sampling trajectories, save them and return them.

    The array is written to ``traj_{k}.npy`` (relative to the current working
    directory) and a confirmation line is printed.

    Args:
        n_traj: Number of trajectories to generate.
        horizon: Number of steps per trajectory.
        k: Number of arms. For 5, 10 and 20 a fixed vector of success
            probabilities is used; for any other value the probabilities
            are drawn uniformly from [0, 1).

    Returns:
        ndarray of shape ``(n_traj, horizon, 2)``; ``[..., 0]`` holds the
        pulled arm index and ``[..., 1]`` the observed reward.
    """
    output_path: Path = Path(f"traj_{k}.npy")

    # Fixed arm success probabilities for the arm counts used so far.
    # Alternative setting once tried for k == 20:
    # [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.84, 0.85,
    #  0.86, 0.87, 0.88, 0.89, 0.9, 0.91, 0.92, 0.93, 0.94, 0.95]
    presets = {
        5: [0.2, 0.4, 0.6, 0.7, 0.8],
        10: [0.2, 0.3, 0.4, 0.5, 0.6, 0.65, 0.7, 0.75, 0.8, 0.9],
        20: [
            0.35, 0.35, 0.35, 0.35, 0.35, 0.4, 0.4, 0.4, 0.4, 0.4,
            0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
        ],
    }
    if k in presets:
        theta_oracled = np.array(presets[k])
    else:
        theta_oracled = np.random.uniform(0, 1, k)

    # Trajectories are generated with Thompson sampling only.
    data = np.empty((n_traj, horizon, 2), dtype=np.float64)
    for i in range(n_traj):
        # Pass the requested horizon explicitly so the trajectory length
        # always matches the second axis of ``data``.
        a, r = TS(theta_oracled, k, n_steps=horizon)
        data[i, :, 0] = a
        data[i, :, 1] = r

    np.save(output_path, data)
    print(f'Saved {output_path} with shape {data.shape}')
    return data

# Arm counts to generate a dataset for. Alternative settings are kept
# commented out for reference.
# K = [5, 10, 20]
K = [20]
# K = [5]

# One generate_dataset call per arm count, using the module-level
# n_traj and horizon settings; each call saves traj_<k>.npy.
for k in K:
    generate_dataset(n_traj, horizon, k)

Saved traj_20.npy with shape (100, 50, 2)
