# 葫蘆問

References

- http://koaning.io/monopoly-simulations.html
- https://devpost.com/software/monopoly-simulation
- https://github.com/williamhbell/MonopolySimulation/blob/master/doc/MonopolyWorksheet.pdf
- https://github.com/jm-contreras/monopoly
- [Monopoly as a Markov process](https://faculty.math.illinois.edu/~bishop/monopoly.pdf)

## 規則

http://www.jen-pin.com.tw/doc/2013110810042950

**規則一「先走後跳」**

擲點數後，先前進到所擲的位置，再跳到所擲位置的對應圖案。這是什麼意思？以下以圖形來說明。

![規則1](http://www.jen-pin.com.tw/data/2013110810042950/attachments/image010.gif)
丟出2點，先走到**2驢**的地方>>再跳到**同樣是驢的14**位置。

**規則二「頂位」**

若四個人玩，乙丟3點，先走到3葫蘆，再跳到12葫蘆。可是這時候，丙也丟出3點，則丙會把乙從12葫蘆，**擠回**對應的3葫蘆，稱為「**頂位**」。(要注意，「頂位」發生在跳躍的位置，不是擲出點數的位置哦！)

![規則2](http://www.jen-pin.com.tw/data/2013110810042950/attachments/image012.gif)
丟出3點，先走到**3葫蘆**的地方>>再跳到對應的**12葫蘆**的位置。

**規則三「三打不成」**

如果丁也丟出3點，三個人踩在同一位置上，就稱為「**三打不成**」，只能「**和局**」重新開始

![規則3](http://www.jen-pin.com.tw/data/2013110810042950/attachments/image014.gif)
擲出12點，先走到**12葫蘆**的地方，再跳到相對應的**3葫蘆**的位置，也就是只前進三格@@

---

In [None]:
import logging
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import string
import json
from tqdm import tnrange, tqdm_notebook
from pathlib import Path

import config

In [None]:
# Logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

### Dice

In [None]:
class Dice(object):
    def __init__(self):
        self.roll_sum = None

    def roll(self):
        '''Roll two fair six-sided die and store the sum of the roll'''

        roll = np.random.choice(np.arange(1, 7), 2)
        self.roll_sum = roll.sum()

### Board

In [None]:
class Board(object):
    def __init__(self, board_df):
        self.board = {}
        self.goal_fig = None
        self.board_max = None
        self.__board_df = board_df
        self._get_board(self.__board_df)
    
    def _get_board(self, board_df):
        '''
        Create board game with properties from CSV file in board_file.
        :param str board_file: Filename of CSV with board parameters
        '''

        board_df = self.__board_df
        self.goal_fig = board_df.loc[board_df['position'].idxmax(), 'figure']

        for index, row in board_df.iterrows():
            if row['figure'] != self.goal_fig:
                self.board[int(row['position'])] = {
                    'figure': row['figure'], 
                    'dual_position': int(row['dual_position']),
                    'occupant': []
                }
                self.board[int(row['dual_position'])] = {
                    'figure': row['figure'], 
                    'dual_position': int(row['position']),
                    'occupant': []
                }
            else:
                self.board[int(row['position'])] = {
                    'figure': row['figure'], 
                    'occupant': []
                }
                
        self.board_max = max(self.board.keys())
        
    def to_dataframe(self):
        return self.__board_df
    
    def __str__(self):
        board_list = ['{pos:02d}{occupant}'.format(
                            pos=i+1, 
                            occupant=[o.id for o in self.board[i+1]['occupant']] or ''
                      )
                      for i in range(self.board_max)]
        
        out_list = ['{:7}'.format(ele) if (i+1) % 10 != 0 else '{:7}|\n'.format(ele)
                    for i, ele in enumerate(board_list)]
        
        return '|' + '|'.join(out_list) + '|'
    
    
class BoardSnapshot(object):
    def __init__(self, board):      
        self.board = [{'position': k,
                       'occupant': [p.id for p in space['occupant']]}
                      for k, space in board.board.items()]
        self.board.sort(key=lambda x: x['position'])
        self.board_max = max([space['position'] for space in self.board])
        
    def __str__(self):
        board_list = ['{pos:02d}{occupant}'.format(
                            pos=space['position'], 
                            occupant=[o for o in space['occupant']] or ''
                      )
                      for space in self.board]
        
        out_list = ['{:7}'.format(ele) if (i+1) % 10 != 0 else '{:7}|\n'.format(ele)
                    for i, ele in enumerate(board_list)]
        
        return '|' + '|'.join(out_list) + '|'

### Game

In [5]:
class Huluwen(object):
    def __init__(self, n_players, board_filename):
        self._n_players = n_players
        self._board_filename = board_filename
        self.__board_df = pd.read_excel(board_filename)
    
    def init_game(self):
        self._history_temp = {'turn': [], 'board': []}
        self.game = Game(self._n_players, Board(self.__board_df))
        
    def init_history(self):
        self._history = []
    
    def run_game(self, max_round=np.Inf, is_sim=False):
        # Initialize game
        self.init_game()
        if not is_sim:
            self.init_history()
        
        g = self.game
        
        while g.status == 'on':
            
            # Update game round
            g.update_round()
            
            # Define player of turn
            for turn_player in g.players:
                
                # Continue until turn ends
                while True:
                    
                    g.update_turn()
                    
                    # Roll the dice
                    g.dice.roll()
                    
                    # Move player
                    move_event = turn_player.move(g.dice.roll_sum, g.board)
                    logger.debug('\n' + str(g.board))
                    #logger.debug(g.players_info)
                    
                    # Check game status
                    if turn_player.position == g.board.board_max:
                        g.status = 'set'
                        g.winner = turn_player.id
                        self._update_history(g, turn_player, move_event)
                        break
                    
                    if len(g.board.board[turn_player.position]['occupant']) > 1:
                        g.status = 'tie'
                        self._update_history(g, turn_player, move_event)
                        break
                    
                    # Update history
                    self._update_history(g, turn_player, move_event)
                                        
                    # End
                    break
                    
                # Stop if game set
                if g.status in ('set', 'tie'):
                    break

            # Stop if exceed max_round
            if g.round > max_round:
                break
        
        if not is_sim:
            self._update_sim_history(
                self._n_players,
                self.game.round, 
                self.game.status,
                self.game.winner,
                self._history_temp['turn'],
                self._history_temp['board'],
            )
        logger.info('Game result: {result}'.format(result=g.status))
        
        
    def simulate(self, n_iter):
        self.init_history()
        
        for _ in tnrange(n_iter, desc='simulate'):
            self.init_game()
            self.run_game(is_sim=True)
            self._update_sim_history(
                self._n_players,
                self.game.round, 
                self.game.status,
                self.game.winner,
                self._history_temp['turn'],
                self._history_temp['board'],
            )
        
        if self._n_players == 2:
            res, _, _ = self.get_sim_res()
            assert (res.result != 'set') is not None, 'Must not be tie when 2 players'
        
        return self._history
        
 
    def _update_history(self, game, player, move_event):
        self._history_temp['turn'].append({
            **game.log[0], # turn, round, dice_roll, status
            'player': player.id, 
            'move_event': move_event,
            'old_position': player.old_position,
            'to_position': player.position
        })
        self._history_temp['board'].append(game.log[1])
        
        
    def _update_sim_history(self, n_players, end_rounds, result, winner, turn, board_snapshot):
        self._history.append({
            'n_players': n_players,
            'end_rounds': end_rounds, 
            'result': result,
            'winner': winner,
            'turn': turn,
            'board': board_snapshot
        })
    
    
    def get_sim_res(self):
        '''Get simulation result.
        
        Results:
            iter_df
            turn_df_ls: turn, round, dice_roll, status, player, move_event, old_position, to_position
            board_ls
        '''
        
        fields = ['n_players', 'end_rounds', 'result', 'winner']
        iter_df = pd.DataFrame([{k: record[k] for k in fields}
                                for record in self._history])
        turn_df_ls = [pd.DataFrame(record['turn']) for record in self._history]
        board_ls = [record['board'] for record in self._history]
        
        return iter_df, turn_df_ls, board_ls
   

    def export_sim_res(self, output_path):
        '''Export simulation results
        
        Args:
            output_path (str): Path to export files to.
           
        Results:
            Output of 3 files:
                - iter_records.csv
                - turn_records.csv
                - board_records.csv
        '''
        
        output_path = Path(output_path).expanduser()
        
        if not output_path.exists():
            raise ValueError(f'Path does not exist! (output_path.as_posix())')

        iter_df, turn_df_ls, board_ls = self.get_sim_res()
        
        iter_df['iter'] = iter_df.index + 1
        iter_df.to_csv(output_path / Path(f'iter_records_{self._n_players}p.csv'), index=False)
        
        turn_df_combine = pd.concat([df.assign(iter = i+1) for i, df in enumerate(turn_df_ls)])
        (
            turn_df_combine.
            set_index(['iter', 'turn']).
            to_csv(output_path / Path(f'turn_records_{self._n_players}p.csv'))
        )
        
        board_records = [
            {
                'iter': i + 1,
                'turn': j + 1,
                'space_occuppied': list(map(lambda x: x['position'], 
                        filter(lambda x: x['occupant'] != [],
                           board_dict.board)))
            }
            for i, board_turn in enumerate(board_ls)
            for j, board_dict in enumerate(board_turn[:-1])
        ]
        (
            pd.DataFrame(board_records).
            set_index(['iter', 'turn']).
            to_csv(output_path / Path(f'board_records_{self._n_players}p.csv'))
        )
    

    def get_history(self, round_, iter_=0):
        '''Get nth iter of history.
        
        Args:
            round_ (slice): N-th round within an iter.
            iter_range (slice): N-th iter of simulation.
            
        Returns:
            tuple(turn, board): 
                - `turn`: turn, round, dice_roll, status, player, move_event, old_position, position; 
                - `board`: BoardSnapshot object
         '''
        
        return self._history[iter_]['turn'][round_], self._history[iter_]['board'][round_]
    
    @property
    def board_df(self):
        return self.__board_df
    
    @property
    def game_info(self):
        return {'n_players': self._n_players, 'board': str(_board_filename)}

In [6]:
class Game(object):
    '''Keeps track of all game pieces.'''
    
    def __init__(self, n_players: int, board):
        self.players = []
        self.winner = None
        self.turn = 0
        self.round = 0
        self.dice = Dice()
        self.status = 'on'

        self.board = board
        self.goal_fig = board.goal_fig
        
        self._get_players(n_players)

    def _get_players(self, n_players):
        '''
        Create list of 2 to 8 game players.
        :param int n_players: Number of players in game
        '''

        # Ensure number of players requested is legal
        if (n_players < 2) or (6 < n_players):
            raise ValueError('A game must have between 2 to 6 players. You input {} players.'.format(n_players))

        # Create list of players and set number of players remaining
        self.players = [Player(string.ascii_uppercase[p], self.board) for p in range(n_players)]
    
    def update_turn(self):

        self.turn += 1
        logger.debug(f'Turn: {self.turn}')
        
    
    def update_round(self):

        self.round += 1
        
        if config.verbose['round']:
            logger.info('Starting round {round}...'.format(round=self.round))
    
    @property
    def log(self):
        turn_log = {
            'turn': self.turn,
            'round': self.round,
            'dice_roll': self.dice.roll_sum,
            'status': self.status,
        }

        return turn_log, BoardSnapshot(self.board)

    @property
    def players_info(self):
        return pd.DataFrame([{'turn': self.turn, **p.info} for p in self.players])

### Player

In [7]:
class Player(object):
    def __init__(self, player_id, board, start_pos=0):
        '''Return a Player object.'''
        self.id = str(player_id)
        self.position = start_pos
        self.old_position = start_pos
        self._board_max = int(board.board_max)

    def move(self, roll, board):
        '''
        Move player on the board. Update player's position and collect $200 if player passed Go.
        :param int roll: Number of board positions to move
        '''
        
        board = board.board
        
        # 先走
        old_position = self.old_position = self.position
        new_position = self.position + roll
        
        # 檢查是否到終點
        if new_position == self._board_max:
            self.position = new_position
            board[old_position]['occupant'] = []
            board[new_position]['occupant'] = [self]
            logger.debug(f'{self.id} | roll: {roll} | pos: {old_position} => {new_position} | ({board[self.position]["figure"]})')
            return  
        
        # 後跳
        # 超過終點要倒退
        if new_position > self._board_max:
            new_position -= (new_position - self._board_max) * 2
        
        new_position = board[new_position].get('dual_position')
        self.position = new_position
        
        # Define current board space
        space = board[self.position]
        space_dual = board[space['dual_position']]
        
        logger.debug(f'{self.id} | roll: {roll} | pos: {old_position} => {new_position} | ({space["figure"]})')
        
        occupant_now = space['occupant']
        occupant_dual = board[space['dual_position']]['occupant']
        
        # Clear old position
        if old_position != 0:
            board[old_position]['occupant'] = []
        
        move_event = None
        if occupant_now:
            # 無走
            if old_position == new_position: 
                space['occupant'] = [self]
                move_event = '無走'
                logger.debug('(無走)')
            # 三打不成
            elif occupant_dual and self not in occupant_dual:
                space['occupant'].append(self)
                logger.debug('(三打不成)')
            # 頂位
            else:
                dual_position = space['dual_position']
                board[dual_position]['occupant'] = occupant_now
                occupant_now[0].position = dual_position
                space['occupant'] = [self]
                move_event = '頂位'
                logger.debug('(頂位)')
        else:
            space['occupant'] = [self]
            
        return move_event

    @property
    def info(self):
        return {'id': self.id, 'position': self.position, 'old_position': self.old_position}

---

## Simulation

In [8]:
def run_sim(n_players, N_ROUND):
    logger.setLevel(logging.WARNING)
    h = Huluwen(n_players=n_players, board_filename=config.board_filename)

    tic = datetime.now()
    h.simulate(N_ROUND)    
    toc = datetime.now()

    print('{} rounds | time elapsed:{} ({} per round)'.format(N_ROUND, toc - tic, (toc - tic)/N_ROUND))
    
    return h

In [9]:
h = run_sim(2, 100)

A Jupyter Widget


100 rounds | time elapsed:0:00:02.931056 (0:00:00.029311 per round)


In [10]:
# h.export_sim_res('simulation_history')

### Chekc simulation results

In [11]:
iter_df, turn_df_ls, board_ls = h.get_sim_res()

In [12]:
iter_df.head()

Unnamed: 0,end_rounds,n_players,result,winner
0,29,2,set,B
1,3,2,set,A
2,10,2,set,A
3,11,2,set,B
4,23,2,set,B


In [13]:
turn_df_ls[0].head()

Unnamed: 0,dice_roll,move_event,old_position,player,round,status,to_position,turn
0,8,,0,A,1,on,22,1
1,10,,0,B,1,on,1,2
2,5,,22,A,2,on,36,3
3,9,無走,1,B,2,on,1,4
4,7,,36,A,3,on,38,5


In [14]:
# print 10 steps of board
ITER = 0
for i in range(10):
    try:
        turn, player, old_p, to_p, = turn_df_ls[ITER].loc[i, ['turn', 'player', 'old_position', 'to_position']]
        print(f'(turn {turn}) [{player}] {old_p:02d} => {to_p:02d}')
        print(board_ls[ITER][i], '\n'+'='*81)
    except:
        pass

(turn 1) [A] 00 => 22
|01     |02     |03     |04     |05     |06     |07     |08     |09     |10     |
|11     |12     |13     |14     |15     |16     |17     |18     |19     |20     |
|21     |22['A']|23     |24     |25     |26     |27     |28     |29     |30     |
|31     |32     |33     |34     |35     |36     |37     |38     |39     |40     |
|41     |42     |43     |44     |45     |46     |47     | 
(turn 2) [B] 00 => 01
|01['B']|02     |03     |04     |05     |06     |07     |08     |09     |10     |
|11     |12     |13     |14     |15     |16     |17     |18     |19     |20     |
|21     |22['A']|23     |24     |25     |26     |27     |28     |29     |30     |
|31     |32     |33     |34     |35     |36     |37     |38     |39     |40     |
|41     |42     |43     |44     |45     |46     |47     | 
(turn 3) [A] 22 => 36
|01['B']|02     |03     |04     |05     |06     |07     |08     |09     |10     |
|11     |12     |13     |14     |15     |16     |17     |18     |19     |20   