<a href="https://colab.research.google.com/github/marvin-math/active_inference_multiarmed_bandit/blob/main/active_inference_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [92]:
import numpy as np
import scipy
from statistics import mean
import random
#from scipy.special import psi


class BayesianBanditAgent:
    def __init__(self, num_arms, prior_mean, prior_precision, prior_alpha, prior_beta, draws, posterior_dist_mean, posterior_variance, eta, decay):
        self.num_arms = num_arms
        self.prior_mean = prior_mean
        self.prior_precision = prior_precision
        self.prior_alpha = prior_alpha
        self.prior_beta = prior_beta
        self.draws = draws
        self.posterior_dist_mean = posterior_dist_mean
        self.posterior_variance = posterior_variance
        self.posterior_mean = np.ones(num_arms)
        self.posterior_precision = np.ones(num_arms)
        self.posterior_alpha = np.ones(num_arms)
        self.posterior_beta = np.ones(num_arms)
        self.rewards = np.zeros(num_arms)
        self.observations = np.empty(num_arms, dtype=object)
        self.observations[:] = [[] for _ in range(num_arms)]
        self.obs_mean = np.zeros(num_arms)
        self.t_dof = np.zeros(num_arms)
        self.t_mean = np.zeros(num_arms)
        self.t_precision = np.zeros(num_arms)
        self.t_sd = np.zeros(num_arms)
        self.predictive_pdf = np.ones(num_arms)
        self.entropy_predictive_pdf = np.ones(num_arms)
        self.eta = eta
        self.kl_div = np.ones(num_arms)
        self.ambiguity_term = np.ones(num_arms)
        self.expected_free_energy = np.ones(num_arms)
        self.expected_free_energies = np.empty(num_arms, dtype=object)
        self.expected_free_energies[:] = [[] for _ in range(num_arms)]
        self.decay = decay

    def select_arm(self):

        # Select the arm with the highest sample
        arm = random.randint(0,self.num_arms-1) # doesnt make sense yet, just dummy function to check functionality

        self.draws[arm] += 1

        return arm

    def select_lowest_expected_free_energy_arm(self):
        min_expected_free_energy = float('inf')
        best_arm = 0

        for arm in range(self.num_arms):
            self.expected_free_energy[arm] = self.calc_expected_free_energy(arm)
            self.expected_free_energies[arm].append(self.expected_free_energy[arm])
            if self.expected_free_energy[arm] <= min_expected_free_energy: # a bit ambiguity here, but probably not relevant
                min_expected_free_energy = self.expected_free_energy[arm]
                best_arm = arm
        self.draws[best_arm] += 1

        return best_arm

    def update(self, arm, reward):
        # Update the posterior parameters for the selected arm
        self.rewards[arm] += reward
        self.observations[arm].append(reward)
        self.obs_mean[arm] = np.mean(self.observations[arm])

        self.posterior_precision[arm] = self.draws[arm] + self.prior_precision[arm]
        self.posterior_mean[arm] = (self.prior_precision[arm] * self.prior_mean[arm] + self.draws[arm] * self.obs_mean[arm]) / self.posterior_precision[arm]
        self.posterior_alpha[arm] = self.prior_alpha[arm] + 0.5 * self.draws[arm]
        self.posterior_beta[arm] = self.prior_beta[arm] + (0.5 * np.sum((self.observations[arm]-self.obs_mean[arm])**2)) + \
                                    (self.prior_precision[arm] * self.draws[arm]*(self.obs_mean[arm]-self.prior_mean[arm])**2) / (2*(self.posterior_precision[arm]+self.draws[arm]))
        self.posterior_variance[arm] = np.sqrt(np.mean(1/np.random.gamma(self.posterior_alpha[arm], 1/self.posterior_beta[arm], size = 1000))) # this is actually sd
        self.posterior_dist_mean[arm] = mean(np.random.normal(self.posterior_mean[arm], (self.posterior_variance[arm]/np.sqrt(self.posterior_precision[arm])), size= 1000))

        # update the parameters for the next iteration
        self.prior_precision[arm] = self.posterior_precision[arm]
        self.prior_mean[arm] = self.posterior_mean[arm]
        self.prior_alpha[arm] = self.posterior_alpha[arm]
        self.prior_beta[arm] = self.posterior_beta[arm]


    def posterior_predictive(self, arm):
        # Calculate the posterior predictive t-distribution parameters for the selected arm
        self.t_dof[arm] = 2*(self.posterior_alpha[arm] + 1)
        self.t_mean[arm] = self.posterior_mean[arm]
        self.t_precision[arm] = (self.posterior_beta[arm] * (1 + self.posterior_precision[arm])) / (self.posterior_precision[arm]* (1 + self.posterior_alpha[arm]))
        self.t_sd[arm] = 1/np.sqrt(self.t_precision[arm]) # scipy uses standard deviation, so we have to transform


        v = self.t_dof[arm]
        v1 = (1+v)/2
        v2 = v/2
        self.entropy_predictive_pdf[arm] = v1*(scipy.special.digamma(v1)-scipy.special.digamma(v2))+np.log(np.sqrt(v)*scipy.special.beta(v2,0.5))

        return self.entropy_predictive_pdf[arm]

    def kl_divergence(self, arm, eta):
        entropy_of_predictive = self.posterior_predictive(arm)
        self.kl_div[arm] = entropy_of_predictive - eta*self.posterior_mean[arm]
        return self.kl_div[arm]

    def ambiguity(self, arm):
        self.ambiguity_term[arm] = 0.5 * np.log(2 * np.pi * np.e) + np.log(self.posterior_beta[arm]) + scipy.special.digamma(self.posterior_alpha[arm])
        return self.ambiguity_term[arm]

    def calc_expected_free_energy(self, arm):
        self.expected_free_energy_term = self.kl_divergence(arm, self.eta) + self.ambiguity(arm)
        return self.expected_free_energy_term




