In [None]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

import torch
import torch.nn as nn
import torch.optim as optim

import random

from tqdm import tqdm

from google.colab import drive
# Mount Google Drive at /content/drive. The plotting code in this notebook
# saves figures under 'drive/MyDrive/Dopamine/graphs/' when VERBOSE is True.
drive.mount('/content/drive')

SEED = 42
# Seed every random number generator this notebook draws from.
# The stdlib `random` module must be seeded as well: the action selection
# uses `random.choice`, which is independent of NumPy's and torch's generators.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
# When True, the plotting code additionally writes each figure to a PNG file
# (see the `if VERBOSE:` branches); when False, figures are only shown.
VERBOSE = False

In [None]:
# Initial values for the experiment: parameters of the two-variable ODE
# implemented in `ode_system` (variables d and g).
omega_d = 50  # rate factor multiplying the whole d_dot expression
omega = 15  # rate factor multiplying the whole g_dot expression
C = 15  # constant term inside d_dot
mu = 6  # weight of log(R) inside d_dot
alpha = 0.7  # weight of g inside d_dot; also used to compute g_st
d_0 = 5  # set point of d: g_dot is zero when d == d_0
R_0 = 1  # baseline input R (log(R_0) == 0)

In [None]:
# steady state of mouse experiment
# Baseline input and the d value at which g_dot in `ode_system` vanishes.
R = R_0
d_st = d_0

# calculate g steady state
# Obtained by setting d_dot = 0 in `ode_system` with d = d_0 and solving for g:
#   C + mu * log(R) - alpha * g - d_0 = 0
g_st = (C - d_0 + mu * np.log(R))/alpha

# Bare expression so the notebook displays the computed value.
g_st

In [None]:
def ode_system(R, d_val, g_val, omega_d, C, mu, alpha, omega, d0):
    """Evaluate the right-hand side of the coupled (d, g) ODE.

    Parameters
    ----------
    R : input to the d equation; enters through ``np.log(R)``.
    d_val, g_val : current values of the two state variables.
    omega_d, omega : rate factors of the d and g equations respectively.
    C, mu, alpha : constant term, weight of ``log(R)`` and weight of ``g_val``
        in the d equation.
    d0 : reference value of d; the g derivative is zero when ``d_val == d0``.

    Returns
    -------
    list
        ``[d_dot, g_dot]``.
    """
    # Net drive on d: constant + log-input - inhibition by g - d itself.
    net_drive = C + mu * np.log(R) - alpha * g_val - d_val
    # Relative deviation of d from its reference value.
    relative_offset = d_val / d0 - 1
    return [omega_d * net_drive, omega * relative_offset]

In [None]:
# using Euler method
def explicit_euler(old_d_val, old_g_val, R_input, h=0.01,
                   omega_d=omega_d, C=C, mu=mu, alpha=alpha, omega=omega, d0=d_0):
    """Advance the (d, g) system by one explicit-Euler step.

    Parameters
    ----------
    old_d_val, old_g_val : state at the start of the step.
    R_input : input ``R`` passed to ``ode_system`` (must be > 0 for a finite log).
    h : step size.
    omega_d, C, mu, alpha, omega, d0 : ODE parameters forwarded to ``ode_system``.
        Their defaults are the module-level values *at the time this function
        is defined*. This is deliberate: a later cell rebinds the global name
        ``alpha`` for a different formula, and looking the name up at call
        time would silently run the ODE with that other value, inconsistent
        with the ``alpha`` used to compute the steady state ``g_st``.

    Returns
    -------
    tuple
        ``(new_d_val, new_g_val)``.
    """
    # Both derivatives come from the same state, so evaluate the system once.
    d_dot, g_dot = ode_system(R_input, old_d_val, old_g_val, omega_d, C, mu, alpha, omega, d0)

    new_d_val = old_d_val + h * d_dot
    new_g_val = old_g_val + h * g_dot

    return new_d_val, new_g_val

