#**Welcome to the Bubble Trouble AI Project**

#**Note :** 
#Before connecting, make sure that the 'gym-bubble-trouble' directory
#is located directly inside the 'My Drive' directory of your Google Drive




In [1]:
# Install pygame into the runtime (the recorded output below shows pygame 1.9.6 being fetched).
!pip install pygame

Collecting pygame
[?25l  Downloading https://files.pythonhosted.org/packages/8e/24/ede6428359f913ed9cd1643dd5533aefeb5a2699cc95bea089de50ead586/pygame-1.9.6-cp36-cp36m-manylinux1_x86_64.whl (11.4MB)
[K     |████████████████████████████████| 11.4MB 267kB/s 
[?25hInstalling collected packages: pygame
Successfully installed pygame-1.9.6


In [2]:
# Mount Google Drive so the project directory is reachable under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


In [3]:

# Make the project directory on Drive the working directory for the rest of the notebook.
%cd drive/My\ Drive/gym-bubble-trouble

/content/drive/My Drive/gym-bubble-trouble


In [4]:
import sys
# Make the modules inside the 'bubbletrouble' directory importable
# (the import cell below pulls in `settings` and `game`).
sys.path.append('bubbletrouble')
# Create ../models if it does not exist yet (-p: no error if it already exists).
!mkdir -p ../models

## Import Libraries

In [5]:
%matplotlib inline

import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.optim

import torch.nn.functional as F
import torchvision.transforms as T
import gym_bubbletrouble
import cv2 as cv
from torchvision import utils


from settings import *
import time
from game import BubbleTroubleGame
import pygame
from pygame.locals import K_LEFT, K_RIGHT, K_SPACE, K_ESCAPE, KEYUP, KEYDOWN, QUIT


# True when matplotlib reports an inline backend; only then is IPython's `display`
# imported (Agent.train uses it to clear the cell output between episodes).
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython: from IPython import display



pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


# DQN Architecture
The Deep Q-Network (DQN) used in this project is composed of:
* 3 convolution layers (each followed by batch normalization and a ReLU activation)
* 2 fully-connected linear layers

In [6]:

class CNN(nn.Module):
    """Convolutional Q-network: three conv + batch-norm stages followed by two linear layers."""

    def __init__(self, in_channels=4, n_actions=4):
        """
        Build the layers of the Deep Q Network.

        :param in_channels: Number of channels of the input tensor
        :param n_actions: Number of Q-values produced (one per action)
        """
        super().__init__()

        # Layers are created in the same order as they are applied in forward().
        self.conv1 = nn.Conv2d(in_channels, NUM_FILTER_L1, kernel_size=KER_L1, stride=STRIDE_L1)
        self.bn1 = nn.BatchNorm2d(NUM_FILTER_L1)
        self.conv2 = nn.Conv2d(NUM_FILTER_L1, NUM_FILTER_L2, kernel_size=KER_L2, stride=STRIDE_L2)
        self.bn2 = nn.BatchNorm2d(NUM_FILTER_L2)
        self.conv3 = nn.Conv2d(NUM_FILTER_L2, NUM_FILTER_L3, kernel_size=KER_L3, stride=STRIDE_L3)
        self.bn3 = nn.BatchNorm2d(NUM_FILTER_L3)

        # Spatial size after each un-padded convolution: (size - kernel) // stride + 1.
        out_w, out_h = WIDTH, HEIGHT
        for kernel, stride in ((KER_L1, STRIDE_L1), (KER_L2, STRIDE_L2), (KER_L3, STRIDE_L3)):
            out_w = (out_w - kernel) // stride + 1
            out_h = (out_h - kernel) // stride + 1

        # Number of features once the last feature map is flattened.
        flat_features = out_w * out_h * NUM_FILTER_L3

        self.fc4 = nn.Linear(flat_features, flat_features)
        self.head = nn.Linear(flat_features, n_actions)

    def forward(self, x):
        """
        Compute the Q-values for a batch of inputs.

        :param x: Input tensor; it is cast to float before the first convolution
        :return: Tensor with one row per batch element and one column per action
        """
        features = x.float()
        for conv, norm in ((self.conv1, self.bn1), (self.conv2, self.bn2), (self.conv3, self.bn3)):
            features = F.relu(norm(conv(features)))
        flat = features.reshape(features.size(0), -1)
        hidden = F.relu(self.fc4(flat))
        return self.head(hidden)


# Normal Replay Memory

In [7]:
class ReplayMemory:
    """Uniform replay memory that overwrites its oldest slots once `capacity` is reached."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        # Total number of pushes so far; modulo capacity it gives the slot to overwrite.
        self.push_count = 0

    def push(self, experience):
        """
        Store one experience, replacing an existing slot when the memory is full.
        :param experience: Experience object contains ('state', 'action', 'next_state', 'reward', 'done')
        """
        if len(self.memory) >= self.capacity:
            slot = self.push_count % self.capacity
            self.memory[slot] = experience
        else:
            self.memory.append(experience)
        self.push_count += 1

    def sample(self, batch_size):
        """
        Draw experiences uniformly at random, without replacement.
        :param batch_size: Number of samples
        :return: list of `batch_size` stored experiences
        """
        drawn = random.sample(self.memory, batch_size)
        return drawn

    def can_provide_sample(self, batch_size):
        """
        :return: True if at least `batch_size` experiences are stored, False otherwise
        """
        stored = len(self.memory)
        return stored >= batch_size


# Prioritized Replay Memory

In [8]:

class PrioritizedReplayBuffer:
    """
    Replay buffer that samples experiences proportionally to a per-experience priority.

    Experiences and priorities live in two parallel deques of the same maximum length,
    so index i in `buffer` always corresponds to index i in `priorities`.
    """

    def __init__(self, maxlen):
        self.buffer = deque(maxlen=maxlen)
        self.priorities = deque(maxlen=maxlen)

    def push(self, experience):
        """
        Push the given experience to the memory buffer
        :param experience: Experience object contains ('state', 'action', 'next_state', 'reward', 'done')
        """
        self.buffer.append(experience)
        # A new experience gets the current maximum priority (1 when the buffer was empty).
        self.priorities.append(max(self.priorities, default=1))

    def get_probabilities(self, priority_scale):
        """
        :param priority_scale: scale factor is a number in [0,1]
        :return: numpy array with the current sample probability of every stored experience
        """
        scaled_priorities = np.array(self.priorities) ** priority_scale
        # Vectorized reduction: the builtin sum() would loop over the array in Python.
        return scaled_priorities / scaled_priorities.sum()

    def get_importance(self, probabilities):
        """
        :param probabilities: sample probabilities of the drawn experiences
        :return: importance weights, normalized so that the largest weight is 1
        """
        probabilities = np.asarray(probabilities)
        importance = 1 / len(self.buffer) * 1 / probabilities
        return importance / importance.max()

    def sample(self, batch_size, priority_scale=1.0):
        """
        Draw experiences (with replacement) according to their priorities.

        :param batch_size: Number of samples
        :param priority_scale: scale factor is a number in [0,1]
        :return: tuple (samples, importance, indices): the drawn experiences, their
        normalized importance weights and their indices in the memory buffer
        """
        sample_size = min(len(self.buffer), batch_size)
        sample_probs = self.get_probabilities(priority_scale)
        sample_indices = random.choices(range(len(self.buffer)), k=sample_size, weights=sample_probs)
        samples = [self.buffer[i] for i in sample_indices]
        importance = self.get_importance(sample_probs[sample_indices])
        return samples, importance, sample_indices

    def set_priorities(self, indices, errors, offset=0.1):
        """
        Set new priorities for the experiences at the given indices.

        :param indices: indices in the memory buffer (as returned by `sample`)
        :param errors: one error value per index; its absolute value is used
        :param offset: constant added to every priority so that none of them is zero
        """
        for i, e in zip(indices, errors):
            self.priorities[i] = abs(e) + offset

    def can_provide_sample(self, batch_size):
        """
        :return: True if there are enough experiences to sample and False otherwise
        """
        return len(self.buffer) >= batch_size

# Exploration - Exploitation Strategy

In [9]:
class EpsilonGreedyStrategy:
    """
    Exploration schedule: the exploration rate decays exponentially from `start`
    towards `end` as the step counter grows.
    """

    def __init__(self, start, end, decay):
        self.start, self.end, self.decay = start, end, decay

    def get_exploration_rate(self, curr_step):
        """
        :param curr_step: the current step number
        :return: exploration rate for that step
        """
        span = self.start - self.end
        decay_factor = math.exp(-curr_step / self.decay)
        return self.end + span * decay_factor



# Agent

In [10]:

class Agent:
    """
    This class is responsible for training and testing the model.

    It owns the prioritized replay memory, selects actions with an epsilon-greedy
    strategy restricted to the legal actions reported by the environment manager,
    and periodically copies the policy network weights into the target network.
    """

    def __init__(self, policy_net, target_net, strategy, em, test_em, num_actions, optimizer):
        self.curr_step = 0
        self.rate = 0
        self.strategy = strategy
        self.num_actions = num_actions
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.policy_net, self.target_net = policy_net, target_net
        self.memory = PrioritizedReplayBuffer(MEMORY_SIZE)
        self.optimizer = optimizer
        self.em = em
        self.test_em = test_em
        # Exponent applied to the priorities when sampling from the replay memory.
        self.scale = 0.7

    def select_action(self, state, actions):
        """
        Epsilon-greedy action selection restricted to the given actions.

        Draws a random number k in [0, 1); if the current exploration rate is
        greater than k a random action from `actions` is returned, otherwise the
        legal action with the highest q-value is returned.

        :param state: Tensor
        :param actions: list of int - action space for the given state
        :return: int - selected action
        """
        self.rate = self.strategy.get_exploration_rate(self.curr_step)
        self.curr_step += 1
        if self.rate > random.random():
            return random.choice(actions)
        # policy_step already runs under torch.no_grad().
        return self.policy_step(state, actions)

    def policy_step(self, state, actions):
        """
        Return the action with the highest q-value among the given actions.

        :param state: Tensor
        :param actions: list of int - action space for the given state
        :return: int - selected action, or None when none of the network outputs
                 is contained in `actions`
        """
        with torch.no_grad():
            output = self.policy_net(state.to(self.device))
            # Rank every output of the network (not a hard-coded 4) by q-value.
            ranked = output.topk(output.size(1)).indices[0].cpu().numpy()
        for act in ranked:
            if act in actions:
                return int(act)
        return None

    def test(self, n_episodes):
        """
        Run the greedy policy on the test environment for the given number of episodes.

        :param n_episodes: number of episodes to test the model
        :return: The average total reward per episode (0 when n_episodes is 0)
        """
        reward_sum = 0
        for episode in range(n_episodes):
            state = self.test_em.reset()
            total_reward = 0.0
            for _ in count():

                action = self.policy_step(state, self.test_em.get_legal_actions())
                next_state, reward, done, info = self.test_em.step(action)
                total_reward += reward
                state = next_state

                if done:
                    print("Finished Episode {} with reward {}".format(episode, total_reward))
                    reward_sum += total_reward
                    break

        # Divide exactly once; the original divided a second time in the return
        # statement, which halved the result and crashed for n_episodes == 0.
        avg_reward = reward_sum / n_episodes if n_episodes else 0
        print("Avarage reward over {} espisodes is: {}".format(n_episodes, avg_reward))
        return avg_reward

    def train(self, num_episodes):
        """
        Train the policy network for the given number of episodes.

        After every episode the per-episode reward and mean loss are plotted.

        :param num_episodes: number of episodes to train the model
        """
        num_steps = cum_reward = 0
        episode_reward = []
        episode_loss = []
        for episode in range(num_episodes):

            state = self.em.reset()
            cum_loss = 0

            for time_step in count():

                num_steps += 1
                action = self.select_action(state, self.em.get_legal_actions())
                next_state, reward, done, _ = self.em.step(action)
                reward = torch.tensor([reward], device=self.device)
                self.memory.push(Experience(state, action, next_state, reward, done))
                state = next_state
                cum_reward += reward.item()

                if self.memory.can_provide_sample(BATCH_SIZE):
                    cum_loss = self._train_step(cum_loss)

                if num_steps % TARGET_UPDATE == 0:
                    # Update the target net
                    self.target_net.load_state_dict(self.policy_net.state_dict())

                if self.em.done:
                    episode_reward.append(cum_reward)
                    mean_loss = cum_loss / (time_step + 1)
                    episode_loss.append(mean_loss)
                    plot(episode_reward, 20, 'Reward')
                    plot(episode_loss, 20, 'Loss')
                    if is_ipython: display.clear_output(wait=True)
                    cum_reward = 0
                    break

    def _train_step(self, cum_loss):
        """
        Run one optimization step on a batch sampled from the replay memory.

        :param cum_loss: loss accumulated so far in the current episode
        :return: `cum_loss` plus the loss of this step
        """
        # Sampling from memory a batch of experiences
        experiences, importance, indices = self.memory.sample(BATCH_SIZE, priority_scale=self.scale)
        states, actions, rewards, next_states, dones = extract_tensors(experiences, self.device)

        # Get the Q-values and the target values from the experiences
        current_q_values = QValues.get_current(self.policy_net, states, actions)
        next_q_values = QValues.get_next(self.target_net, next_states)
        # (1 - dones) removes the bootstrapped value for transitions that ended an episode.
        target_q_values = (next_q_values * GAMMA) * (1 - dones).squeeze(1) + rewards
        # The importance weights are annealed with the current exploration rate.
        weights = importance ** (1 - self.rate)
        errors, loss = get_loss(current_q_values, target_q_values, weights, BATCH_SIZE, self.device)

        # Update the priorities
        self.memory.set_priorities(indices, errors)

        # Finish the training step
        cum_loss += loss.item()
        self.optimizer.zero_grad()
        loss.backward()

        # Clamp every gradient element to [-1, 1]; parameters that received no
        # gradient in this step are skipped instead of raising AttributeError.
        for param in self.policy_net.parameters():
            if param.grad is not None:
                param.grad.data.clamp_(-1, 1)
        self.optimizer.step()
        return cum_loss



# Gym Wrapper

In [11]:
class MaxAndSkipEnv(gym.Wrapper):
    """
    Repeat the chosen action for several frames and return the element-wise
    maximum of the last two observations produced during the current call.

    The action is executed only once (no skipping) while the player is "in danger":
    the closest ball is nearer than `danger_dist` or the player cannot shoot.
    """

    def __init__(self, env, skip=4, danger_dist=60):
        """
        :param env: environment to wrap; must expose `closest_dist_euc` and `can_shoot()`
        :param skip: number of frames the action is repeated when not in danger
        :param danger_dist: euclidean distance to the closest ball below which
                            frame skipping is disabled
        """
        gym.Wrapper.__init__(self, env)
        # most recent raw observations (for max pooling across time steps)
        self._obs_buffer = np.zeros((2,) + env.observation_space.shape, dtype='uint8')
        self._skip = skip
        self._danger_dist = danger_dist

    def step(self, action):
        """Repeat action, sum reward, and max over last observations."""
        total_reward = 0.0
        done = None
        info = None
        danger = self.env.closest_dist_euc < self._danger_dist or not self.env.can_shoot()

        num_skip = 1 if danger else self._skip
        frames_seen = 0
        for _ in range(num_skip):
            obs, reward, done, info = self.env.step(action)

            # Keep the two latest frames of THIS call. When only one frame has been
            # produced so far (danger mode or early termination) both slots hold it,
            # so a stale frame from a previous call never leaks into the max-pooling.
            self._obs_buffer[0] = self._obs_buffer[1] if frames_seen else obs
            self._obs_buffer[1] = obs
            frames_seen += 1

            total_reward += reward
            if done:
                break

        max_frame = self._obs_buffer.max(axis=0)

        return max_frame, total_reward, done, info

In [12]:
class EpisodicLifeEnv(gym.Wrapper):
    def __init__(self, env=None):
        """Treat the loss of a life as the end of an episode, while the wrapped
        environment is only reset for real once it reports `done` itself.
        """
        super().__init__(env)
        self.lives = 0
        self.was_real_done = True
        self.was_real_reset = False

    def step(self, action):
        """Step the wrapped env and flag `done` when a life was lost."""
        obs, reward, done, info = self.env.step(action)
        # Remember what the wrapped env itself reported, before overriding `done`.
        self.was_real_done = done

        remaining = self.env.lives()
        # A drop in the life counter ends the episode, as long as lives remain;
        # with zero lives left the wrapped env's own `done` flag is kept as-is.
        if 0 < remaining < self.lives:
            done = True
        self.lives = remaining
        return obs, reward, done, info

    def reset(self):
        """Reset the wrapped env only if it really finished.

        After a mere loss of life the game continues from where it is: a single
        step with action 3 is taken to obtain the next observation.
        """
        if not self.was_real_done:
            # step with action 3 to advance from the lost-life state
            obs, _, _, _ = self.env.step(3)
            self.was_real_reset = False
        else:
            obs = self.env.reset()
            self.was_real_reset = True
        self.lives = self.env.lives()
        return obs

In [13]:
class LazyFrames(object):
    def __init__(self, frames):
        """Hold a list of frames and concatenate them lazily.

        The frames are only joined (along the last axis) the first time the data is
        needed, so frames shared between consecutive observations are stored once
        until then. Convert to a numpy array right before passing it to the model.

        :param frames: list of arrays that can be concatenated along their last axis
        """
        self._frames = frames
        self._out = None

    def _force(self):
        """Concatenate the frames on first use and cache the result."""
        if self._out is None:
            self._out = np.concatenate(self._frames, axis=-1)
            # The individual frames are no longer needed once joined.
            self._frames = None
        return self._out

    def __array__(self, dtype=None, copy=None):
        """
        NumPy array protocol.

        :param dtype: optional target dtype; when given, a converted copy is returned
        :param copy: NumPy 2 `copy` keyword. True forces a copy, None (default)
                     keeps the historic behavior, False forbids copying
        :raises ValueError: if copy is False but a dtype conversion is required
        """
        out = self._force()
        if dtype is not None:
            if copy is False:
                if np.dtype(dtype) != out.dtype:
                    raise ValueError("Unable to avoid copy while converting LazyFrames to the requested dtype.")
                return out
            return out.astype(dtype)
        return out.copy() if copy else out

    def __len__(self):
        return len(self._force())

    def __getitem__(self, i):
        return self._force()[i]

    def count(self):
        """Return the size of the last axis of the concatenated array."""
        frames = self._force()
        return frames.shape[frames.ndim - 1]

    def frame(self, i):
        """Return the slice at index `i` of the last axis."""
        return self._force()[..., i]

In [14]:
class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        """Keep the k most recent frames and expose them as one observation.

        Observations are returned as LazyFrames, which postpone the concatenation
        of the frames until the data is actually needed.

        :param env: environment to wrap
        :param k: number of frames to stack
        """
        super().__init__(env)
        self.k = k
        self.frames = deque(maxlen=k)
        # Every frame is (HEIGHT, WIDTH, 1); k of them are stacked on the last axis.
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(HEIGHT, WIDTH, k))

    def reset(self):
        """Reset the wrapped env and fill the whole stack with the first frame."""
        first_frame = self.env.reset()
        self.frames.extend([first_frame] * self.k)
        return self._get_ob()

    def step(self, action):
        """Step the wrapped env and push the new frame onto the stack."""
        frame, reward, done, info = self.env.step(action)
        self.frames.append(frame)
        return self._get_ob(), reward, done, info

    def _get_ob(self):
        # The stack must be full before an observation can be produced.
        assert len(self.frames) == self.k
        stacked = list(self.frames)
        return LazyFrames(stacked)

# Q-Values

In [15]:
class QValues:
    """Static helpers computing current and next-state q-values for a batch."""

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @staticmethod
    def get_current(policy_net, states, actions):
        """
        Return, for every state of the batch, the q-value the policy network
        assigns to the action that was taken in it.
        """
        all_q = policy_net(states.to(QValues.device))
        return all_q.gather(1, actions)

    @staticmethod
    def get_next(target_net, next_states):
        """
        Return, for every next state of the batch, the maximum q-value predicted
        by the target network. Next states whose values are all zero are treated
        as final and get the value 0.
        """
        # A state whose maximum element equals 0 is considered final.
        peak_per_state = next_states.flatten(start_dim=1).max(dim=1)[0]
        is_final = peak_per_state.eq(0).type(torch.bool)
        is_non_final = ~is_final

        values = torch.zeros(next_states.shape[0]).to(QValues.device)
        best_next = target_net(next_states[is_non_final]).max(dim=1)[0]
        values[is_non_final] = best_next.detach()
        return values


# Game Environment

In [16]:

# Integer ids of the four actions (BubbleTroubleEnv declares a Discrete(4) action space).
ACTION_LEFT = 0
ACTION_RIGHT = 1
ACTION_FIRE = 2
ACTION_IDLE = 3

# Maps an action id to the pygame key constant that BubbleTroubleEnv.handle_key
# dispatches on; None stands for "no key" and makes handle_key stop the player.
key_map = {0: K_LEFT, 1: K_RIGHT, 2: K_SPACE, 3: None}
# Reward table used when BubbleTroubleEnv is created without a `rewards` argument;
# the keys are the ones read by BubbleTroubleEnv._fitness.
DEFAULT_REWARDS = {'moving': 0, 'fire': 0, 'score': 1, 'death': -1, 'win': 0, 'step': 0}



class BubbleTroubleEnv(gym.Env):
    """
    Gym environment around BubbleTroubleGame.

    The game is drawn on an off-screen pygame surface; observations are the
    grayscale surface resized to (HEIGHT, WIDTH, 1). The environment also tracks
    the ball closest to the player (by x-axis distance and by euclidean distance),
    which the wrappers and the environment manager read.
    """
    metadata = {'render.modes': ['rgb_array']}

    def __init__(self, rewards=None):
        """
        :param rewards: optional dict with the keys of DEFAULT_REWARDS; falls back
                        to DEFAULT_REWARDS when omitted or empty
        """
        pygame.init()
        self.rewards = rewards if rewards else DEFAULT_REWARDS
        self.action_space = gym.spaces.Discrete(4)
        self.n_steps = 0
        self.previous_score = None
        self.game = None
        self.closest_dist_euc, self.closest_ball_euc = 0, None
        self.closest_dist, self.closest_ball = 0, None
        # Init game
        self.surface = pygame.Surface((WINDOWWIDTH, WINDOWHEIGHT), pygame.RESIZABLE)

    def lives(self):
        """
        :return: Current number of lives of the player
        """
        return self.game.player.lives

    def can_shoot(self):
        """
        :return: True if the player can shoot and false otherwise
        """
        return self.game.player.can_shoot()

    def get_ball_list(self):
        """
        :return: List of all the ball objects in the game
        """
        return self.game.balls + self.game.hexagons

    def _update_closest_ball(self):
        # Update the closest ball and its distance from the player,
        # once for the x axis distance and once for the euclidean distance.
        # When there is no ball at all the previous values are kept.
        balls_list = self.get_ball_list()
        if balls_list:
            self.closest_ball, self.closest_dist = get_x_axis_closest_bubble(self.game, balls_list)
            self.closest_ball_euc, self.closest_dist_euc = get_euclidian_closest_bubble(self.game, balls_list)

    def reset(self):
        """
        Reset the environment and returns the first observation
        """
        self.n_steps = 0
        self.game = BubbleTroubleGame()
        self.game.load_level(1)
        self.previous_score = self.game.score
        # Forget the closest ball of the previous game and redraw the surface, so
        # that the first observation shows the freshly loaded level rather than
        # the last frame of the previous episode (or a never-drawn surface).
        self.closest_dist_euc, self.closest_ball_euc = 0, None
        self.closest_dist, self.closest_ball = 0, None
        self.draw_world()
        return self._get_processed_screen()

    def step(self, action):
        """
        Execute a single step of the game
        :param action: int - action to execute
        :return: tuple (observation, reward, done, info); info is always an empty dict
        """
        self.n_steps += 1
        self._make_single_step(action)
        state = self._get_processed_screen()
        win = self.game.is_completed
        done = self.is_done()
        reward = self._fitness(action, not self.game.player.is_alive, win, self._is_ball_hit())
        self.previous_score = self.game.score
        return state, reward, done, {}

    def is_done(self):
        """
        :return: True if the episode is over and false otherwise
        """
        return self.game.game_over or self.game.is_completed

    def render(self, mode='rgb_array', *args, **kwargs):
        """
        :return: current frame of the game with the first two axes of the
                 pygame surface array swapped and the color channels last
        """
        image = pygame.surfarray.array3d(self.surface)
        return image.swapaxes(1, 2).transpose((2, 0, 1))

    def _get_processed_screen(self):
        # Grayscale, resized frame with a trailing channel axis of size 1.
        screen = self.render()
        screen = cv.cvtColor(screen, cv.COLOR_RGB2GRAY)
        screen = cv.resize(screen, (WIDTH, HEIGHT), interpolation=cv.INTER_AREA)
        screen = np.expand_dims(screen, -1)
        return screen

    def close(self):
        """Exit the game, if one has been created by reset()."""
        if self.game is not None:
            self.game.exit_game()

    def _fitness(self, action, dead, win, score_change):
        # Sum the configured rewards of everything that happened in this step.
        fitness = self.rewards['step']
        if action == ACTION_FIRE:
            fitness += self.rewards['fire']
        elif action != ACTION_IDLE:
            fitness += self.rewards['moving']
        if dead:
            fitness += self.rewards['death']
        if win:
            fitness += self.rewards['win']
        if score_change:
            fitness += self.rewards['score']
        return float(fitness)

    def _is_ball_hit(self):
        # True if the score changed since the previous step and false otherwise
        return self.previous_score != self.game.score

    def _make_single_step(self, action):
        # Execute a single step of the game by drawing and updating the game image
        key = key_map[action]
        self.handle_key(key)
        self.game.update()
        self.draw_world()

    def render_with_states(self):
        """
        :return: current frame with the closest ball (euclidean) framed and a line
                 drawn from it to the player's position at the bottom of the window;
                 the plain frame when no closest ball is known yet
        """
        img = np.ascontiguousarray(self.render(), dtype=np.uint8)
        if self.closest_ball_euc is None:
            return img
        c_x = int(self.game.player.position())
        closest_ball = self.closest_ball_euc.rect
        x, y = int(closest_ball.centerx), int(closest_ball.centery)
        p1, p2 = closest_ball.topleft, closest_ball.bottomright
        img = cv.rectangle(img, p1, p2, GREEN, 2)
        img = cv.line(img, (x, y), (c_x, WINDOWHEIGHT), GREEN, 1)
        return img

    def handle_key(self, key):
        """Translate a key of key_map into the matching player command."""
        game = self.game
        if key not in key_map.values():
            print('Wrong key')
            return
        if key == K_LEFT:
            game.move_player(direction=1)
        elif key == K_RIGHT:
            game.move_player(direction=-1)
        elif key == K_SPACE:
            game.fire_player()
        elif key is None:
            game.stop_player()

    def draw_world(self):
        """Redraw the whole scene on the surface and refresh the closest-ball state."""
        self.surface.fill(WHITE)
        for hexagon in self.game.hexagons:
            self.draw_hex(hexagon)
        for ball in self.game.balls:
            self.draw_ball(ball)
        self._update_closest_ball()
        if self.game.player.weapon.is_active:
            self.draw_weapon(self.game.player.weapon)
        self.draw_player(self.game.player)

    def draw_ball(self, ball):
        self.surface.blit(ball.image, ball.rect)

    def draw_hex(self, hexagon):
        self.surface.blit(hexagon.image, hexagon.rect)

    def draw_player(self, player):
        self.surface.blit(player.image, player.rect)

    def draw_weapon(self, weapon):
        self.surface.blit(weapon.image, weapon.rect)



# Environment Manager

In [17]:


# Thresholds used by EnvManager to prune the action set
# (presumably in pixels of the game window — TODO confirm).
# TOO_LOW:   a ball whose center is at most this far above the window bottom counts as low.
# TOO_CLOSE: euclidean distance to the closest ball below which the player is "too close".
# TOO_FAR:   x-axis distance to the closest ball above which the player is "too far".
# TOO_FAR_X: x-axis distance to the closest ball below which firing is offered as an action.
TOO_LOW = 80
TOO_CLOSE = 50
TOO_FAR = 320
TOO_FAR_X = 150

# Same action ids as defined in the game-environment cell.
ACTION_LEFT = 0
ACTION_RIGHT = 1
ACTION_FIRE = 2
ACTION_IDLE = 3

# Reward table handed to BubbleTroubleEnv by EnvManager.
reward_dict = {'moving': .0, 'fire': .0, 'score': 1, 'death': -1., 'win': 0, 'step': .0}


class EnvManager:
    """
    This class is responsible for managing the Bubble trouble environment:
    it builds the wrapped environment, converts observations to tensors and
    computes the set of actions the agent is allowed to choose from.
    """

    def __init__(self, device, num_frames=4, skip=2, to_skip=True, ep_live=False):
        """
        :param device: torch device (stored on the instance)
        :param num_frames: number of frames stacked into one observation
        :param skip: number of frames an action is repeated (used when to_skip is True)
        :param to_skip: wrap the environment with MaxAndSkipEnv
        :param ep_live: wrap the environment with EpisodicLifeEnv
        """
        self.device = device
        self.env = BubbleTroubleEnv(rewards=reward_dict)
        self.env = FrameStack(self.env, num_frames)
        if to_skip:
            self.env = MaxAndSkipEnv(self.env, skip)
        if ep_live:
            self.env = EpisodicLifeEnv(self.env)
        self.done = False
        self.env.reset()
        self.curr_screen = None
        self.frame_counter = 0

    def reset(self):
        """
        Reset the environment and returns the first observation
        """
        ob = self.env.reset()
        # A fresh episode is not finished; without this `done` would stay True
        # from the previous episode until the next call to step().
        self.done = False
        return get_state(ob)

    def step(self, action):
        """
        Execute a single step of the game

        :param action: int - action to execute
        :return: observation - Tensor, reward -int, done -boolean, info -Not in use
        """
        ob, reward, done, info = self.env.step(action)
        ob = get_state(ob)
        self.done = done
        return ob, reward, done, info

    def close(self):
        self.env.close()

    def render(self):
        """
        :return: current frame of the game
        """
        return self.env.render('rgb_array')

    def num_actions_available(self):
        """
        :return: Number of actions in the environment's action space
        """
        return self.env.action_space.n

    def just_starting(self):
        """
        :return: True if is the first frame and false otherwise
        """
        return self.curr_screen is None

    def can_shoot(self):
        """
        :return: True if the player may shoot and false otherwise
        """
        return self._get_player().can_shoot()

    def get_legal_actions(self):
        """
        :return: The 'logical' actions when the situation dictates them, otherwise
                 every action that is currently possible (idle is always included)
        """
        player = self._get_player()
        actions = []
        logic_actions = self.get_logic_actions()
        if logic_actions:
            return logic_actions
        if player.rect.left > 0:
            actions.append(ACTION_LEFT)
        if player.rect.right < WINDOWWIDTH:
            actions.append(ACTION_RIGHT)
        if player.can_shoot() and self.env.closest_dist < TOO_FAR_X:
            actions.append(ACTION_FIRE)
        actions.append(ACTION_IDLE)
        return actions

    def get_logic_actions(self):
        """
        :return: 'logical' list of actions that the player may execute; an empty
                 list when the situation does not dictate any particular action
        """
        if not self.env.closest_ball_euc:
            return []

        player = self._get_player()
        ball_rec = self.env.closest_ball_euc.rect
        on_the_left = ball_rec.centerx <= player.rect.centerx

        if self._is_in_danger(ball_rec):
            # Move away from the ball; shooting is offered as well when possible.
            escape = ACTION_RIGHT if on_the_left else ACTION_LEFT
            return [escape, ACTION_FIRE] if player.can_shoot() else [escape]

        if self._is_too_far() and self._is_normal_ball():
            # Approach a regular ball that is too far away.
            return [ACTION_LEFT] if on_the_left else [ACTION_RIGHT]

        # Always return a list (the original fell through and returned None here).
        return []

    def _is_in_danger(self, ball_rec):
        # True if the closest ball is too close and the player either cannot
        # shoot or the ball is already too low; false otherwise
        too_low = WINDOWHEIGHT - ball_rec.centery <= TOO_LOW
        player = self._get_player()
        return self._is_too_close() and (not player.can_shoot() or too_low)

    def _is_normal_ball(self):
        # Normal means not Hexagon
        return self.env.closest_ball_euc in self.env.game.balls

    def _is_too_close(self):
        # True if too close to the closest ball and false otherwise
        return self.env.closest_dist_euc < TOO_CLOSE

    def _is_too_far(self):
        # True if too far from the closest ball and false otherwise
        return self.env.closest_dist > TOO_FAR

    def _get_game(self):
        return self.env.game

    def _get_player(self):
        return self._get_game().player


# Utils

In [18]:


# One transition stored in the replay memory. Agent.train fills it with the state
# tensor, the chosen action, the next state tensor, a one-element reward tensor
# and the done flag.
Experience = namedtuple('Experience', ('state', 'action', 'next_state', 'reward', 'done'))


def init_networks(num_channels, num_actions, model, device):
    """
    Create the policy network and a target network that starts as its exact copy.

    :param num_channels: Number of channels in the input
    :param num_actions: Number of outputs
    :param model: Network class (or factory) called as model(num_channels, num_actions)
    :param device: Device both networks are moved to
    :return: tuple (policy_net, target_net); the target network is in eval mode
    """
    def build():
        return model(num_channels, num_actions).to(device)

    policy_net = build()
    target_net = build()

    # Start the target network from the policy weights and put it in eval mode.
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()
    return policy_net, target_net


def load_model(policy, target, optimizer, path):
    """
    Restore a saved checkpoint from `path`.

    The checkpoint must contain the entries 'model' and 'optimizer'. The policy
    network and the optimizer are restored from them, and the target network is
    then synchronized with the policy network and put in eval mode.
    """
    saved = torch.load(path, map_location={'cuda:0': 'cpu'})
    for entry, receiver in (('model', policy), ('optimizer', optimizer)):
        receiver.load_state_dict(saved[entry])
    target.load_state_dict(policy.state_dict())
    target.eval()


def get_state(obs):
    """
    Turn an observation into a batched tensor: the last axis of the observation
    is moved to the front and a leading batch axis of size 1 is added.
    """
    frames_last = np.array(obs)
    frames_first = np.transpose(frames_last, (2, 0, 1))
    return torch.from_numpy(frames_first).unsqueeze(0)


def extract_tensors(experiences, device):
    """
    Collate a list of experiences into batched tensors.

    :return: tuple (states, actions, rewards, next_states, dones)
    """
    # Transpose: list of Experience -> one Experience whose fields are tuples
    batch = Experience(*zip(*experiences))

    action_rows = tuple(torch.tensor([[a]], device=device) for a in batch.action)
    reward_rows = tuple(torch.tensor([r], device=device) for r in batch.reward)
    done_rows = tuple(torch.tensor([[float(d)]], device=device) for d in batch.done)

    states = torch.cat(batch.state).to(device)
    actions = torch.cat(action_rows)
    rewards = torch.cat(reward_rows)
    # Missing (None) next states are left out of the batch.
    next_states = torch.cat([s for s in batch.next_state if s is not None]).to(device)
    dones = torch.cat(done_rows)

    return states, actions, rewards, next_states, dones


def get_loss(current_q_values, target_q_values, importance, batch_size, device):
    """
    Compute the importance-weighted Huber (smooth L1) loss and the TD errors.

    The loss of every sample is multiplied by its own importance weight before
    averaging. (With the default reduction='mean' the weights would only rescale
    an already averaged scalar and no per-sample weighting would take place.)

    :param current_q_values: Tensor of shape (batch, 1) predicted by the policy network
    :param target_q_values: Tensor of shape (batch,) with the target values
    :param importance: one importance weight per sample (sequence, array or tensor)
    :param batch_size: number of samples in the batch
    :param device: device on which the weights tensor is created
    :return: tuple (errors, loss): numpy array of absolute TD errors with shape
             (batch_size,) and the scalar loss tensor
    """
    targets = target_q_values.unsqueeze(1)
    per_sample = F.smooth_l1_loss(current_q_values, targets, reduction='none').reshape(-1)
    weights = torch.as_tensor(importance, dtype=per_sample.dtype, device=device).reshape(-1)
    loss = (weights * per_sample).mean()
    errors = (targets - current_q_values.detach()).abs().cpu().numpy().reshape(batch_size)
    return errors, loss


def get_euclidian_closest_bubble(game, bubbles_list):
    """
    :return: tuple (bubble, distance): the bubble of bubbles_list with the smallest
             euclidean distance to the first player of the game, and that distance
    """
    distances = []
    for bubble in bubbles_list:
        distances.append(euclidean_dist_bubble_and_player(bubble, game.players[0]))
    nearest = int(np.argmin(np.array(distances)))
    return bubbles_list[nearest], distances[nearest]


def get_x_axis_closest_bubble(game, bubbles_list):
    """
    :return: tuple (bubble, distance): the bubble of bubbles_list with the smallest
             x axis distance to the first player of the game, and that distance
    """
    distances = []
    for bubble in bubbles_list:
        distances.append(dist_from_bubble_and_player(bubble, game.players[0], axis=0))
    nearest = int(np.argmin(np.array(distances)))
    return bubbles_list[nearest], distances[nearest]


def dist_from_bubble_and_player(bubble, player, axis=0):
    """
    Distance between a bubble and the player along one axis.

    For axis 0 it is the distance from the player's horizontal center to the
    nearer of the bubble's left and right edges; for any other axis it is the
    gap between the bubble's bottom edge and the player's top edge.

    :return: dist of player from bubble in axis
    """
    if axis != 0:
        return abs(bubble.rect.bottom - player.rect.top)

    center_x = player.rect.centerx
    to_left_edge = abs(center_x - bubble.rect.left)
    to_right_edge = abs(center_x - bubble.rect.right)
    return min(to_left_edge, to_right_edge)


def euclidean_dist_bubble_and_player(bubble, player):
    """
    Combine the per-axis distances (axis 0 and axis 1) into a single distance.

    :return: euclidean dist of player from bubble
    """
    dist_x = dist_from_bubble_and_player(bubble, player, 0)
    dist_y = dist_from_bubble_and_player(bubble, player, 1)
    return math.sqrt(math.pow(dist_x, 2) + math.pow(dist_y, 2))


def plot(values, moving_avg_period, val_type):
    """
    Plot the per-episode values together with their moving average and print
    the latest moving-average value.

    :param values: list with one value per episode
    :param moving_avg_period: window size of the moving average
    :param val_type: name of the plotted quantity (used as y label and legend entry)
    """
    plt.figure(2)
    plt.clf()

    # Titles and axis labels
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel(val_type)

    # Raw values first, then the smoothed curve on top
    plt.plot(values, label=val_type)
    smoothed = get_moving_average(moving_avg_period, values)
    plt.plot(smoothed, label='Moving Average')

    plt.legend()
    plt.pause(0.001)
    print("Episode", len(values), "\n", moving_avg_period, "episode moving avg:", smoothed[-1])


def get_moving_average(period, values):
    """
    Moving average of `values` over windows of `period` elements.

    The result has the same length as `values`; positions that do not yet have a
    full window are 0. When fewer than `period` values exist, all entries are 0.

    :return: numpy array of floats
    """
    series = torch.tensor(values, dtype=torch.float)
    if len(series) < period:
        return torch.zeros(len(series)).numpy()

    windows = series.unfold(dimension=0, size=period, step=1)
    window_means = windows.mean(dim=1).flatten(start_dim=0)
    # Pad the front so the output lines up with the input episodes.
    padded = torch.cat((torch.zeros(period - 1), window_means))
    return padded.numpy()


# Parameters


In [19]:
# Network Parameters
# Number of output channels of the three convolution layers of CNN
NUM_FILTER_L1 = 32
NUM_FILTER_L2 = 64
NUM_FILTER_L3 = 64

# Kernel size of each convolution layer
KER_L1 = 8
KER_L2 = 4
KER_L3 = 3

# Stride of each convolution layer
STRIDE_L1 = 4
STRIDE_L2 = 2
STRIDE_L3 = 1

# Shape of the network input (WIDTH,HEIGHT,NUM_OF_CHANNELS).
# NUM_OF_CHANNELS is used both as the number of stacked frames of the environment
# and as the number of input channels of the network (see the training cell).
WIDTH = 84
HEIGHT = 84
NUM_OF_CHANNELS = 3

# Number of outputs
NUM_OF_ACTIONS = 4

# Training Parameters
# SKIP_FRAMES:   frames an action is repeated by MaxAndSkipEnv
# EPS_START/END: initial and final exploration rate of EpsilonGreedyStrategy
# EPS_DECAY:     decay constant of the exploration rate, exp(-step / EPS_DECAY)
# LR:            learning rate of the Adam optimizer
# BATCH_SIZE:    number of experiences sampled per training step
# GAMMA:         discount factor applied to the next-state values
# TARGET_UPDATE: number of environment steps between target network updates
# MEMORY_SIZE:   maximum number of experiences kept in the replay memory
# NUM_EPISODES:  number of training episodes
SKIP_FRAMES = 3
EPS_START = 1.0
EPS_END = 0.02
EPS_DECAY = 70000
LR = 0.00025
BATCH_SIZE = 32
GAMMA = 0.99
TARGET_UPDATE = 10000
MEMORY_SIZE = 20000 # If you have access to high RAM you may change to 90,000
NUM_EPISODES = 10000


# Main : Run Train

In [None]:
# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Training environment: frame skipping and episodic lives enabled.
em = EnvManager(device, NUM_OF_CHANNELS, SKIP_FRAMES, to_skip=True, ep_live=True)
num_actions = em.num_actions_available()
# Evaluation environment: no frame skipping, an episode lasts until the game itself is done.
test_em = EnvManager(device, NUM_OF_CHANNELS, SKIP_FRAMES, to_skip=False, ep_live=False)
# Policy network plus a target network initialized with the same weights.
policy_net, target_net = init_networks(NUM_OF_CHANNELS, num_actions, CNN, device)
# Only the policy network is optimized; the target network is synced by Agent.train.
optimizer = optim.Adam(params=policy_net.parameters(), lr=LR)
strategy = EpsilonGreedyStrategy(EPS_START, EPS_END, EPS_DECAY)
agent = Agent(policy_net, target_net, strategy, em, test_em, em.num_actions_available(), optimizer)
# Start training (plots the reward and loss curves after every episode).
agent.train(NUM_EPISODES)