# Coding the policy gradient method in PyTorch

***

In [1]:
import numpy as np
import torch
import torch.nn as nn
from torch.distributions import Categorical

In [2]:
class CustomEnv:
    """One-dimensional walk on the integer positions ``-max_pos .. max_pos``.

    Every episode starts at position 0. Action ``0`` moves one step to the
    left, any other action moves one step to the right, and the position is
    clipped to the allowed range. Reaching ``max_pos`` gives a reward of
    ``1.0`` and ends the episode; every other step gives ``-0.01``. The
    episode also ends once ``max_steps`` steps have been taken.

    Args:
        max_pos: Goal position and half-width of the walk; must be positive
            because the state is normalised by dividing by it.
        max_steps: Maximum number of steps per episode.

    Raises:
        ValueError: If ``max_pos`` is not positive.
    """

    def __init__(self, 
                 max_pos=5, 
                 max_steps=50):

        # A non-positive max_pos would either divide by zero in _state()
        # or produce an inverted clipping range in step():
        if max_pos <= 0:
            raise ValueError(f"max_pos must be positive, got {max_pos!r}")

        self.max_pos = max_pos
        self.max_steps = max_steps

        # Begin in the initial state so that step() and _state() are usable
        # even if the caller has not called reset() yet:
        self.x = 0
        self.t = 0

    def reset(self):
        """Return to position 0, clear the step counter and return the state."""

        self.x = 0
        self.t = 0

        return self._state()

    def step(self, a):
        """Apply action ``a`` and advance the episode by one step.

        Args:
            a: ``0`` moves left; any other value moves right.

        Returns:
            Tuple ``(state, r, done)``: the new normalised state tensor, the
            reward for this step and whether the episode has ended.
        """

        self.t += 1

        if a == 0:
            self.x += -1
        else:
            self.x += 1

        # Clip the state:
        self.x = max(-self.max_pos, min(self.max_pos, self.x))

        done = False

        # Small per-step penalty unless the goal is reached:
        r = -0.01

        # Termination criteria:
        if self.x >= self.max_pos:

            r = 1.0

            done = True

        if self.t >= self.max_steps:

            done = True

        return self._state(), r, done

    def _state(self):
        """Return the position scaled by ``max_pos`` as a float32 tensor of shape (1,)."""

        state = torch.tensor([self.x / float(self.max_pos)], dtype=torch.float32)

        return state

In [3]:
# Instantiate the environment with its default settings (max_pos=5, max_steps=50):
env = CustomEnv()

In [4]:
class Policy(nn.Module):
    """Policy network π: a small MLP from a 1-feature state to 2 action logits.

    The network has two hidden layers of 8 tanh units each. Calling the
    module returns a ``Categorical`` distribution built from the logits.
    """

    def __init__(self):
        super().__init__()

        # Layers are created in input-to-output order and wrapped in a
        # Sequential stored under the attribute name ``net``:
        layers = [
            nn.Linear(in_features=1, out_features=8),
            nn.Tanh(),
            nn.Linear(in_features=8, out_features=8),
            nn.Tanh(),
            nn.Linear(in_features=8, out_features=2),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the action distribution for the state(s) ``x``.

        Args:
            x: Tensor whose last dimension has size 1 (the state feature).

        Returns:
            ``Categorical`` parameterised by the network's output logits.
        """
        return Categorical(logits=self.net(x))

In [5]:
# Instantiate the policy network π:
policy = Policy()

In [6]:
def generate_τ(env, 
               policy, 
               γ=0.95):
    """Roll out one trajectory τ by following ``policy`` in ``env``.

    Args:
        env: Environment providing ``reset()`` (returns a state tensor) and
            ``step(action)`` (returns ``(state, reward, done)``).
        policy: Callable mapping a batched state to a distribution object
            with ``sample()`` and ``log_prob()``.
        γ: Discount factor applied to the reward at step ``t`` as ``γ ** t``.

    Returns:
        Tuple ``(sum_ln_π, R)``: the sum of ln π(a_t | s_t) over the
        trajectory (keeps its autograd graph) and the discounted return as
        a float32 tensor.
    """
    log_probs, reward_trace = [], []

    # Start from the initial state s_0:
    obs = env.reset()

    while True:

        # Action distribution for the current state (batch dimension added):
        π_s = policy(obs.unsqueeze(0))

        # Draw an action and record ln π of that action:
        a = π_s.sample()
        log_probs.append(π_s.log_prob(a))

        # Advance the environment and record the reward:
        obs, reward, terminated = env.step(a.item())
        reward_trace.append(reward)

        if terminated:
            break

    # Discounted return of the finished trajectory:
    discounted = sum((γ ** k) * r_k for k, r_k in enumerate(reward_trace))
    R = torch.tensor(discounted, dtype=torch.float32)

    # Total log-probability of the actions taken:
    sum_ln_π = torch.stack(log_probs).sum()

    return sum_ln_π, R

In [7]:
n_episodes = 1000  # number of iterations (parameter updates) of the training loop
N = 10  # trajectories sampled per update for the Monte Carlo estimate
α = 1e-3  # learning rate passed to the Adam optimizer
γ = 0.95  # discount factor used when computing a trajectory's return

In [8]:
# Materialise the parameters into a list: policy.parameters() returns a
# one-shot iterator, which the optimizer constructor would consume and
# leave θ empty for any later use (inspection, gradient clipping, ...).
θ = list(policy.parameters())
optimizer = torch.optim.Adam(θ, lr=α)

In [9]:
# Each iteration performs one gradient step on θ, estimated from N freshly
# sampled trajectories.
for update_idx in range(n_episodes):

    # Clear the gradients left over from the previous update:
    optimizer.zero_grad()

    # Surrogate loss -R(τ) · Σ ln π for each sampled trajectory:
    per_τ_losses = []
    for _ in range(N):
        sum_ln_π, R = generate_τ(env, policy, γ)
        per_τ_losses.append(-(R.detach() * sum_ln_π))

    # Arithmetic mean over the N trajectories, accumulated left to right
    # starting from 0.0:
    loss = sum(per_τ_losses, 0.0) / N

    # Backpropagate the loss and update θ:
    loss.backward()
    optimizer.step()

***