In [1]:
# Import Libraries #
import findspark
# findspark.find()
findspark.init()
from pyspark.sql.functions import split, col, array_contains, translate, round, size, when, udf, lit, mean, count, format_number, collect_list, expr
from pyspark.sql.types import TimestampType, MapType, IntegerType, StringType, ArrayType, FloatType, StructField, StructType
from pyspark.sql import SparkSession
from helper import *

In [2]:
# Create (or reuse) the Spark session for this notebook: application name
# 'test', YARN as the cluster manager, 9 executors with 1G of memory each.
spark = (
    SparkSession.builder
    .appName('test')
    .master("yarn")
    .config("spark.executor.instances", 9)
    .config("spark.executor.memory", "1G")
    .getOrCreate()
)

2023-04-26 21:37:41,676 INFO spark.SparkContext: Running Spark version 3.3.2
2023-04-26 21:37:42,132 INFO resource.ResourceUtils: No custom resources configured for spark.driver.
2023-04-26 21:37:42,133 INFO spark.SparkContext: Submitted application: test
2023-04-26 21:37:42,177 INFO resource.ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
2023-04-26 21:37:42,209 INFO resource.ResourceProfile: Limiting resource is cpus at 1 tasks per executor
2023-04-26 21:37:42,217 INFO resource.ResourceProfileManager: Added ResourceProfile id: 0
2023-04-26 21:37:42,314 INFO spark.SecurityManager: Changing view acls to: ubuntu
2023-04-26 21:37:42,317 INFO spark.SecurityManager: Changing modify acls to: ubuntu
2023-04-26 21:37:42,318 INFO spark.SecurityManage

In [3]:
## MUTE OUTPUT FROM SPARK
# Handle to the JVM-side log4j package, reached through the session's JVM gateway.
logger = spark._jvm.org.apache.log4j
# Switch off the two logger hierarchies that produce the INFO lines seen above.
for noisy_logger_name in ("org", "akka"):
    logger.LogManager.getLogger(noisy_logger_name).setLevel(logger.Level.OFF)
spark.conf.set("spark.driver.log.level", "OFF")

In [4]:
# Show the SparkSession as the cell output to confirm it was created.
spark

In [5]:
# Column layout of the CSV part files, in file order. Moves and Eval are read
# as plain strings here; a later cell converts them with convert_types.
column_types = [
    ("Event", StringType()),
    ("White", StringType()),
    ("Black", StringType()),
    ("Result", StringType()),
    ("WhiteElo", IntegerType()),
    ("BlackElo", IntegerType()),
    ("Opening", StringType()),
    ("TimeControl", StringType()),
    ("Termination", StringType()),
    ("Moves", StringType()),
    ("Eval", StringType()),
    ("UTCTimestamp", TimestampType()),
]
# Every column is declared nullable.
schema = StructType([StructField(name, dtype, True) for name, dtype in column_types])

# Read all part files with the explicit schema and mark the frame for caching.
df = spark.read.csv("hdfs://namenode:9000/chess_2016_dataset/output/part*", schema=schema).cache()

                                                                                

##### Check Shape of Data

In [6]:
# Row count is a Spark action (full scan); the column count comes from the schema.
n_rows = df.count()
n_columns = len(df.columns)
print("shape: ", (n_rows, n_columns))

                                                                                

shape:  (4871421, 12)


##### Convert columns to appropriate types

In [7]:
# convert_types is not defined in this notebook (presumably star-imported from
# `helper`). Judging by the dtypes printed below, it turns Moves into
# array<string> and Eval into array<float>.
# NOTE(review): this cell rebinds `df`, so re-running it feeds the already
# converted frame back into convert_types - confirm the helper tolerates that.
df = convert_types(df)
df.dtypes

[('Event', 'string'),
 ('White', 'string'),
 ('Black', 'string'),
 ('Result', 'string'),
 ('WhiteElo', 'int'),
 ('BlackElo', 'int'),
 ('Opening', 'string'),
 ('TimeControl', 'string'),
 ('Termination', 'string'),
 ('Moves', 'array<string>'),
 ('Eval', 'array<float>'),
 ('UTCTimestamp', 'timestamp')]

### Find Eval Games and add blunders column for black and white.

In [8]:
# Threshold passed to the blunder helpers below.
# NOTE(review): presumably the minimum evaluation swing that counts as a
# blunder - confirm the unit against find_white_blunders / find_black_blunders.
eval_difference = 3.0

# Keep only games whose first Eval entry is present (i.e. evaluated games),
# then attach one blunder count per side. The cache is placed on the final
# frame - after BOTH blunder columns exist - so every later cell reuses the
# computed columns instead of re-running the BlackBlunders helper per action.
eval_games = (
    df.where(col("Eval")[0].isNotNull())
    .withColumn("WhiteBlunders", find_white_blunders(col("Eval"), lit(eval_difference)))
    .withColumn("BlackBlunders", find_black_blunders(col("Eval"), lit(eval_difference)))
    .cache()
)

