In this notebook we sketch and prototype tools for the Agent by building proofs of concept (PoCs).

### Setup

In [None]:
# Libraries

import os

In [None]:
# Local Modules


In [None]:
# import Whisper

import whisper

In [None]:
# Import Agent

import sys

# Put the project's source folders on the import path so `react` can be found.
# Use the public `sys.path` (rather than reaching it through `os.sys`) and skip
# entries that are already present, so re-running the cell does not pile up
# duplicates.
for src_dir in ("../src", "../src/agents"):
    if src_dir not in sys.path:
        sys.path.append(src_dir)

import react  # My AI assistant

In [None]:
# DEBUG

import sys

# Add the tools folder to the import path through the public `sys.path`
# (not `os.sys`), and only once even if the cell is executed repeatedly.
if "../src/tools" not in sys.path:
    sys.path.append("../src/tools")

In [None]:

# Import GAIA Questions
from datasets import load_dataset


In [None]:

import pandas as pd


In [None]:
from dotenv import load_dotenv
from huggingface_hub import snapshot_download, login

In [None]:
# Load Hugging face credentials

#load_dotenv()
#login(os.getenv(key="HF_TOKEN_CHAPPIE"))  # Replace with your hf api key name


In [None]:
#gaia_questions_path = snapshot_download(repo_id="gaia-benchmark/GAIA", repo_type="dataset")
#gaia_questions = load_dataset(path="gaia-benchmark/GAIA", name="2023_level1")

In [None]:
# Folder inside the local Hugging Face datasets cache that holds the GAIA
# "2023_level1" files. The path is relative, so it depends on the directory
# the notebook is run from.
gaia_index_dir = "../../../.cache/huggingface/datasets/gaia-benchmark___gaia/2023_level1/0.0.1/ec492fe4320ee795b1aed6bb46229c5f693226b0f1316347501c24b4baeee005"
#gaia_index_dir = os.path.abspath(gaia_index_dir)
# Arrow file with the validation split, read by the loader cell.
gaia_data_path = os.path.join(gaia_index_dir, "gaia-validation.arrow")

In [None]:
# Temporary cell: written because of the Hugging Face API call limit.
# Reads the Arrow file at `gaia_data_path` from disk instead of downloading it.

import pyarrow.ipc as ipc

with open(gaia_data_path, "rb") as f:
    # The file is consumed with the Arrow IPC *stream* reader.
    reader = ipc.RecordBatchStreamReader(f)
    table = reader.read_all()

# Convert the Arrow table into the pandas DataFrame used by the rest of the notebook.
gaia_df = table.to_pandas()


In [None]:
#gaia_questions = gaia_questions["validation"]  # Filter for dev purposes
#gaia_df = pd.DataFrame(gaia_questions)

In [None]:
gaia_df.head()

In [None]:
# DEL
# temp filter to get chess images
gaia_df[gaia_df["file_path"].map(lambda f: f.endswith("44.png"))]

---

In [None]:
# Collect the distinct extensions of the files attached to the GAIA tasks.
# `os.path.splitext` only inspects the last path component, so a dot in a
# directory name (e.g. ".cache") is never mistaken for an extension, and a task
# with an empty `file_path` contributes "". Iterating the column directly also
# avoids building a full row object per task with `iterrows()`.
filetypes = {
    os.path.splitext(path)[1].lstrip(".") for path in gaia_df["file_path"]
}
filetypes

### Read Historical XPs

In [None]:
xp_paths = "../data/agent_experiments/iterations/"

In [None]:
# Index all XPs
import os
import re

import pandas as pd

xp_dir = "../data/agent_experiments/iterations"
xp_list = []


def _natural_sort_key(file_name: str) -> list:
    """Return a sort key that orders the numbers embedded in *file_name* numerically.

    ``re.split`` with a capturing group alternates text and digit chunks, so the
    odd positions are always the digit runs; they are compared as integers and
    the rest as plain text (e.g. "xp_10.csv" sorts after "xp_9.csv").
    """
    chunks = re.split(r"([0-9]+)", file_name)
    return [int(chunk) if pos % 2 else chunk for pos, chunk in enumerate(chunks)]


