https://github.com/seungeunrho/minimalRL
위의 깃허브 링크를 참조해서 Policy Gradient 중에 Reinforce를 구현한 코드
Reinforce : 정책을 확률로 표현하고, 보상 크기에 따라 그 확률을 그대로 미분해 업데이트 하는 정책 경사법의 원형 형태이다.

In [5]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

In [7]:
#하이퍼파라미터 변수들
learning_rate = 0.0002
gamma = 0.98

class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.data = []
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 2)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=0)
        return x
        
    def put_data(self, item):
        self.data.append(item)

    def train_net(self):
        R = 0
        self.optimizer.zero_grad()
        for r, prob in self.data[::-1]:
            R = r + gamma * R
            loss = -torch.log(prob) * R
            loss.backward()
        self.optimizer.step()
        self.data = []

#reinforce 알고리즘 메인 함수
def main():
    env = gym.make('CartPole-v1')
    pi = Policy()
    score = 0.0
    print_interval = 20

    for n_epi in range(10000):  
        s, info = env.reset()
        done = False
        
        while not done:    
            prob = pi(torch.from_numpy(s).float())
            m = Categorical(prob)
            a = m.sample()
            s_prime, r, terminated, truncated, info = env.step(a.item())
            done = terminated or truncated
            pi.put_data((r,prob[a]))
            s = s_prime
            score += r
            
        pi.train_net()
        
        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode: {}, Avg timestpe : {}".format(n_epi, score/print_interval))
            score = 0.0
    env.close()

if __name__ == '__main__':
    main()

# of episode: 20, Avg timestpe : 21.6
# of episode: 40, Avg timestpe : 24.1
# of episode: 60, Avg timestpe : 28.3
# of episode: 80, Avg timestpe : 19.2
# of episode: 100, Avg timestpe : 24.7
# of episode: 120, Avg timestpe : 28.95
# of episode: 140, Avg timestpe : 28.3
# of episode: 160, Avg timestpe : 23.9
# of episode: 180, Avg timestpe : 28.0
# of episode: 200, Avg timestpe : 36.05
# of episode: 220, Avg timestpe : 24.1
# of episode: 240, Avg timestpe : 26.95
# of episode: 260, Avg timestpe : 30.7
# of episode: 280, Avg timestpe : 30.6
# of episode: 300, Avg timestpe : 22.35
# of episode: 320, Avg timestpe : 32.7
# of episode: 340, Avg timestpe : 23.8
# of episode: 360, Avg timestpe : 38.95
# of episode: 380, Avg timestpe : 44.6
# of episode: 400, Avg timestpe : 30.45
# of episode: 420, Avg timestpe : 47.5
# of episode: 440, Avg timestpe : 35.15
# of episode: 460, Avg timestpe : 42.35
# of episode: 480, Avg timestpe : 48.85
# of episode: 500, Avg timestpe : 40.0
# of episode: 520, A

https://github.com/multicore-it/rl/blob/main/codes/cartpole_reinforce.ipynb
위의 깃허브 링크에서 얻어온 Reinforce cartpole 예제 코드 실습하기에 아주 좋음!

In [5]:
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Dense, Flatten
from tensorflow.keras.optimizers import Adam, SGD
import gym
import numpy as np
import random as rand

