In [None]:
import chess as chess_old
import chess.polyglot
chess_old.__version__
#for maia, we need python chess version '0.25.0'

#https://github.com/lichess-org/berserk
import berserk
import random
import time
import datetime
import pickle
import pandas as pd
import openpyxl
import math
import os 
import requests
from bs4 import BeautifulSoup
import statistics

#import library for game analysis here
from get_analysis import analyze_game

from openai import OpenAI
from dotenv import load_dotenv, find_dotenv
# Locate the .env file automatically by walking up directories until it's found.

dotenv_path = find_dotenv()

# Load the entries of that file as environment variables
# (ChatGPT_bot later reads OPENAI_API_KEY from os.environ).
load_dotenv(dotenv_path)


In [None]:
import json
from maia_handler import EngineHandler

# Read the run configuration; the keys used in this cell are
# 'engine_path' and 'weights_path'.
with open('session_properties.json') as f:
    session_properties = json.load(f)

#https://lczero.org/play/quickstart/
#need lczero installed and this path updated
engine_path = session_properties['engine_path']

#https://github.com/CSSLab/maia-chess
weights_path = session_properties['weights_path']
# Module-level engine handle; get_maia_move() reads it as a global.
maia_handler = EngineHandler(engine_path,weights_path, threads=2)
# Sample position used as a quick smoke test of the engine handler.
board = chess_old.Board('r1b1kbnr/pppp1ppp/8/4N1q1/2BnP3/8/PPPP1PPP/RNBQK2R w KQkq - 1 5')

#maia picks a blunder...
maia_handler.getBoardProbs(board, nodes = 1)


# Lichess accounts whose game history against an opponent is checked by
# ExperimentGame.have_we_played_opp() before a game is played.
bots = ['ucb_123','chess4gerry','BCU_555','sandman353','bingman_nyc']

In [None]:
#ask the maia handler for its move in the given position
def get_maia_move(board):
    """Return the first space-separated token of the second-to-last entry
    in the move list reported by the module-level ``maia_handler``.

    Parameters:
    - board: board object forwarded unchanged to ``maia_handler.getBoardProbs``
    """
    # element [1] of the handler's result is the list of move strings
    candidate_lines = maia_handler.getBoardProbs(board, nodes = 1)[1]
    # index computed from the length on purpose: for a one-element list this
    # resolves to -1 (the only entry) rather than raising
    chosen_line = candidate_lines[len(candidate_lines) - 2]
    return chosen_line.split(" ")[0]

def getTime():
    """Return the current local time as a 'YYYY-MM-DD HH:MM:SS' string."""
    now = datetime.datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"

