In [1]:
import pandas as pd
import numpy as np

In [2]:
from modules.utils.rating_to_category import rating_to_number
from modules.chessPreprocessor.preprocessor import Preprocessor
import chess.pgn

Попробуем собрать статистику по 1 игре

In [3]:
proc_first = Preprocessor()
required_headers = ['Event', 'Result', 'WhiteElo', 'BlackElo', 'WhiteRatingDiff', 'BlackRatingDiff', 'ECO', 'Opening',
                    'TimeControl', 'Termination']
with open("../data/lichess_db_standard_rated_2013-01.pgn") as pgn:
    while True:
        game = chess.pgn.read_game(pgn)
        if game is None:
            break
        if not all(header in game.headers for header in required_headers) or '?' in game.headers['Event'] + \
                game.headers['Result'] + game.headers['WhiteElo'] + game.headers['BlackElo'] + game.headers[
            'WhiteRatingDiff'] + game.headers['BlackRatingDiff'] + game.headers['ECO'] + game.headers[
            'Opening'] + game.headers['TimeControl'] + game.headers['Termination']:
            continue
        proc_first.read_pgn_from_string(str(game.mainline_moves()))
        break

Засечем время, за которое мы получаем статистику по 1 игре

In [4]:
%%time
proc_first.calculate_wdl()
proc_first.calculate_evaluation_stat()
proc_first.calculate_n_best_lines(n=3)

Видим, что если мы захотим собрать статистику по большому датасету, то ударимся об то, что это будет НУ ОЧЕНЬ долго. Давайте попробуем ускорить этот процесс (начнем с разных процессов). Для начала поймем в каком формате хотим хранить данные. Давайте запоминать id игры, номер хода и сохранять всю имеющуюся статистику. Также будем хранить id для наших игр. Таким образом создадим свзять между датасетами

In [5]:
df_stat_first = proc_first.get_stats_per_move(add_wdl_stats=True, add_evaluation_stats=True, n_best_lines=3)
df_stat_first['game_id'] = 0
df_stat_first['move_number'] = np.arange(df_stat_first['game_id'].shape[0])
df_stat_first.head(3)

Отлично! Такой формат нас более чем устроит. Запомним его, дальше он нам пригодится

Но вот незадача, мы не знаем размеры нашего датафрейма. Давайте посмотрим на распределение классов. Хорошо что мы уже получали весь датафрейм, воспользуемся им

In [6]:
df = pd.read_csv('../data/lichess_db_standard_rated_2013-01.csv')
df['white_rating_num'] = df['white_elo'].apply(rating_to_number)
df['black_rating_num'] = df['black_elo'].apply(rating_to_number)
df['white_rating_num'].value_counts()

In [7]:
df['black_rating_num'].value_counts()

Мы знаем, что одна игра обрабатывается ~1 минуту. Если мы спим ~6 часов, тогда нам хватит на 360 игр! Тогда давайте возьмем с каждого класса ~500 игр, но т.к. в игре могут играть разные классы, то ~1000 участников с каждым рейтингом. Тогда посчитаем количество игр, которые будут в итоговом датасете

In [8]:
proc = Preprocessor()
MAX_USER_NUM = 1000
rating_num_count = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0}

i = 0
required_headers = ['Event', 'Result', 'WhiteElo', 'BlackElo', 'WhiteRatingDiff', 'BlackRatingDiff', 'ECO', 'Opening',
                    'TimeControl', 'Termination']
with open("../data/lichess_db_standard_rated_2013-01.pgn") as pgn:
    while True:
        game = chess.pgn.read_game(pgn)
        if game is None:
            break

        if not all(header in game.headers for header in required_headers) or '?' in game.headers['Event'] + \
                game.headers['Result'] + game.headers['WhiteElo'] + game.headers['BlackElo'] + game.headers[
            'WhiteRatingDiff'] + game.headers['BlackRatingDiff'] + game.headers['ECO'] + game.headers[
            'Opening'] + game.headers['TimeControl'] + game.headers['Termination']:
            continue
        white_elo = int(game.headers['WhiteElo'])
        white_num = rating_to_number(white_elo)

        black_elo = int(game.headers['BlackElo'])
        black_num = rating_to_number(black_elo)
        if rating_num_count[white_num] < MAX_USER_NUM or rating_num_count[black_num] < MAX_USER_NUM:
            rating_num_count[white_num] += 1
            rating_num_count[black_num] += 1
            i += 1
