In [1]:
from IPython.display import HTML, display

In [2]:
%load_ext Cython

In [3]:
%%cython
# cython: infer_types=True, annotation_typing=True
## cython: infer_types.verbose=True 
from IPython.display import HTML, display
cdef class Connect4:
    """Bitboard representation of a 7-column x 6-row Connect 4 position.

    Cell ``(col, row)`` is stored at bit ``col * 7 + row`` of a 64-bit integer,
    so every column owns 7 consecutive bits. ``move`` only ever fills the low
    6 bits of a column (rows 0-5, bottom to top); the 7th bit stays clear and
    acts as a separator that ``is_end`` relies on.
    """
    cdef public:
        # Number of discs played so far; ``turn % 2`` is the index into ``data``
        # of the side that moves next.
        long turn
        # data[0] / data[1]: bitboards of the first / second player.
        long long[2] data

    cpdef get_col_row(self, col: long, row: long):
        """Return the content of cell ``(col, row)``.

        Returns 0 for an empty cell, 1 if the bit is set in ``data[0]`` and
        2 if it is set in ``data[1]`` (``data[1]`` is checked first).
        No range checking is done on ``col`` / ``row``.
        """
        cdef long long mask
        pos = col * 7 + row
        mask = (<long long>1) << pos 
        if self.data[1] & mask:
            return 2
        return int(bool(self.data[0] & mask))

    cpdef is_end(self):
        """Return True if the side that moved last has four in a row.

        Only the bitboard ``data[1 - turn % 2]`` (the player who is *not* to
        move) is inspected. A full board without a winner is not detected
        here; callers compare ``turn`` with 42 for that.
        """
        cdef long long bitboard, bound, mask
        bitboard = self.data[1-self.turn%2]
        # Bit 48 (= 6*7 + 6) is the separator bit of the last column. A pattern
        # whose highest bit has reached it cannot lie on the board any more.
        bound = (<long long>1)<<48
        # Four-in-a-row patterns anchored at the lowest usable position:
        # horizontal: 0x204081 = 1|(1<<7)|(1<<14)|(1<<21)
        # vertical: 0xf = 1|(1<<1)|(1<<2)|(1<<3)
        # up-right: 0x1010101 = 1|(1<<8)|(1<<16)|(1<<24)
        # down-right: 0x208208 = (1<<3)|(1<<9)|(1<<15)|(1<<21)
        for mask in [0x204081, 0xf,  0x1010101, 0x208208]:
            # Slide the pattern one bit at a time. Placements that wrap around
            # the top of a column always include a separator bit (row 6), which
            # move() never sets, so they cannot produce a false positive.
            while mask < bound:
                if mask & bitboard == mask:
                    return True
                mask <<= 1
        return False

    cpdef set_col_row(self, col:long, row:long, value:long):
        """Force cell ``(col, row)`` to ``value`` without touching ``turn``.

        ``value`` 1 or 2 places that player's disc (and clears the other
        player's bit); any other value empties the cell.
        No range checking is done on ``col`` / ``row``.
        """
        # assert value in [0,1,2]
        pos = col * 7 + row
        mask = (<long long>1) << pos
        neg_mask = ~mask       
        if value == 1 or value ==2:
            self.data[value-1] |= mask
            self.data[2-value] &= neg_mask
        else:
            self.data[0] &= neg_mask
            self.data[1] &= neg_mask

    def __init__(self, data=None, turn=0):
        """Create a position.

        data: optional two-item sequence with the players' bitboards; an
              empty board is used when it is None.
        turn: number of moves already played.
        """
        if data is not None:
            self.data = data[:]
        else:
            self.data = [0, 0]
        self.turn = turn

    cpdef remove(self, col:long):
        """Clear the topmost disc of column ``col`` (for either player).

        Does nothing when the column is empty or ``col`` is outside 0..6.
        ``turn`` is left unchanged.
        """
        # An out-of-range column would shift by an invalid amount below.
        if col < 0 or col > 6:
            return
        shift = col*7
        # Column occupancy + 1 is the bit just above the stack ...
        mask = (((self.data[0]|self.data[1]) >> shift) &0x3f) +1
        # ... so halving it gives the topmost occupied bit (0 if the column is empty).
        mask = (mask >> 1) << shift
        # print(shift, hex(mask), hex(self.data[0]), hex(self.data[1]))
        neg_mask = ~mask
        self.data[0] &= neg_mask
        self.data[1] &= neg_mask

    cpdef move(self, col:long, test=False):
        """Drop a disc of the side to move into column ``col``.

        Returns ``self`` (so calls can be chained) when the move is legal and
        ``None`` when it is not: the column is already full or ``col`` is
        outside 0..6. With ``test=True`` legality is checked but the position
        is not modified.
        """
        # Reject columns that are off the board; without this check the shift
        # below would write outside the 7x7 bit layout and still bump ``turn``.
        if col < 0 or col > 6:
            return None
        shift = col*7
        # Discs dropped by move() fill a column from bit 0 upwards, so
        # occupancy + 1 is a single bit at the first free row.
        mask = (((self.data[0]|self.data[1]) >> shift) &0x3f) +1
        # print("mask=", mask)
        # 64 == 1 << 6: all six rows are taken.
        if mask >= 64:
            return None
        if not test:
            self.data[self.turn%2] |= (mask<<shift)
            self.turn += 1
        return self

    def board_data(self):
        """Yield ``(col, row, player)`` for every occupied cell, column by column."""
        for i in range(7):
            for j in range(6):
                c = self.get_col_row(i,j)
                if c!=0:
                    yield i,j,c

    def _repr_html_(self):
        """Return an HTML snippet drawing the board with absolutely positioned images.

        The markup references ``img/empty.png``, ``img/red_coin.png`` and
        ``img/yellow_coin.png`` by relative path.
        """
        def pos(i):
            # Pixel offset of grid index i inside the board <div>.
            return int(7+(220-6.5)*i/8)
        imgstr = "<img src='img/%s.png' width='23px' height='23px' style='position: absolute; top: %spx; left: %spx;margin-top: 0;z-index: %d' />"
        header = """<div style="width: 200px; height:180px;position: relative;background: blue">"""
        # Background: one 'empty' image per cell; row 5 is drawn at the top.
        header += "\n".join(imgstr%('empty', pos(5-j), pos(i), 0) for i in range(7) for j in range(6))
        # Foreground: coins drawn above the background (z-index 2).
        return header +"\n".join(imgstr%('red_coin' if c==1 else 'yellow_coin', pos(5-j), pos(i), 2) for (i,j,c) in self.board_data()) +"</div>"

    def display(self):
        """Render the board in the notebook via IPython's ``display``."""
        display(HTML(self._repr_html_()))

    def __repr__(self):
        """Return six text rows, top row first: '.' empty, 'o' player 1, 'x' player 2."""
        row_str = lambda j: "".join(".ox"[self.get_col_row(i,j)] for i in range(7))
        return "\n".join(row_str(j) for j in range(5,-1,-1))

