## Converting the data set Pyspark!

In [1]:
import chess
import chess.pgn
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, split, when, substring, lit
from pyspark.sql import functions as F

import pandas as pd

# Start a new SparkSession
spark = SparkSession.builder \
    .appName("ChessDataProcessing") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.memory", "64g") \
    .config("spark.executor.memory", "64g") \
    .config("spark.sql.shuffle.partitions", "100") \
    .getOrCreate()

spark.conf.set("spark.sql.codegen.wholeStage", "false")

spark.conf.set("spark.sql.codegen.maxFields", "2000") 




Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/27 16:00:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/27 16:00:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Our data

# Run this

In [2]:
# Starting by first converting starting the pieces at their initial positions.
# Every game begins like this
 
initial_positions = {
    "Move": 0,
    "White_Rook_1": "a1",
    "White_Rook_2": "h1",
    "White_Knight_1": "b1",
    "White_Knight_2": "g1",
    "White_Bishop_1": "c1",
    "White_Bishop_2": "f1",
    "White_Queen_1": "d1",
    "White_King_1": "e1",
    "White_Pawn_1": "a2",
    "White_Pawn_2": "b2",
    "White_Pawn_3": "c2",
    "White_Pawn_4": "d2",
    "White_Pawn_5": "e2",
    "White_Pawn_6": "f2",
    "White_Pawn_7": "g2",
    "White_Pawn_8": "h2",
    "Black_Rook_1": "a8",
    "Black_Rook_2": "h8",
    "Black_Knight_1": "b8",
    "Black_Knight_2": "g8",
    "Black_Bishop_1": "c8",
    "Black_Bishop_2": "f8",
    "Black_Queen_1": "d8",
    "Black_King_1": "e8",
    "Black_Pawn_1": "a7",
    "Black_Pawn_2": "b7",
    "Black_Pawn_3": "c7",
    "Black_Pawn_4": "d7",
    "Black_Pawn_5": "e7",
    "Black_Pawn_6": "f7",
    "Black_Pawn_7": "g7",
    "Black_Pawn_8": "h7",
}

df = pd.DataFrame([initial_positions])



In [3]:
# Saving and or upload combined games data...
file_path_saved_games = '/scratch/zrc3hc/combined_games.csv'
#combined_games_df.write.csv(file_path, header=True, mode="overwrite")
combined_games_df = spark.read.csv(file_path_saved_games, header=True, inferSchema=True)


                                                                                

In [4]:
combined_games_df.filter(combined_games_df.game_id == 10).show(10)

+----+-------+---------+------+
|Move|game_id|next_move|result|
+----+-------+---------+------+
|   0|     10|     e2e4|   1-0|
|   1|     10|     e7e5|   1-0|
|   2|     10|     g1f3|   1-0|
|   3|     10|     g8f6|   1-0|
|   4|     10|     d2d4|   1-0|
|   5|     10|     f6e4|   1-0|
|   6|     10|     d4e5|   1-0|
|   7|     10|     f8c5|   1-0|
|   8|     10|     f1c4|   1-0|
|   9|     10|     c5f2|   1-0|
+----+-------+---------+------+
only showing top 10 rows



# Run This

