### Tamagotchi POMCP
pam o.p. 2018  
Pomagotchi (Partially observable tamagotchi)  
And POMCP solution method (MCTS in PO framework)  
version 2.0

In [35]:
# imports
from random import randint
import numpy as np
import random
import datetime # for limiting calculation to wall clock time
import math
import copy
import matplotlib.pyplot as plt
import pdb

In [28]:
# tamagotchi class adapted from https://github.com/bitterfly/tamagotchi/blob/master/tamagotchi/core/tamagotchi.py

class Tamagotchi:
    def __init__(self):
        self.stats = {"food": 100, "happiness": 100, "hygiene": 100,
                 "health": 100, "energy": 100}
        self.is_sleeping = False
        self.is_dead = False
        self.is_playing = False
        self.is_sick = False
        self.number_of_poo = 0
        self.money = 0
        self.time = 0
        self.in_store = False

    #Makes sure the statistic isn't below 0 or above 100
    def constrain(self, value):
        value = min(100, value)
        value = max(0, value)
        return value

    #Constrains all the stats
    def constrain_stats(self):
        for statistic, value in self.stats.items():
            self.stats[statistic] = self.constrain(value)

    #Takes a dictionary with statistics and adds each value
    #to the tamagotchi statistics
    def apply(self, item):
#         for statistic, value in self.stats.items():
#             self.stats[statistic] += item[statistic]
        self.stats[item['stats']] += item['effect']
        self.constrain_stats()

#     #Takes a statistic and decreases it to zero in "full hours" time
    def decrease_to_minimum(self, statistic, full_hours, time_given):
        self.stats[statistic] -= 2 * np.ceil( (time_given * 10) / (full_hours * 36) ) 
#     def decrease_to_minimum(self, statistic, full_hours, time_given):
#         self.stats[statistic] -= int(full_hours/2)

#     #Takes a statistic and increases it to max in "full hours" time
    def increase_to_maximum(self, statistic, full_hours, time_given):
        self.stats[statistic] += 2 * np.ceil( (time_given * 10) / (full_hours * 36) ) 
#     def increase_to_maximum(self, statistic, full_hours, time_given):
#         self.stats[statistic] += int(full_hours)

    #Generates random sickness and poo
    def random_event(self):
        if (not self.is_playing and not self.is_sleeping):
            random_number = randint(0, 600)
            if random_number == 0:
                self.is_sick = True
            if random_number == 1:
                self.number_of_poo = min (self.number_of_poo + 1, 4)

    #Used in mainwindow - removes sickness
    def cure(self):
        self.is_sick = False

    #The function witch decreases all the stats every second
    #or is called when tamagotchi is sleeping
    def second_pass(self, seconds=1):
        "Докато спиш всички статове падат за 8 часа, освен сънят, който се възстановява"
        "As long as you sleep, all the stats fall for 8 hours, except for the sleep that is recovering"
        self.time += 1 
        
        # let's see what happens if make it a finite problem (though should really be same w discounted futures but...)
        if self.time >= 1000:
            self.is_dead = True
        
        if self.is_sleeping:
            self.increase_to_maximum("energy", 3, seconds)
            self.decrease_to_minimum("happiness", 20, seconds)
            self.decrease_to_minimum("hygiene", (20 + 2*self.number_of_poo), seconds)
            self.decrease_to_minimum("food", 20, seconds)
            
            # earn money while asleep
            if self.is_playing:
                self.money += 4*seconds # pam added
            
            if self.stats["energy"] > 50:
                self.is_sleeping = False
        else:
            "Докато играеш, повечето статистики падат по-бързо."
            "As you play, most stats fall faster"
            if self.is_playing:
                self.decrease_to_minimum("energy", 3, seconds)
                self.decrease_to_minimum("hygiene", 3, seconds)
                self.decrease_to_minimum("food", 6, seconds)
                self.increase_to_maximum("happiness", 1, seconds)
                self.money += 4*seconds # pam added
            else:
                self.decrease_to_minimum("energy", 4, seconds)
                self.decrease_to_minimum("hygiene", 4 / (self.number_of_poo + 1), seconds)
                self.decrease_to_minimum("food", 8, seconds)
                self.decrease_to_minimum("happiness", 4, seconds)

        if (self.stats["happiness"] <= 50 or
               self.stats["hygiene"] <= 50):
                self.is_sick = True
                
        if self.is_sick:
            self.decrease_to_minimum("health", 3, seconds)

        self.constrain_stats()

        self.random_event()

        if (self.stats["food"] == 0 or self.stats["health"] == 0):
            self.is_dead = True
        
        #add (?) that if energy is <=20 then goes to sleep
        if self.stats["energy"] <=20:
            self.is_sleeping = True

    #The function is called when an item is chosen and
    #applies its statistics
    def buy_item(self, item):
        self.apply(item)
        self.money -= item["price"]
        return self
    
    # so that tuple(tamagotchi) can be called, make tamagotchi iterable
    def __iter__(self):
        traits = [tuple(self.stats.items()),
                 self.is_sleeping,
                 self.is_dead,
                 self.is_playing,
                 self.is_sick,
                 self.number_of_poo,
                 self.money,
                 self.time,
                 self.in_store]
        for i in range(len(traits)):
            yield traits[i]
                
    
    def print_tama(self):
        print("Time:",self.time," seconds elapsed")
        if self.is_dead:
            print("Tama is dead!")
    
        print("TAMA STATS: ", self.stats)
        if self.is_sleeping:
            print("Tama is asleep.")
        else:
            print("Tama is awake.")
            if self.is_playing:
                print("Tama is playing.")
            if self.is_sick:
                print("Tama is sick.")
        print("Number of poo:",self.number_of_poo)
        print("Money:",self.money)
        if self.in_store:
            print("You're in the store, so you can buy an item if you want.")
        else:
            print("You're not in the store, so you can play, do nothing, or travel to the store.")
                
        print("\n")
        

