In [None]:
!pip install keras

In [None]:
!pip install tensorflow

In [None]:
pip install gym

In [1]:
import keras
from keras.layers import Dense, Activation
from keras.models import Sequential, load_model
from keras.optimizers import Adam
import numpy as np
from google.colab import drive
drive.mount('/content/drive')
import os

os.chdir('/content/drive/My Drive/Colab Notebooks/RL Bacteria')


Mounted at /content/drive


In [2]:


class ReplayBuffer(object):
    """Fixed-capacity circular store of agent transitions.

    Each slot holds a state, the action taken, the reward, the following
    state and a continuation mask (``1 - done``). Once ``max_size``
    transitions have been written, new ones overwrite the oldest slots.

    Args:
        max_size: number of slots in the buffer.
        input_shape: length of one (flat) state vector.
        n_actions: width of one stored action row.
        discrete: when true, actions are given as indices and stored as
            one-hot ``int8`` rows; otherwise they are stored as ``float32``.
    """

    def __init__(self, max_size, input_shape, n_actions, discrete=False):
        self.mem_size = max_size
        self.mem_cntr = 0  # total number of transitions ever stored
        self.discrete = discrete

        action_dtype = np.float32
        if self.discrete:
            action_dtype = np.int8

        state_shape = (self.mem_size, input_shape)
        self.state_memory = np.zeros(state_shape)
        self.new_state_memory = np.zeros(state_shape)
        self.action_memory = np.zeros((self.mem_size, n_actions),
                                      dtype=action_dtype)
        self.reward_memory = np.zeros(self.mem_size)
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.float32)

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition into the next slot, wrapping around when full."""
        slot = self.mem_cntr % self.mem_size
        self.state_memory[slot] = state
        self.new_state_memory[slot] = state_
        if not self.discrete:
            self.action_memory[slot] = action
        else:
            # Discrete actions arrive as an index; keep them one-hot encoded.
            one_hot = np.zeros(self.action_memory.shape[1])
            one_hot[action] = 1.0
            self.action_memory[slot] = one_hot
        self.reward_memory[slot] = reward
        # Stored as a continuation mask: 0 for terminal transitions, 1 otherwise.
        self.terminal_memory[slot] = 1 - done
        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        """Draw ``batch_size`` random transitions from the filled part of the buffer.

        Indices come from ``np.random.choice`` with its default settings, so
        the same slot may appear more than once in a batch.

        Returns:
            Tuple ``(states, actions, rewards, next_states, continuation_mask)``.
        """
        filled = min(self.mem_cntr, self.mem_size)
        picks = np.random.choice(filled, batch_size)
        return (
            self.state_memory[picks],
            self.action_memory[picks],
            self.reward_memory[picks],
            self.new_state_memory[picks],
            self.terminal_memory[picks],
        )

def build_dqn(lr, n_actions, input_dims, fc1_dims, fc2_dims):
    """Build and compile the Q-network: one hidden Dense layer and a Dense output.

    Args:
        lr: learning rate for the Adam optimizer.
        n_actions: number of output units (one Q-value per action).
        input_dims: length of the flat observation vector.
        fc1_dims: width of the hidden layer.
        fc2_dims: accepted for interface compatibility; currently unused
            because the network has a single hidden layer.

    Returns:
        A compiled Keras ``Sequential`` model using mean-squared-error loss.
    """
    # NOTE(review): neither Dense layer has an activation, so the stacked
    # layers collapse to a single linear map of the observation. `Activation`
    # is imported but never used -- confirm whether ReLU layers (and a second
    # hidden layer of width fc2_dims) were intended.
    model = Sequential([
        Dense(fc1_dims, input_shape=(input_dims,)),
        Dense(n_actions),
    ])

    # `lr` is a deprecated alias that newer Keras releases reject;
    # `learning_rate` is the supported keyword.
    model.compile(optimizer=Adam(learning_rate=lr), loss='mse')

    return model

class Agent(object):
    """Epsilon-greedy DQN agent with a single Q-network and a replay buffer.

    Args:
        alpha: learning rate passed to ``build_dqn``.
        gamma: discount factor applied to the bootstrapped next-state value.
        n_actions: number of discrete actions.
        epsilon: initial exploration probability.
        batch_size: number of transitions sampled per learning step.
        input_dims: length of the flat observation vector.
        epsilon_dec: multiplicative decay applied to epsilon after each
            learning step.
        epsilon_end: lower bound for epsilon.
        mem_size: capacity of the replay buffer.
        fname: path used by ``save_model`` / ``load_model``.
    """

    def __init__(self, alpha, gamma, n_actions, epsilon, batch_size,
                 input_dims, epsilon_dec=0.99984,  epsilon_end=0.01,
                 mem_size=100000, fname='my_dqn_model.h5'):
        self.action_space = [i for i in range(n_actions)]
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_dec = epsilon_dec
        self.epsilon_min = epsilon_end
        self.batch_size = batch_size
        self.model_file = fname
        self.memory = ReplayBuffer(mem_size, input_dims, n_actions,
                                   discrete=True)
        self.q_eval = build_dqn(alpha, n_actions, input_dims, 128, 128)

    def remember(self, state, action, reward, new_state, done):
        """Store one transition in the replay buffer."""
        self.memory.store_transition(state, action, reward, new_state, done)

    def choose_action(self, state):
        """Pick an action epsilon-greedily for a single flat observation.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest predicted Q-value.
        """
        state = state[np.newaxis, :]  # add the batch axis expected by predict()
        if np.random.random() < self.epsilon:
            return np.random.choice(self.action_space)

        # verbose=0 keeps Keras from printing a progress bar on every step.
        q_values = self.q_eval.predict(state, verbose=0)
        return np.argmax(q_values)

    def learn(self):
        """Run one Q-learning update on a random batch from the replay buffer.

        Does nothing until the buffer holds more than ``batch_size``
        transitions. After each update epsilon is decayed by ``epsilon_dec``
        and clamped so it never drops below ``epsilon_min``. The current
        value is available as ``self.epsilon``.
        """
        if self.memory.mem_cntr <= self.batch_size:
            return

        state, action, reward, new_state, not_done = \
            self.memory.sample_buffer(self.batch_size)

        # Actions are stored one-hot; argmax recovers the action index per row.
        action_indices = np.argmax(action, axis=1)

        q_eval = self.q_eval.predict(state, verbose=0)
        q_next = self.q_eval.predict(new_state, verbose=0)

        # Only the entry of the action actually taken is moved toward the
        # target; every other Q-value keeps its current prediction, so it
        # contributes zero error to the MSE loss.
        q_target = q_eval.copy()
        batch_index = np.arange(self.batch_size, dtype=np.int32)

        # `not_done` is the buffer's continuation mask (1 - done): it zeroes
        # the bootstrapped term for terminal transitions.
        q_target[batch_index, action_indices] = reward + \
            self.gamma * np.max(q_next, axis=1) * not_done

        self.q_eval.fit(state, q_target, verbose=0)

        # Decay exploration, never going below the configured floor.
        self.epsilon = max(self.epsilon * self.epsilon_dec, self.epsilon_min)

    def save_model(self):
        """Save the Q-network to ``self.model_file``."""
        self.q_eval.save(self.model_file)

    def load_model(self):
        """Replace the Q-network with the one stored in ``self.model_file``."""
        # Inside this method the bare name `load_model` resolves to the
        # module-level Keras function, not to this method.
        self.q_eval = load_model(self.model_file)
        self.q_eval.summary()

In [3]:
import numpy as np
import gym
from gym import spaces
import numpy as np

class Bacteria:
    """A single bacterium living on a 2-D grid.

    Tracks a ``(row, col)`` position, a compass heading (one of "N", "NE",
    "E", "SE", "S", "SW", "W", "NW") and an energy reserve that starts at 60.
    """

    def __init__(self, position, direction):
        self.position = position
        self.direction = direction
        self.energy = 60

    def change_direction(self, direction):
        """Set the current heading."""
        self.direction = direction

    def move(self, board, to):
        """Turn to face ``to`` and advance one cell if that cell is on the board.

        The heading changes even when the step itself is rejected.
        """
        self.change_direction(to)
        target = self.get_next_position()
        if not self.is_valid_position(target, board):
            return
        self.update_position(target)

    def get_next_position(self):
        """Return the cell one step ahead along the current heading.

        Returns ``None`` when the heading is not one of the eight compass names.
        """
        row, col = self.position
        offsets = {
            "N": (-1, 0),
            "NE": (-1, 1),
            "E": (0, 1),
            "SE": (1, 1),
            "S": (1, 0),
            "SW": (1, -1),
            "W": (0, -1),
            "NW": (-1, -1),
        }
        delta = offsets.get(self.direction)
        if delta is None:
            return None
        return (row + delta[0], col + delta[1])

    def is_valid_position(self, pos, board):
        """Return True when ``pos`` lies inside the bounds of ``board``."""
        row, col = pos
        off_board = (row < 0 or row >= board.shape[0]
                     or col < 0 or col >= board.shape[1])
        return not off_board

    def update_position(self, pos):
        """Overwrite the current position."""
        self.position = pos

    def calculate_smell(self, board, food_pos):
        """Add a Gaussian scent centred on ``food_pos`` to ``board`` in place.

        The scent peak is the food cell's value divided by 1.5 and falls off
        with Euclidean distance using a standard deviation of 3 cells. The
        food cell itself is left unchanged.
        """
        sigma = 3
        for r, c in np.ndindex(board.shape[0], board.shape[1]):
            source_strength = (board[food_pos[0]][food_pos[1]])/1.5
            gap = self.distance((r, c), food_pos)
            scent = source_strength * np.exp(-gap**2 / (2*sigma**2))

            if (r, c) != (food_pos[0], food_pos[1]):
                board[r][c] += scent

    def distance(self, pos1, pos2):
        """Return the Euclidean distance between two ``(row, col)`` cells."""
        row_a, col_a = pos1
        row_b, col_b = pos2
        return np.sqrt((row_a-row_b)**2 + (col_a-col_b)**2)



class BacteriaEnvironment(gym.Env):
    """Grid world in which one bacterium follows a scent gradient to food.

    The board is ``board_size`` x ``board_size``. Food cells hold the value
    100 and every other cell holds the Gaussian "smell" laid down by
    ``Bacteria.calculate_smell``. An observation is the flattened 3x3
    neighbourhood around the bacterium, with -1 for off-board cells.

    Actions 0..7 map, in order, to the headings in ``ACTION_DIRECTIONS``.

    Note: ``reset`` returns only the observation and ``step`` returns the
    3-tuple ``(observation, reward, done)`` (no ``info`` dict).

    Args:
        board_size: side length of the square board.
        num_food: number of food items placed at random on each reset.
    """
    metadata = {'render.modes': ['human']}

    # Action index -> compass heading handed to Bacteria.move().
    ACTION_DIRECTIONS = ("W", "E", "N", "S", "NE", "NW", "SE", "SW")

    def __init__(self, board_size=10, num_food=1):
        super(BacteriaEnvironment, self).__init__()

        self.board_size = board_size
        self.num_food = num_food
        self.food_positions = []
        self.board = None
        self.bacteria = None

        self.action_space = spaces.Discrete(len(self.ACTION_DIRECTIONS))
        # NOTE(review): the declared shape is (3, 3) but reset()/step() return
        # the flattened 9-element vector -- confirm which one is intended.
        self.observation_space = spaces.Box(low=-1, high=100, shape=(3, 3), dtype=np.float16)

        self.reset()

    def getNeighbors(self):
        """Return the 3x3 window of board values centred on the bacterium.

        Cells that fall outside the board are reported as -1.
        """
        neighbors = np.full((3, 3), -1, dtype=np.float16)
        row, col = self.bacteria.position[0], self.bacteria.position[1]

        for d_row in (-1, 0, 1):
            for d_col in (-1, 0, 1):
                r, c = row + d_row, col + d_col
                if 0 <= r < self.board_size and 0 <= c < self.board_size:
                    neighbors[d_row + 1][d_col + 1] = self.board[r][c]
        return neighbors

    def reset(self):
        """Start a new episode and return the initial flattened observation.

        Places ``num_food`` food items at random cells, puts a fresh
        bacterium (heading "N") at the centre of the board and lays down the
        scent field of every food item.
        """
        # initialize the board
        self.board = np.zeros((self.board_size, self.board_size), dtype=np.float16)

        # randomly place food on the board
        self.food_positions = []
        for _ in range(self.num_food):
            food_pos = (np.random.randint(0, self.board_size), np.random.randint(0, self.board_size))
            self.food_positions.append(food_pos)
            self.board[food_pos] = 100

        # create the bacteria
        start_pos = (self.board_size//2, self.board_size//2)
        self.bacteria = Bacteria(start_pos, "N")

        self.cost = 0
        for food_pos in self.food_positions:
            # Compute each food's scent on a scratch field holding only that
            # food. calculate_smell reads the source strength from the food
            # cell, so doing this directly on the shared board would inflate
            # later foods by the scent earlier foods already added to their
            # cell. With a single food the result is unchanged.
            scent = np.zeros_like(self.board)
            scent[food_pos] = 100
            self.bacteria.calculate_smell(scent, food_pos)
            scent[food_pos] = 0
            self.board += scent

        return self.getNeighbors().flatten()

    def _direction_for(self, action):
        """Return the heading for ``action``, or None if it is not one of 0..7."""
        for index, name in enumerate(self.ACTION_DIRECTIONS):
            if action == index:
                return name
        return None

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done)``.

        Energy: moving along the current heading costs 0.1, turning costs 1.1.
        Reward:
          * -20 when the move would leave the board (the bacterium stays put);
          * +200 (and +15 energy) when the new cell holds food, otherwise the
            change in board value between the old and the new cell;
          * -1000 and ``done`` when energy reaches 0 or below;
          * +1000 and ``done`` when no food is left.
        An action outside 0..7 moves nothing and costs no energy.
        """
        done = False
        reward = 0
        past_pos = self.bacteria.position

        direction = self._direction_for(action)
        if direction is not None:
            if self.bacteria.direction == direction:
                self.bacteria.energy -= 0.1
            else:
                self.bacteria.energy -= 1.1
            self.bacteria.move(self.board, direction)
            # Every heading changes the position, so an unchanged position
            # means Bacteria.move rejected the step as off-board.
            if self.bacteria.position == past_pos:
                reward -= 20

        new_pos = self.bacteria.position

        # check if the bacteria has found any food
        if new_pos in self.food_positions:
            reward += 200
            self.food_positions.remove(new_pos)
            self.bacteria.energy += 15
        else:
            reward = reward + (self.board[new_pos] - self.board[past_pos])

        # check if the bacteria is out of energy or has found all the food
        if self.bacteria.energy <= 0:
            done = True
            reward -= 1000
        elif len(self.food_positions) == 0:
            done = True
            reward += 1000

        # create the observation for the new state
        observation_ = self.getNeighbors()
        return observation_.flatten(), reward, done

    def show(self):
        """Print the board as a grid: 'B' bacterium, 'F' food, '-' empty."""
        for row in range(self.board_size):
            for col in range(self.board_size):
                # check if the current cell contains the bacteria
                if (row, col) == self.bacteria.position:
                    print('B', end=' ')
                # check if the current cell contains any food
                elif (row, col) in self.food_positions:
                    print('F', end=' ')
                # otherwise, print an empty cell
                else:
                    print('-', end=' ')
            print()

    def show_rewards(self):
        """Print the raw board values (food and scent), one row per line."""
        for row in range(self.board_size):
            for col in range(self.board_size):
                print(self.board[row][col], end='  ')

            print()





