<a href="https://colab.research.google.com/github/razvancraciun/space-invaders-ai/blob/master/space.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Run mode. TRAIN=True runs the training loop; otherwise a single rendered
# evaluation episode is played.
TRAIN = True
# When LOAD is True the checkpoint `model{LOAD_INDEX}.pt` under SAVE_PATH is
# loaded before running and episode numbering resumes at LOAD_INDEX + 1.
LOAD = True
LOAD_INDEX = 180


# learning params
EPISODES = 1000  # number of episodes played by the training loop
# Transitions sampled per gradient step.
# NOTE(review): the original remark here was "<! 256"; presumably it means the
# batch size was reduced from (or must stay below) 256 - confirm with the author.
BATCH_SIZE = 32
LEARNING_RATE = 0.001  # step size handed to the Adam optimizer
# Directory prefix for checkpoints and score logs. File names are appended by
# plain string concatenation, so the trailing slash is required.
SAVE_PATH = '/content/drive/My Drive/save3/'
SAVE_INTERVAL = 20  # a checkpoint + log is written every SAVE_INTERVAL episodes

# agent params
GAMMA = 0.95  # discount factor applied to the bootstrapped next-state value
EPSILON = 1  # initial probability of picking a random action
EPSILON_MIN = 0.01  # floor for the exploration probability
EPSILON_DEC = 0.9995  # multiplicative epsilon decay applied after each training step

FRAME_STACK_SIZE = 3  # number of consecutive frames stacked into one state
BUFFER_SIZE = 10000  # capacity (in transitions) of the replay buffer

## Imports

In [0]:
import numpy as np
import torch
import torch.nn as nn
import gym
import matplotlib.pyplot as plt
from IPython import display

## Buffer

In [0]:
class ReplayBuffer:
    """Fixed-capacity ring buffer of experience-replay transitions.

    Every transition is kept in pre-allocated CPU tensors; once ``size``
    transitions have been stored, the oldest entries are overwritten.

    Attributes:
        size: Maximum number of transitions held at once.
        count: Total number of transitions ever stored (not capped at
            ``size``); ``count % size`` is the next write slot.
        from_stack / to_stack: State before / after the action, float32,
            shape ``(size, *state_shape)``.
        actions: Chosen action indices (int64).
        rewards: Rewards received.
        terminals: Despite the name this is a *continuation mask*:
            ``1`` when the episode went on, ``0`` when ``done`` was true.
    """

    def __init__(self, size, state_shape, n_actions):
        """Allocate storage for ``size`` transitions of shape ``state_shape``.

        ``n_actions`` is accepted for interface compatibility only; actions
        are stored as plain indices so it is not needed.
        """
        self.size = size
        self.count = 0

        self.from_stack = torch.zeros((self.size, *state_shape), dtype=torch.float32)
        self.to_stack = torch.zeros((self.size, *state_shape), dtype=torch.float32)
        self.actions = torch.zeros(self.size, dtype=torch.long)
        self.rewards = torch.zeros(self.size)
        self.terminals = torch.zeros(self.size)

    @staticmethod
    def _as_float_tensor(frames):
        """Convert an array-like (ndarray or list of ndarrays) to float32."""
        # Going through np.asarray first avoids PyTorch's element-by-element
        # path for lists of ndarrays, which is very slow for image frames.
        return torch.as_tensor(np.asarray(frames), dtype=torch.float32)

    def store(self, from_stack, action, reward, to_stack, done):
        """Write one transition into the next slot, overwriting the oldest.

        Args:
            from_stack: State before the action (array-like, ``state_shape``).
            action: Integer index of the action taken.
            reward: Scalar reward for the transition.
            to_stack: State after the action (array-like, ``state_shape``).
            done: Truthy when the transition ended the episode.
        """
        index = self.count % self.size

        self.from_stack[index] = self._as_float_tensor(from_stack)
        self.to_stack[index] = self._as_float_tensor(to_stack)
        # Store the index directly rather than via a float tensor round-trip.
        self.actions[index] = int(action)
        self.rewards[index] = reward
        self.terminals[index] = 1 - int(done)
        self.count += 1

    def sample(self, batch_size):
        """Draw ``batch_size`` stored transitions uniformly, with replacement.

        Returns:
            Tuple ``(from_states, actions, rewards, to_states, terminals)``
            of CPU tensors whose first dimension is ``batch_size``.

        Raises:
            ValueError: If the buffer is empty and ``batch_size`` is non-zero
                (raised by ``np.random.choice``).
        """
        filled = min(self.count, self.size)
        picks = np.random.choice(filled, batch_size)
        # Explicit int64 tensor index: portable across platforms where numpy's
        # default integer is 32-bit.
        batch = torch.as_tensor(picks, dtype=torch.long)

        from_states = self.from_stack[batch]
        to_states = self.to_stack[batch]
        actions = self.actions[batch]
        rewards = self.rewards[batch]
        terminals = self.terminals[batch]

        return from_states, actions, rewards, to_states, terminals

