# Imports and Installs

In [3]:
#Installs & Imports
!pip install scipy
!pip install matplotlib
!pip install numpy
!pip install torchvision
import gym 
from gym import spaces
import collections
import pprint
import torch
import numpy as np
import random
import operator



#MDP

In [99]:
# Module-level constants shared by the cells below.
GAMMA = 0.9  # discount factor used in MDP.value_iteration_for_Q
TEST_EPISODES = 20  # not referenced by any live code visible in this file
REWARD_GOAL = 0.8  # not referenced by any live code visible in this file
N = 1000  # number of random exploration steps passed to play_n_random_steps

class MDP:
    """Small three-state Markov decision process with tabular Q-values.

    The model holds a fixed transition table ``T`` keyed by
    ``(state, action)``, a per-state reward dictionary ``rewards``, a learned
    transition-reward table ``R`` keyed by ``(state, action, next_state)`` and
    a Q-table ``values`` keyed by ``(state, action)``.

    Existing callers build the model with ``MDP()`` followed by ``.init()``;
    that pattern keeps working, and ``MDP()`` alone now also yields a fully
    initialised instance.
    """

    def __init__(self):
        # Delegate to ``init`` so a freshly constructed object is usable even
        # when the caller forgets the explicit ``init()`` call.
        self.init()

    def init(self):
        """(Re)set every attribute of the model to its initial value."""
        self.S = [0, 1, 2]
        self.endstate = self.S[-1]
        # NOTE(review): ``gamma`` is not read by any method of this class;
        # ``value_iteration_for_Q`` uses the module-level ``GAMMA`` instead.
        self.gamma = 0.99
        self.actions = [0, 1, 2]  # 0 = BACK, 1 = FORWARD, 2 = STAY
        self.currentstate = self.S[0]
        # Per-state reward (labelled the true optimal reward function).
        self.rewards = {0: -0.9641254941220043, 1: -0.9971473606992696, 2: 1.0}
        # Transition model: (state, action) -> {next_state: probability}.
        self.T = {
            (0, 0): {0: 0.7, 1: 0.2, 2: 0.1},
            (0, 1): {0: 0.1, 1: 0.7, 2: 0.2},
            (0, 2): {0: 0.7, 1: 0.2, 2: 0.1},
            (1, 0): {0: 0.7, 1: 0.2, 2: 0.1},
            (1, 1): {0: 0.1, 1: 0.2, 2: 0.7},
            (1, 2): {0: 0.1, 1: 0.7, 2: 0.2},
            (2, 0): {0: 0.2, 1: 0.7, 2: 0.1},
            (2, 1): {0: 0.7, 1: 0.2, 2: 0.1},
            (2, 2): {0: 0.1, 1: 0.1, 2: 0.8},
        }

        # Missing keys read as 0.0 in both tables.
        self.R = collections.defaultdict(float)
        self.values = collections.defaultdict(float)

    # helper functions

    def step(self, action):
        """Return ``(new_state, reward, is_done)`` for ``action``.

        The new state is the most probable successor of the current state
        under ``action`` (no sampling takes place). ``self.currentstate`` is
        not modified; the caller is responsible for advancing it.
        """
        successors = self.T[(self.currentstate, action)]
        new_state = max(successors.items(), key=operator.itemgetter(1))[0]
        return new_state, self.rewards[new_state], new_state == self.endstate

    def select_action(self, state):
        """Return the action with the highest Q-value in ``state``.

        Ties are resolved in favour of the action listed first in
        ``self.actions``; ``None`` is returned when there are no actions.
        """
        return max(
            self.actions,
            key=lambda action: self.values[(state, action)],
            default=None,
        )

    def get_state_utility(self, state):
        """Return the mean Q-value over all actions available in ``state``."""
        total = sum(self.values[state, action] for action in self.actions)
        # Divide by this instance's own action count rather than relying on a
        # global ``agent`` object existing in the calling module.
        return total / len(self.actions)

    # functions

    def play_n_random_steps(self, count):
        """Take ``count`` random actions and record the observed rewards in ``R``.

        The walk continues through the end state (the done flag returned by
        ``step`` is ignored) and the current state is reset to the start state
        afterwards.
        """
        for _ in range(count):
            action = random.choice(self.actions)
            new_state, reward, _ = self.step(action)
            self.R[(self.currentstate, action, new_state)] = reward
            self.currentstate = new_state
        self.currentstate = self.S[0]

    def value_iteration_for_Q(self):
        """Run one in-place sweep of Q-value iteration over every (state, action).

        Each Q-value becomes the probability-weighted sum, over successor
        states, of the recorded transition reward plus ``GAMMA`` times the best
        Q-value currently stored for the successor. Transitions that were never
        recorded in ``R`` contribute a reward of 0.0.
        """
        for state in self.S:
            for action in self.actions:
                target_probs = self.T[(state, action)]
                total = sum(target_probs.values())  # normalising constant
                action_value = 0.0
                for tgt_state, probability in target_probs.items():
                    reward = self.R[(state, action, tgt_state)]
                    best_action = self.select_action(tgt_state)
                    val = reward + GAMMA * self.values[(tgt_state, best_action)]
                    action_value += (probability / total) * val
                self.values[(state, action)] = action_value

    def find_optimal_policy(self):
        """Return a list whose entry ``s`` is the greedy action for state ``s``."""
        policy = [None] * len(self.S)
        for state in self.S:
            policy[state] = self.select_action(state)
        return policy

    def sample_paths(self, policy, no_paths):
        """Sample ``no_paths`` episodes that follow ``policy`` from the start state.

        Successor states are drawn from the transition probabilities of the
        action the policy prescribes. An episode ends when the end state is
        drawn. Returns a list of paths, each a list of actions.

        NOTE(review): the entry appended at each step is the policy's action
        for the state that was just reached, not the action that caused the
        transition; kept as-is because downstream results depend on it.
        NOTE(review): a policy that can never reach the end state makes this
        loop run forever.
        """
        paths = []
        path = []
        self.currentstate = self.S[0]
        # ``<`` rather than ``!=`` so a non-positive request returns at once.
        while len(paths) < no_paths:
            action = policy[self.currentstate]
            # Insertion order of the inner dict is 0, 1, 2, so the position of
            # the drawn outcome equals the successor state's id.
            probs = np.array(list(self.T[self.currentstate, action].values()))
            draw = np.random.multinomial(1, probs).tolist()  # e.g. [0, 1, 0]
            new_state = draw.index(1)
            path.append(policy[new_state])
            if new_state == self.endstate:
                paths.append(path)
                path = []
                self.currentstate = self.S[0]
            else:
                self.currentstate = new_state

        return paths