# count() is an action: it materialises the cache and shows the number of evaluated games.
eval_games.count()



In [None]:
# Ten evaluated games with the most blunders, ranked by White's blunder count
# and then by Black's.
blunder_overview_columns = [
    "TimeControl", "White", "WhiteElo", "WhiteBlunders",
    "Black", "BlackElo", "BlackBlunders", "Result", "Termination",
]
most_blundered_games = (
    eval_games
    .select(*blunder_overview_columns)
    .orderBy(col("WhiteBlunders").desc(), col("BlackBlunders").desc())
    .limit(10)
)
# Only the ten selected rows are brought to the driver.
most_blundered_games.toPandas().head(10)

In [None]:
# Ten games from the full data set, taken in descending order of White's player name.
game_overview_columns = [
    "TimeControl", "White", "WhiteElo", "Black", "BlackElo", "Result", "Termination",
]
last_white_names = (
    df
    .select(*game_overview_columns)
    .orderBy(col("White").desc())
    .limit(10)
)
last_white_names.toPandas().head(10)

##### Plot Most Blundered Game

In [None]:
# plot_eval_game_optimized is not defined in this notebook (presumably
# star-imported from `helper`). Per the section heading it plots the most
# blundered game - verify against the helper's implementation.
plot_eval_game_optimized(eval_games=eval_games)


In [None]:
# plot_eval_game(eval_games=eval_games)  # disabled. NOTE(review): presumably superseded by plot_eval_game_optimized - remove this cell once confirmed.

### Group By
- Time control ~ (60, 120, 180, 600) etc...
- Elo brackets ~ ([1200, 1400], [1500, 1700], [2000, 2200]) etc...

### Time Control Grouping

In [None]:
# Per-time-control averages of blunders and Elo, plus the number of games.
#
# The averages are rounded with round() rather than format_number():
# format_number() returns *strings*, which makes a later
# orderBy(... .desc()) sort lexicographically ("9.8" ranks above "10.2") and
# puts thousands separators into the Elo averages. round() keeps the columns
# numeric, so sorting and further arithmetic behave as expected.
# `round` here is pyspark.sql.functions.round (imported at the top of the
# notebook), not the Python builtin.
time_control_blunders_averages = (
    eval_games
    .groupBy("TimeControl")
    .agg(
        round(mean("WhiteBlunders"), 1).alias("avg(WhiteBlunders)"),
        round(mean("BlackBlunders"), 1).alias("avg(BlackBlunders)"),
        round(mean("BlackElo"), 0).alias("avg(BlackElo)"),
        round(mean("WhiteElo"), 0).alias("avg(WhiteElo)"),
        count("TimeControl").alias("count(TimeControl)"),
    )
)

In [None]:
# Ten time controls with the highest average blunder counts, restricted to
# time controls that have more than 10000 games. Filtering before sorting
# selects the same rows as sorting first and filtering afterwards.
min_games_per_time_control = 10000
(
    time_control_blunders_averages
    .where(col("count(TimeControl)") > min_games_per_time_control)
    .orderBy(col("avg(WhiteBlunders)").desc(), col("avg(BlackBlunders)").desc())
    .limit(10)
    .toPandas()
    .head(10)
)

### Elo Brackets Grouping

Start by creating a new Spark DataFrame column called "EloBracket", which we will later use to group and aggregate by. When grouping players into Elo brackets, we want ranges that make sense, so that no single bracket contains 80% of the player base while others contain only a small fraction. In other words, we want the players to be distributed as evenly as possible across the brackets.

In [None]:
# Collect every non-null WhiteElo of the evaluated games onto the driver
# (collect_list skips nulls) so the distribution can be plotted locally.
elo_list = eval_games.select(collect_list("WhiteElo")).first()[0]

# One theme call is enough: the original set_theme(style="ticks") was
# immediately overridden by set_style('darkgrid').
sns.set_theme(style="darkgrid")

# sns.distplot is deprecated and has been removed from recent seaborn
# releases; histplot with stat="density" and kde=True draws the equivalent
# figure (a normalised histogram with a density curve on top).
ax = sns.histplot(elo_list, bins=20, kde=True, stat="density", color="green")
ax.set(title="White Elo distribution (evaluated games)", xlabel="White Elo", ylabel="Density");

"EloBracket" column should be of type String and contain values in format: "0-1200", "1200-1600", "1600-2000", "2000-3000"

