In [1]:
import numpy as np
import copy
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from torch.distributions import Normal

import gym
from collections import deque
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (16, 10)

In [2]:
class LinearLayer(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.linear_layer = nn.Sequential(nn.Linear(in_features, out_features), nn.ReLU())
    def forward(self, x):
        x = self.linear_layer(x)
        return x

In [3]:
class ActorStochastic(nn.Module):
    def __init__(self, s_dim, a_dim, action_boundaries, hidden_dim=256, num_hidden_layers=2):
        super(ActorStochastic, self).__init__()

        self.s_dim = s_dim
        self.a_dim = a_dim

        layers = [LinearLayer(s_dim, hidden_dim)]
        for _ in range(num_hidden_layers):
            layers.append(LinearLayer(hidden_dim, hidden_dim))

        self.f = nn.Sequential(*layers)
        self.mean = nn.Linear(hidden_dim, a_dim)
        self.log_std = nn.Linear(hidden_dim, a_dim)

        self.action_boundaries = action_boundaries

    def forward(self, state):
        if not isinstance(state, torch.Tensor):
            state = torch.tensor(state, dtype=torch.float32)

        out = self.f(state)
        mean = self.mean(out)
        log_std = self.log_std(out)

        #out = torch.tanh(out)
        #log_std = torch.tanh(log_std)
        #std = torch.exp(log_std)
        #noise = torch.normal(0, 1, out.shape[0])
        #action = mean + noise * std
        #dist=torch.distributions.Normal(mean, log_std)
        # log_std = torch.clamp(log_std, -20, 2)
        
        log_std = torch.clamp(log_std, -20, 2)
        
        return mean, log_std

    def sampling(self, state):
        mean, log_std = self.forward(state)
        std = log_std.exp()
        dist = Normal(mean, std)
        x_t = dist.rsample() #reparametrization trick implemented by pytorch
        action = torch.tanh(x_t) #Bounds the action
        log_prob = dist.log_prob(x_t) # Log probability(/ies if state in batch)
        log_Jacobian = torch.log(1-action**2+1e-10)#.sum(dim=1, keepdim=True)
        #print(log_Jacobian)
        log_prob = (log_prob-log_Jacobian).sum(dim=1, keepdim=True)
        return action, log_prob


In [4]:
class QDNN(nn.Module):
    def __init__(self, s_dim, a_dim, hidden_dim=256, num_hidden_layers=2):
        super(QDNN, self).__init__()

        self.s_dim = s_dim
        self.a_dim = a_dim

        layers = [LinearLayer(s_dim + a_dim, hidden_dim)]
        for _ in range(num_hidden_layers):
            layers.append(LinearLayer(hidden_dim, hidden_dim))
        layers.append(nn.Linear(hidden_dim, 1))

        self.f = nn.Sequential(*layers)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        out = self.f(x)
        return out

In [5]:
class Critic(nn.Module):
    def __init__(self, s_dim, a_dim, hidden_dim=256, num_hidden_layers=2):
        super(Critic, self).__init__()

        self.s_dim = s_dim
        self.a_dim = a_dim

        # Two DNNs to mitigate positive bias
        self.Q1 = QDNN(s_dim, a_dim, hidden_dim, num_hidden_layers)
        self.Q2 = QDNN(s_dim, a_dim, hidden_dim, num_hidden_layers)

    def forward(self, state, action):
        q1 = self.Q1(state, action)
        q2 = self.Q2(state, action)
        return q1, q2

In [6]:
class SAC():
    def __init__(self, s_dim, a_dim, hidden_dim_actor=256, hidden_dim_critic=256, 
                 num_layer_actor=2, num_layer_critic=2, lr_act=3e-4, lr_crit=3e-4, 
                 gamma=0.99, tau=0.005, alpha=0.2, lambd=0.005, target_upd_inter=1, 
                 buffer_capacity=int(1000), batch_size=32, grad_steps = 1, device="cpu"):
        
        self.s_dim = s_dim
        self.a_dim = a_dim
        self.device = device
        self.batch_size = batch_size
        self.buffer = deque(maxlen=buffer_capacity)
        self.grad_steps = grad_steps

        self.alpha = alpha
        self.lambd = lambd
        self.gamma = gamma
        self.tau = tau
        self.target_upd_inter = target_upd_inter

        self.actor = ActorStochastic(s_dim, a_dim, hidden_dim_actor, num_hidden_layers=num_layer_actor).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=lr_act)

        self.critic = Critic(s_dim, a_dim, hidden_dim_critic, num_hidden_layers=num_layer_critic).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=lr_crit)
        self.critic_target = copy.deepcopy(self.critic).to(device)

    def choose_action(self, state, evaluate=False):
        #print(state.shape)
        if evaluate:
            # Choosing action to give to the environnement and not train the model
            with torch.no_grad():
                state = torch.tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
                action, log_prob = self.actor.sampling(state)

            return action.cpu().detach().numpy()[0], log_prob
        
        state = torch.tensor(state, dtype=torch.float32, device=self.device)
        # To train actor model and critic
        action, log_prob = self.actor.sampling(state)
        
        return action, log_prob
    
    def critic_train(self, states, actions, rewards, next_states, dones):
        
        with torch.no_grad():
            next_actions, next_log_probs = self.choose_action(next_states, evaluate=False)
            q1_next, q2_next = self.critic_target(next_states, next_actions)
            min_q_next = torch.min(q1_next, q2_next)
            target_q_value = rewards + self.gamma *(torch.ones_like(dones)-dones).unsqueeze(1)*(min_q_next - self.alpha * next_log_probs)
            
        
        #print(target_q_value.shape)
        q1, q2 = self.critic(states, actions)
        #print(q1.shape, q2.shape)    
        critic_loss = F.mse_loss(q1, target_q_value) + F.mse_loss(q2, target_q_value)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        return critic_loss

    
    def actor_train(self, states):
        actions, log_probs = self.choose_action(states, evaluate=False)
        q1_actor, q2_actor = self.critic(states, actions)
        min_q_actor = torch.min(q1_actor, q2_actor)

        # print(actions.shape, log_probs.shape)
        # print(min_q_actor.shape)
        #print((self.alpha * log_probs - min_q_actor).shape)
            
        actor_loss = (self.alpha * log_probs - min_q_actor).mean(dim=0)

        #print(actor_loss)
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        return actor_loss

    def train(self, update_interval):
        if len(self.buffer) < self.batch_size:
            return None, None

        for i in range(self.grad_steps):
            
            states, actions, rewards, next_states, dones = self.sample_batch()
            
            # Critic train
            self.critic.train()
            critic_loss= self.critic_train(states, actions, rewards, next_states, dones)

            # Actor train
            self.actor.train()
            actor_loss = self.actor_train(states)

            # Soft update of target networks
            if update_interval % self.target_upd_inter == 0:
                for target_parameters, parameters in zip(self.critic_target.parameters(), self.critic.parameters()):
                    target_parameters.data.copy_(self.tau * parameters.data + (1.0 - self.tau) * target_parameters.data)

        return critic_loss.item(), actor_loss.item()

    def add_elements_to_buffer(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample_batch(self):
        batch = random.sample(self.buffer, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        states = torch.tensor(states, dtype=torch.float32).to(self.device)
        actions = torch.tensor(actions, dtype=torch.float32).to(self.device)
        rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1).to(self.device)
        next_states = torch.tensor(next_states, dtype=torch.float32).to(self.device)
        dones = torch.tensor(dones, dtype=torch.float32).to(self.device)

        return states, actions, rewards, next_states, dones

## Mountain car continuous

In [7]:
# Create the environment
env = gym.make('MountainCarContinuous-v0')

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
hidden_dim_actor = 64
hidden_dim_critic = 64
num_layer_actor = 2
num_layer_critic = 2
lr_act = 3e-4
lr_crit = 3e-4
gamma = 0.99
tau = 0.1
alpha = 0.1
batch_size = 16
num_episodes = 100
update_interval = 2
target_upd_inter = 1
buffer_capacity = int(1e6)
grad_step=2

#print(a_dim)
# Initialize SAC agent
agent = SAC(s_dim, a_dim, hidden_dim_actor=hidden_dim_actor, hidden_dim_critic=hidden_dim_critic, 
            num_layer_actor=num_layer_actor, num_layer_critic=num_layer_critic, lr_act=lr_act, 
            lr_crit=lr_crit, gamma=gamma, tau=tau, alpha=alpha, batch_size=batch_size, 
            grad_steps=grad_step, device=device, buffer_capacity=buffer_capacity)

# Training loop
episode_rewards = []
critic_loss=None
actor_loss=None

for episode in range(num_episodes):
    state = env.reset() # shape of (2,)
    episode_reward = 0
    for t in range(1000):
        action, _ = agent.choose_action(state, evaluate=True)
        next_state, reward, done, _ = env.step(action)
        agent.add_elements_to_buffer(state, action, reward, next_state, done)
        state = next_state.copy()
        episode_reward += reward
        
        if t % update_interval == 0:
            critic_loss, actor_loss = agent.train(t)

        if done:
            break

    episode_rewards.append(episode_reward)
    if critic_loss is not None:
        print("critic loss: ",critic_loss, "actor loss : ", actor_loss)
    if (episode + 1) % 10 == 0:
        print(f"Episode {episode + 1}, Reward: {episode_reward}")

# Plotting
plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('SAC on MountainCarContinuous-v0')
plt.show()

  deprecation(
  deprecation(
  if not isinstance(terminated, (bool, np.bool8)):
  states = torch.tensor(states, dtype=torch.float32).to(self.device)
  state = torch.tensor(state, dtype=torch.float32, device=self.device)


critic loss:  0.0009523602784611285 actor loss :  -2.334667682647705
critic loss:  0.0016016880981624126 actor loss :  -3.056730270385742
critic loss:  0.00045496877282857895 actor loss :  -3.138033390045166
critic loss:  0.0012823720462620258 actor loss :  -3.108534812927246
critic loss:  0.0008015856146812439 actor loss :  -3.1353752613067627
critic loss:  0.002074391581118107 actor loss :  -3.020723342895508
critic loss:  0.0005456255748867989 actor loss :  -3.178065776824951
critic loss:  0.0006264892290346324 actor loss :  -3.126984119415283
critic loss:  0.0011450889287516475 actor loss :  -3.2426505088806152
critic loss:  0.0005965380114503205 actor loss :  -3.1435346603393555
Episode 10, Reward: -25.44703913414886
critic loss:  0.0011127421166747808 actor loss :  -3.2255096435546875
critic loss:  0.005227414891123772 actor loss :  -3.1086409091949463
critic loss:  0.00041657191468402743 actor loss :  -3.038107395172119
critic loss:  0.00251865410245955 actor loss :  -2.97257208

KeyboardInterrupt: 

In [60]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import random
import copy
import matplotlib.pyplot as plt

# Define Linear Layer
class LinearLayer(nn.Module):
    def __init__(self, in_features, out_features):
        super(LinearLayer, self).__init__()
        self.linear_layer = nn.Sequential(nn.Linear(in_features, out_features), nn.ReLU())
    
    def forward(self, x):
        return self.linear_layer(x)

# Define Actor
class ActorStochastic(nn.Module):
    def __init__(self, s_dim, a_dim, hidden_dim=256, num_hidden_layers=2):
        super(ActorStochastic, self).__init__()
        layers = [LinearLayer(s_dim, hidden_dim)]
        for _ in range(num_hidden_layers):
            layers.append(LinearLayer(hidden_dim, hidden_dim))
        
        self.f = nn.Sequential(*layers)
        self.mean = nn.Linear(hidden_dim, a_dim)
        self.log_std = nn.Linear(hidden_dim, a_dim)

        self.action_scale = torch.FloatTensor((env.action_space.high - env.action_space.low) / 2.0)
        self.action_bias = torch.FloatTensor((env.action_space.high + env.action_space.low) / 2.0)

    def forward(self, state):
        if not isinstance(state, torch.Tensor):
            state = torch.tensor(state, dtype=torch.float32)
        out = self.f(state)
        mean = self.mean(out)
        log_std = self.log_std(out)
        log_std = torch.clamp(log_std, -20, 2)
        return mean, log_std

    def sample(self, state):
        mean, log_std = self.forward(state)
        std = log_std.exp()
        dist = Normal(mean, std)
        x_t = dist.rsample()  # Reparameterization trick
        action = torch.tanh(x_t)
        log_prob = dist.log_prob(x_t)
        log_prob -= torch.log(1 - action.pow(2) + 1e-6)
        log_prob = log_prob.sum(1, keepdim=True)
        return action, log_prob

# Define Q Network
class QDNN(nn.Module):
    def __init__(self, s_dim, a_dim, hidden_dim=256, num_hidden_layers=2):
        super(QDNN, self).__init__()
        layers = [LinearLayer(s_dim + a_dim, hidden_dim)]
        for _ in range(num_hidden_layers):
            layers.append(LinearLayer(hidden_dim, hidden_dim))
        layers.append(nn.Linear(hidden_dim, 1))
        self.f = nn.Sequential(*layers)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        return self.f(x)

# Define Critic
class Critic(nn.Module):
    def __init__(self, s_dim, a_dim, hidden_dim=256, num_hidden_layers=2):
        super(Critic, self).__init__()
        self.Q1 = QDNN(s_dim, a_dim, hidden_dim, num_hidden_layers)
        self.Q2 = QDNN(s_dim, a_dim, hidden_dim, num_hidden_layers)

    def forward(self, state, action):
        q1 = self.Q1(state, action)
        q2 = self.Q2(state, action)
        return q1, q2

# Define Custom Replay Buffer
class ReplayBuffer:
    def __init__(self, s_dim, a_dim, buffer_capacity=1000000):
        self.s_dim = s_dim
        self.a_dim = a_dim
        self.buffer_capacity = buffer_capacity
        self.size = 0
        self.ptr = 0

        self.states = np.zeros((buffer_capacity, s_dim), dtype=np.float32)
        self.actions = np.zeros((buffer_capacity, a_dim), dtype=np.float32)
        self.rewards = np.zeros((buffer_capacity, 1), dtype=np.float32)
        self.next_states = np.zeros((buffer_capacity, s_dim), dtype=np.float32)
        self.dones = np.zeros((buffer_capacity, 1), dtype=np.float32)

    def add(self, state, action, reward, next_state, done):
        self.states[self.ptr] = state
        self.actions[self.ptr] = action
        self.rewards[self.ptr] = reward
        self.next_states[self.ptr] = next_state
        self.dones[self.ptr] = done

        self.ptr = (self.ptr + 1) % self.buffer_capacity
        if self.size < self.buffer_capacity:
            self.size += 1

    def sample(self, batch_size):
        idxs = np.random.randint(0, self.size, size=batch_size)
        states = torch.FloatTensor(self.states[idxs])
        actions = torch.FloatTensor(self.actions[idxs])
        rewards = torch.FloatTensor(self.rewards[idxs])
        next_states = torch.FloatTensor(self.next_states[idxs])
        dones = torch.FloatTensor(self.dones[idxs])
        return states, actions, rewards, next_states, dones

# Define SAC
class SAC:
    def __init__(self, s_dim, a_dim, hidden_dim_actor=256, hidden_dim_critic=256, 
                 num_layer_actor=2, num_layer_critic=2, lr_act=3e-4, lr_crit=3e-4, 
                 gamma=0.99, tau=0.005, alpha=0.2, buffer_capacity=1000000, batch_size=256, device="cpu"):
        
        self.s_dim = s_dim
        self.a_dim = a_dim
        self.device = device
        self.batch_size = batch_size

        self.alpha = alpha
        self.gamma = gamma
        self.tau = tau

        self.actor = ActorStochastic(s_dim, a_dim, hidden_dim_actor, num_hidden_layers=num_layer_actor).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=lr_act)

        self.critic = Critic(s_dim, a_dim, hidden_dim_critic, num_hidden_layers=num_layer_critic).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=lr_crit)

        self.critic_target = copy.deepcopy(self.critic).to(device)
        for p in self.critic_target.parameters():
            p.requires_grad = False

        self.replay_buffer = ReplayBuffer(s_dim, a_dim, buffer_capacity)

    def choose_action(self, state, evaluate=False):
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        if evaluate:
            with torch.no_grad():
                action, _ = self.actor.sample(state)
        else:
            action, _ = self.actor.sample(state)
        return action.cpu().detach().numpy()[0]

    def add_to_buffer(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)

    def update_parameters(self, update_interval):
        if self.replay_buffer.size < self.batch_size:
            return

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
        states, actions, rewards, next_states, dones = states.to(self.device), actions.to(self.device), rewards.to(self.device), next_states.to(self.device), dones.to(self.device)
        
        with torch.no_grad():
            next_actions, next_log_probs = self.actor.sample(next_states)
            q1_next, q2_next = self.critic_target(next_states, next_actions)
            min_q_next = torch.min(q1_next, q2_next) - self.alpha * next_log_probs
            q_target = rewards + (1 - dones) * self.gamma * min_q_next

        q1, q2 = self.critic(states, actions)
        critic_loss = 0.5*F.mse_loss(q1, q_target) + 0.5*F.mse_loss(q2, q_target)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        actions_pred, log_probs = self.actor.sample(states)
        q1_actor, q2_actor = self.critic(states, actions_pred)
        min_q_actor = torch.min(q1_actor, q2_actor)
        actor_loss = (self.alpha * log_probs - min_q_actor).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)

        return critic_loss.item(), actor_loss.item()