#Likelihood & Hillclimb Function

In [103]:
def likelihood(paths, rewards):
    """Score ``paths`` under the reward function ``rewards``.

    A fresh ``MDP`` is built, given ``rewards``, explored for ``N`` random
    steps and updated with one sweep of Q-value iteration. Each path is then
    replayed from the start state, always moving to the most probable
    successor of the recorded action. For every step the score term is the
    Q-value of (reached state, action) minus the mean Q-value of the reached
    state. A path's score is the mean of its terms, and the function returns
    the sum of all path scores.
    """
    agent = MDP()
    agent.init()
    agent.rewards = rewards
    agent.play_n_random_steps(N)
    agent.value_iteration_for_Q()
    agent.find_optimal_policy()  # return value is not used here

    total = 0
    for path in paths:
        state = agent.S[0]
        step_terms = []
        for action in path:
            # Deterministic replay: follow the most probable transition.
            successors = agent.T[(state, action)]
            state = max(successors.items(), key=operator.itemgetter(1))[0]
            # Q^R(state, action) - V^R(state)
            step_terms.append(agent.values[state, action] - agent.get_state_utility(state))
        total += sum(step_terms) / len(step_terms)
    return total

In [106]:
def stochastic_hillclimb(paths, threshold):
    """Draw random reward functions until one scores above ``threshold``.

    Each candidate assigns a uniform random reward in [-1.0, 1.0] to states 0
    and 1 and a fixed reward of 1.0 to state 2. A candidate's score is the
    mean of 100 calls to ``likelihood(paths, candidate)``. The first candidate
    whose score exceeds ``threshold`` is returned; the loop has no other exit.
    """
    low, high = -1.0, 1.0  # bounds of the sampled rewards
    free_states = 2  # states 0 and 1 are sampled; state 2 is pinned to 1.0

    while True:
        draws = [random.uniform(low, high) for _ in range(free_states)]
        rewards = {0: draws[0], 1: draws[1], 2: 1.0}

        scores = []
        for _ in range(100):
            scores.append(likelihood(paths, rewards))
        rewards_likelihood = sum(scores) / len(scores)
        
        """
        #un-comment to print reward func and corresponding likelihood func on each iteration
        formatlist = [rewards, rewards_likelihood]
        print("Rewards: {},   likeihood: {}".format(*formatlist))
        """

        if rewards_likelihood > threshold:
            return rewards

