In [1]:
import os
import gym
import pickle
import matplotlib.pyplot as plt
import numpy as np
import ppaquette_gym_doom
from collections import deque

In [2]:
# Create the classic Doom "Defend the Center" environment with Gym.
# NOTE(review): the 'ppaquette/...' id is presumably registered as a side effect
# of importing ppaquette_gym_doom in the imports cell — confirm.
env = gym.make('ppaquette/DoomDefendCenter-v0')

INFO:gym.envs.registration:Making new env: ppaquette/DoomDefendCenter-v0
[2017-04-26 20:47:43,780] Making new env: ppaquette/DoomDefendCenter-v0


In [3]:
class Agent():
    """Tabular value-learning agent.

    Keeps one value table per action (``self.policy``: action => state => value)
    and refines it with a one-step temporal-difference update (see ``learn_V``).

    Args:
        learn_rate: step size applied to every temporal-difference update.
        observation_encoder: callable turning a raw observation into the state
            key used in the value tables; defaults to the identity function.
            NOTE: a lambda cannot be pickled by the standard ``pickle`` module,
            so an agent that keeps the default encoder cannot be passed to
            ``Agent.save``; supply a module-level function instead.
        verbose: stored as ``self.verbose``; nothing in this class reads it.
    """

    def __init__(self,
                 learn_rate=0.05,
                 observation_encoder=lambda x: x,
                 verbose=True):
        # Weight applied to the next state's value in learn_V, i.e. it plays
        # the role of the discount factor in the update rule.
        self.alpha = 0.9
        self.policy = dict()  # action => state => value
        self.total_reward = 0
        self.learn_rate = learn_rate
        self.observation_encoder = observation_encoder
        self.verbose = verbose
        self.history = deque()  # (state, action, reward) tuples, newest first

    def reset(self):
        """Clear the recorded transition history; learned values are kept."""
        self.history.clear()

    def learn_aggregate(self, observation, observation_, action, reward):
        """Encode one transition, record it and learn from it.

        Args:
            observation: raw observation before the action was taken.
            observation_: raw observation after the action was taken.
            action: the action that was taken.
            reward: the reward received for the action.
        """
        # Encode [observation] => [state]
        state = self.observation_encoder(observation)
        state_ = self.observation_encoder(observation_)
        self.history.appendleft((state, action, reward))
        self.learn_V(state, action, reward, state_)

    def find_best_action(self, state):
        """Find the action with the highest stored value for ``state``.

        Returns:
            A tuple ``(best_action, best_value)``. When no action has a stored
            value for ``state`` the sentinel ``(-1, -1)`` is returned. Ties keep
            the first action encountered while iterating ``self.policy``.
        """
        best_action = -1
        best_return = -1
        found = False
        for action, values in self.policy.items():
            if state not in values:
                continue
            value = values[state]
            # The first known value always wins, so values at or below the
            # -1 sentinel are no longer mistaken for "nothing known".
            if not found or value > best_return:
                best_action = action
                best_return = value
                found = True
        return (best_action, best_return)

    def V(self, state, action):
        """Query the stored value of taking ``action`` in ``state``.

        Unseen (action, state) pairs are inserted into the table with value 0,
        so calling this method can grow ``self.policy``.
        """
        return self.policy.setdefault(action, {}).setdefault(state, 0)

    def learn_V(self, state, action, reward, next_state):
        """Update the stored value of (state, action) from one transition.

        Applies ``V <- V + learn_rate * (reward + alpha * V_next - V)`` where
        ``V_next`` is the stored value of ``next_state`` under the same action.
        """
        old_V = self.V(state, action)
        new_V = self.V(next_state, action)

        td_error = reward + self.alpha * new_V - old_V
        self.policy[action][state] = old_V + self.learn_rate * td_error

    @staticmethod
    def load(path, default):
        """Load a pickled agent from ``path``, or return ``default``.

        Args:
            path: file to read the pickled object from.
            default: value returned when ``path`` is not an existing file.

        SECURITY: unpickling can execute arbitrary code; only load files that
        come from a trusted source.
        """
        if os.path.isfile(path):
            with open(path, 'rb') as f:
                return pickle.load(f)
        else:
            print('MODEL NOT FOUND, initialising a brand new one.')
            return default

    @staticmethod
    def save(path, agent):
        """Pickle ``agent`` to ``path``, replacing any existing file."""
        with open(path, 'wb') as f:
            pickle.dump(agent, f)

In [4]:
# Computer vision utils
from PIL import Image
from scipy.stats import threshold
from scipy.signal import medfilt
from scipy.misc import toimage
def encode_screen(observation):
  """Turn a raw screen frame into the string state key used by the agent.

  Takes every third row from 150 up to (not including) 230, every second
  column, and channel 0 only (presumably the red channel of an RGB frame —
  TODO confirm), then thresholds/denoises that band with `pixelate` and
  summarises it with `projection`.
  """
  first_channel_band = observation[150:230:3, ::2, 0]
  denoised = pixelate(first_channel_band)
  return projection(denoised)

def show(mat):
  """Convert the array `mat` to an image with `toimage` and display it."""
  image = toimage(mat)
  image.show()

"""
Coarse pixelate
"""
def pixelate(observation, threshmin=100, kernel_size=3):
  """Zero out weak values, then median-filter the result to remove noise.

  Args:
    observation: numeric array to clean up; it is not modified.
    threshmin: values strictly below this are replaced by 0.
    kernel_size: median-filter window size passed to `scipy.signal.medfilt`.

  Returns:
    A new array with the same shape as `observation`.
  """
  # Threshold. Done with plain NumPy masking (copy, then zero everything below
  # the cutoff) because scipy.stats.threshold is no longer available in
  # current SciPy releases; the result is the same as
  # threshold(observation, threshmin=threshmin, threshmax=None, newval=0).
  m = np.array(observation, copy=True)
  m[m < threshmin] = 0
  # Remove noise
  return medfilt(m, kernel_size)

def projection(pixels, sum_below=2400, stride=5, count_above=3, gap=1):
  """Summarise a 2-D pixel array as a string of per-window boolean flags.

  Each column is marked when the sum of its values is strictly below
  `sum_below`. The marked columns are then scanned in windows of `stride`
  columns, skipping `gap` columns between consecutive windows, and a window
  is flagged True when it holds strictly more than `count_above` marked
  columns. The last window may be shorter than `stride`.

  Args:
    pixels: 2-D numeric array (rows x columns).
    sum_below: a column is marked when its sum is < this value.
    stride: number of columns per window (>= 1).
    count_above: a window is True when its marked-column count is > this.
    gap: columns skipped between windows (>= 0). The default of 1 keeps the
      original stepping of stride + 1, so every sixth column is ignored.
      NOTE(review): this may have been an off-by-one; it is preserved because
      changing it would change the state keys produced.

  Returns:
    `str()` of the list of window flags, e.g. '[True, False]'.

  Raises:
    ValueError: if `stride` < 1 or `gap` < 0, or if `pixels` is not 2-D.
  """
  if stride < 1 or gap < 0:
    raise ValueError('stride must be >= 1 and gap must be >= 0')

  # Horizontal projection: one boolean per column
  _, width = pixels.shape
  marked = np.sum(pixels, axis=0) < sum_below

  # Reduce: one flag per window of columns
  flags = [
    bool(np.count_nonzero(marked[start:start + stride]) > count_above)
    for start in range(0, width, stride + gap)
  ]
  return str(flags)