In [1]:
import random
import numpy as np
import collections
from game import Board, Game
from players.scripts.player_test import PlayerTest
from players.scripts.player_random import PlayerRandom
from players.player import Player
# from players.scripts.DSL import DSL
import sys
import uuid
import inspect
import copy
import anytree
from anytree import Node, RenderTree, PreOrderIter
from anytree.exporter import DotExporter
from enum import Enum
import functools
import tqdm
from tqdm.notebook import tqdm as tqdm_nb

In [2]:
def tournament(player_1, player_2, num_games, switch=False):
    p1_wins = 0
    p2_wins = 0
    
    for _ in range(num_games):
        game = Game(n_players=2, dice_number=4, dice_value=3, column_range=[2, 6],
                    offset=2, initial_height=1)

        is_over = False
        who_won = None

        number_of_moves = 0
        current_player = game.player_turn
        while not is_over:
            moves = game.available_moves()
            if game.is_player_busted(moves):
                if current_player == 1:
                    current_player = 2
                else:
                    current_player = 1
                continue
            else:
                if game.player_turn == 1:
                    chosen_play = player_1.get_action(game)
                else:
                    chosen_play = player_2.get_action(game)
                if chosen_play == 'n':
                    if current_player == 1:
                        current_player = 2
                    else:
                        current_player = 1
                game.play(chosen_play)
                number_of_moves += 1

            who_won, is_over = game.is_finished()

            if number_of_moves >= 200:
                is_over = True
                who_won = -1

        if who_won == 1:
            p1_wins += 1
        if who_won == 2:
            p2_wins += 1
    if switch:
        p2_wins_2, p1_wins_2 = tournament(player_2, player_1, num_games, switch=False)
        p1_wins += p1_wins_2
        p2_wins += p2_wins_2
    return p1_wins, p2_wins

In [3]:
def log_call(func):
    @functools.wraps(func)
    def _wrapper(*args):
        self, id_, *rest = args
        self.call_log.add(id_)
        return func(*rest)
    return _wrapper

class Script(Player):
    def __init__(self, tree):
        super().__init__()
        self.tree = tree
        self.call_log = set()
        
    def reset_call_log(self):
        self.call_log.clear()
        
    def get_action(self):
        raise NotImplementedError
        
class Lib(object):
    @log_call
    def is_doubles(action):
        if len(action) > 1 and action[0] == action[1]:
            return True
        else:
            return False

    @log_call
    def contains_number(action, small_num):
        if not isinstance(action, str):
            if small_num in action:
                return True
        return False

    @log_call
    def action_wills_column(state, action):
        copy_state = copy.deepcopy(state)
        copy_state.play(action)
        columns_won = copy_state.columns_won_current_round()
        columns_won_previously = state.columns_won_current_round()
        if len(columns_won) > 0 and columns_won != columns_won_previously:
            return True
        return False

    @log_call
    def number_positions_progressed_this_round(state, column_num):
        return state.number_positions_conquered_this_round(column_num)

    @log_call
    def number_positions_conquered(state, column_num):
        return state.number_positions_conquered(column_num)

    @log_call
    def has_won_column(state, action):
        return len(state.columns_won_current_round()) > 0

    @log_call
    def is_stop_action(action):
        if isinstance(action, str) and action == 'n':
            return True
        return False

In [4]:
default_enums = [
    "START",
    "BLOCK",
    "IF_BLOCK",
    "IF_BODY",
    "BOOL_EXP",
    "AND_EXP",
    "NOT_EXP",
    "BOOL",
    "RETURN",
    "FUNC_CALL",
    "COLUMN_NUM",
    "SMALL_NUM",
]
    
lib_functions = inspect.getmembers(Lib, inspect.isfunction)

In [5]:
lib_func_names = [
    name.upper()
    for name, _ in lib_functions
]

Rule = Enum('Rule', default_enums + lib_func_names)

print(list(Rule))