#Main

In [110]:
# Main experiment: learn Q-values under the model's built-in rewards, sample
# paths from the resulting greedy policy, then search for a reward function
# whose likelihood on those paths exceeds a fixed threshold.
agent = MDP()
agent.init()
agent.play_n_random_steps(N)
agent.value_iteration_for_Q()
optimal_policy = agent.find_optimal_policy()
no_of_paths = 1000
paths = agent.sample_paths(optimal_policy, no_of_paths)
# Average the likelihood of the built-in reward function over 100 evaluations.
main_likelihoods_list = []
for i in range(100):
    main_likelihoods_list.append(likelihood(paths, agent.rewards)) 
printlist = [agent.rewards, sum(main_likelihoods_list)/len(main_likelihoods_list)]
print("\nThe optimal reward function {} has likelihood of {} (avg of 100 samples)".format(*printlist))
print("\nTrue Optimal Policy is {}".format(optimal_policy))
formatlist = [no_of_paths, paths]
print("\nThe {} sampled paths are: {}".format(*formatlist))
# 270 is the likelihood threshold the random search must exceed.
optimal_rewards = stochastic_hillclimb(paths, 270)
# NOTE(review): the message below says "avg of 100 samples", but the value
# printed comes from a single likelihood() call.
formatlist = [optimal_rewards, likelihood(paths, optimal_rewards)]
print("\nHill climb converges at reward function {} with a likelihood of {} (avg of 100 samples)".format(*formatlist))
# Re-learn on the same agent under the recovered rewards; agent.values is not
# reset, so this sweep starts from the previously learned Q-values.
agent.rewards = optimal_rewards
agent.play_n_random_steps(N)
agent.value_iteration_for_Q()
print("\nThe optimal policy under the optimal reward func is: {}".format(agent.find_optimal_policy()))
# NOTE(review): this runs a second value-iteration sweep, so the Q-table
# printed below is one sweep ahead of the one the policy above was read from.
agent.value_iteration_for_Q()
print("\nThe Q under the optimal reward func is")
pprint.pprint(agent.values)


The optimal reward function {0: -0.999753592635372, 1: -0.9374563633938111, 2: 1.0} has likelihood of 279.42164671115745 (avg of 100 samples)

True Optimal Policy is [1, 1, 2]