In [None]:
# Label every evaluated game with the Elo bracket of the White player.
#
# Each bracket has its own explicit condition. Previously the top bracket was
# the `.otherwise(...)` fallback, so games with a NULL or non-positive
# WhiteElo (no condition matches them) were silently counted as ">2000".
# Those games are now labelled "unknown" instead of inflating the top bracket.
white_elo = col("WhiteElo")
eval_games = eval_games.withColumn(
    "EloBracket",
    when((white_elo > 0) & (white_elo < 1500), lit("<1500"))
    .when((white_elo >= 1500) & (white_elo <= 1750), lit("1500-1750"))
    .when((white_elo > 1750) & (white_elo <= 2000), lit("1751-2000"))
    .when(white_elo > 2000, lit(">2000"))
    .otherwise(lit("unknown")),
)

In [None]:
# Print the first rows of eval_games to check the newly added EloBracket column.
eval_games.show()

### ELO and Blunders

In [None]:
def _elo_bracket_blunder_averages(games, blunders_column):
    """Average one blunder column per Elo bracket.

    Args:
        games: Spark DataFrame with an "EloBracket" column and the column
            named by ``blunders_column``.
        blunders_column: name of the blunder-count column to average,
            e.g. "WhiteBlunders".

    Returns:
        Spark DataFrame with the columns "EloBracket",
        "avg(<blunders_column>)" (numeric, rounded to one decimal) and
        "count(EloBracket)".
    """
    # round() keeps the average numeric. The previous format_number() call
    # produced strings, so ordering by the average sorted lexicographically
    # ("9.8" above "10.2"). `round` is pyspark.sql.functions.round here.
    return (
        games
        .groupBy("EloBracket")
        .agg(
            round(mean(blunders_column), 1).alias(f"avg({blunders_column})"),
            count("EloBracket"),
        )
    )


elo_bracket_white_blunders_averages = _elo_bracket_blunder_averages(eval_games, "WhiteBlunders")
elo_bracket_black_blunders_averages = _elo_bracket_blunder_averages(eval_games, "BlackBlunders")

In [None]:
# Elo brackets ranked by White's average blunder count; head() shows at most five rows.
(
    elo_bracket_white_blunders_averages
    .orderBy(col("avg(WhiteBlunders)").desc())
    .limit(10)
    .toPandas()
    .head()
)

In [None]:
# Elo brackets ranked by Black's average blunder count; head() shows at most five rows.
(
    elo_bracket_black_blunders_averages
    .orderBy(col("avg(BlackBlunders)").desc())
    .limit(10)
    .toPandas()
    .head()
)

## Joining two tables to test optimization 

In [None]:
# Full outer join of all games with the evaluated games on (White, UTCTimestamp).
#
# eval_games is derived from df, so this is a self-join: df.White and
# eval_games.White refer to the very same underlying column. Spark 3.x
# rejects such references as ambiguous (and, with that check disabled, would
# compare the column with itself). Aliasing both sides and using qualified
# column names makes each side of the condition unambiguous.
all_games = df.alias("games")
evaluated_games = eval_games.alias("evals")

games_eval = all_games.join(
    evaluated_games,
    [
        col("games.White") == col("evals.White"),
        col("games.UTCTimestamp") == col("evals.UTCTimestamp"),
    ],
    "outer",
)
                

In [None]:
# show() is an action: it executes the join defined in the previous cell and prints the first rows.
games_eval.show()

#### With hash-broadcast

In [None]:
# Enable automatic broadcast joins: a join side whose estimated size is at
# most this many bytes (100 MiB) may be broadcast to the executors.
broadcast_threshold_bytes = 100 * 1024 * 1024  # = 104857600
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", broadcast_threshold_bytes)

# To disable automatic broadcast joins instead, set the threshold to -1:
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [None]:
# Perform broadcast join
from pyspark.sql.functions import broadcast, col

# Build the joined frame first and display it afterwards. show() returns
# None, so chaining it onto the assignment left games_eval bound to None
# instead of the joined DataFrame.
# NOTE(review): Spark's broadcast hash join does not cover full outer joins,
# so with "outer" the broadcast() hint is likely not honoured. Inspect
# games_eval.explain() and consider "left" if a broadcast join is the goal.
games_eval = df.join(
    broadcast(eval_games),
    ["White", "UTCTimestamp"],
    "outer",
)
games_eval.show()

In [None]:
# Disabled scratch cell, kept for reference.
# NOTE(review): games_eval is built from df and eval_games, neither of which
# has a "count(TimeControl)" column in the cells shown here (that column
# belongs to time_control_blunders_averages) - adapt before re-enabling, or delete.
# games_eval \
#     .orderBy(col("count(TimeControl)").desc()) \
#     .limit(10) \
#     .toPandas() \
#     .head(10)