[<Rule.START: 1>, <Rule.BLOCK: 2>, <Rule.IF_BLOCK: 3>, <Rule.IF_BODY: 4>, <Rule.BOOL_EXP: 5>, <Rule.AND_EXP: 6>, <Rule.NOT_EXP: 7>, <Rule.BOOL: 8>, <Rule.RETURN: 9>, <Rule.FUNC_CALL: 10>, <Rule.COLUMN_NUM: 11>, <Rule.SMALL_NUM: 12>, <Rule.ACTION_WILLS_COLUMN: 13>, <Rule.CONTAINS_NUMBER: 14>, <Rule.HAS_WON_COLUMN: 15>, <Rule.IS_DOUBLES: 16>, <Rule.IS_STOP_ACTION: 17>, <Rule.NUMBER_POSITIONS_CONQUERED: 18>, <Rule.NUMBER_POSITIONS_PROGRESSED_THIS_ROUND: 19>]


In [6]:
def get_func(name):
    return dict(lib_functions)[name.lower()]

In [7]:
def get_params(f):
    return list(inspect.signature(f).parameters)

In [8]:
def make_dynamic_rule(name):
    func = get_func(name)
    params = get_params(func)
    def _convert(param):
        if param == 'state':
            return 'state'
        elif param == 'action':
            return 'a'
        elif param == 'column_num':
            return Rule.COLUMN_NUM
        elif param == 'small_num':
            return Rule.SMALL_NUM
        elif param == 'self':
            return 'self'
        else:
            raise ValueError
    return ['Lib.' + name.lower(), *list(map(_convert, params))]

In [9]:
class Sampler(object):
    pass

class Diminishing(Sampler):
    def __init__(self, gamma, rule):
        self.gamma = gamma
        self.rule = rule
        
    def sample(self):
        ret = []
        curr = 1
        while random.random() <= curr:
            ret.append(self.rule)
            curr *= self.gamma
        return ret
    
class Weighted(Sampler):
    def __init__(self, dict_):
        self.dict = dict_
    
    def sample(self):
        weight_sum = sum(self.dict.keys())
        normal_quoefficient = 1 / weight_sum
        rand = random.random()
        for weight, rule in self.dict.items():
            prob = weight * normal_quoefficient
            if rand <= prob:
                if not isinstance(rule, list):
                    rule = [rule]
                return rule
            else:
                rand -= prob
        raise ValueError

In [10]:
grammar = {
    Rule.START: (
        Rule.BLOCK,
    ),
    Rule.BLOCK: (
        Diminishing(0.3, Rule.IF_BLOCK),
    ),
    Rule.IF_BLOCK: (
        [Rule.BOOL_EXP, Rule.IF_BODY],
    ),
    Rule.IF_BODY: [
        Weighted({
            7: Rule.RETURN,
            3: [Rule.BLOCK, Rule.RETURN],
        })
    ],
    Rule.BOOL_EXP: [
        Rule.BOOL, 
        Rule.AND_EXP,
        Rule.NOT_EXP,
    ],
    Rule.AND_EXP: (
        [Rule.BOOL, Rule.BOOL],
    ),
    Rule.NOT_EXP: (
        Rule.BOOL,
    ),
    Rule.BOOL: (
        Rule.FUNC_CALL,
    ),
    Rule.FUNC_CALL: tuple(
        make_dynamic_rule(name) for name in lib_func_names
    ),
    Rule.RETURN: (
        "return a",
    ),
    Rule.COLUMN_NUM: (
        '2', '3', '4', '5', '6'
    ),
    Rule.SMALL_NUM: (
        '0', '1', '2'
    )
}
grammar

{<Rule.START: 1>: (<Rule.BLOCK: 2>,),
 <Rule.BLOCK: 2>: (<__main__.Diminishing at 0x117718748>,),
 <Rule.IF_BLOCK: 3>: ([<Rule.BOOL_EXP: 5>, <Rule.IF_BODY: 4>],),
 <Rule.IF_BODY: 4>: [<__main__.Weighted at 0x117718780>],
 <Rule.BOOL_EXP: 5>: [<Rule.BOOL: 8>, <Rule.AND_EXP: 6>, <Rule.NOT_EXP: 7>],
 <Rule.AND_EXP: 6>: ([<Rule.BOOL: 8>, <Rule.BOOL: 8>],),
 <Rule.NOT_EXP: 7>: (<Rule.BOOL: 8>,),
 <Rule.BOOL: 8>: (<Rule.FUNC_CALL: 10>,),
 <Rule.FUNC_CALL: 10>: (['Lib.action_wills_column', 'state', 'a'],
  ['Lib.contains_number', 'a', <Rule.SMALL_NUM: 12>],
  ['Lib.has_won_column', 'state', 'a'],
  ['Lib.is_doubles', 'a'],
  ['Lib.is_stop_action', 'a'],
  ['Lib.number_positions_conquered', 'state', <Rule.COLUMN_NUM: 11>],
  ['Lib.number_positions_progressed_this_round',
   'state',
   <Rule.COLUMN_NUM: 11>]),
 <Rule.RETURN: 9>: ('return a',),
 <Rule.COLUMN_NUM: 11>: ('2', '3', '4', '5', '6'),
 <Rule.SMALL_NUM: 12>: ('0', '1', '2')}