In [None]:
# Updated parameters for increased sensitivity
# These feed `calculate_a_l_m` and `calculate_phi`.
# NOTE(review): `alpha` rebinds the module-level name that was already set to
# 0.7 with the ODE parameters; any code that reads the global `alpha` at call
# time after this cell sees 0.1 instead. `h` here is the exponent used in
# `calculate_phi`; it is unrelated to the `h` step-size parameter of
# `explicit_euler`, which has its own default.
m0 = 1  # reference value subtracted from m in K_m = exp(alpha * (m - m0))
alpha = 0.1  # coefficient in the exponent of K_m
n = 1  # exponent applied to l / K_m in calculate_a_l_m
h = 1  # exponent applied to a / a0 in calculate_phi
a0 = 1/3  # normalising value for a in calculate_phi
tau = 3  # calculate_phi scales by 1 / tau

In [None]:
def calculate_a_l_m(l, m):
    """Return ``1 / (1 + (l / K_m) ** n)`` with ``K_m = exp(alpha * (m - m0))``.

    ``alpha``, ``m0`` and ``n`` are read from module-level variables at call time.
    """
    half_point = np.exp(alpha * (m - m0))
    return 1 / (1 + (l / half_point)**n)

def calculate_phi(l,m):
    """Map the pair (l, m) to a value in [0.1, 1].

    ``l`` is shifted by one before being passed to ``calculate_a_l_m``; the
    result is scaled as ``(1 / tau) * (a / a0) ** h``, clipped to [0, 0.9] and
    subtracted from 1. ``tau``, ``a0`` and ``h`` are module-level variables.
    """
    l+=1
    activity = calculate_a_l_m(l,m)
    scaled = 1/tau * (activity/a0)**h
    # Clip from above first, then from below, exactly as max(0, min(0.9, x)).
    capped = min(0.9, scaled)
    clipped = max(0, capped)
    return 1-clipped

In [None]:
class GridEnvironment:
    """Rectangular grid world with stochastic point rewards.

    A state is a pair ``[m, n]``: ``m`` is the row index (``0 .. height - 1``)
    and ``n`` is the column index (``0 .. width - 1``).

    Parameters
    ----------
    width, height : int
        Number of columns and rows of the grid.
    reward_pos_size_prob : sequence of three parallel sequences
        ``(positions, sizes, probabilities)``: ``positions[i]`` is the
        ``[m, n]`` cell of reward ``i``, ``sizes[i]`` its magnitude and
        ``probabilities[i]`` the chance that it is delivered on a visit.

    Attributes
    ----------
    visits : numpy.ndarray of shape ``(height, width)``
        ``visits[m][n]`` counts how often a step ended in cell ``(m, n)``.
    """

    def __init__(self, width, height, reward_pos_size_prob):
        self.width = width
        self.height = height
        self.reward_pos, self.reward_size, self.reward_prob = reward_pos_size_prob
        # Rows first, columns second, matching the visits[m][n] indexing in
        # step(); (width, height) only happened to work for square grids.
        self.visits = np.zeros((height, width))

    def step(self, action, state):
        """Move one cell and sample the reward of the cell reached.

        Parameters
        ----------
        action : int
            0 = right (n + 1), 1 = up (m - 1), 2 = left (n - 1), 3 = down (m + 1).
        state : sequence ``[m, n]``
            Current cell.

        Returns
        -------
        (list, number)
            The new state ``[m, n]`` and the reward obtained (0 if none).

        Raises
        ------
        NotImplementedError
            If ``action`` is not one of 0, 1, 2, 3.
        """
        state_m, state_n = state

        if action == 0:
            state_n += 1
        elif action == 1:
            state_m -= 1
        elif action == 2:
            state_n -= 1
        elif action == 3:
            state_m += 1
        else:
            raise NotImplementedError('Wrong action')

        # Moving into a wall leaves the agent where it was.
        state_m = max(0, min(state_m, self.height - 1))
        state_n = max(0, min(state_n, self.width - 1))

        self.visits[state_m][state_n] += 1

        reward = 0
        for pos, size, prob in zip(self.reward_pos, self.reward_size, self.reward_prob):
            # A random number is drawn only when standing on a reward cell;
            # the reward is delivered with that cell's probability.
            if pos[0] == state_m and pos[1] == state_n and np.random.random() <= prob:
                reward = size
                break

        state = [state_m, state_n]

        return state, reward

