In [48]:
import os
import glob
import json
import ndjson
import shutil
import requests

def load_json_from_api(url, header=None, params = None, fmt = None):
    response = requests.get(url, headers=header, params=params)
    print(url)
    if response.status_code == 200:
        
        #return response
        if fmt == None:
            json_data = response.json()
        else:
            json_data = response.json(cls=fmt)
        return json_data
    else:
        print(f"Error: Failed to retrieve data from the API. Status Code {response.status_code}")
        return None


def get_chess_com_games(username=None, time_formats=None):
    
    with open(f".\\config.json", "r") as f:
        config = json.load(f)
        
    if username==None:
        username = config.get("chess_com_user")
    if time_formats == None:
        time_formats = config.get("time_formats")
    imported_dir = config.get("imported_dir")
    header = {'User-Agent': config.get("email")}
    api_url = f"https://api.chess.com/pub/player/{username}/games/archives"
    monthlyURLS = load_json_from_api(api_url, header)
    if monthlyURLS is None:
        return None
    
    monthlyURLS = monthlyURLS.get("archives")
    
    games = []

    for url in monthlyURLS:
        month = load_json_from_api(url, header)
        games.extend(month.get("games"))
        print(len(games))
    games = list(filter(lambda g: g.get("rated") == True and g.get("time_format") in time_formats and g.get("initial_setup") == "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1", games))
    for g in games:
        g["variant"] = "standard" if g.get("rules") == "chess" else g.get("rules")
        g["platform"] = "chess.com"
        g["player_color"] = "white" if g.get("white").get("username") == username else "black"
    

    
    with open(f".\\{imported_dir}\\games.json", "w") as f:
        json.dump(games,f)
    

    return games


def get_lichess_games(username=None, time_formats=None):
        
    with open(f".\\config.json", "r") as f:
        config = json.load(f)
        
    if username==None:
        username = config.get("lichess_user")
    if time_formats == None:
        time_formats = config.get("time_formats")
    imported_dir = config.get("imported_dir")
    header = {"Authorization": f"Bearer {config.get('lichess_token')}",
             "Accept": "application/x-ndjson"}

    if username==None:
        username = config.get('user')

    api_url = f"https://lichess.org/api/games/user/{username}"
    
    params = {
        "pgnInJson": True,
        
    }
    
    # params = {
    #     "since": since,
    #     "until": until,
    #     "max": max,
    #     "vs": vs,
    #     "rated": rated,
    #     "perfType": perf_type,
    #     "color": color,
    #     "analysed": analysed,
    #     "moves": moves,
    #     "pgnInJson": "true",
    #     "tags": tags,
    #     "clocks": clocks,
    #     "evals": evals,
    #     "opening": opening,
    #     "ongoing": ongoing,
    #     "finished": finished,
    #     "players": players,
    #     "sort": sort,
    #     "literate": literate,
    # }
    games = load_json_from_api(api_url,header=header, params = params, fmt = ndjson.Decoder)
    
    games = list(filter(lambda g: g.get("rated") == True and g.get("variant")== "standard" and g.get("perf") in time_formats, games))
    print(len(games))
    for g in games:
        g["end_time"] = g.get("lastMoveAt")
        g["time_class"] = g.get("speed")
        g["winner"] = g.get("winner", "draw")
        players = g.pop("players", None)
        
        g["white"] = players.get("white")
        g["white"]["username"] = g.get("white").pop("user", None).get("id")
        g["black"] = players.get("black")
        g["black"]["username"] = g.get("black").pop("user", None).get("id")
        g["platform"] = "lichess"
        g["player_color"] = "white" if g.get("white").get("username") == username else "black"
        

    white_games = list(filter(lambda g: g.get("player_color") == "white", games))
    black_games = list(filter(lambda g: g.get("player_color") == "black", games))
    
    with open(f".\\{imported_dir}\\lc_{username}_all_games.json", "w") as f:
        json.dump(games,f)
   

    return games

def import_pgn_file(file_path, username=None):
    
    with open(f".\\config.json", "r") as f:
        config = json.load(f)
    if username==None:
        username = config.get('name')
    imported_dir = config.get("imported_dir")
    with open(f".\\{file_path}{'.pgn' if not file_path.endswith('.pgn') else''}", "r") as f:
        lines = f.readlines()
    games = []
    g = None
    pgn=""
    tag_start_flag = True
    for line in lines:
        pgn+=f"{line}\n"
        if line.startswith("[Event"):
            if g!= None:
                g["pgn"]=pgn
                games.append(g)
                pgn=""
            g=dict()
            tag_start_flag = True
        if line[0] == "[":
            tag = line[1:].split('"')
            tag[0]=tag[0].lower()
            g[tag[0]] = tag[1]
            if (tag[0] == "white" or tag[0] == "black") and tag[1] == username:
                g["player_color"] = tag[0]
            
        elif line.startswith("1."):
            g["moves"]=f"{g.get('moves','')}{line}\n"
    if g!= None:
        g["pgn"]=pgn
        games.append(g)
        pgn=""
    file_name = file_path.split("\\")[-1]
    with open(f".\\{imported_dir}\\pgn_{file_name.split('.')[0]}_all_games.json", "w") as f:
        json.dump(games,f)
        
    return games