#class to manage an experiment session
class ExperimentSession():
    """Runs a time-boxed series of lichess games for one account.

    The session keeps seeking and playing games (via ExperimentGame) until
    either the configured number of minutes has elapsed or the 'run_flag'
    entry in session_properties.json is switched off.
    """

    def __init__(self, token, session_minutes = 60, rated=True, do_game_logging=True):
        """
        Parameters:
        - token: lichess API token used to build the berserk session
        - session_minutes: how long the session may run, in minutes
        - rated: passed to the seek call when looking for an opponent
        - do_game_logging: forwarded to every ExperimentGame that is created
        """
        # berserk token session and the client built on top of it
        self.session = berserk.TokenSession(token)
        self.client = berserk.Client(session=self.session)

        # id of the lichess account the token belongs to
        account_info = berserk.clients.Account(self.session).get()
        self.li_account = account_info['id']

        # session length in minutes (refreshed from the JSON file later on)
        self.session_minutes = session_minutes

        # epoch seconds at which the session began
        self.start_time = time.time()

        # nothing played yet
        self.games_played = 0

        # default is True; the caller passes the rated_games value
        # from session_properties.json
        self.rated = rated

        # ids of every distinct game seen during this session
        self.game_set = set()

        # placeholder for information about the current game
        self.current_game = ''

        self.do_game_logging = do_game_logging

    def find_start_game(self):
        """Seek until an ongoing game exists, then hand it to an ExperimentGame."""
        ongoing = self.client.games.get_ongoing()

        # keep seeking a 10+0 game until lichess reports one in progress
        while len(ongoing) == 0:
            self.seek_match_time = self.client.board.seek(10, 0, self.rated)
            ongoing = self.client.games.get_ongoing()
            # brief pause before the next check
            time.sleep(1)

        # only one game is expected, so take the first entry
        current = ongoing[0]
        self.game_set.add(current['gameId'])
        # the count of unique game ids is the number of games played
        self.games_played = len(self.game_set)
        print(current)

        # build the game wrapper; it decides on construction whether to abort
        experiment_game = ExperimentGame(
            current,
            self.session,
            self.client,
            self.li_account,
            self.do_game_logging,
        )
        if not experiment_game.aborted_game:
            experiment_game.play_game()
        else:
            print("we have aborted")
        # experiment_game.log_game()

    def check_session_run_flag(self):
        """Re-read session_properties.json, refresh the session length and
        return the file's 'run_flag' value."""
        with open('session_properties.json') as props_file:
            props = json.load(props_file)

        # the session length can be changed while the session is running
        self.session_minutes = props['length_minutes']

        return props['run_flag']

    def continue_session(self):
        """Return True while the run flag is set and session time remains."""
        if not self.check_session_run_flag():
            print("session run flag is set to false")
            print("change session flag to true if you want to play")
            return False

        # whole minutes elapsed since the session started
        elapsed_seconds = time.time() - self.start_time
        minutes_played = int(elapsed_seconds / 60)
        minutes_left = self.session_minutes - int(minutes_played)

        print(minutes_played, " minutes played, ", minutes_left, " minutes left")

        # keep going only while we are under the configured duration
        return minutes_played < self.session_minutes

    def run(self):
        """Main loop: pause between games, then find and play the next one."""
        while self.continue_session():
            print(getTime(),":still_within_session_time")

            if self.games_played > 0:
                if self.games_played % 8 == 0:
                    # after every 8 ten-minute games take a long break, like a human
                    print("Games Played:",self.games_played, '  Wheew! Taking a Break')
                    time.sleep(random.randint(150,450))
                else:
                    # otherwise just a short pause between games
                    print("Games Played:",self.games_played)
                    time.sleep(random.randint(1,40))

            print(getTime(),":find_start_game")
            self.find_start_game()