In [29]:
# game class:
# defines transition probs, legal actions, reward probs, observation probs 
class Tamagotchi_Game():
    def __init__(self):
#         self.tamagotchi = tamagotchi
        self.time_passing = True
        self.observation_prob = 0.8
    
    # start game 
#     def start(self,state):
#         self.time_passing = True
#         return state
        
    # not used, but would be useful for a human playing the game to take breaks
#     def pause(self):
#         self.time_passing = False
#         return self.tamagotchi
        
    # given a tuple tamagotchi "state" and action "action", what would the next tama state be?
    # this contains the transition function for the POMDP
    def next_state(self, tupstate, action):
        
        state = unpack(tupstate)   
        
        free_acts = ['nothing','check-happy','check-energy',
                     'check-food','check-health','check-hygiene',
                     'check-asleep','check-playing','check-money',
                     'check-sick','check-instore','check-numpoo']
        
        if action in free_acts and not action == 'nothing':
#             state.is_playing = False
            state.second_pass()
            return tuple(state) # one second passes nothing else happens
          
        if state.in_store:
            state.is_playing = False
            state.second_pass() # does this need to be state.second_pass()

            if action == 'nothing': # leave the store
                state.in_store = False
                return tuple(state)
            elif action == 'coffee':
                return tuple(state.buy_item({"stats":"energy","price":3,"effect":8})) if state.money>=3 else tuple(state)
            elif action == 'snack':
                return tuple(state.buy_item({"stats":"food","price":3,"effect":8})) if state.money>=3 else tuple(state)
            elif action == 'clean':
                state.number_of_poo =0
                return tuple(state.buy_item({"stats":"hygiene","price":5,"effect":10})) if state.money>=5 else tuple(state)
            elif action == 'medicine':
                state.cure()
                return tuple(state.buy_item({"stats":"health","price":8,"effect":15})) if state.money>=8 else tuple(state)
            else:
                raise Exception('Invalid action while in store')

        if action == 'play':
            state.is_playing = True
            state.second_pass()
            # if the tama is sleeping, playing won't earn any money - tama needs to be awake
            return tuple(state)

        # "walk" to the store (takes one turn)
        if action == 'store':
            state.is_playing = False
            state.in_store = True
            state.second_pass()
            return tuple(state)

        if action == 'nothing':
            state.is_playing = False
            state.second_pass()
            return tuple(state)
       
    # observation function for the POMDP
    # if other actions provide observations, put that info in here, so this fxn looks more
    # like next_state (e.g., if action = "snack" then observe hunger level)
    def observation(self, tupletama, action):        
        stats = dict(tupletama[0])
        if action == 'check-happy':
            return ['happiness', stats['happiness']]
        elif action == 'check-energy':
            return ['energy', stats['energy']]
        elif action == 'check-food':
            return ['food', stats['food']]
        elif action == 'check-health':
            return ['health', stats['health']]
        elif action == 'check-hygiene':
            return ['hygiene', stats['hygiene']]
        elif action == 'check-asleep':
            return [1,tupletama[1]]
        elif action == 'check-playing':
            return [3,tupletama[3]]
        elif action == 'check-sick':
            return [4,tupletama[4]]
        elif action == 'check-money':
            return [6,tupletama[6]]
        elif action == 'check-instore':
            return [8,tupletama[8]]
        elif action == 'check-numpoo':
            return [5,tupletama[5]]
        else:
            return []

        
    # Take the full action and observation history, and return the full list
    # of actions that are legal actions
    def legal_actions(self, history): 
        # this method is important for MDPs/games where the actions might change based on the game state
        # for instance, in checkers, if player put a piece on square A4, no other piece can move to A4
        
        # get object version of most recent tamagotchi state