In [97]:
# Define the number of arms and prior hyperparameters
num_arms = 5
mean_of_true_rewards = 5
mean_of_true_sd = 3
prior_mean = np.ones(num_arms)*5
prior_precision = np.ones(num_arms)
prior_alpha = np.ones(num_arms)
prior_beta = np.ones(num_arms)*20
draws = np.zeros(num_arms)
posterior_dist_mean = np.ones(num_arms)
posterior_variance = np.ones(num_arms)
cumulative_reward = 0
eta = 0.5
decay = 0.1

# Create the Bayesian bandit agent
agent = BayesianBanditAgent(num_arms, prior_mean, prior_precision, prior_alpha, prior_beta, draws, posterior_dist_mean, posterior_variance, eta, decay)

# Simulate the multi-armed bandit problem for a certain number of iterations
num_iterations = 5000
arm_means = np.random.normal(mean_of_true_rewards, 1.0, num_arms)  # True mean rewards for each arm
arm_sds = np.random.gamma(mean_of_true_sd, 2.0, num_arms) # true sd for each arm

for t in range(num_iterations):
    # Select an arm
    chosen_arm = agent.select_lowest_expected_free_energy_arm()


    # Simulate the reward for the chosen arm (normally distributed)
    reward = np.random.normal(arm_means[chosen_arm], arm_sds[chosen_arm])
    cumulative_reward += reward

    # Update the agent with the observed reward
    agent.update(chosen_arm, reward)
    #print(f"the chosen arm was: {chosen_arm}")
    #print(f"the reward from this arm is: {reward}")
    #print(f"the cumulative reward is: {cumulative_reward}")
    #print(f"the posterior_dist_mean is: {agent.posterior_dist_mean}")
    #print(f"the posterior_variance is: {agent.posterior_variance}")
    """    if t < 10:
      print(f"ambiguity: {agent.ambiguity_term[chosen_arm]}")
      print(f"kl divergence: {agent.kl_div[chosen_arm]}")
      print(f"beta: {agent.posterior_beta[chosen_arm]}")
      print(f"alpha: {agent.posterior_alpha[chosen_arm]}")"""


# Calculate the posterior predictive distribution for a new data point given the selected arm


#print("Posterior Predictive PDF for New Data Point:", posterior_predictive_pdf)
#print(f"the posterior means are: {agent.posterior_dist_mean}")
#print(f"the posterior variance is: {agent.posterior_variance}")
print(f"the true arm means are: {arm_means}")
print(f"the true arm sds are: {arm_sds}")
#print(f"entropy of t-distribution is: {entropy}")
#print(f"kl_divergence is: {agent.kl_divergence}")
#print(f"ambiguity is: {agent.ambiguity}")
#print(f"the expected free energy of the actions: {agent.expected_free_energy}")
print(f"draws: {agent.draws}")
for arm in range(num_arms):
  print(f"mean free energy per arm: {np.mean(agent.expected_free_energies[arm])}")
  #print(f"observation matrix: {agent.observations}")







the true arm means are: [5.75374965 4.69457226 3.9350866  4.0824162  7.02487323]
the true arm sds are: [0.83111821 3.21041257 3.52256334 4.08323752 4.89668376]
draws: [1872.  825.  716.  676.  911.]
mean free energy per arm: 22.945788854288093
mean free energy per arm: 22.95534857111754
mean free energy per arm: 22.95698471948863
mean free energy per arm: 22.96590287947884
mean free energy per arm: 22.95530351746119