from random import randint
def random_play(init_data=None, init_turn=0, display=False):
    """Finish a game with uniformly random legal moves for both sides.

    init_data / init_turn: starting position, forwarded to Connect4.
    display: when true, render the final board.

    Returns the final ``turn`` count if the game ended with four in a row,
    or 0 if 42 moves were reached without a winner.
    """
    board = Connect4(init_data, init_turn)
    while True:
        if board.turn >= 42 or board.is_end():
            break
        # Redraw a random column until one accepts the disc.
        placed = None
        while placed is None:
            placed = board.move(randint(0, 6))
    if display:
        board.display()
    return board.turn if board.is_end() else 0

  warn("get_ipython_cache_dir has moved to the IPython.paths module")


In [4]:
# Demo: move() returns the board itself, so moves can be chained.
# Play three discs and render, play three more and render again,
# then print the plain-text form of the same position.
game = Connect4()
game.move(3).move(4).move(4).display()
game.move(2).move(1).move(2).display()
print(repr(game))

.......
.......
.......
.......
..x.o..
.oxox..


In [5]:
# Raw bitboard of the first player (data[0]) in hex; cell (col, row) is bit col*7 + row.
hex(game.data[0])

'0x20200080'

In [6]:
def test_moves(name, moves, answers, display=False):
    """Replay `moves` on a fresh board and check `is_end()` after each one.

    name:    label printed before the run.
    moves:   column indices, played in order.
    answers: one entry per move - None if the move must be rejected,
             otherwise the expected `is_end()` value after the move.
    display: when true, print and render the board after every accepted move.

    Returns the board after the last move.
    """
    print("Test::"+name)
    game = Connect4()
    for step, column in enumerate(moves):
        accepted = game.move(column)
        expected = answers[step]
        if accepted is None:
            # Rejected move: the table must have predicted the rejection.
            assert expected is None
            continue
        if display:
            print(step, column, game.is_end(), expected)
            game.display()
        assert game.is_end() == expected
    return game

