In [36]:
import numpy as np
from enum import Enum
from typing import Dict, Tuple

In [37]:
# helpful enums

# object types - bottle, can, or glass
class ObjectType(Enum):
    BOTTLE = 1
    CAN = 2
    GLASS = 3

# outcomes
class OutcomeType(Enum):
    SP_SUCCESS = 0 # robot succeeds
    SP_FAIL = 1 # robot fails
    IT = 2 # human intervenes

TRUST_LEVELS = [1,2,3,4,5,6,7]

In [38]:
# reward table

REWARD_TABLE: Dict[Tuple[ObjectType, OutcomeType], float] = {
    (ObjectType.BOTTLE, OutcomeType.SP_SUCCESS): 1.0,
    (ObjectType.BOTTLE, OutcomeType.SP_FAIL):    0.0,
    (ObjectType.BOTTLE, OutcomeType.IT):         0.0,

    (ObjectType.CAN, OutcomeType.SP_SUCCESS):    2.0,
    (ObjectType.CAN, OutcomeType.SP_FAIL):      -4.0,
    (ObjectType.CAN, OutcomeType.IT):            0.0,

    (ObjectType.GLASS, OutcomeType.SP_SUCCESS):  3.0,
    (ObjectType.GLASS, OutcomeType.SP_FAIL):    -9.0,
    (ObjectType.GLASS, OutcomeType.IT):          0.0,
}

In [39]:
def sigmoid(x):
  return 1 / (1 + np.exp(-x))

class HumanDecisionModel:

  # maps object type to discounted reward --> must learn this from data
  gamma: Dict[ObjectType, float]
  # maps object type to 'risk penalty' --> must learn this from data
  eta: Dict[ObjectType, float]

  # does the human think the robot will succeed?
  # bj = S(gamma*theta + eta)
  def success_belief(self, theta, obj):

    g = self.gamma[obj]
    n = self.eta[obj]

    return sigmoid(g * theta + n)

  # what is the probability the human will stay put / not intervene?
  # Pt = S(bj*rj + (1 - bj)*rj)
  def prob_stay_put(self, theta, obj):

    b_tj = self.success_belief(theta, obj)
    rS = REWARD_TABLE[(obj, OutcomeType.SP_SUCCESS)]
    rF = REWARD_TABLE[(obj, OutcomeType.SP_FAIL)]

    return sigmoid(b_tj * rS + (1 - b_tj) * rF)

  # calculate human action
  # a ~ B(Pt)
  def sample_human_action(self, theta, obj):

    p_stay = self.prob_stay_put(theta, obj)
    if np.random.rand() < p_stay:
      return OutcomeType.SP_SUCCESS
    return OutcomeType.IT

In [40]:
human_model = HumanDecisionModel()

# bayesion inference for the probability of each observation
def belief(state_theta, action_obj, observation):

    p_stay = human_model.prob_stay_put(state_theta, action_obj)

    if observation == OutcomeType.SP_SUCCESS:
        return p_stay
    else:  # observation == OutcomeType.IT (OutcomeType.Fail isn't possible yet)
        return 1 - p_stay

In [41]:
from scipy.stats import norm

def belief_dist(theta_t, observation, alpha=1, beta=.2, sigma=1):
    """
    P(theta_{t+1} | theta_t, a_t, o_t)

    theta ~ N(u, sigma)
    where u changes as shown below:
    - if robot succeeds:      u + alpha
    - if human intervenes:    u - beta
    """

    # if the robot succeeds
    if observation == OutcomeType.SP_SUCCESS:
        mean = theta_t + alpha  # trust increases
    else:  # otherwise, trust decreases slightly (by beta)
        mean = theta_t - beta

    # calculate probabilities for all trust levels
    # ex. if the robot succeeds, higher trust level becomes more probably
    probs = {}
    for theta in TRUST_LEVELS:
        lower = theta - 0.5
        upper = theta + 0.5
        probs[theta] = norm.cdf(upper, loc=mean, scale=sigma) - norm.cdf(lower, loc=mean, scale=sigma)

    total = sum(probs.values())
    probs = {theta: p / total for theta, p in probs.items()}

    return probs

In [42]:
def belief_update(belief, action, observation, human_model):
    """
    q(theta) = sum_over_all_t(b(theta)p(next_theta | theta, a_r, a_h)) 

    -- typical belief update equation for POMDPs
    """
    predicted = {}
    for theta_next in TRUST_LEVELS:
        predicted[theta_next] = 0.0
        for theta_curr in TRUST_LEVELS:
            prior = belief[theta_curr]
            trans_prob = belief_dist(theta_curr, observation)[theta_next]
            predicted[theta_next] += prior * trans_prob

    new_belief = {}
    for theta_next in TRUST_LEVELS:
        if observation == OutcomeType.SP_SUCCESS:
            obs_prob = human_model.prob_stay_put(theta_next, action)
        else:
            obs_prob = 1 - human_model.prob_stay_put(theta_next, action)

        new_belief[theta_next] = predicted[theta_next] * obs_prob

    total = sum(new_belief.values())
    new_belief = {theta: p / total for theta, p in new_belief.items()}

    return new_belief


In [43]:
from functools import lru_cache

# ChatGPT helped generate an exhaustive tree search in place of using SARSOP (which was incredibly difficult to download/run)
# The important distinction here is that, since our POMDP environment is so small, we are able to explore the entire state space
# Our results converge to a completely optimal policy (versus SARSOP uses approximation)
# This would NOT scale to a larger environment