def update_dopamine(d_list, g_list, reward_input):
    """Take one Euler step from the most recent entries of ``d_list`` / ``g_list``.

    The ODE input is ``reward_input + 1``, so a zero ``reward_input`` gives
    ``log(R) == 0`` in ``ode_system``. The lists themselves are not modified.

    Returns
    -------
    tuple
        ``(dop, gaba)`` — the two values returned by ``explicit_euler``.
    """
    latest_d = d_list[-1]
    latest_g = g_list[-1]
    next_d, next_g = explicit_euler(latest_d, latest_g, reward_input+1)
    return next_d, next_g


In [None]:
class TDLearningAgent:
    """Tabular TD agent over a 2-D grid with four movement actions.

    Parameters
    ----------
    num_states_m, num_states_n : int
        Number of rows and columns of the grid.
    num_actions : int
        Size of the last axis of the Q-table.
    learning_rate : float
        Step size of the TD update.
    gamma : float
        Discount factor applied to the value of the successor cell.

    Attributes
    ----------
    q_table : numpy.ndarray of shape ``(num_states_m, num_states_n, num_actions)``
        Initialised to zero (the agent expects no reward).
    """

    def __init__(self, num_states_m, num_states_n, num_actions, learning_rate, gamma):
        self.q_table = np.zeros((num_states_m, num_states_n, num_actions)) # agent expect no reward
        self.learning_rate = learning_rate
        self.gamma = gamma

    def choose_action(self, prev_action, state, epsilon, last_epi=False):
        """Keep the previous direction, or re-orient with probability ``epsilon``.

        Parameters
        ----------
        prev_action : int
            Direction taken on the previous step.
        state : unused; kept for interface compatibility.
        epsilon : float
            Probability of switching to one of the other three directions,
            chosen uniformly.
        last_epi : bool
            When True, always return action 1.
        """
        if last_epi:
            return 1

        if np.random.rand() < epsilon:
            # change direction with prob epsilon, never re-picking prev_action
            other_directions = [elem for elem in [0, 1, 2, 3] if elem != prev_action]
            return random.choice(other_directions)

        return prev_action # else keep direction

    def _successor(self, state_m, state_n, action):
        """Return the cell reached from (state_m, state_n) by ``action``, clamped to the grid."""
        if action == 0:
            state_n += 1
        elif action == 1:
            state_m -= 1
        elif action == 2:
            state_n -= 1
        elif action == 3:
            state_m += 1
        else:
            raise NotImplementedError('Wrong action')

        last_m = self.q_table.shape[0] - 1
        last_n = self.q_table.shape[1] - 1
        return max(0, min(state_m, last_m)), max(0, min(state_n, last_n))

    def learn(self, state, action, reward):
        """Apply one TD update to ``q_table[state_m, state_n, action]``.

        The target is ``reward + gamma * max_a Q(successor, a)`` where the
        successor is the neighbouring cell in direction ``action``. The
        successor is clamped to the grid, the same way the environment keeps
        the agent in place when it bumps into a wall. Without the clamp an
        index of -1 silently wraps to the opposite edge of the table and an
        index past the last row/column raises ``IndexError``.

        The stored value is floored at 0.

        Raises
        ------
        NotImplementedError
            If ``action`` is not one of 0, 1, 2, 3.
        """
        state_m, state_n = state

        next_m, next_n = self._successor(state_m, state_n, action)
        target = reward + self.gamma * np.max(self.q_table[next_m, next_n])

        predict = self.q_table[state_m, state_n, action]

        updated_value = predict + self.learning_rate * (target - predict)
        self.q_table[state_m, state_n, action] = max(0, updated_value)