In [5]:
schema = StructType([
    StructField("Move", IntegerType(), True),
    StructField("White_Rook_1", StringType(), True),
    StructField("White_Rook_2", StringType(), True),
    StructField("White_Knight_1", StringType(), True),
    StructField("White_Knight_2", StringType(), True),
    StructField("White_Bishop_1", StringType(), True),
    StructField("White_Bishop_2", StringType(), True),
    StructField("White_Queen_1", StringType(), True),
    StructField("White_King_1", StringType(), True),
    StructField("White_Pawn_1", StringType(), True),
    StructField("White_Pawn_2", StringType(), True),
    StructField("White_Pawn_3", StringType(), True),
    StructField("White_Pawn_4", StringType(), True),
    StructField("White_Pawn_5", StringType(), True),
    StructField("White_Pawn_6", StringType(), True),
    StructField("White_Pawn_7", StringType(), True),
    StructField("White_Pawn_8", StringType(), True),
    StructField("Black_Rook_1", StringType(), True),
    StructField("Black_Rook_2", StringType(), True),
    StructField("Black_Knight_1", StringType(), True),
    StructField("Black_Knight_2", StringType(), True),
    StructField("Black_Bishop_1", StringType(), True),
    StructField("Black_Bishop_2", StringType(), True),
    StructField("Black_Queen_1", StringType(), True),
    StructField("Black_King_1", StringType(), True),
    StructField("Black_Pawn_1", StringType(), True),
    StructField("Black_Pawn_2", StringType(), True),
    StructField("Black_Pawn_3", StringType(), True),
    StructField("Black_Pawn_4", StringType(), True),
    StructField("Black_Pawn_5", StringType(), True),
    StructField("Black_Pawn_6", StringType(), True),
    StructField("Black_Pawn_7", StringType(), True),
    StructField("Black_Pawn_8", StringType(), True),
])

In [6]:
schema1 = StructType([
    StructField("Move", IntegerType(), True),
    StructField("White_Rook_1", StringType(), True),
    StructField("White_Rook_2", StringType(), True),
    StructField("White_Knight_1", StringType(), True),
    StructField("White_Knight_2", StringType(), True),
    StructField("White_Bishop_1", StringType(), True),
    StructField("White_Bishop_2", StringType(), True),
    StructField("White_Queen_1", StringType(), True),
    StructField("White_King_1", StringType(), True),
    StructField("White_Pawn_1", StringType(), True),
    StructField("White_Pawn_2", StringType(), True),
    StructField("White_Pawn_3", StringType(), True),
    StructField("White_Pawn_4", StringType(), True),
    StructField("White_Pawn_5", StringType(), True),
    StructField("White_Pawn_6", StringType(), True),
    StructField("White_Pawn_7", StringType(), True),
    StructField("White_Pawn_8", StringType(), True),
    StructField("Black_Rook_1", StringType(), True),
    StructField("Black_Rook_2", StringType(), True),
    StructField("Black_Knight_1", StringType(), True),
    StructField("Black_Knight_2", StringType(), True),
    StructField("Black_Bishop_1", StringType(), True),
    StructField("Black_Bishop_2", StringType(), True),
    StructField("Black_Queen_1", StringType(), True),
    StructField("Black_King_1", StringType(), True),
    StructField("Black_Pawn_1", StringType(), True),
    StructField("Black_Pawn_2", StringType(), True),
    StructField("Black_Pawn_3", StringType(), True),
    StructField("Black_Pawn_4", StringType(), True),
    StructField("Black_Pawn_5", StringType(), True),
    StructField("Black_Pawn_6", StringType(), True),
    StructField("Black_Pawn_7", StringType(), True),
    StructField("Black_Pawn_8", StringType(), True),
    StructField("game_id", StringType(), True),
    StructField("next_move", StringType(), True),
    StructField("result", IntegerType(), True),
])

In [13]:
# Specify the game ids! 
game_ids = [row["game_id"] for row in combined_games_df.select("game_id").distinct().collect()[2000:2500]]
len(game_ids)

500

In [8]:
df = spark.createDataFrame(df, schema=schema)

# It's here!

In [9]:
chess_squares = [
    "a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1",
    "a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2",
    "a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3",
    "a4", "b4", "c4", "d4", "e4", "f4", "g4", "h4",
    "a5", "b5", "c5", "d5", "e5", "f5", "g5", "h5",
    "a6", "b6", "c6", "d6", "e6", "f6", "g6", "h6",
    "a7", "b7", "c7", "d7", "e7", "f7", "g7", "h7",
    "a8", "b8", "c8", "d8", "e8", "f8", "g8", "h8"
]

# Define the schema
schema3 = StructType([
    StructField("Move", IntegerType(), True),
    StructField("game_id", IntegerType(), True),
    StructField("next_move", StringType(), True),
    StructField("result", StringType(), True),
    *[
        StructField(square, IntegerType(), True)  # Each chess square is an integer field
        for square in chess_squares
    ]
])