## Model

In [0]:
class Model(nn.Module):
    """Convolutional Q-network mapping a stack of frames to per-action values.

    Two convolutional blocks (each made of two Conv -> BatchNorm -> ReLU ->
    MaxPool units) feed a two-layer fully connected head. The head's first
    layer expects 768 flattened features, so the spatial input size is fixed
    by the pooling factors used below. The optimizer (Adam) and the loss
    (MSE) are attached to the module as ``optimizer`` and ``loss``.
    """

    def __init__(self, input_channels, output_shape, learning_rate):
        """Build the network.

        Args:
            input_channels: Number of channels of the input tensor.
            output_shape: Number of outputs of the final linear layer.
            learning_rate: Learning rate passed to Adam.
        """
        super().__init__()

        def conv_unit(in_ch, out_ch, pool):
            # One 3x3 "same" convolution followed by batch-norm, ReLU and a
            # max-pool with the given window.
            return [
                nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
                nn.MaxPool2d(pool),
            ]

        self.block1 = nn.Sequential(
            *conv_unit(input_channels, 16, 2),
            *conv_unit(16, 32, 3),
        )
        self.block2 = nn.Sequential(
            *conv_unit(32, 64, 2),
            *conv_unit(64, 128, 4),
        )

        head = [
            nn.Flatten(),
            nn.Linear(768, 128),
            nn.ReLU(),
            nn.Linear(128, output_shape),
        ]
        self.fc = nn.Sequential(*head)

        self.optimizer = torch.optim.Adam(self.parameters(), learning_rate)
        self.loss = nn.MSELoss()

    def forward(self, x):
        """Return the network output for ``x``; prints it if it contains NaN."""
        features = self.block2(self.block1(x))
        out = self.fc(features)
        # Debug aid: surface diverging outputs as soon as they appear.
        has_nan = torch.isnan(out).any()
        if has_nan:
            print(out)
        return out


## Frame handling

In [0]:
def add_frame(frame):
    """Advance the global frame ``stack`` by one step.

    The raw ``frame`` is preprocessed, the oldest entry of ``stack`` is
    dropped and the new frame is appended at the end, so the stack length is
    unchanged.
    """
    newest = preprocess(frame)
    del stack[0]  # discard the oldest frame
    stack.append(newest)


def init_stack(frame):
    """Seed the global frame ``stack`` with FRAME_STACK_SIZE copies of ``frame``.

    The frame is preprocessed once and the same processed array is appended
    repeatedly (the entries share one object).
    """
    first = preprocess(frame)
    stack.extend([first] * FRAME_STACK_SIZE)