#         tama = self.unpack(state_history[-1])
#         tupletama = state_history[-1] # avoid unpacking bc it takes a long time to do
#         print(history)
        last_act = history[-1]
        
        # actions you can take whenever wherever
        free_acts = ['nothing','check-happy','check-energy',
                     'check-food','check-health','check-hygiene',
                     'check-asleep','check-playing','check-money',
                     'check-sick','check-instore','check-numpoo']
        
        if last_act==[] or last_act=='play':
            return ['play','store'] + free_acts
        elif last_act in ['store','coffee','snack','clean','medicine']:
            return ['coffee','snack','clean','medicine'] + free_acts
        else:
            return self.legal_actions(history[:-1])
        
#         if not tupletama[2]: # not dead
#             if tupletama[8]: # in store
#                 if tupletama[6] >= 8:
#                     return ['coffee','snack','clean','medicine'] + free_acts
#                 elif tupletama[6] >= 5:
#                     return ['coffee','snack','clean'] + free_acts
#                 elif tupletama[6] >= 3:
#                     return ['coffee','snack'] + free_acts
#                 else: # too poor to shop :(
#                     return free_acts
#             else: 
#                 return ['play','store'] + free_acts
#         else:
#             return []

    def legal_actions_state(self,state):
        # actions you can take whenever wherever
        free_acts = ['nothing','check-happy','check-energy',
                     'check-food','check-health','check-hygiene',
                     'check-asleep','check-playing','check-money',
                     'check-sick','check-instore','check-numpoo']
        if state[8]: # in store
            return ['coffee','snack','clean','medicine'] + free_acts
        else: 
            return ['play','store'] + free_acts
#         if not state[2]: # not dead
#         else:
#             return []
    
    # Should this be the length of state_history (how long tama alive for)? or is it trial by trial rwd?
    def reward(self, tupletama,action):
#         tupletama = state_history[-1]
        
        done = False
        if tupletama[2]: # tama dead
            reward = tupletama[7] + 1 # you killed the tama :'( enjoy your MONEY
            done = True
        else:
            # reward is how long you kept the tama alive
#             reward = tupletama[7] + 1 
            reward = 0

        return reward, done
    
    # GENERATOR MODEL OF TAMAGOTCHI GAME
    # returns next state, observation, and reward given an action taken in given state
    # takes tuple state
    def G_model(self,state,action):
        s = self.next_state(state,action)
        obs = self.observation(state,action)
        rwd,done = self.reward(state,action) # note that this should be more like immediate reward of state, not long-term?
        return s, obs, rwd, done
    
#     # does this need to be a class method?
#     def unpack(self,tupletama):
#         tama = Tamagotchi()
#         tama.stats = dict(tupletama[0])
#         tama.is_sleeping = tupletama[1]
#         tama.is_dead = tupletama[2]
#         tama.is_playing = tupletama[3]
#         tama.is_sick = tupletama[4]
#         tama.number_of_poo = tupletama[5]
#         tama.money = tupletama[6]
#         tama.time = tupletama[7]
#         tama.in_store = tupletama[8]
#         return tama
    
    # Initial state distribution
    # is this ok? or slow bc transform to tuple?
    def sample_prior(self):
        s = Tamagotchi()
        return tuple(s)
    
    # when filtering particles, this is the rule to keep one given a real observation
    def keep_particle(self, part, real_obs):
        trash_prob = 0.8
        if real_obs == []:
            return True
        if part != real_obs and random.random() < trash_prob:
            return False
        return True

    #     if real_obs == []:
    #         return True
    #     else:
    #         if part == real_obs:
    #             return True
    #     return False

    
    # generate a new particle from one randomly sampled from current belief (e.g., just add a lil noise)
    # this might need work
    def new_particle(self, part):
        noise = [-2,-1,0,1,2] # add some artificial noise
        s = part
        stats = dict(s[0])
        for statistic, value in stats.items():
            stats[statistic] = constrain(stats[statistic] + random.choice(noise))
        return s
    
# does this need to be a class method?
def unpack(tupletama):
    tama = Tamagotchi()
    tama.stats = dict(tupletama[0])
    tama.is_sleeping = tupletama[1]
    tama.is_dead = tupletama[2]
    tama.is_playing = tupletama[3]
    tama.is_sick = tupletama[4]
    tama.number_of_poo = tupletama[5]
    tama.money = tupletama[6]
    tama.time = tupletama[7]
    tama.in_store = tupletama[8]
    return tama

