In [None]:
import sqlite3
import re
import io
import chess.pgn
import ast
import csv
import pandas
import numpy as np

In [None]:
def quick_fen_to_array(fen):
    # fen = fen.split(' ')[0]
    result = np.zeros((2, 6, 8, 8))
    row, col = 7, 0

    letter_to_id = {'k': 0, 'q': 1, 'r': 2, 'b': 3, 'n': 4, 'p': 5}
    for letter in fen:
      if 'a' <= letter <= 'z':
        result[1, letter_to_id[letter], row, col] = 1
      elif 'A' <= letter <= 'Z':
        result[0, letter_to_id[letter.lower()], row, col] = 1
      elif letter == ' ':
        break
      if '0' <= letter <= '9':
        col += int(letter)
      elif letter == '/':
        row -= 1
        col = 0
      else:
        col += 1
    position = result.flatten()
    result = np.append(
      position, 
      [
        letter in fen.split(' ')[2] for letter in ['K', 'Q', 'k', 'q']
      ]
    )
    return result

def save_db_to_csv():
  
  with sqlite3.connect('../../chess_data/lichess.db') as connection:
    cursor = connection.cursor()

    cursor.execute("SELECT fen, eval, move FROM positions")
    rows = []
    counter = 1
    for row in cursor:
      canonical = quick_fen_to_array(row[0])
      rows.append(','.join(str(i) for i in list(canonical) + [row[1], row[2]]))

      if counter % 100_000 == 0:
        with open(f'../../chess_data/csv_data/data_{counter//100_000}.csv', 'w') as output:
          output.write('\n'.join(rows))
          rows = []
      counter += 1
    cursor.close()

save_db_to_csv()
# quick_fen_to_array('rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR b Kq e3 0 1')

In [None]:
data_file = "../../chess_data/lichess_nn_2022-06.pgn"
output_file = "../../chess_data/lichess_nn_2022-06.csv"

def fen_to_csv(data_file, output_file):
    with open(data_file) as input, open(output_file, 'w') as output:
        data_writer = csv.writer(output, delimiter=',')
        while True:
            line = input.readline()
            if line == "":
                break
            if line[-2:] != ']\n':
                continue
            parsed_line = ast.literal_eval(line)
            data_writer.writerow(parsed_line)

In [None]:
def getCanonicalForm(board, player):
    if int(player) == 0:
        return board
    
    # board = board.numpy().decode()
    board_splited = board.split()
    board_splited[0] = board_splited[0].swapcase()[::-1]
    board_splited[0] = '/'.join([line[::-1] for line in board_splited[0].split('/')])
    board_splited[1] = 'w' if board_splited[1] == 'b' else 'b'
    board_splited[2] = board_splited[2].swapcase()

    castling = ""
    for letter in ['K', 'Q', 'k', 'q']:
        if letter in board_splited[2]:
            castling += letter
    if castling == "":
        castling = "-"
    board_splited[2] = castling

    if board_splited[3] != '-':
        row = '4' if board_splited[3][1] == '3' else '3'
        col = {
            'a': 'h',
            'b': 'g',
            'c': 'f',
            'd': 'e',
            'e': 'd',
            'f': 'c',
            'g': 'b',
            'h': 'a'
        }[board_splited[3][0]]
        board_splited[3] = col + row
    
    return ' '.join(board_splited)

In [None]:
with sqlite3.connect('../../chess_data/lichess.db') as connection:
    cursor = connection.cursor()
    # cursor.execute('DROP INDEX IF EXISTS idx_fen')
    # cursor.execute('CREATE INDEX idx_fen ON positions (fen)')
    # connection.commit()
    # cursor.close()

    cursor.execute("SELECT fen, eval, move FROM positions WHERE fen LIKE '% b %'")
    update_cursor = connection.cursor()
    params = []
    for i, row in enumerate(cursor):
      print(row)
      canonical = getCanonicalForm(row[0], 1)
      eval_reversed = -1 * int(row[1])
      params.append([canonical, eval_reversed, row[0]])
      if i % 100000 == 0 and params:
        print(params[0])
        update_cursor.executemany(f"UPDATE positions SET fen = ?, eval = ? WHERE fen = ?", params)
        connection.commit()
        params = []
    update_cursor.executemany(f"UPDATE positions SET fen = ?, eval = ? WHERE fen = ?", params)
    connection.commit()
    cursor.close()
    update_cursor.close()
    # print(cursor.execute('ALTER TABLE positions ADD player INTEGER'))
    # cursor.execute("UPDATE positions SET player = CASE WHEN fen LIKE '% w %' THEN 0 ELSE 1 END")
    connection.commit()

