In [221]:
from environment.to_optimize_v0 import *
from abc import ABC, abstractmethod
from collections import defaultdict
from copy import deepcopy
import math
import random

from typing import Dict, Any
import hashlib
import json

In [3]:
parameters = {
    "p1":0.23,
    "p2":12,
    "p3":34
}

In [262]:
granularity = 50

parameters_range = {
    "p1":np.linspace(start=0, stop=1, num=granularity, dtype=float),
    "p2":np.linspace(start=0, stop=100, num=granularity, dtype=int).astype(float),
    "p3":np.linspace(start=0, stop=50, num=granularity, dtype=int).astype(float)
}

In [263]:

class MCTS:
    "Monte Carlo tree searcher. First rollout the tree then choose a move."

    def __init__(self, exploration_weight=1):
        self.Q = defaultdict(int)  # total reward of each node
        self.N = defaultdict(int)  # total visit count for each node
        self.children = dict()  # children of each node
        self.exploration_weight = exploration_weight

    def choose(self, node):
        "Choose the best successor of node. (Choose a move in the game)"
        if node.is_terminal():
            raise RuntimeError(f"choose called on terminal node {node}")

        if node not in self.children:
            return node.find_random_child()

        def score(n):
            if self.N[n] == 0:
                return float("-inf")  # avoid unseen moves
            return self.Q[n] / self.N[n]  # average reward

        return max(self.children[node], key=score)

    def do_rollout(self, node):
        "Make the tree one layer better. (Train for one iteration.)"
        path = self._select(node)
        leaf = path[-1]
        self._expand(leaf)
        reward = self._simulate(leaf)
        self._backpropagate(path, reward)

    def _select(self, node):
        "Find an unexplored descendent of `node`"
        path = []
        while True:
            path.append(node)
            if node not in self.children or not self.children[node]:
                # node is either unexplored or terminal
                return path
            unexplored = self.children[node] - self.children.keys()
            if unexplored:
                n = unexplored.pop()
                path.append(n)
                return path
            node = self._uct_select(node)  # descend a layer deeper

    def _expand(self, node):
        "Update the `children` dict with the children of `node`"
        if node in self.children:
            return  # already expanded
        self.children[node] = node.find_children()

    def _simulate(self, node):
        "Returns the reward for a random simulation (to completion) of `node`"
        invert_reward = True
        
        while True:
            if node.is_terminal():
                reward = node.reward()
                return 1 - reward if invert_reward else reward
            node = node.find_random_child()
            invert_reward = not invert_reward

    def _backpropagate(self, path, reward):
        "Send the reward back up to the ancestors of the leaf"
        for node in reversed(path):
            self.N[node] += 1
            self.Q[node] += reward
            #reward = 1 - reward  # 1 for me is 0 for my enemy, and vice versa

    def _uct_select(self, node):
        "Select a child of node, balancing exploration & exploitation"

        # All children of node should already be expanded:
        assert all(n in self.children for n in self.children[node])

        log_N_vertex = math.log(self.N[node])

        def uct(n):
            "Upper confidence bound for trees"
            return self.Q[n] / self.N[n] + self.exploration_weight * math.sqrt(
                log_N_vertex / self.N[n]
            )

        return max(self.children[node], key=uct)

In [264]:
class state:
    def __init__(self, p1=None, p2=None, p3=None):
        self.parameters = {"p1":p1, "p2":p2, "p3":p3}

    def assess_terminal(self):
        flag = True
        for k in self.parameters.keys():
            if self.parameters[k] is None:
                flag=False
        return flag
    
    def compute_value(self, input_value):
        if not self.assess_terminal():
            raise ValueError("parameters are not all defined")
        else:
            return main_loop(self.parameters, input_value)

In [265]:
class Node(ABC):
    """
    A representation of a single board state.
    MCTS works by constructing a tree of these Nodes.
    Could be e.g. a chess or checkers board state.
    """

    @abstractmethod
    def find_children(self):
        "All possible successors of this board state"
        return set()

    @abstractmethod
    def find_random_child(self):
        "Random successor of this board state (for more efficient simulation)"
        return None

    @abstractmethod
    def is_terminal(self):
        "Returns True if the node has no children"
        return True

    @abstractmethod
    def reward(self):
        "Assumes `self` is terminal node. 1=win, 0=loss, .5=tie, etc"
        return 0

    @abstractmethod
    def __hash__(self):
        "Nodes must be hashable"
        return 123456789

    @abstractmethod
    def __eq__(node1, node2):
        "Nodes must be comparable"
        return True