In [None]:
def flatten_index(m, n, width):
    """Return the row-major flat index of grid cell (m, n) for rows of length ``width``."""
    row_offset = m * width
    return row_offset + n

def flatten_visits(visit_matrix):
    """Return the entries of a 2D visit matrix as a flat, row-major list."""
    as_array = np.array(visit_matrix)
    flat = as_array.flatten()
    return list(flat)

def plot_visits(visit_matrix, reward_pos, width, height):
    """
    Bar chart of how often each grid cell was visited, with reward cells overdrawn in red.

    Parameters:
    - visit_matrix: 2D list or numpy array; element [m][n] is the visit count of cell (m, n).
    - reward_pos: iterable of (m, n) pairs giving the reward cells.
    - width, height: grid dimensions; one bar is drawn per cell, in row-major order.

    The figure is shown, and additionally saved as a PNG when the global VERBOSE is true.
    """
    counts = flatten_visits(visit_matrix)
    cell_ids = list(range(width * height))

    # Tick text "(row,col)" recovered from each flat cell index
    tick_text = []
    for cell in cell_ids:
        row, col = divmod(cell, width)
        tick_text.append(f"({row},{col})")

    plt.figure(figsize=(15, 6))
    plt.bar(cell_ids, counts, color='gray', label='Visits')

    # Second bar series on top of the first, only at the reward cells
    reward_ids = [row * width + col for row, col in reward_pos]
    reward_counts = []
    for flat in reward_ids:
        reward_counts.append(counts[flat])
    plt.bar(reward_ids, reward_counts, color='red', label='Reward Positions')

    plt.xlabel('Position (m, n)', fontsize=14)
    plt.ylabel('Number of Visits', fontsize=14)
    # Slanted tick labels so neighbouring "(m,n)" strings do not overlap
    plt.xticks(cell_ids, tick_text, rotation=45, ha='right')

    plt.legend()
    plt.grid(True)

    if VERBOSE:
        directory = f'drive/MyDrive/Dopamine/graphs/'
        name = f'grid_cumu_freq_sto_rew_dop_reori_td'
        target = directory + name +'.png'
        plt.savefig(target, format='png', bbox_inches='tight', dpi=500)

    plt.show()

In [None]:
def plot_visits_heatmap(visit_matrix, reward_pos, width, height):
    """
    Heatmap of visit counts per grid cell, with a cyan circle on every reward cell.

    Parameters:
    - visit_matrix: 2D list or numpy array; element [m][n] is the visit count of cell (m, n).
    - reward_pos: iterable of (m, n) pairs giving the reward cells.
    - width, height: grid dimensions, used for the tick positions and grid lines.

    The figure is shown, and additionally saved as a PNG when the global VERBOSE is true.
    """
    counts = np.array(visit_matrix)

    plt.figure(figsize=(8, 8))
    ax = plt.gca()

    # Heatmap plus its colour bar
    image = ax.imshow(counts, cmap='hot', interpolation='nearest', aspect='equal')
    colour_bar = plt.colorbar(image, orientation='vertical')
    colour_bar.set_label('Number of Visits', fontsize=14)

    # Reward cells: x is the column (n), y is the row (m)
    for row, col in reward_pos:
        plt.scatter(col, row, color='cyan', s=200, edgecolors='black', marker='o')

    # Major ticks sit on the cell centres and are labelled with the cell index
    col_ticks = np.arange(width)
    row_ticks = np.arange(height)
    ax.set_xticks(col_ticks)
    ax.set_xticklabels(np.arange(width))
    ax.set_yticks(row_ticks)
    ax.set_yticklabels(np.arange(height))
    ax.set_xlabel('State Dimension n', fontsize=14)
    ax.set_ylabel('State Dimension m', fontsize=14)

    # Minor ticks sit on the cell borders; the grid is drawn on those only
    ax.set_xticks(np.arange(-.5, width, 1), minor=True)
    ax.set_yticks(np.arange(-.5, height, 1), minor=True)
    ax.grid(which='minor', color='black', linestyle='-', linewidth=2)

    if VERBOSE:
        directory = f'drive/MyDrive/Dopamine/graphs/'
        name = f'grid_heat_sto_rew_dop_reori_td'
        target = directory + name +'.png'
        plt.savefig(target, format='png', bbox_inches='tight', dpi=500)

    plt.show()


