In [29]:
import os
import sys
import csv
import time
import chess
import chess.engine
import requests
import json
import pandas as pd
from dotenv import load_dotenv
from subprocess import Popen, PIPE, DEVNULL
from tqdm import tqdm

In [2]:
# Constants
# Load .env first so that both STOCKFISH_PATH and LICHESS_TOKEN can come from it.
load_dotenv()
# Location of the Stockfish binary. Set the STOCKFISH_PATH environment variable
# to point at a different build; the fallback is the original local build path.
stockfish_exe = (os.getenv('STOCKFISH_PATH')
                 or "/Users/saumik/Documents/classes/rotations/code/Stockfish/src/stockfish")
# UCI engine process shared by the cells below (used for best-move search).
engine = chess.engine.SimpleEngine.popen_uci(stockfish_exe)
# Search depth for engine analysis.
depth = 20
# Optional Lichess API token; None when LICHESS_TOKEN is not set.
token = os.getenv('LICHESS_TOKEN')

In [3]:
def process_score(score):
    """Round a score to two decimals and clamp it into [-2.0, 2.0].

    Args:
        score: Anything accepted by ``float()`` (a number or numeric string).

    Returns:
        The rounded value as a float, replaced by 2.0 when it exceeds 2 and
        by -2.0 when it is below -2.
    """
    value = round(float(score), 2)
    if value > 2:
        return 2.0
    if value < -2:
        return -2.0
    return value

In [4]:
def evaluate(fen, next_move=None):
    """Run Stockfish's ``eval`` command on a position and collect per-term scores.

    A fresh Stockfish process is started for every call, fed a ``position``
    command followed by ``eval``, and its stdout is scanned for the rows of
    the evaluation table.

    Args:
        fen: FEN string of the position.
        next_move: Optional move applied to ``fen`` before evaluating. It is
            interpolated with ``str()``, so a UCI string or any object whose
            string form is a UCI move works.

    Returns:
        A tuple with one clamped score (see ``process_score``) per matched
        table row, in output order. The tuple is empty when no row matched.

    Raises:
        SystemExit: If the engine cannot be started or exits non-zero.
    """
    position = f"position fen {fen}"
    if next_move:
        position += f" moves {next_move}"

    # Execute the binary directly rather than through a shell: a path that
    # contains spaces or shell metacharacters would otherwise be mis-parsed.
    try:
        process = Popen([stockfish_exe], stdin=PIPE, stdout=PIPE, stderr=DEVNULL)
        output = process.communicate(input=f"{position}\neval\n".encode())[0]
        failed = process.returncode != 0
    except OSError:
        # Missing/non-executable binary or a broken pipe; report it the same
        # way as a non-zero exit status.
        failed = True
    if failed:
        sys.stderr.write(f"Failed: {stockfish_exe}\n")
        sys.exit(1)

    # parse for parameter output
    components = ['Material','Imbalance','Mobility','King safety','Threats','Passed','Space','Winnable']
    scores = []
    for line in output.decode("utf-8").split("\n"):
        if any(c in line for c in components):
            # Fourth '|'-separated field, first number in it.
            # NOTE(review): presumably the "Total" column's middlegame value
            # in Stockfish's eval table - confirm against the engine version.
            mg_score = line.split('|')[3].split()[0]
            scores.append(process_score(mg_score))
    return tuple(scores)

In [5]:
def get_best_move(fen, search_depth=None):
    """Return the engine's preferred move for a position.

    Args:
        fen: FEN string of the position to analyse.
        search_depth: Depth limit for the search. When omitted, the
            notebook-level ``depth`` constant is used instead of a
            hard-coded number.

    Returns:
        The first move of the principal variation reported by the shared
        ``engine`` (a ``chess.Move``).
    """
    if search_depth is None:
        search_depth = depth
    board = chess.Board(fen=fen)
    info = engine.analyse(board, chess.engine.Limit(depth=search_depth))
    # 'pv' is the principal variation; its first entry is the move to play now.
    return info['pv'][0]

In [6]:
# Sanity check: ask the engine for its preferred move in one sample position.
get_best_move('1r3rk1/p3bpp1/4p2p/2ppP3/q4P2/PPP5/2Q3PP/R1N1K2R w KQ - 2 21')

Move.from_uci('b3a4')