def preprocess(state):
    """Convert an (H, W, C) colour frame to a cropped grayscale image.

    The first three channels are combined with weights 0.3 / 0.59 / 0.11 and
    the result is cropped by 20 rows at the top, 14 rows at the bottom and
    15 columns on each side.
    """
    pixels = np.asarray(state)
    red = pixels[:, :, 0]
    green = pixels[:, :, 1]
    blue = pixels[:, :, 2]
    gray = 0.3 * red + 0.59 * green + 0.11 * blue
    return gray[20:-14, 15:-15]

## Agent

In [0]:
class Agent:
    """Epsilon-greedy deep Q-learning agent.

    Owns the Q-network (``Model``), the experience ``ReplayBuffer`` and the
    exploration schedule. A single network is used both for the current
    estimate and for the bootstrapped target.
    """

    def __init__(self, n_actions):
        """Create the network, the replay buffer and the exploration state.

        Args:
            n_actions: Size of the discrete action space.
        """
        self.epsilon = EPSILON
        self.epsilon_min = EPSILON_MIN
        self.epsilon_dec = EPSILON_DEC
        self.batch_size = BATCH_SIZE
        self.gamma = GAMMA
        self.action_space = range(n_actions)
        print('Allocating model...')
        self.model = Model(FRAME_STACK_SIZE, n_actions, LEARNING_RATE)
        print('Done')
        print('Allocating buffer...')
        # (176, 130) is the frame size produced by preprocess() for a
        # 210x160 input frame.
        self.buffer = ReplayBuffer(BUFFER_SIZE, (FRAME_STACK_SIZE, 176, 130), n_actions)
        print('Done')
        self.device = default_device()
        self.model = self.model.to(self.device)

    def choose_action(self, stack):
        """Pick an action for the given frame stack.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest predicted value.

        Args:
            stack: Sequence of FRAME_STACK_SIZE preprocessed frames.

        Returns:
            Integer index of the chosen action.
        """
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.action_space)

        # np.asarray first: building a tensor straight from a list of
        # ndarrays is very slow.
        state = torch.as_tensor(np.asarray(stack), dtype=torch.float32)
        state = state.unsqueeze(0).to(self.device)
        # Inference only - no autograd graph is needed to pick an action.
        with torch.no_grad():
            q_values = self.model(state)
        _, best = torch.max(q_values, 1)
        return best.item()

    def train(self):
        """Run one gradient step on a sampled mini-batch and decay epsilon.

        Does nothing until the buffer holds at least ``batch_size``
        transitions.
        """
        if self.buffer.count < self.batch_size:
            return

        batch = self.buffer.sample(self.batch_size)
        from_states, actions, rewards, to_states, not_done = (
            t.to(self.device) for t in batch
        )

        # load_model() leaves the network in eval mode; make sure a resumed
        # run trains in the same (training) mode as a fresh one.
        self.model.train()

        q = self.model(from_states)
        # The bootstrapped target must be treated as a constant: computing it
        # without gradients keeps the update from back-propagating through
        # the next-state estimate.
        with torch.no_grad():
            max_next, _ = self.model(to_states).max(dim=1)

        # Start from the current predictions so that only the entries of the
        # actions actually taken contribute to the loss.
        q_target = q.detach().clone()
        batch_index = torch.arange(self.batch_size, dtype=torch.long, device=self.device)
        # `not_done` is 0 for terminal transitions, removing the bootstrap term.
        q_target[batch_index, actions] = rewards + self.gamma * max_next * not_done

        self.model.optimizer.zero_grad()
        cost = self.model.loss(q, q_target)
        cost.backward()
        self.model.optimizer.step()

        # Multiplicative decay, clamped so epsilon never drops below the floor.
        self.epsilon = max(self.epsilon * self.epsilon_dec, self.epsilon_min)

    def save_model(self, index):
        """Pickle the whole model object to ``SAVE_PATH/model{index}.pt``."""
        torch.save(self.model, SAVE_PATH + f'model{index}.pt')

    def load_model(self, index):
        """Replace the model with the checkpoint ``SAVE_PATH/model{index}.pt``.

        The loaded network is placed on the default device and switched to
        eval mode.
        """
        # NOTE(review): this unpickles a full module object. Recent PyTorch
        # releases default torch.load to weights_only=True, which rejects such
        # files - confirm against the installed version. Only load
        # checkpoints from a trusted source.
        self.model = torch.load(SAVE_PATH + f'model{index}.pt', map_location=default_device())
        self.model.eval()