# Test table: each entry is (name, moves, answers) as consumed by test_moves.
# `answers[i]` is None when move i must be rejected (column already full),
# otherwise the expected is_end() value right after move i.
test_data = [
    ("Overflow1", [3]*7, [False]*6+[None]), 
    ("Overflow2", [6]*7, [False]*6+[None]), 
    ("Overflow3", [0,1,2]*7+[3], [False]*18+[None]*3+[False]),
    ("Vertical1", [1,2]*4, [False]*6+[True]*2),
    ("Vertical2", [1,2]*3+[2,1]*3, [False]*12),
    ("Vertical3", [6]*3+[5,6]*3, [False]*8+[True]),
    ("Horizontal1", [0,0,1,1,2,2,3,3], [False]*6+[True]*2),
    ("Horizontal2", [0,1,2,3,4,5,6]*2+[1,2,3,4,5,6,0]*2+[0,1,2,3,4,5,6]+[6,0,5,1,4,2,3], [False]*41+[True]),
    ("Diagonal1", [0,1,2,3,4,5,6]*3+[0,1], [False]*21+[True]*2),
    ("Diagonal2", [0,1,2,3,4,5,6]*3+[1,2], [False]*21+[False]*2),
    ("Diagonal3", [0,1,2,3,4]*3+[2,3,4], [False]*15+[False,True,True]),
    ("Diagonal4", [0,1,2,3,4,5,6]*3+[2,3], [False]*21+[True]*2),]
# Run every case; an AssertionError aborts the cell, otherwise the final board is rendered.
for data in test_data:
    test_moves(*data).display()

Test::Overflow1


Test::Overflow2


Test::Overflow3


Test::Vertical1


Test::Vertical2


Test::Vertical3


Test::Horizontal1


Test::Horizontal2


Test::Diagonal1


Test::Diagonal2


Test::Diagonal3


Test::Diagonal4


In [7]:
# `game` is still the notebook-level board from the earlier demo cell:
# test_moves builds its own local boards and does not rebind this name.
print(repr(game))

.......
.......
.......
.......
..x.o..
.oxox..


In [8]:
def MC_agent(_game, n_playouts=100, discount=0.95):
    """Choose a column for the side to move using flat Monte-Carlo playouts.

    For every legal column the move is tried on a copy of `_game`:
    * if it wins immediately, that column is returned at once;
    * otherwise `n_playouts` random games are played from the resulting
      position and scored from the mover's point of view: a win adds
      `discount ** k`, a loss subtracts it, a draw counts 0, where k is the
      number of moves played after the candidate move.

    _game:      Connect4 position; it is not modified.
    n_playouts: random games per candidate column (default 100).
    discount:   per-move decay applied to playout results (default 0.95).

    Returns the column with the best average score. Ties go to the
    higher column index; if no column is legal, 6 is returned.
    """
    me = _game.turn % 2
    # Illegal columns keep -inf so that any legal column outranks them.
    score = [float("-inf")] * 7
    for col in range(7):
        game = Connect4(_game.data, _game.turn)
        if game.move(col) is None:
            continue
        if game.is_end():
            return col
        total = 0
        for _ in range(n_playouts):
            # r is the move count at which the playout was won, or 0 for a draw.
            r = random_play(game.data, game.turn)
            if r == 0:
                continue
            weight = discount ** (r - _game.turn - 1)
            # The player who made move number r (1-based) has index (r-1) % 2.
            if (r - 1) % 2 == me:
                total += weight
            else:
                total -= weight
        score[col] = total / n_playouts
    return max(zip(score, range(7)))[1]

In [9]:
def random_vs_MC(init_data=None, init_turn=0, display=False):
    """Play one game: a random mover on even turns against MC_agent on odd turns.

    init_data / init_turn: starting position, forwarded to Connect4.
    display: 'all' renders the board after every move; any other true
             value renders only the final board.

    Returns the final `turn` count if somebody won, or 0 when 42 moves
    were reached without a winner.
    """
    game = Connect4(init_data, init_turn)
    while not (game.turn >= 42 or game.is_end()):
        random_side_to_move = game.turn % 2 == 0
        if random_side_to_move:
            # Redraw a random column until one accepts the disc.
            placed = None
            while placed is None:
                placed = game.move(randint(0, 6))
        else:
            game.move(MC_agent(game))
        if display == 'all':
            game.display()
    if display:
        game.display()
    return game.turn if game.is_end() else 0

