In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file in it
for folder, _, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(folder, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install zstandard python-chess

#### Getting Data
Decompress a `.zst`-compressed PGN (Portable Game Notation) file containing chess games and read the first game from it.


In [None]:
import zstandard as zstd
import chess.pgn
import io

# Path to the zstd-compressed Lichess PGN dump
input_path = "/kaggle/input/lichess-data-february-standard-rated-2025/lichess_db_standard_rated_2025-02.pgn.zst"

# Open the compressed file and decompress it as a stream
with open(input_path, 'rb') as compressed_file:
    dctx = zstd.ZstdDecompressor()
    with dctx.stream_reader(compressed_file) as reader:
        # The PGN parser is given a text handle, so wrap the binary stream
        text_stream = io.TextIOWrapper(reader, encoding='utf-8')

        # Parse only the first game as a smoke test
        game = chess.pgn.read_game(text_stream)

# A missing game is reported as None, so check for None explicitly
# (same check the loading loop uses) instead of relying on truthiness.
if game is None:
    print("No games found in the PGN file!")
else:
    print(game.headers)  # Print game metadata


### Creating Dataframe 

This code extracts and loads chess game data from a compressed PGN file into a Pandas DataFrame, including player names, game result, opening, and moves. The data is ready for **Exploratory Data Analysis (EDA)**, where you can analyze player performance, popular openings, and game outcomes. Visualizations and statistical analysis can further explore trends in game results and move sequences.


In [None]:
import pandas as pd

max_games = 10000  # Change this to read more games
games_data = []

with open(input_path, 'rb') as compressed_file:
    dctx = zstd.ZstdDecompressor()
    with dctx.stream_reader(compressed_file) as reader:
        text_stream = io.TextIOWrapper(reader, encoding='utf-8')

        # Read at most max_games games; read_game returning None ends the loop early
        for _attempt in range(max_games):
            game = chess.pgn.read_game(text_stream)
            if game is None:
                break  # No more games

            headers = game.headers
            # One string per mainline move, joined with single spaces
            move_strings = [str(move) for move in game.mainline_moves()]
            games_data.append({
                "White": headers.get("White", ""),
                "Black": headers.get("Black", ""),
                "Result": headers.get("Result", ""),
                "ECO": headers.get("ECO", ""),
                "Opening": headers.get("Opening", ""),
                "Moves": " ".join(move_strings),
            })

# Number of games actually read (equals the number of collected records)
game_count = len(games_data)

print(f"Loaded {len(games_data)} games into DataFrame.")

# One row per game
df = pd.DataFrame(games_data)



## Inspecting Dataframe


In [None]:
# Show the first 100 loaded games
df.head(100)


In [None]:
# Show the last 100 loaded games
df.tail(100)

In [None]:
# Column names, non-null counts and dtypes of the loaded frame
df.info()

### Data Cleaning Process

1. **Unknown Openings**:
   - Counts and prints the number of games with unknown openings (`"?"` in the "Opening" column).


In [None]:
# Flag rows whose "Opening" value is the "?" placeholder and count them
is_unknown_opening = df["Opening"] == "?"
unknown_openings_count = int(is_unknown_opening.sum())
print(f"Games with unknown openings: {unknown_openings_count}")


2. **Short Move Games**:
   - Counts and prints the number of games with fewer than 12 half-moves (plies), i.e. fewer than 6 full moves.


In [None]:
# Every whitespace-separated token in "Moves" is one ply; count games with fewer than 12 tokens
ply_counts = df["Moves"].str.split().str.len()
short_move_games_count = int((ply_counts < 12).sum())
print(f"Games with less than 12 moves: {short_move_games_count}")


In [None]:
# Work on a copy so the raw loaded frame `df` stays untouched by the cleaning steps
df_copy = df.copy()
# Row count before cleaning
len(df_copy)


 **Data Cleaning**:
   - **Remove Unknown Openings**: Drops rows where the "Opening" column is `"?"`.
   - **Remove Short Games**: Drops games with fewer than 12 half-moves (plies).


In [None]:
# Minimum game length. Each token in "Moves" is a single ply (half-move),
# so 12 here means 12 plies, i.e. 6 full moves.
min_plies = 12

# Keep only games with a known opening ...
has_known_opening = df_copy["Opening"] != "?"
# ... that are long enough (very short games are e.g. weird or aborted)
is_long_enough = df_copy["Moves"].str.split().str.len() >= min_plies

# .copy() detaches the result from df_copy, so adding columns to df_clean
# later does not trigger pandas' SettingWithCopyWarning.
df_clean = df_copy[has_known_opening & is_long_enough].copy()

print(f"Remaining games after cleaning: {len(df_clean)}")



3. **Result Conversion**:
   - Converts the game result into two binary columns: `White_Win` is `1` for a White win and `Black_Win` is `1` for a Black win; both are `0` for a draw (or any other result).


In [None]:
def result_to_binary(result):
    """Translate a PGN result string into a ``(white_win, black_win)`` pair of 0/1 flags.

    "1-0" gives (1, 0), "0-1" gives (0, 1); anything else, including the
    draw string "1/2-1/2", gives (0, 0).
    """
    white_win = 1 if result == "1-0" else 0
    # Black can only be credited when White was not
    black_win = 1 if (not white_win and result == "0-1") else 0
    return white_win, black_win

# Map every result string to its (White_Win, Black_Win) pair and lay the pairs
# out as a two-column frame aligned on df_clean's index.
result_flags = pd.DataFrame(
    df_clean["Result"].map(result_to_binary).tolist(),
    index=df_clean.index,
    columns=["White_Win", "Black_Win"],
).astype(int)

# assign() returns a new frame instead of writing into a filtered slice, which
# avoids SettingWithCopyWarning; unlike unpacking zip(*...), it also works
# when df_clean has no rows.
df_clean = df_clean.assign(
    White_Win=result_flags["White_Win"],
    Black_Win=result_flags["Black_Win"],
)

# Quick check
df_clean[["Result", "White_Win", "Black_Win"]].head()



4. **Final Data**:
   - The cleaned DataFrame is stored in `df_clean` with new columns for `White_Win` and `Black_Win`

In [None]:
# Preview the cleaned frame
df_clean.head()

#### The `split_moves` function splits a sequence of chess moves into separate lists for White and Black, based on even and odd indices.


In [None]:
def split_moves(moves_sequence):
    """Separate a whitespace-delimited move string into White's and Black's moves.

    Tokens at even positions (0, 2, ...) are returned as White's list and tokens
    at odd positions as Black's list, as a ``(white_moves, black_moves)`` tuple.
    """
    white_moves, black_moves = [], []
    for position, token in enumerate(moves_sequence.split()):
        # Odd positions go to Black, even positions to White
        side = black_moves if position % 2 else white_moves
        side.append(token)
    return white_moves, black_moves




In [None]:

# Split every game's move string into its (white_moves, black_moves) pair
split_pairs = df_clean["Moves"].apply(split_moves)

# assign() builds a new frame rather than writing into a filtered slice
# (no SettingWithCopyWarning) and, unlike unpacking zip(*...), does not fail
# when df_clean has no rows.
df_clean = df_clean.assign(**{
    "White Moves": split_pairs.map(lambda pair: pair[0]),
    "Black Moves": split_pairs.map(lambda pair: pair[1]),
})

# Display a preview of the transformed data instead of dumping the whole frame as text
df_clean.head()

## Preparation for Tensors

In [None]:
def _interleave_plies(white_moves, black_moves):
    """Merge White's and Black's move lists back into one list in game order.

    zip() stops at the shorter list, so when White made the last move of the
    game (odd number of plies) that final move is appended explicitly instead
    of being silently dropped.
    """
    merged = [mv for pair in zip(white_moves, black_moves) for mv in pair if mv]
    merged.extend(mv for mv in white_moves[len(black_moves):] if mv)
    return merged


# Build the per-game ply list, then drop the raw "Moves" string column.
# errors="ignore" keeps the cell re-runnable after "Moves" is already gone.
df_clean = df_clean.assign(
    MoveList=pd.Series(
        [
            _interleave_plies(white, black)
            for white, black in zip(df_clean["White Moves"], df_clean["Black Moves"])
        ],
        index=df_clean.index,
        dtype=object,
    )
).drop(columns="Moves", errors="ignore")


In [None]:
from sklearn.preprocessing import LabelEncoder
import torch

# One list of move strings per game
all_move_seqs = list(df_clean["MoveList"])

# Concatenate all games into one flat list; its distinct values form the move vocabulary
all_moves = []
for game_moves in all_move_seqs:
    all_moves.extend(game_moves)

# Fit the label encoder on the vocabulary (fit returns the encoder itself)
encoder = LabelEncoder().fit(all_moves)

# Integer-encode each game separately
encoded_seqs = list(map(encoder.transform, all_move_seqs))

# Sliding windows: seq_len consecutive tokens as input, the token right after them as target
seq_len = 10  # First 10 to predict the 11th
X, y = [], []
for seq in encoded_seqs:
    # range() is empty when a game has at most seq_len tokens, so such games contribute nothing
    for target_pos in range(seq_len, len(seq)):
        X.append(seq[target_pos - seq_len:target_pos])
        y.append(seq[target_pos])

# Stack into numpy arrays first: creating a tensor from a list of numpy.ndarrays is extremely slow
X_np, y_np = np.array(X), np.array(y)

# Final tensors
X_tensor = torch.tensor(X_np, dtype=torch.long)
y_tensor = torch.tensor(y_np, dtype=torch.long)


In [None]:
# Shapes of the input windows and their targets
print(f"X shape: {X_tensor.shape}")
print(f"y shape: {y_tensor.shape}")

# Element types of both tensors
print(f"X dtype: {X_tensor.dtype}")
print(f"y dtype: {y_tensor.dtype}")


Print the first sample (input window and its target)

In [None]:
# First input window and the token that follows it
first_window, first_target = X_tensor[0], y_tensor[0]
print("First input sequence (token IDs):", first_window)
print("First target move (token ID):", first_target)


Decode the token IDs back to UCI moves as a sanity check

In [None]:
# Token IDs of the first window and of its target
first_window_ids = X_tensor[0].numpy()
first_target_id = y_tensor[0].item()

# Map the IDs back to the original move strings
decoded_input = encoder.inverse_transform(first_window_ids)
decoded_target = encoder.inverse_transform([first_target_id])

print("Decoded input:", decoded_input)
print("Decoded target:", decoded_target[0])


Value Ranges

In [None]:
# Smallest and largest token ID that occur in the input windows
min_token_id = X_tensor.min().item()
max_token_id = X_tensor.max().item()
print(f"Min token ID: {min_token_id}")
print(f"Max token ID: {max_token_id}")
print(f"Vocabulary size: {len(encoder.classes_)}")  # Total unique moves


### Distribution of Targets and Labels

In [None]:
import matplotlib.pyplot as plt

# Count how often each token ID occurs as a prediction target
unique, counts = torch.unique(y_tensor, return_counts=True)

# Bar chart of target frequencies, drawn through the explicit figure/axes interface
fig, ax = plt.subplots()
ax.bar(unique.numpy(), counts.numpy())
ax.set_title("Distribution of Target Move Tokens")
ax.set_xlabel("Move Token ID")
ax.set_ylabel("Frequency")
plt.show()


### Building our first Model

In [None]:
import torch.nn as nn

class MoveRNN(nn.Module):
    """Next-move classifier: token embedding -> single LSTM -> linear layer over the vocabulary.

    Args:
        vocab_size: number of distinct move tokens (embedding rows and output logits).
        embed_dim: width of each token embedding.
        hidden_dim: width of the LSTM hidden state.
    """

    def __init__(self, vocab_size, embed_dim=128, hidden_dim=256):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim)
        self.lstm = nn.LSTM(input_size=embed_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=vocab_size)

    def forward(self, x):
        """Return logits of shape [batch, vocab_size] for token IDs ``x`` of shape [batch, seq_len]."""
        embedded = self.embedding(x)                # [batch, seq_len, embed_dim]
        sequence_out, _state = self.lstm(embedded)  # [batch, seq_len, hidden_dim]
        final_step = sequence_out[:, -1]            # output at the last time step: [batch, hidden_dim]
        return self.fc(final_step)


### Training Loop

In [None]:
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim

# Seed PyTorch's global RNG so weight initialisation and batch shuffling start
# from the same state on every run
torch.manual_seed(42)

# Prepare DataLoader
batch_size = 64
dataset = TensorDataset(X_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Model init
vocab_size = len(encoder.classes_)
model = MoveRNN(vocab_size)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Loss & Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 10
for epoch in range(epochs):
    model.train()
    total_loss = 0.0
    for X_batch, y_batch in dataloader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        output = model(X_batch)              # [batch, vocab_size]
        loss = criterion(output, y_batch)    # classification loss (mean over the batch by default)
        loss.backward()
        optimizer.step()
        # Weight the batch mean by the batch size so the epoch figure is a true
        # per-sample average even when the last batch is smaller.
        total_loss += loss.item() * X_batch.size(0)

    # Report the per-sample average rather than a sum over batches, so the number
    # does not depend on dataset size or batch size. max(..., 1) guards an empty dataset.
    avg_loss = total_loss / max(len(dataset), 1)
    print(f"Epoch {epoch+1}/{epochs}, Avg loss: {avg_loss:.4f}")


 ### Quick Evaluation after Training

In [None]:
# Switch to evaluation mode and score the first training window
model.eval()
first_window = X_tensor[0]
with torch.no_grad():
    sample_input = first_window.unsqueeze(0).to(device)  # [1, 10]
    prediction = model(sample_input)                      # [1, vocab_size]

# Highest-scoring token and the move string it stands for
predicted_index = prediction.argmax(dim=1).item()
predicted_move = encoder.inverse_transform([predicted_index])[0]

print("Input Moves:", encoder.inverse_transform(first_window.numpy()))
print("Target Move:", encoder.inverse_transform([y_tensor[0].item()])[0])
print("Predicted Move:", predicted_move)


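So our model predicts moves. However, these moves are not guaranteed to be legal. Hence we need to mask the model's output so that only moves that are legal in the current position can be selected.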

In [None]:
import chess

# Reconstruct the board for the first training window by replaying its moves
# from the initial position. Windows are cut starting at index 0 of each game,
# so the first window begins at a game's first move; windows taken from later
# in a game would need the preceding moves as well.
input_moves = encoder.inverse_transform(X_tensor[0].numpy())
board = chess.Board()
replay_ok = True
for move in input_moves:
    try:
        # The tokens are UCI strings (the vocabulary is matched against
        # move.uci() below), so they must be applied with push_uci, not push_san.
        board.push_uci(str(move))
    except ValueError:
        # push_uci reports invalid or illegal moves with ValueError subclasses
        print(f"Illegal move in input: {move}")
        replay_ok = False
        break

# Legal moves in the reached position, restricted to those the encoder knows
known_moves = set(encoder.classes_)
legal_uci = [move.uci() for move in board.legal_moves]
legal_tokens = encoder.transform([m for m in legal_uci if m in known_moves])

if not replay_ok:
    # The board does not match the input window, so a legality mask would be meaningless
    print("Skipping masked prediction: the input moves could not be replayed.")
elif len(legal_tokens) == 0:
    print("Skipping masked prediction: no legal move is in the model vocabulary.")
else:
    # Mask prediction output: pick the best-scoring token among the legal ones only
    model.eval()
    with torch.no_grad():
        sample_input = X_tensor[0].unsqueeze(0).to(device)
        logits = model(sample_input)[0]  # [vocab_size]
        legal_index = torch.as_tensor(legal_tokens, dtype=torch.long, device=logits.device)
        best_legal = logits[legal_index].argmax().item()

    top_index = legal_tokens[best_legal]
    predicted_move = encoder.inverse_transform([top_index])[0]

    print("Input Moves:", encoder.inverse_transform(X_tensor[0].numpy()))
    print("Target Move:", encoder.inverse_transform([y_tensor[0].item()])[0])
    print("Predicted Move:", predicted_move)


**Summary**
The code above can now predict the next move from the preceding 10 half-moves, and the prediction is restricted to legal moves. GOOD 
However, we still need reinforcement learning so that it predicts moves that are actually good, rather than merely imitating the data it was trained on.