# A plain lexicographic sort places the 10th iteration before the 2nd, which
# would make the index `i` disagree with the chronological order. The directory
# is listed and joined through the same `xp_dir` variable so both always refer
# to the same folder.
for i, xp_file in enumerate(sorted(os.listdir(xp_dir), key=_natural_sort_key)):
    xp_path = os.path.join(xp_dir, xp_file)
    xp_list.append((i, pd.read_csv(xp_path)))

In [None]:
# DEL
xp[xp["Question"] == xp_question][["is_correct"]]

In [None]:
# Join latest XP result to each question

index = gaia_df.index.tolist()  # Index for all the questions (not referenced again in the cells shown here)
questions = gaia_df["Question"].tolist()  # Question texts, in `gaia_df` row order
answers = list()  # Receives one `is_correct` value per question, in the same order


def filter_condition(xp_data: tuple) -> bool:
    """Tell whether an experiment addressed at least one GAIA question.

    Args:
        xp_data: ``(iteration_index, results_df)`` pair; ``results_df`` must
            have a ``"Question"`` column.

    Returns:
        True if any value of the ``"Question"`` column also appears in the
        module-level ``questions`` list, False otherwise.
    """
    _, xp = xp_data
    # Build the lookup set once, instead of re-materialising the column as a
    # list and scanning it linearly for every question.
    xp_addressed_questions = set(xp["Question"])
    return any(question in xp_addressed_questions for question in questions)

filtered_xps = list(filter(filter_condition, xp_list))
# Walk the experiments from the newest to the oldest. A new list is built on
# purpose: reversing an alias in place would also reverse `filtered_xps`.
filtered_xps_reversed = filtered_xps[::-1]

# Join latest answer result (i.e. result of the latest xp)
# NOTE(review): despite the wording above, the `>=` update below keeps the
# highest `is_correct` value found in any XP (a question counts as solved as
# soon as one iteration solved it). Confirm which of the two is intended.

for question in questions:
    answer_result = 0  # Assume wrong answer by default
    for _, xp in filtered_xps_reversed:
        matching_results = xp.loc[xp["Question"] == question, "is_correct"]
        if matching_results.empty:
            continue  # This XP did not address the question
        answer_result_temp = matching_results.iloc[0]  # Only the first matching row is used
        if answer_result_temp >= answer_result:
            answer_result = answer_result_temp
    answers.append(answer_result)

# Join answers
historical_xp_results = gaia_df.copy()
# Attach the answers using the frame's own index. A bare `pd.Series(answers)`
# carries a fresh 0..n-1 index and would be aligned by label, producing NaNs or
# misplaced values whenever `gaia_df` is indexed differently.
historical_xp_results["is_correct"] = pd.Series(answers, index=historical_xp_results.index)
del answers, questions

In [None]:
historical_xp_results

In [None]:
historical_xp_results.is_correct.mean()

Let's first study which tasks are most commonly answered wrong, i.e., how can we increase accuracy with a single next step (e.g., implementing a new tool, modifying the system message, etc.)?

In [None]:
# Keep the rows whose `is_correct` is 0. The explicit `.copy()` makes
# `wrong_ans_df` an independent frame, so adding a column below neither raises
# pandas' SettingWithCopyWarning nor depends on view-vs-copy behaviour.
wrong_ans_df = historical_xp_results[historical_xp_results["is_correct"] == 0].copy()
# Text after the last "." of the file path ("" when the path is empty).
wrong_ans_df["fp_extension"] = wrong_ans_df["file_path"].map(lambda path: path.split(".")[-1])
wrong_ans_df

In [None]:
# Summary
wrong_ans_df.groupby("fp_extension")["is_correct"].count()

The vast majority of the remaining tasks do not include files to read, so we should study them first.

In [None]:
wrong_ans_df_no_extension = wrong_ans_df[wrong_ans_df["file_path"].map(lambda fp: len(fp)==0)]

In [None]:
wrong_ans_df_no_extension

### Identify Image-like tasks