In [23]:
def get_node_id():
    return str(uuid.uuid4())[:8]

In [24]:
def generate_tree(root):
    if isinstance(root.name, str):
        return root
    next_ = grammar.get(root.name, None)
    if not next_:
        return root
    
    branch = random.choice(next_)
    
    candidates = [] 
    if isinstance(branch, list):
        candidates = branch
    elif isinstance(branch, (Rule, str)):
        candidates = [branch]
    elif isinstance(branch, Sampler):
        candidates.extend(branch.sample())
    elif branch is None:
        pass
    else:
        raise ValueError
        
    for cand in candidates:
        child = Node(cand, parent=root, id_=get_node_id())
        generate_tree(child)
    return root

In [25]:
def get_random_tree(seed=None):
    if seed:
        random.seed(seed)
    root = Node(Rule.START, id_=get_node_id())
    tree = generate_tree(root)
    return tree

def print_tree(tree, call_log=None):
    def _format_node(node):
        return str(node.name) + (' (X) ' if node.id_ in call_log else '')

    if not call_log:
        print(RenderTree(script.tree).by_attr())
    else:
        print(RenderTree(script.tree).by_attr(_format_node))

In [26]:
def indent(raw, level):
    tab = '    '
    lines = raw.splitlines()
    lines = [tab * level + line for line in lines]
    return '\n'.join(lines)

In [27]:
def render(node):
    if isinstance(node.name, str):
        return node.name
    elif isinstance(node.name, Rule):
        if node.name == Rule.IF_BLOCK:
            template = "if ({0}):\n{1}\n"
            bool_exp = render(node.children[0])
            body = indent(render(node.children[1]), 1)
            return template.format(bool_exp, body)
        elif node.name == Rule.AND_EXP:
            template = "({0} and {1})"
            left = render(node.children[0])
            right = render(node.children[1])
            return template.format(left, right)
        elif node.name == Rule.NOT_EXP:
            template = "not ({0})"
            op = render(node.children[0])
            return template.format(op)
        elif node.name == Rule.FUNC_CALL:
            template = "{0}({1})"
            func_name = render(node.children[0])
            prefix = 'self, "' + node.id_ + '", '
            other_params = ','.join([render(child) for child in node.children[1:]])
            params = prefix + other_params
            return template.format(func_name, params)
        return ''.join(render(child) for child in node.children)

In [28]:
script_template = r"""

class {0}(Script):
    def get_action(self, state):
        actions = state.available_moves()
        for a in actions:
{1}
            pass
        return actions[0]
"""

def render_script(node):
    script_name = 'Script_' + str(uuid.uuid4()).replace('-', '')
    code = indent(render(node), 3)
    return script_name, script_template.format(script_name, code)

In [29]:
def exec_tree(tree):
    script_name, raw_script = render_script(tree)
    try:
        exec(raw_script)
    except Exception as e:
        print(e)
        print(raw_script)
    script = eval(script_name)
    return script(tree)

In [30]:
def mutate(tree):
#     tree = copy.deepcopy(tree)
    all_ifs = anytree.search.findall(tree, filter_=lambda n: n.name == Rule.IF_BLOCK)
    if all_ifs:
        target_if = random.choice(all_ifs)
        target_if.children = []
        generate_tree(target_if)
    return tree