In [10]:
def update_squares(row, piece, value, chess_squares):
    for square in chess_squares:
        if row[piece] == square:
            row[square] = value
    return row

99997

In [None]:
output_file = '/home/zrc3hc/Chess/1. Preprocessing/saved_games_final4.csv'

# Blank canvas for each game
df = pd.DataFrame([initial_positions])

df = spark.createDataFrame(df, schema=schema)

saved_games = spark.createDataFrame([], schema = schema3)

#game_ids = [row["game_id"] for row in combined_games_df.select("game_id").distinct().collect()]

saved_games_final = pd.DataFrame()


# initializing the board

piece_to_value = {
    "White_Rook_1": 5, "White_Rook_2": 5,
    "White_Knight_1": 3, "White_Knight_2": 3,
    "White_Bishop_1": 3, "White_Bishop_2": 3,
    "White_Queen_1": 9,
    "White_King_1": 10, 
    "White_Pawn_1": 1, "White_Pawn_2": 1, "White_Pawn_3": 1, "White_Pawn_4": 1,
    "White_Pawn_5": 1, "White_Pawn_6": 1, "White_Pawn_7": 1, "White_Pawn_8": 1,
    "Black_Rook_1": -5, "Black_Rook_2": -5,
    "Black_Knight_1": -3, "Black_Knight_2": -3,
    "Black_Bishop_1": -3, "Black_Bishop_2": -3,
    "Black_Queen_1": -9,
    "Black_King_1": -10, 
    "Black_Pawn_1": -1, "Black_Pawn_2": -1, "Black_Pawn_3": -1, "Black_Pawn_4": -1,
    "Black_Pawn_5": -1, "Black_Pawn_6": -1, "Black_Pawn_7": -1, "Black_Pawn_8": -1
}

chess_squares = [
    "a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1",
    "a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2",
    "a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3",
    "a4", "b4", "c4", "d4", "e4", "f4", "g4", "h4",
    "a5", "b5", "c5", "d5", "e5", "f5", "g5", "h5",
    "a6", "b6", "c6", "d6", "e6", "f6", "g6", "h6",
    "a7", "b7", "c7", "d7", "e7", "f7", "g7", "h7",
    "a8", "b8", "c8", "d8", "e8", "f8", "g8", "h8"
]


 
saved_games = df_combined = spark.createDataFrame([], schema1)

for game_id in game_ids:
    
    current_game_info = combined_games_df.filter(combined_games_df.game_id == game_id)
    
    # Step 1. First, calculate which rows we will be saving (only the last 10 percent of each game...)

    next_moves = [row["next_move"] for row in current_game_info.select("next_move").collect()]

    total_moves = current_game_info.count()

    number_of_moves = int(total_moves * .05)

    moves_captured_range = range(total_moves - number_of_moves, total_moves)
    
    print(f"Processing Game ID: {game_id}, total moves: {total_moves}")

    # Step 2. Denote the previous row

    current_game = df.join(current_game_info, on="Move", how="left")

    # removing duplicate columns

    current_game = current_game.select(*(col for col in current_game.columns if current_game.columns.count(col) == 1))
          
    # Reset move count for each game 
    move_count = 0

    previous_row = current_game.filter(col("Move") == move_count)

    next_moves = [row["next_move"] for row in current_game_info.select("next_move").collect()]

    row_list = []
    
    for move in range(0, total_moves - 1): 
        
        if move % 15 == 0:

            print(f"Processing Move: {move}")

        # Step 2: Duplicate the previous row

        new_row = previous_row.withColumn("Move", col("Move") + 1)

        # Step 3: Split 'next_move' into 'from_square' and 'to_square'

        new_row = new_row.withColumn("from_square", substring(col("next_move"), 1, 2)) \
                         .withColumn("to_square", substring(col("next_move"), 3, 2))

        # Step 4a: Update the piece positions based on 'to_square'
        columns_to_check = [c for c in current_game.columns if c not in ["Move", "next_move", "from_square", "to_square", "game_id", "result"]]
        new_row = new_row.select(
            *[
                when(col("to_square") == col(column), "0").otherwise(col(column)).alias(column)
                if column in columns_to_check else col(column)
                for column in new_row.columns
            ]
        )

        # Step 4b: Update the piece positions based on 'from_square'
        new_row = new_row.select(
            *[
                when(col(column) == col("from_square"), col("to_square")).otherwise(col(column)).alias(column)
                if column in columns_to_check else col(column)
                for column in new_row.columns
            ]
        )

        # Step 5: Update the 'Next Move'
        move_count += 1
        new_row = new_row.withColumn("Move", lit(move_count)).drop("from_square", "to_square")
        if move + 1 < len(next_moves):
            new_row = new_row.withColumn("next_move", lit(next_moves[move + 1]))
        else:
            new_row = new_row.withColumn("next_move", lit(None))
            print("Ran out of moves")
            print(move)

        previous_row = new_row

        if move_count in moves_captured_range:
            pandas_df = new_row.toPandas()

            for square in chess_squares:
                pandas_df[square] = 0

            # Update chess squares based on piece positions
            for piece, value in piece_to_value.items():
                pandas_df = pandas_df.apply(
                    lambda row: update_squares(row, piece, value, chess_squares),
                    axis=1
            )

            pandas_df = pandas_df.drop(columns=list(piece_to_value.keys()))

            saved_games_final = pd.concat([saved_games_final, pandas_df], ignore_index=True)
            print(f"Saved Games Final Updated: {saved_games_final.shape[0]} rows")
            
            # Save intermediate results
            
            saved_games_final.to_csv(output_file, index = False)

