In [40]:
import itertools
from typing import Tuple, Iterable, Mapping, MutableSequence
from rl.markov_decision_process import FiniteMarkovDecisionProcess
from rl.distribution import FiniteDistribution, Categorical
from rl.markov_process import NonTerminal

from rl import dynamic_programming
from dataclasses import dataclass
import numpy as np

In [43]:
@dataclass(frozen=True)
class GameAction:
    K: int
    action: Tuple[int, ...]

@dataclass(frozen=True)
class GameState:
    K: int
    hand: Tuple[int, ...]
    table: Tuple[int, ...]



def get_possible_actions(s: GameState) -> Iterable[GameAction]:
    actions = []
    K = s.K
    for a in itertools.product(range(sum(s.table)+1), repeat=K):
        if sum(a) > 0:
            include = True
            for j in range(K):
                if a[j] > s.table[j]:
                    include = False
            if include:
                actions.append(GameAction(K, a))
    return actions

def uniform_multinomial(obs, n, K):
    ''' obs should be K-tuple summing to n'''
    denom = 1
    def factorial(n):
        if n == 1 or n == 0:
            return 1
        else:
            return factorial(n-1) * n
    for i in range(K):
        denom *= factorial(obs[i])

    return factorial(n) * (1./K)**n / denom

def compute_reward(C, K, hand: Tuple[int, ...]):
    if hand[0] >= C:
        return sum([(i+1) * hand[i] for i in range(K)])
    else:
        return 0

def generate_distribution(s: GameState, a: GameAction, C: int) \
        -> FiniteDistribution[Tuple[GameState, float]]:
    K = s.K
    distribution: Mapping[GameState, float] = {}
    hand: Tuple[int, ...] = tuple(np.add(s.hand, a.action))
    n = sum(s.table) - sum(a.action)
    assert n >= 0
    reward = 0 if n > 0 else compute_reward(C, K, hand)
    for table_roll in itertools.product(range(n+1), repeat=K):
        if sum(table_roll) == n:
            distribution[(GameState(K, hand, table_roll), reward)] = uniform_multinomial(table_roll, n, K)
    dist: FiniteDistribution[Tuple[GameState, float]] = Categorical(distribution)
    return dist

# a = get_possible_actions(GameState(3, (1,1,1), (1,1,2)))
# print("a:")
# print(a)
#
# d = generate_distribution(GameState(3, (1,1,1), (1,1,2)), GameAction(3, (1,1,1)), C=1)
# print()
# print("d:")
# print(d)

class DiceGameMDP(FiniteMarkovDecisionProcess[GameState, GameAction]):
    N: int
    K: int
    C: int

    def __init__(self, N: int, K: int, C: int):
        self.N, self.K, self.C = N, K, C
        non_terminal_states: MutableSequence[GameState] = []
        for i in itertools.product(range(N+1), repeat=2*K):
            if sum(i[:K]) < N and sum(i) == N:
                non_terminal_states.append(GameState(K, i[:K], i[K:]))
        mapping: Mapping[GameState, Mapping[GameAction, FiniteDistribution[Tuple[GameState, float]]]] = {s: {a: generate_distribution(s, a, C) for a in get_possible_actions(s)} for s in non_terminal_states}
        super().__init__(mapping)

game = DiceGameMDP(6, 4, 1)

In [44]:
opt_vf, opt_policy = dynamic_programming.value_iteration_result(game, 1)
print("opt_vf:", opt_vf)
print()
print("opt_policy:", opt_policy)

opt_vf: {NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 0, 0, 6))): 18.376703715883195, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 0, 1, 5))): 18.376703715883195, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 0, 2, 4))): 18.376703715883195, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 0, 3, 3))): 18.376703715883195, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 0, 4, 2))): 18.376703715883195, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 0, 5, 1))): 18.376703715883195, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 0, 6, 0))): 17.390067176893353, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 1, 0, 5))): 18.376703715883195, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 1, 1, 4))): 18.376703715883195, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0), table=(0, 1, 2, 3))): 18.376703715883195, NonTerminal(state=GameState(K=4, hand=(0, 0, 0, 0),

In [60]:
s = GameState(4, (0,0,0,0), (1,4,0,1))
print(f"s = {s}")
print(f"opt_vf = {opt_vf[NonTerminal(s)]}")
print(f"opt_policy = {opt_policy.action_for[s]}")

s = GameState(K=4, hand=(0, 0, 0, 0), table=(1, 4, 0, 1))
opt_vf = 18.376703715883195
opt_policy = GameAction(K=4, action=(0, 0, 0, 1))


In [49]:
expected_value = 0
p_arr = []
for roll in itertools.product(range(game.N+1), repeat=game.K):
        if sum(roll) == game.N:
            p = uniform_multinomial(roll, game.N, game.K)
            p_arr.append(p)
            expected_value += p * opt_vf[NonTerminal(GameState(game.K, (0,)*game.K, roll))]
print(f"Expected Value = {expected_value}")
print(f"Sum of probabilities = {sum(p_arr)}")

Expected Value = 18.390390253776786
Sum of probabilities = 1.0


In [47]:

mapped = game.mapping[NonTerminal(GameState(4, (1,0,0,0), (0,0,0,5)))]
for a in mapped.keys():
    print("a: ", a)
    for b in mapped[a]:
        print(b)
    print()

a:  GameAction(K=4, action=(0, 0, 0, 1))
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 0, 0, 4))), 0), 0.00390625)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 0, 1, 3))), 0), 0.015625)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 0, 2, 2))), 0), 0.0234375)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 0, 3, 1))), 0), 0.015625)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 0, 4, 0))), 0), 0.00390625)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 1, 0, 3))), 0), 0.015625)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 1, 1, 2))), 0), 0.046875)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 1, 2, 1))), 0), 0.046875)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 1, 3, 0))), 0), 0.015625)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1), table=(0, 2, 0, 2))), 0), 0.0234375)
((NonTerminal(state=GameState(K=4, hand=(1, 0, 0, 1),