In [1]:
!pip install stable-baselines3



In [1]:
import argparse
import gym
import numpy as np
from itertools import count
from collections import namedtuple
import os
import pygame
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.autograd as autograd
from torch.distributions import Categorical
from stable_baselines3.common.cmd_util import make_atari_env,make_vec_env
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.vec_env import VecTransposeImage
# import gym
import sys, os
sys.path.append('../')
#from Snake_game.environment_discret_states import SnakeGame
from Snake_game.environment_images import SnakeGameImage
# import matplotlib.pyplot as plt
# from Utils.plot import plot_reward
# from Utils.config import * ## BATCH_SIZE , N_EPISODES ..




# Cart Pole

parser = argparse.ArgumentParser(description='PyTorch actor-critic example')
parser.add_argument('--gamma', type=float, default=0.999, metavar='G',
                    help='discount factor (default: 0.99)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--render', action='store_true',
                    help='render the environment')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='interval between training status logs (default: 10)')
args,unk = parser.parse_known_args()

print("UNK",unk)
print("ARGS",args)



#env = make_vec_env('CartPole-v1', n_envs=1)

#####################
# env = make_atari_env('PongNoFrameskip-v4', n_envs=1, seed=0)
# env = VecFrameStack(env, n_stack=4)
# env = VecTransposeImage(env)

env = SnakeGameImage()

#env.seed(args.seed)
torch.manual_seed(args.seed)


SavedAction = namedtuple('SavedAction', ['log_prob', 'value'])


class Policy(nn.Module):
    """
    implements both actor and critic in one model
    """
    def __init__(self,env):
        super(Policy, self).__init__()
        self.input_dim = env.observation_space

        self.action_dim = len(env.actions)

        #self.affine1 = nn.Linear(4, 128)

        ###### IF INPUT IS IMAGE TAKE THIS !! #### 

        self.affine1 = nn.Sequential(  nn.Conv2d(3,20,5),
          nn.ReLU(),
          nn.Conv2d(20,64,5),
          nn.ReLU(),
          nn.Flatten(),
        )

        self.fc_input_dim = self.feature_size()

        self.affine1.add_module("before_last",nn.Linear(self.fc_input_dim,128))

        # actor's layer
        self.action_head = nn.Linear(128, self.action_dim)

        # critic's layer
        self.value_head = nn.Linear(128, 1)

        # action & reward buffer
        self.saved_actions = []
        self.rewards = []


    def forward(self, x):
        """
        forward of both actor and critic
        """
        x = F.relu(self.affine1(x))

        # actor: choses action to take from state s_t 
        # by returning probability of each action
        action_prob = F.softmax(self.action_head(x), dim=-1)

        # critic: evaluates being in the state s_t
        state_values = self.value_head(x)

        # return values for both actor and critic as a tuple of 2 values:
        # 1. a list with the probability of each action over the action space
        # 2. the value from state s_t 
        return action_prob, state_values

    def feature_size(self):
        return self.affine1(autograd.Variable(torch.zeros(1, *self.input_dim))).view(1, -1).size(1)


model = Policy(env)
optimizer = optim.Adam(model.parameters(), lr=1e-4)
eps = np.finfo(np.float32).eps.item()


def select_action(state):
    
    print(state.shape)

    state = torch.from_numpy(state).float()
    probs, state_value = model(state)

    # create a categorical distribution over the list of probabilities of actions
    m = Categorical(probs)

    # and sample an action using the distribution
    action = m.sample()

    # save to action buffer
    model.saved_actions.append(SavedAction(m.log_prob(action), state_value))

    # the action to take (left or right)
    return action.item()


def update(last_state=''):
    """
    Training code. Calculates actor and critic loss and performs backprop.
    """
    
    #if not last_state.all(): R = 0

    if last_state =='': R=0
    
    else :
        last_state = torch.from_numpy(last_state).float()
        last_state = last_state.unsqueeze(0)
        with torch.no_grad():
            _, last_value = model(last_state)
        R = last_value.item()
    
    saved_actions = model.saved_actions
    policy_losses = [] # list to save actor (policy) loss
    value_losses = [] # list to save critic (value) loss
    returns = [] # list to save the true values

    # calculate the true value using rewards returned from the environment
    for r in model.rewards[::-1]:
        # calculate the discounted value
        R = r + args.gamma * R
        returns.insert(0, R)

    returns = torch.tensor(returns)
    returns = (returns - returns.mean()) / (returns.std() + eps)

    for (log_prob, value), R in zip(saved_actions, returns):
        advantage = R - value.item()

        # calculate actor (policy) loss 
        policy_losses.append(-log_prob * advantage)

        # calculate critic (value) loss using L1 smooth loss
        value_losses.append(F.smooth_l1_loss(value, torch.tensor([R])))

    # reset gradients
    optimizer.zero_grad()

    # sum up all the values of policy_losses and value_losses
    loss = torch.stack(policy_losses).sum() + torch.stack(value_losses).sum()

    # perform backprop
    loss.backward()
    optimizer.step()

    # reset rewards and action buffer
    del model.rewards[:]
    del model.saved_actions[:]


def main():
    running_reward = 10

    # run inifinitely many episodes
    for i_episode in count(1):


        # reset environment and episode reward
        state = env.reset()
        ep_reward = 0
        done = False
        # for each episode, only run 9999 steps so that we don't 
        # infinite loop while learning
        
        while not done :

            for _ in range(10): ## del after

              # select action from policy
              action = select_action(state)

              # take the action
              state, reward, done, _ = env.step([action])
              if args.render:
                  env.render()
                    
              model.rewards.append(reward)
              ep_reward += reward
            print(state)
            update(state)

           

        # update cumulative reward
        running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward
        

        # # perform backprop on the full episode
        #update()

        # log results
        if i_episode % args.log_interval == 0:
            # print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
            #       i_episode, ep_reward, running_reward))
            print("episode",i_episode,"reward",ep_reward,"running reward",running_reward)

        # check if we have "solved" the cart pole problem
        # if running_reward > env.spec.reward_threshold:
        #     print("Solved! Running reward is now {} and "
        #           "the last episode runs to {} time steps!".format(running_reward, t))
        #     break


if __name__ == '__main__':
    main()

pygame 2.0.1 (SDL 2.0.14, Python 3.7.4)
Hello from the pygame community. https://www.pygame.org/contribute.html




UNK ['-f', 'C:\\Users\\yassine\\AppData\\Roaming\\jupyter\\runtime\\kernel-26bcc6c0-e2f3-4f04-976c-f5d42b2cab8c.json']
ARGS Namespace(gamma=0.999, log_interval=10, render=False, seed=543)


  warn('The default multichannel argument (None) is deprecated.  Please '


(3, 40, 40)


RuntimeError: Expected 4-dimensional input for 4-dimensional weight [20, 3, 5, 5], but got 3-dimensional input of size [3, 40, 40] instead