In [None]:
class BraessNetwork(object):
    """Four-node road network (A, B, C, D) with a congestion cost per link.

    Keeps the latency function of every link and the sequence of links that
    forms each route, and converts a per-route flow assignment into the
    travel time experienced on each route.
    """

    def __init__(self):
        def fixed(cost):
            # Latency that ignores how many cars use the link.
            return lambda _cars: cost

        def congestible(cars):
            # Latency that grows linearly with the number of cars on the link.
            return 1 + (cars / 100)

        # Link name -> latency as a function of the number of cars on it.
        self.__links = {
            "AB": congestible,
            "AC": fixed(2),
            "BD": fixed(2),
            "CD": congestible,
            "BC": fixed(0.25),
        }
        # Route name -> ordered links travelled along that route.
        self.__paths = {
            "ABD": ("AB", "BD"),
            "ACD": ("AC", "CD"),
            "ABCD": ("AB", "BC", "CD"),
        }
        # Number of cars sharing the network; flow fractions are scaled by it.
        self.total_flow = 100

    @property
    def routes(self):
        """Tuple of every route name available in the network.

        An environment can use the position of each name as an action number.
        """
        return ("ABD", "ACD", "ABCD")

    def _route_time(self, route, cars_on_link):
        """Add up the link latencies along ``route`` for the given link loads."""
        elapsed = 0
        for link in self.__paths[route]:
            latency = self.__links[link]
            elapsed += latency(cars_on_link[link])
        return elapsed

    def calculate_ttime(self, flows):
        """Return the travel time (ttime) of each route in ``flows``.

        Arg:
            flows (dictionary): Maps a route name to the share of the total
                                flow that takes it. The docstring contract is
                                a float between 0 and 1; the value is
                                multiplied by ``total_flow`` to get cars.

        Returns:
            travel_times (dictionary): Maps each route in ``flows`` to the sum
                                       of the latencies of its links.

        Raises:
            KeyError: if ``flows`` names a route the network does not have.
        """
        # First pass: number of cars on each link, accumulated over all routes
        # that use it (routes are visited in the iteration order of ``flows``).
        cars_on_link = {}
        for route in flows:
            for link in self.__paths[route]:
                cars_on_link[link] = (
                    cars_on_link.get(link, 0) + flows[route] * self.total_flow
                )

        # Second pass: price every requested route under those link loads.
        travel_times = {}
        for route in flows:
            travel_times[route] = self._route_time(route, cars_on_link)
        return travel_times

In [None]:
# Manual smoke tests for BraessNetwork.calculate_ttime: print the computed
# travel times next to the hand-derived ones so they can be compared by eye.
network = BraessNetwork()

# Scenario 1
#
#                  B
#                / | \
#             /    |    \
#          A       |       D
#             \    |    /
#                \ | /
#                  C
#
# Split of the 100 cars:
#     ABD  -> 25
#     ACD  -> 25
#     ABCD -> 50
#
# Every route should then take the same time:
#     ABD  -> 3.75 (units)
#     ACD  -> 3.75 (units)
#     ABCD -> 3.75 (units)
#
flows = {"ABD": 0.25, "ACD": 0.25, "ABCD": 0.50}
expect1 = {"ABD": 3.75, "ACD": 3.75, "ABCD": 3.75}
times = network.calculate_ttime(flows)
print("This is what was given: {}".format(times))
print("This is what I expect: {}".format(expect1))
print("---")

# Scenario 2
# Split of the 100 cars:
#     ABD  -> 50
#     ACD  -> 50
#     ABCD -> 0
#
# Expected travel times:
#     ABD  -> 3.5 (units)
#     ACD  -> 3.5 (units)
#     ABCD -> 3.25 (units)  - reported even though nobody takes this route
flows = {"ABD": 0.50, "ACD": 0.50, "ABCD": 0}
expect2 = {"ABD": 3.5, "ACD": 3.5, "ABCD": 3.25}
times = network.calculate_ttime(flows)
print("This is what was given: {}".format(times))
print("This is what I expect: {}".format(expect2))

