In [15]:
import random
import numpy as np
import collections
from game import Board, Game
# from players.scripts.player_test import PlayerTest
from players.scripts.player_random import PlayerRandom
# from players.player import Player
# from players.scripts.DSL import DSL
import sys
import uuid
import inspect
import copy
import anytree
from anytree import Node, RenderTree, PreOrderIter
from anytree.exporter import DotExporter
from enum import Enum
import functools
import tqdm
from tqdm.notebook import tqdm as tqdm_nb
import itertools

In [16]:
def log_call(func):
    """Decorator that records which script node invoked ``func``.

    The decorated callable must be called with two extra leading positional
    arguments, which are stripped before ``func`` itself runs:

    1. an owner object exposing a ``call_log`` set (e.g. a ``Script``);
    2. ``id_``, the identifier of the tree node issuing the call.

    ``id_`` is added to ``owner.call_log``; every remaining positional and
    keyword argument is forwarded to ``func`` unchanged and its result is
    returned. ``functools.wraps`` keeps ``func``'s name and exposes it via
    ``__wrapped__`` so ``inspect.signature`` still reports the original
    parameters.

    Raises:
        ValueError: if fewer than two positional arguments are supplied.
    """
    @functools.wraps(func)
    def _wrapper(*args, **kwargs):
        # Peel off the bookkeeping arguments; the rest belongs to ``func``.
        owner, id_, *rest = args
        owner.call_log.add(id_)
        return func(*rest, **kwargs)
    return _wrapper

