In [1]:
from pyspark.sql import SparkSession

# New API
spark_session = SparkSession.builder\
        .master("spark://192.168.2.133:7077") \
        .appName("PART_AB_DanielCeoca")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores", 4)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("WARN")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/15 13:38:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df = spark_session.read.json('hdfs://192.168.2.133:9000/user/ubuntu/yasp-chunks_test_output/*')

24/03/15 13:38:28 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

In [3]:
from pyspark.sql.functions import col, count, when

# Calclulate the times of wins each team
radiant_wins = df.select(col("radiant_win")).where(col("radiant_win")).count()
dire_wins = df.select(col("radiant_win")).where(~col("radiant_win")).count()

# Victory percentage
total_games = df.count()
radiant_win_rate = radiant_wins / total_games
dire_win_rate = dire_wins / total_games


win_rate_diff = abs(radiant_win_rate - dire_win_rate) * 100


print(f"Radiant win rate: {radiant_win_rate:.2%}")
print(f"Dire win rate: {dire_win_rate:.2%}")
print(f"Winning percentage gap: {win_rate_diff:.2f}%")




Radiant win rate: 49.38%
Dire win rate: 46.63%
Winning percentage gap: 2.75%


                                                                                

In [4]:
from pyspark.sql.functions import col, when, size
#confirm
df = df.withColumn("has_10_minutes", size(col("radiant_gold_adv")) >= 10)

# Filter
df = df.filter(col("has_10_minutes"))

# Extract
df = df.withColumn("radiant_gold_adv_10min", col("radiant_gold_adv")[9]) \
       .withColumn("radiant_xp_adv_10min", col("radiant_xp_adv")[9])

radiant_gold_lead = df.filter(col("radiant_gold_adv_10min") > 0)
dire_gold_lead = df.filter(col("radiant_gold_adv_10min") < 0)

radiant_xp_lead = df.filter(col("radiant_xp_adv_10min") > 0)
dire_xp_lead = df.filter(col("radiant_xp_adv_10min") < 0)

# Calculate win rates
radiant_gold_lead_win_rate = radiant_gold_lead.filter(col("radiant_win") == True).count() / radiant_gold_lead.count()
dire_gold_lead_win_rate = dire_gold_lead.filter(col("radiant_win") == False).count() / dire_gold_lead.count()

radiant_xp_lead_win_rate = radiant_xp_lead.filter(col("radiant_win") == True).count() / radiant_xp_lead.count()
dire_xp_lead_win_rate = dire_xp_lead.filter(col("radiant_win") == False).count() / dire_xp_lead.count()

print(f"Radiant 10-minute gold lead win rate: {radiant_gold_lead_win_rate * 100:.2f}%")
print(f"Dire 10-minute gold lead win rate: {dire_gold_lead_win_rate * 100:.2f}%")
print(f"Radiant 10-minute XP lead win rate: {radiant_xp_lead_win_rate * 100:.2f}%")
print(f"Dire 10-minute XP lead win rate: {dire_xp_lead_win_rate * 100:.2f}%")




Radiant 10-minute gold lead win rate: 65.16%
Dire 10-minute gold lead win rate: 62.75%
Radiant 10-minute XP lead win rate: 63.87%
Dire 10-minute XP lead win rate: 61.01%


                                                                                

In [5]:
from pyspark.sql.functions import explode, col, when, avg

# Expand the players array and calculate the KDA for each player
df_players = df.withColumn("player", explode(col("players"))).\
    withColumn("KDA", 
               (col("player.kills") + col("player.assists")) / 
               when(col("player.deaths") == 0, 1).otherwise(col("player.deaths"))).\
    select(col("match_id"), col("player.account_id"), col("KDA"))

# Average KDA per game
df_avg_kda_per_match = df_players.groupBy("match_id").agg(avg("KDA").alias("avg_KDA"))


highest_avg_kda = df_avg_kda_per_match.orderBy(col("avg_KDA").desc()).first()
lowest_avg_kda = df_avg_kda_per_match.orderBy(col("avg_KDA").asc()).first()


print(f"The Match id with the highest KDA {highest_avg_kda['match_id']}, Average KDA: {highest_avg_kda['avg_KDA']}")
print(f"The Match id with the lowest KDA: {lowest_avg_kda['match_id']}, Average KDA: {lowest_avg_kda['avg_KDA']}")

AnalysisException: [FIELD_NOT_FOUND] No such struct field `assists` in `account_id`, `assists, `, `deaths`, `kills`.

In [None]:
from pyspark.sql.functions import explode, sum as spark_sum

df_exploded = df.withColumn("players", explode("players"))

df_kills = df_exploded.groupBy("match_id").agg(spark_sum("players.kills").alias("total_kills"))

# Find the match with the highest total kills
highest_kills_match = df_kills.orderBy("total_kills", ascending=False).first()

highest_kills_match_id = highest_kills_match["match_id"]
highest_kills = highest_kills_match["total_kills"]

print(f"Match with the highest total kills: {highest_kills_match_id}, Total kills: {highest_kills}")




In [7]:
from pyspark.sql.functions import col
longest_matches = df.orderBy(col("duration").desc()).select("match_id", "duration").limit(5)

shortest_matches = df.where(col("duration") > 0).orderBy("duration").select("match_id", "duration").limit(5)

print("Five Longest Matches:")
longest_matches.show()

print("Five Shortest Matches:")
shortest_matches.show()

24/03/15 13:47:09 WARN HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 138510 ms exceeds timeout 120000 ms
24/03/15 13:47:09 WARN HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 140444 ms exceeds timeout 120000 ms
24/03/15 13:47:09 ERROR TaskSchedulerImpl: Lost executor 2 on 192.168.2.165: Executor heartbeat timed out after 138510 ms
24/03/15 13:47:09 WARN TaskSetManager: Lost task 22.0 in stage 39.0 (TID 1510) (192.168.2.165 executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 138510 ms
24/03/15 13:47:09 WARN TaskSetManager: Lost task 30.0 in stage 39.0 (TID 1518) (192.168.2.165 executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 138510 ms
24/03/15 13:47:09 WARN TaskSetManager: Lost task 6.0 in stage 39.0 (TID 1494) (192.168.2.165 executor 2): ExecutorLostFailure (executor 2 exited caused by one

+----------+--------+
|  match_id|duration|
+----------+--------+
|1999816459|    7109|
|1999966671|    6366|
|2000865722|    6358|
|2000816501|    5924|
|2001015774|    5754|
+----------+--------+

Five Shortest Matches:




+----------+--------+
|  match_id|duration|
+----------+--------+
|2001181449|     277|
|1999937575|     324|
|2000823661|     330|
|2000434863|     332|
|1999268345|     335|
+----------+--------+



                                                                                

In [8]:
spark_context.stop()