i

Не густо, но будем работать с чем имеем. Если поймем, что данных мало, то на этот случай сохраним индексы игр в датасете, чтобы сохранять игры, на которых мы уже обучились

Теперь самое интересное, давай сохраним в 2 датасета:
1) Информацию о всей партии
2) Информацию про ходы для каждой партии
Сохранять мы будем для того, чтобы читать данные из csv, что хотя-бы реально по времени

In [9]:
proc = Preprocessor()
rating_num_count = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0}
GAME_COUNT = i

i = 0
game_id = 0
required_headers = ['Event', 'Result', 'WhiteElo', 'BlackElo', 'WhiteRatingDiff', 'BlackRatingDiff', 'ECO', 'Opening',
                    'TimeControl', 'Termination']

events = np.empty(GAME_COUNT, dtype=object)
results = np.empty(GAME_COUNT, dtype=object)

white_elo = np.empty(GAME_COUNT, dtype=int)
black_elo = np.empty(GAME_COUNT, dtype=int)
white_rating_diff = np.empty(GAME_COUNT, dtype=int)
black_rating_diff = np.empty(GAME_COUNT, dtype=int)

ecos = np.empty(GAME_COUNT, dtype=object)
openings = np.empty(GAME_COUNT, dtype=object)

time_control = np.empty(GAME_COUNT, dtype=object)
termination = np.empty(GAME_COUNT, dtype=object)

game_ids = np.empty(GAME_COUNT, dtype=int)

all_moves = []

with open("../data/lichess_db_standard_rated_2013-01.pgn") as pgn:
    while True:
        game = chess.pgn.read_game(pgn)
        if game is None:
            break

        if not all(header in game.headers for header in required_headers) or '?' in game.headers['Event'] + \
                game.headers['Result'] + game.headers['WhiteElo'] + game.headers['BlackElo'] + game.headers[
            'WhiteRatingDiff'] + game.headers['BlackRatingDiff'] + game.headers['ECO'] + game.headers[
            'Opening'] + game.headers['TimeControl'] + game.headers['Termination']:
            continue
        game_white_elo = int(game.headers['WhiteElo'])
        white_num = rating_to_number(game_white_elo)

        game_black_elo = int(game.headers['BlackElo'])
        black_num = rating_to_number(game_black_elo)
        if rating_num_count[white_num] < MAX_USER_NUM or rating_num_count[black_num] < MAX_USER_NUM:
            rating_num_count[white_num] += 1
            rating_num_count[black_num] += 1

            events[i] = game.headers['Event']
            results[i] = game.headers['Result']
            white_elo[i] = game_white_elo
            black_elo[i] = game_black_elo
            white_rating_diff[i] = game.headers['WhiteRatingDiff']
            black_rating_diff[i] = game.headers['BlackRatingDiff']
            ecos[i] = game.headers['ECO']
            openings[i] = game.headers['Opening']
            time_control[i] = game.headers['TimeControl']
            termination[i] = game.headers['Termination']
            game_ids[i] = game_id

            all_moves.append(str(game.mainline_moves()))
            i += 1
        game_id += 1

df = pd.DataFrame({'Events': events, 'results': results, 'white_elo': white_elo, 'black_elo': black_elo,
                   'white_rating_diff': white_rating_diff, 'black_rating_diff': black_rating_diff, 'ecos': ecos,
                   'openings': openings, 'time_control': time_control, 'termination': termination,
                   'game_id': game_ids})


In [10]:
df.head(5)

Запишем этот data_frame в csv, чтобы с ним можно было быстрее работать

In [27]:
df.to_csv("../data/clear_data.csv", encoding='utf-8', index=False)

Теперь давайте напишем функцию, которая будет принимать отрезок ходов, которые мы хотим обрабатывать, соответсвующие индексы 

