<a href="https://colab.research.google.com/github/rkoikeHWU/f91a/blob/main/MDP_SmartBuilding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
""" code from https://github.com/aimacode/aima-python/blob/master/mdp4e.py adapted to SmartBuilding. """

"""
Markov Decision Processes (Chapter 16)

First we define an MDP, and the special case of a GridMDP, in which
states are laid out in a 2-dimensional grid. We also represent a policy
as a dictionary of {state: action} pairs, and a Utility function as a
dictionary of {state: number} pairs. We then define the value_iteration
and policy_iteration algorithms.
"""

# ______________________________________________________________________________
# Core MDP class (adapted from AIMA)

class MDP:
    """A Markov Decision Process, defined by an initial state, transition model,
    and reward function. We also keep track of a gamma value, for use by
    algorithms. The transition model is represented somewhat differently from
    the text. Instead of P(s' | s, a) being a probability number for each
    state/state/action triplet, we instead have T(s, a) return a
    list of (p, s') pairs. We also keep track of the possible states,
    terminal states, and actions for each state. [Page 646]"""

    def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9):
        if not (0 < gamma <= 1):
            raise ValueError("An MDP must have 0 < gamma <= 1")

        # collect states from transitions table if not passed.
        self.states = states or self.get_states_from_transitions(transitions)

        self.init = init

        if isinstance(actlist, list):
            # if actlist is a list, all states have the same actions
            self.actlist = actlist

        elif isinstance(actlist, dict):
            # if actlist is a dict, different actions for each state
            self.actlist = actlist

        self.terminals = terminals
        self.transitions = transitions or {}
        if not self.transitions:
            print("Warning: Transition table is empty.")

        self.gamma = gamma

        self.reward = reward or {s: 0 for s in self.states}

    def R(self, state):
        """Return a numeric reward for this state."""

        return self.reward[state]

    def R(self, state, action):
        """Polymorphic method to retun reward using state/action pair"""
        if action is None:
            return 0.0 # no action gets 0 reward

        return float(self.reward_sa[(state, action)])

    def T(self, state, action):
        """Transition model. From a state and an action, return a list
        of (probability, result-state) pairs."""

        if not self.transitions:
            raise ValueError("Transition model is missing")
        else:
            return self.transitions[state][action]

    def actions(self, state):
        """Return a list of actions that can be performed in this state. By default, a fixed list of actions, except for terminal states. Override this method if you need to specialize by state."""

        if state in self.terminals:
            return [None]
        else:
            return self.actlist

    def get_states_from_transitions(self, transitions):
        if isinstance(transitions, dict):
            s1 = set(transitions.keys())
            s2 = set(tr[1] for actions in transitions.values()
                     for effects in actions.values()
                     for tr in effects)
            return s1.union(s2)
        else:
            print('Could not retrieve states from transitions')
            return None

# ______________________________________________________________________________
# 16.1.3 The Bellman equation for utilities

def q_value(mdp, s, a, U):
    """Reward depends on state, action) not just state. So we use mdp.R(s,a) to get q value."""

    if not a:
        return 0.0 # if no action return 0

    res = 0.0
    for p, s_prime in mdp.T(s, a):
        res += p * (mdp.R(s, a) + mdp.gamma * U[s_prime])
    return res
#______________________________________________________________________________
# Value Iteration (prints V0, V1, V2, V3 then converges)

def value_iteration(mdp, epsilon=0.001):

    U1 = {s: 0.0 for s in mdp.states}
    gamma = mdp.gamma

    def show(k, U):
   #     print(f"V{k}: " + ", ".join(f"{s}={U[s]:.4f}" for s in order))
      print(f"V{k}: {U['L']}, {U['M']}, {U['H']}")
      print("-------------------------")

    # Print V0
    show(0, U1)

    k = 0
    while True:
        k += 1
        U = U1.copy()
        delta = 0.0

        for s in mdp.states:
            U1[s] = max(q_value(mdp, s, a, U) for a in mdp.actions(s))
            delta = max(delta, abs(U1[s] - U[s]))

        if k <= 3: # print first 3 only
          show(k, U1)

        if delta <= epsilon * (1 - gamma) / gamma:
            return U1


# ______________________________________________________________________________
# Extract optimal policy

def best_policy(mdp, U):
    """Given an MDP and a utility function U, determine the best policy,
    as a mapping from state to action. [Equation 17.4]"""

    pi = {}
    for s in mdp.states:
        pi[s] = max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
    return pi


def expected_utility(a, s, U, mdp):
    """
    Return the expected utility of taking action a in state s.

    Rewards depend on (state, action), not just state. So we use mdp.R(s,a) to get the immediare reward and add this to the discounted future reward.
    """

    immediate = mdp.R(s, a)
    future = sum(p * U[s1] for (p, s1) in mdp.T(s, a))

    return immediate + mdp.gamma * future

# ______________________________________________________________________________
# Smart Building MDP

class SmartBuildingMDP(MDP):

    def __init__(self, init, actlist, terminals, transitions, reward_sa, gamma=0.9):
        states = self.get_states_from_transitions(transitions)
        super().__init__(init, actlist, terminals,
                         transitions=transitions,
                         reward={s: 0 for s in states},
                         states=states,
                         gamma=gamma)
        self.reward_sa = reward_sa

    def R(self, state, action):
        if action is None:
            return 0.0
        return float(self.reward_sa[(state, action)])


# ______________________________________________________________________________
# Define Smart Building transitions & rewards

transitions = {
    "L": {"E": [(1.0, "L")],
          "B": [(0.5, "M"), (0.5, "L")]},

    "M": {"E": [(0.7, "M"), (0.3, "L")],
          "B": [(0.6, "H"), (0.4, "M")]},

    "H": {"E": [(0.8, "M"), (0.2, "H")],
          "B": [(1.0, "H")]}
}

reward_sa = {
    ("L","E"): 1,
    ("L","B"): 3,
    ("M","E"): 2,
    ("M","B"): 4,
    ("H","E"): 1,
    ("H","B"): -8
}

mdp = SmartBuildingMDP(
    init="L",
    actlist=["E","B"],
    terminals=[],
    transitions=transitions,
    reward_sa=reward_sa,
    gamma=0.9
)

# ______________________________________________________________________________
# Run value iteration and print results

print("Smart Building MDP")
print("Policy for L, M, H Energy States")
print("-------------------------")
U_star = value_iteration(mdp)

print("\nV*: " + ", ".join(f"{s}={U_star[s]:.6f}" for s in ["L","M","H"]))

policy = best_policy(mdp, U_star)
print("pi*: " + ", ".join(f"{s}->{policy[s]}" for s in ["L","M","H"]))


Smart Building MDP
Policy for L, M, H Energy States
-------------------------
V0: 0.0, 0.0, 0.0
-------------------------
V1: 3.0, 4.0, 1.0
-------------------------
V2: 6.15, 5.98, 4.06
-------------------------
V3: 8.4585, 8.3452, 6.0364
-------------------------

V*: L=28.434840, M=28.087246, H=25.881364
pi*: L->B, M->B, H->E