def ingest_pgns(pgn_files=None, username=None):
    with open(f".\\config.json", "r") as f:
        config = json.load(f)
    if pgn_files == None:
        pgn_files = config.get("ingest_dir")
    ingested_dir = config.get("ingested_dir", None)
    games = []
    if os.path.isdir(pgn_files):
        pgn_files = glob.glob(os.path.join(pgn_files, "*.pgn"))
        if not pgn_files:
            print(f"{pgn_files} is empty or contains no .pgn files")
        for pgn_file in pgn_files:
            print(f"Ingesting {pgn_file}")
            games.extend(import_pgn_file(pgn_file, username))
            print(f"Done ingesting {pgn_file}")
            if ingested_dir:
                if not os.path.exists(ingested_dir):
                    os.makedirs(ingested_dir)
                file_name = pgn_file.split('\\')[-1]
                move_file = f"{ingested_dir}\\{file_name}"
                print(f"Moving to {move_file}")
                shutil.move(pgn_file, ingested_dir)
        
        return games
    if os.path.isfile(pgn_files) and pgn_files.lower().endswith(".pgn"):
        return import_pgn_file(pgn_files)
    

def import_all_games(file_name=None, chess_com_user=None, lichess_user=None, pgns_files=None, time_formats = None):
    
    
    with open(f".\\config.json", "r") as f:
        config = json.load(f)
    imported_dir = config.get("imported_dir")
    if not os.path.exists(imported_dir):
        os.makedirs(imported_dir)
    if file_name == None:
        file_name = f"{config.get('name')}"
    if chess_com_user == None:
        chess_com_user = config.get("chess_com_user", False)
    if lichess_user == None:
        lichess_user = config.get("lichess_user", False)
    if pgns_files == None:
        pgns_files = config.get("ingest_dir", False)
    
    full_file_path = f"{imported_dir}\\{file_name}_all_games.json"
    if os.path.isfile(full_file_path):
        print(f"File {full_file_path} is already found so appending games")
        with open(full_file_path, "r") as f:
            games = json.load(f)
    else:
        games = []
    if chess_com_user:
        print(f"Ingesting Chess.com Games from user {chess_com_user}")
        #games.extend(get_chess_com_games(chess_com_user,time_formats))
    if lichess_user:
        print(f"Ingesting Lichess Games from user {lichess_user}")
        #games.extend(get_lichess_games(lichess_user,time_formats))
    if pgns_files:
        print(f"Ingesting Pgns files {pgns_files}")
        games.extend(ingest_pgns(pgns_files))  
    #print(games)
    white_games = []#list(filter(lambda g: g.get("player_color","") == "white", games))
    black_games = []#list(filter(lambda g: g.get("player_color","") == "black", games))
    
    with open(f".\\{imported_dir}\\{file_name}_all_games.json", "w") as f:
        json.dump(games,f)
    with open(f".\\{imported_dir}\\{file_name}_white_games.json", "w") as f:
        json.dump(white_games,f)
    with open(f".\\{imported_dir}\\{file_name}_black_games.json", "w") as f:
        json.dump(black_games,f)
    return games

In [47]:
%%time
games = import_all_games()

File imported_pgns\Remy Rouyer_all_games.json is already found so appending games
Ingesting Chess.com Games from user remyrouyer
Ingesting Lichess Games from user remyrouyer
Ingesting Pgns files ingest_pgns
Ingesting ingest_pgns\chess_com_games_2021-10-17.pgn
Done ingesting ingest_pgns\chess_com_games_2021-10-17.pgn
Moving to ingested_pgns\chess_com_games_2021-10-17.pgn
Ingesting ingest_pgns\lichess_study_urusov-gambit_by_remyrouyer_2020.11.21.pgn
Done ingesting ingest_pgns\lichess_study_urusov-gambit_by_remyrouyer_2020.11.21.pgn
Moving to ingested_pgns\lichess_study_urusov-gambit_by_remyrouyer_2020.11.21.pgn
CPU times: total: 15.6 ms
Wall time: 20.5 ms


In [49]:
games

[[{'event ': 'Live Chess',
   'site ': 'Chess.com',
   'date ': '2021.10.15',
   'round ': '-',
   'white ': 'remyrouyer',
   'black ': 'VJ100NIA',
   'result ': '0-1',
   'whiteelo ': '1662',
   'blackelo ': '1687',
   'timecontrol ': '600',
   'endtime ': '23:18:25 PDT',
   'termination ': 'VJ100NIA won by checkmate',
   'moves': '1. e4 c5 2. c3 e6 3. d4 d5 4. e5 Nc6 5. Bb5 Qb6 6. Qa4 Bd7 7. Be3 a6 8. dxc5\n\n',
   'pgn': '[Event "Live Chess"]\n\n[Site "Chess.com"]\n\n[Date "2021.10.15"]\n\n[Round "-"]\n\n[White "remyrouyer"]\n\n[Black "VJ100NIA"]\n\n[Result "0-1"]\n\n[WhiteElo "1662"]\n\n[BlackElo "1687"]\n\n[TimeControl "600"]\n\n[EndTime "23:18:25 PDT"]\n\n[Termination "VJ100NIA won by checkmate"]\n\n\n\n1. e4 c5 2. c3 e6 3. d4 d5 4. e5 Nc6 5. Bb5 Qb6 6. Qa4 Bd7 7. Be3 a6 8. dxc5\n\nQxb5 9. Qxb5 axb5 10. Ne2 Be7 11. Na3 Nxe5 12. O-O Nc4 13. Nxc4 bxc4 14. Nd4 Bc6\n\n15. Nxc6 bxc6 16. a4 Nf6 17. a5 O-O 18. a6 Rfb8 19. Bf4 Rxb2 20. a7 Nh5 21. Bb8\n\nRbxb8 22. axb8=Q+ Rxb8 23. Rfb1 Rx

In [3]:
n == None

False