-
Notifications
You must be signed in to change notification settings - Fork 29
/
annotator.py
executable file
·675 lines (521 loc) · 21.9 KB
/
annotator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
© Copyright 2016 Ryan Delaney. All rights reserved.
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import os
import argparse
import json
import logging
import chess
import chess.pgn
import chess.uci
# Initialize Logging Module
logger = logging.getLogger(__name__)
if not logger.handlers:
ch = logging.StreamHandler()
logger.addHandler(ch)
def parse_args():
"""
Define an argument parser and return the parsed arguments
"""
parser = argparse.ArgumentParser(
prog='annotator.py',
description='takes chess games in a PGN file and prints annotations to standard output')
parser.add_argument("--file", "-f", help="input PGN file", required=True, metavar="FILE.pgn")
parser.add_argument("--engine", "-e", help="analysis engine", default="stockfish")
parser.add_argument("--time", "-t", help="how long to spend on each game", default="1", type=float, metavar="MINUTES")
parser.add_argument("--verbose", "-v", help="increase verbosity", action="count")
return parser.parse_args()
def setup_logging(args):
"""
Sets logging module verbosity according to runtime arguments
"""
if args.verbose:
if args.verbose >= 3:
# EVERYTHING TO LOG FILE
logger.setLevel(logging.DEBUG)
hldr = logging.FileHandler('annotator.log')
logger.addHandler(hldr)
elif args.verbose == 2:
# DEBUG TO STDERR
logger.setLevel(logging.DEBUG)
elif args.verbose == 1:
# INFO TO STDERR
logger.setLevel(logging.INFO)
def eval_numeric(info_handler):
"""
Returns a numeric evaluation of the position, even if depth-to-mate was
found. This facilitates comparing numerical evaluations with depth-to-mate
evaluations
"""
dtm = info_handler.info["score"][1].mate
cp = info_handler.info["score"][1].cp
if dtm is not None:
# We have depth-to-mate (dtm), so translate it into a numerical
# evaluation. This number needs to be just big enough to guarantee that
# it is always greater than a non-dtm evaluation.
max_score = 10000
if dtm >= 1:
return max_score - dtm
else:
return -(max_score + dtm)
elif cp is not None:
# We don't have depth-to-mate, so return the numerical evaluation (in centipawns)
return cp
# If we haven't returned yet, then the info_handler had garbage in it
raise RuntimeError("Evaluation found in the info_handler was unintelligible")
def eval_human(white_to_move, info_handler, invert):
"""
Returns a human-readable evaluation of the position:
If depth-to-mate was found, return plain-text mate announcement (e.g. "Mate in 4")
If depth-to-mate was not found, return an absolute numeric evaluation
"""
dtm = info_handler.info["score"][1].mate
cp = info_handler.info["score"][1].cp
if dtm is not None:
return "Mate in {}".format(abs(dtm))
elif cp is not None:
# We don't have depth-to-mate, so return the numerical evaluation (in pawns)
if invert:
return '{:.2f}'.format(eval_absolute(cp / -100, white_to_move))
else:
return '{:.2f}'.format(eval_absolute(cp / 100, white_to_move))
# If we haven't returned yet, then the info_handler had garbage in it
raise RuntimeError("Evaluation found in the info_handler was unintelligible")
def eval_absolute(number, white_to_move):
"""
Accepts a relative evaluation (from the point of view of the player to
move) and returns an absolute evaluation (from the point of view of white)
"""
if not white_to_move:
number = -number
return number
def needs_annotation(delta):
"""
Returns a boolean indicating whether a node with the given evaluations
should have an annotation added
"""
return delta > 50
def judge_move(board, played_move, engine, info_handler, searchtime_s):
"""
Evaluate the strength of a given move by comparing it to engine's best
move and evaluation at a given depth, in a given board context
Returns a judgment
A judgment is a dictionary containing the following elements:
"bestmove": The best move in the position, according to the engine
"besteval": A numeric evaluation of the position after the best move is played
"bestcomment": A plain-text comment appropriate for annotating the best move
"pv": The engine's primary variation including the best move
"playedeval": A numeric evaluation of the played move
"playedcomment": A plain-text comment appropriate for annotating the played move
"depth": Search depth in plies
"nodes": Number nodes searched
"""
# Calculate the search time in milliseconds
searchtime_ms = searchtime_s * 1000
judgment = {}
# First, get the engine bestmove and evaluation
engine.position(board)
engine.go(movetime=searchtime_ms / 2)
judgment["bestmove"] = info_handler.info["pv"][1][0]
judgment["besteval"] = eval_numeric(info_handler)
judgment["pv"] = info_handler.info["pv"][1]
judgment["depth"] = info_handler.info["depth"]
judgment["nodes"] = info_handler.info["nodes"]
# Annotate the best move
judgment["bestcomment"] = eval_human(board.turn, info_handler, False)
# If the played move matches the engine bestmove, we're done
if played_move == judgment["bestmove"]:
judgment["playedeval"] = judgment["besteval"]
else:
# get the engine evaluation of the played move
board.push(played_move)
engine.position(board)
engine.go(movetime=searchtime_ms / 2)
# Store the numeric evaluation.
# We invert the sign since we're now evaluating from the opponent's perspective
judgment["playedeval"] = -eval_numeric(info_handler)
# Take the played move off the stack (reset the board)
board.pop()
# Annotate the played move
judgment["playedcomment"] = eval_human(board.turn, info_handler, True)
return judgment
def get_nags(judgment):
"""
Returns a Numeric Annotation Glyph (NAG) according to how much worse the
played move was vs the best move
"""
delta = judgment["playedeval"] - judgment["besteval"]
if delta < -300:
return [chess.pgn.NAG_BLUNDER]
elif delta < -150:
return [chess.pgn.NAG_MISTAKE]
elif delta < -75:
return [chess.pgn.NAG_DUBIOUS_MOVE]
else:
return []
def var_end_comment(node, judgment):
"""
Return a human-readable annotation explaining the board state (if the game
is over) or a numerical evaluation (if it is not)
"""
board = node.board()
score = judgment["bestcomment"]
depth = judgment["depth"]
if board.is_stalemate():
string = "Stalemate"
elif board.is_insufficient_material():
string = "Insufficient material to mate"
elif board.can_claim_fifty_moves():
string = "Fifty move rule"
elif board.can_claim_threefold_repetition():
string = "Three-fold repetition"
elif board.is_checkmate():
# checkmate speaks for itself
string = None
else:
string = str(score)
return "{}/{}".format(string, depth)
def add_annotation(node, judgment):
"""
Add evaluations and the engine's primary variation as annotations to a node
"""
prev_node = node.parent
# Add the engine evaluation
if judgment["bestmove"] != node.move:
node.comment = judgment["playedcomment"]
# Add the engine's primary variation (PV) as an annotation
# We truncate the PV to 10 moves because engine variations tend to get silly near the end
prev_node.add_main_variation(judgment["bestmove"])
var_node = prev_node.variation(judgment["bestmove"])
for move in judgment["pv"][:10]:
if var_node.move != move:
try:
assert var_node.board().is_legal(move)
except AssertionError:
logger.critical("\nTried to add an illegal move:")
logger.critical(var_node.board())
logger.critical(var_node.board().fen())
logger.critical(move)
logger.critical("engine bestmove: {}".format(judgment["bestmove"].uci()))
logger.critical("engine pv root : {}".format(judgment["pv"][0].uci()))
raise
var_node.add_main_variation(move)
var_node = var_node.variation(move)
# Add a comment to the end of the variation explaining the game state
var_node.comment = var_end_comment(var_node, judgment)
# We added the variation as the main line, so now it has to be demoted
# (This is done so that variations can be added to the final node)
prev_node.demote(judgment["bestmove"])
# Add a Numeric Annotation Glyph (NAG) according to how weak the played move was
node.nags = get_nags(judgment)
def classify_fen(fen, ecodb):
"""
Searches a JSON file with Encyclopedia of Chess Openings (ECO) data to
check if the given FEN matches an existing opening record
Returns a classification
A classfication is a dictionary containing the following elements:
"code": The ECO code of the matched opening
"desc": The long description of the matched opening
"path": The main variation of the opening
"""
classification = {}
classification["code"] = ""
classification["desc"] = ""
classification["path"] = ""
for opening in ecodb:
if opening['f'] == fen:
classification["code"] = opening['c']
classification["desc"] = opening['n']
classification["path"] = opening['m']
return classification
def eco_fen(board):
"""
Takes a board position and returns a FEN string formatted for matching with eco.json
"""
board_fen = board.board_fen()
castling_fen = board.castling_xfen()
if board.turn: # If white to move
to_move = 'w'
else:
to_move = 'b'
fen = board_fen + " " + to_move + " " + castling_fen
return fen
def debug_print(node, judgment):
"""
Prints some debugging info about a position that was just analyzed
"""
logger.debug(node.board())
logger.debug(node.board().fen())
logger.debug("Played move: %s", format(node.parent.board().san(node.move)))
logger.debug("Best move: %s", format(node.parent.board().san(judgment["bestmove"])))
logger.debug("Best eval: %s", format(judgment["besteval"]))
logger.debug("Best comment: %s", format(judgment["bestcomment"]))
logger.debug("PV: %s", format(node.parent.board().variation_san(judgment["pv"])))
logger.debug("Played eval: %s", format(judgment["playedeval"]))
logger.debug("Played comment: %s", format(judgment["playedcomment"]))
logger.debug("Delta: %s", format(judgment["besteval"] - judgment["playedeval"]))
logger.debug("Depth: %s", format(judgment["depth"]))
logger.debug("Nodes: %s", format(judgment["nodes"]))
logger.debug("")
def cpl(string):
"""
Centipawn Loss
Takes a string and returns an integer representing centipawn loss of the move
We put a ceiling on this value so that big blunders don't skew the acpl too much
"""
cpl = int(string)
max_cpl = 2000
return min(cpl, max_cpl)
def acpl(cpl_list):
"""
Average Centipawn Loss
Takes a list of integers and returns an average of the list contents
"""
return sum(cpl_list) / len(cpl_list)
def clean_game(game):
"""
Takes a game and strips all comments and variations, returning the "cleaned" game
"""
node = game.end()
while not node == game.root():
prev_node = node.parent
node.comment = None
node.nags = []
for variation in node.variations:
if not variation.is_main_variation():
node.remove_variation(variation)
node = prev_node
return node.root()
def classify_opening(game):
"""
Takes a game and adds an ECO code classification for the opening
Returns the classified game and root_node, which is the node where the classification was made
"""
ecofile = os.path.join(os.path.dirname(__file__), 'eco/eco.json')
ecodata = json.load(open(ecofile, 'r'))
ply_count = 0
node = game.end()
root_node = node
while not node == game.root():
prev_node = node.parent
fen = eco_fen(node.board())
classification = classify_fen(fen, ecodata)
if classification["code"] != "":
# Add some comments classifying the opening
node.root().headers["ECO"] = classification["code"]
node.root().headers["Opening"] = classification["desc"]
node.comment = classification["code"] + " " + classification["desc"]
# Remember this position so we don't analyze the moves preceding it later
root_node = node
# Break (don't classify previous positions)
break
ply_count += 1
node = prev_node
return node.root(), root_node, ply_count
def add_acpl(game, root_node):
"""
Takes a game and a root node, and adds PGN headers with the computed ACPL
(average centipawn loss) for each player. Returns a game with the added
headers.
"""
white_cpl = []
black_cpl = []
node = game.end()
while not node == root_node:
prev_node = node.parent
if node.board().turn:
black_cpl.append(cpl(node.comment))
else:
white_cpl.append(cpl(node.comment))
node = prev_node
node.root().headers["White ACPL"] = round(acpl(white_cpl))
node.root().headers["Black ACPL"] = round(acpl(black_cpl))
return node.root()
def get_total_budget(arg_time):
return float(arg_time) * 60
def get_pass1_budget(total_budget):
return total_budget / 10
def get_pass2_budget(total_budget, pass1_budget):
return total_budget - pass1_budget
def get_time_per_move(pass_budget, ply_count):
return float(pass_budget) / float(ply_count)
def analyze_game(game, arg_time, enginepath):
"""
Take a PGN game and return a GameNode with engine analysis added
- Attempt to classify the opening with ECO and identify the root node
* The root node is the position immediately after the ECO classification
* This allows us to skip analysis of moves that have an ECO classification
- Analyze the game, adding annotations where appropriate
- Return the root node with annotations
"""
# First, check the game for PGN parsing errors
# This is done so that we don't waste CPU time on nonsense games
checkgame(game)
###########################################################################
# Initialize the engine
###########################################################################
try:
engine = chess.uci.popen_engine(enginepath)
except FileNotFoundError:
errormsg = "Engine '{}' was not found. Aborting...".format(enginepath)
logger.critical(errormsg)
raise
except PermissionError:
errormsg = "Engine '{}' could not be executed. Aborting...".format(enginepath)
logger.critical(errormsg)
raise
engine.uci()
info_handler = chess.uci.InfoHandler()
engine.info_handlers.append(info_handler)
# Start keeping track of the root node
# This will change if we successfully classify the opening
root_node = game.end()
node = root_node
###########################################################################
# Clear existing comments and variations
###########################################################################
game = clean_game(game)
###########################################################################
# Attempt to classify the opening and calculate the game length
###########################################################################
logger.info("Classifying the opening...")
game, root_node, ply_count = classify_opening(game)
###########################################################################
# Perform game analysis
###########################################################################
# Calculate how many seconds we have to accomplish this
# The parameter is priced in minutes so we convert to seconds
budget = get_total_budget(arg_time)
logger.debug("Total budget is {} seconds".format(budget))
# First pass:
#
# - Performs a shallow-depth search to the root node
# - Leaves annotations showing the centipawn loss of each move
#
# These annotations form the basis of the second pass, which will analyze
# those moves that had a high centipawn loss (mistakes)
# We have a fraction of the total budget to finish the first pass
pass1_budget = get_pass1_budget(budget)
time_per_move = get_time_per_move(pass1_budget, ply_count)
logger.debug("Pass 1 budget is %i seconds, with %f seconds per move", pass1_budget, time_per_move)
# Loop through the game doing shallow analysis
logger.info("Performing first pass...")
# Count the number of mistakes that will have to be annotated later
error_count = 0
node = game.end()
while not node == root_node:
prev_node = node.parent
# Get the engine judgment of the played move in this position
judgment = judge_move(prev_node.board(), node.move, engine, info_handler, time_per_move)
delta = judgment["besteval"] - judgment["playedeval"]
# Record the delta, to be referenced in the second pass
node.comment = str(delta)
# Count the number of mistakes that will have to be annotated later
if needs_annotation(delta):
error_count += 1
# Print some debugging info
debug_print(node, judgment)
node = prev_node
# Calculate the average centipawn loss (ACPL) for each player
game = add_acpl(game, root_node)
# Second pass:
#
# - Iterate through the comments looking for moves with high centipawn
# loss
# - Leaves annotations on those moves showing what the player could have
# done instead
#
# We use the rest of the budgeted time to perform the second pass
pass2_budget = get_pass2_budget(budget, pass1_budget)
logger.debug("Pass 2 budget is %i seconds", pass2_budget)
try:
time_per_move = pass2_budget / error_count
except ZeroDivisionError:
logger.debug("No errors found on first pass!")
# There were no mistakes in the game, so deeply analyze all the moves
time_per_move = pass2_budget / ply_count
node = game.end()
while not node == root_node:
prev_node = node.parent
# Reset the comments to a value high enough to ensure that they all get analyzed
node.comment = '8593'
node = prev_node
# Loop through the game doing deep analysis on the flagged moves
logger.info("Performing second pass...")
node = game.end()
while not node == root_node:
prev_node = node.parent
delta = int(node.comment)
if needs_annotation(delta):
# Get the engine judgment of the played move in this position
judgment = judge_move(prev_node.board(), node.move, engine, info_handler, time_per_move)
# Verify that the engine still dislikes the played move
delta = judgment["besteval"] - judgment["playedeval"]
if needs_annotation(delta):
add_annotation(node, judgment)
else:
node.comment = None
# Print some debugging info
debug_print(node, judgment)
else:
node.comment = None
node = prev_node
###########################################################################
annotator = engine.name
node.root().comment = annotator
node.root().headers["Annotator"] = annotator
return node.root()
def checkgame(game):
"""
Check for PGN parsing errors and abort if any were found
This prevents us from burning up CPU time on nonsense positions
"""
if game.errors:
errormsg = "There were errors parsing the PGN game:"
logger.critical(errormsg)
for error in game.errors:
logger.critical(error)
logger.critical("Aborting...")
raise RuntimeError(errormsg)
# Try to verify that the PGN file was readable
if game.end().parent is None:
errormsg = "Could not render the board. Is the file legal PGN? Aborting..."
logger.critical(errormsg)
raise RuntimeError(errormsg)
def main():
"""
Main function
- Load games from the PGN file
- Annotate each game, and print the game with the annotations
"""
args = parse_args()
setup_logging(args)
pgnfile = args.file
try:
with open(pgnfile) as pgn:
for game in iter(lambda: chess.pgn.read_game(pgn), None):
try:
analyzed_game = analyze_game(game, args.time, args.engine)
except KeyboardInterrupt:
logger.critical("\nReceived KeyboardInterrupt.")
raise
except Exception as e:
logger.critical("\nAn unhandled exception occurred: {}".format(type(e)))
raise e
else:
print(analyzed_game, '\n')
except PermissionError:
errormsg = "Input file not readable. Aborting..."
logger.critical(errormsg)
raise
if __name__ == "__main__":
main()
# vim: ft=python expandtab smarttab shiftwidth=4 softtabstop=4 fileencoding=UTF-8: