### Install libraries and define constants and helper functions

In [0]:
%pip install zstandard python-chess tqdm

In [0]:
# Restart the Python process after the %pip install cell above.
# NOTE(review): presumably needed so the freshly installed packages are
# importable by the cells below — confirm against the Databricks docs.
dbutils.library.restartPython()

In [0]:
import io
import os
import json
import copy
import chess
import pickle
import random
import chess.pgn
import pandas as pd
from tqdm import tqdm
import zstandard as zstd
# Root directory (under /Volumes) that holds every ChessGPT artifact used by
# this notebook; the two dataset directories below sit directly beneath it.
PARENT_CHESS_GPT = '/Volumes/unitygo/lichess/engine/chessgpt'
CHESS_GPT_SFT = PARENT_CHESS_GPT + '/chessgpt_sft_data/'
CHESS_GPT = PARENT_CHESS_GPT + '/chessgpt_data/'

In [0]:
# Glyphs for the six move-assessment NAGs defined by the PGN standard ($1-$6).
# Used whenever python-chess cannot supply a symbol for a given code.
_NAG_SYMBOLS = {1: "!", 2: "?", 3: "!!", 4: "??", 5: "!?", 6: "?!"}


def _nag_symbols(nags):
    """Return one display symbol per NAG code, in ascending numeric order."""
    # NOTE(review): `chess.pgn.symbol_for_nag` does not appear to exist in every
    # python-chess release - confirm against the installed version. Looking it
    # up with getattr keeps the original call when it is available and avoids
    # silently discarding every annotation when it is not.
    converter = getattr(chess.pgn, "symbol_for_nag", None)
    symbols = []
    # `nags` is a set on python-chess nodes; sort for a deterministic result.
    for nag in sorted(nags):
        symbol = None
        if converter is not None:
            try:
                symbol = converter(nag)
            except Exception:
                # Unknown code for the library converter: use the local table.
                symbol = None
        symbols.append(symbol if symbol else _NAG_SYMBOLS.get(nag, f"${nag}"))
    return symbols


def parse_lichess_pgn(pgn_content):
    """
    Parse PGN text and return one structured record per game found in it.

    Args:
        pgn_content: The full PGN text (one or more games) as a string.

    Returns:
        A list of dicts, one per game, each with the keys:
          - "headers": the PGN tag pairs as a plain dict
          - "moves": the main-line moves in order; every entry carries
            "move_number", "turn", "san", "uci", "comment", "fen_before",
            "nags" and "fen_after"
          - "termination": the "Termination" header, or "" when absent
          - "result": the "Result" header, or "" when absent
        The list is empty when no game can be read from the text.
    """
    games = []
    pgn = io.StringIO(pgn_content)
    while True:
        game = chess.pgn.read_game(pgn)
        # read_game signals the end of the input with None.
        if game is None:
            break
        headers = dict(game.headers)
        moves = []
        board = game.board()
        node = game
        # Only the main line is followed: always descend into variation 0.
        while node.variations:
            node = node.variation(0)
            move = node.move
            move_data = {
                # Take the number from the board rather than counting from 1,
                # so games that start from a FEN position (black to move, or a
                # later full-move counter) are numbered correctly.
                "move_number": board.fullmove_number,
                "turn": "white" if board.turn == chess.WHITE else "black",
                "san": board.san(move),
                "uci": move.uci(),
                "comment": node.comment,
                "fen_before": board.fen(),
            }
            move_data["nags"] = _nag_symbols(node.nags)
            # Update board state
            board.push(move)
            move_data["fen_after"] = board.fen()
            moves.append(move_data)
        games.append({
            "headers": headers,
            "moves": moves,
            "termination": headers.get("Termination", ""),
            "result": headers.get("Result", "")
        })
    return games

# Accumulator for the parsed games; filled by the data-conversion cell below.
structured_game_data = list()

# System turn shared by every chat-completion sample built in this notebook.
system_prompt = dict(
    role="system",
    content="""
### INSTRUCTIONS:
You are a professional chess commentator. Your task is to provide an in-depth analysis of the following chess game moves and overall board state. Follow these guidelines:

1. Add detailed commentary related to the chess game moves, overall board state, player profile details like Elo rating, and tournament details if available, for example time control or game variant.
2. Explain the opening principles, key middlegame plans, and the final outcome. 
3. Explain what the player might be thinking with each move on the game.
4. Do not add any text before or after the list.
5. Not every move is supposed to have a commentary, make sure you sound as human as possible.
""",
)
def generate_commentary(pgn_text_user_message,pgn_text_assistant_prompt):
    """Build one chat-completion sample with system, user and assistant turns.

    Both arguments are converted with ``str()``, so a dict ends up as its
    Python repr (single-quoted keys), not as JSON text.

    Returns:
        ``{'messages': [system_prompt, user_turn, assistant_turn]}`` where each
        turn is a ``{"role": ..., "content": ...}`` dict.
    """
    turns = (
        ("user", pgn_text_user_message),
        ("assistant", pgn_text_assistant_prompt),
    )
    conversation = [system_prompt]
    for role, payload in turns:
        conversation.append({"role": role, "content": str(payload)})
    return {'messages': conversation}