class Script(object):
    """Base class for strategy scripts generated from a derivation tree.

    Attributes:
        tree: the derivation tree this script was rendered from (stored as-is).
        call_log: set of node ids recorded by ``log_call``-decorated helpers.
        fitness: running score; starts at 0.
    """

    def __init__(self, tree):
        super().__init__()
        self.tree = tree
        self.call_log = set()
        self.fitness = 0

    def reset(self):
        """Clear the call log in place and set the fitness back to 0."""
        self.call_log.clear()
        self.fitness = 0

    def get_action(self, state=None):
        """Choose an action for ``state``; subclasses must override.

        ``state`` is accepted (and optional) so the base signature matches
        the ``get_action(self, state)`` overrides defined by subclasses.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError
        
class Lib(object):
    """Boolean predicates available to generated scripts.

    Every function is wrapped by ``log_call``, so callers pass the owning
    script and a node id first (``Lib.f(script, node_id, ...)``). Parameter
    names are significant: ``make_dynamic_rule`` maps them onto grammar
    symbols, so they must not be renamed.
    """

    @log_call
    def is_doubles(action):
        """True when ``action`` has more than one entry and the first two are equal."""
        return bool(len(action) > 1 and action[0] == action[1])

    @log_call
    def contains_number(action, column_num):
        """True when ``action`` is not a string and contains ``column_num``."""
        return (not isinstance(action, str)) and column_num in action

    @log_call
    def action_wins_column(state, action):
        """True when playing ``action`` on a copy of ``state`` changes the set
        of columns won this round to a non-empty value."""
        simulated = copy.deepcopy(state)  # never mutate the live game
        simulated.play(action)
        won_after = simulated.columns_won_current_round()
        won_before = state.columns_won_current_round()
        return bool(len(won_after) > 0 and won_after != won_before)

    @log_call
    def has_won_column(state, action):
        """True when ``state`` reports at least one column won this round.

        ``action`` is unused but kept in the signature.
        """
        won = state.columns_won_current_round()
        return len(won) > 0

    @log_call
    def column_progression_this_round_greater_than(state, column_num, small_num):
        """True when this round's conquered positions in ``column_num`` are at
        least ``small_num`` (the comparison is ``>=``)."""
        conquered = state.number_positions_conquered_this_round(column_num)
        return conquered >= small_num

    @log_call
    def column_progression_greater_than(state, column_num, small_num):
        """True when the conquered positions in ``column_num`` are at least
        ``small_num`` (the comparison is ``>=``)."""
        conquered = state.number_positions_conquered(column_num)
        return conquered >= small_num

    @log_call
    def progression_this_round_greater_than(state, small_num):
        """True when this round's conquered positions summed over columns 2..6
        are at least ``small_num``."""
        total = 0
        for column in range(2, 7):
            total += state.number_positions_conquered_this_round(column)
        return total >= small_num

    @log_call
    def progression_greater_than(state, small_num):
        """True when the conquered positions summed over columns 2..6 are at
        least ``small_num``."""
        total = 0
        for column in range(2, 7):
            total += state.number_positions_conquered(column)
        return total >= small_num

    @log_call
    def is_yes_action(action):
        """True when ``action`` is the string ``'y'``."""
        return isinstance(action, str) and action == 'y'

    @log_call
    def is_no_action(action):
        """True when ``action`` is the string ``'n'``."""
        return isinstance(action, str) and action == 'n'

In [17]:
# Names of the built-in (non-library) grammar symbols. The order matters:
# Enum('Rule', ...) numbers its members in list order, starting at 1.
default_enums = [
    "START",
    "BLOCK",
    "IF_BLOCK",
    "IF_BODY",
    "BOOL_EXP",
    "AND_EXP",
    "NOT_EXP",
    "BOOL",
    "RETURN",
    "FUNC_CALL",
    "COLUMN_NUM",
    "SMALL_NUM",
]
    
# (name, function) pairs for every function defined on Lib, as returned by
# inspect.getmembers.
lib_functions = inspect.getmembers(Lib, inspect.isfunction)

In [18]:
# Upper-cased names of the public Lib functions (leading-underscore names are
# skipped); each becomes an extra grammar symbol.
lib_func_names = [
    name.upper()
    for name, _ in lib_functions if name[0] != '_'
]

# Grammar symbols: the fixed structural symbols followed by one per Lib function.
Rule = Enum('Rule', default_enums + lib_func_names)

print(list(Rule))

[<Rule.START: 1>, <Rule.BLOCK: 2>, <Rule.IF_BLOCK: 3>, <Rule.IF_BODY: 4>, <Rule.BOOL_EXP: 5>, <Rule.AND_EXP: 6>, <Rule.NOT_EXP: 7>, <Rule.BOOL: 8>, <Rule.RETURN: 9>, <Rule.FUNC_CALL: 10>, <Rule.COLUMN_NUM: 11>, <Rule.SMALL_NUM: 12>, <Rule.ACTION_WINS_COLUMN: 13>, <Rule.COLUMN_PROGRESSION_GREATER_THAN: 14>, <Rule.COLUMN_PROGRESSION_THIS_ROUND_GREATER_THAN: 15>, <Rule.CONTAINS_NUMBER: 16>, <Rule.HAS_WON_COLUMN: 17>, <Rule.IS_DOUBLES: 18>, <Rule.IS_NO_ACTION: 19>, <Rule.IS_YES_ACTION: 20>, <Rule.PROGRESSION_GREATER_THAN: 21>, <Rule.PROGRESSION_THIS_ROUND_GREATER_THAN: 22>]


In [19]:
def get_func(name):
    """Return the ``Lib`` function whose name equals ``name`` lower-cased.

    Raises:
        KeyError: if ``lib_functions`` has no function with that name.
    """
    registry = dict(lib_functions)
    wanted = name.lower()
    return registry[wanted]

In [20]:
def get_params(f):
    """Return the parameter names of callable ``f`` in declaration order."""
    signature = inspect.signature(f)
    return [param_name for param_name in signature.parameters]

In [21]:
def make_dynamic_rule(name):
    """Build the grammar production for the ``Lib`` function called ``name``.

    The production is a list whose first item is the call target
    (``'Lib.<name lower-cased>'``), followed by one symbol per parameter of
    the function: a literal source token for ``state``/``action``/``self``
    or a ``Rule`` member for ``column_num``/``small_num``.

    Raises:
        ValueError: if the function has a parameter with any other name.
    """
    symbol_for_param = {
        'state': 'state',
        'action': 'a',
        'column_num': Rule.COLUMN_NUM,
        'small_num': Rule.SMALL_NUM,
        'self': 'self',
    }
    parameters = get_params(get_func(name))
    production = ['Lib.' + name.lower()]
    for parameter in parameters:
        if parameter not in symbol_for_param:
            raise ValueError
        production.append(symbol_for_param[parameter])
    return production

In [22]:
class Sampler(object):
    """Marker base class for grammar entries that are sampled rather than
    expanded directly; concrete samplers provide ``sample()`` returning a list."""
    pass


class Diminishing(Sampler):
    """Repeat one rule a random number of times with decaying probability.

    The first repetition is always produced; the k-th further repetition is
    kept with probability ``gamma ** k``.
    """

    def __init__(self, gamma, rule):
        """
        Args:
            gamma: decay factor applied after every repetition; must be < 1.
            rule: the item to repeat.

        Raises:
            ValueError: if ``gamma >= 1``; the continuation probability would
                never drop below 1 and ``sample()`` would loop forever.
        """
        if gamma >= 1:
            raise ValueError("gamma must be < 1, otherwise sample() never terminates")
        self.gamma = gamma
        self.rule = rule

    def sample(self):
        """Return a list holding ``rule`` one or more times."""
        ret = []
        curr = 1
        # random.random() is in [0, 1), so the first test always succeeds.
        while random.random() <= curr:
            ret.append(self.rule)
            curr *= self.gamma
        return ret


class Weighted(Sampler):
    """Pick one rule at random, proportionally to its weight.

    ``dict_`` maps weight -> rule, so two rules cannot share the same weight.
    """

    def __init__(self, dict_):
        self.dict = dict_

    def sample(self):
        """Return the chosen rule as a list (a non-list rule is wrapped).

        Raises:
            ZeroDivisionError: if the weights sum to zero (e.g. empty dict).
        """
        weight_sum = sum(self.dict.keys())
        normal_coefficient = 1 / weight_sum
        rand = random.random()
        rule = None
        for weight, rule in self.dict.items():
            prob = weight * normal_coefficient
            if rand <= prob:
                break
            rand -= prob
        # If the loop ends without a break, accumulated floating-point error
        # left a sliver of probability mass unassigned; give it to the last
        # rule instead of failing with an uninformative ValueError.
        return rule if isinstance(rule, list) else [rule]

In [23]:
# Production table used by generate_tree. Each key is a Rule symbol and each
# value is a sequence of alternatives, one of which is picked uniformly with
# random.choice. An alternative may be:
#   * a Rule or a str  -> a single child,
#   * a list           -> several children, in order,
#   * a Sampler        -> children produced by its sample() method.
grammar = {
    Rule.START: (
        Rule.BLOCK,
    ),
    # A block is one or more IF_BLOCKs (count drawn by Diminishing).
    Rule.BLOCK: (
        Diminishing(0.6, Rule.IF_BLOCK),
    ),
    Rule.IF_BLOCK: (
        [Rule.BOOL_EXP, Rule.IF_BODY],
    ),
    # Body is a plain return (weight 7) or a nested block followed by a
    # return (weight 3).
    Rule.IF_BODY: [
        Weighted({
            7: Rule.RETURN,
            3: [Rule.BLOCK, Rule.RETURN],
        })
#         Rule.RETURN,
    ],
    Rule.BOOL_EXP: [
        Rule.BOOL, 
        Rule.AND_EXP,
        Rule.NOT_EXP,
    ],
    Rule.AND_EXP: (
        [Rule.BOOL, Rule.BOOL],
    ),
    Rule.NOT_EXP: (
        Rule.BOOL,
    ),
    Rule.BOOL: (
        Rule.FUNC_CALL,
    ),
    # One alternative per public Lib function.
    Rule.FUNC_CALL: tuple(
        make_dynamic_rule(name) for name in lib_func_names
    ),
    # Terminal: literal source text emitted into the script.
    Rule.RETURN: (
        "return a",
    ),
    Rule.COLUMN_NUM: (
        '2', '3', '4', '5', '6'
    ),
    Rule.SMALL_NUM: (
        '0', '1', '2', '3'
    )
}
grammar

{<Rule.START: 1>: (<Rule.BLOCK: 2>,),
 <Rule.BLOCK: 2>: (<__main__.Diminishing at 0x11c366f98>,),
 <Rule.IF_BLOCK: 3>: ([<Rule.BOOL_EXP: 5>, <Rule.IF_BODY: 4>],),
 <Rule.IF_BODY: 4>: [<__main__.Weighted at 0x11c366cf8>],
 <Rule.BOOL_EXP: 5>: [<Rule.BOOL: 8>, <Rule.AND_EXP: 6>, <Rule.NOT_EXP: 7>],
 <Rule.AND_EXP: 6>: ([<Rule.BOOL: 8>, <Rule.BOOL: 8>],),
 <Rule.NOT_EXP: 7>: (<Rule.BOOL: 8>,),
 <Rule.BOOL: 8>: (<Rule.FUNC_CALL: 10>,),
 <Rule.FUNC_CALL: 10>: (['Lib.action_wins_column', 'state', 'a'],
  ['Lib.column_progression_greater_than',
   'state',
   <Rule.COLUMN_NUM: 11>,
   <Rule.SMALL_NUM: 12>],
  ['Lib.column_progression_this_round_greater_than',
   'state',
   <Rule.COLUMN_NUM: 11>,
   <Rule.SMALL_NUM: 12>],
  ['Lib.contains_number', 'a', <Rule.COLUMN_NUM: 11>],
  ['Lib.has_won_column', 'state', 'a'],
  ['Lib.is_doubles', 'a'],
  ['Lib.is_no_action', 'a'],
  ['Lib.is_yes_action', 'a'],
  ['Lib.progression_greater_than', 'state', <Rule.SMALL_NUM: 12>],
  ['Lib.progression_this_ro

In [24]:
def get_node_id():
    """Return a random 8-character lowercase hexadecimal node identifier."""
    return uuid.uuid4().hex[:8]

In [25]:
def generate_tree(root):
    """Recursively expand ``root`` in place according to ``grammar``.

    String-named nodes are terminals and are returned untouched, as are
    symbols with no (or empty) grammar entry. Otherwise one alternative is
    drawn with ``random.choice`` and a child ``Node`` with a fresh ``id_`` is
    attached for every symbol it yields, each expanded in turn.

    Returns:
        ``root`` itself.

    Raises:
        ValueError: if the chosen alternative is of an unsupported type.
    """
    label = root.name
    if isinstance(label, str):
        return root

    productions = grammar.get(label, None)
    if not productions:
        return root

    chosen = random.choice(productions)
    if isinstance(chosen, list):
        symbols = chosen
    elif isinstance(chosen, (Rule, str)):
        symbols = [chosen]
    elif isinstance(chosen, Sampler):
        symbols = list(chosen.sample())
    elif chosen is None:
        symbols = []
    else:
        raise ValueError

    for symbol in symbols:
        generate_tree(Node(symbol, parent=root, id_=get_node_id()))
    return root

In [26]:
def get_random_tree(seed=None):
    """Generate a random derivation tree rooted at ``Rule.START``.

    Args:
        seed: optional seed for the global ``random`` module. Any value other
            than ``None`` is applied, including ``0`` (a plain truthiness
            test would silently ignore a zero seed).

    Returns:
        The root node of the generated tree.
    """
    if seed is not None:
        random.seed(seed)
    root = Node(Rule.START, id_=get_node_id())
    return generate_tree(root)

def print_tree(tree, call_log=None):
    """Print ``tree`` as ASCII art.

    When ``call_log`` is non-empty, nodes whose ``id_`` is in it are printed
    with a trailing `` (X) `` marker; otherwise the default node label is used.
    """
    rendered = RenderTree(tree, style=anytree.render.AsciiStyle())
    if not call_log:
        print(rendered.by_attr())
        return

    def _label(node):
        marker = ' (X) ' if node.id_ in call_log else ''
        return str(node.name) + marker

    print(rendered.by_attr(_label))

In [49]:
# Demo: show the source generated for one random tree (element [1] of the
# (name, source) pair). NOTE: render_script is defined in a later cell, so on
# a fresh kernel this cell only works after that cell has been run.
print(render_script(get_random_tree())[1])



class Script_b9a7203fa92341169f6ba1681a55ede2(Script):
    def get_action(self, state):
        actions = state.available_moves()
        for a in actions:
            if (not (Lib.is_no_action(self, "36af38cc", a))):
                return a
            pass
        return actions[0]



In [46]:
def indent(raw, level):
    """Prefix every line of ``raw`` with ``level`` four-space indents.

    Lines are split with ``str.splitlines`` and re-joined with ``'\\n'``, so
    a trailing newline in ``raw`` is not preserved.
    """
    prefix = '    ' * level
    return '\n'.join(prefix + line for line in raw.splitlines())

In [44]:
def render(node):
    """Render the subtree rooted at ``node`` as Python source text.

    * string nodes render as themselves;
    * ``IF_BLOCK`` -> ``if (<cond>):`` followed by the indented body;
    * ``AND_EXP`` / ``NOT_EXP`` -> parenthesised ``and`` / ``not`` expressions;
    * ``FUNC_CALL`` -> a call whose first two arguments are ``self`` and the
      quoted ``id_`` of the FUNC_CALL node;
    * any other ``Rule`` -> the concatenation of its rendered children.

    Returns ``None`` when ``node.name`` is neither a ``str`` nor a ``Rule``.
    """
    label = node.name
    if isinstance(label, str):
        return label
    if not isinstance(label, Rule):
        return None

    kids = node.children
    if label == Rule.IF_BLOCK:
        condition = render(kids[0])
        body = indent(render(kids[1]), 1)
        return "if ({0}):\n{1}\n".format(condition, body)
    if label == Rule.AND_EXP:
        lhs = render(kids[0])
        rhs = render(kids[1])
        return "({0} and {1})".format(lhs, rhs)
    if label == Rule.NOT_EXP:
        return "not ({0})".format(render(kids[0]))
    if label == Rule.FUNC_CALL:
        callee = render(kids[0])
        # Bookkeeping arguments consumed by the log_call wrapper.
        bookkeeping = 'self, "' + node.id_ + '", '
        call_args = bookkeeping + ', '.join([render(kid) for kid in kids[1:]])
        return "{0}({1})".format(callee, call_args)
    return ''.join(render(kid) for kid in kids)

In [45]:
# Source template for a generated Script subclass.
#   {0} -> class name
#   {1} -> rendered rule body, already indented to sit inside the for-loop
# The trailing `pass` keeps the loop body valid when {1} is empty, and
# `actions[0]` is the fallback when no rule returns.
script_template = r"""

