In [98]:
import os
from re import compile
from loaders.load_games_from_s3 import LoaderGamesFromS3
from pyspark.sql import DataFrame, SparkSession, Row
from pyspark.sql.functions import (
    monotonically_increasing_id,
    regexp_extract,
    when,
    col,
    lit,
    regexp_replace,
    first,
    coalesce,
    sum,
    explode,
    udf
)
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    TimestampType,
    ArrayType
)

from delta import *

development_phase_flag = 'development'

In [99]:
builder = (
    SparkSession.builder.appName("LoadGamesFromS3")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.shuffle.partitions", "1")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [100]:
# read csv from data dir
if development_phase_flag == "development":
    games_df: DataFrame = (
        spark.read.csv("../data/sample_games.csv", header=True)
        .filter(
            (col("value") != "")
            & (~col("value").like("%UTCTime%"))
            & (~col("value").like("%Result%"))
        )
        .withColumn("value", regexp_replace("value", '"', ""))
        .drop("_c0")
    )

    games_df.show()
else:
    s3_bucket = LoaderGamesFromS3(
        object_key="lichess_standard_rated_2024-08.pgn.zst",
        bucket_name="chess-pipeline",
        endpoint="https://s3.cubbit.eu",
        region="eu-west-1",
    )

    games_df = (
        s3_bucket.load_games_dataframe_from_s3()
        .filter(
            (col("value") != "")
            & (~col("value").like("%UTCTime%"))
            & (~col("value").like("%Result%"))
        )
        .withColumn("value", regexp_replace("value", '"', ""))
        .drop("_c0")
    )

+--------------------+
|               value|
+--------------------+
|[Event Rated Bull...|
|[Site https://lic...|
|   [Date 2024.08.01]|
|           [Round -]|
|[White kingskreamer]|
| [Black mysteryvabs]|
|[UTCDate 2024.08.01]|
|     [WhiteElo 2148]|
|     [BlackElo 2155]|
|[WhiteRatingDiff +6]|
|[BlackRatingDiff -6]|
|           [ECO B10]|
|[Opening Caro-Kan...|
|  [TimeControl 60+0]|
|[Termination Time...|
|1. e4 { [%clk 0:0...|
|[Event Rated Bull...|
|[Site https://lic...|
|   [Date 2024.08.01]|
|           [Round -]|
+--------------------+
only showing top 20 rows



In [101]:
# Rename the "value" column to "Line"
games_df = games_df.withColumnRenamed("value", "Line")


# Extract the "Key" and "Value" columns based on the structure of the "Line" column
games_df = (
    games_df.withColumn(
        "Key",
        when(col("Line").startswith("1."), "Moves").otherwise(
            regexp_extract(col("Line"), r"\[(.*?)\s", 1)
        ),
    )
    .withColumn(
        "Value",
        when(col("Line").startswith("1."), col("Line")).otherwise(
            regexp_replace(col("Line"), r"\[\w+\s", ""),
        ),
    )
    .withColumn("Value", regexp_replace(col("Value"), r"\]", ""))
)

# Add a column to identify the start of a game
games_df = games_df.withColumn(
    "StartOfGame", when(col("Line").startswith("[Event"), 1).otherwise(lit(0))
)

# Define a window specification for calculating the cumulative sum
windowSpec = Window.orderBy(monotonically_increasing_id())

# Calculate the cumulative sum of "StartOfGame" to create "GameID"
games_df = games_df.withColumn("GameID", sum(col("StartOfGame")).over(windowSpec))

games_df.show()

24/09/18 22:24:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------------------+---------------+--------------------+-----------+------+
|                Line|            Key|               Value|StartOfGame|GameID|
+--------------------+---------------+--------------------+-----------+------+
|[Event Rated Bull...|          Event|   Rated Bullet game|          1|     1|
|[Site https://lic...|           Site|https://lichess.o...|          0|     1|
|   [Date 2024.08.01]|           Date|          2024.08.01|          0|     1|
|           [Round -]|          Round|                   -|          0|     1|
|[White kingskreamer]|          White|        kingskreamer|          0|     1|
| [Black mysteryvabs]|          Black|         mysteryvabs|          0|     1|
|[UTCDate 2024.08.01]|        UTCDate|          2024.08.01|          0|     1|
|     [WhiteElo 2148]|       WhiteElo|                2148|          0|     1|
|     [BlackElo 2155]|       BlackElo|                2155|          0|     1|
|[WhiteRatingDiff +6]|WhiteRatingDiff|              

                                                                                

In [102]:
col_list = (
    games_df.select("Key")
    .distinct()
    .filter(col("Key") != "")
    .toPandas()["Key"]
    .tolist()
)

col_list

['Event',
 'Site',
 'Date',
 'Round',
 'White',
 'Black',
 'UTCDate',
 'WhiteElo',
 'BlackElo',
 'WhiteRatingDiff',
 'BlackRatingDiff',
 'ECO',
 'Opening',
 'TimeControl',
 'Termination',
 'Moves',
 'WhiteTitle',
 'BlackTitle']

In [103]:
col_list = (
    games_df.select("Key")
    .groupBy("Key").count()
    .filter(col("Key") != "")
    .toPandas()["Key"]
    .tolist()
)

col_list

['Event',
 'Site',
 'Date',
 'Round',
 'White',
 'Black',
 'UTCDate',
 'WhiteElo',
 'BlackElo',
 'WhiteRatingDiff',
 'BlackRatingDiff',
 'ECO',
 'Opening',
 'TimeControl',
 'Termination',
 'Moves',
 'WhiteTitle',
 'BlackTitle']

In [104]:
# Pivot the DataFrame based on "GameID" and the specified columns
pivot_games_df = (
    games_df.groupBy("GameID")
    .pivot("Key", col_list)
    .agg(first("Value"))
    .orderBy("GameID")
)

pivot_games_df = pivot_games_df.filter(col("Moves").contains("%eval"))
pivot_games_df.show()

24/09/18 22:24:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------+--------------------+--------------------+----------+-----+-------------------+-------------+----------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+----------+----------+
|GameID|               Event|                Site|      Date|Round|              White|        Black|   UTCDate|WhiteElo|BlackElo|WhiteRatingDiff|BlackRatingDiff|ECO|             Opening|TimeControl| Termination|               Moves|WhiteTitle|BlackTitle|
+------+--------------------+--------------------+----------+-----+-------------------+-------------+----------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+----------+----------+
|    40|Rated Blitz tourn...|https://lichess.o...|2024.08.01|    -|      TenderBeastXL|    G_Capella|2024.08.01|    1845|    1888|             +7|             -7|B50|Sicilian Defense:...|      180+2|      Normal|1. e4 { [%eval 0....

In [105]:
pivot_games_df.select("Round").distinct().show(truncate=False)
pivot_games_df = pivot_games_df.drop("Round")

24/09/18 22:24:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----+
|Round|
+-----+
|-    |
+-----+



In [106]:
id_and_moves_df = pivot_games_df.select(["GameID", "Moves"]).repartition("GameID")
id_and_moves_df.show()

24/09/18 22:24:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------+--------------------+
|GameID|               Moves|
+------+--------------------+
|    40|1. e4 { [%eval 0....|
|    50|1. e4 { [%eval 0....|
|    52|1. e4 { [%eval 0....|
|    58|1. e4 { [%eval 0....|
|    61|1. d4 { [%eval 0....|
|    65|1. Nf3 { [%eval 0...|
|    71|1. d4 { [%eval 0....|
|    83|1. e4 { [%eval 0....|
|    89|1. e4 { [%eval 0....|
|    91|1. e4 { [%eval 0....|
|   110|1. d4 { [%eval 0....|
|   123|1. d4 { [%eval 0....|
|   130|1. e4 { [%eval 0....|
|   132|1. e4 { [%eval 0....|
|   142|1. f4 { [%eval -0...|
|   154|1. d4 { [%eval 0....|
|   173|1. e4 { [%eval 0....|
|   190|1. e4 { [%eval 0....|
|   208|1. e4 { [%eval 0....|
|   209|1. e4 { [%eval 0....|
+------+--------------------+
only showing top 20 rows



## Data Warehouse

In [107]:
re_pattern = compile(
    r"'?(\d+)\.(\w?\-?\w+\!*?\?*?\!?)!?\??\{\[%eval(-?\d+\.\d+)\[%clk(\d+:\d+:\d+)\}(\d+)\.{3}(\w?\-?\w+\!*?\?*?\!?)\{\[%eval(-?\d+\.\d+)\[%clk(\d+:\d+:\d+)\}(\d-\d)?'?"
)


def parse_game_moves(game_string: str) -> list[Row]:
    """
    Parse the game string to extract the moves and evaluations.

    Args:
    game_string: str
        The string containing the game moves.

    Returns:
        list[Row]: The list of parsed moves and evaluations.
    """
    no_spacing_games_string = game_string.replace(" ", "")
    try:
        moves: list[str] = re_pattern.findall(no_spacing_games_string)
    except Exception as e:
        print(f"Error applying regex into moves: {e}")
        return []

    return [
        Row(
            WhiteMoveCount=int(move[0]) if move[0] else None,
            WhiteMove=move[1] if move[1] else None,
            WhiteEval=move[2] if move[2] else None,
            WhiteTime=move[3] if move[3] else None,
            BlackMoveCount=int(move[4]) if move[4] else None,
            BlackMove=move[5] if move[5] else None,
            BlackEval=move[6] if move[6] else None,
            BlackTime=move[7] if move[7] else None,
            Result=move[8] if move[8] else None,
        )
        for move in moves
    ]

In [108]:


schema = StructType(
    [
        StructField("WhiteMoveCount", IntegerType(), True),
        StructField("WhiteMove", StringType(), True),
        StructField("WhiteEval", StringType(), True),
        StructField("WhiteTime", StringType(), True),
        StructField("BlackMoveCount", IntegerType(), True),
        StructField("BlackMove", StringType(), True),
        StructField("BlackEval", StringType(), True),
        StructField("BlackTime", StringType(), True),
        StructField("Result", StringType(), True),
    ]
)

# UDF to parse game moves
parse_game_moves_udf = udf(parse_game_moves, ArrayType(schema))

# Apply UDF and explode the resulting array of structs
parsed_moves_df = id_and_moves_df.withColumn(
    "ParsedMoves", explode(parse_game_moves_udf(col("Moves")))
)

# Select the necessary columns and add GameID
normalized_moves_df = parsed_moves_df.select(
    col("GameID"),
    col("ParsedMoves.WhiteMoveCount"),
    col("ParsedMoves.WhiteMove"),
    col("ParsedMoves.WhiteEval"),
    col("ParsedMoves.WhiteTime"),
    col("ParsedMoves.BlackMoveCount"),
    col("ParsedMoves.BlackMove"),
    col("ParsedMoves.BlackEval"),
    col("ParsedMoves.BlackTime"),
    col("ParsedMoves.Result"),
)

normalized_moves_df.show()

24/09/18 22:24:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------+--------------+---------+---------+---------+--------------+---------+---------+---------+------+
|GameID|WhiteMoveCount|WhiteMove|WhiteEval|WhiteTime|BlackMoveCount|BlackMove|BlackEval|BlackTime|Result|
+------+--------------+---------+---------+---------+--------------+---------+---------+---------+------+
|    40|             1|       e4|     0.15|  0:03:00|             1|       c5|     0.25|  0:03:00|  NULL|
|    40|             2|      Nf3|     0.22|  0:03:00|             2|       d6|     0.26|  0:03:01|  NULL|
|    40|             3|      Nc3|     0.13|  0:02:59|             3|       e6|     0.46|  0:03:02|  NULL|
|    40|             4|       d3|     0.04|  0:03:00|             4|      Ne7|     0.41|  0:03:03|  NULL|
|    40|             5|      Be2|     0.22|  0:03:00|             5|    Ng6?!|     0.86|  0:03:04|  NULL|
|    40|             6|      O-O|     0.33|  0:03:01|             6|      Be7|     0.39|  0:03:05|  NULL|
|    40|             7|      Be3|     0.32|  0

In [109]:
moves_values = id_and_moves_df.select("Moves").rdd.flatMap(lambda x: x).collect()
parsed_moves_rows = [parse_game_moves(move) for move in moves_values]
normalized_moves_df = spark.createDataFrame(
    data=[move for moves in parsed_moves_rows for move in moves],
    schema="WhiteMoveCount INT, WhiteMove STRING, WhiteEval STRING, WhiteTime STRING, BlackMoveCount INT, BlackMove STRING, BlackEval STRING, BlackTime STRING, Result STRING",
)

# temporary variable
normalized_moves_df = normalized_moves_df.withColumn(
    "row_num", monotonically_increasing_id()
)

# temporary variable
game_id_df = id_and_moves_df.select("GameID").withColumn(
    "row_num", monotonically_increasing_id()
)

# Join the DataFrames on the "row_num" column and drop the "row_num" column
normalized_moves_df = normalized_moves_df.join(game_id_df, "row_num", "left").drop(
    "row_num"
)

normalized_moves_df.show()

24/09/18 22:24:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 2

+--------------+---------+---------+---------+--------------+---------+---------+---------+------+------+
|WhiteMoveCount|WhiteMove|WhiteEval|WhiteTime|BlackMoveCount|BlackMove|BlackEval|BlackTime|Result|GameID|
+--------------+---------+---------+---------+--------------+---------+---------+---------+------+------+
|             1|       e4|     0.15|  0:03:00|             1|       c5|     0.25|  0:03:00|  NULL|    40|
|             2|      Nf3|     0.22|  0:03:00|             2|       d6|     0.26|  0:03:01|  NULL|    50|
|             3|      Nc3|     0.13|  0:02:59|             3|       e6|     0.46|  0:03:02|  NULL|    52|
|             4|       d3|     0.04|  0:03:00|             4|      Ne7|     0.41|  0:03:03|  NULL|    58|
|             5|      Be2|     0.22|  0:03:00|             5|    Ng6?!|     0.86|  0:03:04|  NULL|    61|
|             6|      O-O|     0.33|  0:03:01|             6|      Be7|     0.39|  0:03:05|  NULL|    65|
|             7|      Be3|     0.32|  0:02:57|

In [110]:
normalized_moves_df = normalized_moves_df.withColumn(
    "Result",
    when(
        (col("Result").isNull())
        & (col("WhiteMove").contains("-"))
        & (col("BlackMove").rlike(r"-")),
        coalesce(col("WhiteMove"), col("BlackMove")),
    ).otherwise(col("Result")),
)

## Ingestion into Delta Lake

In [111]:
current_path = os.path.abspath(os.getcwd())
current_path = current_path.split("/")[:-1]
current_path = "/".join(current_path)
current_path

'/Users/jcbraz/Projects/chess-pipeline'

In [112]:
normalized_moves_df.sort(
    ["WhiteMoveCount", "BlackMoveCount", "WhiteTime", "BlackTime"]
).write.format("delta").mode("overwrite").save(f"{current_path}/data/normalized_moves")

24/09/18 22:24:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/18 22:24:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                