### Data read and conversion — run only once; the parsed games are saved to a pickle file in Volumes

In [0]:
CHESS_CLIP = '/Volumes/unitygo/lichess/engine/chessgpt/chessclip_data/annotated_pgn/'

# Start from an empty list so that re-running this cell does not append the
# same games a second time.
structured_game_data = []

# os.listdir returns entries in arbitrary order; sort so the game order (and
# therefore the positional train/eval split made later) is reproducible.
for folder in sorted(os.listdir(CHESS_CLIP)):
    folder_path = os.path.join(CHESS_CLIP, folder)
    # Skip the downloaded archives and any stray non-directory entry, which
    # would otherwise make the inner os.listdir raise.
    if 'tar.gz' in folder or not os.path.isdir(folder_path):
        continue
    for file_name in tqdm(sorted(os.listdir(folder_path))):
        file_path = os.path.join(folder_path, file_name)
        try:
            with open(file_path, encoding='latin-1') as f:
                pgn_content = f.read()
            games = parse_lichess_pgn(pgn_content)
            # NOTE(review): only the first game of each file is kept - confirm
            # that every file is expected to hold a single game.
            if games:
                structured_game_data.append(games[0])
        except Exception as exc:
            # Best effort: report the file and the reason, then keep going.
            # Catching Exception (not a bare except) leaves Ctrl-C working.
            print(f"{file_path}: {exc!r}")

# Use a context manager so the pickle is flushed and closed deterministically.
with open(f"{PARENT_CHESS_GPT}/structured_game_data.pkl", "wb") as pkl_file:
    pickle.dump(structured_game_data, pkl_file)

### Prompt Design for Chat Completion

In [0]:
# Fixed seed for picking the eval-set size, so that re-running this cell
# overwrites the tables with the same train/eval partition.
SPLIT_SEED = 42

# NOTE: pickle can execute arbitrary code while loading; only read the file
# written by the conversion cell of this notebook.
with open(f"{PARENT_CHESS_GPT}/structured_game_data.pkl", "rb") as pkl_file:
    structured_game_data = pickle.load(pkl_file)

# Header keys withheld from the user message.
exclusion_header = ['Result','ECO','Opening','Termination','termination','result']
# Per-move keys withheld from the user message (they are the commentary itself).
move_exclusion_keys = ('comment', 'nags')

# Assistant targets: the full game record minus the annotator name. The deep
# copy is taken BEFORE the stripping below so comments and NAGs survive here.
for item in structured_game_data:
    item['headers'].pop('Annotator',None)
structured_game_data_updated = list(structured_game_data)
output_dict_list_with_header = copy.deepcopy(structured_game_data_updated)

# User inputs: the same games with result-revealing headers and all
# commentary removed (mutates the freshly loaded records in place).
updated_moves_list_with_header = []
for item_2 in structured_game_data:
    for header_key in exclusion_header:
        item_2['headers'].pop(header_key,None)
    updated_moves_list = []
    for move in item_2['moves']:
        for move_key in move_exclusion_keys:
            move.pop(move_key,None)
        updated_moves_list.append(move)
    updated_moves_list_with_header.append({'headers':item_2['headers'],'moves':updated_moves_list})

# User and assistant lists are paired by position below; they must line up.
assert len(updated_moves_list_with_header) == len(output_dict_list_with_header)
print(len(updated_moves_list_with_header),len(output_dict_list_with_header))

# Hold out the last 5-10% of the games for evaluation.
# NOTE(review): the games are not shuffled, so the eval set is whatever was
# parsed last - confirm this is intended.
len_moves = len(updated_moves_list_with_header)
split_rng = random.Random(SPLIT_SEED)
random_split = len_moves - split_rng.randint(int(len_moves*0.05),int(len_moves*0.1))
train_list_user,train_list_assistant = updated_moves_list_with_header[:random_split],output_dict_list_with_header[:random_split]
eval_list_user,eval_list_assistant = updated_moves_list_with_header[random_split:],output_dict_list_with_header[random_split:]
print(len(train_list_user),len(train_list_assistant),len(eval_list_user),len(eval_list_assistant))

# Wrap every (user, assistant) pair in the chat-completion message format.
train_list = [
    generate_commentary(user,assistant)
    for user,assistant in zip(train_list_user,train_list_assistant)
]
eval_list = [
    generate_commentary(user,assistant)
    for user,assistant in zip(eval_list_user,eval_list_assistant)
]
print(len(train_list),len(eval_list))

# Persist both splits as tables, replacing any previous run's output.
train_df = spark.createDataFrame(pd.DataFrame(train_list))
eval_df = spark.createDataFrame(pd.DataFrame(eval_list))
train_df.write.mode('overwrite').saveAsTable("unitygo.lichess.chessgpt_chat_completion_train")
eval_df.write.mode('overwrite').saveAsTable("unitygo.lichess.chessgpt_chat_completion_eval")
display(train_df)
display(eval_df)