print("Final DataFrame:")
print(saved_games_final)





Processing Game ID: 763, total moves: 143
Processing Move: 0
Processing Move: 15
Processing Move: 30
Processing Move: 45
Processing Move: 60
Processing Move: 75
Processing Move: 90
Processing Move: 105
Processing Move: 120
Processing Move: 135


24/11/27 16:11:29 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/11/27 16:12:46 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
                                                                                

Saved Games Final Updated: 1 rows


24/11/27 16:14:43 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
                                                                                

Saved Games Final Updated: 2 rows


24/11/27 16:16:42 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
                                                                                

Saved Games Final Updated: 3 rows


24/11/27 16:18:42 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
                                                                                

Saved Games Final Updated: 4 rows


24/11/27 16:20:46 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
                                                                                

Saved Games Final Updated: 5 rows


24/11/27 16:22:52 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
                                                                                

Saved Games Final Updated: 6 rows


24/11/27 16:25:00 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
                                                                                

Saved Games Final Updated: 7 rows
Processing Game ID: 816, total moves: 44
Processing Move: 0
Processing Move: 15
Processing Move: 30
Saved Games Final Updated: 8 rows
Saved Games Final Updated: 9 rows
Processing Game ID: 924, total moves: 25
Processing Move: 0
Processing Move: 15
Saved Games Final Updated: 10 rows
Processing Game ID: 1147, total moves: 72
Processing Move: 0
Processing Move: 15
Processing Move: 30
Processing Move: 45
Processing Move: 60


24/11/27 16:26:51 WARN DAGScheduler: Broadcasting large task binary with size 1335.7 KiB


Saved Games Final Updated: 11 rows


24/11/27 16:27:21 WARN DAGScheduler: Broadcasting large task binary with size 1354.5 KiB


Saved Games Final Updated: 12 rows


24/11/27 16:27:53 WARN DAGScheduler: Broadcasting large task binary with size 1373.2 KiB


Saved Games Final Updated: 13 rows
Processing Game ID: 1484, total moves: 81
Processing Move: 0
Processing Move: 15
Processing Move: 30
Processing Move: 45
Processing Move: 60
Processing Move: 75


24/11/27 16:29:06 WARN DAGScheduler: Broadcasting large task binary with size 1485.6 KiB
                                                                                

Saved Games Final Updated: 14 rows


In [54]:
output_path = '/home/zrc3hc/Chess/1. Preprocessing/saved_games_final1.csv'

saved_games_final.to_csv(output_path, index=False)