In [None]:
with sqlite3.connect('../../chess_data/lichess.db') as connection:
    cursor = connection.cursor()
    print(cursor.execute('ALTER TABLE positions ADD player INTEGER'))
    # cursor.execute("UPDATE positions SET player = CASE WHEN fen LIKE '% w %' THEN 0 ELSE 1 END")
    connection.commit()

In [None]:
with sqlite3.connect('../../chess_data/lichess_normal.db') as connection:
    cursor = connection.cursor()
    # print(cursor.execute('select * from positions limit 10').fetchall())
    # cursor.close()
    cursor.execute("CREATE TABLE IF NOT EXISTS positions (fen TEXT, eval REAL, move TEXT, UNIQUE(fen, move) ON CONFLICT IGNORE)")
    connection.commit()

In [None]:
csv_file = "../../chess_data/lichess_nn_2022-06.csv"

df = pandas.read_csv(csv_file, names=['fen', 'eval', 'move'])
df.to_sql('positions', connection, if_exists='append', index=False)

## Scripts for data

In [None]:
data_path = '../../chess_data/lichess_db_standard_rated_2022-06.pgn'
only_evaluated_path = '../../chess_data/lichess_evaluated_2022-06.pgn'

In [None]:
import bz2

def convert_move(move):
    from_position = ord(move[0]) - 97 + 8*(int(move[1])-1)
    to_position = ord(move[2]) - 97 + 8*(int(move[3])-1)

    return from_position * 64 + to_position


db_path = '../../chess_data/lichess_normal.db'
data_path = '../../chess_data/lichess_db_standard_rated_2022-07.pgn.bz2'
def pgn_to_csv(data_path, db_path):
    sqlite_insert_query = """INSERT INTO positions
                          (fen, eval, move) 
                          VALUES (?, ?, ?);"""
    # connection = sqlite3.connect(db_path)
    # cursor = connection.cursor()
    file_counter = 0
    with bz2.BZ2File(data_path, 'r') as input:
        half = False
        one_png = []
        results = []
        game_in, game_out = 0, 0
        elo_white, elo_black = 0, 0
        for line in input:
            line = line.decode()
            if line == "\n":
                if half:
                    half = False
                    if "eval" not in one_png[-1] or elo_white < 2000 or elo_black < 2000:
                        game_out += 1
                    else:
                        game_in += 1
                        pgn = io.StringIO(one_png[-1])
                        game = chess.pgn.read_game(pgn)
                        board = game.board()
                        evaluation = 0.2
                        for move_node in game.mainline():
                            # print(board.fen(), evaluation, convert_move(str(move_node.move)))
                            results.append([board.fen(), str(evaluation), str(convert_move(str(move_node.move)))])
                            if '#' in move_node.comment:
                                evaluation = int(re.search('(?<=val #)[-0-9]+', move_node.comment).group(0))
                                evaluation = 20 * (-1 if evaluation < 0 else 20)
                            elif 'eval' in move_node.comment:
                                evaluation = float(re.search('(?<=val )[0-9\.-]+', move_node.comment).group(0))
                            else:
                                # print([board.fen(), evaluation, convert_move(str(move_node.move))])
                                break
                            board.push(move_node.move)
                        
                        # cursor.executemany(sqlite_insert_query, results)
                        # connection.commit()
                        # results = []
                    # output.write('\n'.join(results+['']))
                    one_png = []
                    half = False
                    if len(results) > 1000_000:
                        print(f'Games with eval {game_in}/{game_in+game_out}')
                        
                        with open(f'../../chess_data/csv_data_normal/{file_counter}.csv', 'w') as output:
                            output.write('\n'.join([','.join(row) for row in results]))
                        file_counter += 1
                        results = []
                else:
                    half = True
                    one_png.append(line)
            elif "[WhiteElo" in line:
                elo_white = int(re.search('(?<=")\d+', line).group(0))
            elif "[BlackElo" in line:
                elo_black = int(re.search('(?<=")\d+', line).group(0))
            else:
                one_png.append(line)

    # cursor.close()
    # connection.close()