# Training Process

In [None]:


import matplotlib.pyplot as plt

def plotterr1(ii, avg, title):
    """Append the point ``(ii, avg)`` to the global ``x1``/``y1`` history and redraw it.

    Every call shows a scatter plot of all points recorded so far, titled
    ``title``.
    """
    x1.append(ii)
    y1.append(avg)

    plt.scatter(x1, y1)
    # title and axis labels
    plt.title(title)
    plt.ylabel('Y axis')
    plt.xlabel('X axis')
    # render the figure
    plt.show()

def plotterr2(ii, avg, title):
    """Append the point ``(ii, avg)`` to the global ``x2``/``y2`` history and redraw it.

    Every call shows a scatter plot of all points recorded so far, titled
    ``title``.
    """
    x2.append(ii)
    y2.append(avg)

    plt.scatter(x2, y2)
    # title and axis labels
    plt.title(title)
    plt.ylabel('Y axis')
    plt.xlabel('X axis')
    # render the figure
    plt.show()



# Point histories backing the two live scatter plots
# (appended to by plotterr1 / plotterr2).
x1=[]
y1=[]
x2=[]
y2=[]

env = BacteriaEnvironment()
lr = 0.001
n_games = 500

agent = Agent(gamma=0.3, epsilon=0.9, alpha=lr, input_dims=3*3, n_actions=8, mem_size=100000, batch_size=64, epsilon_end=0.01)
# NOTE(review): this writes the freshly initialised network to the model file
# and immediately reads it back, replacing any checkpoint already stored
# there -- confirm that restarting from scratch is intended.
agent.save_model()
agent.load_model()
scores = []
eps_history = []