In [290]:
class ToOptimizeV0(Node):
    def __init__(self, parameter_range, state={"p1":None, "p2":None, "p3":None}, value=12):
        self.state = state
        self.range = parameter_range
        self.value = value
    
    def is_terminal(self):
        flag = True
        for k in self.state.keys():
            if self.state[k] is None:
                flag=False
        return flag
    
    def find_children(self):
        if self.is_terminal():
            raise ValueError("trying to find children of terminal node")
            
        child_list = []
        for k in self.state.keys():
            if self.state[k] is None:
                for p in self.range[k]:
                    child = deepcopy(self.state)
                    child[k] = p
                    child_list.append(ToOptimizeV0(self.range, deepcopy(child)))
                return set(child_list)

    def reward(self):
        "Assumes `self` is terminal node."
        a, b = main_loop(self.state, self.value)
        if abs(a-b) < 1:
            return 1/((abs(a-b)**2) + 1)
        
        else:
            return 1/((math.sqrt(abs(a-b))) + 1)
        return _distance(comp,co)
    

    def __hash__(self):
        dhash = hashlib.md5()
        # We need to sort arguments so {'a': 1, 'b': 2} is
        # the same as {'b': 2, 'a': 1}
        encoded = json.dumps(self.state, sort_keys=True).encode()
        dhash.update(encoded)
        return int(dhash.hexdigest(),16)



    def __eq__(node1, node2):
        "Nodes must be comparable"
        return node1.state == node2.state
    

    def find_random_child(self):
        "Random successor of this board state (for more efficient simulation)"
        if self.is_terminal():
            raise ValueError("trying to find child of terminal node")
            
        for k in self.state.keys():
            if self.state[k] is None:
                child = deepcopy(self.state)
                child[k] = np.random.choice(self.range[k])
                return ToOptimizeV0(self.range, deepcopy(child))

In [306]:
test_class = ToOptimizeV0(parameters_range)

In [307]:
test_class.__hash__()

55242016899870256513250550104786104889

In [308]:
test_tree = MCTS()

In [313]:
for i in range(10000):
    test_tree.do_rollout(test_class)

ValueError: trying to find children of terminal node

In [321]:
test_tree.Q

defaultdict(int,
            {<__main__.ToOptimizeV0 at 0x7ff5c85b9810>: 2.4306546177359483,
             <__main__.ToOptimizeV0 at 0x7ff5e9091610>: 50.958386337046036,
             <__main__.ToOptimizeV0 at 0x7ff5e8ff59d0>: 50.96262105740942,
             <__main__.ToOptimizeV0 at 0x7ff5e8ff5a10>: 50.955811854966996,
             <__main__.ToOptimizeV0 at 0x7ff5e9091310>: 50.96937101579162,
             <__main__.ToOptimizeV0 at 0x7ff5e9091bd0>: 50.94755229993362,
             <__main__.ToOptimizeV0 at 0x7ff5c8c42f10>: 50.93010282010564,
             <__main__.ToOptimizeV0 at 0x7ff5e8ff5710>: 50.940072344139665,
             <__main__.ToOptimizeV0 at 0x7ff5c8c42910>: 50.9451538667213,
             <__main__.ToOptimizeV0 at 0x7ff5c8c42510>: 50.94357645188804,
             <__main__.ToOptimizeV0 at 0x7ff5e9091490>: 50.959677545314406,
             <__main__.ToOptimizeV0 at 0x7ff5e8ff5bd0>: 50.955996192072426,
             <__main__.ToOptimizeV0 at 0x7ff5e9091410>: 50.94822048773971,
   

In [316]:
main_loop(test_tree.choose(test_tree.choose(test_tree.choose(test_class))).state, 12)

(3676492005376.0, 168817.67730447935)

In [247]:
test_tree.choose(test_class).state

{'p1': 0.9795918367346939, 'p2': None, 'p3': None}

In [245]:
test_class.state

{'p1': None, 'p2': None, 'p3': None}

In [178]:
test_tree.Q

defaultdict(int,
            {<__main__.ToOptimizeV0 at 0x7ff5d87331d0>: 0.021793996405826955})

In [200]:
list(test_tree.N.keys())[0].state

{'p1': None, 'p2': None, 'p3': None}