class {0}(Script):
    def get_action(self, state):
        actions = state.available_moves()
        for a in actions:
{1}
            pass
        return actions[0]
"""

def render_script(node):
    """Render ``node`` into the source of a uniquely named Script subclass.

    Returns:
        ``(class_name, source)`` where ``class_name`` is ``'Script_'`` plus
        32 hexadecimal characters and ``source`` is ``script_template`` filled
        with the rendered body indented three levels.
    """
    class_name = 'Script_' + uuid.uuid4().hex
    body = indent(render(node), 3)
    source = script_template.format(class_name, body)
    return class_name, source

In [64]:
def exec_tree(tree):
    """Compile ``tree`` into a Script subclass and return an instance of it.

    The generated source is executed into an explicit namespace dict and the
    class is read back from that dict. This avoids depending on ``exec`` and
    ``eval`` sharing the function's implicit locals mapping, which is not
    guaranteed by the language.

    Raises:
        Exception: whatever executing the generated source raises. The
            offending source is printed first and the original error is
            re-raised, instead of being masked by a later lookup failure.
    """
    script_name, raw_script = render_script(tree)
    namespace = {}
    try:
        # Module globals supply Script and Lib to the generated class.
        exec(raw_script, globals(), namespace)
    except Exception:
        print(raw_script)
        raise
    script_cls = namespace[script_name]
    return script_cls(tree)

In [65]:
def mutate(tree):
    """Return a mutated deep copy of ``tree``; the input is left untouched.

    A node is picked uniformly among all nodes of the copy, its children are
    dropped and the node is re-expanded with ``generate_tree``. If the picked
    node is a string terminal, ``generate_tree`` returns immediately and the
    copy is unchanged.
    """
    mutant = copy.deepcopy(tree)
    nodes = anytree.search.findall(mutant)
    if not nodes:
        return mutant
    victim = random.choice(nodes)
    victim.children = []
    generate_tree(victim)
    return mutant

In [66]:
def crossover(left, right):
    """Return deep copies of ``left`` and ``right`` after one subtree swap.

    Only ``Rule.IF_BLOCK`` nodes are eligible swap points. If both copies
    contain one, a random IF_BLOCK is chosen in each and their children are
    exchanged; otherwise the copies are returned unchanged. The inputs are
    never modified.
    """
    swappable_kinds = [Rule.IF_BLOCK]

    def _kinds_present(root):
        heads = anytree.search.findall(
            root, filter_=lambda node: node.name in swappable_kinds)
        return set(node.name for node in heads)

    def _pick_head(root, kind):
        matching = anytree.search.findall(
            root, filter_=lambda node: node.name == kind)
        return random.choice(matching)

    left = copy.deepcopy(left)
    right = copy.deepcopy(right)

    shared_kinds = list(_kinds_present(left) & _kinds_present(right))
    if not shared_kinds:
        return left, right

    kind = random.choice(shared_kinds)
    left_head = _pick_head(left, kind)
    right_head = _pick_head(right, kind)
    # Both right-hand sides are read before either assignment re-parents nodes.
    left_head.children, right_head.children = right_head.children, left_head.children
    return left, right

In [67]:
def bubble_up_call_log(script):
    """Propagate call-log marks from logged nodes up to their ancestors.

    For every node of ``script.tree`` whose ``id_`` is in ``script.call_log``,
    walk towards the root and add each ancestor's ``id_`` to the log whenever
    at least one of that ancestor's children is already logged. The log is
    modified in place.
    """
    log = script.call_log
    # findall returns a finished tuple, so mutating the log below is safe.
    logged_nodes = anytree.search.findall(script.tree, lambda node: node.id_ in log)
    for logged in logged_nodes:
        ancestor = logged.parent
        while ancestor:
            if any(child.id_ in log for child in ancestor.children):
                log.add(ancestor.id_)
            ancestor = ancestor.parent

def remove_unused_if_blocks(script):
    """Detach every ``IF_BLOCK`` of ``script.tree`` that is not in the call log.

    Returns:
        ``True`` if at least one node was detached, ``False`` otherwise.
    """
    log = script.call_log
    removed_any = False
    unused_blocks = PreOrderIter(
        script.tree,
        lambda node: node.name == Rule.IF_BLOCK and node.id_ not in log,
    )
    for block in unused_blocks:
        block.parent = None  # detaches the whole subtree
        removed_any = True
    return removed_any

In [68]:
def play_game(lhs, rhs, max_moves=200):
    """Play one two-player game between ``lhs`` (player 1) and ``rhs`` (player 2).

    Args:
        lhs: object with ``get_action(game)``; moves when ``game.player_turn == 1``.
        rhs: object with ``get_action(game)``; moves otherwise.
        max_moves: number of played moves after which an unfinished game is
            abandoned (default 200, the previously hard-coded value).

    Returns:
        The winner reported by ``game.is_finished()``, or ``-1`` when the
        move cap is reached without the game finishing.
    """
    game = Game(n_players=2, dice_number=4, dice_value=3, column_range=[2, 6],
                offset=2, initial_height=1)
    who_won = None
    is_over = False
    number_of_moves = 0

    while not is_over:
        moves = game.available_moves()
        # NOTE(review): presumably is_player_busted also advances the game to
        # the next turn -- confirm against Game; nothing else is done on a bust.
        if game.is_player_busted(moves):
            continue

        mover = lhs if game.player_turn == 1 else rhs
        game.play(mover.get_action(game))
        number_of_moves += 1

        who_won, is_over = game.is_finished()

        # Apply the cap only to unfinished games, so a game decided on the
        # capping move keeps its real winner.
        if not is_over and number_of_moves >= max_moves:
            is_over = True
            who_won = -1
    return who_won

def evaluate_pair(lhs, rhs, num_games=3):
    """Play ``num_games`` games with ``lhs`` as player 1 and update fitness.

    Per game the winner gains 1 fitness and the loser loses 1; for any other
    result (e.g. the move cap) both scripts lose 1.
    """
    for _ in range(num_games):
        outcome = play_game(lhs, rhs)
        by_outcome = {
            1: (lhs, [rhs]),
            2: (rhs, [lhs]),
        }
        winner, losers = by_outcome.get(outcome, (None, [lhs, rhs]))
        if winner:
            winner.fitness += 1
        for loser in losers:
            loser.fitness -= 1

def evaluate_population(population, num_games=3):
    """Round-robin evaluation: every unordered pair of scripts plays
    ``num_games`` games in each seating order; fitness is updated in place."""
    pairings = itertools.combinations(population, 2)
    for first, second in pairings:
        evaluate_pair(first, second, num_games)
        evaluate_pair(second, first, num_games)
        
def get_elites(population, limit=5):
    """Return the ``limit`` highest-fitness scripts, in ascending fitness order.

    The best script is therefore the LAST element. A non-positive ``limit``
    yields an empty list; without that guard ``limit=0`` would slice
    ``[-0:]`` and return the whole population.
    """
    if limit <= 0:
        return []
    ranked = sorted(population, key=lambda script: script.fitness)
    return ranked[-limit:]

def get_tournament_elites(population, limit=5):
    """Tournament selection: sample up to ``limit`` scripts without
    replacement and return the two fittest of them (ascending fitness).

    The sample size is clamped to the population size so a population
    smaller than ``limit`` no longer makes ``random.sample`` raise.
    """
    entrants = random.sample(population, k=min(limit, len(population)))
    return get_elites(entrants, limit=2)

In [69]:
# def eval_against_random(script, num_games=5):
#     player_random = PlayerRandom()
#     wins, _ = tournament(script, player_random, num_games, switch=True)
#     return wins  / (num_games * 2)
def get_win_rate_against_random(script, num_games=5):
    """Return the fraction of games ``script`` wins against a ``PlayerRandom``.

    ``num_games`` rounds are played; each round is one game with ``script``
    as player 1 followed by one with ``script`` as player 2, so the
    denominator is ``2 * num_games``.
    """
    opponent = PlayerRandom()
    victories = 0
    for _ in range(num_games):
        if play_game(script, opponent) == 1:
            victories += 1
        if play_game(opponent, script) == 2:
            victories += 1
    return victories / (num_games * 2)

def play_population_against_random(population, num_games=5):
    """Return the mean win rate of ``population`` against one shared
    ``PlayerRandom`` opponent.

    Each script plays ``num_games`` rounds of two games (once per seat); its
    win rate is wins divided by ``2 * num_games``.
    """
    opponent = PlayerRandom()
    win_rates = []
    for script in population:
        wins = sum(
            int(play_game(script, opponent) == 1)
            + int(play_game(opponent, script) == 2)
            for _ in range(num_games)
        )
        win_rates.append(wins / (num_games * 2))
    return np.mean(win_rates)

In [70]:
def run_ga(
    epochs,
    limit,
    num_elites=5,
    mutate_rate=0.5,
    num_games=3,
):
    """Evolve a population of scripts with a genetic algorithm.

    Args:
        epochs: number of generations to breed.
        limit: population size.
        num_elites: number of top-fitness scripts carried over unchanged.
        mutate_rate: probability that a crossover child is mutated.
        num_games: games per seating order in every pairwise evaluation,
            including the final one.

    Returns:
        A list with ``epochs + 1`` entries; each entry is a list of deep
        copies of the evaluated scripts of one generation.
    """
    population = [exec_tree(get_random_tree()) for _ in range(limit)]

    generations = []

    for _ in tqdm_nb(range(epochs)):
        evaluate_population(population, num_games=num_games)

        # Prune IF_BLOCKs that were never reached during evaluation and
        # recompile the affected scripts, keeping their fitness.
        for i, script in enumerate(population):
            bubble_up_call_log(script)
            if not remove_unused_if_blocks(script):
                continue
            if len(anytree.search.findall(script.tree)) <= 2:
                # Nothing useful is left of the tree: start over.
                print_tree(script.tree)
                tree = get_random_tree()
            else:
                tree = script.tree
            population[i] = exec_tree(tree)
            population[i].fitness = script.fitness

        # Snapshot before reset() wipes fitness and call logs below.
        generations.append([copy.deepcopy(script) for script in population])

        next_population = list(get_elites(population, num_elites))

        while len(next_population) < len(population):
            mother, father = get_tournament_elites(population)
            for child_tree in crossover(mother.tree, father.tree):
                # crossover yields two children; stop as soon as the target
                # size is reached so the population cannot grow by one.
                if len(next_population) >= len(population):
                    break
                if random.random() < mutate_rate:
                    child_tree = mutate(child_tree)
                # Compile once, after the optional mutation.
                next_population.append(exec_tree(child_tree))

        for script in next_population:
            script.reset()

        population = next_population

    # Score the last generation with the same num_games as all others.
    evaluate_population(population, num_games=num_games)
    generations.append([copy.deepcopy(script) for script in population])
    return generations

In [96]:
# Hyper-parameter sweep: one GA run per tuple. The rows vary the population
# size (limit), num_games and num_elites around the (50, 12, 2, 4) setting.
parameters_set = [
    # epochs, limit, num_games, num_elites
    (50, 8, 2, 4),
    (50, 12, 2, 4),
    (50, 16, 2, 4),
    (50, 20, 2, 4),
    (50, 12, 1, 4),
    (50, 12, 3, 4),
    (50, 12, 2, 2),
    (50, 12, 2, 6),
]
# Maps each parameter tuple to the list of generations returned by run_ga.
results = {}
for params in parameters_set:
    epochs, limit, num_games, num_elites = params
    # NOTE: `generations` stays bound to the LAST run after the loop, and
    # later cells read it as a global.
    generations = run_ga(
        epochs=epochs,
        limit=limit,
        num_games=num_games,
        num_elites=num_elites
    )
    results[params] = generations

HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




In [1]:
import pickle

In [116]:
# For every run, compute one value per generation: the win rate of that
# generation's single best script (get_elites(gen, 1)) against a random
# player over 10 rounds of two games each.
# NOTE: requires `results` to still be the dict built by the sweep cell, and
# the loop rebinds the global `generations`.
plot_data = []
for params, generations in results.items():
    vals = np.array([play_population_against_random(get_elites(gen, 1), 10)
                    for gen in generations])
    epochs, limit, num_games, num_elites = params
    # The stringified dict doubles as plot title (and, in plot(), file name).
    title = str({"epochs": epochs, "num_games": num_games,
             "population-size": limit, "num_elites": num_elites})
    
    plot_data.append((vals, title))

In [118]:
# Cache the (vals, title) pairs on disk so plots can be redrawn without
# re-running the GA.
with open("plot_data.pkl", 'wb') as f:
    pickle.dump(plot_data, f)

In [2]:
# Reload the cached (vals, title) pairs.
# NOTE: pickle.load can execute arbitrary code; only load files you created.
with open("plot_data.pkl", 'rb') as f:
    plot_data = pickle.load(f)

In [12]:
# Render one figure per GA run. NOTE: plot() is defined in a later cell, so
# that cell has to be run first on a fresh kernel.
for vals, title in plot_data:
    plot(vals, title)

In [24]:
# Per-generation win rate of the best script for the run currently bound to
# `generations`. NOTE: this rebinds `results`, which the sweep cell uses for
# its params -> generations dict; cells that iterate results.items() will
# fail after this one has run.
results = np.array([play_population_against_random(get_elites(gen, 1), 10) for gen in generations])

In [4]:
from bokeh.plotting import figure, output_file, show
from bokeh.io import output_notebook, output_file, reset_output, export_png

In [26]:
# Best script of the final generation of the run currently bound to
# `generations`; print its derivation tree.
elite = get_elites(generations[-1], 1)[0]
# bubble_up_call_log(elite)
# removed = remove_unused_if_blocks(elite)
print_tree(elite.tree)

Rule.START
└── Rule.BLOCK
    ├── Rule.IF_BLOCK
    │   ├── Rule.BOOL_EXP
    │   │   └── Rule.AND_EXP
    │   │       ├── Rule.BOOL
    │   │       │   └── Rule.FUNC_CALL
    │   │       │       ├── Lib.is_no_action
    │   │       │       └── a
    │   │       └── Rule.BOOL
    │   │           └── Rule.FUNC_CALL
    │   │               ├── Lib.progression_this_round_greater_than
    │   │               ├── state
    │   │               └── Rule.SMALL_NUM
    │   │                   └── 1
    │   └── Rule.IF_BODY
    │       └── Rule.RETURN
    │           └── return a
    ├── Rule.IF_BLOCK
    │   ├── Rule.BOOL_EXP
    │   │   └── Rule.AND_EXP
    │   │       ├── Rule.BOOL
    │   │       │   └── Rule.FUNC_CALL
    │   │       │       ├── Lib.is_no_action
    │   │       │       └── a
    │   │       └── Rule.BOOL
    │   │           └── Rule.FUNC_CALL
    │   │               ├── Lib.progression_this_round_greater_than
    │   │               ├── state
    │   │               └── Rul

In [11]:
def plot(vals, title=""):
    """Plot ``vals`` against the generation index and export it as a PNG.

    ``title`` is first sanitised (``:`` and ``,`` become ``_``; braces,
    spaces and single quotes are removed) and then used as the figure title,
    as the bokeh HTML output name (``<title>.html``) and as the PNG name
    under ``results/``.
    """
    substitutions = (
        (":", "_"),
        ("{", ""),
        ("}", ""),
        (",", "_"),
        (" ", ""),
        ("'", ""),
    )
    for old, new in substitutions:
        title = title.replace(old, new)

    reset_output()
    output_notebook()
    output_file(title + '.html')

    generation_index = np.array(list(range(len(vals))))

    p = figure(
        title=title,
        x_axis_label='generation',
        y_axis_label='win rate against random agent',
        y_range=(0, 1),
    )
    p.line(generation_index, vals, line_width=2)
    # The figure is only exported, not shown inline.
    export_png(p, filename='results/' + title + '.png')

In [45]:
class MyScript(Script):
    """Hand-written reference strategy built from the ``Lib`` predicates.

    All ``Lib`` calls pass ``""`` as the node id because this script has no
    derivation tree.
    """

    def get_action(self, state):
        """Return the first available action matching a rule, else the first
        available action."""
        actions = state.available_moves()
        for a in actions:
            # Stop ('n') once a column has been won this round.
            if Lib.is_no_action(self, "", a) and Lib.has_won_column(self, "", state, a):
                return a

            # Continue ('y') while progress is low.
            if Lib.is_yes_action(self, "", a):
                low_total_progress = (
                    not Lib.progression_greater_than(self, "", state, 2)
                    and not Lib.has_won_column(self, "", state, a)
                )
                if low_total_progress or not Lib.progression_this_round_greater_than(self, "", state, 1):
                    return a

            # Prefer a move that wins a column, then a doubles move.
            if Lib.action_wins_column(self, "", state, a) or Lib.is_doubles(self, "", a):
                return a

        return actions[0]

get_win_rate_against_random(MyScript(None), num_games=5000)

0.9235

In [39]:
# Head-to-head over 100 games per seating. `c` has the evolved elite as
# player 1, `c2` has the hand-written MyScript as player 1. The counter keys
# are play_game results (1 or 2 = winning seat, -1 = move cap reached).
c = collections.Counter([play_game(elite, MyScript(None)) for _ in range(100)])
c2 = collections.Counter([play_game(MyScript(None), elite) for _ in range(100)])
print(c, c2)

Counter({1: 66, 2: 34}) Counter({1: 55, 2: 45})


In [50]:
# Dump the trees of the second-to-last generation.
# NOTE: depends on the global `generations` left behind by an earlier GA cell;
# the recorded output shows a NameError because that cell had not been run.
for script in generations[-2]:
    print_tree(script.tree)

NameError: name 'generations' is not defined