In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, when
from functools import reduce


def cleanData(chess):
    chessMain = chess.select( "Moves","Site","TimeControl","WhiteElo","BlackElo").withColumn(
        "WhiteMoves",F.trim(F.regexp_replace(F.col("Moves"),r"\d+\.{3}.*?(?=\d+\.\s|\Z)",""))).withColumn(
            "BlackMoves",F.trim(F.regexp_replace(F.col("Moves"),r"\d+\.\s+.*?(?=\d+\.{3}\s|\Z)","")))
    chessBlunders = chessMain.select( "Moves","Site","TimeControl","WhiteElo","WhiteMoves",(F.size(F.split(F.col("WhiteMoves"), r"\?\?")) - 1).alias("WhiteBlunderCount"),"BlackElo","BlackMoves",
        (F.size(F.split(F.col("BlackMoves"), r"\?\?")) - 1).alias("BlackBlunderCount"),(F.size(F.split(F.col("Moves"), r"\?\?")) - 1).alias("TotalBlunderCount")).withColumn("TimeControlName", F.when(
            (F.split("TimeControl", r"\+")[0].cast("int") +
            F.split("TimeControl", r"\+")[1].cast("int") * 40) < 30,
            "Ultra Bullet"
        ).when(
            (F.split("TimeControl", r"\+")[0].cast("int") +
            F.split("TimeControl", r"\+")[1].cast("int") * 40) < 180,
            "Bullet"
        ).when(
            (F.split("TimeControl", r"\+")[0].cast("int") +
            F.split("TimeControl", r"\+")[1].cast("int") * 40) < 500,
            "Blitz"
        ).when(
            (F.split("TimeControl", r"\+")[0].cast("int") +
            F.split("TimeControl", r"\+")[1].cast("int") * 40) < 1500,
            "Rapid"
        ).otherwise("Classic"))
    chessResult = chessBlunders.filter(~F.col("TimeControl").contains('-'))
    chessResult = chessBlunders.filter(F.col("Moves").contains("eval"))
    return chessResult
    

def createDF(Date, chess):
    df = chess.withColumn(
        "PlayerElo",
        F.when((F.col("WhiteElo") < 1400) | (F.col("BlackElo") < 1400), "Beginner")
         .when(
             (F.col("WhiteElo").between(1400, 1999)) |
             (F.col("BlackElo").between(1400, 1999)),
             "Intermediate"
         )
         .otherwise("Expert")
    )
    result = (
        df.groupBy("PlayerElo", "TimeControlName")
          .agg(
              F.count("*").alias("TotalGames"),
              F.sum("TotalBlunderCount").alias("TotalBlunders")
          )
          .withColumn("Date", F.lit(Date))
          .withColumn(
              "AverageBlunders",
              F.when(F.col("TotalGames") > 0,
                     F.col("TotalBlunders") / F.col("TotalGames"))
               .otherwise(F.lit(0.0))
          )
          .select(
              "Date",
              "PlayerElo",
              F.col("TimeControlName").alias("TimeControlLevel"),
              "TotalGames",
              "TotalBlunders",
              "AverageBlunders"
          )
    )
    return result


#2023
chess23_01= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-01.parquet")
chess23_02= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-02.parquet")
chess23_03= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-03.parquet")
chess23_04= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-04.parquet")
chess23_05= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-05.parquet")
chess23_06= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-06.parquet")
chess23_07= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-07.parquet")
chess23_08= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-08.parquet")
chess23_09= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-09.parquet")
chess23_10= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-10.parquet")
chess23_11= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-11.parquet")
chess23_12= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2023-12.parquet")


chess23_ = [
     chess23_01,
     chess23_02,
     chess23_03,
     chess23_04,
     chess23_05,
     chess23_06,
     chess23_07,
     chess23_08,
     chess23_09,
     chess23_10,
     chess23_11,
     chess23_12
]

chess23 = reduce(lambda a,b: a.unionByName(b), chess23_)
df23 = createDF("2023", cleanData(chess23))

#2022
chess22_01= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-01.parquet")
chess22_02= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-02.parquet")
chess22_03= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-03.parquet")
chess22_04= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-04.parquet")
chess22_05= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-05.parquet")
chess22_06= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-06.parquet")
chess22_07= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-07.parquet")
chess22_08= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-08.parquet")
chess22_09= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-09.parquet")
chess22_10= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-10.parquet")
chess22_11= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-11.parquet")
chess22_12= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2022-12.parquet")


chess22_ = [
     chess22_01,
     chess22_02,
     chess22_03,
     chess22_04,
     chess22_05,
     chess22_06,
     chess22_07,
     chess22_08,
     chess22_09,
     chess22_10,
     chess22_11,
     chess22_12
]

chess22 = reduce(lambda a,b: a.unionByName(b), chess22_)
df22 = createDF("2022", cleanData(chess22))