In [61]:
# Create the environment
env = gym.make('MountainCarContinuous-v0')

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
hidden_dim_actor = 128
hidden_dim_critic = 128
num_layer_actor = 2
num_layer_critic = 2
lr_act = 3e-4
lr_crit = 3e-5
gamma = 0.99
tau = 0.001
alpha = 0.1
batch_size = 64
num_episodes = 100
update_interval = 1
target_upd_inter = 1
buffer_capacity = 1000000

# Initialize SAC agent
agent = SAC(s_dim, a_dim, hidden_dim_actor, hidden_dim_critic, num_layer_actor, num_layer_critic, 
            lr_act, lr_crit, gamma, tau, alpha, buffer_capacity, batch_size, device)

# Training loop
episode_rewards = []
for episode in range(num_episodes):
    state = env.reset()
    episode_reward = 0
    for t in range(999):
        action = agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.add_to_buffer(state, action, reward, next_state, done)
        state = next_state
        episode_reward += reward

        if t % update_interval == 0:
            agent.update_parameters(update_interval)

        if done:
            break

    episode_rewards.append(episode_reward)
    if (episode + 1) % 5 == 0:
        print(f"Episode {episode + 1}, Reward: {episode_reward}")

# Plotting
plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('SAC on MountainCarContinuous-v0')
plt.show()

  deprecation(
  deprecation(
  if not isinstance(terminated, (bool, np.bool8)):


Episode 5, Reward: -26.592520982310738
Episode 10, Reward: -26.236385659628567
Episode 15, Reward: -27.542241849307754
Episode 20, Reward: -25.199084541555255
Episode 25, Reward: -25.54427986853094
Episode 30, Reward: -25.925458530710102
Episode 35, Reward: -26.996473362267658


KeyboardInterrupt: 