In [31]:
def crossover(left, right):
    def _is_valid_subtree_head(node):
        return node.name in [Rule.IF_BLOCK, Rule.BOOL_EXP]

    def _find_head_types(root):
        return [node.name for node in anytree.search.findall(root, filter_=_is_valid_subtree_head)]

    def _select_type_node(root, type_):
        candidates = anytree.search.findall(root, filter_=lambda node: node.name == type_)
        return random.choice(candidates)
    
    def _swap_children(left, right):
        left.children, right.children = right.children, left.children
        
    left = copy.deepcopy(left)
    right = copy.deepcopy(right)

    left_head_types = _find_head_types(left)
    right_head_types = _find_head_types(right)

    type_intersection = list(set(left_head_types + right_head_types))
    if type_intersection:
        type_to_swap = random.choice(type_intersection)
        left_head = _select_type_node(left, type_to_swap)
        right_head = _select_type_node(right, type_to_swap)
        _swap_children(left_head, right_head)
        return left, right

In [32]:
def bubble_up_call_log(script):
    def _mark_up(node, call_log):
        while node.parent:
            if any(child.id_ in call_log for child in node.parent.children):
                call_log.add(node.parent.id_)
            node = node.parent

    for marked in anytree.search.findall(script.tree, lambda node: node.id_ in script.call_log):
        _mark_up(marked, script.call_log)

def remove_unused_if_blocks(script):
    def _unused_if_block(node, call_log):
        return node.name == Rule.IF_BLOCK and node.id_ not in call_log

    flag = False
    for unused in PreOrderIter(script.tree, lambda node: _unused_if_block(node, script.call_log)):
        unused.parent = None
        flag = True
    return flag

In [33]:
def eval_against_random(script, num_games=5):
    player_random = PlayerRandom()
    wins, _ = tournament(script, player_random, num_games, switch=True)
    return wins  / (num_games * 2)

def eval_population(population, num_games=5):
    results = []
    for script in population:
        results.append(eval_against_random(script, num_games))
    return np.mean(results)

In [34]:
def run_ga(
    epochs,
    limit,
    mutate_rate=0.5,
#     evaluation_num_games=5,
    selection_num_games=5,
):
    population = [exec_tree(get_random_tree()) for _ in range(limit)]

#     results = []
    generations = []
    
    for _ in tqdm_nb(range(epochs)):
        
        next_population = []
        for i in range(0, len(population) - 1, 2):
            lhs, rhs = population[i], population[i + 1]
            lhs_wins, rhs_wins = tournament(lhs, rhs, selection_num_games, switch=True)
            winner = [lhs, rhs][int(rhs_wins >= lhs_wins)]
            next_population.append(winner)

        for i, script in enumerate(next_population):
            bubble_up_call_log(script)
            removed = remove_unused_if_blocks(script)
            if removed:
                next_population[i] = exec_tree(script.tree)
            else:
                script.reset_call_log()

        # mutation
        to_mutate = [script for script in next_population if random.random() < mutate_rate]
        for script in to_mutate:
            mutated_tree = mutate(script.tree)
            mutated_script = exec_tree(mutated_tree)
            next_population.remove(script)
            next_population.append(mutated_script)

        # crossover
        while len(next_population) < limit:
            lhs, rhs = random.sample(next_population, k=2)
            lhs_child, rhs_child = crossover(lhs.tree, rhs.tree)
            for child in lhs_child, rhs_child:
                child_script = exec_tree(child)
                next_population.append(child_script)

        random.shuffle(next_population)
        population = next_population
        
#         results.append(eval_population(population, evaluation_num_games))
        generations.append([copy.deepcopy(script) for script in population])
    return generations

In [42]:
generations = run_ga(
    epochs=10,
    limit=128,
)

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))




In [39]:
results = np.array([eval_population(gen, 1) for gen in generations])

In [40]:
from bokeh.plotting import figure, output_file, show
from bokeh.io import output_notebook, output_file, reset_output

In [41]:
reset_output()
output_notebook()

x = np.array(list(range(len(results))))
y = results

p = figure(
    title="Win Rate Against Random Agent Over Time",
    x_axis_label='generation',
    y_axis_label='Win Rate'
)

p.line(x, y, line_width=2)

show(p)

In [None]:
i
















