In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
import numpy as py
import re
from pyspark.sql.types import Row

In [3]:
import pyspark.sql.functions as fn 

In [4]:
spark = SparkSession \
    .builder \
    .appName("PySpark assignment") \
    .getOrCreate()

In [5]:
spark.sparkContext._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '[Event ')

In [25]:
rdd = spark.sparkContext.textFile('lichess_db_chess960_rated_2020-12.pgn')

In [26]:
# url = "https://database.lichess.org/chess960/lichess_db_chess960_rated_2020-12.pgn.bz2"
# rdd =spark.sparkContext.addFile(url)

In [27]:
#rdd.take(10)

In [28]:
#rdd.filter(lambda x: len(x) > 0).map(lambda x: '[Event ' +x).take(1)

In [29]:
rdd = rdd.filter(lambda x: len(x) > 0).map(lambda x: '[Event '+ x)

In [30]:
def parse_pgn(pgn):
    # Do some Python to turn our PGN into a Row
    # Regular expression 
    string = pgn
    # note: regular expression '.*' -> any thing for any number of times
    f = re.search(r'\[FEN "(.*)"\]',string)
    w = re.search(r'\[WhiteElo "(.*)"\]',string)
    b = re.search(r'\[BlackElo "(.*)"\]',string)
    #d = re.search(r'\[WhiteRatingDiff "(.*)"\]',string)
    r = re.search(r'\[Result "(.*)"\]',string)
    if f and w and b and r:
        return Row(FEN = f.groups()[0],White_Rating = w.groups()[0], Black_Rating = b.groups()[0], Result = r.groups()[0]) 
    else:
        return None  
    
    



In [31]:
df = rdd.map(parse_pgn).toDF()

In [32]:
def percentageSummary (df,lowerRate=0,upperRate=0):
    # filter the ranking 
    if upperRate == 0:
        dfRate = df.filter(df.White_Rating >lowerRate).filter(df.Black_Rating >lowerRate)
    elif lowerRate == 0 and upperRate == 0:
        dfRate = df
    else:
        dfRate = df.filter(df.White_Rating.between(lowerRate,upperRate)).filter(df.Black_Rating.between(lowerRate,upperRate))
    
    
    #compute the total count of Games
    dfTotal = dfRate.groupBy('FEN').count().sort('count', ascending =False)
    dfTotal = dfTotal.withColumnRenamed("count","Total")
    
    
    #comput the count for White winning
    dfCount = dfRate.filter(dfRate.Result == '1-0').groupBy('FEN').count().sort('count', ascending =False)
    dfCount = dfCount.withColumnRenamed("count","WhiteWin")

    #merge/join the two df on FEN number
    dfNew = dfCount.join(dfTotal, on=['FEN'], how='left_outer')
    
    dfNew = dfNew\
        .withColumn('%', fn.expr('WhiteWin/Total'))\
        .sort('%', ascending =False)
    
    return dfNew

In [33]:
df_Under1200 = percentageSummary(df,0,1200)
df_Under1200.show(truncate = False)

+--------------------------------------------------------+--------+-----+---+
|FEN                                                     |WhiteWin|Total|%  |
+--------------------------------------------------------+--------+-----+---+
|rqkbbnnr/pppppppp/8/8/8/8/PPPPPPPP/RQKBBNNR w KQkq - 0 1|2       |2    |1.0|
|nnrkbbrq/pppppppp/8/8/8/8/PPPPPPPP/NNRKBBRQ w KQkq - 0 1|1       |1    |1.0|
|nrkbnqbr/pppppppp/8/8/8/8/PPPPPPPP/NRKBNQBR w KQkq - 0 1|1       |1    |1.0|
|nqrbnkbr/pppppppp/8/8/8/8/PPPPPPPP/NQRBNKBR w KQkq - 0 1|1       |1    |1.0|
|bnnrkqrb/pppppppp/8/8/8/8/PPPPPPPP/BNNRKQRB w KQkq - 0 1|1       |1    |1.0|
|rkrbbqnn/pppppppp/8/8/8/8/PPPPPPPP/RKRBBQNN w KQkq - 0 1|1       |1    |1.0|
|bnrnkqrb/pppppppp/8/8/8/8/PPPPPPPP/BNRNKQRB w KQkq - 0 1|1       |1    |1.0|
|rknrbqnb/pppppppp/8/8/8/8/PPPPPPPP/RKNRBQNB w KQkq - 0 1|1       |1    |1.0|
|nrbknbqr/pppppppp/8/8/8/8/PPPPPPPP/NRBKNBQR w KQkq - 0 1|1       |1    |1.0|
|bbrknrqn/pppppppp/8/8/8/8/PPPPPPPP/BBRKNRQN w KQkq - 0 1|1     

In [34]:
df_1200To1800 = percentageSummary(df,1200,1800)
df_1200To1800.show(truncate = False)

+--------------------------------------------------------+--------+-----+------------------+
|FEN                                                     |WhiteWin|Total|%                 |
+--------------------------------------------------------+--------+-----+------------------+
|rnbnkbrq/pppppppp/8/8/8/8/PPPPPPPP/RNBNKBRQ w KQkq - 0 1|93      |148  |0.6283783783783784|
|bbrkqnrn/pppppppp/8/8/8/8/PPPPPPPP/BBRKQNRN w KQkq - 0 1|88      |143  |0.6153846153846154|
|brkbnrqn/pppppppp/8/8/8/8/PPPPPPPP/BRKBNRQN w KQkq - 0 1|85      |140  |0.6071428571428571|
|bbnqnrkr/pppppppp/8/8/8/8/PPPPPPPP/BBNQNRKR w KQkq - 0 1|91      |150  |0.6066666666666667|
|nrkbrqbn/pppppppp/8/8/8/8/PPPPPPPP/NRKBRQBN w KQkq - 0 1|93      |156  |0.5961538461538461|
|qbrnbknr/pppppppp/8/8/8/8/PPPPPPPP/QBRNBKNR w KQkq - 0 1|93      |156  |0.5961538461538461|
|rqkbrnbn/pppppppp/8/8/8/8/PPPPPPPP/RQKBRNBN w KQkq - 0 1|94      |160  |0.5875            |
|qrbknrnb/pppppppp/8/8/8/8/PPPPPPPP/QRBKNRNB w KQkq - 0 1|92      |157