class ExperimentGame():
    """Plays a single lichess game (opening book first, then maia), optionally
    chats with the opponent, and logs the finished game."""

    def __init__(self, game, session, client, li_account, do_logging=True):
        """
        Class to manage an experiment game

        Parameters:
        - game: dictionary containing game information
          (reads 'gameId', 'fullId', 'color' and 'opponent'/'username')
        - session: session information related to the game
        - client: client information associated with the game
        - li_account: lichess account information related to the game
        - do_logging: set to True to analyze and log games

        Initializes various attributes representing the state of the game and
        immediately checks whether the game must be aborted (see
        have_we_played_opp); the outcome is stored in ``self.aborted_game``.
        """

        #basic game information
        self.session = session
        self.client = client
        self.li_account = li_account
        self.game_id = game['gameId']
        self.full_id = game['fullId']
        self.color = game['color']
        self.rated = None
        self.opponent = game['opponent']['username']
        self.do_logging = do_logging

        #chess boards and book initialization
        #use the session handed to this object rather than a module-level
        #global, so the class works wherever it is constructed
        self.lichess_board = berserk.clients.Board(self.session, "https://lichess.org/" + self.full_id)
        self.board = chess_old.Board()
        self.book = chess.polyglot.open_reader("polyglot/Human.bin")

        #randomly assign a treatment type (0 or 1)
        self.treatment_type = random.randint(0,1)
        print("     This game will be treatment type: ", self.treatment_type)

        #events and game state tracking initialization
        self.chat = ChessChat(self.treatment_type, self.li_account)

        self.events = []
        self.playing = True
        self.my_turn = []
        self.seconds_left = 0
        self.last_move = []
        self.fen = []
        self.has_moved = []
        self.num_moves = 0
        self.num_book_moves = 5
        self.exceptions = 0

        self.aborted_game = self.have_we_played_opp()

    def have_we_played_opp(self):
        """Abort the game if any account in ``bots`` has played this opponent.

        For every account name in the module-level ``bots`` list the lichess
        game export is queried for games versus the opponent. A non-empty
        response body is taken to mean a previous game exists; the current
        game is then aborted.

        Returns:
        - True if the game was aborted, otherwise False
        """
        params = {
            "vs":self.opponent,
        }
        headers = {
            "Accept": "application/x-ndjson"
        }

        for botname in bots:
            # API endpoint
            api_url = f"https://lichess.org/api/games/user/{botname}"
            # Make the GET request
            response = requests.get(api_url, params=params, headers=headers)

            # NOTE(review): the HTTP status is not checked, so an error body
            # is also treated as "has played" - confirm this is intended
            soupy = BeautifulSoup(response.text, 'html.parser')
            games = soupy.prettify()

            if games == '':
                #this bot hasn't played opp
                continue

            print("bot ",botname, " has played this opponent ",self.opponent, " , abort game")
            self.lichess_board.abort_game(self.game_id)
            return True

        return False

    def get_game_update(self):
        """Refresh the local game state from the list of ongoing games.

        Returns:
        - 1 when an ongoing game was found and the state was updated
        - 0 when there is no ongoing game; ``self.playing`` is cleared and the
          game is logged if more than one move was made and logging is on
        """
        #get a list of ongoing games
        games = self.client.games.get_ongoing()

        #no ongoing game: stop playing and (optionally) log
        if len(games) == 0:
            self.playing = False
            print(getTime(),':no game')
            if self.num_moves > 1 and self.do_logging:
                self.log_game()
            return 0

        #otherwise take the first one
        game = games[0]

        #update game state based on returned information from above
        self.last_move = game['lastMove']
        self.my_turn = game['isMyTurn']
        self.seconds_left = game['secondsLeft']
        self.fen = game['fen']
        self.has_moved = game['hasMoved']
        self.rated = game['rated']

        #update chess board (same alias as used in __init__)
        self.board = chess_old.Board(self.fen)
        return 1

    def get_book_move(self):
        """Return the opening book's main move (UCI string) for the current
        board, or '' when the lookup fails.

        On failure ``self.num_book_moves`` is set to 0 so that make_move
        stops consulting the book once the game is past move 0.
        """
        try:
            main_entry = self.book.find(self.board)
            move = main_entry.move().uci()
        #lookup failed (e.g. no book moves left); keep this best-effort but
        #do not swallow KeyboardInterrupt/SystemExit as a bare except would
        except Exception:
            print('     no bookmoves left')
            self.num_book_moves = 0 #update num_book_moves to 0
            move = '' #set move to empty string
        return move

    def do_game_chat(self):
        """Post the chat message (if any) for the current move number.

        A message may contain " <enter> " separators; each part is posted as
        its own chat line with a short pause after the first part.
        """
        #get a message based on the move number
        msg = self.chat.get_message(self.num_moves)
        msg_len = len(msg)
        if msg_len == 0:
            return

        #sleep based on length of message and some small random number
        time.sleep(msg_len/15 + random.random())

        #got fancy and sometimes we want to post two messages in a row.. feels more natural chat, e.g hello <enter> how are you today?
        msgs = msg.split(" <enter> ")
        for i, ms in enumerate(msgs):
            #post the message
            self.lichess_board.post_message(self.game_id,ms)
            #if multi msg post and on first post, sleep a little
            #(checks the number of parts, not the length of the text)
            if len(msgs) > 1 and i == 0:
                time.sleep(.3)

    def make_move(self):
        """Wait a random 1-9 seconds, then play a book move or a maia move."""
        #random delay between each move
        time.sleep(random.randint(1,8) + random.random())
        move = ''

        #if the number of moves is within the limit specified by
        #num_book_moves, try to get a book move
        if self.num_moves <= self.num_book_moves:
            move = self.get_book_move()

        #if no bookmove is available,
        #get a move from maia
        if move == '':
            move = get_maia_move(self.board)

        #make the associated book move or maia move based
        #on outcome above
        self.lichess_board.make_move(self.game_id,move)

    def log_game(self):
        """Request analysis for the game and append the results to a pickle file.

        Two objects are dumped, in order, to
        game_logs/gpt_exp/<account>-<game_id>-log.pkl: a dict of experiment
        metadata merged with the analysis stats, then the raw event list.
        """
        print("     about to pickle")
        #jerry click_lichess_analysis(experiment_game.game_id)
        analyze_game(self.game_id)
        print("     Clicked analysis button, waiting for analysis to process (10 second sleep)")
        #would be better to parse, clean events into df, but pickeling for now to save time now, analyze later
        time.sleep(10)
        print("     ...pulling analysis")
        stats = self.get_analysis()
        if len(stats) == 0:
            print("No analysis for game: ", self.game_id, " creating short pkl file")
        print("     got analysis")

        if self.treatment_type == 0:
            chat_type = 'Control'
        else:
            chat_type = self.chat.chat_treat_type

        exp_info = dict(game_id = self.game_id,
                             rated = self.rated,
                             treat_type = self.treatment_type,
                             chat_type = chat_type,
                             opp_respond = self.chat.opponent_has_responded)

        pickle_dict = exp_info | stats
        #create a filename for the pickle file based on account and game information
        file_name = 'game_logs/gpt_exp/{}-{}-log.pkl'.format(self.li_account,self.game_id)
        with open(file_name, 'ab+') as fp:

            #dump game metadata into the pickle file
            pickle.dump(pickle_dict, fp)

            #dump the events list into the pickle file
            pickle.dump(self.events,fp)
        print("     pickled!")

    def get_analysis(self):
        """Pull analysis stats for the account's first exported game.

        The export is requested up to 3 times; whenever the first game in the
        response has no analysis yet, analysis is requested again and the
        method waits 20 seconds before retrying.

        Returns:
        - dict of stats (game time, status, colours, names, winner and
          per-player rating/accuracy/error counts); empty if no analysis
          could be pulled, in which case a marker file is written under
          game_logs/failed_analysis/
        """
        print("     entered get_analysis")
        username = self.li_account
        # API endpoint
        api_url = f"https://lichess.org/api/games/user/{username}"

        # Query parameters
        params = {
            #"analysed": "true", #show only games that have been analysed
            "evals":"true",
            "accuracy":"true",
            "literate":"true",
            "since":"1696866406816"}

        # Set the Accept header for ndjson format
        headers = {
            "Accept": "application/x-ndjson"}
        stats = {}
        data_found = False

        # Retry 3 times
        for i in range(3):
            # Make the GET request
            response = requests.get(api_url, params=params, headers=headers)
            soupy = BeautifulSoup(response.text, 'html.parser')
            game_with_acc = soupy.prettify()
            df = pd.read_json(game_with_acc, lines=True)
            # NOTE(review): this takes the first game in the export and
            # assumes it is self.game_id - confirm
            last_game = pd.DataFrame(df).iloc[0]

            # Check if analysis is ready; if not, request it again and retry
            if 'analysis' not in last_game['players']['white'].keys():
                print('     no analysis found')
                print("     analysis could not be pulled, trying again ", i+1, "/3")
                analyze_game(self.game_id)
                time.sleep(20)
                continue

            data_found = True
            print('     analysis found')

            # get game time and convert to seconds
            stats["gametime_seconds"] = (last_game['lastMoveAt'] - last_game['createdAt'])/1000
            # get how game was won
            stats["status"] = last_game['status']
            # find which player is white and black
            if last_game['players']['white']['user']['name'].lower() == username.lower():
                stats["maia_color"] = 'white'
                white_player = "maia"
                black_player = "opp"
                opp_name = last_game['players']['black']['user']['name']
            else:
                stats["maia_color"] = 'black'
                black_player = "maia"
                white_player = "opp"
                opp_name = last_game['players']['white']['user']['name']

            # NOTE(review): built-in hash() of a str is salted per interpreter
            # run, so this value is not comparable across sessions
            opp_name_hashed = hash(opp_name)

            stats['num_moves'] = len(last_game['analysis'])
            stats["opp_name_hashed"] = opp_name_hashed
            stats["opp_name"] = opp_name
            stats["maia_name"] = username

            # get winner of game; a game without a 'white'/'black' winner
            # (e.g. stalemate) is recorded as a draw instead of being
            # credited to the opponent
            winner_color = last_game.get('winner')
            if last_game['status'] == 'draw' or winner_color not in ('white', 'black'):
                stats["winner"] = "draw"
            elif winner_color == stats["maia_color"]:
                stats["winner"] = "maia"
            else:
                stats["winner"] = "opp"

            # loop through other parameters
            for color, player in (('white', white_player), ('black', black_player)):
                side = last_game['players'][color]
                stats[player+"_rating"] = side['rating']
                stats[player+"_acc"] = side['analysis']['accuracy']
                stats[player+"_blunders"] = side['analysis']['blunder']
                stats[player+"_mistakes"] = side['analysis']['mistake']
                stats[player+"_acpl"] = side['analysis']['acpl']
                stats[player+"_inacc"] = side['analysis']['inaccuracy']
            break

        if not data_found:
            print("     lichess analysis could not be pulled for: ", self.game_id)
            title = 'game_logs/failed_analysis/no_Analysis_'+ self.li_account + '_' + self.game_id + '.txt'
            with open(title, "w") as file:
                file.write(self.game_id)
        # return 'stats' dict of analysis info
        return(stats)

    def play_game(self):
        """Stream game events and react to them until the game is over.

        On 'gameFull'/'gameState' events the local state is refreshed, a move
        is made when it is our turn, and chat is posted on the opponent's
        turn for treatment type 1. Opponent 'chatLine' events are forwarded
        to the chat object.

        If handling an event raises, the error is appended to error_log.log
        and the event stream is reopened; after more than 4 such errors the
        method gives up.
        """
        print(getTime(),":playing a game -> https://lichess.org/{}".format(self.game_id))
        while self.playing:
            #stream game events from the lichess API
            for event in self.client.board.stream_game_state(self.game_id):
                try:
                    eventtype = event['type']

                    #append the current event to the events list along with a timestamp
                    self.events.append((int(time.time()),event))
                    if eventtype in ['gameFull','gameState']:

                        if eventtype == 'gameFull':
                            #extract the state information from the 'gameFull' event
                            state = event['state']

                            #check if the game state is finished
                            if state["status"] != 'started':
                                #if the game has finished, print a message,
                                #update the playing flag, and exit
                                print(getTime(),'finished game')
                                self.playing = False
                                return

                        #a 'gameState' event is itself the game state
                        else:
                            state = event

                        #update the internal game state based on the received event
                        #(returns 0 when there is no ongoing game any more)
                        if self.get_game_update() == 0:
                            return

                        #if it is the our turn,
                        #calculate the number of moves and make a move
                        if self.my_turn:
                            self.num_moves = math.floor(len(state['moves'].split(' ')) / 2)
                            self.make_move()
                        #If it is our opponent's turn and the treatment type is 1,
                        #perform game chat
                        elif self.treatment_type == 1:
                            self.do_game_chat()
                    elif eventtype == 'chatLine' and event['username'] == self.opponent:
                        print("sending chat text to object:",event['text'])
                        self.chat.opp_responded(event['text'])

                #handle exceptions by logging the error details
                except Exception as err:
                    if self.exceptions > 3:
                        print("too many exceptions, bailing out")
                        return
                    err_msg = getTime(),':error:',err, ':event:',event
                    with open("error_log.log", "a") as logf:
                        logf.write(str(err_msg) + "\n")
                    self.exceptions += 1
                    print("numexceptions:", self.exceptions,"->", err_msg)
                    #drop the current stream; the enclosing while loop opens
                    #a fresh one if the game is still being played
                    break
        print("     play game has ended or disconnected")