In [10]:
%%timeit -n 1 -r 1
stat=[0,0]
for i in range(100):
    r = random_vs_MC()
    if r !=0:
        stat[(r-1)%2]+=1
    if i%100 == 0:
        print(i, stat)
print(stat)

0 [0, 1]
[0, 100]
1 loops, best of 1: 5.53 s per loop


In [24]:

import websocket, json, sys, argparse
# Websocket endpoint of the game server; c4client connects to this
# module-level value when it is constructed.
server = "ws://miin.thechiao.com:8001"
#server = "ws://localhost:8001"

class c4client:
    """Websocket client that plays Connect 4 on a game server using MC_agent.

    Constructing the object connects to the module-level `server`, requests
    the first game and then blocks in `loop()` until `ngames` games have been
    played. The connection is closed when the loop finishes or fails.

    name:    player id sent in the "start" message.
    ngames:  number of games to play.
    against: opponent id sent in the "start" message (None if unspecified).
    display: 'all' renders every received state; any other true value
             renders the final board of each game; false renders nothing.

    `result[you-1][winner]` counts finished games by the side this client
    played and the "winner" value reported by the server.
    # NOTE(review): winner index 0 presumably means a draw - confirm against the server protocol.
    """

    def __init__(self, name , ngames=1, against=None, display=True):
        self.ws = websocket.create_connection(server)
        self.name = name
        self.display = display
        self.result = [[0, 0, 0], [0, 0, 0]]
        self.against = against
        self.total_games = self.ngames=ngames
        try:
            self.start_game()
            self.loop()
        finally:
            # Release the socket even when the run is interrupted or fails.
            self.ws.close()

    def loop(self):
        """Process server messages until all requested games are finished."""
        ws = self.ws
        while True:
            msg = json.loads(ws.recv())
            msg_type = msg["type"]

            # "end": game has ended; "disconnected": other player has disconnected.
            if msg_type in ("end", "disconnected"):
                if self.ngames:
                    self.start_game()
                    continue
                print("Result: %s of %d games"%(self.result, self.total_games))
                return

            if msg_type == "ignored":
                print("Invalid input, try again")
                ws.send(json.dumps({"type" : "state_request"}))
                continue

            if msg_type == "state":
                you = msg["you"]
                turn = msg["turn"]

                # Rebuild a local board from the flat state list: entry
                # i*6 + j is written to cell (col i, row j), and every
                # non-empty entry counts as one move already played.
                game = Connect4()
                board = msg["state"]
                for i in range(7):
                    for j in range(6):
                        v = board[i*6+j]
                        game.set_col_row(i, j, v)
                        if v >0:
                            game.turn += 1
                if self.display=="all":
                    game.display()
                    print(turn, "Your side:", you)

                winner = msg.get("winner", None)
                if winner is not None:
                    # Game over: record the outcome and wait for the next message.
                    self.result[you-1][winner] += 1
                    if self.display:
                        game.display()
                        print("Your side=", you, "winner=", winner, self.result)
                    if self.ngames %10 == 0:
                        print(self.total_games-self.ngames, self.result)
                    continue

                if turn == you:
                    move = MC_agent(game)
                    if self.display=="all":
                        print("my move", move)
                    ws.send(json.dumps({"type" : "move",  "move" : move}))
                # Otherwise it is the other player's turn: just wait for the next state.

    def start_game(self):
        """Ask the server to start a game and decrement the remaining-games counter."""
        self.ws.send(json.dumps({"type" : "start",  "id" : self.name, "against" : self.against}))
        #print("Waiting for game to start", self.ngames)
        self.ngames -= 1





In [25]:
%%timeit -n 1 -r 1
# Play 1000 games as "tjwmc" with against="aima" sent in the start message;
# c4client prints a progress line whenever the remaining-game count is a multiple of 10.
# NOTE(review): c4client reads the module-level `server`; confirm whether this
# assignment inside %%timeit actually rebinds that global.
server = "ws://miin.thechiao.com:8001"
#server = "ws://localhost:8001"
c4client(name="tjwmc", ngames = 1000, against="aima", display=False)

