In [106]:
import numpy as np
import random
from matplotlib import pyplot
from dataclasses import dataclass
from collections import deque

# ML
import tensorflow as tf
from tensorflow.keras import layers, models, losses, optimizers

In [107]:
@dataclass
class QSettings():
    epsilon: float = 1.0
    default_epsilon: float = epsilon
    epsilon_min = 0.01
    epsilon_decay = 0.99
    discount: float = 0.9
    learning_rate: float = 0.001
        
    def reset(self):
        self.epsilon = self.default_epsilon
    
        
    

In [108]:
testt = [ 
    np.array([0, 1], dtype=np.float32),  # radial movement
    np.array([0, 2*np.pi], dtype=np.float32), # Anglular movement
]
neww = (np.random.uniform(testt[0][0], testt[0][1]), np.random.uniform(testt[1][0], testt[1][1]))
print(neww)

(0.42945926372600585, 0.612431402464325)


In [109]:
class Math:
    @staticmethod
    def cartesian_to_polar(x,y):
        return (np.sqrt(x*x + y*y), np.arctan2(y,x))
    
    @staticmethod
    def polar_to_cartesian(r, theta):
        return (r*np.cos(theta), r*np.sin(theta))
    
    @staticmethod
    def distance(x1, x2, y1, y2):
        return np.sqrt( np.power( (x2 - x1) , 2) + np.power( (y2 - y1 ) , 2) )

In [110]:
class Character:
    def __init__(self, x, y, health: float, attdmg: float):
        self.x = x
        self.y = y
        self.startingx = x
        self.startingy = y
        self.health = health
        self.attdmg = attdmg

# Move qsettings to agent (?)
        
class Agent(Character):
    def __init__(self, env, x, y):
        super().__init__(x,y, 200, 10)
        self.env = env
        self.memory = deque(maxlen=3000)
        
        self.actions = [ 
            np.array([0, 1], dtype=np.float32),  # radial movement
            np.array([0, 2*np.pi], dtype=np.float32), # Anglular movement
        ]
        self.action_size = len(self.actions)
        
        self.state_size = 2 # (x, y)
        
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()
        
        
    def _build_model(self):
        """
        Builds state-action DNN model
        """
        model = models.Sequential()
        model.add(layers.Dense(24, input_dim = self.state_size, activation='relu')) # state size is 2 (agent x, agent y)
        model.add(layers.Dense(24, activation='relu'))
        model.add(layers.Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=optimizers.Adam(lr=self.env.qsettings.learning_rate))
        return model
    
    def update_target_model(self):
        self.target_model.set_weights(self.model.get_weights())
        
    def memorize(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
        
    
    
    # model must hold polar coordinates because it's relative to the action
    def get_next_action(self, state):
        if np.random.rand() <= self.env.qsettings.epsilon:
            return (np.random.uniform(testt[0][0], testt[0][1]), np.random.uniform(testt[1][0], testt[1][1]))
        act_values = self.model.predict(state)
        return act_values
    
    def move(self, x, y, action):
        assert isinstance(action, tuple)
        assert len(action)==2
        r = action[0]
        theta = action[1]
        newx, newy = Math.polar_to_cartesian(r, theta)
        return newx, newy
        
    
    def step(self, action):
        self.x, self.y = self.move(self.x, self.y, action)
        if len(self.env.nearby_enemies(self.x, self.y)) > 0:
            reward = -20
        elif self.env.is_end(self.x, self.y):
            reward = 100
        else:
            reward = -1
        state = (self.x, self.y)
        done = self.env.is_end(self.x, self.y)
        return (state, reward, done)
        
            
        
    
    def replay(self, batch_size: int):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = self.model.predict(self.build_state(state))
            if done:
                target = reward
            else:
                t = self.target_model.predict(self.build_state(next_state))[0]
                print("t= ",t)
                target = reward + self.env.qsettings.discount * np.amax(t)
                print("target= ",target)
            self.model.fit(self.build_state(state), target, epochs=1, verbose=0)
        if self.env.qsettings.epsilon > self.env.qsettings.epsilon_min:
            self.env.qsettings.epsilon *= self.env.qsettings.epsilon_decay
            
    def build_state(self, *args):
        if len(args)==2:
            # x, y passed
            x = args[0]
            y = args[1]
            return np.array([[x, y],])
        elif len(args)==1:
            # state passed
            state = args[0]
            return np.array([state,])
            
    def reset(self):
        self.env.qsettings.reset()
        # Reset state
        self.x = self.startingx
        self.y = self.startingy
        return self.build_state(self.x, self.y)
    
    def train(self, epochs: int, max_time_limit: int, batch_size: int):
        done = False
        for epoch in range(epochs):
            state = self.reset()
            for time in range(max_time_limit):
                print(f"Running epoch {epoch}/{epochs-1} - time {time}/{max_time_limit-1}", end='\r',flush=True)
                action = self.get_next_action(state)
                next_state, reward, done = self.step(action)
                self.memorize(state, action, reward, next_state, done)
                state = next_state
                if done:
                    self.update_target_model()
                    print("episode: {}/{}, score: {}/{}, e: {:.2}"
                      .format(epoch, epochs-1, time, max_time_limit-1, self.env.qsettings.epsilon))
                    break
                else:
                    if len(self.memory) > batch_size:
                        self.replay(batch_size)
                
        
            
        
class Enemy(Character):
    def __init__(self, x,y, radius=10):
        super().__init__(x,y, 50, 7)
        self.radius = radius

In [111]:
class Environment():
    def __init__(self, mapsize: tuple = (30,30), timeout = 500, nenemies = 50, endpos: tuple = (0,0) , qsettings = QSettings()):
        self.mapsize = mapsize
        self.qsettings = qsettings
        self.enemies = []
        self.endpos = endpos
        self._build_enemies(nenemies)
    
    def _build_enemies(self, nenemies):
        x = np.random.uniform(0, self.mapsize[0], size=nenemies)
        y = np.random.uniform(0, self.mapsize[1], size=nenemies)
        for i in range(len(x)):
            self.enemies.append(Enemy(x[i], y[i]))
            
    def nearby_enemies(self, x, y):
        inrange_enemies = []
        for enemy in self.enemies:
            if Math.distance(x, enemy.x, y, enemy.y) < enemy.radius:
                inrange_enemies.append(enemy)
        return inrange_enemies 
    
    def is_end(self, x, y):
        if x == self.endpos[0] and y == self.endpos[1]:
            return True
        else:
            return False

In [112]:
e = Environment(nenemies=55)
a = Agent(e,1,1)

In [113]:
a.train(100, 64, 32)

t=  [-0.00781407  0.0042919 ]63
target=  -19.99613728877157


ValueError: Failed to find data adapter that can handle input: <class 'numpy.ndarray'>, <class 'numpy.float64'>