In [None]:
# Initialization
# rew_list holds three parallel lists: reward cells [m, n], reward sizes and
# delivery probabilities (unpacked in that order by GridEnvironment.__init__).
rew_list = [[[2,3], [3,2], [4,4]], [1,2,4], [1,0.5, 0.25]]
env = GridEnvironment(width=7, height=7, reward_pos_size_prob=rew_list)
agent = TDLearningAgent(num_states_m=7, num_states_n=7, num_actions=4, learning_rate=0.1, gamma=0.2)

# Per-episode traces, stored for every 10th episode only (see end of loop).
d_list_whole = []
g_list_whole = []
q_values_over_time = []

# Training Loop
for episode in tqdm(range(20000)):
    # Every episode starts in the top-left cell with d and g reset to d_0 / g_st.
    state = [0,0]
    done = False
    total_reward = 0  # never updated in this loop
    d_list = [d_0]
    g_list = [g_st]
    pred_error_list=[]  # never filled in this loop
    action = 0 #init action

    reward = 0
    dop_spike=0

    while not done:

        # The update uses the current cell together with the action and reward
        # of the PREVIOUS step (action 0 / reward 0 on the first iteration).
        agent.learn(state, action, reward)

        state_m, state_n = state

        # Re-orientation probability derived from the last dopamine deviation.
        epsilon = calculate_phi(dop_spike, d_0)

        # Action selection. Note the bounds are height - 2 / width - 2 here.
        if state_m == env.height - 2 and state_n == env.width - 2:
            # Terminal cell: the loop ends after this iteration, but the step
            # below is still executed once more with the previous action.
            done = True
        elif state_m == 0 and state_n == 0:
            action = random.choice([0,3])
        else:
            # On a boundary row/column one direction is excluded and the action
            # is drawn uniformly; only interior cells use agent.choose_action.
            if state_m == env.height - 2:
                action = random.choice([0,1,2])
            elif state_n == env.width - 2:
                action = random.choice([1,2,3])
            elif state_n == 0:
                action = random.choice([0,1,3])
            elif state_m == 0:
                action = random.choice([0,2,3])
            else:
                action = agent.choose_action(action, state, epsilon)

        next_state, reward = env.step(action, state)

        q_action = action

        # Update Dopamine based on prediction error
        # Both Q terms are read at next_state (the cell just entered).
        prediction_error = abs(reward + agent.gamma * np.max(agent.q_table[next_state[0],next_state[1]]) - agent.q_table[next_state[0],next_state[1], q_action])

        dop, gaba = update_dopamine(d_list, g_list, prediction_error)
        # Deviation from the episode's first d value (d_0).
        dop_spike = dop - d_list[0]
        d_list.append(dop)
        g_list.append(gaba)

        state = next_state

        # NOTE(review): with the action restrictions above this corner looks
        # unreachable in this loop — confirm whether the check is still needed.
        if next_state[0] == env.height - 1 and next_state[1] == env.width - 1:
            done = True

    # Keep a snapshot of the traces and a copy of the Q-table every 10 episodes.
    if episode % 10 == 0:
        d_list_whole.append(d_list)
        g_list_whole.append(g_list)
        q_values_over_time.append(agent.q_table[:].copy())

In [None]:
# Initialization
# Fresh environment (zeroed visit counts) with the same rewards; the `agent`
# trained in the previous cell is reused.
env_simu = GridEnvironment(width=7, height=7, reward_pos_size_prob=rew_list)

# NOTE(review): these three lists are never filled in this cell.
d_list_simu = []
g_list_simu = []
q_values_over_time_simu = []

finish = False