In [7]:
def get_next_states(fen):
    """Collect the distinct evaluation tuples reachable in one move.

    Every legal move from ``fen`` is passed to ``evaluate``; empty results
    are ignored and the remaining tuples are de-duplicated.

    Args:
        fen: FEN string of the starting position.

    Returns:
        A set of score tuples, one per distinct evaluation.
    """
    position = chess.Board(fen)
    legal_ucis = [candidate.uci() for candidate in position.legal_moves]
    distinct_evals = set()
    for candidate_uci in tqdm(legal_ucis):
        term_scores = evaluate(fen, candidate_uci)
        if term_scores == ():
            # evaluate() found no table rows for this move; nothing to record.
            continue
        distinct_evals.add(term_scores)
    return distinct_evals

In [8]:
def get_agent_bias(data):
    """Placeholder: not implemented yet. Ignores ``data`` and returns None."""
    return None

In [9]:
def get_fens(player, max_games='null', token=None):
    """Download a player's Lichess games and save the positions they moved from.

    For every standard-variant game, the game is replayed from the initial
    position. Each time it is ``player``'s turn and the FEN full-move number
    is in [10, 30), one line ``<uci of the move played>,<fen before the move>``
    is written to ``data/<player>.csv`` (the file is overwritten).

    Args:
        player: Lichess username.
        max_games: Value sent as the ``max`` query parameter.
        token: Optional API token, sent as a Bearer Authorization header.

    Raises:
        requests.HTTPError: If the API answers with an error status. This is
            raised before the output file is opened, so an existing file is
            not truncated by a failed download.
    """
    headers = {'Accept':'application/x-ndjson'}
    if token:
        headers['Authorization'] = f'Bearer {token}'
    params = {'max':max_games}
    # stream=True lets the NDJSON body be consumed line by line instead of
    # being buffered in memory in full.
    response = requests.get(f"https://lichess.org/api/games/user/{player}",
                            headers=headers, params=params, stream=True)
    response.raise_for_status()
    os.makedirs('data', exist_ok=True)
    with response, open(f'data/{player}.csv', 'w') as f:
        for raw in response.iter_lines():
            if not raw:
                # iter_lines can yield empty keep-alive lines; they are not JSON.
                continue
            game = json.loads(raw.decode('utf-8'))
            if game.get('variant') != 'standard' or 'moves' not in game:
                continue
            try:
                white_name = game['players']['white']['user']['name']
            except (KeyError, TypeError):
                # White has no user record in this payload; skip the game as before.
                continue
            # Compare case-insensitively: with an exact match, a `player`
            # argument whose casing differs from the payload would label
            # every game as played with black.
            color = 'w' if str(white_name).lower() == str(player).lower() else 'b'
            board = chess.Board()
            for san in game['moves'].split():
                move = board.parse_san(san)
                fen = board.fen()
                fields = fen.split()
                movenum = int(fields[5])  # FEN full-move counter
                if fields[1] == color and 10 <= movenum < 30:
                    print(f'{move.uci()},{fen}', file=f)
                board.push(move)

In [10]:
# %%time
# get_fens('snar',max_games=10, token=token)

In [106]:
# Candidate Lichess usernames to analyse (presumably top blitz players, per the name).
top_blitz = ['Konevlad', 'may6enexttime','IWANNABEADOORED','Ogrilla','cjota95',
             'alireza2003','Drvitman','black_knight22','NIndja64','Benefactorr']
# Override: this rebinding discards the list above and keeps a single player.
top_blitz = ['Konevlad']

In [12]:
# for p in tqdm(top_blitz):
#     get_fens(p, max_games=100, token=token)

In [20]:
def rm_paren(tup):
    """Render an iterable as comma-separated text with no parentheses or spaces.

    Each element is converted with ``str``; an empty iterable gives ``''``.
    """
    return ','.join(map(str, tup))

