# BTD6 AI
Builds an AI that can play BTD 6.

In [1]:
!pip install onnxruntime
!pip install func_timeout

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting onnxruntime
  Downloading onnxruntime-1.13.1-cp38-cp38-manylinux_2_27_x86_64.whl (4.5 MB)
[K     |████████████████████████████████| 4.5 MB 4.2 MB/s 
Collecting coloredlogs
  Downloading coloredlogs-15.0.1-py2.py3-none-any.whl (46 kB)
[K     |████████████████████████████████| 46 kB 4.2 MB/s 
Collecting humanfriendly>=9.1
  Downloading humanfriendly-10.0-py2.py3-none-any.whl (86 kB)
[K     |████████████████████████████████| 86 kB 6.4 MB/s 
Installing collected packages: humanfriendly, coloredlogs, onnxruntime
Successfully installed coloredlogs-15.0.1 humanfriendly-10.0 onnxruntime-1.13.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting func_timeout
  Downloading func_timeout-4.3.5.tar.gz (44 kB)
[K     |████████████████████████████████| 44 kB 1.6 MB/s 
[?25hBuilding wheels for collected packages: func-timeout
  Bui

In [2]:
from google.colab import drive 
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pandas as pd
from tqdm.notebook import tqdm
from tqdm import trange
import tensorflow as tf
import numpy as np
import json
import math
import onnxruntime as ort
import random
import math

In [4]:
np.random.seed(1)
random.seed(1)

In [5]:
btd6_prediction_model_path = '/content/drive/MyDrive/ScienceFair/prediction_model.onnx'
bloon_rounds_path = '/content/drive/MyDrive/ScienceFair/Data/BTD6_Bloon_Rounds.csv'
map_file_path = '/content/drive/MyDrive/ScienceFair/Data/map_points.json'
possible_placements_path = '/content/drive/MyDrive/ScienceFair/Data/possible_placements.json'

In [6]:
class Point:
  def __init__(self, x, y):
    self.x = x
    self.y = y
  def distance_squared(self, other):
      dx = self.x - other.x
      dy = self.y - other.y
      return dx*dx + dy*dy

  def interpolate(self, other, start_distance):
    """ Returns a list of points interpolated from two points
    and the magnitude of the next starting vector.
    """
    interpolated_points =[]
    vector = np.array([other.x - self.x, other.y-self.y])
    distance = np.linalg.norm(vector)
    if distance ==0:
      return [], start_distance
    vector_norm = vector/distance
    start_vector = np.zeros(2)+vector_norm*start_distance
    # Take steps with the normalized vector of magnitude 4      
    while np.linalg.norm([start_vector[0], start_vector[1]]) < np.linalg.norm([other.x-self.x, other.y-self.y]):
      vector_to_point = Point(start_vector[0]+self.x, start_vector[1]+self.y)
      interpolated_points.append(vector_to_point)
      start_vector+=vector_norm*4


    return interpolated_points, np.linalg.norm([start_vector[0], start_vector[1]]) - np.linalg.norm([other.x-self.x, other.y-self.y])


class TowerCost:
  """ Class that represents a tower's cost.
  Args:
    base_cost: int, the base cost of the tower
    top_cost: list of length 5, cost given by top path upgrades
    middle_cost: list of length 5, cost given by middle path upgrades
    bottom_cost: list of length 5, cost given by bottom path upgrades
  """
  def __init__(self, base_cost, top_cost,middle_cost,bottom_cost):
    assert len(top_cost) == 5 and len(middle_cost) == 5 and len(bottom_cost) == 5
    self.base_cost = base_cost
    self.top_cost = top_cost
    self.middle_cost = middle_cost
    self.bottom_cost = bottom_cost
  def find_tower_cost(self, upgrade_path):
    """ Finds tower cost based on its upgrades.
    Args:
      upgrade_path: 3 char string (EX: "001")
    Returns:
      An integer representing tower cost.
    """
    top_path, middle_path, bottom_path = int(upgrade_path[0]), int(upgrade_path[1]),int(upgrade_path[2])
    assert top_path < 6 and middle_path < 6 and bottom_path < 6 and len(upgrade_path) == 3
    return self.base_cost+sum(self.top_cost[:top_path])+sum(self.middle_cost[:middle_path])+sum(self.bottom_cost[:bottom_path])

class TowerRange:
  """ Class that represents a tower's ranges.
  Args:
    base_range: int, the base range of the tower
    top_range: list of length 6, range given by top path upgrades
    middle_range: list of length 6, range given by middle path upgrades
    bottom_range: list of length 6, range given by bottom path upgrades
  """
  def __init__(self, base_range, top_range,middle_range,bottom_range):
    assert len(top_range) == 6 and len(middle_range) == 6 and len(bottom_range) == 6
    self.base_range = base_range
    self.top_range = top_range
    self.middle_range = middle_range
    self.bottom_range = bottom_range

  def find_tower_coverage(self,upgrade_path, tower_position, map_points):
    """ Finds the # times a tower attacks given a simulation of 6 balloons
    Args:
      upgrade_path: 3 char string (EX: "001")
      tower_position: point, tower position
      map_points: list of points of a map path
    Returns:
      num of points
    """
    top_path, middle_path, bottom_path = int(upgrade_path[0]), int(upgrade_path[1]),int(upgrade_path[2])
    assert top_path < 6 and middle_path < 6 and bottom_path < 6 and len(upgrade_path) == 3

    tower_range = self.base_range+self.top_range[top_path]+self.middle_range[middle_path]+self.bottom_range[bottom_path]
    
    points_in_range = []
    for idx, point in enumerate(map_points):
      if point.distance_squared(tower_position) < (tower_range+5)*(tower_range+5):
        points_in_range.append(idx)

    num_attacks = 0
    attack_cooldown = 21
    for idx, point in enumerate(map_points):
      attack_cooldown+=1
      if attack_cooldown > 20:
        for i in range(6):
          if idx-i*70 > 0 and idx-i*70 in points_in_range:
            num_attacks+=1
            attack_cooldown = 0
            break

    map_length =  len(map_points)
    for idx in range(map_length-70*6):
      attack_cooldown+=1
      if attack_cooldown > 20:
        for i in range(6):
          if -200+idx+i*70 < 0 and map_length -200+idx + i*70 in points_in_range:
            num_attacks+=1
            attack_cooldown = 0
            break

    return num_attacks


def get_bloons_per_round():
  bloon_rounds = pd.read_csv(bloon_rounds_path)
  bloon_rounds_dict = bloon_rounds.to_dict()
  return [[bloon_rounds_dict[bloon][round]/50 for bloon in bloon_rounds.columns if bloon!='Round' and bloon!='RBE'] for round in range(100)]

  

In [7]:
INCOME_PER_ROUND=[650,650,650,650,650,650,813,995,1195,1394,1708,1897,2089,2371,2630,2896,3164,3329,3687,3947,4133,4484,
        4782,5059,5226,5561,5894,6556,6822,7211,7548,8085,8712,8917,9829,10979,11875,13214,14491,16250,
        16771,18952,19611,20889,22183,24605,25321,26958,29801,34559,37575,38673.5,40269,41193.5,43391,45874,
        47160,49019,51317,53476,54399,55631,57017,59843,60693,63764,64769,65792,66570,67961,70580,72083,
        73587,74979,78023,80691,82007,84547,89409,96118,97518,102884,107641,112390,119434,122060,123008,
        125635, 128949, 131120,131460,135651,140188,142135,149802,153520,163475,164893,174546,177374,178909]
TOWER_NAMES = ["WizardMonkey","Alchemist","NinjaMonkey","Druid","SuperMonkey"]
UPGRADE_PATHS = ["000","001","010","100","002","020","200","101","110","011","201","102","210","120","012","021","220","202","022","032","023","302","203","320","230","042","024","204","402","240","420","052","025","502","205","520","250","031","013","301","103","310","130","041","014","104","401","140","410","051","015","501","105","510","150","030","003","300","040","004","400","050","005","500"]
TOWERS= {'WizardMonkey': {'Range':TowerRange(40,[0,0,0,20,20,30],[0,0,0,10,10,20],[0,0,0,20,20,20]), 
                                 'Cost':TowerCost(450,[180,540,1560,12000,38400],[360,1140,3600,5400,64800],[360,360,1800,3360,31800])}, 
                'Alchemist':{'Range':TowerRange(45,[0,0,0,0,0,0],[0,0,0,0,0,0],[0,0,0,0,0,0]), 
                             'Cost':TowerCost(660,[300,420,1500,3600,72000],[300,570,3600,5400,54000],[780,540,1200,3300,48000])}, 
                'NinjaMonkey':{'Range':TowerRange(40,[0,7,7,7,7,18],[0,0,0,0,0,5],[0,0,0,0,0,0]),
                               'Cost':TowerCost(600,[360,420,1020,3300,42000],[420,600,1080,6240,26400],[300,480,2700,6000,48000])}, 
                'Druid':{'Range':TowerRange(35,[0,0,0,0,0,0],[0,0,0,0,10,10],[0,10,10,10,10,15]),
                         'Cost':TowerCost(480,[300,1200,1980,5400,78000],[300,420,1140,6000,42000],[120,360,720,3000,54000])}, 
                'SuperMonkey':{'Range':TowerRange(50, [0,0,0,0,15,15], [0,10,22,22,22,32], [0,0,0,3,3,3]),
                               'Cost':TowerCost(3000,[3000,3600,24000,120000,600000],[1200,1680,9600,22800,108000],[3600,1440,6720,72000,240000])}
                }

BLOONS_PER_ROUND =get_bloons_per_round()
with open(map_file_path, 'r') as file:
  MAP = json.loads(file.read())
MAP_POINTS =  [Point(point['x'],point['y']) for point in MAP]

prediction_model = ort.InferenceSession(btd6_prediction_model_path)

In [8]:
def get_map_points(map):
  points=[]
  starting_distance = 0
  for idx, point in enumerate(map):
    if idx != len(map)-1 and idx !=0:
      point1 = Point(point['x'], point['y'])
      point2 = Point(map[idx+1]['x'], map[idx+1]['y'])
      temp, starting_distance = point1.interpolate(point2, starting_distance)
      points.extend(temp)
  return points

MAP_POINTS = get_map_points(MAP)

In [9]:
with open(possible_placements_path, 'r') as file:
  POSSIBLE_PLACEMENTS = json.loads(file.read())

def find_my_placement_points(possible_placements):
  """Returns a list of 12 of the best placement points."""
  possible_coverage = []
  for possible_placement in possible_placements:
    possible_placement_point = Point(possible_placement['x'],possible_placement['y'])
    possible_coverage.append([possible_placement_point,TOWERS['Druid']['Range'].find_tower_coverage('000', possible_placement_point,MAP_POINTS)])
  
  possible_coverage.sort(key=lambda x:x[1], reverse = True)
  my_placement_points = []
  for coverage in possible_coverage:
    covered = False
    if len(my_placement_points) == 12:
      break
    for placement_point in my_placement_points:
      if placement_point.distance_squared(coverage[0]) < 121 or placement_point.distance_squared(coverage[0]) > 10000:
        covered = True
    if not covered:
      print(coverage[1])
      my_placement_points.append(coverage[0])
  return my_placement_points

PLACEMENT_POINTS = find_my_placement_points(POSSIBLE_PLACEMENTS)

30
30
30
30
30
30
30
30
30
29
29
28



# Genetic Algorithm

In [None]:
# Code adapted from geneticalgorithm:
# https://github.com/rmsolgi/geneticalgorithm/blob/master/geneticalgorithm/geneticalgorithm.py

import sys
import time
from func_timeout import func_timeout, FunctionTimedOut
import matplotlib.pyplot as plt


class tower_geneticalogirthm():
    
    '''  Genetic Algorithm (Elitist version) for Python
    
    An implementation of elitist genetic algorithm for solving problems with
    continuous, integers, or mixed variables.
    
    
    
    Implementation and output:
        
        methods:
                run(): implements the genetic algorithm
                
        outputs:
                output_dict:  a dictionary including the best set of variables
            found and the value of the given function associated to it.
            {'variable': , 'function': }
            
                report: a list including the record of the progress of the
                algorithm over iterations
    '''

    def __init__(self, function, dimension, variable_type='bool', \
                 variable_boundaries=None,\
                 variable_type_mixed=None, \
                 function_timeout=10,\
                 algorithm_parameters={'max_num_iteration': None,\
                                       'population_size':100,\
                                       'mutation_probability':0.1,\
                                       'elit_ratio': 0.01,\
                                       'crossover_probability': 0.5,\
                                       'parents_portion': 0.3,\
                                       'crossover_type':'uniform',\
                                       'max_iteration_without_improv':None},\
                  convergence_curve=True,\
                  progress_bar=True, \
                  round_num = 5,\
                  tower_encodings = None,\
                  goal = [],\
                  ):

        self.__name__=tower_geneticalogirthm
      
        assert (callable(function)),"function must be callable"     
        
        self.f=function
        
        self.dim=int(dimension)

        self.round_num=round_num
        self.tower_encodings = tower_encodings
        self.goal = goal

        
        assert(variable_type=='bool' or variable_type=='int' or\
               variable_type=='real'), \
               "\n variable_type must be 'bool', 'int', or 'real'"

        if variable_type_mixed is None:
            
            if variable_type=='real': 
                self.var_type=np.array([['real']]*self.dim)
            else:
                self.var_type=np.array([['int']]*self.dim)            

 
        else:
            assert (type(variable_type_mixed).__module__=='numpy'),\
            "\n variable_type must be numpy array"  
            assert (len(variable_type_mixed) == self.dim), \
            "\n variable_type must have a length equal dimension."       

            for i in variable_type_mixed:
                assert (i=='real' or i=='int'),\
                "\n variable_type_mixed is either 'int' or 'real' "+\
                "ex:['int','real','real']"+\
                "\n for 'boolean' use 'int' and specify boundary as [0,1]"
                

            self.var_type=variable_type_mixed
            
        if variable_type!='bool' or type(variable_type_mixed).__module__=='numpy':
                       
            assert (type(variable_boundaries).__module__=='numpy'),\
            "\n variable_boundaries must be numpy array"
        
            assert (len(variable_boundaries)==self.dim),\
            "\n variable_boundaries must have a length equal dimension"        
        
        
            for i in variable_boundaries:
                assert (len(i) == 2), \
                "\n boundary for each variable must be a tuple of length two." 
                assert(i[0]<=i[1]),\
                "\n lower_boundaries must be smaller than upper_boundaries [lower,upper]"
            self.var_bound=variable_boundaries
        else:
            self.var_bound=np.array([[0,1]]*self.dim)
 

        self.funtimeout=float(function_timeout)

        if convergence_curve==True:
            self.convergence_curve=True
        else:
            self.convergence_curve=False

        if progress_bar==True:
            self.progress_bar=True
        else:
            self.progress_bar=False
        
        self.param=algorithm_parameters
        
        self.pop_s=int(self.param['population_size'])
        
        assert (self.param['parents_portion']<=1\
                and self.param['parents_portion']>=0),\
        "parents_portion must be in range [0,1]" 
        
        self.par_s=int(self.param['parents_portion']*self.pop_s)
        trl=self.pop_s-self.par_s
        if trl % 2 != 0:
            self.par_s+=1
               
        self.prob_mut=self.param['mutation_probability']
        
        assert (self.prob_mut<=1 and self.prob_mut>=0), \
        "mutation_probability must be in range [0,1]"
        
        
        self.prob_cross=self.param['crossover_probability']
        assert (self.prob_cross<=1 and self.prob_cross>=0), \
        "mutation_probability must be in range [0,1]"
        
        assert (self.param['elit_ratio']<=1 and self.param['elit_ratio']>=0),\
        "elit_ratio must be in range [0,1]"                
        
        trl=self.pop_s*self.param['elit_ratio']
        if trl<1 and self.param['elit_ratio']>0:
            self.num_elit=1
        else:
            self.num_elit=int(trl)
            
        assert(self.par_s>=self.num_elit), \
        "\n number of parents must be greater than number of elits"
        
        if self.param['max_num_iteration']==None:
            self.iterate=0
            for i in range (0,self.dim):
                if self.var_type[i]=='int':
                    self.iterate+=(self.var_bound[i][1]-self.var_bound[i][0])*self.dim*(100/self.pop_s)
                else:
                    self.iterate+=(self.var_bound[i][1]-self.var_bound[i][0])*50*(100/self.pop_s)
            self.iterate=int(self.iterate)
            if (self.iterate*self.pop_s)>10000000:
                self.iterate=10000000/self.pop_s
        else:
            self.iterate=int(self.param['max_num_iteration'])
        
        self.c_type=self.param['crossover_type']
        assert (self.c_type=='uniform' or self.c_type=='one_point' or\
                self.c_type=='two_point'),\
        "\n crossover_type must 'uniform', 'one_point', or 'two_point' Enter string" 
        
        
        self.stop_mniwi=False
        if self.param['max_iteration_without_improv']==None:
            self.mniwi=self.iterate+1
        else: 
            self.mniwi=int(self.param['max_iteration_without_improv'])

      
    def run(self):
        
        self.integers=np.where(self.var_type=='int')
        self.reals=np.where(self.var_type=='real')
        
        
        
        pop=np.array([np.zeros(self.dim+1)]*self.pop_s)
        solo=np.zeros(self.dim+1)
        var=np.zeros(self.dim)       
        
        for p in range(0,self.pop_s):
         
            for i in self.integers[0]:
                var[i]=np.random.randint(self.var_bound[i][0],\
                        self.var_bound[i][1]+1)  
                solo[i]=var[i].copy()
            for i in self.reals[0]:
                var[i]=self.var_bound[i][0]+np.random.random()*\
                (self.var_bound[i][1]-self.var_bound[i][0])    
                solo[i]=var[i].copy()


            obj=self.sim(var)            
            solo[self.dim]=obj
            pop[p]=solo.copy()

        # Report
        self.report=[]
        self.test_obj=obj
        self.best_variable=var.copy()
        self.best_function=obj
                        
        t=1
        counter=0
        while t<=self.iterate:
            
            if self.progress_bar==True:
                self.progress(t,self.iterate,status="GA is running...")

            pop = pop[pop[:,self.dim].argsort()]

                
            
            if pop[0,self.dim]<self.best_function:
                counter=0
                self.best_function=pop[0,self.dim].copy()
                self.best_variable=pop[0,: self.dim].copy()
            else:
                counter+=1

            self.report.append(pop[0,self.dim])
    
            
            normobj=np.zeros(self.pop_s)
            
            minobj=pop[0,self.dim]
            if minobj<0:
                normobj=pop[:,self.dim]+abs(minobj)
                
            else:
                normobj=pop[:,self.dim].copy()
    
            maxnorm=np.amax(normobj)
            normobj=maxnorm-normobj+1

            sum_normobj=np.sum(normobj)
            prob=np.zeros(self.pop_s)
            prob=normobj/sum_normobj
            cumprob=np.cumsum(prob)

            par=np.array([np.zeros(self.dim+1)]*self.par_s)
            
            for k in range(0,self.num_elit):
                par[k]=pop[k].copy()
            for k in range(self.num_elit,self.par_s):
                index=np.searchsorted(cumprob,np.random.random())
                par[k]=pop[index].copy()
                
            ef_par_list=np.array([False]*self.par_s)
            par_count=0
            while par_count==0:
                for k in range(0,self.par_s):
                    if np.random.random()<=self.prob_cross:
                        ef_par_list[k]=True
                        par_count+=1
                 
            ef_par=par[ef_par_list].copy()

            pop=np.array([np.zeros(self.dim+1)]*self.pop_s)
            
            for k in range(0,self.par_s):
                pop[k]=par[k].copy()
                
            for k in range(self.par_s, self.pop_s, 2):
                r1=np.random.randint(0,par_count)
                r2=np.random.randint(0,par_count)
                pvar1=ef_par[r1,: self.dim].copy()
                pvar2=ef_par[r2,: self.dim].copy()
                
                ch=self.cross(pvar1,pvar2,self.c_type)
                ch1=ch[0].copy()
                ch2=ch[1].copy()
                
                ch1=self.mut(ch1)
                ch2=self.mutmidle(ch2,pvar1,pvar2)               
                solo[: self.dim]=ch1.copy()                
                obj=self.sim(ch1)
                solo[self.dim]=obj
                pop[k]=solo.copy()                
                solo[: self.dim]=ch2.copy()                
                obj=self.sim(ch2)               
                solo[self.dim]=obj
                pop[k+1]=solo.copy()

            t+=1
            if counter > self.mniwi:
                pop = pop[pop[:,self.dim].argsort()]
                if pop[0,self.dim]>=self.best_function:
                    t=self.iterate
                    if self.progress_bar==True:
                        self.progress(t,self.iterate,status="GA is running...")
                    time.sleep(2)
                    t+=1
                    self.stop_mniwi=True

        pop = pop[pop[:,self.dim].argsort()]
        
        if pop[0,self.dim]<self.best_function:
                
            self.best_function=pop[0,self.dim].copy()
            self.best_variable=pop[0,: self.dim].copy()

        self.report.append(pop[0,self.dim])
        
        self.output_dict={'variable': self.best_variable, 'function':\
                          self.best_function}
        if self.progress_bar==True:
            show=' '*100
            sys.stdout.write('\r%s' % (show))
        #sys.stdout.write('\r The best solution found:\n %s' % (self.best_variable))
        #sys.stdout.write('\n\n Objective function:\n %s\n' % (self.best_function))
        sys.stdout.flush() 
        re=np.array(self.report)
        if self.convergence_curve==True:
            plt.plot(re)
            plt.xlabel('Iteration')
            plt.ylabel('Objective function')
            plt.title('Genetic Algorithm')
            plt.show()
        
        if self.stop_mniwi==True:
            sys.stdout.write('\nWarning: GA is terminated due to the'+\
                             ' maximum number of iterations without improvement was met!')
      
    def cross(self,x,y,c_type):
         
        ofs1=x.copy()
        ofs2=y.copy()
        

        if c_type=='one_point':
            ran=np.random.randint(0,self.dim)
            for i in range(0,ran):
                ofs1[i]=y[i].copy()
                ofs2[i]=x[i].copy()
  
        if c_type=='two_point':
                
            ran1=np.random.randint(0,self.dim)
            ran2=np.random.randint(ran1,self.dim)
                
            for i in range(ran1,ran2):
                ofs1[i]=y[i].copy()
                ofs2[i]=x[i].copy()
            
        if c_type=='uniform':
                
            for i in range(0, self.dim):
                ran=np.random.random()
                if ran <0.5:
                    ofs1[i]=y[i].copy()
                    ofs2[i]=x[i].copy() 
                   
        return np.array([ofs1,ofs2])
    
    def mut(self,x):
        
        for i in self.integers[0]:
            ran=np.random.random()
            if ran < self.prob_mut:
                
                x[i]=np.random.randint(self.var_bound[i][0],\
                 self.var_bound[i][1]+1) 
                    
        

        for i in self.reals[0]:                
            ran=np.random.random()
            if ran < self.prob_mut:   

               x[i]=self.var_bound[i][0]+np.random.random()*\
                (self.var_bound[i][1]-self.var_bound[i][0])    
            
        return x

    def mutmidle(self, x, p1, p2):
        for i in self.integers[0]:
            ran=np.random.random()
            if ran < self.prob_mut:
                if p1[i]<p2[i]:
                    x[i]=np.random.randint(p1[i],p2[i])
                elif p1[i]>p2[i]:
                    x[i]=np.random.randint(p2[i],p1[i])
                else:
                    x[i]=np.random.randint(self.var_bound[i][0],\
                 self.var_bound[i][1]+1)
                        
        for i in self.reals[0]:                
            ran=np.random.random()
            if ran < self.prob_mut:   
                if p1[i]<p2[i]:
                    x[i]=p1[i]+np.random.random()*(p2[i]-p1[i])  
                elif p1[i]>p2[i]:
                    x[i]=p2[i]+np.random.random()*(p1[i]-p2[i])
                else:
                    x[i]=self.var_bound[i][0]+np.random.random()*\
                (self.var_bound[i][1]-self.var_bound[i][0]) 
        return x

    def evaluate(self):
        return self.f(self.temp, self.round_num, self.tower_encodings, self.goal)

    def sim(self,X):
        self.temp=X.copy()
        obj=None
        try:
            obj=func_timeout(self.funtimeout,self.evaluate)
        except FunctionTimedOut:
            print("given function is not applicable")
        assert (obj!=None), "After "+str(self.funtimeout)+" seconds delay "+\
                "func_timeout: the given function does not provide any output"
        return obj

    def progress(self, count, total, status=''):
        bar_len = 50
        filled_len = int(round(bar_len * count / float(total)))

        percents = round(100.0 * count / float(total), 1)
        bar = '|' * filled_len + '_' * (bar_len - filled_len)

        sys.stdout.write('\r%s %s%s %s' % (bar, percents, '%', status))
        sys.stdout.flush()     

In [10]:
class Tower:
  """ Class that represents a specific tower.
  Args:
    name: string
    cost: integer
    upgrade_path: list of 3 ints
    encoding: list of 12 list of 81 bools, represents a tower's encoding in the 12 possible placements
  """
  def __init__(self, name, cost, upgrade_path, encoding):
    assert len(upgrade_path) == 3 and len(encoding) == 80
    self.name=name
    self.cost = cost
    self.upgrade_path = upgrade_path
    self.encoding = []

    for placement_point in PLACEMENT_POINTS:
      upgrade_path = ''.join([str(upgrade) for upgrade in self.upgrade_path])
      self.encoding.append(encoding + [TOWERS[self.name]['Range'].find_tower_coverage(upgrade_path, placement_point, MAP_POINTS)/100])

    if str(5) in upgrade_path:
      self.tier_5 = name+str(upgrade_path.index('5'))
    else:
      self.tier_5 = False

  def can_upgrade_to(self, other):
    """ Sees if other tower can upgrade to self."""
    if self.upgrade_path[0] >= other.upgrade_path[0] and  self.upgrade_path[1] >= other.upgrade_path[1] and  self.upgrade_path[2] >= other.upgrade_path[2]:
      return True
    else:
      return False


In [11]:
def create_tower_encoding():
  """ Returns a list that contains all
  combinations of the towers.
  """
  tower_list=[]
  for tower_idx, tower_name in enumerate(TOWER_NAMES):
    if tower_name != "Alchemist":
      for upgrade in UPGRADE_PATHS:
        upgrade_multi_hot=[] 
        for upgrade_path in upgrade:                                                                 
          upgrade_multi_hot+=[1]*int(upgrade_path)+[0]*(5-int(upgrade_path))

        tower_cost = TOWERS[tower_name]['Cost'].find_tower_cost(upgrade)
        tower_upgrade_path = [int(upgrade[0]),int(upgrade[1]),int(upgrade[2])]
        tower_encoding = [0]*(tower_idx*16)+ [1] + upgrade_multi_hot + [0]*((4-tower_idx)*16)
        tower_list.append(Tower(tower_name, tower_cost, tower_upgrade_path,tower_encoding))
  return tower_list

TOWER_ENCODING = create_tower_encoding()

In [12]:
def create_tower_encodings_restricted(money, previous_towers):
  """ Returns a list of 12 tower_lists, each list
  containing the combinations of the towers
  given the restrictions (money, previous towers).
  """
  tower_encodings_list = [-1]*12
  previous_tower_cost = 0
  for idx, previous_tower in enumerate(previous_towers):
    if hasattr(previous_tower, 'name'):
      previous_tower_cost+=previous_tower.cost
      tower_list = []
      for tower in TOWER_ENCODING:
        if previous_tower.name == tower.name and tower.cost <= money and tower.can_upgrade_to(previous_tower):
          tower_list.append(tower)
      tower_encodings_list[idx] = tower_list
  
  for idx, previous_tower in enumerate(previous_towers):
    if not hasattr(previous_tower, 'name'):
      tower_list = []
      for tower in TOWER_ENCODING:
        if tower.cost <= money-previous_tower_cost:
          tower_list.append(tower)
      tower_encodings_list[idx] = tower_list
    
  return tower_encodings_list

In [None]:
def find_tower_similiarity(tower_comp, tower_encodings, goal):
  bonus = 0
  found_towers = []
  for tower_idx, tower in enumerate(tower_comp):
    if tower >= 0:
      tower_encoding = tower_encodings[tower_idx][int(tower)]
      for goal_idx, goal_tower in enumerate(goal):
        if hasattr(goal_tower, 'name'):
          if goal_tower.name == tower_encoding.name and goal_tower.encoding[goal_idx] not in found_towers:
            temp = np.add(np.array(goal_tower.encoding[goal_idx]),np.array(tower_encoding.encoding[tower_idx]))
            bonus += 10*((np.count_nonzero(temp == 2)+np.count_nonzero(temp==0))-80)
            found_towers.append(goal_tower.encoding[goal_idx])
          else:
            bonus-=20
  return bonus

In [None]:
def predict(otherInputs, towerInputs):
  result = prediction_model.run(None, {'Other': np.reshape(otherInputs,(1,46)), 'Towers':np.reshape(towerInputs,(1,12,81))})
  return result[0]

In [None]:
def fitness_function(tower_comp, round_num, tower_encodings, goal):
  win_odds = 0
  cost = 0
  tower_count = 0
  tower_list = []
  tower_tier_five_list=[]


  for tower_idx, tower in enumerate(tower_comp):
    if tower < 0:
      tower_list+=[0]*81
    else:
      tower_encoding = tower_encodings[tower_idx][int(tower)]
      if tower_encoding.tier_5:
        tower_tier_five_list.append(tower_encoding.tier_5)
      tower_count+=1
      cost+= tower_encoding.cost
      tower_list+=tower_encoding.encoding[tower_idx]
      

  if cost > INCOME_PER_ROUND[round_num]:
    return cost-INCOME_PER_ROUND[round_num]+1000+tower_count/12*8
    
  if len(tower_tier_five_list) != len(set(tower_tier_five_list)):
    return 1000+0.4+3.5

  win_odds = predict(np.array(BLOONS_PER_ROUND[round_num]).astype(np.float32),np.array(tower_list).astype(np.float32))
  if win_odds > .7:
      loss = -800 + tower_count/12*400 - len(tower_tier_five_list)*3000
      loss-= find_tower_similiarity(tower_comp, tower_encodings, goal)
      loss-=(INCOME_PER_ROUND[round_num]-cost)/30
  else:
    loss = -win_odds*3

  return loss


def generate_tower_combination(current_round, previous_towers, algorithm_param, goal):
  tower_encodings_restricted = create_tower_encodings_restricted(INCOME_PER_ROUND[current_round], previous_towers)
  varbound = [-1]*12
  for idx, previous_tower in enumerate(previous_towers):
    if hasattr(previous_tower, 'name'):
      varbound[idx] = [0,len(tower_encodings_restricted[idx])-1]
    else:
      varbound[idx] = [-300,len(tower_encodings_restricted[idx])-1]
  model=tower_geneticalogirthm(function=fitness_function,dimension=12, 
           variable_type='int',variable_boundaries=np.array(varbound), 
           algorithm_parameters = algorithm_param, 
           convergence_curve = False, 
           progress_bar = False,
           round_num = current_round,
           tower_encodings = tower_encodings_restricted,
           goal = goal)
  
  model.run()
  cost = 0
  tower_encoding_list = []
  towers_list = [0]*12
  tower_names = []
  for tower_idx, tower in enumerate(model.best_variable):
    if tower >= 0:
      tower_encoding = tower_encodings_restricted[tower_idx][int(tower)]
      towers_list[tower_idx] = tower_encoding
      tower_encoding_list.append(tower_encoding.encoding[tower_idx])
      tower_names.append(f"{tower_encoding.name}-{tower_encoding.upgrade_path}- Coverage: {tower_encoding.encoding[tower_idx][-1]} X: {POSSIBLE_PLACEMENTS[tower_idx]['x']:.2f} Y:{POSSIBLE_PLACEMENTS[tower_idx]['y']:.2f}")
      cost+=tower_encoding.cost
    else:
      tower_encoding_list.append([0]*81)
      towers_list[tower_idx] = -1


  win_odds = predict(np.array( BLOONS_PER_ROUND[current_round]).astype(np.float32),np.array(tower_encoding_list).astype(np.float32))


  if win_odds > 0.7:
    current_round_temp = current_round
    print(f'Round: {current_round_temp} Win Odds: {win_odds[0][0]*100:.2f}%')
    print("Towers: " +','.join(tower_names))
    print(f'Cost: ${cost}\n')
    while win_odds > 0.7:
      current_round_temp+=1 # See how long this tower combination can last
      win_odds = predict(np.array(BLOONS_PER_ROUND[current_round_temp]).astype(np.float32),np.array(tower_encoding_list).astype(np.float32))
      if win_odds>0.7:
        print(f'Round: {current_round_temp} Win Odds: {win_odds[0][0]*100:.2f}%')
        print("Towers: " +','.join(tower_names))
        print(f'Cost: ${cost}\n')
    return towers_list, current_round_temp-1
  else:
    print(f"Win Odds: {win_odds} Retrying...\n")
    return [], 0

In [None]:
TOWER_ORDER = ["WizardMonkey","SuperMonkey","NinjaMonkey","Alchemist","Druid"]

In [None]:
algorithm_param = {'max_num_iteration': 100,\
                  'population_size':100,\
                  'mutation_probability':0.1,\
                  'elit_ratio': 0,\
                  'crossover_probability': 0.5,\
                  'parents_portion': 0.3,\
                  'crossover_type':'uniform',\
                  'max_iteration_without_improv':None}
goal, round_it_lasts, json_names = generate_tower_combination(62, [-1]*12, algorithm_param, goal = [])

Round: 62 Win Odds: 77.01%
Towers: NinjaMonkey-[3, 0, 2]- Coverage: 1.01 X: 13.44 Y:-96.85,NinjaMonkey-[0, 0, 3]- Coverage: 0.97 X: -54.66 Y:-4.52,Druid-[2, 1, 0]- Coverage: 0.91 X: 93.60 Y:-4.60,WizardMonkey-[0, 2, 5]- Coverage: 1.03 X: -87.28 Y:105.88
Cost: $49170



ValueError: ignored

In [None]:
i = 5
max_num_iteration = 100
population_size = 50
previous_towers = [-1]*12

while i < 100:
  algorithm_param = {'max_num_iteration': max_num_iteration,\
                   'population_size':population_size,\
                   'mutation_probability':0.2,\
                   'elit_ratio': 0.01,\
                   'crossover_probability': 0.5,\
                   'parents_portion': 0.3,\
                   'crossover_type':'uniform',\
                   'max_iteration_without_improv':None}
  
  tower_combo, round_it_lasts = generate_tower_combination(i , previous_towers, algorithm_param, goal = goal)
  
  if len(tower_combo) == 0:
    max_num_iteration += 200
    population_size+=200
  else:
    max_num_iteration = 100 +i*4
    population_size= 50 +i*2

    previous_towers = tower_combo
    i=round_it_lasts+1

Round: 5 Win Odds: 75.01%
Towers: NinjaMonkey-[0, 0, 0]- Coverage: 1.79 X: -136.44 Y:-84.81
Cost: $600

Win Odds: [[0.6681594]] Retrying...

Win Odds: [[0.6681594]] Retrying...

Win Odds: [[0.6681594]] Retrying...

Win Odds: [[0.6681594]] Retrying...

Win Odds: [[0.6681594]] Retrying...

Win Odds: [[0.6681594]] Retrying...



KeyboardInterrupt: ignored

In [None]:
print(f"{POSSIBLE_PLACEMENTS[1]['x']:.2f}")

-74.18


# Actor and Critic



In [22]:
import tensorflow as tf
from keras.models import Model
from tensorflow import keras
from keras import layers
from keras import activations
import collections
import statistics
from tqdm import tnrange
tf.random.set_seed(1)

In [759]:
class BTD6Environment:
  def __init__(self, tower_encodings, round_num):
    self.towers = tower_encodings
    self.round = round_num
    self.state = []
    self.action_counter = 0

  def step(self, action):
    if action == 0:
      self.round+=1
      input_state = self.get_multi_hot_state()
      if self.round > 98:
        return input_state, self.get_mask(), 0, True
    
      padded_state = self.get_padded_state()
      prediction = self.predict(np.array(BLOONS_PER_ROUND[self.round-1]).astype(np.float32), padded_state)
      self.action_counter = 0
      if prediction > 0.7:
        comp_cost = self.get_comp_cost()
        reward = (INCOME_PER_ROUND[self.round-1]/comp_cost*3-self.get_num_tower()/4+self.count_tier_5()*20)
        if self.round<40:
          reward+=self.predict(np.array(BLOONS_PER_ROUND[39]).astype(np.float32), padded_state)
          reward+=self.predict(np.array(BLOONS_PER_ROUND[62]).astype(np.float32), padded_state)/2
          reward+=self.predict(np.array(BLOONS_PER_ROUND[97]).astype(np.float32), padded_state)/4
        elif self.round<62:
          reward+=self.predict(np.array(BLOONS_PER_ROUND[62]).astype(np.float32), padded_state)
          reward+=self.predict(np.array(BLOONS_PER_ROUND[97]).astype(np.float32), padded_state)/2
        elif self.round<97:
          reward+=self.predict(np.array(BLOONS_PER_ROUND[97]).astype(np.float32), padded_state)

        #if self.round == 24:
          #reward+=1
        #if self.round == 40:
          #reward+=1
       # if self.round== 63:
          #reward+=1
        #if self.round == 97:
          #reward+=1
      else:
        reward = -3 + prediction
        return input_state, self.get_mask(), reward, False

      return input_state, self.get_mask(), reward/3, False
    else:
      self.state[self.action_counter] = self.towers[action-1]
      self.action_counter +=1

      return self.get_multi_hot_state(), self.get_mask(), 0, False

  def predict(self, otherInputs, towerInputs):
    result = prediction_model.run(None, {'Other': np.reshape(otherInputs,(1,46)), 'Towers':np.reshape(towerInputs,(1,12,81))})
    return result[0][0][0]

  def reset(self):
    self.action_counter = 0
    self.round = 5
    self.state = [-1]*12
    return self.get_multi_hot_state(), self.get_mask()

  def get_padded_state(self):
    padded_inputs = []
    for tower in self.state:
      if tower != -1:
        padded_inputs.extend(tower.encoding[0])
      else:
        padded_inputs.extend([0]*81)
    return np.array(padded_inputs).astype(np.float32)

  def get_multi_hot_state(self):
    #prediction = self.predict(np.array(BLOONS_PER_ROUND[self.round]).astype(np.float32), self.get_padded_state())
    upgrading = np.zeros((len(TOWER_ENCODING)))
    if self.action_counter < len(self.state):
      previous_tower = self.state[self.action_counter]
      if previous_tower !=-1:
        upgrading[TOWER_ENCODING.index(previous_tower)] = 1
 
    temp = np.concatenate([np.array([0]*(len(TOWER_ENCODING)+1)+BLOONS_PER_ROUND[self.round]), upgrading])
    for tower in self.state:
      if tower != -1:
        temp[TOWER_ENCODING.index(tower)] +=1
    return temp

  def get_mask(self):
    tier_5s = []
    cash = INCOME_PER_ROUND[self.round]
    for tower in self.state:
      if tower != -1:
        if tower.tier_5:
          tier_5s.append(tower.tier_5)
        cash-=tower.cost
    mask = np.empty(len(TOWER_ENCODING)+1)
    if self.action_counter < len(self.state):
      previous_tower = self.state[self.action_counter]
      if previous_tower !=-1:
        mask[0] = 0
        #print(f'{previous_tower.name}-{previous_tower.upgrade_path}')
        for i,tower in enumerate(TOWER_ENCODING):
          if tower.name != previous_tower.name or not tower.can_upgrade_to(previous_tower) or tower.cost - previous_tower.cost > cash or tower.tier_5 in tier_5s:
            mask[i+1] = 0
          else:
            #print(f'{tower.name}-{tower.upgrade_path}')
            mask[i+1]=1
        return mask

    mask[0] = 0
    prediction = self.predict(np.array(BLOONS_PER_ROUND[self.round]).astype(np.float32), self.get_padded_state())
    if prediction > 0.7:
      mask = np.zeros(len(TOWER_ENCODING)+1)
      mask[0] = 1
      return mask

    for i,tower in enumerate(TOWER_ENCODING):
      if self.action_counter > 11 or tower.cost > cash or tower.tier_5 in tier_5s:
        mask[i+1] = 0
      else:
        mask[i+1]=1
    if 1 not in mask:
      mask[0] = 1
    return mask

  def get_comp_cost(self):
    cash = 0
    for tower in self.state:
      if tower != -1:
        cash+=tower.cost
    return cash

  def count_tier_5(self):
    num_tier5= 0
    for tower in self.state:
      if tower != -1:
        if tower.tier_5:
          num_tier5+=1
    return num_tier5

  def get_num_tower(self):
    num_tower = 0
    for tower in self.state:
      if tower != -1:
        num_tower+=1
    return num_tower

In [760]:
# Make environment 
env = BTD6Environment(TOWER_ENCODING, 5)

In [761]:
lol, mask = env.reset()

In [762]:
eps = np.finfo(np.float32).eps.item()

In [763]:
class ActorCritic(tf.keras.Model):
  def __init__(
      self, 
      num_actions: int, 
      num_hidden_units: int):
    super().__init__()

    self.common = layers.Dense(num_hidden_units, activation="relu")
    self.actor = layers.Dense(num_actions)
    self.dense1 = layers.Dense(num_hidden_units)
    self.dense2 = layers.Dense(num_hidden_units)
    self.critic = layers.Dense(1)

  def call(self, inputs: tf.Tensor, mask):
    x = self.common(inputs)
    x = self.dense1(self.dense2(x))
    action_values = layers.Softmax()(self.actor(x),mask)
    return action_values, self.critic(x)

In [771]:
num_actions = len(env.towers)+1
num_hidden_units = 512
model = ActorCritic(num_actions, num_hidden_units)

In [772]:
def env_step(action: np.ndarray):
  state, mask, reward, done = env.step(action)
  return (state.astype(np.float32), 
          np.array(mask, bool),
          np.array(reward, np.float32), 
          np.array(done, np.int32))

def tf_env_step(action: tf.Tensor):
  return tf.numpy_function(env_step, [action], 
                           [tf.float32, tf.bool, tf.float32, tf.int32])

In [773]:
def run_episode(
    initial_state: tf.Tensor,
    mask: tf.Tensor,  
    model: tf.keras.Model, 
    max_steps: int):
  action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  rewards = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)

  initial_state_shape = initial_state.shape
  mask_shape = mask.shape

  state = initial_state

  for t in tf.range(max_steps):
    state = tf.expand_dims(state, 0)
    action_values, value = model(state, mask)
    #action_values+=tf.random.uniform(action_values.shape, minval=-0.1, maxval=0.1)
    #action_values = tf.multiply(action_values, tf.cast(mask,tf.float32))
    action = tf.random.categorical(tf.math.log(action_values), 1)[0,0]

    action_probs = action_probs.write(t, action_values[0,action])
    values = values.write(t, tf.squeeze(value))

    state, mask, reward, done = tf_env_step(action)
    state.set_shape(initial_state_shape)
    mask.set_shape(mask_shape)

    rewards = rewards.write(t, reward)
    if tf.cast(done, tf.bool):
      break

  action_probs = action_probs.stack()
  rewards = rewards.stack()
  values = values.stack()

  #print(rewards)
  return action_probs, values, rewards

In [774]:
def get_expected_return(
    rewards: tf.Tensor, 
    gamma: float, 
    standardize: bool = False) -> tf.Tensor:
  """Compute expected returns per timestep."""

  n = tf.shape(rewards)[0]
  returns = tf.TensorArray(dtype=tf.float32, size=n)

  # Start from the end of `rewards` and accumulate reward sums
  # into the `returns` array
  rewards = tf.cast(rewards[::-1], dtype=tf.float32)
  discounted_sum = tf.constant(0.0)
  discounted_sum_shape = discounted_sum.shape
  for i in tf.range(n):
    reward = rewards[i]
    discounted_sum = reward + gamma * discounted_sum
    discounted_sum.set_shape(discounted_sum_shape)
    returns = returns.write(i, discounted_sum)
  returns = returns.stack()[::-1]

  if standardize:
    returns = ((returns - tf.math.reduce_mean(returns)) / 
               (tf.math.reduce_std(returns) + eps))

  return returns

In [775]:
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

def compute_loss(
    action_probs: tf.Tensor,  
    values: tf.Tensor,  
    returns: tf.Tensor) -> tf.Tensor:
  """Computes the combined Actor-Critic loss."""

  advantage = returns - values

  action_log_probs = tf.math.log(action_probs)
  actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)

  critic_loss = huber_loss(values, returns)

  return actor_loss + critic_loss

In [776]:
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

@tf.function
def train_step(
    initial_state: tf.Tensor, 
    mask: tf.Tensor,
    model: tf.keras.Model, 
    optimizer: tf.keras.optimizers.Optimizer, 
    gamma: float, 
    max_steps_per_episode: int) -> tf.Tensor:
  """Runs a model training step."""

  with tf.GradientTape() as tape:

    # Run the model for one episode to collect training data
    action_probs, values, rewards = run_episode(
        initial_state, mask, model, max_steps_per_episode) 

    # Calculate the expected returns
    returns = get_expected_return(rewards, gamma)

    # Convert training data to appropriate TF tensor shapes
    action_probs, values, returns = [
        tf.expand_dims(x, 1) for x in [action_probs, values, returns]] 

    # Calculate the loss values to update our network
    loss = compute_loss(action_probs, values, returns)

  # Compute the gradients from the loss
  grads = tape.gradient(loss, model.trainable_variables)

  # Apply the gradients to the model's parameters
  optimizer.apply_gradients(zip(grads, model.trainable_variables))

  episode_reward = tf.math.reduce_sum(rewards)
  return episode_reward

In [None]:
%%time

min_episodes_criterion = 100
max_episodes = 1000000
max_steps_per_episode = 100000


reward_threshold = 10000000
running_reward = 0
gamma = 0.999
episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)

t = trange(max_episodes)
for i in t:
    initial_state, mask  = env.reset()
    initial_state = tf.constant(initial_state, dtype=tf.float32)
    mask = tf.constant(mask, dtype=tf.bool)
    episode_reward = float(train_step(
        initial_state,mask, model, optimizer, gamma, max_steps_per_episode))

    episodes_reward.append(episode_reward)
    running_reward = statistics.mean(episodes_reward)

    t.set_postfix(
        episode_reward=episode_reward, running_reward=running_reward)

    if running_reward > reward_threshold and i >= min_episodes_criterion:  
        break
    if i % 10000 == 999:
      model.save('/content/drive/MyDrive/BTD6AI2')

print(f'\nSolved at episode {i}: average reward: {running_reward:.8f}!')

  0%|          | 3475/1000000 [2:00:02<572:36:03,  2.07s/it, episode_reward=1.45e+3, running_reward=1.45e+3]

In [758]:
state, mask = env.reset()
round_num = 5
for t in tf.range(50000):
  state = tf.expand_dims(state, 0)
  action_values, values = model(state, mask)
  #print(env.round)
  #action_values+=tf.random.uniform(action_values.shape, minval=-0.05, maxval=0.05)
  #print(values)
  action_values = tf.multiply(action_values, tf.cast(mask,tf.float32))
  #action = tf.math.argmax(action_values[0])
  action = tf.random.categorical(tf.math.log(action_values), 1)[0,0]
  #print(action_values)
  print(action_values[0][action])
  state, mask, reward, done = tf_env_step(action)
  if reward != 0:
    print(float(reward))
    round_num+=1
    print(f'Round:{round_num}')
  if tf.cast(done, tf.bool):
    break
  
  tower = action-1
  if tower > 0:
    print(f"Tower:{TOWER_ENCODING[tower].name}-{TOWER_ENCODING[tower].upgrade_path}")

tf.Tensor(0.99906784, shape=(), dtype=float32)
Tower:Druid-[0, 0, 0]
tf.Tensor(1.0, shape=(), dtype=float32)
1.2709174156188965
Round:6
tf.Tensor(0.9999995, shape=(), dtype=float32)
Tower:Druid-[0, 0, 0]
tf.Tensor(1.0, shape=(), dtype=float32)
1.610500693321228
Round:7
tf.Tensor(0.99999857, shape=(), dtype=float32)
Tower:Druid-[0, 0, 0]
tf.Tensor(1.0, shape=(), dtype=float32)
1.9896674156188965
Round:8
tf.Tensor(0.9996631, shape=(), dtype=float32)
Tower:Druid-[0, 0, 0]
tf.Tensor(0.99970573, shape=(), dtype=float32)
Tower:Druid-[0, 0, 0]
tf.Tensor(1.0, shape=(), dtype=float32)
1.0782701969146729
Round:9
tf.Tensor(1.0, shape=(), dtype=float32)
Tower:Druid-[0, 0, 0]
tf.Tensor(1.0, shape=(), dtype=float32)
Tower:Druid-[0, 0, 0]
tf.Tensor(1.0, shape=(), dtype=float32)
1.2855619192123413
Round:10
tf.Tensor(0.99999976, shape=(), dtype=float32)
Tower:Druid-[0, 0, 0]
tf.Tensor(0.99999976, shape=(), dtype=float32)
Tower:Druid-[0, 0, 0]
tf.Tensor(1.0, shape=(), dtype=float32)
1.6126452684402466
R

In [None]:
test=  []
for i in range(8):
  test.extend(TOWER_ENCODING[129].encoding[0])
test.extend([0]*(81*(12-8)))

def predict(otherInputs, towerInputs):
    result = prediction_model.run(None, {'Other': np.reshape(otherInputs,(1,46)), 'Towers':np.reshape(towerInputs,(1,12,81))})
    return result[0][0][0]
predict(np.array(BLOONS_PER_ROUND[23]).astype(np.float32), np.array(test).astype(np.float32))


0.45778066

In [None]:
TOWER_ENCODING[129].upgrade_path

[0, 0, 1]

In [None]:
tf.math.argmax(action_values[0])

<tf.Tensor: shape=(), dtype=int64, numpy=95>

In [None]:
TOWER_ENCODING[94].cost

6720

In [None]:
INCOME_PER_ROUND[39]

16250

In [None]:
def fitness_function(tower_comp, round_num, tower_encodings):
  comp = []
  round_num = round_num[0][0].numpy()
  for tower in generated_comp:
    comp.append(tf.math.argmax(tower))
  
  win_odds = 0
  cost = 0
  tower_count = 0
  tower_list = []
  tower_tier_five_list=[]


  for tower_idx, tower in enumerate(comp):
    if tower < 0:
      tower_list+=[0]*81
    else:
      tower_encoding = tower_encodings[int(tower)]
      if tower_encoding.tier_5:
        tower_tier_five_list.append(tower_encoding.tier_5)
      tower_count+=1
      cost+= tower_encoding.cost
      tower_list+=tower_encoding.encoding[tower_idx]
      

  if cost > INCOME_PER_ROUND[round_num]:
    loss = (cost/INCOME_PER_ROUND[round_num]) + 1
    return tf.constant([loss], dtype=tf.float32)
    
  if len(tower_tier_five_list) != len(set(tower_tier_five_list)):
    return tf.constant([1.], dtype=tf.float32)
  win_odds = predict(np.array(BLOONS_PER_ROUND[round_num]).astype(np.float32),np.array(tower_list).astype(np.float32))
  if win_odds > .7:
      loss = -tower_count/12 - len(tower_tier_five_list)
      loss-=cost/INCOME_PER_ROUND[round_num]*1.5
  else:
    loss = 1-win_odds
    
  return tf.constant([loss], dtype=tf.float32)

In [None]:
tower_encoding_np = np.array([tower.encoding for tower in TOWER_ENCODING])
population_size = 300000
current_round = 97
previous_towers = [-1]*12

In [None]:
tower_encodings_restricted = create_tower_encodings_restricted(INCOME_PER_ROUND[current_round], previous_towers)

In [None]:
varbound = [-1]*12
for idx, previous_tower in enumerate(previous_towers):
  if hasattr(previous_tower, 'name'):
    varbound[idx] = [0,len(tower_encodings_restricted[idx])-1]
  else:
    varbound[idx] = [-300,len(tower_encodings_restricted[idx])-1]

In [None]:
range1 = np.array([i[0] for i in varbound])
range2 = np.array([i[1] for i in varbound])
select = np.random.randint(range1,range2-1, size = (population_size,12))
select

array([[  16,  -28, -202, ..., -170, -252,  188],
       [-169, -112,  196, ...,  -68, -191,  179],
       [-117,   82, -221, ...,  194,   42, -170],
       ...,
       [  40,  189,  -20, ..., -281, -201,  179],
       [-154,  190,  224, ...,  113,  196,   58],
       [  84,  -55,  147, ...,  -57,  142,   -6]])

In [None]:
optimal_comp = None
for comp in tqdm(select):
  result = fitness_function(comp, current_round, tower_encodings_restricted, [])
  if test > result:
    test = result
    optimal_comp = comp

  0%|          | 0/300000 [00:00<?, ?it/s]

KeyboardInterrupt: ignored

In [None]:
tower_encoding_list = []
towers_list = [0]*12
tower_names = []
cost = 0
for tower_idx, tower in enumerate(optimal_comp):
  if tower >= 0:
    tower_encoding = tower_encodings_restricted[tower_idx][int(tower)]
    towers_list[tower_idx] = tower_encoding
    tower_encoding_list.append(tower_encoding.encoding[tower_idx])
    tower_names.append(f"{tower_encoding.name}-{tower_encoding.upgrade_path}- Coverage: {tower_encoding.encoding[tower_idx][-1]} X: {POSSIBLE_PLACEMENTS[tower_idx]['x']:.2f} Y:{POSSIBLE_PLACEMENTS[tower_idx]['y']:.2f}")
    cost+=tower_encoding.cost
  else:
    tower_encoding_list.append([0]*81)
    towers_list[tower_idx] = -1


win_odds = predict(np.array( BLOONS_PER_ROUND[current_round]).astype(np.float32),np.array(tower_encoding_list).astype(np.float32))


if win_odds > 0.7:
  current_round_temp = current_round
  print(f'Round: {current_round_temp} Win Odds: {win_odds[0][0]*100:.2f}%')
  print("Towers: " +','.join(tower_names))
  print(f'Cost: ${cost}\n')
  while win_odds > 0.7:
    current_round_temp+=1 # See how long this tower combination can last
    win_odds = predict(np.array(BLOONS_PER_ROUND[current_round_temp]).astype(np.float32),np.array(tower_encoding_list).astype(np.float32))
    if win_odds>0.7:
      print(f'Round: {current_round_temp} Win Odds: {win_odds[0][0]*100:.2f}%')
      print("Towers: " +','.join(tower_names))
      print(f'Cost: ${cost}\n')
else:
  print(f"Win Odds: {win_odds} Retrying...\n")


TypeError: ignored