In [1]:
#!pip install pyspark

In [2]:
import os
import re

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

In [3]:
import pandas as pd

In [4]:
# Get the active SparkSession, or create one with default settings if none exists.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [29]:
# Read every .pgn file under Data1 as an RDD of raw text lines (one element per line).
text = (
    spark.sparkContext
    .textFile("../Desktop/Data1/*.pgn")
)

In [30]:
# Peek at the first 20 lines to see the PGN layout: tag lines, a blank line, then the movetext.
text.take(num=20)

['[Event "Rated Classical game"]',
 '[Site "https://lichess.org/j1dkb5dw"]',
 '[White "BFG9k"]',
 '[Black "mamalak"]',
 '[Result "1-0"]',
 '[UTCDate "2012.12.31"]',
 '[UTCTime "23:01:03"]',
 '[WhiteElo "1639"]',
 '[BlackElo "1403"]',
 '[WhiteRatingDiff "+5"]',
 '[BlackRatingDiff "-8"]',
 '[ECO "C00"]',
 '[Opening "French Defense: Normal Variation"]',
 '[TimeControl "600+8"]',
 '[Termination "Normal"]',
 '',
 '1. e4 e6 2. d4 b6 3. a3 Bb7 4. Nc3 Nh6 5. Bxh6 gxh6 6. Be2 Qg5 7. Bg4 h5 8. Nf3 Qg6 9. Nh4 Qg5 10. Bxh5 Qxh4 11. Qf3 Kd8 12. Qxf7 Nc6 13. Qe8# 1-0',
 '',
 '[Event "Rated Classical game"]',
 '[Site "https://lichess.org/a9tcp02g"]']

In [7]:
# Total number of lines across all PGN files (an action: Spark reads the whole input).
line_count = text.count()
line_count

131101223

In [8]:
# How many partitions Spark split the input into.
partition_count = text.getNumPartitions()
partition_count

189

In [9]:
# PGN is a multi-line record format. Each game is a block of `[Tag "value"]` header
# lines, a blank line, then the movetext. Reading it line by line loses which header
# belongs to which game. Grouping the header lines by tag name (groupByKey) and zipping
# the per-tag lists back together misaligns the columns for two reasons:
#   - groupByKey guarantees neither the order of the values nor the order of the keys.
#   - A game that lacks one tag shifts every later row.
# Instead, split the input on the "[Event " tag that opens every game header, so each
# RDD element is the text of one whole game, and pull the wanted tags out of that record.
PGN_GLOB = "../Desktop/Data1/*.pgn"
GAME_DELIMITER = "[Event "
KEEP_TAGS = ('WhiteElo', 'BlackElo', 'TimeControl', 'Result')
RESULT_CODES = {'1-0': 1, '0-1': -1, '1/2-1/2': 0}
# One header tag per line: [Name "value"]. Horizontal whitespace only after the "]"
# so a match never swallows the newline that starts the next tag line.
TAG_PATTERN = re.compile(r'^\[(\w+) "(.*)"\][ \t\r]*$', re.MULTILINE)


def parse_game(record):
    """Turn the text of one PGN game into an output row.

    Parameters
    ----------
    record : str
        Text of a single game with the leading "[Event " delimiter removed. The Event
        tag is therefore not parsed; it is not needed.

    Returns
    -------
    tuple or None
        ``(black_rating, white_rating, time_control, result)`` as strings. ``result``
        is "1" (white won), "-1" (black won) or "0" (draw). A missing tag, or a Result
        outside those three (e.g. "*" for an unfinished game), becomes None, which is a
        null in the DataFrame. Returns None when the record holds none of the wanted
        tags (e.g. any text before the first game of a file).
    """
    tags = {name: value for name, value in TAG_PATTERN.findall(record) if name in KEEP_TAGS}
    if not tags:
        return None
    code = RESULT_CODES.get(tags.get('Result'))
    return (
        tags.get('BlackElo'),
        tags.get('WhiteElo'),
        tags.get('TimeControl'),
        None if code is None else str(code),
    )


# Hadoop's TextInputFormat splits records on the configured delimiter instead of on
# newlines, giving one element per game. .values() drops the byte-offset keys.
games = spark.sparkContext.newAPIHadoopFile(
    PGN_GLOB,
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf={"textinputformat.record.delimiter": GAME_DELIMITER},
).values()

# One (black_rating, white_rating, time_control, result) tuple per game. Kept as an RDD
# (spark.createDataFrame accepts one) rather than collecting every game to the driver.
data = games.map(parse_game).filter(lambda row: row is not None)

In [20]:
# Four nullable string columns. Ratings and result are not cast to numbers here.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("black_rating", "white_rating", "time_control", "result")
])

In [21]:
# Build the DataFrame from the row tuples, applying the explicit schema.
df = spark.createDataFrame(data=data, schema=schema)

In [22]:
# Print the first 20 rows, truncating long values (Spark's show() defaults, spelled out).
df.show(n=20, truncate=True)

+------------+------------+------------+------+
|black_rating|white_rating|time_control|result|
+------------+------------+------------+------+
|        1403|        1639|       600+8|     1|
|        1919|        1654|       480+2|     1|
|        1747|        1643|      420+17|     1|
|        1973|        1824|        60+1|    -1|
|        1815|        1765|        60+1|    -1|
|        1487|        1477|       300+3|     1|
|        1500|        1541|       300+0|    -1|
|        1752|        1765|       540+0|     1|
|        1169|        1445|       900+0|     1|
|        1428|        1522|       180+5|    -1|
|        1544|        1644|       600+8|    -1|
|        1755|        1957|       300+8|    -1|
|        1812|        1656|      420+17|     1|
|        1506|        1436|       420+0|     1|
|        1940|        1878|       300+0|     0|
|        1775|        1742|       540+0|    -1|
|        1811|        1824|       360+6|     1|
|        1395|        1653|      1200+0|

In [23]:
# Save the dataframe to a parquet file.
# mode="overwrite" keeps this cell re-runnable. The default save mode ("error") raises
# when the output path already exists, so a second Restart & Run All would fail here.
df.write.parquet('../Desktop/results.parquet', mode="overwrite")

In [24]:
# Round-trip check: load the parquet output back into a DataFrame.
parquetfile = spark.read.format("parquet").load('../Desktop/results.parquet')

In [25]:
# Display the reloaded rows. Their order need not match the df.show() output.
parquetfile.show(n=20, truncate=True)

+------------+------------+------------+------+
|black_rating|white_rating|time_control|result|
+------------+------------+------------+------+
|        1436|         884|           -|    -1|
|        1742|        1606|       240+3|     1|
|        1481|        1624|       120+0|     1|
|        1330|        1582|       300+0|     1|
|        1780|        1758|       180+0|    -1|
|        1482|        1669|       300+1|     1|
|        1823|        1413|       480+8|    -1|
|        1591|        1366|    9000+180|    -1|
|         887|        1434|           -|     1|
|        1269|        1073|       600+5|    -1|
|        1853|        1982|        60+1|     1|
|        1666|        1754|         0+2|    -1|
|        1692|        1832|         0+1|     1|
|        1611|        1500|       300+8|     1|
|        1730|        1701|       120+0|     1|
|        1566|        1601|       120+0|    -1|
|        1443|         874|           -|    -1|
|        1826|        1835|       300+8|