In [3]:
import os
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions.normal import Normal
import numpy as np

In [4]:
class ActorNetwork(nn.Module):
    def __init__(self, alpha, input_dims, fc1_dims, fc2_dims, max_action,
            n_actions, name, chkpt_dir='tmp/sac'):
        super(ActorNetwork, self).__init__()
        self.input_dims = input_dims
        self.fc1_dims = fc1_dims
        self.fc2_dims = fc2_dims
        self.n_actions = n_actions
        self.name = name
        self.max_action = max_action
        self.checkpoint_dir = chkpt_dir
        self.checkpoint_file = os.path.join(self.checkpoint_dir, name+'_sac')
        self.reparam_noise = 1e-6

        self.fc1 = nn.Linear(*self.input_dims, self.fc1_dims)
        self.fc2 = nn.Linear(self.fc1_dims, self.fc2_dims)
        self.mu = nn.Linear(self.fc2_dims, self.n_actions)
        self.sigma = nn.Linear(self.fc2_dims, self.n_actions)

        self.optimizer = optim.Adam(self.parameters(), lr=alpha)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')

        self.to(self.device)

    def forward(self, state):
        prob = self.fc1(state)
        prob = F.relu(prob)
        prob = self.fc2(prob)
        prob = F.relu(prob)

        mu = self.mu(prob)
        sigma = self.sigma(prob)
        sigma = T.clamp(sigma, min=-20, max=2) # sigma = T.clamp(sigma, min=self.reparam_noise, max=1) 

        return mu, sigma

    def sample_normal(self, state, reparameterize=True):
        mu, sigma = self.forward(state)
        probabilities = T.distributions.Normal(mu, sigma)

        if reparameterize:
            actions = probabilities.rsample() # reparameterizes the policy
        else:
            actions = probabilities.sample()

        action = T.tanh(actions)*T.tensor(self.max_action).to(self.device) 
        log_probs = probabilities.log_prob(actions)
        log_probs -= T.log(1-action.pow(2) + self.reparam_noise)
        log_probs = log_probs.sum(1, keepdim=True)

        return action, log_probs

    def sample_mvnormal(self, state, reparameterize=True):
        """
            Doesn't quite seem to work.  The agent never learns.
        """
        mu, sigma = self.forward(state)
        n_batches = sigma.size()[0]

        cov = [sigma[i] * T.eye(self.n_actions).to(self.device) for i in range(n_batches)]
        cov = T.stack(cov)
        probabilities = T.distributions.MultivariateNormal(mu, cov)

        if reparameterize:
            actions = probabilities.rsample() # reparameterizes the policy
        else:
            actions = probabilities.sample()

        action = T.tanh(actions) # enforce the action bound for (-1, 1)
        log_probs = probabilities.log_prob(actions)
        log_probs -= T.sum(T.log(1-action.pow(2) + self.reparam_noise))
        log_probs = log_probs.sum(-1, keepdim=True)

        return action, log_probs

In [10]:
act = ActorNetwork(0.1, (4,), 256, 256, 10, 2, 'actor')

In [11]:
x = T.rand((32, 4))

In [12]:
act.forward(x)

(tensor([[ 0.0344, -0.0445],
         [ 0.0829,  0.0051],
         [ 0.0970, -0.0068],
         [ 0.0766, -0.0238],
         [ 0.1017, -0.0137],
         [ 0.0328, -0.0363],
         [ 0.0680, -0.0656],
         [ 0.0219, -0.0494],
         [ 0.0557, -0.0251],
         [ 0.0741, -0.0268],
         [ 0.0509, -0.0204],
         [ 0.1077,  0.0048],
         [ 0.1028, -0.0145],
         [ 0.1087, -0.0030],
         [ 0.0233, -0.0574],
         [ 0.0518, -0.0466],
         [ 0.0712,  0.0016],
         [ 0.0949, -0.0205],
         [ 0.0727, -0.0095],
         [ 0.0764, -0.0144],
         [ 0.0918,  0.0023],
         [ 0.0823, -0.0169],
         [ 0.0890, -0.0467],
         [ 0.0381, -0.0457],
         [ 0.0806,  0.0118],
         [ 0.0350, -0.0481],
         [ 0.0553, -0.0658],
         [ 0.0995,  0.0071],
         [ 0.1015, -0.0335],
         [ 0.0850,  0.0007],
         [ 0.0905,  0.0001],
         [ 0.0401, -0.0427]], grad_fn=<AddmmBackward>),
 tensor([[-0.0315,  0.0717],
         [-0.049