In [2]:
from __future__ import annotations
from collections import namedtuple
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
import gym


In [1]:
'''
env = gym.make('CartPole-v0')
env.reset()
random_episodes = 0
reward_sum = 0
while random_episodes < 10:
    env.render()
    action = env.action_space.sample()
    observation, reward, done, _ = env.step(action)
    print(observation, reward, done)
    reward_sum += reward
    if done:
        random_episodes += 1
        print(f"Reward for this episode was: {reward_sum}")
        reward_sum = 0
        env.reset()
env.close()
'''

'\nenv = gym.make(\'CartPole-v0\')\nenv.reset()\nrandom_episodes = 0\nreward_sum = 0\nwhile random_episodes < 10:\n    env.render()\n    action = env.action_space.sample()\n    observation, reward, done, _ = env.step(action)\n    print(observation, reward, done)\n    reward_sum += reward\n    if done:\n        random_episodes += 1\n        print(f"Reward for this episode was: {reward_sum}")\n        reward_sum = 0\n        env.reset()\nenv.close()\n'

In [None]:
Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))

# set env with setting (especially reward and max episode)
gym.envs.register(
    id='CartPole_prefer-v0',
    entry_point='gym.envs.classic_control:CartPole',
    max_episode_steps=500,      # CartPole-v0 uses 200
    reward_threshold=-110.0,
)

# 상수 정의
ENV = 'CartPole_prefer-v0'     # 태스크 이름
GAMMA = 0.99            # 시간할인율
MAX_STEPS = 200         # 1에피소드 당 최대 단계 수
NUM_EPISODES = 500      # 최대 에피소드 수
batch_size = 32
capacity = 10000        # Memory capacity

In [4]:
# 애니메이션을 만드는 함수
# 참고 URL http://nbviewer.jupyter.org/github/patrickmineault
# /xcorr-notebooks/blob/master/Render%20OpenAI%20gym%20as%20GIF.ipynb
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display


def display_frames_as_gif(frames):
    """
    Displays a list of frames as a gif, with controls
    """
    plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0),
               dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames),
                                   interval=50)

    anim.save('movie_cartpole_DQN.mp4')  # 애니메이션을 저장하는 부분
    display(display_animation(anim, default_mode='loop'))
    

In [12]:
class ReplayMemory:
    ''' Memory for random selection of trials '''
    
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity    # Memory capacity
        self.memory = []            # Transition memory
        self.index = 0              # indicate saving location
        
    def push(self, state: torch.FloatTensor, action: torch.LongTensor,
             state_next: torch.FloatTensor, reward: torch.FloatTensor) -> None:
        '''Transition~(s,a,s_n,r) 메모리 저장'''
        if len(self.memory) < self.capacity:                                        # case memory not full
            self.memory.append(None)                                                # increase size of list to avoid index error
        
        self.memory[self.index] = Transition(state, action, state_next, reward)     # add namedtuple Transition
        self.index = (self.index + 1) % self.capacity                               # increase index (??? index init?)
        
    def sample(self, batch_size: int) -> list:
        '''replay 메모리에서 batch size 만큼 Transition 랜덤 뽑기'''
        return random.sample(self.memory, batch_size)
    
    def __len__(self) -> int:
        '''return saved Transitions'''
        return len(self.memory)
        

In [1]:
class Network:
    ''' Where deep network formed and optimized'''
    def __init__(self,num_states: int, num_actions: int) -> None:
        '''Initialize network models'''
        self.num_states = num_states
        self.num_actions = num_actions
        
        self.mem = ReplayMemory(capacity)                           # Initialize ReplayMem

        self.model = nn.Sequential()                                # Sequential container of modules(networks) 
        self.model.add_module('fc1', nn.Linear(num_states, 32))     # Fully-connected(FC) model with input of state output of 32
        self.model.add_module('relu1', nn.ReLU())                   # Backward diff wave alg (RELU typically)
        self.model.add_module('fc2', nn.Linear(32, 32))             # Second module, modules can also have CNN, RNN, etc..
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fc3', nn.Linear(32, num_actions))    # Goes through network with output of action

        print(self.model)                                           # print out the model
        
        # Selection of gradient descent model.(i.e. SGD, RMSprop, Adagrad,...)
        # change weight coeffs of network with gradient descent of params
        self.optimizer = optim.Adam(self.model.parameters(), lr = 0.0001)
        
    def replay(self) -> None:
        '''REPLAY OF MEMORY & TRAIN OF NETWORK'''

        
    def decide_action(self, state: torch.FloatTensor, episode: int) -> torch.LongTensor:
        '''Decision of action with \epsilon greedy'''
        epsilon = 0.5 * (1 / (episode + 1))                         # epsilon decay
        if epsilon <= np.random.uniform(0,1):                           # weighted action
            self.model.eval()                                           # change network mode to inference
            with torch.no_grad():
                action = self.model(state).max(1)[1].view(1,1)
                # 신경망 출력의 최댓값에 대한 인덱스(action) = max(1)[1]
                # .view(1,1)은 [torch.LongTensor of size 1] 을 size 1*1로 변환하는 역할을 한다

        else:                                                           # random action
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])                 # return 0 or 1 (LongTensor of size 1*1)

        return action