for i in range(n_games):
    done = False
    score = 0
    rewpos = []  # positive rewards collected during this game
    observation = env.reset()
    print("******************new game********************")
    while not done:
        action = agent.choose_action(observation)
        observation_, reward, done = env.step(action)
        score += reward
        if(reward>0):
            rewpos.append(reward)
        agent.remember(observation, action, reward, observation_, int(done))
        observation = observation_
        agent.learn()

    eps_history.append(agent.epsilon)
    scores.append(score)
    sum_positive = sum(rewpos)

    # Each plotter call already renders its own figure.
    plotterr1(i,score, "Sum of Rewards vs Epochs")
    print("  ")
    plotterr2(i,sum_positive, "Sum of Positive Rewards vs Epochs")

    # Periodic checkpoint.
    if i % 10 == 0 and i > 0:
        agent.save_model()
        print("save erav")

# The periodic checkpoint only fires when i is a multiple of 10, so save once
# more here to keep what was learned in the final games.
agent.save_model()

Output hidden; open in https://colab.research.google.com to view.

# Running a simulation with the trained RL agent

In [8]:
# Load the saved Q-network from disk. NOTE: from here on `agent` is whatever
# `load_model` returns (a bare Keras model used via `.predict` below), not an
# instance of the `Agent` class used in the training cell.
agent = load_model('my_dqn_model.h5')

