# In [63]:
import unittest
from ddt import ddt, data, unpack
from itertools import product

import sys
sys.path.append('../Environments/')
from ColoredGridWorld import OBMDP as targetCode
from ColoredGridWorld.MDP import MDP
from ColoredGridWorld import SetUpInferenceSpace as sup
sys.path.append('../Algorithms/')
import ActionInterpretation as AI

# In [82]:
def _makeEnv(orangeUtility):
    """Return the (world, goal) environment key used throughout these tests.

    The world maps 'white' -> 0, 'yellow' -> 10 and 'orange' -> *orangeUtility*,
    is built with the transition flag ``True`` and is paired with goal (2, 1).
    A fresh object is built on every call so test data never shares instances.
    """
    utilities = {'white': 0, 'yellow': 10, 'orange': orangeUtility}
    return (sup.HashableWorld(utilities, True), (2, 1))


def _makeBelief(orangeZeroProbability, orangeMinusTwoProbability):
    """Return a HashableBelief over the two environments of the fixture.

    Keys are inserted in the same order as the original hand-written
    literals: the orange-utility-0 environment first, then the
    orange-utility-(-2) environment.
    """
    return sup.HashableBelief({
        _makeEnv(0): orangeZeroProbability,
        _makeEnv(-2): orangeMinusTwoProbability,
    })