class ChessChat():
    """Chooses what to say in the game chat: scripted lines looked up by move
    number, or a ChatGPT reply once the opponent has written something."""

    def __init__(self,type,bot_id):
        """
        Parameters:
        - type: treatment type of the game (not used by this class)
        - bot_id: account id handed to ChatGPT_bot to select its prompt file
        """
        self.chat_types = ['ChatGPT']
        # pick one of the available chat types at random
        picked_index = random.randint(0, len(self.chat_types) - 1)
        self.chat_treat_type = self.chat_types[picked_index]
        # the scripted comments live on the sheet named after the chat type
        self.script_df = pd.read_excel('script/ProtoScript.xlsx', sheet_name=self.chat_treat_type)
        self.opponent_has_responded = False
        if self.chat_treat_type == 'ChatGPT':
            self.chatgpt = ChatGPT_bot(bot_id)
            # True while an opponent message is waiting for a reply
            self.opp_resp_since_last_comment = False

    def get_message(self,move_num):
        """Return the next chat message for ``move_num`` ('' if there is none)."""
        reply_pending = (
            self.chat_treat_type == 'ChatGPT' and self.opp_resp_since_last_comment
        )
        if not reply_pending:
            return self.get_scripted_message(move_num)

        # answer the opponent once, then fall back to the script again
        self.opp_resp_since_last_comment = False
        return self.chatgpt.get_chatgpt_response()

    def get_scripted_message(self, move_num):
        """Return the script comment for ``move_num``, or '' unless exactly
        one script row matches that move."""
        matches = self.script_df.loc[self.script_df.Move == move_num, 'Comment']
        return matches.values[0] if len(matches) == 1 else ''

    def opp_responded(self, opp_chat_msg):
        """Record an opponent chat message and queue a ChatGPT reply."""
        self.opponent_has_responded = True
        if self.chat_treat_type != 'ChatGPT':
            return
        print("about to call updated msgs")
        self.chatgpt.update_messages('user',opp_chat_msg)
        self.opp_resp_since_last_comment = True


