<a href="https://colab.research.google.com/github/sean-halpin/chess_website/blob/models_init/models/Chess.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Download Lichess Puzzles

In [None]:
!pip install zstandard

In [None]:
import requests
import zstandard
import csv
import io

url = "https://database.lichess.org/lichess_db_puzzle.csv.zst"
compressed_file_path = "lichess_db_puzzle.csv.zst"
output_csv_path = "lichess_db_puzzle.csv"

# Download the Zstandard compressed file.
# The body is streamed to disk in chunks so the whole archive is never held
# in memory at once.
with requests.get(url, stream=True, timeout=60) as response:
    if response.status_code != 200:
        # Stop here: carrying on would try to decompress a missing or stale
        # archive left over from an earlier run.
        raise RuntimeError(
            f"Failed to download the file. Status code: {response.status_code}"
        )
    with open(compressed_file_path, 'wb') as file:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            file.write(chunk)
print(f"File downloaded successfully to {compressed_file_path}")

# Extract the Zstandard compressed file to CSV.
# The archive already holds CSV text, so the decompressed bytes are copied
# verbatim; re-splitting each line on "," would mangle any quoted field and
# rewrite the line endings.
dctx = zstandard.ZstdDecompressor()
with open(compressed_file_path, 'rb') as compressed_file, \
        open(output_csv_path, 'wb') as output_csv:
    dctx.copy_stream(compressed_file, output_csv)

print(f"File extracted successfully to {output_csv_path}")


# Load Puzzle Data

In [None]:
import pandas as pd

# Location of the decompressed puzzle CSV
csv_file_path = "lichess_db_puzzle.csv"

# Parse the comma-delimited file into a DataFrame
df = pd.read_csv(csv_file_path, delimiter=",")

# Explore Data

In [None]:
# Lift pandas' row-display limit so all requested rows are rendered, then
# print the column dtypes and show the first 100 rows to verify the load.
# The trailing bare expression is what the notebook renders as cell output.
pd.set_option('display.max_rows', None)
print(df.dtypes)
df.head(100)

In [None]:
# Print the column dtypes, then render pandas' summary statistics for df
print(df.dtypes)
df.describe()

In [None]:
# Remove the column-display limit so every column is shown in later outputs
pd.set_option('display.max_columns', None)

In [None]:
# Spot-check one puzzle (positional row 84): its FEN string and move list
print(df.iloc[84]['FEN'])
print(df.iloc[84]['Moves'])

In [None]:
# Keep just the position (FEN) and its move list for the steps below
selected_columns = ["FEN", "Moves"]
df_subset = df.loc[:, selected_columns]

In [None]:
# Preview the top of the two-column subset
df_subset.head()

In [None]:
# Show the FEN/Moves pair at positional row 84 of the subset
df_subset.iloc[84]

In [None]:
!pip install python-chess

In [None]:
def move_to_index(move):
    """
    Translate a move string such as "e2e4" into a pair of square indices.

    Squares are numbered rank by rank: a1 = 0, b1 = 1, ..., h1 = 7, a2 = 8,
    ..., h8 = 63. Only the first four characters are read, so anything after
    them (for example a promotion letter) is ignored.

    Returns a (start_index, end_index) tuple.
    """
    file_offsets = {letter: offset for offset, letter in enumerate('abcdefgh')}
    rank_offsets = {digit: 8 * row for row, digit in enumerate('12345678')}

    def square_index(square):
        # Column offset within the rank plus the offset of the rank itself
        return file_offsets[square[0]] + rank_offsets[square[1]]

    return square_index(move[:2]), square_index(move[2:4])


move_to_index("e2e4")

In [None]:
# Column names of the 128-wide move encoding, built once instead of on every
# call. Order is file-major (a1, a2, ..., a8, b1, ..., h8), and each square's
# "from_" column is immediately followed by its "to_" column.
_MOVE_ENCODING_COLUMNS = tuple(
    prefix + file + rank
    for file in 'abcdefgh'
    for rank in '12345678'
    for prefix in ('from_', 'to_')
)