def simple_trust_aware_policy(belief, human_model, available_actions):
    """
    Full-horizon trust-aware planning with memoization.
    Explores all trajectories until remaining_objects is empty.
    Returns (best_action, best_total_expected_reward).
    """

    # Convert initial belief to tuple (because dicts aren't hashable)
    init_belief_tuple = tuple(belief[theta] for theta in TRUST_LEVELS)

    # Convert available_actions to a tuple of integers (ObjectType values)
    init_actions_tuple = tuple(sorted([obj.value for obj in available_actions]))

    # ---- 1. One-step expected reward (unchanged) ----
    def immediate_expected_reward(belief_state, action_obj):
        expected = 0.0
        for theta in TRUST_LEVELS:
            p_theta = belief_state[theta]
            p_stay = human_model.prob_stay_put(theta, action_obj)

            reward_if_stay = REWARD_TABLE[(action_obj, OutcomeType.SP_SUCCESS)]
            reward_if_intervene = REWARD_TABLE[(action_obj, OutcomeType.IT)]

            expected += p_theta * (
                p_stay * reward_if_stay +
                (1 - p_stay) * reward_if_intervene
            )
        return expected

    # ---- 2. Cached wrapper for belief_update ----
    @lru_cache(maxsize=None)
    def cached_belief_update(belief_tuple, action_value, obs_value):
        belief_dict = {theta: belief_tuple[i] for i, theta in enumerate(TRUST_LEVELS)}
        action_obj = ObjectType(action_value)
        obs = OutcomeType(obs_value)
        new_b = belief_update(belief_dict, action_obj, obs, human_model)
        return tuple(new_b[theta] for theta in TRUST_LEVELS)

    # ---- 3. Main recursive value function, fully memoized ----
    @lru_cache(maxsize=None)
    def expected_return(belief_tuple, actions_tuple):
        """
        belief_tuple: tuple of 7 floats (belief over trust levels)
        actions_tuple: sorted tuple of remaining object values (ints)
        """

        # Base case: no objects left
        if not actions_tuple:
            return (None, 0.0)

        # Convert tuple â†’ usable dict
        belief_state = {theta: belief_tuple[i] for i, theta in enumerate(TRUST_LEVELS)}

        best_act = None
        best_val = -float('inf')

        # Try each remaining action
        for idx, action_value in enumerate(actions_tuple):
            action_obj = ObjectType(action_value)

            # 1-step reward
            immediate = immediate_expected_reward(belief_state, action_obj)

            # Expected future reward
            future = 0.0
            for obs in (OutcomeType.SP_SUCCESS, OutcomeType.IT):
                obs_value = obs.value

                # Compute P(obs | belief, action)
                if obs == OutcomeType.SP_SUCCESS:
                    p_obs = sum(
                        belief_state[t] * human_model.prob_stay_put(t, action_obj)
                        for t in TRUST_LEVELS
                    )
                else:
                    p_obs = sum(
                        belief_state[t] * (1 - human_model.prob_stay_put(t, action_obj))
                        for t in TRUST_LEVELS
                    )

                if p_obs == 0:
                    continue

                # Updated belief (cached!)
                next_b_tuple = cached_belief_update(belief_tuple, action_value, obs_value)

                # Remove this action from remaining list
                next_actions = list(actions_tuple)
                next_actions.pop(idx)
                next_actions_tuple = tuple(sorted(next_actions))

                # Recursive call (cached!)
                _, v_next = expected_return(next_b_tuple, next_actions_tuple)

                future += p_obs * v_next

            total = immediate + future

            if total > best_val:
                best_val = total
                best_act = action_obj

        return (best_act, best_val)

    # ---- Call planner ----
    return expected_return(init_belief_tuple, init_actions_tuple)


In [44]:
def simulate_task(human_model, initial_belief, initial_theta):
    """
    simulating full table-clearing task
    """
    
    belief = initial_belief.copy()
    total_reward = 0.0
    history = []

    true_theta = initial_theta

    remaining_objects = [
        ObjectType.BOTTLE, ObjectType.BOTTLE, ObjectType.BOTTLE,
        ObjectType.CAN, ObjectType.GLASS
    ]

    while remaining_objects:
        
        # robot action
        action, _ = simple_trust_aware_policy(belief, human_model, remaining_objects)

        # observes whether human intervenes
        observation = human_model.sample_human_action(true_theta, action)

        # get reward
        reward = REWARD_TABLE[(action, observation)]
        total_reward += reward

        # trust dynamics
        trans_probs = belief_dist(true_theta, observation)
        true_theta = np.random.choice(
            TRUST_LEVELS,
            p=[trans_probs[theta] for theta in TRUST_LEVELS]
        )

        # remove action availability
        remaining_objects.remove(action)

        history.append(action.name)

    return total_reward, history


In [45]:
# FULL RUN

human_model = HumanDecisionModel()

human_model.gamma = {
    ObjectType.BOTTLE: 1,   
    ObjectType.CAN:    1.2,  
    ObjectType.GLASS:  1.5,  
}

human_model.eta = {
    ObjectType.BOTTLE: 0,  
    ObjectType.CAN:   -2.5, 
    ObjectType.GLASS: -5,  
}

initial_belief =   {
    1: 0.0625,
    2: 0.125,
    3: 0.1825,
    4: 0.375,
    5: 0.1825,
    6: 0.125,
    7: 0.0625
}

total_reward, history = simulate_task(
            human_model,
            initial_belief,
            4
)

print("Total Reward: " + str(total_reward))
print("Action Sequence: " + str(history))

Total Reward: 8.0
Action Sequence: ['BOTTLE', 'BOTTLE', 'BOTTLE', 'CAN', 'GLASS']