class ChatGPT_bot():
    """Keeps a running chat history and asks the OpenAI chat API for replies."""

    # roles accepted by update_messages
    _VALID_ROLES = ('user', 'system', 'assistant')

    def __init__(self,bot_id):
        """
        Parameters:
        - bot_id: account id; selects the prompt file script/<bot_id>_prompt.txt
        """
        # the API key is read from the OPENAI_API_KEY environment variable
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        # the history starts with the system prompt for this bot
        self.messages = [{"role": "system", "content": self.get_prompt(bot_id)}]

    def get_prompt(self, bot_id):
        """Read the prompt file for ``bot_id`` and return it with newlines
        replaced by spaces."""
        path = f"script/{bot_id}_prompt.txt"
        with open(path) as f:
            prompt = f.read()

        return prompt.replace("\n"," ")

    def update_messages(self, role, message):
        """Append a message to the chat history.

        Parameters:
        - role: 'user', 'system' or 'assistant'
        - message: text content of the message

        Raises:
        - ValueError: if ``role`` is not one of the accepted roles
          (previously this surfaced as an UnboundLocalError)
        """
        if role not in self._VALID_ROLES:
            raise ValueError(
                f"unsupported chat role {role!r}; expected one of {self._VALID_ROLES}")
        self.messages.append(dict(role=role, content=message))

    def get_chatgpt_response(self):
        """Request a completion for the current history and return its text.

        Apostrophes and any "ChatGPT:" prefix are stripped from the reply.
        The cleaned reply is appended to the history before it is returned.
        """
        completion = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.messages
        )
        # the content can be None; treat that as an empty reply
        raw_reply = completion.choices[0].message.content or ""
        #get response.. remove ' character b/c gpt loves those, but aint nobody got time for that
        gpt_respn = raw_reply.replace("'", "").replace("ChatGPT: ","").replace("ChatGPT:","")
        # NOTE(review): the reply is stored under the 'system' role with a
        # "ChatGPT: " prefix; 'assistant' is the API's role for model output -
        # confirm the prompts rely on the current form before changing it
        self.update_messages("system",f"ChatGPT: {gpt_respn}")
        print("gptrespnse:",gpt_respn)
        return gpt_respn