In [30]:
class SearchTree(object):
    def __init__(self,visits=1,value=0):
        self.visits = visits
        self.value = value
        self.children=[]
        
        
class ActionNode(SearchTree):
    def __init__(self,action=None,visits=1,value=0):
        super().__init__(visits,value)
        self.action = action
        
        
class ObservationNode(SearchTree):
    def __init__(self,observation=[],visits=1,value=0,belief=[]):
        super().__init__(visits,value)
        self.observation = observation
        self.belief = belief
        
    def expand(self, legal_actions):
        for a in legal_actions:
            self.children += [ActionNode(a)]
            
        # upper confidence bound value for given node "child"
    def ucb(self, child): #maybe use index of child not object
#         print("self.visit=",self.visit," len of self.children=",len(self.children))
        logval = math.log(self.visits) #, len(self.children))
        div = logval / child.visits
        return math.sqrt(div)
    
#     def sample_belief(self):
#         return random.choice(self.belief)
    
    def next_hist(self,action,obs):
        act_child = next((c for c in self.children if c.action==action), None)
        assert act_child != None, "shouldn't you be expanded already?"
        assert isinstance(act_child, ActionNode), "action child should be an action node!"
        
        obs_child = next((c for c in act_child.children if c.observation == obs), None) 
        if obs_child is None:
            act_child.children += [ObservationNode(obs)]
#             print(act_child.children[0].observation)
            obs_child = next((c for c in act_child.children if c.observation == obs), None)     
        return obs_child
    
#     def next_hist_rollout(self,action,obs):

In [38]:
class POMCP:
    def __init__(self, 
                 game,
                 discount=0.8,
                 explore=1,
                 epsilon=1e-7,
                 n_particles=100,
                 reinvigoration=20, 
                 **kwargs):
        
#         self.context = {}
        self.game = game
        self.discount = discount
        self.epsilon = epsilon
        self.explore = explore
        self.n_particles = n_particles
        self.reinvigoration = reinvigoration
        self.G = game.G_model      
        self.tree = None
        self.history = []
        # list of all possible actions
#         self.actions = kwargs.get('actions') 
        
        seconds = kwargs.get('time',30)
        self.calculation_time = datetime.timedelta(seconds=seconds)
        self.maxdepth = kwargs.get('maxdepth',20)
        self.nsims = kwargs.get('nsims',1000)
        
    def search(self,obs):
        
        self.history += [obs]
        
        if self.tree is None:
            self.tree = ObservationNode(obs)                        
#             particle = self.game.sample_prior()
#             self.simulate(particle,self.tree,0)
        else:
            self.prune_tree(obs)
            
        for _ in range(self.nsims):
            particle = self.draw_sample()
            self.simulate(particle,self.tree,0)
        
        child = self.greedy_action_selection(self.tree,self.game.legal_actions(self.history)) # will again need to handle legal actions differently for real
        self.tree = child # move forward to child action node (will move to obs node when real obs occurs)
        self.history += [child.action]
        
        return child.action
    
    def simulate(self,state,tree,depth):
        if depth >= self.maxdepth:
            return 0
        
#         legal = self.game.legal_actions(state,tree,depth)
#         legal = self.game.legal_actions(self.history) # would want it to be more elegant/complicated for real
        legal = self.game.legal_actions_state(state)
    
        if len(tree.children) == 0:
            tree.expand(legal)
            return self.rollout(state,depth)
        
        if len(legal)==1:
            action = legal[0]
            child = tree.children[0]
        else:
            child = self.ucb_action_selection(tree,legal)
            action = child.action
            
        next_state, next_obs, r, done = self.G(state,action)
        if done:
            return r
        next_tree = tree.next_hist(action,next_obs)
        reward = r + self.discount * self.simulate(next_state,next_tree,depth+1)
        
        tree.belief += [state] 
        tree.visits += 1
        
        child.visits += 1
        child.value += (reward - child.value)/child.visits
        
        return reward
    
    def rollout(self,state,depth):
        if depth >= self.maxdepth:
            return 0
        
#         legal = self.game.legal_actions(self.history)
        legal = self.game.legal_actions_state(state)
        a = random.choice(legal)
        
        next_state, next_obs, r, done = self.G(state,a)