#2021
chess21_01= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-01.parquet")
chess21_02= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-02.parquet")
chess21_03= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-03.parquet")
chess21_04= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-04.parquet")
chess21_05= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-05.parquet")
chess21_06= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-06.parquet")
chess21_07= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-07.parquet")
chess21_08= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-08.parquet")
chess21_09= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-09.parquet")
chess21_10= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-10.parquet")
chess21_11= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-11.parquet")
chess21_12= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2021-12.parquet")


chess21_ = [
     chess21_01,
     chess21_02,
     chess21_03,
     chess21_04,
     chess21_05,
     chess21_06,
     chess21_07,
     chess21_08,
     chess21_09,
     chess21_10,
     chess21_11,
     chess21_12
]

chess21 = reduce(lambda a,b: a.unionByName(b), chess21_)
df21 = createDF("2021", cleanData(chess21))

#2020
chess20_01= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-01.parquet")
chess20_02= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-02.parquet")
chess20_03= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-03.parquet")
chess20_04= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-04.parquet")
chess20_05= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-05.parquet")
chess20_06= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-06.parquet")
chess20_07= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-07.parquet")
chess20_08= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-08.parquet")
chess20_09= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-09.parquet")
chess20_10= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-10.parquet")
chess20_11= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-11.parquet")
chess20_12= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2020-12.parquet")


chess20_ = [
     chess20_01,
     chess20_02,
     chess20_03,
     chess20_04,
     chess20_05,
     chess20_06,
     chess20_07,
     chess20_08,
     chess20_09,
     chess20_10,
     chess20_11,
     chess20_12
]

chess20 = reduce(lambda a,b: a.unionByName(b), chess20_)
df20 = createDF("2020", cleanData(chess20))

#2019
chess19_01= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-01.parquet")
chess19_02= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-02.parquet")
chess19_03= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-03.parquet")
chess19_04= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-04.parquet")
chess19_05= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-05.parquet")
chess19_06= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-06.parquet")
chess19_07= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-07.parquet")
chess19_08= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-08.parquet")
chess19_09= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-09.parquet")
chess19_10= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-10.parquet")
chess19_11= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-11.parquet")
chess19_12= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2019-12.parquet")


chess19_ = [
     chess19_01,
     chess19_02,
     chess19_03,
     chess19_04,
     chess19_05,
     chess19_06,
     chess19_07,
     chess19_08,
     chess19_09,
     chess19_10,
     chess19_11,
     chess19_12
]

chess19 = reduce(lambda a,b: a.unionByName(b), chess19_)
df19 = createDF("2019", cleanData(chess19))

#2018
chess18_01= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-01.parquet")
chess18_02= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-02.parquet")
chess18_03= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-03.parquet")
chess18_04= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-04.parquet")
chess18_05= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-05.parquet")
chess18_06= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-06.parquet")
chess18_07= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-07.parquet")
chess18_08= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-08.parquet")
chess18_09= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-09.parquet")
chess18_10= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-10.parquet")
chess18_11= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-11.parquet")
chess18_12= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2018-12.parquet")


chess18_ = [
     chess18_01,
     chess18_02,
     chess18_03,
     chess18_04,
     chess18_05,
     chess18_06,
     chess18_07,
     chess18_08,
     chess18_09,
     chess18_10,
     chess18_11,
     chess18_12
]

chess18 = reduce(lambda a,b: a.unionByName(b), chess18_)
df18 = createDF("2018", cleanData(chess18))

#2017
chess17_01= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-01.parquet")
chess17_02= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-02.parquet")
chess17_03= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-03.parquet")
chess17_04= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-04.parquet")
chess17_05= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-05.parquet")
chess17_06= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-06.parquet")
chess17_07= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-07.parquet")
chess17_08= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-08.parquet")
chess17_09= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-09.parquet")
chess17_10= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-10.parquet")
chess17_11= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-11.parquet")
chess17_12= spark.read.parquet("/data/doina/Lichess/lichess_db_standard_rated_2017-12.parquet")


chess17_ = [
     chess17_01,
     chess17_02,
     chess17_03,
     chess17_04,
     chess17_05,
     chess17_06,
     chess17_07,
     chess17_08,
     chess17_09,
     chess17_10,
     chess17_11,
     chess17_12
]

chess17 = reduce(lambda a,b: a.unionByName(b), chess17_)
df17 = createDF("2017", cleanData(chess17))


dfs = [
    df17,
    df18,
    df19,
    df20,
    df21,
    df22,
    df23
]


chess = reduce(lambda a, b: a.unionByName(b), dfs)
chess = chess.orderBy("Date", "PlayerElo", "TimeControlLevel")
chess.coalesce(1).write.mode("overwrite").option("header", True).csv("Chess_Blunders_and_Time_year.csv")



