In [1]:
import itertools as it
import math
from dataclasses import dataclass, field
from typing import Dict, Iterator, List, Mapping, Tuple

import numpy as np

from rl.distribution import Categorical
from rl.dynamic_programming import value_iteration_result
from rl.markov_decision_process import FiniteMarkovDecisionProcess


#### Problem (3):

#### Conventions used:

We represent a roll of $N$ dice by a count vector $\vec{v}$ in which $v_i$ is the number of dice showing the value $i+1$ (using zero-indexing).  For example, the vector $(1, 0, 3, 2)$ denotes a roll of one 1, three 3's, and two 4's.  This convention keeps the state space reasonably small and avoids the exponential blow-up that would result from treating every rearrangement of the same dice as a distinct roll.

In [2]:
def roll_probability(roll: Tuple[int, ...], K: int) -> float:
    """Calculate the probability of a given roll vector.

    The roll is a count vector: ``roll[i]`` is the number of dice showing
    face ``i + 1``.  The probability is the multinomial coefficient of the
    counts divided by the ``K ** num_dice`` equally likely ordered outcomes.

    Args:
        roll: Number of dice showing each face.
        K: Number of faces on each die.

    Returns:
        The probability of observing exactly these counts when rolling
        ``sum(roll)`` fair K-sided dice.  An empty roll (all counts zero)
        has probability 1.0.

    Raises:
        ValueError: If any count is negative (raised by ``math.factorial``).
    """
    num_dice: int = int(sum(roll))

    # Number of distinct orderings of the dice that produce these counts.
    # Every intermediate quotient is itself a multinomial coefficient, so the
    # integer division is exact and no floating-point error accumulates.
    arrangements: int = math.factorial(num_dice)
    for roll_count in roll:
        arrangements //= math.factorial(roll_count)

    return arrangements / K ** num_dice


@dataclass(frozen=True)
class Dicegame_State:
    """Immutable (hashable) state of the dice game.

    Both fields are count vectors: entry ``i`` is the number of dice showing
    face ``i + 1``.

    Attributes:
        hand: Dice that have already been selected and set aside.
        avail_rolls: Dice of the current roll that can still be selected.
    """

    # Variable-length tuples: one entry per die face, not a 1-tuple.
    hand: Tuple[int, ...]
    avail_rolls: Tuple[int, ...]


@dataclass(frozen=True)
class Roll_Choice:
    """Immutable (hashable) action: which dice to take from the current roll.

    Attributes:
        choice: Count vector of the selected dice; entry ``i`` is the number
            of dice showing face ``i + 1`` that are moved into the hand.
    """

    # Variable-length tuple: one entry per die face, not a 1-tuple.
    choice: Tuple[int, ...]