In [27]:
def get_avg_bias(player):
    """Evaluate every saved position of ``player`` and write the results to CSV.

    Reads ``data/<player>.csv`` (rows of ``uci,fen``) and, for each row,
    writes one line to ``diffs/<player>.csv`` containing, in order: the FEN,
    the evaluation of the position, the move played, the evaluation after
    that move, the engine's best move, and the evaluation after the best move.

    Despite the name, nothing is averaged here and the function returns None;
    the output file is the result.

    Args:
        player: Name used to build both the input and the output file paths.
    """
    os.makedirs('diffs', exist_ok=True)
    with open(f'data/{player}.csv', 'r', newline='') as f, \
            open(f'diffs/{player}.csv', 'w') as fw:
        for row in tqdm(csv.reader(f, delimiter=',')):
            if not row:
                # Blank line in the input; there is nothing to unpack.
                continue
            uci, fen = row
            # evaluate() returns an empty tuple when it finds no table rows.
            # Writing such a result would produce a short row whose fields no
            # longer line up with the other rows, so the position is skipped.
            current_eval = evaluate(fen)
            if not current_eval:
                continue
            played_eval = evaluate(fen, uci)
            if not played_eval:
                continue
            best = get_best_move(fen)
            # Reuse the evaluation when the player chose the engine's move.
            best_eval = played_eval if str(best) == str(uci) else evaluate(fen, best)
            if not best_eval:
                continue
            print(f'{fen},{rm_paren(current_eval)},{uci},{rm_paren(played_eval)},{best},{rm_paren(best_eval)}', file=fw)

In [28]:
# Run the evaluation pass; the `break` stops after the first entry of top_blitz.
for p in top_blitz:
    # get_avg_bias writes its results to a file and returns nothing, so this prints None.
    print(get_avg_bias(p))
    time.sleep(1)
    break
    

1860it [1:01:52,  2.00s/it]


None


In [21]:
# Quick check of rm_paren on a small tuple.
tup = (1,2,3,4,5)
rm_paren(tup)

'1,2,3,4,5'

In [94]:
def get_df(filename, cols, col_names=None):
    """Load selected score columns from a diffs CSV, sign-flipped for Black.

    Rows whose FEN has ``b`` as the side to move are multiplied by -1. Only
    the numeric ``cols`` are negated, never helper or text columns.

    Args:
        filename: Path of a header-less CSV file.
        cols: Names of the columns to return.
        col_names: Names of all columns in the file, in order; must include
            ``'fen'``. Defaults to the notebook-level ``names`` list.

    Returns:
        A DataFrame containing only ``cols``.
    """
    if col_names is None:
        col_names = names
    cols = list(cols)
    df = pd.read_csv(filename, names=col_names, index_col=False, usecols=['fen'] + cols)
    # Second FEN field is the side to move.
    black_to_move = df['fen'].apply(lambda x: x.split()[1]) == 'b'
    df = df.drop('fen', axis=1)
    df.loc[black_to_move, cols] = -df.loc[black_to_move, cols]
    return df

In [101]:
# Number of score columns per evaluation in the diffs file.
elen = 8
# Column names for the three evaluations stored in each row:
# f* = position before the move, p* = after the played move, b* = after the best move.
fen_cols = [f'f{i}' for i in range(elen)]
player_cols = [f'p{i}' for i in range(elen)]
best_cols = [f'b{i}' for i in range(elen)]
# Full column layout of a diffs CSV row; get_df reads this list.
names = ['fen'] + fen_cols + ['player'] + player_cols + ['best'] + best_cols
# skiprows = lambda x: x.split()[1] == 'b'

# Each call re-reads the same file and keeps one group of score columns.
df_fen = get_df('diffs/Konevlad.csv', fen_cols)
df_player = get_df('diffs/Konevlad.csv', player_cols)
df_best = get_df('diffs/Konevlad.csv', best_cols)

# Subtract raw .values so the operation is positional; the frames have
# different column labels (f*/p*/b*) and would not align by name.
df_player_change = df_player - df_fen.values
df_best_change = df_best - df_fen.values
df_player_bias = df_player - df_best.values

In [103]:
# Column-wise mean of (evaluation after the played move - evaluation before it).
df_player_change.mean()

p0    0.447852
p1    0.092609
p2    0.067268
p3    0.161833
p4    0.151961
p5    0.031395
p6    0.025365
p7   -0.000969
dtype: float64

In [104]:
# Column-wise mean of (evaluation after the engine's best move - evaluation before it).
df_best_change.mean()

b0    0.435044
b1    0.091800
b2    0.066411
b3    0.150584
b4    0.143700
b5    0.024752
b6    0.025207
b7   -0.000970
dtype: float64

In [105]:
# Column-wise mean of (evaluation after the played move - evaluation after the best move).
df_player_bias.mean()

p0    0.008796
p1    0.002173
p2    0.000803
p3    0.007672
p4    0.005396
p5    0.006325
p6    0.000201
p7    0.000000
dtype: float64