# MLOps Phase 1 - Data Collection

In [None]:
# Importing Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import re
import os

Visualizing a sample pgn to see the dataset.

In [None]:
pgn_file_path = 'lichess_db_standard_rated_2013-01.pgn'

# Open the file and print its contents
with open(pgn_file_path, 'r') as file:
    pgn_content = file.read()
print(pgn_content[:10000])

[Event "Rated Classical game"]
[Site "https://lichess.org/j1dkb5dw"]
[White "BFG9k"]
[Black "mamalak"]
[Result "1-0"]
[UTCDate "2012.12.31"]
[UTCTime "23:01:03"]
[WhiteElo "1639"]
[BlackElo "1403"]
[WhiteRatingDiff "+5"]
[BlackRatingDiff "-8"]
[ECO "C00"]
[Opening "French Defense: Normal Variation"]
[TimeControl "600+8"]
[Termination "Normal"]

1. e4 e6 2. d4 b6 3. a3 Bb7 4. Nc3 Nh6 5. Bxh6 gxh6 6. Be2 Qg5 7. Bg4 h5 8. Nf3 Qg6 9. Nh4 Qg5 10. Bxh5 Qxh4 11. Qf3 Kd8 12. Qxf7 Nc6 13. Qe8# 1-0

[Event "Rated Classical game"]
[Site "https://lichess.org/a9tcp02g"]
[White "Desmond_Wilson"]
[Black "savinka59"]
[Result "1-0"]
[UTCDate "2012.12.31"]
[UTCTime "23:04:12"]
[WhiteElo "1654"]
[BlackElo "1919"]
[WhiteRatingDiff "+19"]
[BlackRatingDiff "-22"]
[ECO "D04"]
[Opening "Queen's Pawn Game: Colle System, Anti-Colle"]
[TimeControl "480+2"]
[Termination "Normal"]

1. d4 d5 2. Nf3 Nf6 3. e3 Bf5 4. Nh4 Bg6 5. Nxg6 hxg6 6. Nd2 e6 7. Bd3 Bd6 8. e4 dxe4 9. Nxe4 Rxh2 10. Ke2 Rxh1 11. Qxh1 Nc6 12. Bg5 K

In [None]:
# Initialize Spark Session
spark = SparkSession.builder.appName("MLOps_Phase1").getOrCreate()

# Define the schema of the DataFrame
schema = StructType([
    StructField("black_rating", IntegerType(), True),
    StructField("white_rating", IntegerType(), True),
    StructField("time_control", StringType(), True),
    StructField("result", IntegerType(), True)
])

In [None]:
# Function to parse a single game's PGN data
def parse_game(game_text):
    game_info = {}

    # Find Ratings
    black_rating_match = re.search(r'\[BlackElo "(\d+)"\]', game_text)
    white_rating_match = re.search(r'\[WhiteElo "(\d+)"\]', game_text)
    if black_rating_match and white_rating_match:
        game_info['black_rating'] = int(black_rating_match.group(1))
        game_info['white_rating'] = int(white_rating_match.group(1))

    # Find Time Control
    time_control_match = re.search(r'\[TimeControl "(\d+\+\d+)"\]', game_text)
    if time_control_match:
        game_info['time_control'] = time_control_match.group(1)

    # Find Game Result
    result_match = re.search(r'\[Result "(.*?)"\]', game_text)
    if result_match:
        result = result_match.group(1)
        if result == "1-0":
            game_info['result'] = 1
        elif result == "0-1":
            game_info['result'] = -1
        elif result == "1/2-1/2":
            game_info['result'] = 0
        else:
            game_info['result'] = None

    return game_info

In [None]:
def process_pgn_file(file_path):
    games_data = []
    with open(file_path, 'r') as f:
        game_text = []
        for line in f:
            if line.strip() == "":
                games_data.append(parse_game("\n".join(game_text)))
                game_text = []
            else:
                game_text.append(line.strip())
    if game_text:
        games_data.append(parse_game("\n".join(game_text)))
    games_data = [game for game in games_data if game]
    return games_data

In [None]:
all_games_data = []

