In [17]:
!pip install gymnasium

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [1]:
!pip install gymnasium[mujoco]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
#from collections import deque
import random

In [3]:
class Soft_Q_Net(nn.Module):
  def __init__(self, observation_dim, action_dim):
    super(Soft_Q_Net, self).__init__()
    self.observation_dim = observation_dim
    self.action_dim = action_dim
    # define network architecture
    self.FC1 = nn.Linear(self.observation_dim+self.action_dim, 64)    # input layer
    self.FC2 = nn.Linear(64, 256)                     # hidden layer
    self.FC3 = nn.Linear(256, 1)                      # output layer

  # network connecting
  def forward_pass(self, observation, action):
    x = torch.cat([observation, action], dim=-1)
    x = self.FC1(x)
    x = F.relu(x)
    x = self.FC2(x)
    x = F.relu(x)
    x = self.FC3(x)
    return x

In [4]:
class buffer_memory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def store(self, transition):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), size=batch_size, replace=False)
        observations, actions, rewards, next_observations, dones = zip(*[self.buffer[i] for i in indices])
        return (torch.FloatTensor(observations),
            torch.FloatTensor(actions),
            torch.FloatTensor(rewards),
            torch.FloatTensor(next_observations),
            torch.FloatTensor(dones))

    def __len__(self):
        return len(self.buffer)

In [5]:
def train(buffer, target_model, eval_model, gamma, optimizer, batch_size, loss_fn, count, update_freq):
  observations, actions, rewards, next_observations, dones = buffer.sample(batch_size)

  q_vals = eval_model.forward_pass(observations, actions)                                      # get Qt values (2d tensor of two element) for all observation (observation is 2d) from eval model
  next_q_vals = target_model.forward_pass(next_observations, actions)                          # get Qt+1 values for all next observation from target model
  expected_q_vals = rewards + gamma * (1 - dones) * next_q_vals.detach()

  loss = F.mse_loss(q_vals, expected_q_vals)

  optimizer.zero_grad()         # set eval_model gradient to none
  loss.backward()               # computes the gradient w.r.t loss
  optimizer.step()              # Performs a single optimization step (parameter update)

  if count % update_freq == 0:  # update target model for every 200 steps by sharing the params of eval model
    target_model.load_state_dict(eval_model.state_dict())

  return loss


In [10]:
## Start here

gamma = 0.99             # discount rate
learning_rate = 0.001   # learning rate
batch_size = 128          # training batch size
update_freq = 200        # update target network for every 200 steps (after every 200 state)
capacity = 10000   # size of buffer memory
render = False           # renedering of cartpole window
episode = 1000    # Total episode
alpha = 0.5            # entropy/temperature coefficient
episodes = 1000

env = gym.make("Pusher-v4")
env = env.unwrapped
observation_dim = env.observation_space.shape[0]         # State space size: 4
action_dim = env.action_space.shape[0]                   # Action space size: 2
print(observation_dim, '||', action_dim)

target_net = Soft_Q_Net(observation_dim, action_dim)   # initializing target nn
eval_net = Soft_Q_Net(observation_dim, action_dim)     # initializing evaluation nn
eval_net.load_state_dict(target_net.state_dict())             # loading initialized params (weights and biases) of target nn to eval nn

optimizer = torch.optim.Adam(eval_net.parameters(), lr=learning_rate)   # optimizer
buffer = buffer_memory(capacity)                                        # initialize buffer memory
loss_fn = nn.MSELoss()

count = 0
weight_reward = None

for i in range(episode):
  # within each episode
  obs = env.reset()     # get initial state observation from env
  reward_total = 0      # total reward got in a episode
  episode_reward = 0
  episode_steps = 0
  if render:
    env.render()
  while True:
    action = env.action_space.sample()
    next_obs, reward, done, info, _ = env.step(action)                      # taking sampled action on environment
    buffer.store((obs, action, reward, next_obs, done))                      # storing the st, at, rt+1, st+1 into buffer
    episode_reward += reward
    episode_steps += 1
    count += 1
    obs = next_obs
    if len(buffer) > batch_size:                                     # if buffer have more new samples than batch size (32); trainig will be done
      loss = train(buffer, target_net, eval_net, gamma, optimizer, batch_size, loss_fn, count, update_freq)
    if done:
      if not weight_reward:
        weight_reward = reward_total
      else:
        weight_reward = 0.99 * weight_reward + 0.01 * reward_total          # a relative current episode reward with past episodes reward
      if (i+1) % 10 == 0:
        print('episode: {}\treward: {}\tweight_reward: {:.3f}\tepisode loss: {:.3f}'.format(i+1, reward_total, weight_reward, loss))
      break


  return (torch.FloatTensor(observations),
  loss = F.mse_loss(q_vals, expected_q_vals)


KeyboardInterrupt: ignored