def chess_move_one_hot_encoding(chess_move):
    """
    One-hot encode the origin and destination squares of a move.

    Only the first four characters of ``chess_move`` are read (e.g. "e2e4"),
    so a trailing promotion letter is ignored.

    Returns a new dict with 128 keys ("from_a1", "to_a1", "from_a2", ...),
    all 0 except the "from_<origin>" and "to_<destination>" entries, which
    are 1. Key order is stable, so ``.values()`` can be used as a feature
    vector.

    Raises KeyError if either square is not a valid board square.
    """
    # A fresh dict per call: callers are free to mutate the result
    encoding = dict.fromkeys(_MOVE_ENCODING_COLUMNS, 0)

    for prefix, square in (('from_', chess_move[:2]), ('to_', chess_move[2:4])):
        column = prefix + square
        # Guard explicitly: plain assignment would silently add a new key
        # for an invalid square instead of failing.
        if column not in encoding:
            raise KeyError(square)
        encoding[column] = 1

    return encoding

# Example usage:
chess_move = "e2e4"
one_hot_encoding = chess_move_one_hot_encoding(chess_move)
print(one_hot_encoding)
print(list(one_hot_encoding.values()))


In [None]:
import chess

# Walk through the first puzzle only, as a sanity check of the board API
for index, row in df_subset.head(1).iterrows():
    fen_string = row["FEN"]
    board = chess.Board(fen_string)

    # Only the first move of the puzzle is inspected and played
    move = row["Moves"].split(" ")[0]

    # Print the board state and moves
    print(move, "\n")
    print(f"Board state for puzzle {index + 1} (FEN: {board.board_fen()}):")
    print(board)
    print("Moves:", row["Moves"])
    print("\n" + "=" * 30 + "\n")

    # List the occupant of every square next to its running position
    for i, square in enumerate(chess.SQUARES):
        piece = board.piece_at(square)
        print(f"{i} - Square {chess.square_name(square)}: {piece}")

    print(move_to_index(move))

    # Play the move so later cells see the position after it
    m = chess.Move.from_uci(move)
    board.push(m)


In [None]:
# Render the current board object as the cell output
board

In [None]:
# Poke at the object returned by board.piece_at(): its attribute list, then
# its piece_type, color and symbol().
# NOTE(review): assumes squares 15 and 22 are occupied in the current
# position; an empty square would presumably yield None here - confirm.
print(dir(board.piece_at(15)))
print(board.piece_at(15).piece_type)
print(board.piece_at(22).color)
print(board.piece_at(22).symbol())

In [None]:
# String form of whatever occupies squares 44 and 0; this is the same
# representation the feature rows are built from
print(str(board.piece_at(44)))
print(str(board.piece_at(0)))

# Prepare Data

In [None]:
# Number of puzzles to draw for building the training set
sample_count = 30000
# Fixed random_state keeps the drawn sample the same across runs
sampled_df = df_subset.sample(n=sample_count, random_state=42)

In [None]:
# Create an empty list to store row data.
# Each row is: 64 square contents + 128 one-hot move columns + 1 label.
rows_data = []


def _board_squares(board):
    """Return the string form of piece_at() for each of the 64 squares."""
    return [str(board.piece_at(square)) for square in chess.SQUARES]


# Counts puzzles that were actually processed (skipped ones are not counted)
i = 0
# Iterate through each row in the DataFrame
for index, row in sampled_df.iterrows():
    # Get the FEN string from the current row
    fen_string = row["FEN"]

    # Create a chess.Board object from the FEN string
    try:
        board = chess.Board(fen_string)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed FEN; TypeError/AttributeError: the cell is
        # not a string at all. Skip the puzzle, but unlike a bare `except`
        # let KeyboardInterrupt/SystemExit stop the loop.
        print("An exception occurred")
        continue

    puzzle_moves = row["Moves"].split(" ")

    # NOTE(review): the Lichess puzzle export appears to list the opponent's
    # move first, with the solution starting at the second move. If so, the
    # first "good" row below is the opponent's move - confirm this is intended.

    # Not Good moves: up to two legal moves from the starting position that
    # differ from the puzzle's first move
    nextMove = puzzle_moves[0]
    if board.legal_moves.count() > 2:
        # The board does not change while negatives are generated, so its
        # encoding is computed once and copied per row.
        start_squares = _board_squares(board)
        for move in list(board.legal_moves)[:2]:
            move = move.uci()
            if nextMove != move:
                row_data = list(start_squares)
                # Append the one-hot move columns to row_data
                row_data.extend(chess_move_one_hot_encoding(move).values())
                # not good move
                row_data.append(0)
                # Append the row_data to the list
                rows_data.append(row_data)

    # Good moves: every move of the puzzle line, each paired with the
    # position it is played from
    for move in puzzle_moves:
        row_data = _board_squares(board)

        # Append the one-hot move columns to row_data
        row_data.extend(chess_move_one_hot_encoding(move).values())
        # good move
        row_data.append(1)
        # Make the move on the board
        m = chess.Move.from_uci(move)
        board.push(m)
        # Append the row_data to the list
        rows_data.append(row_data)
    i += 1
    if i % 5000 == 0:
        print(f"{i} of {sample_count}")