IndentationError: expected an indented block (<ipython-input-1-2d147e8b142e>, line 16)

In [None]:
class Environment:
    ''' Initialize and run the environment '''
    
    def __init__(self) -> None:
        self.env = gym.make(ENV)                                # env set
        num_states = self.env.observation_space.shape[0]        # Get State shape (4)
        num_actions = self.env.action_space.n                   # Get action numbers (2)
        self.network = Network(num_states,num_actions)          # Initialize Network Class

        
    def run(self) -> None:
        '''Run and update iteration'''
        episode_10_list = np.zeros(10)                          # Save steps succeeded for last 10 episodes
        complete_episodes = 0                                   # Episodes number that reached goal
        is_episode_final = False                                # did it succeeded for 10 episodes? (terminate)
        frames = []                                             # frame for animation

        for episode in range(NUM_EPISODES):                         # Episode iteration (single train per episode)
            observation = self.env.reset()                          # reset env (=initialize)
            state = observation                                     # state <- initialized env
            state = torch.from_numpy(state).type(
                torch.FloatTensor)                                  # convert np.array -> FloatTensor
            state = torch.unsqueeze(state, 0)                       # size 4 -> size 1*4 convert

            for step in range(MAX_STEPS):                           # Iterate through max action(or state) per episode
                
                if is_episode_final is True:                              # If the goal reached
                    frames.append(self.env.render(
                        mode = 'rgb_array'))                              # Save result of final episode with frame
                
                action = self.network.decide_action(state, episode) # determine action through eps-greedy

                new_observation, _, done, _ = self.env.step(
                    action.item())                                  # get new state from decided action (reward & info -> blank)

                if done:                                                # if pole lean down|| iteration > max_episode_steps 
                    new_state = None                                    # No new state
                    episode_10_list = np.hstack(
                        (episode_10_list[1:], step + 1))                # save succeeded steps by every 'episode'
                    
                    if step < 495:                                          # if pole lean down( < max_episode_steps)
                        reward = torch.FloatTensor([-1.0])                  # reward -> -1
                        complete_episodes = 0                               # reset complete_episode
                    else:                                                   # if pole does not lean down
                        reward = torch.FloatTensor([1.0])                   # reward -> 1
                        complete_episodes += 1                              # update succeeded number
                else:                                                   # not done(still in interation)
                    reward = torch.FloatTensor([0.0])                   # reward = 0
                    new_state = new_observation                         # update new state
                    new_state = torch.from_numpy(state).type(           # change type to FloatTensor
                        torch.FloatTensor)
                    new_state = torch.unsqueeze(new_state, 0)           # size 4 -> size 1*4 convert

                self.network.mem.push(
                    state, action, new_state, reward)               # store iteration information(Transition) into memory
                self.network.replay()                               # update Q with Experience Replay
                state = new_state                                   # update state

                if done:
                    print(f'{episode} Episode: Finished after {step + 1} steps：최근 10 에피소드의 평균 단계 수 = {episode_10_list.mean()}')
                    break

            if is_episode_final is True:
                display_frames_as_gif(frames)                   # save result as gif
                break

            if complete_episodes >= 10:                         # (iteration > max_episode_steps) && (pole does not lean down) for 10 episodes
                print('10 에피소드 연속 성공')
                is_episodes_final = True                        # save result and break
            

In [7]:
np.random.uniform?

[0;31mDocstring:[0m
uniform(low=0.0, high=1.0, size=None)

Draw samples from a uniform distribution.

Samples are uniformly distributed over the half-open interval
``[low, high)`` (includes low, but excludes high).  In other words,
any value within the given interval is equally likely to be drawn
by `uniform`.

.. note::
    New code should use the ``uniform`` method of a ``default_rng()``
    instance instead; see `random-quick-start`.

Parameters
----------
low : float or array_like of floats, optional
    Lower boundary of the output interval.  All values generated will be
    greater than or equal to low.  The default value is 0.
high : float or array_like of floats
    Upper boundary of the output interval.  All values generated will be
    less than or equal to high.  The default value is 1.0.
size : int or tuple of ints, optional
    Output shape.  If the given shape is, e.g., ``(m, n, k)``, then
    ``m * n * k`` samples are drawn.  If size is ``None`` (default),
    a single