@ddt
class TestOBMDPConstruction(unittest.TestCase):
    """Tests for belief utility, LiteralObserver and the OBMDP joint reward/transition functions."""

    def setUp(self):
        """Build a 3x3 one-colour grid world and the OBMDP functions derived from it.

        Stores on the instance: ``literalObserver``, ``discreteBeliefSpace``,
        ``getNextBelief``, ``jointRewardFn`` and ``jointTransitionFn``.
        """
        dimensions = (3, 3)
        goals = [(2, 1)]
        actions = [(-1, 0), (0, 1), (0, -1), (1, 0)]
        oneColourStates = {
            (0, 0): 'white', (0, 1): 'white', (0, 2): 'white',
            (1, 0): 'white', (1, 1): 'orange', (1, 2): 'white',
            (2, 0): 'white', (2, 1): 'yellow', (2, 2): 'white',
        }
        # NOTE(review): 10e-7 evaluates to 1e-6 -- confirm 1e-7 was not intended.
        convergenceTolerance = 10e-7
        gamma = 0.94
        alpha = 20
        eps = 0.01
        hyperparameters = (convergenceTolerance, gamma, alpha, eps)
        oneColourUtilitySpace = [
            {'white': 0, 'yellow': 10, 'orange': 0},
            {'white': 0, 'yellow': 10, 'orange': -2},
        ]
        transitionSpace = [True]
        oneColourWorlds = sup.buildWorldSpace(oneColourUtilitySpace, transitionSpace)
        oneColourEnvSpace = list(product(oneColourWorlds, goals))
        oneColourEnvPolicySpace = sup.buildEnvPolicySpace(
            dimensions, oneColourStates, actions, oneColourEnvSpace, hyperparameters)
        actionInterpretation = AI.ActionInterpretation(oneColourEnvPolicySpace)
        self.literalObserver = targetCode.LiteralObserver(actionInterpretation)

        # Enumerate every assignment of a bin value to each environment and keep
        # only those summing to 1. The bins are exact binary fractions, so the
        # exact == comparison is safe here.
        bins = [0, 0.25, 0.5, 0.75, 1]
        beliefSpacePossible = [
            dict(zip(oneColourEnvSpace, weights))
            for weights in product(bins, repeat=len(oneColourEnvSpace))
        ]
        self.discreteBeliefSpace = [
            beliefVector for beliefVector in beliefSpacePossible
            if sum(beliefVector.values()) == 1
        ]
        hashableDiscreteBeliefSpace = [
            sup.HashableBelief(beliefVector) for beliefVector in self.discreteBeliefSpace
        ]

        getMDP = MDP(dimensions, oneColourStates, {'white': 0, 'yellow': 10, 'orange': 0})
        objectRewardFn, objectTransitionFn = getMDP()
        self.getNextBelief = self.literalObserver(self.discreteBeliefSpace, True)
        beliefUtilityFn = targetCode.getBeliefUtility()
        jointStateSpace = list(product(oneColourStates.keys(), hashableDiscreteBeliefSpace))
        # The true environment is the one whose orange tile has utility 0.
        getOBMDP = targetCode.OBMDP(jointStateSpace, _makeEnv(0), True, 10)
        self.jointRewardFn, self.jointTransitionFn = getOBMDP(
            objectTransitionFn, objectRewardFn, self.getNextBelief, beliefUtilityFn)

    # case 1: increase in utility, case 2: isInformative=False yields 0,
    # case 3: no increase in utility, case 4: negative utility
    @data(({'env1': 0.5, 'env2': 0.5}, {'env1': 0.75, 'env2': 0.25}, 'env1', True, 0.25),
          ({'env1': 0.5, 'env2': 0.5}, {'env1': 0.75, 'env2': 0.25}, 'env1', False, 0),
          ({'env1': 0.4, 'env2': 0.5, 'env3': 0.1}, {'env1': 0.7, 'env2': 0.2, 'env3': 0.1}, 'env3', True, 0),
          ({'env1': 0.4, 'env2': 0.5, 'env3': 0.1}, {'env1': 0.7, 'env2': 0.2, 'env3': 0.1}, 'env2', True, -0.3))
    @unpack
    def test_beliefUtility(self, beliefDict1, beliefDict2, trueEnv, isInformative, beliefUtilityExpected):
        """Belief utility for the true environment matches the expected value."""
        beliefUtilityFn = targetCode.getBeliefUtility()
        beliefUtilityReceived = beliefUtilityFn(beliefDict1, beliefDict2, trueEnv, isInformative)
        # Expected values such as -0.3 are not exactly representable as floats,
        # so compare with a tolerance rather than exact equality.
        self.assertAlmostEqual(beliefUtilityReceived, beliefUtilityExpected)

    # The last case puts the target exactly midway between candidates; the
    # expected result is the first candidate encountered in the list.
    @data((sup.HashableBelief({'x': 0.6}), [{'x': 0}, {'x': 1}], {'x': 1}),
          (sup.HashableBelief({'x': 0.6}),
           [{'x': 0}, {'x': 0.1}, {'x': 0.2}, {'x': 0.3}, {'x': 0.4}, {'x': 0.5}, {'x': 0.6}, {'x': 1}],
           {'x': 0.6}),
          (sup.HashableBelief({'x': 100.8}), [{'x': 105}, {'x': 98}], {'x': 98}),
          (sup.HashableBelief({'x': 0.5}), [{'x': 0}, {'x': 1}], {'x': 0}))
    @unpack
    def test_classLiteralObserver_getNearestNeighbour_1D(self, targetVectorDict, listOfVectorDicts, expectedNearestNeighbour):
        """getNearestNeighbour picks the closest one-dimensional candidate."""
        nearestNeighbour = self.literalObserver.getNearestNeighbour(targetVectorDict, listOfVectorDicts)
        # The returned object is called to obtain the value compared with the plain dict.
        self.assertEqual(nearestNeighbour(), expectedNearestNeighbour)

    # The last case puts the target exactly midway between candidates; the
    # expected result is the first candidate encountered in the list.
    @data((sup.HashableBelief({'x': 0.75, 'y': 0.7}),
           [{'x': 0, 'y': 0}, {'x': 1, 'y': 0}, {'x': 1, 'y': 1}, {'x': 0, 'y': 1}],
           {'x': 1, 'y': 1}),
          (sup.HashableBelief({'x': 18, 'y': 27}),
           [{'x': 0, 'y': 0}, {'x': 25, 'y': 50}, {'x': 25, 'y': 0}, {'x': 0, 'y': 50}],
           {'x': 25, 'y': 50}),
          (sup.HashableBelief({'x': 0.5, 'y': 0.5}),
           [{'x': 0, 'y': 0}, {'x': 1, 'y': 0}, {'x': 1, 'y': 1}, {'x': 0, 'y': 1}],
           {'x': 0, 'y': 0}))
    @unpack
    def test_classLiteralObserver_getNearestNeighbour_2D(self, targetVectorDict, listOfVectorDicts, expectedNearestNeighbour):
        """getNearestNeighbour picks the closest two-dimensional candidate."""
        nearestNeighbour = self.literalObserver.getNearestNeighbour(targetVectorDict, listOfVectorDicts)
        self.assertEqual(nearestNeighbour(), expectedNearestNeighbour)

    # case 1 and 2: choosing a path around the potential trap state, which
    # corresponds to it giving negative reward; case 3: choosing a path through
    # the potential trap state, which corresponds to it being safe
    @data((((0, 1), _makeBelief(0.5, 0.5)), (0, 1), (0, 2), _makeBelief(0, 1)),
          (((0, 1), _makeBelief(0.5, 0.5)), (0, -1), (0, 0), _makeBelief(0, 1)),
          (((0, 1), _makeBelief(0.5, 0.5)), (1, 0), (1, 1), _makeBelief(1, 0)))
    @unpack
    def test_classLiteralObserver(self, jointState, action, nextObjectState, expectedBelief):
        """The observer's next belief after an action matches the expected belief."""
        outputBelief = self.getNextBelief(jointState, action, nextObjectState)
        self.assertEqual(outputBelief, expectedBelief)

    # both cases: moving to a non-informative state with 0 object reward
    @data((((0, 1), _makeBelief(0.5, 0.5)), (0, 1), ((0, 2), _makeBelief(0, 1))),
          (((0, 1), _makeBelief(0.5, 0.5)), (0, -1), ((0, 0), _makeBelief(0, 1))))
    @unpack
    def test_OBMDP_jointRewardFunction_expectedRewardNegative(self, jointState, action, nextJointState):
        """The joint reward is strictly negative for these transitions."""
        outputReward = self.jointRewardFn(jointState, action, nextJointState)
        self.assertLess(outputReward, 0)

    # case 1: moving to an informative state with 0 object reward,
    # case 2 and 3: moving to the goal state
    @data((((0, 1), _makeBelief(0.5, 0.5)), (1, 0), ((1, 1), _makeBelief(1, 0))),
          (((1, 1), _makeBelief(0.5, 0.5)), (1, 0), ((2, 1), _makeBelief(0.5, 0.5))),
          (((2, 0), _makeBelief(0.5, 0.5)), (0, 1), ((2, 1), _makeBelief(0.5, 0.5))))
    @unpack
    def test_OBMDP_jointRewardFunction_expectedRewardPositive(self, jointState, action, nextJointState):
        """The joint reward is strictly positive for these transitions."""
        outputReward = self.jointRewardFn(jointState, action, nextJointState)
        self.assertGreater(outputReward, 0)

    @data((((0, 1), _makeBelief(0.5, 0.5)), (0, 1)),
          (((1, 2), _makeBelief(0.5, 0.5)), (1, 0)))
    @unpack
    def test_jointTransitionFunction_checkDeterminsitic(self, jointState, action):
        """The joint transition function yields exactly one successor state."""
        nextStateDictionary = self.jointTransitionFn(jointState, action)
        self.assertEqual(len(nextStateDictionary), 1)

    @data((((0, 1), _makeBelief(0.5, 0.5)), (0, 1), ((0, 2), _makeBelief(0, 1))),
          (((0, 1), _makeBelief(0.5, 0.5)), (1, 0), ((1, 1), _makeBelief(1, 0))))
    @unpack
    def test_jointTransitionFunction_checkNextJointState(self, jointState, action, nextJointState):
        """The expected successor joint state is reached with probability 1."""
        probabilityOfTransition = self.jointTransitionFn(jointState, action)[nextJointState]
        expectedProbability = 1
        self.assertEqual(probabilityOfTransition, expectedProbability)

    def tearDown(self):
        """Nothing to clean up; the fixture holds only in-memory objects."""

if __name__ == '__main__':
    # Pass an explicit dummy argv so unittest does not parse the host
    # process's command line, and exit=False so the interpreter keeps running
    # after the tests finish.
    runnerArgv = ['first-arg-is-ignored']
    unittest.main(argv=runnerArgv, exit=False)


# .......................
# ----------------------------------------------------------------------
# Ran 23 tests in 0.080s
#
# OK