In [13]:
def get_preprocessed_df(l, r, moves, _game_ids):
    # count in [l, r)
    _all_stats = []
    for ind in range(l, r, 1):
        _proc = Preprocessor()
        _proc.read_pgn_from_string(moves[ind])
        _proc.calculate_wdl()
        _proc.calculate_evaluation_stat()
        _proc.calculate_n_best_lines(n=3)
        df_stat = _proc.get_stats_per_move(add_wdl_stats=True, add_evaluation_stats=True, n_best_lines=3)
        df_stat['game_id'] = _game_ids[ind]
        df_stat['move_number'] = np.arange(df_stat['game_id'].shape[0])
        _all_stats.append(df_stat)

    return _all_stats

Давайте проверим, действительно ли с потоками программа будет выполняться быстрее. Для этого засечем время и проверим на отрезке [0, 9)

In [14]:
%%time
all_stats_without_thread = get_preprocessed_df(0, 9, all_moves, game_ids)

In [16]:
%%time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    future1 = executor.submit(get_preprocessed_df, 0, 3, all_moves, game_ids)
    future2 = executor.submit(get_preprocessed_df, 3, 6, all_moves, game_ids)
    future3 = executor.submit(get_preprocessed_df, 6, 9, all_moves, game_ids)

    return1 = future1.result()
    return2 = future2.result()
    return3 = future3.result()

len(return1 + return2 + return3)

Прирост хоть и не в 3 раза, но какой-то да есть. Давайте попробуем запустить процессы и посмотреть как долго они будут работать. Для них нужна своя фунция, которая будет принимать очередь (с помощью которой умеют общаться потоки)

In [18]:
def get_preprocessed_df_in_process(l, r, moves, _game_ids, return_dict, process_id):
    # count in [l, r)
    _all_stats = []
    for ind in range(l, r, 1):
        _proc = Preprocessor()
        _proc.read_pgn_from_string(moves[ind])
        _proc.calculate_wdl()
        _proc.calculate_evaluation_stat()
        _proc.calculate_n_best_lines(n=3)
        df_stat = _proc.get_stats_per_move(add_wdl_stats=True, add_evaluation_stats=True, n_best_lines=3)
        df_stat['game_id'] = _game_ids[ind]
        df_stat['move_number'] = np.arange(df_stat['game_id'].shape[0])
        _all_stats.append(df_stat)
    return_dict[process_id] = _all_stats

In [20]:
%%time
from multiprocessing import Process, Manager

with Manager() as manager:
    return_dict = manager.dict()
    processes = [
        Process(target=get_preprocessed_df_in_process, args=(i * 3, (i + 1) * 3, all_moves, game_ids, return_dict, i))
        for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(len(list(return_dict.values())))


Процессы нас тоже не спасли (почему-то). Тогда давайте напишем на потоках, прочитаем первые 240 записей из 8 потоков. Для этого сначала проверим что результаты потоки вернули верные (сравним их с результатами 1 потока), а затем поставим так скажем на загрузгу наши потоки

8 потоков обрабатывали (каждый) 1 игру в сумме 4 минуты. Что примерно 30 секунд на игру
16 потоков обрабатывали (каждый) 1 игру в сумме 8 минуты. Что примерно также 30 секунд на игру. Поэтому оставим 8 потоков 

In [31]:
%%time
from concurrent.futures import ThreadPoolExecutor

THREAD_COUNT = 8
STEP = 30
all_res = []
with ThreadPoolExecutor() as executor:
    all_future = [executor.submit(get_preprocessed_df, i * STEP, (i + 1) * STEP, all_moves, game_ids) for i in
                  range(THREAD_COUNT)]
    for future in all_future:
        all_res += future.result()
len(all_res)

In [32]:
all_res[239]

In [33]:
res = pd.concat([i for i in all_res])
res.to_csv("../data/first_240.csv", encoding='utf-8', index=False)

Предобработаем ещё 120 игр

In [37]:
%%time
from concurrent.futures import ThreadPoolExecutor

THREAD_COUNT = 8
STEP = 15
all_res_120 = []
with ThreadPoolExecutor() as executor:
    all_future = [executor.submit(get_preprocessed_df, i * STEP + 240, (i + 1) * STEP + 240, all_moves, game_ids) for i in
                  range(THREAD_COUNT)]
    for future in all_future:
        all_res_120 += future.result()
len(all_res_120)

In [38]:
res_120 = pd.concat([i for i in all_res_120])
res_120.to_csv("../data/from_240_to_360.csv", encoding='utf-8', index=False)