In [None]:
pgn_to_csv(data_path, db_path)

In [None]:
def pgn_with_evaluation(data_path, only_evaluated_path):
    with open(data_path) as input, open(only_evaluated_path, 'w') as output:
        half = False
        one_png = []
        game_in, game_out = 0, 0
        while line:=input.readline():
            if line == "\n":
                if half:
                    half = False
                    if "eval" in one_png[-1]:
                        output.write(''.join(one_png+['\n']))
                        game_in += 1
                    else:
                        game_out += 1
                    if (game_in + game_out)%100000 == 0:
                        print(f'Games with eval {game_in}/{game_in+game_out}')
                    one_png = []
                else:
                    half = True
                    one_png.append(line)
            else:
                one_png.append(line)

In [None]:
def pgn_with_above_2000(data_path, only_above_2000_path):
    with open(data_path) as input, open(only_above_2000_path, 'w') as output:
        half = False
        one_png = []
        game_in, game_out = 0, 0
        elo_white, elo_black = 0, 0
        while line:=input.readline():
            if line == "\n":
                if half:
                    half = False
                    if elo_white > 2000 and elo_black > 2000:
                        output.write(''.join(one_png+['\n']))
                        game_in += 1
                    else:
                        game_out += 1
                    if (game_in + game_out)%100000 == 0:
                        print(f'Games with eval {game_in}/{game_in+game_out}')
                    one_png = []
                else:
                    half = True
                    one_png.append(line)
            elif "[WhiteElo" in line:
                elo_white = int(re.search('(?<=")\d+', line).group(0))
            elif "[BlackElo" in line:
                elo_black = int(re.search('(?<=")\d+', line).group(0))
            else:
                one_png.append(line)

In [None]:
data_path = '../../chess_data/lichess_evaluated_2022-06.pgn'
only_above_2000_path = '../../chess_data/lichess_evaluated_above_2000_2022-06.pgn'
pgn_with_above_2000(data_path, only_above_2000_path)

In [None]:
def convert_move(move):
    from_position = ord(move[0]) - 97 + 8*(int(move[1])-1)
    to_position = ord(move[2]) - 97 + 8*(int(move[3])-1)

    return from_position * 64 + to_position


def convert_to_nn_input(data_path, output_path):
    with open(data_path) as input, open(output_path, 'w') as output:
        half = False
        one_png = []
        results = []
        evaluation = 0.2
        while line:=input.readline():
            if line == "\n":
                if half:
                    pgn = io.StringIO(one_png[-1])
                    game = chess.pgn.read_game(pgn)
                    board = game.board()
                    for move_node in game.mainline():
                        # print(board.fen(), evaluation, convert_move(str(move_node.move)))
                        results.append(str([board.fen(), evaluation, convert_move(str(move_node.move))]))
                        if '#' in move_node.comment:
                            evaluation = int(re.search('(?<=val #)[-0-9]+', move_node.comment).group(0))
                            evaluation = 20 * (-1 if evaluation < 0 else 20)
                        elif 'eval' in move_node.comment:
                            evaluation = float(re.search('(?<=val )[0-9\.-]+', move_node.comment).group(0))
                        else:
                            print([board.fen(), evaluation, convert_move(str(move_node.move))])
                            break
                        board.push(move_node.move)
                    
                    output.write('\n'.join(results+['']))
                    one_png = []
                    half = False
                else:
                    half = True
                    one_png.append(line)
            else:
                one_png.append(line)

In [None]:
data_path = '../../chess_data/lichess_evaluated_above_2000_2022-06.pgn'
output_path = '../../chess_data/lichess_nn_2022-06.pgn'
convert_to_nn_input(data_path, output_path)