In [None]:
from learner.pexp_mind import PExpMind
import core, models
from copy import deepcopy
from collections import defaultdict
import random
from core.board import Board

import numpy as np

from pyspark.sql import Row
import pyspark.sql.functions as F

In [None]:
def init_random_board():
    """Build a randomly pre-played board whose game is still in progress.

    A fresh ``Board`` receives between 0 and ``int(SIZE * 2.5)`` random moves
    (inclusive, drawn with ``random.randint``). If the game ends along the
    way, that board is thrown away and a new attempt is made.

    Returns:
        Board: a board for which ``game_over()`` is falsy.
    """
    while True:
        candidate = Board(size=SIZE, win_chain_length=WIN_CHAIN_LENGTH)

        # Scatter a random number of opening moves, stopping early if one of
        # them finishes the game.
        opening_moves = random.randint(0, int(SIZE * 2.5))
        for _ in range(opening_moves):
            candidate.make_random_move()
            if candidate.game_over():
                break

        if candidate.game_over():
            # Finished positions are useless as a starting point; retry.
            continue

        return candidate

def versus(mind1_name, mind2_name, flipped):
    """Play one game between two saved PExpMind nets and report the outcome.

    Two ``PExpMind`` instances are loaded from fixed model directories. The
    first is seated as ``Board.FIRST_PLAYER`` and the second as
    ``Board.SECOND_PLAYER``. Play starts from the position returned by
    ``init_random_board()`` and stops at the first ``make_move`` call that
    returns a truthy value.

    NOTE(review): ``mind1_name`` and ``mind2_name`` are accepted but never
    read. Every call therefore plays the same two checkpoints in the same
    seats, whatever names or ``flipped`` value are passed. An earlier,
    commented-out revision took the weights from ``broadcast_net_bc[<name>]``;
    presumably the names were meant to select the nets that way. Confirm
    before relying on ``flipped`` to separate the two seatings.

    Args:
        mind1_name: Label of the first mind (currently unused).
        mind2_name: Label of the second mind (currently unused).
        flipped: Copied unchanged into the ``flipped`` field of the result.

    Returns:
        pyspark.sql.Row: fields ``boards`` (one ``board.pprint()`` result per
        ``make_move`` call), ``wins_1``, ``wins_2``, ``draws`` and ``flipped``.
    """
    mind1 = PExpMind(size=SIZE, init=False, channels=CHANNELS)
    mind1.load_net('/hdfs/app/GCS_ANA/dsxusers/hseokho/q-gomoku/gomoku/models/9_4_4')

    mind2 = PExpMind(size=SIZE, init=False, channels=CHANNELS)
    mind2.load_net('/hdfs/app/GCS_ANA/dsxusers/hseokho/q-gomoku/gomoku/models/9_8_14_18')

    board = init_random_board()

    def expanding_p(depth, p):
        """Return True where any (depth limit, p threshold) clause holds."""
        clauses = (
            np.logical_and(depth < 4, p > -5),
            np.logical_and(depth < 6, p > -4),
            np.logical_and(depth < 8, p > -4),
            np.logical_and(depth < np.inf, p > -3),
        )
        # np.logical_or is a binary ufunc: a third positional argument is
        # taken as the `out` buffer, not as another operand. Passing three
        # clauses at once therefore discards the third one (or raises for
        # scalar inputs), so the clauses are OR-ed together pairwise instead.
        combined = clauses[0]
        for clause in clauses[1:]:
            combined = np.logical_or(combined, clause)
        return combined

    def permissive_expansion(depth):
        """Return the expansion limit handed to ``define_policies`` for a depth."""
        if depth < 2:
            return np.inf
        if depth < 8:
            return 5
        return 3

    # Both minds search with identical settings.
    for mind in [mind1, mind2]:
        mind.define_policies(expanding_p, permissive_expansion, convergence_count=5,
                             alpha=0.2, q_exp_batch_size=SIZE ** 2,
                             p_exp_batch_size=SIZE ** 3, required_depth=6, max_iters=20)

    players = {Board.FIRST_PLAYER: mind1, Board.SECOND_PLAYER: mind2}
    board_strings = []
    wins = {Board.FIRST_PLAYER: 0, Board.SECOND_PLAYER: 0}
    draws = 0

    while True:
        to_move = board.get_player_to_move()
        result = players[to_move].make_move(board, as_player=to_move, verbose=False)
        board_strings.append(board.pprint())

        # A truthy result is treated as "the game has ended".
        if result:
            if board.game_won():
                # The side that is to move once the game is won is scored as
                # the loser -- presumably the turn has already passed to the
                # opponent of the winner; confirm against Board.
                if board.get_player_to_move() == Board.FIRST_PLAYER:
                    wins[Board.SECOND_PLAYER] += 1
                else:
                    wins[Board.FIRST_PLAYER] += 1
            else:
                draws += 1
            break

    return Row(boards=board_strings,
               wins_1=wins[Board.FIRST_PLAYER],
               wins_2=wins[Board.SECOND_PLAYER],
               draws=draws,
               flipped=flipped)

# Game configuration shared by every simulated match.
SIZE = 9              # passed as `size=` to Board and PExpMind; also scales batch sizes
WIN_CHAIN_LENGTH = 5  # passed as `win_chain_length=` to Board
CHANNELS = 4          # passed as `channels=` to PExpMind

In [None]:
# Number of games per ordering; 2 * rounds games are simulated in total.
rounds = 100

mind1_name = 'old_9_4_4'
mind2_name = 'new_9_4'

# Each task is keyed by its game index and carries the arguments for versus().
# Partitioning by that index over 2 * rounds partitions gives every game its
# own partition.
df = (
    sc.parallelize([(i, (mind1_name, mind2_name, True)) for i in range(rounds)])
    .union(sc.parallelize([(i, (mind2_name, mind1_name, False)) for i in range(rounds, rounds * 2)]))
    .partitionBy(rounds * 2, lambda game_index: game_index)
    .map(lambda task: versus(*task[1]))
    .toDF()
)

In [None]:
# Persist the per-game rows, replacing any previous contents of the table.
df.write.mode('overwrite').saveAsTable('dsx_temp.sim_results')

In [None]:
# Draw / win totals over the games recorded with flipped = True.
(
    spark.table('dsx_temp.sim_results')
    .where(F.col('flipped'))
    .agg(F.sum('draws'), F.sum('wins_1'), F.sum('wins_2'))
    .collect()
)

In [None]:
# Draw / win totals over the games recorded with flipped = False.
(
    spark.table('dsx_temp.sim_results')
    .where(~F.col('flipped'))
    .agg(F.sum('draws'), F.sum('wins_1'), F.sum('wins_2'))
    .collect()
)