state = [0,0]
action = 0 #init action
reward = 0
dop_spike=0
d_list = [d_0]
g_list = [g_st]

# Number of steps that started in the top-left or bottom-right corner.
hit_time = 0

# Simulation Loop
# One continuous walk (no episode resets) that ends after 10000 corner hits.
while not finish:

    state_m, state_n = state

    # Re-orientation probability derived from the last dopamine deviation.
    epsilon = calculate_phi(dop_spike, d_0)

    # Action selection. Unlike the training loop, the bounds here are
    # height - 1 / width - 1, i.e. the full grid.
    if state_m == env_simu.height - 1 and state_n == env_simu.width - 1:
        action = random.choice([1,2])
        hit_time += 1
    elif state_m == 0 and state_n == 0:
        hit_time += 1
        action = random.choice([0,3])
    else:
        # On a boundary row/column one direction is excluded and the action is
        # drawn uniformly; only interior cells use agent.choose_action.
        if state_m == env_simu.height - 1:
            action = random.choice([0,1,2])
        elif state_n == env_simu.width - 1:
            action = random.choice([1,2,3])
        elif state_n == 0:
            action = random.choice([0,1,3])
        elif state_m == 0:
            action = random.choice([0,2,3])
        else:
            action = agent.choose_action(action, state, epsilon)

    next_state, reward = env_simu.step(action, state)

    # The agent keeps learning during the simulation. Here the update uses the
    # cell the step started from, together with the action and reward of this step.
    agent.learn(state, action, reward)

    q_action = action

    # Update Dopamine based on prediction error
    # Both Q terms are read at next_state (the cell just entered).
    prediction_error = abs(reward + agent.gamma * np.max(agent.q_table[next_state[0],next_state[1]]) - agent.q_table[next_state[0],next_state[1], q_action])

    dop, gaba = update_dopamine(d_list, g_list, prediction_error)
    # Deviation from the first d value (d_0).
    dop_spike = dop - d_list[0]
    # These lists grow by one entry per step for the whole simulation.
    d_list.append(dop)
    g_list.append(gaba)

    state = next_state

    if hit_time >= 10000:
        finish = True


In [None]:
# Take the grid size from the environment instead of repeating the literal 7s,
# so the plot stays correct if the environment is created with another size.
plot_visits(env_simu.visits, env_simu.reward_pos, env_simu.width, env_simu.height)

In [None]:
# Take the grid size from the environment instead of repeating the literal 7s,
# so the plot stays correct if the environment is created with another size.
plot_visits_heatmap(env_simu.visits, env_simu.reward_pos, env_simu.width, env_simu.height)

In [None]:
# Surface plot of the last stored Q-table snapshot, one 3D panel per action.
q_values_over_time = np.array(q_values_over_time)
actions = range(4)

# Last snapshot; indexed below as [row m, column n, action].
final_q_values = q_values_over_time[-1]

# Use one colour scale for all panels. Without it each surface is normalised
# to its own value range and the single colour bar would only describe the
# surface that happened to be drawn last.
q_min = float(final_q_values.min())
q_max = float(final_q_values.max())

fig, axs = plt.subplots(2, 2, figsize=(20, 12), subplot_kw={'projection': '3d'})
axs = axs.ravel()

for i, ax in enumerate(axs):
    if i < len(actions):
        action_index = actions[i]
        Q_values = final_q_values[:, :, action_index]

        x = np.arange(Q_values.shape[1])
        y = np.arange(Q_values.shape[0])
        X, Y = np.meshgrid(x, y)

        # Plot the surface
        surface = ax.plot_surface(X, Y, Q_values, cmap='viridis', alpha=0.7, vmin=q_min, vmax=q_max)

        # Mark every reward cell with a dot at its expected value (size * probability)
        # and a dotted vertical guide line.
        # The loop names are deliberately not `m` / `n`: this code runs at notebook
        # top level, and `n` is the module-level exponent read by calculate_a_l_m.
        for rew_pos, rew_size, rew_prob in zip(rew_list[0], rew_list[1], rew_list[2]):
            rew_m, rew_n = rew_pos
            reward_expected_value = rew_size * rew_prob
            ax.scatter(rew_n, rew_m, reward_expected_value, s=10 * rew_size, depthshade=True)
            ax.plot([rew_n, rew_n], [rew_m, rew_m], [0, 2], color='red', linestyle='dotted', linewidth=2)

        # Axis labels and title
        ax.set_xlabel('State Dimension n', labelpad=10)
        ax.set_ylabel('State Dimension m', labelpad=10)
        ax.set_zlabel('Q-values', labelpad=10)
        ax.set_title(f'Action {action_index}')

        # View angle for better visibility
        ax.view_init(elev=20, azim=-30)