In [None]:
### 
# MDP: (All characteristics of this MDP are given in W. Krichene's paper
#                            -- "Learning Nash Equilibria in Congestion Games")
#
#  - Observations/States: Each player will observe the cost (travel time) on all of the paths (according to paper)
#                         If the player only observes the loss she incurs then it becomes a multiarmed bandit 
#                         setting.
#  - Actions: Each player will choose a path, using a randomized/mixed strategy. 
#             This means we have a stochastic policy.
#  - Reward: The *cost* of each player will be the travel time that they've incurred on their path.
#            Each player wants to minimize their travel time. For reward, we can maximize the negative cost.
#  - Model of the environment: We don't have one in this case
#
###

# Test 1: Test that reset() returns an initial observation. 
#         The initial observation should be:
#                "ABD": 3
#                "ACD": 3
#                "ABCD": 2.25
class TestBraessEnv(unittest.TestCase):
    """Tests for the ``reset``/``step`` interface of ``BraessEnv``.

    Observations and rewards are expected to be dictionaries keyed by route
    name ("ABD", "ACD", "ABCD"). Their values are computed floating-point
    travel times, so they are compared with ``assertAlmostEqual`` rather than
    exact equality to keep the tests from failing on rounding noise.
    """

    def _assertRouteValuesAlmostEqual(self, expected, actual):
        """Assert two route -> number dicts have the same keys and near-equal values.

        Args:
            expected (dictionary): Hand-computed route -> value mapping.
            actual (dictionary): Mapping returned by the environment.
        """
        # Compare keys first so a missing or extra route is reported clearly.
        self.assertEqual(set(expected), set(actual))
        for route, value in expected.items():
            self.assertAlmostEqual(
                value, actual[route],
                msg="unexpected value for route {}".format(route))

    def testResetReturnsObservation(self):
        """reset() should return the initial per-route travel times."""
        env = BraessEnv()
        init = env.reset()
        # NOTE(review): these match the zero-flow route costs of BraessNetwork;
        # confirm that BraessEnv starts from an empty network.
        expected = {
            "ABD": 3,
            "ACD": 3,
            "ABCD": 2.25
        }
        self._assertRouteValuesAlmostEqual(expected, init)

    def testStepReturnsCorrectInformation(self):
        """step() should return the next observation, reward and done flag."""
        env = BraessEnv()
        _ = env.reset()
        # The action maps every route to the fraction of flow taking it.
        # Note: This will have to change when we switch to the nonatomic setting.
        action = {
            "ABD": 0.01,
            "ACD": 0,
            "ABCD": 0
        }  # Representing 1 flow of .01 taking path "ABD"
        next_obs, reward, done, _ = env.step(action)
        expected_obs = {
            "ABD": 3.01,
            "ACD": 3,
            "ABCD": 2.26
        }
        expected_reward = {
            "ABD": -3.01,
            "ACD": -3,
            "ABCD": -2.26
        }  # The reward should be the negative of the travel times
        expected_done = True

        self._assertRouteValuesAlmostEqual(expected_obs, next_obs)
        self._assertRouteValuesAlmostEqual(expected_reward, reward)
        self.assertEqual(expected_done, done)

    def testStepSavesNoPrevInfo(self):
        """A second step() must not carry over the flow of the first one."""
        env = BraessEnv()
        env.reset()
        action = {
            "ABD": .01,
            "ACD": 0,
            "ABCD": 0
        }  # Representing flow of .01 taking path "ABD"
        next_obs, reward, done, _ = env.step(action)

        action = {
            "ABD": 0,
            "ACD": 0,
            "ABCD": 0.01
        }  # Flow of 0.01 taking "ABCD"
        next_obs, reward, done, _ = env.step(action)
        # Expected values account only for the second action's flow on ABCD.
        expected_obs = {
            "ABD": 3.01,
            "ACD": 3.01,
            "ABCD": 2.27
        }
        expected_reward = {
            "ABD": -3.01,
            "ACD": -3.01,
            "ABCD": -2.27
        }
        expected_done = True

        self._assertRouteValuesAlmostEqual(expected_obs, next_obs)
        self._assertRouteValuesAlmostEqual(expected_reward, reward)
        self.assertEqual(expected_done, done)