In [35]:
df_1800To2200 = percentageSummary(df,1800,2200)
df_1800To2200.show(truncate = False)

+--------------------------------------------------------+--------+-----+------------------+
|FEN                                                     |WhiteWin|Total|%                 |
+--------------------------------------------------------+--------+-----+------------------+
|rkrqbnnb/pppppppp/8/8/8/8/PPPPPPPP/RKRQBNNB w KQkq - 0 1|39      |58   |0.6724137931034483|
|brnqkbrn/pppppppp/8/8/8/8/PPPPPPPP/BRNQKBRN w KQkq - 0 1|38      |58   |0.6551724137931034|
|nrkbbnqr/pppppppp/8/8/8/8/PPPPPPPP/NRKBBNQR w KQkq - 0 1|41      |63   |0.6507936507936508|
|nqrbbkrn/pppppppp/8/8/8/8/PPPPPPPP/NQRBBKRN w KQkq - 0 1|35      |54   |0.6481481481481481|
|nnbbrkqr/pppppppp/8/8/8/8/PPPPPPPP/NNBBRKQR w KQkq - 0 1|49      |76   |0.6447368421052632|
|nrbkqbnr/pppppppp/8/8/8/8/PPPPPPPP/NRBKQBNR w KQkq - 0 1|29      |45   |0.6444444444444445|
|bnrqnbkr/pppppppp/8/8/8/8/PPPPPPPP/BNRQNBKR w KQkq - 0 1|38      |59   |0.6440677966101694|
|qnrbnkbr/pppppppp/8/8/8/8/PPPPPPPP/QNRBNKBR w KQkq - 0 1|36      |56 

In [36]:
df_Over2200 = percentageSummary(df,2200,0)
df_Over2200.show(truncate = False)

+--------------------------------------------------------+--------+-----+---+
|FEN                                                     |WhiteWin|Total|%  |
+--------------------------------------------------------+--------+-----+---+
|brnbnkqr/pppppppp/8/8/8/8/PPPPPPPP/BRNBNKQR w KQkq - 0 1|3       |3    |1.0|
|nnrqkbbr/pppppppp/8/8/8/8/PPPPPPPP/NNRQKBBR w KQkq - 0 1|1       |1    |1.0|
|nqbrkbnr/pppppppp/8/8/8/8/PPPPPPPP/NQBRKBNR w KQkq - 0 1|1       |1    |1.0|
|nnrkbbrq/pppppppp/8/8/8/8/PPPPPPPP/NNRKBBRQ w KQkq - 0 1|2       |2    |1.0|
|rbnnbqkr/pppppppp/8/8/8/8/PPPPPPPP/RBNNBQKR w KQkq - 0 1|1       |1    |1.0|
|rbbnnqkr/pppppppp/8/8/8/8/PPPPPPPP/RBBNNQKR w KQkq - 0 1|2       |2    |1.0|
|nrkbbqrn/pppppppp/8/8/8/8/PPPPPPPP/NRKBBQRN w KQkq - 0 1|1       |1    |1.0|
|rkbqnbrn/pppppppp/8/8/8/8/PPPPPPPP/RKBQNBRN w KQkq - 0 1|1       |1    |1.0|
|bbnqnrkr/pppppppp/8/8/8/8/PPPPPPPP/BBNQNRKR w KQkq - 0 1|1       |1    |1.0|
|rnqknrbb/pppppppp/8/8/8/8/PPPPPPPP/RNQKNRBB w KQkq - 0 1|1     

In [37]:
df_All = percentageSummary(df,0,0)
df_All.show(truncate = False)

+--------------------------------------------------------+--------+-----+------------------+
|FEN                                                     |WhiteWin|Total|%                 |
+--------------------------------------------------------+--------+-----+------------------+
|qrnknrbb/pppppppp/8/8/8/8/PPPPPPPP/QRNKNRBB w KQkq - 0 1|171     |291  |0.5876288659793815|
|rkbrnnqb/pppppppp/8/8/8/8/PPPPPPPP/RKBRNNQB w KQkq - 0 1|182     |314  |0.5796178343949044|
|qbnrnkbr/pppppppp/8/8/8/8/PPPPPPPP/QBNRNKBR w KQkq - 0 1|192     |334  |0.5748502994011976|
|nrkbrqbn/pppppppp/8/8/8/8/PPPPPPPP/NRKBRQBN w KQkq - 0 1|175     |307  |0.5700325732899023|
|qbrkbnrn/pppppppp/8/8/8/8/PPPPPPPP/QBRKBNRN w KQkq - 0 1|174     |307  |0.5667752442996743|
|nbbqnrkr/pppppppp/8/8/8/8/PPPPPPPP/NBBQNRKR w KQkq - 0 1|187     |330  |0.5666666666666667|
|brkbnrqn/pppppppp/8/8/8/8/PPPPPPPP/BRKBNRQN w KQkq - 0 1|167     |295  |0.5661016949152542|
|bbrkqnrn/pppppppp/8/8/8/8/PPPPPPPP/BBRKQNRN w KQkq - 0 1|155     |275