10 [[0, 4, 1], [0, 2, 3]]
20 [[0, 4, 3], [0, 7, 6]]
30 [[0, 10, 5], [0, 8, 7]]
40 [[0, 14, 7], [0, 9, 10]]
50 [[0, 15, 9], [0, 13, 13]]
60 [[0, 17, 9], [0, 17, 17]]
70 [[0, 21, 10], [0, 20, 19]]
80 [[0, 25, 14], [0, 21, 20]]
90 [[0, 30, 15], [0, 23, 22]]
100 [[0, 34, 15], [0, 29, 22]]
110 [[0, 35, 19], [0, 32, 24]]
120 [[0, 38, 23], [0, 35, 24]]
130 [[0, 42, 26], [0, 37, 25]]
140 [[0, 42, 30], [0, 39, 29]]
150 [[0, 43, 35], [1, 41, 30]]
160 [[0, 45, 37], [1, 45, 32]]
170 [[0, 48, 40], [1, 48, 33]]
180 [[1, 52, 43], [1, 49, 34]]
190 [[1, 54, 45], [1, 53, 36]]
200 [[1, 58, 46], [1, 57, 37]]
210 [[1, 61, 47], [1, 59, 41]]
220 [[1, 66, 48], [1, 62, 42]]
230 [[1, 68, 52], [1, 63, 45]]
240 [[1, 70, 54], [1, 67, 47]]
250 [[1, 73, 56], [1, 71, 48]]
260 [[1, 76, 62], [1, 71, 49]]
270 [[1, 79, 65], [1, 74, 50]]
280 [[1, 83, 66], [2, 77, 51]]
290 [[1, 88, 68], [2, 79, 52]]
300 [[1, 88, 72], [2, 83, 54]]
310 [[1, 92, 73], [2, 85, 57]]
320 [[1, 94, 76], [2, 88, 59]]
330 [[1, 100, 78], [2, 88, 61]]


In [26]:
%%timeit -n 1 -r 1
# Play 1000 games as "tjwmc" with against="random" sent in the start message;
# c4client prints a progress line whenever the remaining-game count is a multiple of 10.
# NOTE(review): c4client reads the module-level `server`; confirm whether this
# assignment inside %%timeit actually rebinds that global.
server = "ws://miin.thechiao.com:8001"
#server = "ws://localhost:8001"
c4client(name="tjwmc", ngames = 1000, against="random", display=False)

10 [[0, 6, 0], [0, 0, 4]]
20 [[0, 9, 0], [0, 0, 11]]
30 [[0, 13, 0], [0, 0, 17]]
40 [[0, 18, 0], [0, 0, 22]]
50 [[0, 24, 0], [0, 0, 26]]
60 [[0, 28, 0], [0, 0, 32]]
70 [[0, 35, 0], [0, 0, 35]]
80 [[0, 39, 0], [0, 0, 41]]
90 [[0, 46, 0], [0, 0, 44]]
100 [[0, 51, 0], [0, 0, 49]]
110 [[0, 60, 0], [0, 0, 50]]
120 [[0, 67, 0], [0, 0, 53]]
130 [[0, 74, 0], [0, 0, 56]]
140 [[0, 80, 0], [0, 0, 60]]
150 [[0, 84, 0], [0, 0, 66]]
160 [[0, 89, 0], [0, 0, 71]]
170 [[0, 91, 0], [0, 0, 79]]
180 [[0, 99, 0], [0, 0, 81]]
190 [[0, 104, 0], [0, 0, 86]]
200 [[0, 110, 0], [0, 0, 90]]
210 [[0, 117, 0], [0, 0, 93]]
220 [[0, 122, 0], [0, 0, 98]]
230 [[0, 127, 0], [0, 0, 103]]
240 [[0, 133, 0], [0, 0, 107]]
250 [[0, 139, 0], [0, 0, 111]]
260 [[0, 140, 0], [0, 0, 120]]
270 [[0, 146, 0], [0, 0, 124]]
280 [[0, 152, 0], [0, 0, 128]]
290 [[0, 156, 0], [0, 0, 134]]
300 [[0, 161, 0], [0, 0, 139]]
310 [[0, 166, 0], [0, 0, 144]]
320 [[0, 170, 0], [0, 0, 150]]
330 [[0, 175, 0], [0, 0, 155]]
340 [[0, 182, 0], [0, 0, 158]