The 1000 sampled paths are: [[1, 1, 1, 1, 1, 2], [1, 2], [2], [1, 2], [2], [1, 2], [1, 2], [1, 1, 1, 1, 2], [1, 2], [1, 2], [1, 2], [1, 1, 1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 1, 1, 1, 1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 1, 1, 2], [1, 2], [2], [1, 2], [2], [1, 2], [1, 2], [1, 1, 1, 2], [1, 2], [1, 2], [1, 2], [2], [1, 1, 2], [2], [1, 1, 2], [2], [1, 2], [1, 1, 2], [2], [1, 1, 1, 1, 2], [2], [2], [1, 2], [1, 2], [2], [1, 2], [1, 2], [1, 1, 2], [1, 2], [1, 1, 2], [2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 1, 2], [1, 2], [1, 2], [1, 2], [1, 1, 2], [1, 2], [1, 2], [1, 2], [1, 1, 1, 2], [1, 2], [1, 2], [1, 2], [1, 1, 2], [1, 2], [2], [2], [1, 2], [1, 2], [1, 2], [2], [1, 1, 2], [1, 1, 1, 1, 1, 2], [1, 2], [1, 2], [1, 1, 2], [2], [1, 2], [2], [1, 1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2

#Testing Reward Func

In [102]:
# Evaluate a candidate reward function; edit ``rewardfunc`` to test others.
agent = MDP()
agent.init()
agent.play_n_random_steps(N)
agent.value_iteration_for_Q()
# Greedy policy under the model's built-in rewards. It does not change inside
# the loop below, so it is computed once and reused for path sampling.
reference_policy = agent.find_optimal_policy()
test_likelihoods_list = []
rewardfunc = {0: -0.999753592635372, 1: -0.9374563633938111, 2: 1.0}
# A fresh batch of 1000 paths is sampled for each of the 100 evaluations.
for i in range(100):
    test_likelihoods_list.append(likelihood(agent.sample_paths(reference_policy, 1000), rewardfunc))
# Assigning ``rewards`` alone does not change the learned Q-values, so the
# agent must explore and run value iteration again before its greedy policy
# reflects ``rewardfunc`` (same sequence as the main cell uses).
agent.rewards = rewardfunc
agent.play_n_random_steps(N)
agent.value_iteration_for_Q()
print(agent.find_optimal_policy())
print(sum(test_likelihoods_list)/len(test_likelihoods_list))

[1, 1, 2]
284.58953901567367


# **ignore** *Copy Paste Dump* 

In [None]:
"""
threshold = 0.0001
action = 0
TEST_EPISODES = 20
REWARD_GOAL = 0.8
N =100

        #hard coded R
        self.R_hardcoded = {(0, 0, 0): 0.0, 
                  (0, 0, 1): 0.0, 
                  (0, 0, 2): 1.0,
                  (0, 1, 0): 0.0,
                  (0, 1, 1): 0.0, 
                  (0, 1, 2): 1.0,
                  (0, 2, 0): 0.0,
                  (0, 2, 1): 0.0,
                  (1, 0, 0): 0.0, 
                  (1, 0, 1): 0.0, 
                  (1, 0, 2): 1.0,
                  (1, 1, 0): 0.0,
                  (1, 1, 1): 0.0, 
                  (1, 1, 2): 1.0,
                  (1, 2, 0): 0.0,
                  (1, 2, 1): 0.0,
                  (1, 2, 2): 1.0,
                  (2, 0, 0): 0.0, 
                  (2, 0, 1): 0.0, 
                  (2, 0, 2): 1.0,
                  (2, 1, 0): 0.0,
                  (2, 1, 1): 0.0, 
                  (2, 1, 2): 1.0,
                  (2, 2, 0): 0.0,
                  (2, 2, 1): 0.0,
                  (2, 2, 2): 1.0}

        #hard coded T

def step(self, action):
        isdone = False
        if action == 2:
            currentstate = self.currentstate #remain at current state
        elif action == 1:
            try:
                currentstate = self.actions[self.actions.index(self.currentstate)+1] #take step to right
            except IndexError:
                currentstate = self.actions[0]
        else:
            try:
                currentstate = self.actions[self.actions.index(self.currentstate)-1] #take step to left
            except IndexError:
                currentstate = self.currentstate
        if currentstate == self.endstate:
            isdone = True
        else:
            isdone = False

        return currentstate, self.rewards[currentstate], isdone
        
        
        #single path function
        def likelihood1(policy, agent):
            likelihoodList = [None] * len(policy)
            for i in range(len(policy)):
                state = int(i)
                action = int(policy[i])    
                likelihoodList[i] = agent.values[state,action]  - agent.get_state_utility(state) 
            return sum(likelihoodList) / len(likelihoodList)

            #create random policies for testing
actions = [0, 1, 2, 3]
nonoptimal1 = [None] * 16
nonoptimal2 = [None] * 16
nonoptimal3 = [None] * 16
policies = [nonoptimal1, nonoptimal2, nonoptimal3]
for policy in policies:
    for i in range(len(policy)):
        policy[i] = random.choice(actions)

#create random reward functions for testing
nonoptimal_rewardfunc1 = np.random.rand(env.observation_space.n,env.action_space.n)
nonoptimal_rewardfunc2 = np.random.rand(env.observation_space.n,env.action_space.n)
nonoptimal_rewardfunc3 = np.random.rand(env.observation_space.n,env.action_space.n)


#get optimal reward function
q_table = create_q_table(env) 


print("---------------------------------------")
print("\n*** Testing likelihood function using optimal policy with random reward functions ***\n")
print("---------------------------------------")
print("\n")
print("Likelihood for optimal reward function is {}".format(likelihood(optimal_policy, q_table)))
print("Likelihood for non optimal reward function 1 is {}".format(likelihood(optimal_policy, nonoptimal_rewardfunc1)))
print("Likelihood for non optimal reward function 2 is {}".format(likelihood(optimal_policy, nonoptimal_rewardfunc2)))
print("Likelihood for non optimal reward function 3 is {}".format(likelihood(optimal_policy, nonoptimal_rewardfunc3)))
print("\n")


print("---------------------------------------")
print("\n*** Testing likelihood function using optimal reward function with random policies ***\n")
print("---------------------------------------")
print("\n")
print("Likelihood for optimal policy  is {}".format(likelihood(optimal_policy, q_table)))
print("Likelihood for non optimal policy 1 is {}".format(likelihood(nonoptimal1, q_table)))
print("Likelihood for non optimal policy 2 is {}".format(likelihood(nonoptimal2, q_table)))
print("Likelihood for non optimal policy 3 is {}".format(likelihood(nonoptimal3, q_table)))
print("\n")
                
"""

def likelihood2(paths, agent):
    """Return one score per path, using each step's position as its state.

    For step ``i`` of a path holding action ``a``, the term is
    ``agent.values[i, a] - agent.get_state_utility(i)``. A path's score is
    the mean of its terms. An empty path raises ``ZeroDivisionError``.
    """
    scores = []
    for path in paths:
        terms = [
            agent.values[int(position), int(step_action)]
            - agent.get_state_utility(int(position))
            for position, step_action in enumerate(path)
        ]
        scores.append(sum(terms) / len(terms))
    return scores

"""
likelihoods = likelihood(paths, agent) #function call

#formatting output
print("The paths are: {}".format(paths))
for l in likelihoods:
    index = likelihoods.index(l)
    value = likelihoods[index]
    formatlist = [index+1, value]
    print("\nLikelihood for path {} is {}".format(*formatlist))

print("\nThe optimal policy is path {}".format(likelihoods.index(max(likelihoods))+1))

#take R as input and compute values in function
"""

def likelihood2(paths, rewards):
    """Score every path under each reward function in ``rewards``.

    For each reward function a fresh ``MDP`` is built, explored for ``N``
    random steps and updated with one sweep of Q-value iteration. Each path is
    then scored using a step's position as its state: the term for step ``i``
    with action ``a`` is ``values[i, a] - get_state_utility(i)`` and the path
    score is the mean of its terms. Returns one list of path scores per reward
    function, in input order.
    """
    all_scores = []
    for reward_fn in rewards:
        model = MDP()
        model.init()
        model.rewards = reward_fn
        model.play_n_random_steps(N)
        model.value_iteration_for_Q()
        model.find_optimal_policy()  # return value is not used here

        path_scores = []
        for path in paths:
            terms = [
                model.values[int(position), int(step_action)]
                - model.get_state_utility(int(position))
                for position, step_action in enumerate(path)
            ]
            path_scores.append(sum(terms) / len(terms))
        all_scores.append(path_scores)

    return all_scores
"""
#0: 0.0, 1: 0.2, 2: 1.0
likelihoods = likelihood(paths, {0: 0.0, 1: 0.2, 2: 1.0}) #function call
formatlist = [paths, {0: 0.0, 1: 0.2, 2: 1.0}, likelihoods]
print("The likelihoods of paths: \n{}\nunder the rewards: \n{}\nare: \n\n{}\n\n".format(*formatlist))

#0: 0.0, 1: 0.0, 2: 1.0
likelihoods = likelihood(paths, {0: 0.0, 1: 0.0, 2: 1.0}) #function call
formatlist = [paths, {0: 0.0, 1: 0.0, 2: 1.0}, likelihoods]
print("The likelihoods of paths: \n{}\nunder the rewards: \n{}\nare: \n\n{}\n\n".format(*formatlist))

#0: 0.0, 1: 0.8, 2: 1.0
likelihoods = likelihood(paths, {0: 0.0, 1: 0.8, 2: 1.0}) #function call
formatlist = [paths, {0: 0.0, 1: 0.8, 2: 1.0}, likelihoods]
print("The likelihoods of paths: \n{}\nunder the rewards: \n{}\nare: \n\n{}\n\n".format(*formatlist))




rewards = [{0: 0.0, 1: 0.2, 2: 1.0},
{0: 0.0, 1: 0.0, 2: 1.0},
{0: 0.0, 1: 0.8, 2: 1.0},
{0: 0.0, 1: 1.0, 2: 2.0}]


listoflikelihoodlists = likelihood1(paths, rewards)

print("\n\n\nSum of likelihoods for each reward func are:")

for list in listoflikelihoodlists:
    print(sum(list))
"""