# Get winrate of each champion

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

In [2]:
spark = SparkSession.builder \
    .master("spark://10.90.254.80:7077") \
    .appName("MySparkApp") \
    .getOrCreate()

In [3]:
file_path = "D:/BigData/Data/MatchDetail/*.json"
df = spark.read.json(file_path)

In [None]:
df.printSchema()

In [5]:
df.count()

127546

In [6]:
df.show()

+--------------------+--------------------+
|                info|            metadata|
+--------------------+--------------------+
|{1702218495729, 1...|{2, VN2_263621944...|
|{1702214839655, 1...|{2, VN2_263529508...|
|{1702213456722, 1...|{2, VN2_263503073...|
|{1702212129650, 1...|{2, VN2_263457862...|
|{1702210448240, 1...|{2, VN2_263425557...|
|{1702208898115, 1...|{2, VN2_263406060...|
|{1702204199914, 1...|{2, VN2_263311776...|
|{1702141622861, 9...|{2, VN2_262473336...|
|{1702132388766, 1...|{2, VN2_262242252...|
|{1702130960818, 1...|{2, VN2_262199389...|
|{1702222055120, 2...|{2, VN2_263714400...|
|{1702219444484, 1...|{2, VN2_263646609...|
|{1702199204289, 9...|{2, VN2_263218782...|
|{1702198173404, 8...|{2, VN2_263197676...|
|{1702191634109, 1...|{2, VN2_263044875...|
|{1702188564973, 1...|{2, VN2_262988380...|
|{1702185444896, 1...|{2, VN2_262925166...|
|{1702184466858, 1...|{2, VN2_262906009...|
|{1702183314969, 9...|{2, VN2_262877464...|
|{1702182090806, 9...|{2, VN2_26

In [7]:
classic_df= df.select(
    explode("info.participants").alias("participant"),
    "info.gameId",
    "info.gameMode",
    "info.gameVersion").where("gameMode = 'CLASSIC'")

In [8]:
classic_df.count()

908620

In [9]:
classic_df = classic_df.dropDuplicates()

In [10]:
classic_df.count()

847300

In [11]:
classic_df.show()

+--------------------+---------+--------+--------------+
|         participant|   gameId|gameMode|   gameVersion|
+--------------------+---------+--------+--------------+
|{0, 4, 5, 0, 0, 0...|263378474| CLASSIC|13.24.547.9214|
|{1, 0, 1, 0, 0, 0...|263352246| CLASSIC|13.24.547.9214|
|{0, 0, 17, 0, 0, ...|263030128| CLASSIC|13.24.547.9214|
|{0, 1, 3, 0, 0, 0...|263776021| CLASSIC|13.24.547.9214|
|{0, 0, 5, 0, 0, 0...|263574301| CLASSIC|13.24.547.9214|
|{0, 0, 0, 0, 0, 0...|262797486| CLASSIC|13.24.547.9214|
|{2, 1, 7, 0, 0, 0...|248821392| CLASSIC|13.23.544.5515|
|{0, 0, 7, 0, 1, 0...|249950577| CLASSIC|13.23.544.5515|
|{1, 1, 5, 0, 0, 0...|260411028| CLASSIC|13.24.547.9214|
|{0, 0, 5, 0, 0, 0...|263225375| CLASSIC|13.24.547.9214|
|{0, 0, 5, 0, 0, 0...|263186247| CLASSIC|13.24.547.9214|
|{1, 3, 4, 0, 0, 0...|263144186| CLASSIC|13.24.547.9214|
|{3, 5, 7, 0, 0, 0...|263483558| CLASSIC|13.24.547.9214|
|{0, 1, 5, 0, 0, 0...|260967025| CLASSIC|13.24.547.9214|
|{0, 1, 14, 0, 0, ...|260876398

In [12]:
match_df = classic_df.select(
    "gameId",
    "participant.riotIdGameName",
    "participant.championId",
    "participant.championName",
    "participant.teamId",
    "participant.teamPosition",
    "participant.win"
)
match_df = match_df.withColumn("win", col("win").cast("integer"))

In [13]:
match_df.show()

+---------+----------------+----------+------------+------+------------+---+
|   gameId|  riotIdGameName|championId|championName|teamId|teamPosition|win|
+---------+----------------+----------+------------+------+------------+---+
|263378474|          Mập Bư|        41|   Gangplank|   100|         TOP|  0|
|263352246|         Tiến Vy|        39|      Irelia|   200|         TOP|  0|
|263030128| Origy Lissandra|       111|    Nautilus|   100|     UTILITY|  1|
|263776021|            Ted1|       498|       Xayah|   100|      BOTTOM|  0|
|263574301|            Yuta|        84|       Akali|   100|      MIDDLE|  0|
|262797486|Là bạn K thể yêu|        24|         Jax|   200|         TOP|  1|
|248821392| ko biết đi rừng|        60|       Elise|   200|     UTILITY|  0|
|249950577|    Kh47MKinzhal|       145|       Kaisa|   100|      BOTTOM|  0|
|260411028|         pe Buoi|        24|         Jax|   100|         TOP|  0|
|263225375|   Yobexa Darius|       122|      Darius|   100|         TOP|  1|

In [14]:
from pyspark.sql.functions import col, count, sum
win_rate_df = match_df.groupBy("championName") \
    .agg(
        sum("win").alias("totalWins"), 
        count("win").alias("totalGames")
    ) \
    .withColumn("winRate", col("totalWins") / col("totalGames")) \
    .orderBy("championName")

In [15]:

# csv_path = "D:/BigData/Data/result/win_rate.csv"
# win_rate_df.coalesce(1).write.csv(csv_path)

In [16]:
win_rate_df.show(200)

+------------+---------+----------+-------------------+
|championName|totalWins|totalGames|            winRate|
+------------+---------+----------+-------------------+
|      Aatrox|     9992|     19579| 0.5103427141324889|
|        Ahri|     2895|      5879| 0.4924306854907297|
|       Akali|     5264|     11031| 0.4772006164445653|
|      Akshan|      548|      1053| 0.5204178537511871|
|     Alistar|     1944|      4033|0.48202330771138113|
|       Amumu|      622|      1265|  0.491699604743083|
|      Anivia|      815|      1563| 0.5214331413947537|
|       Annie|      731|      1368| 0.5343567251461988|
|    Aphelios|     2208|      4773|0.46260213702074165|
|        Ashe|     5599|     11331| 0.4941311446474274|
| AurelionSol|      721|      1374|  0.524745269286754|
|        Azir|     2128|      4552|0.46748681898066785|
|        Bard|     1062|      2182| 0.4867094408799267|
|     Belveth|     1256|      2475| 0.5074747474747475|
|  Blitzcrank|     3573|      7060| 0.5060906515

# GET THE WIN RATE OF MATCHUP

In [17]:
match_df.createOrReplaceTempView("match")
join_query = """
SELECT m1.gameId, m1.championName as champion1, m2.championName as champion2,m1.teamPosition, m2.teamPosition,m1.teamId as team1, m2.teamId as team2, m1.win
FROM match m1, match m2
WHERE m1.gameId = m2.gameId 
    AND m1.teamPosition = m2.teamPosition
    AND m1.teamId != m2.teamId
"""
matchup_each_game_df = spark.sql(join_query)

In [18]:
matchup_each_game_df.show()

+--------+-----------+-----------+------------+------------+-----+-----+---+
|  gameId|  champion1|  champion2|teamPosition|teamPosition|team1|team2|win|
+--------+-----------+-----------+------------+------------+-----+-----+---+
|48512747|     Lucian|MissFortune|      BOTTOM|      BOTTOM|  200|  100|  0|
|48512747|MissFortune|     Lucian|      BOTTOM|      BOTTOM|  100|  200|  1|
|48512747|   Nocturne|       Kayn|      JUNGLE|      JUNGLE|  100|  200|  1|
|48512747|       Kayn|   Nocturne|      JUNGLE|      JUNGLE|  200|  100|  0|
|48512747|      Sylas|     Gragas|      MIDDLE|      MIDDLE|  100|  200|  1|
|48512747|     Gragas|      Sylas|      MIDDLE|      MIDDLE|  200|  100|  0|
|48531466|       Zeri|       Jinx|      BOTTOM|      BOTTOM|  100|  200|  0|
|48531466|       Jinx|       Zeri|      BOTTOM|      BOTTOM|  200|  100|  1|
|48531466|      Shaco|      Diana|      JUNGLE|      JUNGLE|  100|  200|  0|
|48531466|      Diana|      Shaco|      JUNGLE|      JUNGLE|  200|  100|  1|

In [19]:
matchup_df = matchup_each_game_df.groupBy('champion1', 'champion2').agg(
    sum('win').alias('total_wins'),
    count('gameId').alias('total_games')
).withColumn('champion1_win_rate', col('total_wins') / col('total_games')).orderBy('champion1', 'champion2')

In [20]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
matchups_over_50_games = matchup_df.filter(F.col('total_games') > 50)
windowSpec = Window.partitionBy('champion1').orderBy('champion1_win_rate')
ranked_matchups = matchups_over_50_games.withColumn("rank", F.row_number().over(windowSpec))
worst_matchups = ranked_matchups.filter(F.col("rank") <= 5)

In [21]:
windowSpec = Window.partitionBy('champion1').orderBy(col('champion1_win_rate').desc())
ranked_matchups = matchups_over_50_games.withColumn("rank", F.row_number().over(windowSpec))
best_matchups = ranked_matchups.filter(F.col("rank") <= 5)

In [22]:
# csv_path = "D:/BigData/Data/result/best_matchups.csv"
# best_matchups.coalesce(1).write.csv(csv_path)

In [23]:
# csv_path = "D:/BigData/Data/result/worst_matchups.csv"
# worst_matchups.coalesce(1).write.csv(csv_path)

In [24]:
worst_matchups.show()

+---------+---------+----------+-----------+-------------------+----+
|champion1|champion2|total_wins|total_games| champion1_win_rate|rank|
+---------+---------+----------+-----------+-------------------+----+
|   Aatrox|     Kled|        51|        131| 0.3893129770992366|   1|
|   Aatrox|    Kayle|        55|        141| 0.3900709219858156|   2|
|   Aatrox|   Gragas|        29|         68| 0.4264705882352941|   3|
|   Aatrox|    Riven|       164|        368|0.44565217391304346|   4|
|   Aatrox|     Olaf|        63|        141|0.44680851063829785|   5|
|     Ahri| Kassadin|        25|         65|0.38461538461538464|   1|
|     Ahri|   Veigar|        50|        126| 0.3968253968253968|   2|
|     Ahri| Tristana|        36|         86| 0.4186046511627907|   3|
|     Ahri|  Orianna|       139|        332| 0.4186746987951807|   4|
|     Ahri|    Yasuo|       126|        285| 0.4421052631578947|   5|
|    Akali|TahmKench|        12|         51|0.23529411764705882|   1|
|    Akali|      Vex

In [25]:
best_matchups.show()

+---------+---------+----------+-----------+-------------------+----+
|champion1|champion2|total_wins|total_games| champion1_win_rate|rank|
+---------+---------+----------+-----------+-------------------+----+
|   Aatrox|    Quinn|        36|         51| 0.7058823529411765|   1|
|   Aatrox|     Sion|       125|        216| 0.5787037037037037|   2|
|   Aatrox|  Chogath|        69|        120|              0.575|   3|
|   Aatrox|  DrMundo|        31|         54| 0.5740740740740741|   4|
|   Aatrox|   Rumble|       159|        278| 0.5719424460431655|   5|
|     Ahri|     Hwei|       146|        218| 0.6697247706422018|   1|
|     Ahri|      Vex|        41|         68| 0.6029411764705882|   2|
|     Ahri|     Ekko|        44|         73| 0.6027397260273972|   3|
|     Ahri|  Taliyah|        37|         62| 0.5967741935483871|   4|
|     Ahri|     Fizz|       105|        188| 0.5585106382978723|   5|
|    Akali|    Nasus|        77|        122| 0.6311475409836066|   1|
|    Akali|     Hwei

# Calculate perks, summonerSpell and item for each champion

In [26]:
from pyspark.sql.functions import explode, col, when
champion_perks = classic_df.select(
    explode("participant.perks.styles").alias("styles"),
    "participant.perks",
    "participant.championName",
    "participant.win",
    "gameId")

In [27]:
champion_perks_2 = champion_perks.select(
    col("gameId"),
    col("championName").alias("champ"),
    col("win").alias("w"),
    col("perks.statPerks.defense").alias("r1"),
    col("perks.statPerks.flex").alias("r2"),
    col("perks.statPerks.offense").alias("r3"),
    when(col("styles.description") == "primaryStyle", col("styles.style")).alias("primary"),
    when(col("styles.description") == "subStyle", col("styles.style")).alias("sub"),
    explode("styles.selections.perk").alias("perk"))

In [28]:
champion_perks_2.show()

+---------+-------+-----+----+----+----+-------+----+----+
|   gameId|  champ|    w|  r1|  r2|  r3|primary| sub|perk|
+---------+-------+-----+----+----+----+-------+----+----+
|262459713|  Akali| true|5001|5008|5008|   8000|NULL|8010|
|262459713|  Akali| true|5001|5008|5008|   8000|NULL|8009|
|262459713|  Akali| true|5001|5008|5008|   8000|NULL|9105|
|262459713|  Akali| true|5001|5008|5008|   8000|NULL|8014|
|262459713|  Akali| true|5001|5008|5008|   NULL|8400|8429|
|262459713|  Akali| true|5001|5008|5008|   NULL|8400|8451|
|261253487|Leblanc|false|5002|5002|5007|   8100|NULL|8112|
|261253487|Leblanc|false|5002|5002|5007|   8100|NULL|8143|
|261253487|Leblanc|false|5002|5002|5007|   8100|NULL|8138|
|261253487|Leblanc|false|5002|5002|5007|   8100|NULL|8106|
|261253487|Leblanc|false|5002|5002|5007|   NULL|8200|8226|
|261253487|Leblanc|false|5002|5002|5007|   NULL|8200|8237|
|257695691|   Yone|false|5002|5008|5005|   8000|NULL|8008|
|257695691|   Yone|false|5002|5008|5005|   8000|NULL|911

In [29]:
from pyspark.sql.functions import collect_list
champion_perks_3 = champion_perks_2.groupBy("gameId", "champ", "w", "r1", "r2", "r3", "primary", "sub") \
    .agg(collect_list("perk").alias("perks")).orderBy("gameId", "champ")

In [30]:
champion_perks_3.show()

+--------+-----------+-----+----+----+----+-------+----+--------------------+
|  gameId|      champ|    w|  r1|  r2|  r3|primary| sub|               perks|
+--------+-----------+-----+----+----+----+-------+----+--------------------+
|48512747|     Gragas|false|5003|5008|5008|   8300|NULL|[8369, 8304, 8345...|
|48512747|     Gragas|false|5003|5008|5008|   NULL|8200|        [8226, 8237]|
|48512747|       Gwen|false|5002|5008|5005|   NULL|8400|        [8429, 8451]|
|48512747|       Gwen|false|5002|5008|5005|   8000|NULL|[8010, 8009, 9104...|
|48512747|     KSante| true|5003|5002|5005|   NULL|8000|        [9111, 9105]|
|48512747|     KSante| true|5003|5002|5005|   8400|NULL|[8437, 8446, 8429...|
|48512747|       Kayn|false|5002|5008|5008|   8000|NULL|[8010, 9111, 9105...|
|48512747|       Kayn|false|5002|5008|5008|   NULL|8300|        [8347, 8304]|
|48512747|     Lucian|false|5002|5008|5005|   8000|NULL|[8005, 8009, 9103...|
|48512747|     Lucian|false|5002|5008|5005|   NULL|8300|        

In [31]:
champion_perks_3.createOrReplaceTempView("champ_perks")
join_query = """
    SELECT c1.gameId, c1.champ, c1.w, c1.r1, c1.r2, 
           c1.r3, c1.primary, c1.perks as primaryPerks, c2.sub,
           c2.perks as subPerks
    FROM champ_perks c1
    JOIN champ_perks c2 ON c1.gameId = c2.gameId
        AND c1.champ = c2.champ
    WHERE c1.primary is not NULL
        AND c2.sub is not NULL
    Order by c1.gameId, c1.w
    """
champ_perks_4 = spark.sql(join_query)


In [32]:
champ_perks_4.show()

+--------+-----------+-----+----+----+----+-------+--------------------+----+------------+
|  gameId|      champ|    w|  r1|  r2|  r3|primary|        primaryPerks| sub|    subPerks|
+--------+-----------+-----+----+----+----+-------+--------------------+----+------------+
|48512747|       Kayn|false|5002|5008|5008|   8000|[8010, 9111, 9105...|8300|[8347, 8304]|
|48512747|     Gragas|false|5003|5008|5008|   8300|[8369, 8304, 8345...|8200|[8226, 8237]|
|48512747|     Lucian|false|5002|5008|5005|   8000|[8005, 8009, 9103...|8300|[8304, 8345]|
|48512747|       Gwen|false|5002|5008|5005|   8000|[8010, 8009, 9104...|8400|[8429, 8451]|
|48512747|       Nami|false|5002|5008|5008|   8200|[8214, 8226, 8210...|8300|[8345, 8347]|
|48512747|MissFortune| true|5002|5008|5005|   8200|[8229, 8226, 8233...|8300|[8345, 8304]|
|48512747|     KSante| true|5003|5002|5005|   8400|[8437, 8446, 8429...|8000|[9111, 9105]|
|48512747|      Sylas| true|5002|5008|5008|   8100|[8112, 8143, 8138...|8200|[8233, 8232]|

In [33]:
# Define a function to select the perk based on the index
def select_perk(index, column):
    return when(col(column).getItem(index).isNotNull(), col(column).getItem(index))

# Add columns for primary1 to primary4 and sub1 to sub2
for i in range(4):
    champ_perks_4 = champ_perks_4.withColumn(f'p{i+1}', select_perk(i,"primaryPerks"))

for i in range(2):
    champ_perks_4 = champ_perks_4.withColumn(f's{i+1}', select_perk(i, "subPerks"))

# Now select and order the columns as required
champ_perks_5 = champ_perks_4.select(
    col("gameId"),
    col("champ"),
    col("w"),
    col("r1"),
    col("r2"),
    col("r3"),
    col("primary"),
    col("p1"),
    col("p2"),
    col("p3"),
    col("p4"),
    col("sub"),
    col("s1"),
    col("s2")).orderBy("gameId", "w")


In [34]:
champ_perks_5.show()

+--------+-----------+-----+----+----+----+-------+----+----+----+----+----+----+----+
|  gameId|      champ|    w|  r1|  r2|  r3|primary|  p1|  p2|  p3|  p4| sub|  s1|  s2|
+--------+-----------+-----+----+----+----+-------+----+----+----+----+----+----+----+
|48512747|     Gragas|false|5003|5008|5008|   8300|8369|8304|8345|8347|8200|8226|8237|
|48512747|       Kayn|false|5002|5008|5008|   8000|8010|9111|9105|8299|8300|8347|8304|
|48512747|       Gwen|false|5002|5008|5005|   8000|8010|8009|9104|8017|8400|8429|8451|
|48512747|     Lucian|false|5002|5008|5005|   8000|8005|8009|9103|8014|8300|8304|8345|
|48512747|       Nami|false|5002|5008|5008|   8200|8214|8226|8210|8237|8300|8345|8347|
|48512747|MissFortune| true|5002|5008|5005|   8200|8229|8226|8233|8237|8300|8345|8304|
|48512747|     KSante| true|5003|5002|5005|   8400|8437|8446|8429|8451|8000|9111|9105|
|48512747|      Sylas| true|5002|5008|5008|   8100|8112|8143|8138|8135|8200|8233|8232|
|48512747|        Lux| true|5003|5008|5008|

In [35]:
# Group by 'champ' and 'r1' and count the occurrences
r1_occurrences = champ_perks_5.groupBy('champ', 'r1').count()
r2_occurrences = champ_perks_5.groupBy('champ', 'r2').count()
r3_occurrences = champ_perks_5.groupBy('champ', 'r3').count()

# Calculate the total number of games for each champion
total_games_per_champ = champ_perks_5.groupBy('champ').count().withColumnRenamed('count', 'total_games')

# Join the occurrences with the total games
r1_occurrence_rates = r1_occurrences.join(total_games_per_champ, 'champ')
r2_occurrence_rates = r2_occurrences.join(total_games_per_champ, 'champ')
r3_occurrence_rates = r3_occurrences.join(total_games_per_champ, 'champ')

# Calculate the occurrence rate for each 'r1' for each champion
r1_occurrence_rates =r1_occurrence_rates.withColumn('occurrence_rate', col('count') / col('total_games'))
r2_occurrence_rates =r2_occurrence_rates.withColumn('occurrence_rate', col('count') / col('total_games'))
r3_occurrence_rates =r3_occurrence_rates.withColumn('occurrence_rate', col('count') / col('total_games'))

# Sort the DataFrame for better readability (if needed)
r1_occurrence_rates = r1_occurrence_rates.orderBy('champ', 'r1')
r2_occurrence_rates = r2_occurrence_rates.orderBy('champ', 'r2')
r3_occurrence_rates = r3_occurrence_rates.orderBy('champ', 'r3')


In [36]:
r1_occurrence_rates.show(200)

+------------+----+-----+-----------+--------------------+
|       champ|  r1|count|total_games|     occurrence_rate|
+------------+----+-----+-----------+--------------------+
|      Aatrox|5001|  547|      19084|0.028662754139593377|
|      Aatrox|5002|16719|      19084|  0.8760741982812827|
|      Aatrox|5003| 1818|      19084| 0.09526304757912388|
|        Ahri|5001|  410|       5665| 0.07237422771403354|
|        Ahri|5002| 1109|       5665| 0.19576345984112975|
|        Ahri|5003| 4146|       5665|  0.7318623124448367|
|       Akali|5001|  416|      10642|0.039090396542003386|
|       Akali|5002| 4738|      10642|  0.4452170644615674|
|       Akali|5003| 5488|      10642|  0.5156925389964292|
|      Akshan|5001|   60|       1019|0.058881256133464184|
|      Akshan|5002|  383|       1019|  0.3758586849852797|
|      Akshan|5003|  576|       1019|  0.5652600588812562|
|     Alistar|5001|  427|       3910|  0.1092071611253197|
|     Alistar|5002| 3128|       3910|                 0.

In [37]:
r2_occurrence_rates.show(200)

+------------+----+-----+-----------+--------------------+
|       champ|  r2|count|total_games|     occurrence_rate|
+------------+----+-----+-----------+--------------------+
|      Aatrox|5002|  261|      19084|0.013676378117795011|
|      Aatrox|5003|   77|      19084|0.004034793544330329|
|      Aatrox|5008|18746|      19084|  0.9822888283378747|
|        Ahri|5002|   58|       5665|0.010238305383936452|
|        Ahri|5003|   56|       5665|0.009885260370697264|
|        Ahri|5008| 5551|       5665|  0.9798764342453663|
|       Akali|5002|   93|      10642|0.008738958842322871|
|       Akali|5003|   76|      10642|0.007141514752866003|
|       Akali|5008|10473|      10642|  0.9841195264048112|
|      Akshan|5002|    8|       1019|0.007850834151128557|
|      Akshan|5003|    8|       1019|0.007850834151128557|
|      Akshan|5008| 1003|       1019|  0.9842983316977428|
|     Alistar|5002| 3311|       3910|  0.8468030690537084|
|     Alistar|5003|  279|       3910| 0.0713554987212276

In [38]:
r3_occurrence_rates.show(200)

+------------+----+-----+-----------+--------------------+
|       champ|  r3|count|total_games|     occurrence_rate|
+------------+----+-----+-----------+--------------------+
|      Aatrox|5005| 2282|      19084| 0.11957660867742612|
|      Aatrox|5007|  218|      19084|0.011423181722909244|
|      Aatrox|5008|16584|      19084|  0.8690002095996646|
|        Ahri|5005| 4459|       5665|  0.7871138570167696|
|        Ahri|5007|  276|       5665| 0.04872021182700794|
|        Ahri|5008|  930|       5665|  0.1641659311562224|
|       Akali|5005|  672|      10642| 0.06314602518323624|
|       Akali|5007|  129|      10642|0.012121781619996242|
|       Akali|5008| 9841|      10642|  0.9247321931967675|
|      Akshan|5005|  884|       1019|  0.8675171736997056|
|      Akshan|5007|    4|       1019|0.003925417075564278|
|      Akshan|5008|  131|       1019| 0.12855740922473013|
|     Alistar|5005|  179|       3910| 0.04578005115089514|
|     Alistar|5007| 3384|       3910|  0.865473145780051

In [39]:
from pyspark.sql.functions import col, count, dense_rank
from pyspark.sql.window import Window

# Calculate the total number of games for each champion
total_games_per_champ = champ_perks_5.groupBy('champ').count().withColumnRenamed('count', 'total_games')

# Group by 'champ' and the primary perk columns, and count the occurrences
perk_counts = champ_perks_5.groupBy('champ', 'primary', 'p1', 'p2', 'p3', 'p4', 'sub', 's1', 's2').count()

# Define a window spec partitioned by 'champ' and ordered by the count in descending order
windowSpec = Window.partitionBy('champ').orderBy(col('count').desc())

# Apply the window spec to add a dense rank to each row within the partition
ranked_perks = perk_counts.withColumn('rank', dense_rank().over(windowSpec))

# Join the ranked perks with the total games per champion to calculate the occurrence rate
ranked_perks_with_total_games = ranked_perks.join(total_games_per_champ, 'champ') \
                                            .withColumn('occurrence_rate', col('count') / col('total_games'))

# Filter for the top 3 combination of perks for each champion
most_popular_perks = ranked_perks_with_total_games.filter(col('rank') <= 3) \
                                           .select('champ', 'primary', 'p1', 'p2', 'p3', 'p4','sub','s1','s2', 'count', 'total_games', 'occurrence_rate')





In [40]:
# Show the result
most_popular_perks.show(600)

+------------+-------+----+----+----+----+----+----+----+-----+-----------+--------------------+
|       champ|primary|  p1|  p2|  p3|  p4| sub|  s1|  s2|count|total_games|     occurrence_rate|
+------------+-------+----+----+----+----+----+----+----+-----+-----------+--------------------+
|      Aatrox|   8000|8010|9111|9105|8299|8400|8473|8453| 8639|      19084| 0.45268287570739885|
|      Aatrox|   8000|8010|9111|9105|8299|8100|8139|8135| 1789|      19084| 0.09374345001047998|
|      Aatrox|   8000|8010|9111|9105|8299|8400|8444|8453|  770|      19084| 0.04034793544330329|
|        Ahri|   8100|8112|8139|8138|8106|8200|8226|8210| 2016|       5665|  0.3558693733451015|
|        Ahri|   8100|8112|8139|8138|8106|8300|8345|8347|  979|       5665| 0.17281553398058253|
|        Ahri|   8200|8214|8226|8210|8237|8300|8345|8347|  509|       5665| 0.08984995586937335|
|       Akali|   8000|8021|8009|9105|8014|8400|8451|8444| 2612|      10642| 0.24544258598007893|
|       Akali|   8000|8010|800

In [41]:
champ_items_1 = classic_df.select(
    col("gameId"),
    col("participant.championName").alias("champ"),
    col("participant.teamPosition").alias("position"),
    col("participant.item0"),
    col("participant.item1"),
    col("participant.item2"),
    col("participant.item3"),
    col("participant.item4"),
    col("participant.item5"),
    col("participant.item6"),
)

In [42]:
champ_items_1.show()

+---------+----------+--------+-----+-----+-----+-----+-----+-----+-----+
|   gameId|     champ|position|item0|item1|item2|item3|item4|item5|item6|
+---------+----------+--------+-----+-----+-----+-----+-----+-----+-----+
|262459713|     Akali|  MIDDLE| 4645| 3152| 3089| 3157| 3020| 3135| 3364|
|261253487|   Leblanc|  MIDDLE| 6655| 3135| 3157| 3020| 4645| 1056| 3363|
|257695691|      Yone|     TOP| 3026| 3031| 6673| 6672| 3181| 3006| 3340|
|261082499|     Galio|  MIDDLE| 3157| 6656| 1082| 1058| 3916| 3020| 3364|
|260967025|     Akali|     TOP| 3152| 3157| 4645| 1082| 1058| 3020| 3364|
|263248079|Blitzcrank| UTILITY| 3860| 3190| 3109| 2055| 3047| 1057| 3364|
|251758418|    Syndra|  MIDDLE| 3157| 4645|    0| 6655| 3020| 1026| 3363|
|263483558|      Hwei|  MIDDLE| 3089| 6653| 2421| 3020| 1082| 4645| 3363|
|263432464|  Nautilus| UTILITY| 3023| 2055| 3860| 1028| 3001| 3111| 3364|
|240106541|      Rell| UTILITY| 3860| 6667|    0| 3047| 3076| 1011| 3364|
|260358254|    Gragas|  JUNGLE| 4645| 

In [43]:
champ_items_1.filter(col("position") == "UTILITY").show()

+---------+----------+--------+-----+-----+-----+-----+-----+-----+-----+
|   gameId|     champ|position|item0|item1|item2|item3|item4|item5|item6|
+---------+----------+--------+-----+-----+-----+-----+-----+-----+-----+
|263030128|  Nautilus| UTILITY| 2065| 3047| 3860| 3075| 1028| 2055| 3364|
|248821392|     Elise| UTILITY| 3152| 3853| 2421| 3191| 4645| 3117| 3364|
|261374673|      Nami| UTILITY| 3011| 6616| 2065| 4005| 3158| 3853| 3364|
|263248079|Blitzcrank| UTILITY| 3860| 3190| 3109| 2055| 3047| 1057| 3364|
|254141692|    Thresh| UTILITY| 3860| 2055| 3050| 3001| 3111|    0| 3364|
|262118622|     Senna| UTILITY| 3009| 3095| 3864|    0| 1038| 1018| 3364|
|259948585|   Taliyah| UTILITY| 3853| 3070| 1082| 3158| 6653| 1058| 3364|
|263337087|     Senna| UTILITY| 3179| 3133| 3864| 6692| 3123| 3009| 3364|
|262054208|     Rakan| UTILITY| 1033| 3859| 2055| 1011| 3111| 1029| 3364|
|263267587|     Yuumi| UTILITY| 2065| 3011| 3853| 6616| 3012| 3114| 3364|
|257594222|   Morgana| UTILITY| 6656| 

In [44]:
import pyspark.sql.functions as F
champ_items_2 = champ_items_1.withColumn("items", F.array_distinct(F.sort_array(F.array("item0", "item1", "item2", "item3", "item4", "item5"))))

In [45]:
champ_items_2.show()

+---------+----------+--------+-----+-----+-----+-----+-----+-----+-----+--------------------+
|   gameId|     champ|position|item0|item1|item2|item3|item4|item5|item6|               items|
+---------+----------+--------+-----+-----+-----+-----+-----+-----+-----+--------------------+
|262459713|     Akali|  MIDDLE| 4645| 3152| 3089| 3157| 3020| 3135| 3364|[3020, 3089, 3135...|
|261253487|   Leblanc|  MIDDLE| 6655| 3135| 3157| 3020| 4645| 1056| 3363|[1056, 3020, 3135...|
|257695691|      Yone|     TOP| 3026| 3031| 6673| 6672| 3181| 3006| 3340|[3006, 3026, 3031...|
|261082499|     Galio|  MIDDLE| 3157| 6656| 1082| 1058| 3916| 3020| 3364|[1058, 1082, 3020...|
|260967025|     Akali|     TOP| 3152| 3157| 4645| 1082| 1058| 3020| 3364|[1058, 1082, 3020...|
|263248079|Blitzcrank| UTILITY| 3860| 3190| 3109| 2055| 3047| 1057| 3364|[1057, 2055, 3047...|
|251758418|    Syndra|  MIDDLE| 3157| 4645|    0| 6655| 3020| 1026| 3363|[0, 1026, 3020, 3...|
|263483558|      Hwei|  MIDDLE| 3089| 6653| 2421| 

In [46]:
champ_summoners_1 = classic_df.select(
    col("gameId"),
    col("participant.teamPosition").alias("position"),
    col("participant.championName").alias("champ"),
    col("participant.summoner1Id"),
    col("participant.summoner2Id"))

In [47]:
champ_summoners_1.show()

+---------+--------+----------+-----------+-----------+
|   gameId|position|     champ|summoner1Id|summoner2Id|
+---------+--------+----------+-----------+-----------+
|262459713|  MIDDLE|     Akali|          4|         12|
|261253487|  MIDDLE|   Leblanc|          4|         12|
|257695691|     TOP|      Yone|          4|         12|
|261082499|  MIDDLE|     Galio|          4|         12|
|260967025|     TOP|     Akali|          4|         12|
|263248079| UTILITY|Blitzcrank|          4|         14|
|251758418|  MIDDLE|    Syndra|         12|          4|
|263483558|  MIDDLE|      Hwei|         12|          4|
|263432464| UTILITY|  Nautilus|          4|         14|
|240106541| UTILITY|      Rell|          4|         14|
|260358254|  JUNGLE|    Gragas|          4|         11|
|263902489|  JUNGLE|     Viego|         11|          4|
|263626139| UTILITY|      Nami|          4|         14|
|258176924| UTILITY|      Pyke|          4|         14|
|252034482|  BOTTOM|     Kaisa|          4|     

In [48]:
champ_summoners_2 = champ_summoners_1.withColumn("summoners", F.array_distinct(F.sort_array(F.array("summoner1Id", "summoner2Id"))))

In [49]:

total_games_per_champ = champ_summoners_2.groupBy('champ').count().withColumnRenamed('count', 'total_games')


summoners_counts = champ_summoners_2.groupBy('champ', 'summoners').count()

windowSpec = Window.partitionBy('champ').orderBy(col('count').desc())

ranked_summoners = summoners_counts.withColumn('rank', dense_rank().over(windowSpec))

ranked_summoners_with_total_games = ranked_summoners.join(total_games_per_champ, 'champ') \
                                            .withColumn('occurrence_rate', col('count') / col('total_games'))

most_popular_summoners = ranked_summoners_with_total_games.filter(col('rank') <= 2) \
                                           .select('champ', 'summoners', 'count', 'total_games', 'occurrence_rate')


In [50]:
most_popular_summoners.show()

+--------+---------+-----+-----------+-------------------+
|   champ|summoners|count|total_games|    occurrence_rate|
+--------+---------+-----+-----------+-------------------+
|  Aatrox|  [4, 12]|15763|      19579| 0.8050972981255426|
|  Aatrox|  [4, 14]| 3411|      19579|0.17421727360947956|
|    Ahri|  [4, 14]| 3067|       5879| 0.5216873617962239|
|    Ahri|  [4, 12]| 2657|       5879| 0.4519476101377785|
|   Akali|  [4, 14]| 4172|      11031|0.37820687154383104|
|   Akali|  [4, 12]| 3929|      11031|0.35617804369504125|
|  Akshan|  [4, 14]|  772|       1053| 0.7331433998100665|
|  Akshan|  [4, 12]|  143|       1053|0.13580246913580246|
| Alistar|  [4, 14]| 3325|       4033| 0.8244483015125217|
| Alistar|   [3, 4]|  512|       4033| 0.1269526407141086|
|   Amumu|  [4, 11]|  961|       1265| 0.7596837944664032|
|   Amumu|  [4, 14]|  228|       1265|0.18023715415019761|
|  Anivia|  [4, 12]|  846|       1563| 0.5412667946257198|
|  Anivia|  [4, 14]|  576|       1563| 0.368522072936660

In [51]:
csv_path = "D:\BigData\Data\result\summoners_champ.csv"
most_popular_summoners.write.format("csv").mode("overwrite").save("csv_path")

AnalysisException: [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The CSV datasource doesn't support the column `summoners` of the type "ARRAY<BIGINT>".

In [None]:
csv_path = "D:/BigData/Data/result/worst_matchups.csv"
most_popular_summoners.coalesce(1).write.csv(csv_path)

In [54]:
# Define custom field names
field_names = ["Champion1", "Champion2", "Win",	"Toltal", "Win_rate",	"Index"]

# Convert DataFrame to a format with custom field names included
data_with_custom_fields = df.rdd.map(lambda row: dict(zip(field_names, row)))

# Write DataFrame to MongoDB
data_with_custom_fields.toDF().write \
    .format("mongo") \
    .mode("overwrite") \
    .option("uri", "mongodb+srv://cbnprinze:cbnprince@cbnprince.qijvvlq.mongodb.net/result.worst_matchup") \
    .save()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 290.0 failed 4 times, most recent failure: Lost task 0.3 in stage 290.0 (TID 2232) (10.90.254.80 executor 1): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more
