## Assignments from january 11

- Write out the MP/MRP definitions and MRP Value Function definition (in LaTeX) in your own style/notation (so you really internalize these concepts) $\Done$
- Think about the data structures/class design (in Python 3) to represent MP/MRP and implement them with clear type declarations $\Not\;\done$ -- I will try to see how this works later. For now I am sticking with my undefined data structures.
- Remember your data structure/code design must resemble the Mathematical/notational formalism as much as possible $\Not\;\Done$
- Specifically the data structure/code design of MRP should be incremental (and not independent) to that of MP $\Done$
- Separately implement the $r(s,s')$ and the $\mathcal{R}(s) = \sum_{s'} p(s,s') \cdot r(s,s')$ definitions of MRP $\Done$
- Write code to convert/cast the r(s,s') definition of MRP to the R(s) definition of MRP (put some thought into code  design here) $\Done$
- Write code to generate the stationary distribution for an MP $\Done$

Firstly, we have some definitions. The value function $v(s)$ is defines as 
$$v(s)=\mathbb{E}\Big[\sum_{i=0}^{\infty}\gamma^iR_{t+i+1}\Big|S_t=s\Big],$$
where $R_i$ is the reward at time $i$ and $\gamma$ is the discount factor. Furthermore, the reward function $\mathcal{R}(s)$ is defined as $$\mathcal{R}(s)=\mathbb{E}[R_{t}|S_{t-1}=s].$$
Following the notations from Sutton's book we also have the function $r(s,s')$ such that 
$$r(s,s')=\mathbb{E}[R_t|S_{t-1}=s\;\cap\;S_t=s']$$ and the function $$p(s,s')=\mathbb{P}(S_t=s'|S_{t-1}=s).$$
Thus we see that 
\begin{equation*}
    \begin{split}
        \mathcal{R}(s)&=\mathbb{E}[R_{t}|S_{t-1}=s]\\
        &=\sum_{s'}R_t(\{\texttt{reward after state }
        s'\})\mathbb{P}(S_t=s'|S_{t-1}=s)\\
        &=\sum_{s'}\mathbb{E}[R_t|S_{t-1}=s\;\cap\;S_t=s']\mathbb{P}(S_t=s'|S_{t-1}=s)\\
        &=\sum_{s'}r(s,s')p(s,s').
    \end{split}
\end{equation*}

First we import the necesarry packages.

In [335]:
import numpy as np

**Markov Process** - defined by states and probability matrix

In [539]:
class MP(object):
    def __init__(self, ProbDistribution,States:list=None, print_info:bool=False, Name:str="Nameless"):
        self.Name=Name
        if States == None:
            self.States = [i for i in range(len(ProbDistribution))]
        else:
            self.States = States
        self.ProbDistribution = ProbDistribution
        self.print_info = print_info
        if print_info == True:
            print("The Markov process", Name, "has been created. It has", len(States), "states.")
        assert len(self.States) == len(self.ProbDistribution)
    
    def Generate_Stationary_Dist(self):
        eigenvalues, eigenvectors = np.linalg.eig(self.ProbDistribution.T)
        stat_dist=np.zeros((len(self.States),1))
        for i in range(len(eigenvalues)):
            if abs(eigenvalues[i]-1) < 1e-8:
                stat_dist = stat_dist + eigenvectors[:,i].reshape(len(self.States),1)
        return((stat_dist/np.sum(stat_dist)).reshape(1,len(self.States)))
    
    def Simulate(self,steps:int=10,start=None,print_text=False):
        #steps=len(self.States)
        if start == None:
            start = self.States[0]
        path = [start]
        current_activity = start
        i=0
        while i < steps:
            for j in range(len(self.States)):
                if current_activity == self.States[j]:
                    RV = np.random.choice(self.States,replace=True,p=self.ProbDistribution[j])
                    for k in range(len(self.States)):
                        if RV == self.States[k]:
                            if self.ProbDistribution[j][k] == 1 and k == j:
                                if print_text == True:
                                    print("The procces reached the termination state", "'",self.States[j],"'", "after", i, "steps.")
                                i = steps
                                break
                            path.append(self.States[k])
                            current_activity = self.States[k]
                            break
                    break
            i += 1
        if print_text == True:
            print("The path was:", path)
        return(path)
    

**Now we can define a Markov Reward Process**

In [803]:
class MRP(MP):
    def __init__(self,ProbDistribution,Rewards,States:list=None,gamma=1,ABC="A"):
        MP.__init__(self,ProbDistribution,States)
        self.ABC=ABC
        if self.ABC == "A":
            assert Rewards.shape == (len(ProbDistribution), 1)
            self.Rewards=Rewards
        elif self.ABC == "B":
            assert Rewards.shape == (len(ProbDistribution), len(ProbDistribution))
            self.Rewards=np.diag(np.dot(self.ProbDistribution,Rewards)).reshape(len(States),1)
        elif self.ABC == "C":
            print("Please use definition A or B") 
            #i.e. no idea what to do here
        self.gamma=gamma
        
    def Get_Expected_Reward_one_state(self,start=None):
        if start == None:
            return(np.dot(self.ProbDistribution,self.Rewards))
        else:
            for i in range(len(self.States)):
                if self.States[i]==start:
                    return((np.dot(self.ProbDistribution,self.Rewards))[i])
                
    def Get_Value_Function(self):
        if np.linalg.det(self.ProbDistribution) > 1e-5:
            R=np.dot(self.ProbDistribution,self.Rewards)
            inverse=np.linalg.inv(np.identity(len(self.States))-self.gamma*self.ProbDistribution)
            return(np.dot(inverse,R))
        else:
            print("Determinant Zero -- wait for simulation")
            VF=np.zeros((len(self.States),1))
            for item in self.States:
                n=4000
                test=0
                for i in range(n):
                    test+=self.Simulate_Rewards(n,item)
                VF[self.States.index(item)]=test/n
            return(VF)
    
    def rss_To_RS(self):
        assert self.ABC=="B"
        return(self.Rewards)
        
    def Simulate_Rewards(self,steps:int=10,start=None,print_text=False):
        if isinstance(start,(int,float)) == True:
            start=self.States[int(start)]
        path = self.Simulate(steps,start,print_text)
        accumulated_reward=0
        i=0
        for item in path:
            path_ind=self.States.index(item)
            accumulated_reward=accumulated_reward+self.Rewards[path_ind]*self.gamma**i
            i+=1
        if print_text == True:
            print('The path Resulted in a value of ', float(accumulated_reward))
        return(float(accumulated_reward))

**We can now test how the code runs** - we use the example from the lecture slides.

In [823]:
P = np.array([[0,0.5,0,0,0,0.5,0],
              [0,0,0.8,0,0,0,0.2],
              [0,0,0,0.6,0.4,0,0],
              [0,0,0,0,0,0,1],
              [0.2,0.4,0.4,0,0,0,0],
              [0.1,0,0,0,0,0.9,0],
              [0,0,0,0,0,0,1]])
S=["C1","C2","C3","Pass","Pub","FB","Sleep"]
R=np.array([-2,-2,-2,10,1,-1,0]).reshape(len(P),1)

In [643]:
M_R_P=MRP(P,R,S)
M_R_P.Get_Value_Function()

Determinant Zero -- wait for simulation


array([[-12.6332],
       [  1.6798],
       [  4.4716],
       [ 10.    ],
       [  0.4838],
       [-22.9384],
       [  0.    ]])

## Assignments from January 16

- Write the Bellman equation for MRP Value Function and code to calculate MRP Value Function (based on Matrix inversion method you learnt in this lecture) -- $\Done $
- Write out the MDP definition, Policy definition and MDP Value Function definition (in LaTeX) in your own style/notation (so you really internalize these concepts) 
- Think about the data structure/class design (in Python 3) to represent MDP, Policy, Value Function, and implement them with clear type definitions $\Not \;\done$
- The data structure/code design of MDP should be incremental (and not independent) to that of MRP
- Separately implement the $r(s,s',a)$ and $R(s,a) = \sum_{s'} p(s,s',a) * r(s,s',a)$ definitions of MDP $\Not \;\done$ -- again, not really sure what to do here....
- Write code to convert/cast the $r(s,s',a)$ definition of MDP to the $R(s,a)$ definition of MDP (put some thought into code design here) $\Not\;\Done$ -- also no idea
- Write code to create a MRP given a MDP and a Policy $\Done$
- Write out all 8 MDP Bellman Equations and also the transformation from Optimal Action-Value function to Optimal Policy (in LaTeX) $\Done$
                                 

A MDP is a MRP with actions $A$ at some states of the MRP. A policy $\pi(a|s)$ is the probability that the agent does action $a$ in in state $s$. The value function of an MDP is defined in the same way as a value function for an MRP, but taking the action into account as well. We denote the value function for a given policy $\pi$ as $$q_\pi(s,a)=\mathbb{E}\Big[\sum_{i=0}^{\infty}\gamma^iR_{t+i+1}\Big|S_t=s\;\cap\;A_t=a\Big].$$

In [827]:
class MDP(MRP):
    def __init__(self,ProbDistribution,Rewards,States:list=None,gamma=1,ABC="A",Action_States=None,Policy=None,Feasible_Steps=None):
        MRP.__init__(self,ProbDistribution,Rewards,States,gamma,ABC)
        self.Policy=Policy
        self.Action_States=Action_States
        self.Feasible_Steps=Feasible_Steps
        if np.all(self.Policy) != None and self.Action_States != None:
            for item in self.Action_States:
                if isinstance(item, str) == True:
                    self.ProbDistribution[self.States.index(item)] = self.Policy[self.States.index(item)]
                elif isinstance(item, (float,int)) == True:
                    self.ProbDistribution[int(item)] = self.Policy[int(item)]
        elif np.all(self.Policy) == None and self.Action_States != None and np.all(self.Feasible_Steps) !=None:
            k=0
            for item in self.Action_States:
                if isinstance(item, str) == True:
                    i=self.States.index(item)
                elif isinstance(item,(float,int)) == True:
                    i=item
                VF=self.Get_Value_Function()
                for j in range(len(VF)):
                    if self.Feasible_Steps[k,j] == 0:
                        VF[j]=0
                ind = np.unravel_index(np.argmax(VF, axis=None), VF.shape)
                new_dist=np.zeros((1,len(self.ProbDistribution)))
                new_dist[0,ind[0]] = 1
                self.ProbDistribution[i] = new_dist
                k+=1
            
                

The code takes either a policy or a set of possible directions. If the we input the feasible directions, the code finds the policy that gives the highest value function. Code belows shows the example from the lecture where you can go to either C1, C2 or C3 from the Pub.

In [824]:
M_D_P=MDP(P,R,S,1,"A",["Pub"],None,np.array([1,1,1,0,0,0,0]).reshape(1,7))

Determinant Zero -- wait for simulation


In [825]:
M_D_P.Get_Value_Function()

Determinant Zero -- wait for simulation


array([[-10.086 ],
       [  3.792 ],
       [  7.3315],
       [ 10.    ],
       [  8.3395],
       [-20.209 ],
       [  0.    ]])

As we see, the value function has now increased in all steps, as the process always goes to C3 from Pub instead. Below you can se that this has also changed in the transition matrix.

In [826]:
P

array([[0. , 0.5, 0. , 0. , 0. , 0.5, 0. ],
       [0. , 0. , 0.8, 0. , 0. , 0. , 0.2],
       [0. , 0. , 0. , 0.6, 0.4, 0. , 0. ],
       [0. , 0. , 0. , 0. , 0. , 0. , 1. ],
       [0. , 0. , 1. , 0. , 0. , 0. , 0. ],
       [0.1, 0. , 0. , 0. , 0. , 0.9, 0. ],
       [0. , 0. , 0. , 0. , 0. , 0. , 1. ]])