def default_device():
    """Return the CUDA device when CUDA is available, else the CPU device."""
    name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(name)


   

## Main

In [0]:
def log(scores, index):
    """Write ``str(scores)`` to ``SAVE_PATH/log{index}.txt``, replacing any existing file.

    Args:
        scores: Object whose string form is written (the list of episode scores).
        index: Value interpolated into the log file name.
    """
    # The context manager closes the file even if the write fails.
    with open(SAVE_PATH + f'log{index}.txt', 'w') as f:
        f.write(str(scores))

In [0]:
# Create the environment and an agent sized to its action space.
env = gym.make('SpaceInvaders-v0')
agent = Agent(env.action_space.n)

# Optionally resume from a checkpoint: exploration starts at its floor and
# episode numbering continues right after the loaded index.
if LOAD:
    print(f'Loading checkpoint with index {LOAD_INDEX}')
    agent.load_model(LOAD_INDEX)
    agent.epsilon = EPSILON_MIN
    start = LOAD_INDEX + 1
else:
    start = 0

if TRAIN:
    # Training: play EPISODES episodes, learning after every environment step.
    print('Training...')
    scores = []
    for episode in range(start, EPISODES + start):
        stack = []
        init_stack(env.reset())
        done = False
        score = 0
        while not done:
            action = agent.choose_action(stack)
            new_state, reward, done, info = env.step(action)
            # The reported score uses the raw game reward ...
            score += reward
            # ... while the learner sees a penalty when the last life is lost.
            if done and info['ale.lives'] == 0:
                reward = -100
            # Snapshot the state before add_frame() mutates `stack` in place.
            old_stack = np.copy(stack)
            add_frame(new_state)
            agent.buffer.store(old_stack, action, reward, stack, done)
            agent.train()
        print(f'episode:{episode} score:{score}')
        scores.append(score)
        if episode % SAVE_INTERVAL == 0 and episode != 0:
            agent.save_model(episode)
            log(scores, episode)
else:
    # Evaluation: play one rendered episode with a purely greedy policy.
    print('Testing...')
    agent.epsilon = 0
    stack = []
    init_stack(env.reset())
    done = False
    score = 0
    img = plt.imshow(env.render(mode='rgb_array'))
    while not done:
        img.set_data(env.render(mode='rgb_array'))
        display.display(plt.gcf())
        display.clear_output(wait=True)
        action = agent.choose_action(stack)
        new_state, reward, done, info = env.step(action)
        score += reward
        # Feed the new observation into the frame stack; without this the
        # agent would keep acting on the very first frame of the episode.
        add_frame(new_state)
    print(f'Done! Score:{score}')



Allocating model...
Done
Allocating buffer...
Done
Loading checkpoint with index 180
Training...
episode:181 score:260.0
episode:182 score:270.0
episode:183 score:140.0


In [0]:

# agent = Agent(6)
# agent.load_model()
# agent.epsilon = 0
# stack = []
# init_stack(env.reset())
# done = False
# score = 0
# while not done:
#     action = agent.choose_action(stack)
#     new_state, reward, done, info = env.step(action)
#     score += reward
#     if done and info['ale.lives'] == 0:
#         reward = -100
#     old_stack = np.copy(stack)
#     add_frame(new_state)
# print(score)

In [0]:
# Mount Google Drive at /content/drive. SAVE_PATH points below this mount
# point, so this cell has to be executed before any cell that loads or saves
# checkpoints and logs, even though it appears last in the notebook.
from google.colab import drive
drive.mount('/content/drive')