@dataclass
class Dicegame_StateAction_Transition:
    """Builder for the state-action transition map of the dice game.

    Rolls, hands and choices are count vectors of length ``K``: entry ``i``
    is the number of dice showing face ``i + 1``.

    Attributes:
        N: Total number of dice in the game.
        K: Number of faces on each die.
        C: Minimum number of dice showing 1 that a completed hand must hold
            to earn a non-zero reward.
        stateaction_map: Map from state to ``{action: outcome distribution}``.
            Filled in by ``add_stateaction`` and rebuilt from scratch on every
            access to ``get_stateaction_map``.
    """

    N: int
    K: int
    C: int
    # Working storage rather than a game parameter, so it is kept out of
    # __init__, repr and ==.  Initialised here so add_stateaction can be used
    # before get_stateaction_map has ever been accessed.
    stateaction_map: Dict[
        Dicegame_State,
        Dict[Roll_Choice, Categorical[Tuple[Dicegame_State, float]]],
    ] = field(default_factory=dict, init=False, repr=False, compare=False)

    @staticmethod
    def build_counter_array(selected_idx: int, tup: Tuple[int, ...]) -> Tuple[int, ...]:
        """Return a copy of a count vector with one entry incremented.

        Args:
            selected_idx: Index of the entry to increment.
            tup: Count vector to copy.

        Returns:
            A new tuple equal to ``tup`` except that entry ``selected_idx``
            is larger by one.

        Raises:
            IndexError: If ``selected_idx`` is out of range for ``tup``.
        """
        counts = list(tup)
        counts[selected_idx] += 1
        return tuple(counts)

    def get_all_possible_choices(self, state: Dicegame_State) -> Iterator[Roll_Choice]:
        """Get all possible dice choices from a given state's available rolls.

        Args:
            state: State whose ``avail_rolls`` are the dice on offer.

        Returns:
            A lazy iterator over every selection that takes between zero and
            all available dice of each face, excluding the empty selection
            (at least one die must be taken).
        """
        # Plain ranges (not NumPy arrays) keep the choice entries as Python ints.
        selections = it.product(*(range(count + 1) for count in state.avail_rolls))
        return (Roll_Choice(selection) for selection in selections if sum(selection) > 0)

    @property
    def get_stateaction_map(
        self,
    ) -> Mapping[
        Dicegame_State,
        Mapping[Roll_Choice, Categorical[Tuple[Dicegame_State, float]]],
    ]:
        """Create the entire state action map.

        Every access discards ``self.stateaction_map`` and rebuilds it, so
        store the result instead of reading the property repeatedly.

        Returns:
            Map from each state that has at least one legal choice to a map
            from choice to the distribution over ``(next state, reward)``.
            States with no dice left to choose from never appear as keys.
        """
        self.stateaction_map = {}
        # All count vectors describing at most N dice; used both as hands and
        # as rolls.
        all_rolls = set(
            counts
            for counts in it.product(range(self.N + 1), repeat=self.K)
            if sum(counts) <= self.N
        )
        for hand in all_rolls:
            # Dice not yet in the hand are exactly the dice being rolled.
            num_rolled = self.N - sum(hand)
            for roll in all_rolls:
                if sum(roll) != num_rolled:
                    continue

                state = Dicegame_State(hand=hand, avail_rolls=roll)
                for selection in self.get_all_possible_choices(state=state):
                    self.add_stateaction(state=state, action=selection)
                    assert state in self.stateaction_map.keys()

        return self.stateaction_map

    def add_stateaction(self, state: Dicegame_State, action: Roll_Choice) -> None:
        """Add a state-action to the state action map.

        Args:
            state: State the action is taken from.
            action: Dice selection made in that state.
        """
        self.stateaction_map.setdefault(state, {})[action] = self.prob(
            action=action, state=state
        )

    def reward(self, next_state: Dicegame_State) -> int:
        """Calculate the reward at the end of the game.

        Args:
            next_state: State being entered.

        Returns:
            The sum of the face values in the hand when the hand holds all N
            dice and at least C of them show 1; otherwise 0.
        """
        hand = next_state.hand
        if sum(hand) != self.N or hand[0] < self.C:
            return 0
        return sum((face_idx + 1) * hand[face_idx] for face_idx in range(self.K))

    def prob(
        self, state: Dicegame_State, action: Roll_Choice
    ) -> Categorical[Tuple[Dicegame_State, float]]:
        """Calculate the distribution of outcomes for a given state-action.

        The chosen dice are added to the hand and all remaining dice are
        rolled again; each possible re-roll gets its multinomial probability.

        Args:
            state: State the action is taken from.
            action: Dice selection made in that state.

        Returns:
            A ``Categorical`` built from a map of ``(next state, reward)`` to
            the probability of the corresponding re-roll.
        """
        next_hand: Tuple[int, ...] = tuple(
            int(held) + int(picked) for held, picked in zip(state.hand, action.choice)
        )
        num_remaining_dice: int = int(self.N - sum(state.hand) - sum(action.choice))

        # Loop invariants: orderings of the remaining dice, and the number of
        # equally likely ordered outcomes of rolling them.
        orderings: int = math.factorial(max(num_remaining_dice, 0))
        total_outcomes = self.K ** num_remaining_dice

        dist: Dict[Tuple[Dicegame_State, int], float] = {}
        # No face can appear more often than there are dice left, so counts
        # run from 0 to num_remaining_dice inclusive.
        for next_roll in it.product(range(num_remaining_dice + 1), repeat=self.K):
            if sum(next_roll) != num_remaining_dice:
                continue

            next_state = Dicegame_State(hand=next_hand, avail_rolls=next_roll)

            # Multinomial coefficient; each integer division is exact.
            arrangements = orderings
            for roll_count in next_roll:
                arrangements //= math.factorial(roll_count)

            dist[next_state, self.reward(next_state=next_state)] = (
                arrangements / total_outcomes
            )

        return Categorical(dist)


In [3]:
d = Dicegame_StateAction_Transition(N=6, K=4, C=1)
mdp = FiniteMarkovDecisionProcess(d.get_stateaction_map)

opt_v, opt_policy = value_iteration_result(mdp=mdp, gamma=1.0)

# 3.2.1 -- starting from scratch means an empty hand, state.hand == (0, 0, 0, 0):
# weight the optimal value of every such opening state by the probability of
# its first roll and add the terms up
expected_reward: float = 0.0
for init_state in [key for key in opt_v.keys() if key.state.hand == (0, 0, 0, 0)]:
    first_roll = init_state.state.avail_rolls
    expected_reward += opt_v[init_state] * roll_probability(first_roll, K=4)

print(f"3.2.1 : Expected reward when playing optimally : {expected_reward}")

# 3.2.2 -- in count-vector form the roll (1, 2, 2, 3, 3, 4) is (1, 2, 2, 1)
target_state = [
    candidate
    for candidate in opt_policy.action_for.keys()
    if candidate.hand == (0, 0, 0, 0) and candidate.avail_rolls == (1, 2, 2, 1)
][0]
opt_action = opt_policy.action_for[target_state]

print(
    f"3.2.2 : Optimal action when the first roll is (1, 2, 2, 3, 3, 4) : {opt_action.choice}"
)


3.2.1 : Expected reward when playing optimally : 18.390390253776786
3.2.2 : Optimal action when the first roll is (1, 2, 2, 3, 3, 4) : (0, 0, 0, 1)


Therefore, for question 3.2.2, if our first roll is (1, 2, 2, 3, 3, 4), the optimal action is to take only the single die that rolled 4, i.e. the choice vector (0, 0, 0, 1).