plt.subplots_adjust(left=0.05, right=0.85, top=0.95, bottom=0.05, wspace=0.2, hspace=0.1)

# Add a color bar (valid for every panel because of the shared vmin / vmax)
cbar_ax = fig.add_axes([0.88, 0.15, 0.03, 0.7])
fig.colorbar(surface, cax=cbar_ax, label='Q-value')

if VERBOSE:
    directory = f'drive/MyDrive/Dopamine/graphs/'
    name = f'grid_last_q_sto_rew_dop_reori_td'
    plt.savefig(directory + name +'.png', format='png',bbox_inches='tight',dpi=500)

plt.show()


In [None]:
# Stacked histograms of the stored Q-values at each reward cell, one panel per reward.
width, height = 7, 7

# Work on an array view of the snapshots so this cell does not depend on an
# earlier cell having already converted the list.
q_history = np.asarray(q_values_over_time)
reward_positions, reward_sizes, reward_probs = rew_list

fig, axes = plt.subplots(1, len(reward_positions), figsize=(18, 6), sharey=True)
# plt.subplots returns a bare Axes (not an array) when there is a single panel.
axes = np.atleast_1d(axes)

colors = ['red', 'blue', 'green', 'purple']
labels = ['Action 0', 'Action 1', 'Action 2', 'Action 3']

# Collect the Q-values of every reward cell (not a hard-coded count of three)
# to derive a common x-range and common bins for all panels.
# The loop names are deliberately not `m` / `n`: this code runs at notebook
# top level, and `n` is the module-level exponent read by calculate_a_l_m.
all_q_values = []

for rew_m, rew_n in reward_positions:
    all_q_values.extend(q_history[:, rew_m, rew_n, :].flatten())

min_q_value = min(all_q_values)
max_q_value = max(all_q_values)

bin_width = 0.05
bins = np.arange(min_q_value, max_q_value + bin_width, bin_width)

for index, (rew_m, rew_n) in enumerate(reward_positions):
    # One series per action: that action's Q-value at this cell across snapshots.
    data = [q_history[:, rew_m, rew_n, action] for action in range(4)]
    axes[index].hist(data, bins=bins, color=colors, label=labels, stacked=True, alpha=0.75)

    axes[index].set_xlabel('Q-value intervals', fontsize=14)
    axes[index].set_xlim(min_q_value, max_q_value)
    axes[index].set_title(f'Reward at Position ({rew_m},{rew_n}) with Size {reward_sizes[index]} and Probability {reward_probs[index]}')

    # Reference line at the expected reward (size * probability) of this cell.
    expected_value = reward_sizes[index] * reward_probs[index]
    axes[index].axvline(x=expected_value, color='black', linestyle='--', linewidth=2, label=f'Expected Value = {expected_value}')
    axes[index].legend(loc='upper right')

# Shared y-axis label placed once on the left of the figure.
fig.text(0.02, 0.5, 'Frequency', va='center', rotation='vertical', fontsize=12)

plt.tight_layout(rect=[0.03, 0, 1, 0.95])

if VERBOSE:
    directory = f'drive/MyDrive/Dopamine/graphs/'
    name = f'grid_q_freq_sto_rew_dop_reori_td'
    plt.savefig(directory + name +'.png', format='png',bbox_inches='tight',dpi=500)

plt.show()