In [None]:
img_filetypes = ('png', 'jpg')

In [None]:
# Select the tasks whose attached file ends in one of `img_filetypes`.
is_image_task = gaia_df["file_path"].map(
    lambda path: path.rsplit(".", 1)[-1] in img_filetypes
)
image_tasks_df = gaia_df[is_image_task]
image_tasks_df

In [None]:
image_tasks_df.iloc[0]

In [None]:
chess_task = image_tasks_df.iloc[0]

In [None]:
os.path.isfile(chess_task.file_path)

In [None]:
import cv2
import matplotlib.pyplot as plt

image_path = chess_task.file_path
image_bgr = cv2.imread(image_path)
# `cv2.imread` reports a missing or unreadable file by returning None instead
# of raising; fail here with a clear message rather than inside `cvtColor`.
if image_bgr is None:
    raise FileNotFoundError(f"Could not read image: {image_path}")

# OpenCV loads images in BGR channel order, while matplotlib expects RGB.
image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

plt.imshow(image_rgb)
plt.title("Chess Image")
plt.axis("off")
plt.show()


### Chess Tool

In this section, our methodology is to 

1. Use a CV model to get the FEN position of a 2D chess image
2. Use another model (likely Stockfish) to predict the best move from a FEN position

#### Tool POC

To extract the FEN position from the board image, we used [board_to_fen](https://github.com/mcdominik/board_to_fen)

In [None]:
import cairosvg
from io import BytesIO
import numpy as np

In [None]:
from PIL import Image
from board_to_fen.predict import get_fen_from_image

img = Image.open(chess_task.file_path)
fen = get_fen_from_image(img, black_view=True)

In [None]:
fen

We observed that this FEN string only describes the piece placement and lacks the remaining fields (side to move, castling rights, en passant square and move counters), so we need to post-process it:

In [None]:
fen_cleaned = fen + " b - - 0 1"

In [None]:
fen_cleaned

Looking forward, we will need to warn the AI agent to post-process the FEN before passing it to Stockfish. However, we can already validate the predicted FEN:

In [None]:
# try out with pychess

import chess as c
import chess.engine as ce

board = c.Board(fen_cleaned)

In [None]:
board

In [None]:
# Import the SVG submodule explicitly instead of relying on it having been
# loaded as a side effect of something else (e.g. displaying a board).
import chess.svg

# Render the parsed board to SVG, rasterise it to an in-memory PNG and decode
# that PNG into an RGB array that matplotlib can display.
svg_data = c.svg.board(board=board, flipped=True)
png_data = BytesIO()
cairosvg.svg2png(bytestring=svg_data.encode('utf-8'), write_to=png_data)
img_array = np.frombuffer(png_data.getvalue(), dtype=np.uint8)
# The second argument of `imdecode` is an IMREAD_* flag, not a colour-conversion
# code: passing `cv2.COLOR_BGR2RGB` there converts nothing. Decode to BGR first,
# then convert the channel order explicitly.
img = cv2.cvtColor(cv2.imdecode(img_array, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB)

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(25, 10))

# `cv2.imread` returns BGR data; convert it so matplotlib shows the original
# board with its true colours instead of swapped red/blue channels.
original_board_rgb = cv2.cvtColor(cv2.imread(chess_task.file_path), cv2.COLOR_BGR2RGB)
ax[0].imshow(original_board_rgb)
ax[0].axis("off")
ax[0].set_title("Original Chess Board", fontsize=18)

ax[1].imshow(img)
ax[1].axis("off")
ax[1].set_title(f"Predicted Chess Board\nFEN: {fen_cleaned}", fontsize=18)
plt.show()

Once we extracted the FEN chess position, we pass it to Stockfish in order to get the best next move

In [None]:
# Import Stockfish
# Path to the Stockfish binary, relative to the notebook's working directory.
stockfish_path = "../external/chess-engines/stockfish/stockfish-ubuntu-x86-64-avx2"
# Launch the engine and talk to it over UCI.
# NOTE(review): no `chess_engine.quit()` call is visible in this part of the
# notebook — confirm the engine process is shut down once it is no longer needed.
chess_engine = ce.SimpleEngine.popen_uci(stockfish_path)

In [None]:
# Predict next best move
# Ask the engine to play from `board`, with a time limit of 10 passed to `ce.Limit`.
stockfish_player = chess_engine.play(board=board, limit=ce.Limit(time=10))
# `play` returns a result object; the chosen move is its `.move` attribute.
best_move_uci = stockfish_player.move
best_move = board.san(move=best_move_uci) # Convert from Universal Chess interface to Standard Algebraic Notation
best_move

In [None]:
# Report the prediction next to the expected answer. Every line is its own
# argument and the lines are joined with `sep="\n"`, so no line starts with the
# space that `print`'s default separator would otherwise insert.
print(
    "Results",
    "=" * 30,
    f"Predicted best next move: {best_move}",
    f"Correct Answer: {chess_task['Final answer']}",
    f"Is the prediction correct?: {'yes' if best_move == chess_task['Final answer'] else 'no'}",
    sep="\n",
)

There we go! Our model finds the correct answer.

As a bonus, we should handle the case where the board is shown from either Black's or White's perspective, so it is worth extracting the coordinate text from the board image.

In [None]:
import numpy as np
import cv2 
import matplotlib.pyplot as plt

# NOTE(review): machine-specific absolute path; presumably the same file as
# `chess_task.file_path` — confirm and reuse that value if so.
chess_img_path = "/home/santiagoal/.cache/huggingface/hub/datasets--gaia-benchmark--GAIA/snapshots/897f2dfbb5c952b5c3c1509e648381f9c7b70316/2023/validation/cca530fc-4052-43b2-b130-b30968d8aa44.png"
# NOTE(review): no cell visible in this notebook writes a file to this path —
# confirm where the cropped image on disk comes from.
cropped_chess_board_path = "../data/images/cropped_chess_board.png"

Our method is to crop the bottom-left chess square and extract the text. 
1. If the text is a1 -> The board has white view
2. If the text is h8 -> The board has black view

In [None]:
# Read Chess Board Img and Crop the bottom-left
img_bgr = cv2.imread(chess_img_path)
# `cv2.imread` returns None (it does not raise) when the file cannot be read.
if img_bgr is None:
    raise FileNotFoundError(f"Could not read image: {chess_img_path}")
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

img_np = np.array(img_rgb)
img_np_shape = img_np.shape
img_rows, img_cols = img_np_shape[:2]
# Keep the last eighth of the rows and the first eighth of the columns, i.e.
# the bottom-left cell of an 8x8 grid laid over the whole image.
# NOTE(review): assumes the board fills the entire picture — confirm for other inputs.
img_cropped = img_np[img_rows // 8 * 7:, :img_cols // 8 * 1, :]

In [None]:
fig, ax = plt.subplots(ncols=2, nrows=1, figsize=(20, 10))

# Left panel: the full board; right panel: the cropped corner square.
panels = ((img_rgb, "Original Image"), (img_cropped, "Image Cropped"))
for axis, (picture, panel_title) in zip(ax, panels):
    axis.imshow(picture)
    axis.set_title(panel_title)
    axis.axis("off")

fig.suptitle("Crop image to identify board orientation", y=0.92, fontsize=15)

Once we have cropped the board image, we can run a simple OCR model to extract the text of that square.

In [None]:
import easyocr

reader = easyocr.Reader(['en'], gpu=False)

# Run OCR directly on the in-memory crop (`img_cropped`): no visible cell writes
# the crop to `cropped_chess_board_path`, so reading that file could pick up a
# stale or missing image. `readtext` accepts a NumPy array as well as a path.
results = reader.readtext(img_cropped)
# The detections are concatenated in reverse order.
results.reverse()

# Grab text (each result is a 3-tuple whose middle item is the recognised string)
left_bottom_cell = "".join(text_temp for _, text_temp, _ in results)

print(f"The left-bottom chess board square is {left_bottom_cell}")

---

Let's study where our current Agent fails, especially on audio-like questions.

In [None]:
run = False

In [None]:
gaia_df