def solve_by_RL(state):
  """Return the greedy action for one flat observation.

  Feeds ``state`` (with a batch axis added) to the global model ``agent`` and
  returns the index of the largest predicted value.
  """
  observation = state[np.newaxis, :]
  # verbose=0 stops Keras from printing a progress bar on every step.
  actions = agent.predict(observation, verbose=0)
  return np.argmax(actions)

# Play one episode with the loaded model choosing every move, printing the
# observation and the board after each step.
env = BacteriaEnvironment()
observation = env.reset()
done = False
while True:
  action = solve_by_RL(observation)
  observation_, reward, done = env.step(action)
  observation = observation_
  print(observation)
  env.show()
  if done:
    break

# Re-run this cell for a different result: food is placed at random on reset.

[40.44 38.25 32.38 27.4  25.92 21.95 16.62 15.73 13.31]
- - - F - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - B - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
[50.5  53.38 50.5  38.25 40.44 38.25 25.92 27.4  25.92]
- - - F - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - B - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
[59.66 63.06 59.66 50.5  53.38 50.5  38.25 40.44 38.25]
- - - F - - - - - - 
- - - - - - - - - - 
- - - B - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
[ 63.06 100.    63.06  59.66  63.06  59.66  50.5   53.38  50.5 ]
- - - F - - - - - - 
- - - B - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - - - - - - 
- - - - - -