In [0]:
# !rm /Volumes/unitygo/lichess/engine/chessgpt/train_data.jsonl
# !rm /Volumes/unitygo/lichess/engine/chessgpt/merged_data.jsonl
# !rm /Volumes/unitygo/lichess/engine/chessgpt/eval_data.jsonl

# merge_file = '/Volumes/unitygo/lichess/engine/chessgpt/merged_data.jsonl'
# train_file = '/Volumes/unitygo/lichess/engine/chessgpt/train_data.jsonl'
# eval_file = '/Volumes/unitygo/lichess/engine/chessgpt/eval_data.jsonl'

In [0]:
# def merge_jsonl_files(output_path, eval_path,input_dict): #, additional_files=[]
#     eval_list = []
#     train_list = []
#     for category, file_list in input_dict.items():
#         if category == 'annotated_pgn':
#             for file_path in file_list:
#                 if file_path.endswith('dataset_info.json'):
#                     continue
#                 else:
#                     counter = 0
#                     with open(file_path, 'r') as infile:
#                         len_infile = sum(1 for _ in infile)
#                         infile.seek(0)
#                         random_split = len_infile - random.randint(int(len_infile*0.05),int(len_infile*0.1))
#                         for line in infile:
#                             if counter == random_split:
#                                 eval_list.append(json.loads(line))
#                             else:
#                                 train_list.append(json.loads(line))
#                                 counter += 1
#     print(f"Merged {len(train_list)} train lines and {len(eval_list)} eval lines"            )                  
#     with open(eval_path, 'w') as evalfile:
#         for line in eval_list:
#             #prompt_design = generate_commentary(line)
#             #if prompt_design:
#             json.dump(line, evalfile) #prompt_design
#             evalfile.write("\n")

#     with open(output_path, 'w') as outfile:
#         for line in train_list:
#             #prompt_design = generate_commentary(line)
#             #if prompt_design:
#             json.dump(line, outfile) #prompt_design
#             outfile.write("\n")
#         # if additional_files:
#         #     for file_path in additional_files:
#         #         try:
#         #             with open(file_path, 'r') as infile:
#         #                 for line in infile:
#         #                     try:
#         #                         json.loads(line)
#         #                         outfile.write(line)
#         #                     except json.JSONDecodeError:
#         #                         print(f"Invalid JSON in {file_path}, skipping line")
#         #         except FileNotFoundError:
#         #             print(f"File not found: {file_path}")
#         return train_list, eval_list
# train_list, eval_list = merge_jsonl_files(train_file,eval_file, files_dict) #files_in_chess_gpt_sft
# print(f"Merged all JSONL files into {train_file} and {eval_file}")

In [0]:


# # Access structured data
# for game in games:
#     print("Headers:", game["headers"])
#     print("Result:", game["result"])
#     for move in game["moves"]:
#         print(f"Move {move['move_number']} ({move['turn']}):")
#         print(f"  SAN: {move['san']}")
#         print(f"  Comment: {move['comment']}")
#         print(f"  NAGs: {move['nags']}")
#         print(f"  FEN Before: {move['fen_before']}")
#         print(f"  FEN After: {move['fen_after']}")
#         print("-" * 40)

In [0]:
# !pip install chess
### !tar -xvzf /Volumes/unitygo/lichess/engine/chessgpt/chessclip_data/annotated_pgn/annotated_pgn_free.tar.gz -C /Volumes/unitygo/lichess/engine/chessgpt/chessclip_data/annotated_pgn/
# import subprocess
# CHESS_CLIP = '/Volumes/unitygo/lichess/engine/chessgpt/chessclip_data/annotated_pgn/'
# !rm /Volumes/unitygo/lichess/engine/chessgpt/chessclip_data/annotated_pgn/gameknot/game_44.jsonl
# for folder in os.listdir(CHESS_CLIP):
#     if 'tar.gz' in folder:
#         pass
#     else:
#         for file in os.listdir(CHESS_CLIP+folder):
#             if 'game_44' in file:
#                 print(CHESS_CLIP+folder+file)
#             else:
#                 continue
#             awk_command = [
#                             'awk',
#                             '-v', 'FPAT=([^ ]*)|("[^"]+")',
#                             '-f', '/Volumes/unitygo/lichess/engine/chessgpt/pgn_converter/convert_pgn_to_json.awk',
#                             f'{CHESS_CLIP}{folder}/{file}'
#                             ]
#             # python_command = [
#             #     'python',
#             #     '/Volumes/unitygo/lichess/engine/chessgpt/pgn_converter/pgn_to_json.py',
#             #     f'{CHESS_CLIP}{folder}/{file}',
#             # ]
#             result = subprocess.run(awk_command, text=True, capture_output=True)
#             if result.returncode == 0:
#                 print("Output:\n", result.stdout)
#             else:
#                 print("Error:\n", result.stderr)
#             break