# ######### future enhancements #####
#   1. chess chat that is relative to play, pieces left board, perhaps imbalance of pieces
#   2. maia more human
#   3. chat gpt integrations     

# MAIA MORE HUMAN
    #faster exchanges
    # faster in limited legal moves
    # faster closer to end time
    #longer thinks in complex positions?

# STOCKFISH EVAL

#CHAT GPT

#ENV VARIABLES instead of Json


#load session properties from the 'session_properties.json' file
#(keys read below: run_flag, li_chess_token, rated_games, length_minutes, log_games)
with open('session_properties.json') as f:
    session_properties = json.load(f)

#only start a session when the run_flag is set to true
if session_properties['run_flag']:
    #GET YOUR ACCOUNT SESSION TOKEN -> https://lichess.org/account/oauth/token

    #obtain the lichess API token from session properties
    token = session_properties['li_chess_token']
    #module-level berserk session and client built from that token
    berserk_session = berserk.TokenSession(token)
    berserk_client = berserk.Client(session=berserk_session)

    #extract additional session properties
    rated_games = session_properties['rated_games']
    session_length = session_properties['length_minutes']
    do_game_logging = session_properties['log_games']

    #print session details
    print("running a ",session_length," minute session playing rated games:",rated_games)

    #create an instance of the ExperimentSession class and run the session
    #(run() loops until the session time is used up or the run flag is cleared)
    my_exp_session = ExperimentSession(token,session_length,rated_games, do_game_logging)
    my_exp_session.run()

#if run_flag set to false...
else:
    print('set the run flag to true,yea dummy')

In [8]:
#testing gpt block
#manual smoke test: builds a bot from script/ucb_123_prompt.txt, adds one
#user message and requests a reply (this calls the OpenAI API)

bot = ChatGPT_bot("ucb_123")
bot.update_messages("user","im good you?")
response = bot.get_chatgpt_response()



gptrespnse: Doin great, thanks! Just playin some chess here. Ever tried pineapple pizza?


In [10]:
#follow-up turn on the same bot: the earlier messages are still in its history
bot.update_messages("user","can  you stop talking please?")
response = bot.get_chatgpt_response()

gptrespnse: Oh sure things, no problems. Just outta curiosity, you a cat person too?