class Agent(object):
    def __init__(self):
        self.env = gym.make('CartPole-v1')
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.value_size = 1

        self.node_num = 12
        self.learning_rate = 0.0005
        self.epochs_cnt = 5
        self.model = self.build_model()

        self.discount_rate = 0.95
        self.penalty = -10
        self.episode_num = 500
        self.moving_avg_size = 20

        self.reward_list = []
        self.count_list = []
        self.moving_avg_list = []
        self.states, self.action_matrixs, self.action_probs, self.rewards = [], [], [], []

        self.DUMMY_ACTION_MATRIX, self.DUMMY_REWARD = np.zeros((1, 1, self.action_size)), np.zeros((1,1,self.value_size))

    class MyModel(tf.keras.Model):
        def train_step(self, data):
            in_datas, out_actions = data
            states, action_matrix, rewards = in_datas[0], in_datas[1], in_datas[2]

            with tf.GradientTape() as tape:
                y_pred = self(states, training=True)
                action_probs = K.sum(action_matrix*y_pred, axis=-1)
                loss = -K.log(action_probs)*rewards

            trainable_vars = self.trainable_variables
            gradients = tape.gradient(loss, trainable_vars)
            self.optimizers.apply_gradients(zip(gradients, trainable_vars))

    def build_model(self):
        input_states = Input(shape=(1,self.state_size), name='input_states')
        input_action_matrixs = Input(shape=(1, self.action_size), name='input_action_matrixs')
        input_rewards = Input(shape=(1,self.value_size), name='input_rewards')

        x = (input_states)
        x = Dense(self.node_num, activation='tanh')(x)
        out_actions = Dense(self.action_size, activation='softmax', name='output')(x)

        model = self.MyModel(inputs=[input_states, input_action_matrixs, input_rewards], outputs=out_actions)
        model.compile(optimizer=Adam(lr=self.learning_rate))
        model.summary()
        return model

    def train(self):
        for episode in range(self.episode_num):
            state = self.env.reset()
            state = state[0]
            self.env.max_episode_steps = 500

            count, reward_tot = self.make_memory(episode, state)
            self.train_mini_batch()
            self.clear_memory()

            if count < 500:
                reward_tot = reward_tot - self.penalty

            self.reward_list.append(reward_tot)
            self.count_list.append(count)
            self.moving_avg_list.append(self.moving_avg(self.count_list, self.moving_avg_size))

            if(episode % 10 == 0):
                print("episode:{}, moving_avg:{}, rewards_avg:{}".format(episode, self.moving_avg_list[-1], np.mean(self.reward_list)))

        self.save_model()

    def make_memory(self, episode, state):
        reward_tot = 0
        count = 0
        reward = np.zeros(self.value_size)
        action_matrix = np.zeros(self.action_size)
        done = False
        while not done:
            count += 1
            state_t = np.reshape(state, [1, 1, self.state_size])
            action_matrix_t = np.reshape(action_matrix, [1, 1, self.action_size])

            action_prob = self.model.predict([state_t, self.DUMMY_ACTION_MATRIX, self.DUMMY_REWARD])
            action = np.random.choice(self.action_size, 1, p=action_prob[0][0])[0]
            action_matrix = np.zeros(self.action_size)
            action_matrix[action] = 1
            state_next, reward, done, none, none2 = self.env.step(action)

            if count < 500 and done:
                reward = self.penalty

            self.states.append(np.reshape(state_t, [1, self.state_size]))
            self.action_matrixs.append(np.reshape(action_matrix, [1, self.action_size]))
            self.action_probs.append(np.reshape(action_prob, [1, self.action_size]))
            self.rewards.append(reward)
            reward_tot += reward
            state = state_next
        return count, reward_tot

    def clear_memory(self):
        self.states, self.action_matrixs, self.action_probs, self.rewards = [], [], [], []

    def make_discount_rewards(self, rewards):
        discounted_rewards = np.zeros(np.array(rewards).shape)
        running_add = 0
        for t in reversed(range(0, len(rewards))):
            running_add = running_add * self.discount_rate + rewards[t]
            discounted_rewards[t] = running_add

        return discounted_rewards

    def train_mini_batch(self):
        discount_rewards = np.array(self.make_discount_rewards(self.rewards))
        discount_rewards_t = np.reshape(discount_rewards, [len(discount_rewards), 1, 1])
        states_t = np.array(self.states)
        action_matrixs_t = np.array(self.action_matrixs)
        action_probs_t = np.array(self.action_probs)
        self.model.fit(x=[states_t, action_matrixs_t, discount_rewards_t], y=[action_probs_t], epochs=self.epochs_cnt, verbose=0)

    def moving_avg(self, data, size=10):
        if len(data) > size:
            c = np.array(data[len(data)-size:len(data)])
        else:
            c= np.array(data)
        return np.mean(c)

    def save_model(self):
        self.model.save("jaedeok_reinforce_example")
        print("It's finish!")

if __name__ == "__main__":
    agent = Agent()
    agent.train()






Model: "my_model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_states (InputLayer)   [(None, 1, 4)]               0         []                            
                                                                                                  
 dense (Dense)               (None, 1, 12)                60        ['input_states[0][0]']        
                                                                                                  
 input_action_matrixs (Inpu  [(None, 1, 2)]               0         []                            
 tLayer)                                                                                          
                                                                                                  
 input_rewards (InputLayer)  [(None, 1, 1)]               0         []                     

ValueError: in user code:

    File "C:\Users\user1\anaconda3\envs\masan\lib\site-packages\keras\src\engine\training.py", line 1401, in train_function  *
        return step_function(self, iterator)
    File "C:\Users\user1\anaconda3\envs\masan\lib\site-packages\keras\src\engine\training.py", line 1384, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Users\user1\anaconda3\envs\masan\lib\site-packages\keras\src\engine\training.py", line 1373, in run_step  **
        outputs = model.train_step(data)
    File "C:\Users\user1\AppData\Local\Temp\ipykernel_31996\2410009075.py", line 39, in train_step
        y_pred = self(states, training=True)
    File "C:\Users\user1\anaconda3\envs\masan\lib\site-packages\keras\src\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "C:\Users\user1\anaconda3\envs\masan\lib\site-packages\keras\src\engine\input_spec.py", line 219, in assert_input_compatibility
        raise ValueError(

    ValueError: Layer "my_model" expects 3 input(s), but it received 1 input tensors. Inputs received: [<tf.Tensor 'IteratorGetNext:0' shape=(None, 1, 4) dtype=float32>]