# Create the result DataFrame in a single step
columns = [f"Square_{i}" for i in range(64)] + list(chess_move_one_hot_encoding("a2a4").keys()) + ["good_move"]
print(columns)
result_df = pd.DataFrame(rows_data, columns=columns)

In [None]:
# Show the value left in `move` by the most recently executed loop
move

In [None]:
# Preview the first four generated training rows
result_df.head(4)

In [None]:
# Show the first sampled puzzle for comparison with the generated rows
sampled_df.head(1)

In [None]:
# Check the column dtypes of the generated training frame
result_df.dtypes

# Create Model

In [None]:
!pip install torch pandas scikit-learn

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pandas as pd


# Work on a copy so the encoding below does not overwrite result_df in place
temp_df = result_df.copy()

# Replace 'Square_' columns with numerical labels.
# One encoder is fitted on the contents of all squares, so a given piece
# symbol maps to the same integer in every column. Fitting per column would
# assign codes based only on the symbols that happen to occur in that column.
square_columns = [column for column in temp_df.columns if column.startswith('Square_')]
label_encoder = LabelEncoder()
label_encoder.fit(pd.unique(temp_df[square_columns].to_numpy().ravel()))
for column in square_columns:
    temp_df[column] = label_encoder.transform(temp_df[column])

# Convert 'good_move' column to tensor
labels = torch.tensor(temp_df['good_move'].values, dtype=torch.float32)

# Everything except the target column is an input feature
features = temp_df.drop(columns=['good_move'])

# Convert the remaining DataFrame to tensor
features = torch.tensor(features.values, dtype=torch.float32)

# Split the data into training and validation sets (80/20, fixed seed)
X_train, X_val, y_train, y_val = train_test_split(features, labels, test_size=0.2, random_state=42)

# Define the PyTorch dataset
class ChessDataset(Dataset):
    """Pairs each feature row with its label so a DataLoader can batch them."""

    def __init__(self, features, labels):
        # Stored as given; len() and indexing are delegated to these objects
        self.features, self.labels = features, labels

    def __len__(self):
        """Number of samples, taken from the feature container."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the (features, label) pair at position idx."""
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target

# Create DataLoader instances.
# Training batches are reshuffled each epoch; validation keeps a fixed order.
train_dataset = ChessDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

val_dataset = ChessDataset(X_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# Define a simple neural network model
class ChessModel(nn.Module):
    """Binary classifier: Linear(input_size, 128) -> ReLU -> Linear(128, 1) -> Sigmoid."""

    def __init__(self, input_size):
        super().__init__()
        hidden_size = 128
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a batch of feature rows to one value in (0, 1) per row."""
        hidden = self.relu(self.fc1(x))
        return self.sigmoid(self.fc2(hidden))

# Instantiate the model and move it to GPU if available
model = ChessModel(input_size=features.size(1))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define loss function and optimizer.
# BCELoss expects probabilities, which the model's final Sigmoid provides.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 100

for epoch in range(num_epochs):
    model.train()
    # The loop variable is named batch_labels so the full `labels` tensor
    # built earlier is not overwritten.
    for inputs, batch_labels in train_loader:
        inputs, batch_labels = inputs.to(device), batch_labels.to(device)

        optimizer.zero_grad()
        # Drop only the trailing size-1 dimension: (batch, 1) -> (batch,).
        # A bare squeeze() would also collapse a final batch of one sample
        # to a scalar, which no longer matches the label shape.
        outputs = model(inputs).squeeze(1)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

    # Validation
    model.eval()
    with torch.no_grad():
        val_loss = 0.0
        correct = 0
        total = 0
        for inputs, batch_labels in val_loader:
            inputs, batch_labels = inputs.to(device), batch_labels.to(device)

            outputs = model(inputs).squeeze(1)
            loss = criterion(outputs, batch_labels)
            val_loss += loss.item()

            # Both sides are (batch,) here. Comparing a (batch, 1) prediction
            # with (batch,) labels would broadcast to (batch, batch) and
            # inflate the number of "correct" hits.
            predicted = (outputs > 0.5).float()
            correct += (predicted == batch_labels).sum().item()
            total += batch_labels.size(0)

        accuracy = correct / total
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {val_loss/len(val_loader)}, Accuracy: {accuracy}")