# Cycling through all the pgn files for each month to combine for 2013
for month in range(1, 13):
    month_str = f"{month:02d}"
    file_name = f"lichess_db_standard_rated_2013-{month_str}.pgn"
    file_path = file_name

    # Check if file exists
    if os.path.exists(file_path):
        print(f"Processing file: {file_name}")

            # Process the PGN file and extend the list with the new games
        game_data = process_pgn_file(file_path)
        all_games_data.extend(game_data)
    else:
        print(f"File does not exist: {file_name}")

# Create the DataFrame
combined_df = spark.createDataFrame(all_games_data, schema)

combined_df.show()

# Save the combined DataFrame as Parquet file
combined_df.write.parquet('processed_chess_data_2013.parquet')

Processing file: lichess_db_standard_rated_2013-01.pgn
Processing file: lichess_db_standard_rated_2013-02.pgn
Processing file: lichess_db_standard_rated_2013-03.pgn
Processing file: lichess_db_standard_rated_2013-04.pgn
Processing file: lichess_db_standard_rated_2013-05.pgn
Processing file: lichess_db_standard_rated_2013-06.pgn
Processing file: lichess_db_standard_rated_2013-07.pgn
Processing file: lichess_db_standard_rated_2013-08.pgn
Processing file: lichess_db_standard_rated_2013-09.pgn
Processing file: lichess_db_standard_rated_2013-10.pgn
Processing file: lichess_db_standard_rated_2013-11.pgn
Processing file: lichess_db_standard_rated_2013-12.pgn
+------------+------------+------------+------+
|black_rating|white_rating|time_control|result|
+------------+------------+------------+------+
|        1403|        1639|       600+8|     1|
|        1919|        1654|       480+2|     1|
|        1747|        1643|      420+17|     1|
|        1973|        1824|        60+1|    -1|
|   

In [None]:
number_of_rows = combined_df.count()
print(f"Number of rows in the combined DataFrame: {number_of_rows}")

Number of rows in the combined DataFrame: 2090230


In [None]:
combined_df.printSchema()

root
 |-- black_rating: integer (nullable = true)
 |-- white_rating: integer (nullable = true)
 |-- time_control: string (nullable = true)
 |-- result: integer (nullable = true)



In [None]:
for column in combined_df.columns:
    unique_values_count = combined_df.select(column).distinct().count()
    print(f"Unique values in column {column}: {unique_values_count}")

Unique values in column black_rating: 1702
Unique values in column white_rating: 1691
Unique values in column time_control: 860
Unique values in column result: 3


In [None]:
spark.stop()

Downloading the parquet file from colab.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!zip -r chess_data_2013.zip processed_chess_data_2013.parquet

  adding: processed_chess_data_2013.parquet/ (stored 0%)
  adding: processed_chess_data_2013.parquet/.part-00000-43cb5cd1-b5ce-41b1-9224-b14272ebe848-c000.snappy.parquet.crc (deflated 0%)
  adding: processed_chess_data_2013.parquet/part-00000-43cb5cd1-b5ce-41b1-9224-b14272ebe848-c000.snappy.parquet (deflated 9%)
  adding: processed_chess_data_2013.parquet/part-00001-43cb5cd1-b5ce-41b1-9224-b14272ebe848-c000.snappy.parquet (deflated 10%)
  adding: processed_chess_data_2013.parquet/.part-00001-43cb5cd1-b5ce-41b1-9224-b14272ebe848-c000.snappy.parquet.crc (deflated 0%)
  adding: processed_chess_data_2013.parquet/_SUCCESS (stored 0%)
  adding: processed_chess_data_2013.parquet/._SUCCESS.crc (stored 0%)


In [32]:
# Visualizing Results
import pandas as pd

parquet_file_path = 'processed_chess_data_2013.parquet'
df = pd.read_parquet(parquet_file_path)

print(df)

         black_rating  white_rating time_control  result
0              1403.0        1639.0        600+8       1
1              1919.0        1654.0        480+2       1
2              1747.0        1643.0       420+17       1
3              1973.0        1824.0         60+1      -1
4              1815.0        1765.0         60+1      -1
...               ...           ...          ...     ...
2090225        1952.0        1894.0         60+0      -1
2090226        1609.0        1361.0        300+5      -1
2090227        1461.0        1429.0        300+0      -1
2090228        2122.0        2048.0        180+4       1
2090229        1624.0        1626.0        120+0       1

[2090230 rows x 4 columns]


In [34]:
df.to_csv('processed_chess_data_2013_extracted.csv', index=False)