In [None]:
# PPO와 관련된 기본 설계는 https://github.com/seungeunrho/minimalRL의 코드를 이용했습니다.

In [23]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
from torch.distributions import MultivariateNormal
import numpy as np

In [43]:
# PPO for car_racing
## Action : 3 type; continuous
## Observation : (96, 96, 3) image -> CNN?

#Hyperparameters
learning_rate  = 0.0003
gamma           = 0.9
lmbda           = 0.9
eps_clip        = 0.2
K_epoch         = 10
rollout_len    = 5
buffer_size    = 3
minibatch_size = 3

class PPO_Car(nn.Module):
    def __init__(self):
        super(PPO_Car, self).__init__()
        self.data = []
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.fc_mu = nn.Linear(12*12*128,3)
        self.fc_std  = nn.Linear(12*12*128,3)
        self.fc_v = nn.Linear(12*12*128,3)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.optimization_step = 0

    def forward(self, x, softmax_dim = 0):
        #x = np.transpose(x, (0,3,2,1))
        output = self.conv1(x)
        output = self.conv2(output)
        output = self.conv3(output)
        #output = output.view(output.size(0), -1)
        output = output.flatten()
        mu = 2.0 * torch.tanh(self.fc_mu(output))
        std = F.softplus(self.fc_std(output))
        return mu, std

    def v(self, x):
        output = self.conv1(x)
        output = self.conv2(output)
        output = self.conv3(output)
        # output = output.view(output.size(0), -1)
        output = output.flatten()
        v = self.fc_v(output)
        return v

    def put_data(self, transition):
        self.data.append(transition)

    def make_batch(self):
        s_batch, a_batch, r_batch, s_prime_batch, prob_a_batch, done_batch = [], [], [], [], [], []
        data = []

        for j in range(buffer_size):
            for i in range(minibatch_size):
                rollout = self.data.pop()
                s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []

                for transition in rollout:
                    s, a, r, s_prime, prob_a, done = transition

                    s_lst.append(s)
                    a_lst.append([a])
                    r_lst.append([r])
                    s_prime_lst.append(s_prime)
                    prob_a_lst.append([prob_a])
                    done_mask = 0 if done else 1
                    done_lst.append([done_mask])

                s_batch.append(s_lst)
                a_batch.append(a_lst)
                r_batch.append(r_lst)
                s_prime_batch.append(s_prime_lst)
                prob_a_batch.append(prob_a_lst)
                done_batch.append(done_lst)

            print("mini batch")
            mini_batch = torch.tensor(s_batch, dtype=torch.float), a_batch, \
                          torch.tensor(r_batch, dtype=torch.float), torch.tensor(s_prime_batch, dtype=torch.float), \
                          torch.tensor(done_batch, dtype=torch.float), torch.tensor(prob_a_batch, dtype=torch.float)
            data.append(mini_batch)

        return data

    def calc_advantage(self, data):
        data_with_adv = []
        for mini_batch in data:
            s, a, r, s_prime, done_mask, old_log_prob = mini_batch
            with torch.no_grad():
                print(s_prime.shape, type(s_prime))
                td_target = r + gamma * self.v(s_prime) * done_mask
                delta = td_target - self.v(s)
            delta = delta.numpy()

            advantage_lst = []
            advantage = 0.0
            for delta_t in delta[::-1]:
                advantage = gamma * lmbda * advantage + delta_t[0]
                advantage_lst.append([advantage])
            advantage_lst.reverse()
            advantage = torch.tensor(advantage_lst, dtype=torch.float)
            data_with_adv.append((s, a, r, s_prime, done_mask, old_log_prob, td_target, advantage))

        return data_with_adv


    def train_net(self):
        if len(self.data) == minibatch_size * buffer_size:
            data = self.make_batch()
            data = self.calc_advantage(data)

            for i in range(K_epoch):
                for mini_batch in data:
                    s, a, r, s_prime, done_mask, old_log_prob, td_target, advantage = mini_batch

                    mu, std = self.forward(s, softmax_dim=1)
                    dist = Normal(mu, std)
                    log_prob = dist.log_prob(a)
                    ratio = torch.exp(log_prob - old_log_prob)  # a/b == exp(log(a)-log(b))

                    surr1 = ratio * advantage
                    surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
                    loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s) , td_target)

                    self.optimizer.zero_grad()
                    loss.mean().backward()
                    nn.utils.clip_grad_norm_(self.parameters(), 1.0)
                    self.optimizer.step()
                    self.optimization_step += 1

def main():
    # env = gym.make("CarRacing-v2", render_mode="human") # car racing visualizing
    env = gym.make("CarRacing-v2")
    model = PPO_Car()
    score = 0.0
    print_interval = 20
    rollout = []

    for n_epi in range(10000):
        s, info = env.reset(seed=410, options={})
        done = False
        while not done:
            for t in range(rollout_len):
                state = torch.from_numpy(s).float()
                state = np.transpose(state, (2,1,0))
                mu, std = model.forward(state)  # 다변량 정규분포 평균, 표준편차
                dist = MultivariateNormal(mu, torch.diag(std))     # 정규분포 함수 생성
                action = dist.sample()                      # 샘플링; 2차원 텐서
                log_prob = dist.log_prob(action)
                observation, reward, terminated, truncated, info = env.step(action.numpy())
                observation = np.transpose(observation, (2,1,0))
                if terminated or truncated : done = True
                rollout.append((state, action, reward/10.0, observation, log_prob.item(), done))
                if len(rollout) == rollout_len :
                    model.put_data(rollout)
                    rollout = []
                s = observation
                score += reward
                if done :
                    print("break!")
                    break
            # print("train_net")
            model.train_net()

        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {:.1f}, opt step: {}".format(n_epi, score/print_interval, model.optimization_step))
            score = 0.0

    env.close()

if __name__ == '__main__':
    main()

(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.ndarray'>
(96, 96, 3) <class 'numpy.nd

RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv2d, but got input of size: [3, 5, 96, 96, 3]