# LEAGUE OF LEGENDS ETL USING PYSPARK

The objective of this notebook is to prototype an ETL process that extracts useful information from several tables stored as CSV files on AWS S3 and then saves the results in Parquet format on AWS S3. Our datasets come from the popular game League of Legends and contain statistics from ranked games played in 2020. At the end of this process, we will have tables containing the best players for each champion and the most common builds among those players. Tools like Blitz run similar queries to help players improve their performance.

In [1]:
# import functions
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, explode, flatten, explode_outer
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructType, StructField, DoubleType, LongType, MapType, BooleanType
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.functions import col, concat
import json


# Start Spark Session

In [3]:
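# create a local session
spark = SparkSession.builder.appName('lol').\
        master("local").\
        getOrCreate()
# SQLContext expects a SparkContext plus the session; spark.sql(...) is the modern equivalent
sqlContext = SQLContext(spark.sparkContext, spark)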

# Define initial Schema

A common first step when extracting data with Spark is to define a schema, which declares the type of each column. Unfortunately, our data is stored as CSV, and some of its columns hold arrays of structs, which CSV cannot represent. To work around this, we first read the columns `participants` and `participantIdentities` as StringType, and later parse each string into structured DataFrame rows.
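To make the parsing step concrete, here is a small pure-Python sketch. It assumes, as the later use of `eval` in this notebook suggests, that each cell of `participants` holds a Python-literal list of dicts (single quotes, `True`/`False`) rather than strict JSON. The sample string is hypothetical. `ast.literal_eval` parses such literals without executing code, and `json.dumps` turns the result into JSON that Spark's JSON reader can consume.

```python
import ast
import json

# Hypothetical cell value: a Python-literal list of dicts (single quotes, True/False),
# not strict JSON.
raw = "[{'participantId': 1, 'championId': 64, 'stats': {'win': True, 'kills': 7}}]"

# json.loads rejects single-quoted keys, so strict JSON parsing fails here.
try:
    json.loads(raw)
    strict_json_ok = True
except json.JSONDecodeError:
    strict_json_ok = False

# ast.literal_eval only evaluates literals, so unlike eval it cannot run arbitrary code.
parsed = ast.literal_eval(raw)

# Re-serialize as JSON (True -> true) so a JSON reader can infer a schema from it.
as_json = json.dumps(parsed)

print(strict_json_ok)             # False
print(parsed[0]["stats"]["win"])  # True
print(as_json)
```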

In [4]:
# partial schema for match data
match_schema = StructType(
    [
        StructField('_c0', IntegerType(), True),
        StructField('gameCreation', DoubleType(), True),
        StructField('gameDuration', DoubleType(), True),
        StructField('gameId', DoubleType(), True),
        StructField('gameMode', StringType(), True),
        StructField('gameType', StringType(), True),
        StructField('gameVersion', StringType(), True),
        StructField('mapId', DoubleType(), True),
        StructField('participantIdentities', StringType(), True),
        StructField('participants',  StringType(), True),
        StructField('platformId', StringType(), True),
        StructField('queueId', DoubleType(), True),
        StructField('seasonId', DoubleType(), True),
        StructField('status.message', StringType(), True),
        StructField('status.status_code', StringType(), True)
    ]
)

# schema for items data
itens_schema = StructType(
    [
        StructField('_c0', IntegerType(), True),
        StructField('item_id', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('upper_item', StringType(), True),
        StructField('explain', StringType(), True),
        StructField('buy_price', IntegerType(), True),
        StructField('sell_price', IntegerType(), True),
        StructField('tag', StringType(), True)
    ]
)

# schema for champions data

champions_schema = StructType(
    [
        StructField('_c0', IntegerType(), True),
        StructField('version', StringType(), True),
        StructField('id', StringType(), True),
        StructField('key', IntegerType(), True),
        StructField('name', StringType(), True),
        StructField('title', StringType(), True),
        StructField('blurb', StringType(), True),
        StructField('tags', StringType(), True),
        StructField('partype', StringType(), True),
        StructField('info.attack', IntegerType(), True),
        StructField('info.defense', IntegerType(), True),
        StructField('info.magic', IntegerType(), True),
        StructField('info.difficulty', IntegerType(), True),
        StructField('image.full', StringType(), True),
        StructField('image.sprite', StringType(), True),
        StructField('image.group', StringType(), True),
        StructField('image.x', IntegerType(), True),
        StructField('image.y', IntegerType(), True),
        StructField('image.w', IntegerType(), True),
        StructField('image.h', IntegerType(), True),
        StructField('stats.hp', DoubleType(), True),
        StructField('stats.hpperlevel', IntegerType(), True),
        StructField('stats.mp', DoubleType(), True),
        StructField('stats.mpperlevel', DoubleType(), True),
        StructField('stats.movespeed', IntegerType(), True),
        StructField('stats.armor', DoubleType(), True),
        StructField('stats.armorperlevel', DoubleType(), True),
        StructField('stats.spellblock', DoubleType(), True),
        StructField('stats.spellblockperlevel', DoubleType(), True),
        StructField('stats.attackrange', IntegerType(), True),
        StructField('stats.hpregen', DoubleType(), True),
        StructField('stats.hpregenperlevel', DoubleType(), True),
        StructField('stats.mpregen', DoubleType(), True),
        StructField('stats.mpregenperlevel', DoubleType(), True),
        StructField('stats.crit', IntegerType(), True),
        StructField('stats.critperlevel', IntegerType(), True),
        StructField('stats.attackdamage', DoubleType(), True),
        StructField('stats.attackdamageperlevel', DoubleType(), True),
        StructField('stats.attackspeedperlevel', DoubleType(), True),
        StructField('stats.attackspeed', DoubleType(), True),
    ]
)

In [5]:
# read match data
match_data = spark.read.csv("../data/match_data_version1.csv",
                    header='true',
                    schema=match_schema)

# https://www.kaggle.com/tk0802kim/kerneld01a1ec7ad
itens = spark.read.csv("../data/riot_item.csv",
                    header='true',
                    schema=itens_schema)

champions = spark.read.csv("../data/riot_champion.csv",
                    header='true',
                    schema=champions_schema)


# Transform

In [6]:
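# explicit imports instead of wildcards, which would shadow Python builtins such as sum and max
from pyspark.sql.functions import col, udf, explode
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
import pyspark.sql.functions as F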

# Convenience function for turning JSON strings into DataFrames.
# https://docs.databricks.com/_static/notebooks/transform-complex-data-types-scala.html
def jsonToDataFrame(json_input, schema=None):
    # SparkSessions are available with Spark 2.0+
    reader = spark.read
    if schema:
        reader.schema(schema)
    return reader.json(spark.sparkContext.parallelize([json_input]))


# Convenience function to flatten a DataFrame with nested structs into top-level columns named parent_child.
#https://stackoverflow.com/questions/38753898/how-to-flatten-a-struct-in-a-spark-dataframe
def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

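import ast

# Convenience function to turn a string holding an array of dicts into an array of structs.
# The schema is inferred from the first row of the column only.
def transform_colum(df, column):
    df_select = df.select(col(column))
    str_ = df_select.take(1)[0].asDict()[column]
    # the strings appear to be Python literals (single quotes, True/False), so parse them with
    # ast.literal_eval, which unlike eval cannot execute arbitrary code, then re-serialize as JSON
    df_select = jsonToDataFrame(json.dumps(ast.literal_eval(str_)))
    schema = df_select.schema

    # parse every row with the inferred schema; null cells stay null
    eval_column = udf(lambda x: ast.literal_eval(x) if x is not None else None, ArrayType(schema))

    df = df.withColumn(column, eval_column(col(column)))

    return df, schema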
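`flatten_df` walks the schema with an explicit stack and names every leaf column by joining its path with underscores, which is where names such as `participants_info_ids_stats_win` come from. The pure-Python sketch below applies the same idea to a nested dict, purely as an illustration; it is not part of the Spark pipeline, and the function name `flatten_dict` and the sample record are hypothetical.

```python
def flatten_dict(nested):
    """Flatten nested dicts into a single dict keyed by underscore-joined paths."""
    stack = [((), nested)]
    flat = {}
    while stack:
        parents, node = stack.pop()
        for key, value in node.items():
            path = parents + (key,)
            if isinstance(value, dict):
                # nested struct: push it so its fields get the longer prefix
                stack.append((path, value))
            else:
                # leaf value (lists included, like arrays in flatten_df): name it by its path
                flat["_".join(path)] = value
    return flat


record = {
    "gameId": 1.0,
    "participants_info": {
        "ids": {"championId": 64, "stats": {"win": True}},
        "info": {"player": {"accountId": "abc"}},
    },
}
print(flatten_dict(record))
```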

## Extract new schema for match data

In [7]:
# Rename columns with '.'
match_data = match_data.withColumnRenamed("status.message", "status_message")
match_data = match_data.withColumnRenamed("status.status_code", "status_status_code")

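To parse the string columns, `transform_colum` takes the first row of the column, converts it into JSON, loads that JSON into a DataFrame so Spark can infer its schema, and then applies a UDF that parses every row with that schema.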
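Because the schema comes from a single row, any field that is missing from that row but present in later rows is not part of the inferred schema, so it cannot appear in the parsed column. Spark's JSON reader merges fields across all the records it reads, so inferring from a sample of rows instead of one is a possible mitigation. The pure-Python sketch below only illustrates the merging idea by collecting the union of keys over a few hypothetical rows (`optionalField` is made up); it does not reproduce Spark's type inference.

```python
import ast


def union_of_keys(raw_rows):
    """Collect every top-level key seen in the dicts of several Python-literal rows."""
    keys = set()
    for raw in raw_rows:
        for element in ast.literal_eval(raw):
            keys.update(element.keys())
    return sorted(keys)


# Hypothetical rows: the second one carries a field the first one lacks.
rows = [
    "[{'participantId': 1, 'championId': 64}]",
    "[{'participantId': 2, 'championId': 81, 'optionalField': 'x'}]",
]
print(union_of_keys(rows[:1]))  # ['championId', 'participantId']
print(union_of_keys(rows))      # ['championId', 'optionalField', 'participantId']
```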

In [8]:
# parse the string columns into arrays of PySpark structs
match_data, schema_partifipants = transform_colum(match_data, "participants")
match_data, schema_identities = transform_colum(match_data, "participantIdentities")

In [9]:
# new schema
match_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- gameCreation: double (nullable = true)
 |-- gameDuration: double (nullable = true)
 |-- gameId: double (nullable = true)
 |-- gameMode: string (nullable = true)
 |-- gameType: string (nullable = true)
 |-- gameVersion: string (nullable = true)
 |-- mapId: double (nullable = true)
 |-- participantIdentities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- participantId: long (nullable = true)
 |    |    |-- player: struct (nullable = true)
 |    |    |    |-- accountId: string (nullable = true)
 |    |    |    |-- currentAccountId: string (nullable = true)
 |    |    |    |-- currentPlatformId: string (nullable = true)
 |    |    |    |-- matchHistoryUri: string (nullable = true)
 |    |    |    |-- platformId: string (nullable = true)
 |    |    |    |-- profileIcon: long (nullable = true)
 |    |    |    |-- summonerId: string (nullable = true)
 |    |    |    |-- summonerName: string (nullable = true

In [10]:
# We now have two array columns. Before flattening the dataset, we zip them element-wise
# into a single array of structs, and then explode it into one row per participant.
combine = udf(lambda x, y: list(zip(x, y)),ArrayType(StructType([StructField("ids", schema_partifipants),
                                    StructField("info", schema_identities)]))
             )
match_data = match_data.withColumn("participants_info", combine("participants", "participantIdentities"))
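The `combine` UDF pairs the two arrays by position with `zip`, which assumes both arrays list participants in the same order and have the same length (`zip` silently stops at the shorter one). If that assumption ever fails, pairing on the `participantId` field that both structs share is a sturdier option. The pure-Python sketch below shows that alternative on hypothetical, deliberately reordered data; the function name `pair_by_participant_id` is ours, not part of the notebook.

```python
def pair_by_participant_id(participants, identities):
    """Pair each participant with the identity that shares its participantId.

    Returns (participant, identity) tuples, mirroring the (ids, info) struct built by
    the combine UDF; identity is None when no match exists.
    """
    by_id = {identity["participantId"]: identity for identity in identities}
    return [(p, by_id.get(p["participantId"])) for p in participants]


# Hypothetical data: identities are listed in a different order than participants.
participants = [
    {"participantId": 1, "championId": 64},
    {"participantId": 2, "championId": 81},
]
identities = [
    {"participantId": 2, "player": {"accountId": "bbb"}},
    {"participantId": 1, "player": {"accountId": "aaa"}},
]

for p, identity in pair_by_participant_id(participants, identities):
    print(p["championId"], identity["player"]["accountId"])
# 64 aaa
# 81 bbb

# Positional zip would pair champion 64 with account "bbb" here.
print(list(zip(participants, identities))[0][1]["player"]["accountId"])  # bbb
```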


In [11]:
# remove the original array columns
columns_to_drop = ['participants', 'participantIdentities']
match_data = match_data.drop(*columns_to_drop)
# explode the zipped array: one row per participant per game
match_data = match_data.withColumn("participants_info", explode("participants_info"))
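`explode` turns each game row into one row per element of `participants_info`, repeating the game-level columns (such as `gameId`) on every new row, so a ranked game with ten players becomes ten rows. The pure-Python sketch below mimics those semantics on hypothetical rows; `explode_rows` is an illustrative name, not Spark's implementation. Like Spark's `explode` (and unlike `explode_outer`), it emits nothing for a null or empty array.

```python
def explode_rows(rows, column):
    """Yield one output row per element of row[column], copying the other fields."""
    for row in rows:
        values = row.get(column) or []  # null or empty array -> no output rows
        for value in values:
            out = dict(row)
            out[column] = value
            yield out


games = [
    {"gameId": 1.0, "participants_info": ["p1", "p2", "p3"]},
    {"gameId": 2.0, "participants_info": []},
]
exploded = list(explode_rows(games, "participants_info"))
print(len(exploded))  # 3
print(exploded[0])    # {'gameId': 1.0, 'participants_info': 'p1'}
```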


In [12]:
match_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- gameCreation: double (nullable = true)
 |-- gameDuration: double (nullable = true)
 |-- gameId: double (nullable = true)
 |-- gameMode: string (nullable = true)
 |-- gameType: string (nullable = true)
 |-- gameVersion: string (nullable = true)
 |-- mapId: double (nullable = true)
 |-- platformId: string (nullable = true)
 |-- queueId: double (nullable = true)
 |-- seasonId: double (nullable = true)
 |-- status_message: string (nullable = true)
 |-- status_status_code: string (nullable = true)
 |-- participants_info: struct (nullable = true)
 |    |-- ids: struct (nullable = true)
 |    |    |-- championId: long (nullable = true)
 |    |    |-- participantId: long (nullable = true)
 |    |    |-- spell1Id: long (nullable = true)
 |    |    |-- spell2Id: long (nullable = true)
 |    |    |-- stats: struct (nullable = true)
 |    |    |    |-- assists: long (nullable = true)
 |    |    |    |-- champLevel: long (nullable = true)
 |    |    |  

In [13]:
# flatten structs
match_data=flatten_df(match_data)
match_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- gameCreation: double (nullable = true)
 |-- gameDuration: double (nullable = true)
 |-- gameId: double (nullable = true)
 |-- gameMode: string (nullable = true)
 |-- gameType: string (nullable = true)
 |-- gameVersion: string (nullable = true)
 |-- mapId: double (nullable = true)
 |-- platformId: string (nullable = true)
 |-- queueId: double (nullable = true)
 |-- seasonId: double (nullable = true)
 |-- status_message: string (nullable = true)
 |-- status_status_code: string (nullable = true)
 |-- participants_info_info_participantId: long (nullable = true)
 |-- participants_info_info_player_accountId: string (nullable = true)
 |-- participants_info_info_player_currentAccountId: string (nullable = true)
 |-- participants_info_info_player_currentPlatformId: string (nullable = true)
 |-- participants_info_info_player_matchHistoryUri: string (nullable = true)
 |-- participants_info_info_player_platformId: string (nullable = true)
 |-- particip

In [14]:
# build a lookup dictionary from item ID to item name
itens_dict = itens.select("item_id", "name").distinct().collect()
itens_dict = {v["item_id"]:v["name"] for v in itens_dict}


In [15]:
# helper function to translate an item ID into an item name
def transform_itens(x):
    try:
        value = itens_dict[int(x)]
    except (KeyError, TypeError, ValueError):
        # unknown ID, null value, or non-numeric input
        value = "Name Not Found"
    return value


new_cols_itens = udf(transform_itens, StringType())

# apply the translate function to each of the seven item slots (item0 to item6)
for i in range(7):
    match_data = match_data.withColumn(f"name_item{i}", new_cols_itens(col(f"participants_info_ids_stats_item{i}")))
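The `Name Not Found` values in the build tables below most likely come from empty inventory slots: as far as we know, Riot's match data reports an empty slot as item ID `0`, which has no row in `riot_item.csv`. Item IDs missing from that file for other reasons would get the same label. The pure-Python sketch below separates the two cases with a hypothetical `Empty Slot` label and uses `dict.get` instead of `try`/`except`; the label, the function name `item_name`, and the sample dictionary are assumptions for illustration.

```python
EMPTY_SLOT = "Empty Slot"      # hypothetical label, not used by the notebook
NOT_FOUND = "Name Not Found"


def item_name(item_id, names):
    """Translate an item ID into a name, treating 0 as an empty slot (assumption)."""
    if item_id is None:
        return NOT_FOUND
    item_id = int(item_id)  # IDs may arrive as longs or floats from the flattened data
    if item_id == 0:
        return EMPTY_SLOT
    return names.get(item_id, NOT_FOUND)


names = {1055: "Doran's Blade", 2003: "Health Potion"}  # hypothetical subset
print([item_name(i, names) for i in (1055, 0, 2003, 9999, None)])
# ["Doran's Blade", 'Empty Slot', 'Health Potion', 'Name Not Found', 'Name Not Found']
```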

In [16]:
# build a lookup dictionary from champion key to champion name

champions_dict = champions.select("key", "name").distinct().collect()
champions_dict = {v["key"]:v["name"] for v in champions_dict}


In [17]:
# helper function to translate a champion key into a champion name
def transform_champions(x):
    try:
        value = champions_dict[int(x)]
    except (KeyError, TypeError, ValueError):
        # unknown key, null value, or non-numeric input
        value = "Name Not Found"
    return value


new_cols_champions = udf(transform_champions, StringType())

# apply the translate function to the champion column
match_data = match_data.withColumn("name_champion", new_cols_champions(col("participants_info_ids_championId")))

# SQL Queries

Next, we run some SQL queries to reshape the data into a form that makes insights easy to extract. First, we find the best players for each champion. Then we find which items those players usually build, which tells users which items to buy when playing a specific champion.

<b>Disclaimer:</b> In our data, each item column stores the item held in that slot at the end of the game, so depending on the game it may contain a starting item or a completed item.

In [18]:
# Register the DataFrame as a SQL temporary view
match_data.createOrReplaceTempView("match_data")


In [19]:
# SQL query to extract the win stats for each champion
champions = sqlContext.sql("""
                              SELECT victorys.name_champion as name_champion, victorys.won_matches, matches.total_matches, victorys.won_matches/matches.total_matches as win_rate \
                              FROM \
                                  (SELECT match_data.name_champion as name_champion, COUNT(DISTINCT(match_data.gameId)) as won_matches \
                                  FROM match_data \
                                  WHERE match_data.participants_info_ids_stats_win == true \
                                  GROUP BY match_data.name_champion) as victorys \
                              LEFT JOIN (SELECT match_data.name_champion as name_champion, COUNT(DISTINCT(match_data.gameId)) as total_matches \
                                         FROM match_data \
                                         GROUP BY match_data.name_champion) as matches \
                              ON victorys.name_champion = matches.name_champion
                              ORDER BY matches.total_matches DESC
                          """) 
champions.createOrReplaceTempView("champions")
champions.show()

+-------------+-----------+-------------+-------------------+
|name_champion|won_matches|total_matches|           win_rate|
+-------------+-----------+-------------+-------------------+
|      Lee Sin|      17527|        35226| 0.4975586214727758|
|       Ezreal|      14920|        30133|0.49513822055553713|
| Miss Fortune|      14834|        28829| 0.5145513198515383|
|       Thresh|      12454|        24663| 0.5049669545472976|
|       Lucian|      11778|        24157| 0.4875605414579625|
|         Sett|      11056|        21694| 0.5096340001843828|
|       Kai'Sa|      10053|        21041|0.47778147426453116|
|     Aphelios|      10259|        20247| 0.5066923494838742|
|        Senna|      10365|        20217| 0.5126873423356582|
|        Sylas|       9508|        19842|0.47918556597117223|
|         Ekko|       9846|        19618| 0.5018860230400652|
|        Elise|       9553|        18065| 0.5288126210905065|
|     Nautilus|       8191|        16555|0.49477499244941103|
|       
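The win rate in this table is won matches divided by total matches; for Lee Sin, 17527 / 35226 ≈ 0.4976. Because the query starts from the wins subquery and left-joins the totals onto it, a champion with zero wins would get no row at all. A single-pass conditional count avoids that; in Spark SQL it would look something like `SUM(CASE WHEN win THEN 1 ELSE 0 END)` next to `COUNT(*)` in one `GROUP BY`. The pure-Python sketch below shows the idea on hypothetical (champion, win) pairs; `win_rates` is an illustrative name.

```python
from collections import defaultdict


def win_rates(rows):
    """Compute (wins, total, win_rate) per key from (key, won) pairs in a single pass."""
    wins = defaultdict(int)
    totals = defaultdict(int)
    for key, won in rows:
        totals[key] += 1
        wins[key] += 1 if won else 0  # conditional count: losses add 0
    return {key: (wins[key], totals[key], wins[key] / totals[key]) for key in totals}


# Hypothetical rows: "Amumu" never wins but still gets a row with win_rate 0.0.
rows = [("Ezreal", True), ("Ezreal", False), ("Amumu", False), ("Amumu", False)]
print(win_rates(rows))
# {'Ezreal': (1, 2, 0.5), 'Amumu': (0, 2, 0.0)}

print(round(17527 / 35226, 4))  # 0.4976 (Lee Sin, from the table above)
```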

In [20]:
# SQL query to extract the win stats for each player on a specific champion
players = sqlContext.sql("""
                            SELECT victorys.id as user, victorys.name_champion as name_champion, victorys.won_matches, matches.total_matches, victorys.won_matches/matches.total_matches as win_rate \
                            FROM \
                                (SELECT match_data.participants_info_info_player_accountId as id, match_data.name_champion as name_champion, COUNT(DISTINCT(match_data.gameId)) as won_matches \
                                FROM match_data \
                                WHERE match_data.participants_info_ids_stats_win == true \
                                GROUP BY match_data.participants_info_info_player_accountId, match_data.name_champion) as victorys \
                            LEFT JOIN (SELECT match_data.participants_info_info_player_accountId as id, match_data.name_champion as name_champion, COUNT(DISTINCT(match_data.gameId)) as total_matches \
                                       FROM match_data \
                                       GROUP BY match_data.participants_info_info_player_accountId, match_data.name_champion) as matches \
                            ON victorys.id=matches.id AND victorys.name_champion = matches.name_champion
                            ORDER BY matches.total_matches DESC
                        """) 
players.createOrReplaceTempView("players")
players.show()

+--------------------+-------------+-----------+-------------+-------------------+
|                user|name_champion|won_matches|total_matches|           win_rate|
+--------------------+-------------+-----------+-------------+-------------------+
|Dli4ZNwRdA8FOkKO-...|       Anivia|        178|          368|  0.483695652173913|
|Kre-yBBl_dth1hTu8...|         Ekko|        192|          364| 0.5274725274725275|
|qTg_APziIefWzPLf9...|      Alistar|        172|          334| 0.5149700598802395|
|eUEOhQpye02aeTnO-...|          Jax|        166|          333| 0.4984984984984985|
|5VLbcHCjOrxiV5EVj...|         Ekko|        167|          329| 0.5075987841945289|
|HO0Zipwm_4RS7iUAX...|       Graves|        186|          320|            0.58125|
|-K5Dcqaa_SYxpKQUG...|         Pyke|        161|          314| 0.5127388535031847|
|dBCC_W1jTA_-QplOR...|          Zed|        141|          299|0.47157190635451507|
|alEJ6dwgbWnPqNuNT...|      Kindred|        162|          295| 0.5491525423728814|
|Ted

In [21]:
# SQL query to extract the most common item in the first slot (item0) for each champion
build_first_item = sqlContext.sql("""
                                     SELECT build.championName, build.build_name as first_item, COUNT(build.build_name) as total_matches \
                                     FROM \
                                         (SELECT match_data.name_champion as championName, match_data.name_item0  as build_name \
                                         FROM match_data \
                                         WHERE match_data.participants_info_info_player_accountId \
                                         IN ( \
                                              SELECT players.user
                                              FROM players \
                                              WHERE players.win_rate > 0.5 AND players.total_matches > 2)) as build \
                                     GROUP BY build.championName, build.build_name \
                                     ORDER BY total_matches DESC
                                 """)
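# keep the top-ranked first item per champion; dropDuplicates does not guarantee which
# duplicate row it keeps, so rank rows within each champion explicitly (ties broken by name)
from pyspark.sql.window import Window
w = Window.partitionBy("championName").orderBy(F.desc("total_matches"), F.asc("first_item"))
build_first_item = (build_first_item
                    .withColumn("rank", F.row_number().over(w))
                    .filter(F.col("rank") == 1)
                    .drop("rank")
                    .sort("championName"))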
build_first_item.show()

+------------+--------------------+-------------+
|championName|          first_item|total_matches|
+------------+--------------------+-------------+
|      Aatrox|      Doran's Shield|         2110|
|        Ahri|     Hextech GLP-800|          801|
|       Akali|    Hextech Gunblade|         2490|
|     Alistar|Bulwark of the Mo...|          643|
|       Amumu|Enchantment: Runi...|           36|
|      Anivia|    Seraph's Embrace|          113|
|       Annie|        Doran's Ring|          117|
|    Aphelios|       Doran's Blade|         4572|
|        Ashe|Blade of the Ruin...|         1594|
|Aurelion Sol|   Corrupting Potion|          408|
|        Azir|      Nashor's Tooth|          324|
|        Bard|   Shard of True Ice|         2007|
|  Blitzcrank|Pauldrons of Whit...|         1647|
|       Brand|        Luden's Echo|           61|
|       Braum|Pauldrons of Whit...|         1021|
|     Caitlyn|       Doran's Blade|         1362|
|     Camille|      Ravenous Hydra|         1210|
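This query computes a per-champion mode: the item with the highest count. Its `IN` filter keeps every game played by an account that qualifies on *any* champion; if the intent is to restrict each champion to players who do well with that specific champion, the filter would need to match the (account, champion) pair instead. The pure-Python sketch below computes the per-champion mode under that stricter assumption, breaking ties alphabetically so the result does not depend on input order. The function name and all data are hypothetical.

```python
from collections import Counter, defaultdict


def most_common_per_champion(games, good_pairs):
    """Return {champion: (item, count)} over games whose (account, champion) is in good_pairs.

    games: iterable of (account, champion, item) tuples.
    good_pairs: set of (account, champion) pairs that pass the win-rate filter.
    """
    counts = defaultdict(Counter)
    for account, champion, item in games:
        if (account, champion) in good_pairs:
            counts[champion][item] += 1
    result = {}
    for champion, counter in counts.items():
        # highest count first, then alphabetical item name as a deterministic tie-break
        item, count = min(counter.items(), key=lambda kv: (-kv[1], kv[0]))
        result[champion] = (item, count)
    return result


games = [
    ("acc1", "Ashe", "Doran's Blade"),
    ("acc1", "Ashe", "Doran's Blade"),
    ("acc1", "Ashe", "Long Sword"),
    ("acc2", "Ashe", "Long Sword"),    # acc2 does not qualify on Ashe
    ("acc2", "Annie", "Doran's Ring"),
]
good_pairs = {("acc1", "Ashe"), ("acc2", "Annie")}
print(most_common_per_champion(games, good_pairs))
# {'Ashe': ("Doran's Blade", 2), 'Annie': ("Doran's Ring", 1)}
```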


In [22]:
# SQL query to extract the most common full build (all seven item slots) for each champion

connector = "-"
build = sqlContext.sql("""
                          SELECT build.championName, build.build_name, COUNT(build.build_name) as total_matches \
                          FROM \
                              (SELECT match_data.name_champion as championName, CONCAT(match_data.name_item0, "%s",
                                             match_data.name_item1, "%s",
                                             match_data.name_item2, "%s",
                                             match_data.name_item3, "%s",
                                             match_data.name_item4, "%s",
                                             match_data.name_item5, "%s",
                                             match_data.name_item6) as build_name \
                              FROM match_data \
                              WHERE match_data.participants_info_info_player_accountId \
                              IN ( \
                                   SELECT players.user
                                   FROM players \
                                   WHERE players.win_rate > 0.5 AND players.total_matches > 2)) as build \
                         GROUP BY build.championName, build.build_name \
                         ORDER BY total_matches DESC
                    """ % tuple([connector]*6))
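# keep the top-ranked build per champion; dropDuplicates does not guarantee which
# duplicate row it keeps, so rank rows within each champion explicitly (ties broken by name)
from pyspark.sql.window import Window
w = Window.partitionBy("championName").orderBy(F.desc("total_matches"), F.asc("build_name"))
build = (build
         .withColumn("rank", F.row_number().over(w))
         .filter(F.col("rank") == 1)
         .drop("rank")
         .sort("championName"))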
build.show()

+------------+--------------------+-------------+
|championName|          build_name|total_matches|
+------------+--------------------+-------------+
|      Aatrox|Doran's Shield-He...|           29|
|        Ahri|Doran's Ring-Lost...|            4|
|       Akali|Doran's Ring-Heal...|           11|
|     Alistar|Health Potion-Nam...|           89|
|       Amumu|Hunter's Talisman...|            5|
|      Anivia|Archangel's Staff...|            4|
|       Annie|Doran's Ring-Boot...|           13|
|    Aphelios|Doran's Blade-Hea...|           73|
|        Ashe|Doran's Blade-Bla...|           12|
|Aurelion Sol|Corrupting Potion...|            6|
|        Azir|Doran's Ring-Heal...|            9|
|        Bard|Spellthief's Edge...|           32|
|  Blitzcrank|Name Not Found-Na...|           15|
|       Brand|Doran's Ring-Sorc...|            4|
|       Braum|Name Not Found-Na...|            8|
|     Caitlyn|Doran's Blade-Hea...|           22|
|     Camille|Doran's Blade-Nam...|           14|
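The full-build key concatenates the seven slots in slot order, so the same items placed in different inventory slots count as different builds, and placeholder slots (`Name Not Found`) become part of the key. If slot order is not meaningful for the analysis, an order-insensitive key groups such builds together. The pure-Python sketch below is one possible variant, assuming the `Name Not Found` placeholder and the `-` connector used above; dropping placeholder slots is a design choice of this sketch, not something the notebook does. Duplicate items (for example two copies of the same component) are kept, since `sorted` preserves them.

```python
def build_key(item_names, connector="-", placeholder="Name Not Found"):
    """Order-insensitive build key: drop placeholder slots, sort the rest, join them."""
    return connector.join(sorted(name for name in item_names if name != placeholder))


a = ["Doran's Blade", "Infinity Edge", "Name Not Found", "Berserker's Greaves"]
b = ["Berserker's Greaves", "Name Not Found", "Doran's Blade", "Infinity Edge"]

print("-".join(a) == "-".join(b))    # False: slot order changes the concatenated key
print(build_key(a))                  # Berserker's Greaves-Doran's Blade-Infinity Edge
print(build_key(a) == build_key(b))  # True
```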


# LOAD

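Finally, we store the four resulting tables in Parquet format. In this prototype the files are written locally; the same calls accept S3 paths when Spark is configured for S3 access.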

In [None]:
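# mode="overwrite" lets the cell be re-run without failing on existing output paths
players.write.parquet("../players.parquet", mode="overwrite")
champions.write.parquet("../champions.parquet", mode="overwrite")
build_first_item.write.parquet("../build_first_item.parquet", mode="overwrite")
build.write.parquet("../build.parquet", mode="overwrite")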