#         next_tree = tree.next_hist(a,next_obs)
        
        if done:
            return r
        
        return r + self.discount * self.rollout(next_state,depth+1)
        
    ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! !
    ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
    ## ## ## ## ## FIX THIS !! ## ## ## ## ## ## ## 
    ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
    ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! !
    def prune_tree(self,obs):
        #current tree is an action node. find child node with observation obs
        obs_child = next((c for c in self.tree.children if c.observation == obs), None) 
        self.tree = obs_child
        return
        
    def greedy_action_selection(self,tree,legal):
        children = [child for child in tree.children if child.action in legal] #filter(lambda child: child.action in legal_actions, tree.children)
        child_vals = np.array([child.value for child in children])
        favechildren = np.argwhere(child_vals == np.amax(child_vals))
        child = children[random.choice(favechildren.flatten().tolist())]
        return child
        
    def ucb_action_selection(self,tree,legal):
        children = [child for child in tree.children if child.action in legal] #filter(lambda child: child.action in legal_actions, tree.children)
        child_vals = np.array([child.value + self.explore * tree.ucb(child) for child in children])
        favechildren = np.argwhere(child_vals == np.amax(child_vals))
        child = children[random.choice(favechildren.flatten().tolist())]
        return child
    
    def draw_sample(self):
        
        if not isinstance(self.tree, ObservationNode):
            pdb.set_trace()
#         assert isinstance(self.tree, ObservationNode), "why you not an obs node ?!?!"
        if self.tree.belief == []:
            return self.game.sample_prior()
        else:
            return random.choice(self.tree.belief)

In [32]:
s = tuple(Tamagotchi())
game = Tamagotchi_Game()
agent = POMCP(game, 0.9, maxdepth=20, nsims=10000)

In [33]:
# Check that the tree runs simulations to choose the next action using the choose_move() method

print("INITIAL STATE: ")
print(s) # initial tiger problem state

obs = []

action = agent.search(obs)
print("Taking action:", action)
obs = game.observation(state,action)
if obs!=[]:
    print("observed ",obs)
state = game.next_state(s,action)
game.reward(s,action)

INITIAL STATE: 
((('food', 100), ('happiness', 100), ('hygiene', 100), ('health', 100), ('energy', 100)), False, False, False, False, 0, 0, 0, False)
Taking action: check-energy
observed  ['energy', 98.0]


(0, False)

In [39]:
# now let's compare the performance of different nsims n

storeobservations = []
storeactions = []
storerewards = []
ns = [100, 500, 1000, 5000, 10000]
c = 20

for n in ns:
    print("nsims = ", n)
    # Initialize the tiger
    s = tuple(Tamagotchi())
    game = Tamagotchi_Game()

    print("INITIAL STATE: ")
    print(s) # initial tiger state

    agent = POMCP(game, 0.9, c, maxdepth=20, nsims=n)

    action_seq = []
    obs_seq = []
    state = s
    obs = []
    R = 0


    while len(action_seq) < 20: # play for a certain amount of time (better rule?)

        action = agent.search(obs)
        action_seq.append(action)
        print('Action %i: True state is %s'% (len(action_seq), state))
        print("Taking action %s."% action)

        obs = game.observation(state,action)
        obs_seq.append(obs)
        
        if obs!=[]:
            print("observed ",obs)

        r,_= game.reward(state,action)
        R = R + r
        print("Reward so far: ",R)    

        state = game.next_state(state,action)

    print("game over!")
    storeobservations.append([obs_seq])
    storeactions.append([action_seq])
    storerewards.append(R)

nsims =  100
INITIAL STATE: 
((('food', 100), ('happiness', 100), ('hygiene', 100), ('health', 100), ('energy', 100)), False, False, False, False, 0, 0, 0, False)
Action 1: True state is ((('food', 100), ('happiness', 100), ('hygiene', 100), ('health', 100), ('energy', 100)), False, False, False, False, 0, 0, 0, False)
Taking action check-numpoo.
observed  [5, 0]
Reward so far:  0
Action 2: True state is ((('food', 98.0), ('happiness', 98.0), ('hygiene', 98.0), ('health', 100), ('energy', 98.0)), False, False, False, False, 0, 0, 1, False)
Taking action check-sick.
observed  [4, False]
Reward so far:  0
Action 3: True state is ((('food', 96.0), ('happiness', 96.0), ('hygiene', 96.0), ('health', 100), ('energy', 96.0)), False, False, False, False, 0, 0, 2, False)
Taking action check-sick.
observed  [4, False]
Reward so far:  0
Action 4: True state is ((('food', 94.0), ('happiness', 94.0), ('hygiene', 94.0), ('health', 100), ('energy', 94.0)), False, False, False, False, 0, 0, 3, False)


AttributeError: 'NoneType' object has no attribute 'belief'