## Criação das partições de competition

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import boto3

In [2]:
# Obtain the Spark session: getOrCreate() returns the active session when one
# exists and otherwise starts a new one.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [9]:
# Load the competitions file from the `sor` bucket.
# "multiline" is enabled so a JSON document spanning several lines is parsed as a whole.
path_competitions = os.path.join("s3://", "sor", "football", "data", "competitions.json")
competitions_reader = spark.read.option("multiline", "true")
df_competition = competitions_reader.json(path_competitions)

In [10]:
# Persist the competitions to the landing bucket as JSON, partitioned by
# season_id. Existing output at the target path is overwritten.
path_competitions_landing = os.path.join("s3://", "landing", "competitions")
print(path_competitions_landing)
(
    df_competition.write
    .format("json")
    .mode("overwrite")
    .partitionBy("season_id")
    .save(path_competitions_landing)
)

s3://landing/competitions


                                                                                

## Criação das partições de Matches

In [3]:
# Load every match file: the glob covers any sub-folder of `matches` and any
# .json file inside it.
path_all_matches = os.path.join("s3://", "sor", "football", "data", "matches", "*", "*.json")
matches_reader = spark.read.option("multiline", "true")
df_match = matches_reader.json(path_all_matches)

# Row count of the loaded matches, printed as a quick check of the read.
total_matches = df_match.count()
print(total_matches)

24/01/23 00:54:56 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
[Stage 2:>                                                          (0 + 4) / 4]

2886


                                                                                

In [4]:
# Print the inferred schema of the matches DataFrame as a tree (nested structs included).
df_match.printSchema()

root
 |-- away_score: long (nullable = true)
 |-- away_team: struct (nullable = true)
 |    |-- away_team_gender: string (nullable = true)
 |    |-- away_team_group: string (nullable = true)
 |    |-- away_team_id: long (nullable = true)
 |    |-- away_team_name: string (nullable = true)
 |    |-- country: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |-- managers: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- country: struct (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- dob: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- nickname: string (nullable = true)
 |-- competition: struct (nullable = true)
 |    |-- competition_id: long (nullable = true)
 |    |-- competition_nam

In [17]:
# Preview the matches DataFrame; the arguments spell out show()'s defaults
# (first 20 rows, long cell values truncated).
df_match.show(n=20, truncate=True)

+----------+--------------------+--------------------+-------------------+----------+--------------------+------------+--------------------+----------------+----------+--------+------------+----------------+----------+-------------+--------------------+---------------+--------------------+
|away_score|           away_team|         competition|  competition_stage|home_score|           home_team|    kick_off|        last_updated|last_updated_360|match_date|match_id|match_status|match_status_360|match_week|     metadata|             referee|         season|             stadium|
+----------+--------------------+--------------------+-------------------+----------+--------------------+------------+--------------------+----------------+----------+--------+------------+----------------+----------+-------------+--------------------+---------------+--------------------+
|         2|{male, null, 322,...|{11, La Liga, Spain}|{1, Regular Season}|         2|{{214, Spain}, ma...|20:00:00.000|2023-02-

In [19]:
# Inspect the two nested ids that are later used as partition columns.
season_id_col = F.col("season.season_id")
competition_id_col = F.col("competition.competition_id")
df_match.select(season_id_col, competition_id_col).show()

+---------+--------------+
|season_id|competition_id|
+---------+--------------+
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
|       27|            11|
+---------+--------------+
only showing top 20 rows



In [28]:
# Promote the nested ids to top-level columns so they can be passed to
# partitionBy when the matches are written.
nested_competition_id = F.col("competition.competition_id")
nested_season_id = F.col("season.season_id")

df_match_add_col_competition = df_match.withColumn("competition_id", nested_competition_id)
df_match_add_col_season = df_match_add_col_competition.withColumn("season_id", nested_season_id)
df_match_add_col_season.show()

+----------+--------------------+--------------------+-------------------+----------+--------------------+------------+--------------------+----------------+----------+--------+------------+----------------+----------+-------------+--------------------+---------------+--------------------+--------------+---------+
|away_score|           away_team|         competition|  competition_stage|home_score|           home_team|    kick_off|        last_updated|last_updated_360|match_date|match_id|match_status|match_status_360|match_week|     metadata|             referee|         season|             stadium|competition_id|season_id|
+----------+--------------------+--------------------+-------------------+----------+--------------------+------------+--------------------+----------------+----------+--------+------------+----------------+----------+-------------+--------------------+---------------+--------------------+--------------+---------+
|         2|{male, null, 322,...|{11, La Liga, Spain

In [30]:
path_match_landing = os.path.join("s3://", "landing", "matches")

# Write to the landing layer as JSON, partitioned by competition_id and then
# season_id. Existing output at the target path is overwritten.
(
    df_match_add_col_season.write
    .format("json")
    .mode("overwrite")
    .partitionBy("competition_id", "season_id")
    .save(path_match_landing)
)

                                                                                

## Análise de casos de matches

In [40]:
# Query by competition and season: read a single partition directory directly.

competition_id = "2"
season_id = "27"

partition_path = f"s3://landing/matches/competition_id={competition_id}/season_id={season_id}"
df_competition2_season27 = spark.read.json(partition_path)
print(df_competition2_season27.count())

# Sort by match_id, then keep only the away_team struct for display.
df_competition2_season27.orderBy("match_id").select("away_team").show(truncate=False)



380
+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|away_team                                                                                                                                                |
+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|{male, 28, AFC Bournemouth, {68, England}, [{{68, England}, 1977-11-29, 38, Eddie Howe, null}]}                                                          |
|{male, 40, West Ham United, {68, England}, [{{56, Croatia}, 1968-09-11, 150, Slaven Bilić, null}]}                                                       |
|{male, 29, Everton, {68, England}, [{{214, Spain}, 1973-07-13, 263, Roberto Martínez Montoliú, Roberto Martínez}]}                                       |
|{male, 28, AFC Bournemouth, {68, England}, [{{68, England},

In [38]:
# Query by competition and season using a where clause on the whole dataset.
spark.conf.set("spark.sql.repl.eagerEval.enabled", False)

df_matches_landing = spark.read.json(path_match_landing)

# season_id and competition_id are the columns the matches were partitioned by
# when written; the predicate is built once and reused for both actions below.
partition_filter = f"season_id = {season_id} and competition_id = {competition_id}"
df_matches_selected = df_matches_landing.where(partition_filter)

df_matches_selected.orderBy("match_id").select("away_team").show(truncate=False)
print(df_matches_selected.count())

+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|away_team                                                                                                                                                      |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{male, null, 28, AFC Bournemouth, {68, England}, [{{68, England}, 1977-11-29, 38, Eddie Howe, null}]}                                                          |
|{male, null, 40, West Ham United, {68, England}, [{{56, Croatia}, 1968-09-11, 150, Slaven Bilić, null}]}                                                       |
|{male, null, 29, Everton, {68, England}, [{{214, Spain}, 1973-07-13, 263, Roberto Martínez Montoliú, Roberto Martínez}]}                                       |
|{male, null, 28, AFC Bourne

In [41]:
# Query by competition only, then list the distinct seasons found under it.
competition_path = f"s3://landing/matches/competition_id={competition_id}"
df_competition2 = spark.read.json(competition_path)

distinct_seasons = df_competition2.select("season_id").distinct()
distinct_seasons.show()

+---------+
|season_id|
+---------+
|       27|
|       44|
+---------+



## Partição LineUp

In [2]:

# Connection settings for the S3-compatible object store that holds the lineups.
# Endpoint and credentials are read from the environment when set, so secrets
# do not have to live in the notebook. The fallbacks are the values that were
# previously hardcoded here, so an unconfigured local run behaves as before.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://storage:9000")

s3_endpoint_url = MINIO_ENDPOINT
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "root")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "password")
bucket_name = "sor"
prefix = "football/data/lineups/"

# Low-level S3 client pointed at the custom endpoint instead of AWS.
s3 = boto3.client(
    "s3",
    endpoint_url=s3_endpoint_url,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

print(s3)


<botocore.client.S3 object at 0x7fc7c11cbc40>


In [3]:
# S3 listings are paginated (1000 objects per page), so every page has to be
# walked to capture all objects under the prefix.

paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

list_matches = []

# Collect every file name (without prefix and without extension).
for page in pages:
    # A page has no "Contents" entry when nothing matches the prefix, so
    # default to an empty list instead of raising KeyError.
    for obj in page.get("Contents", []):
        key = obj["Key"]
        # Skip keys that are not JSON files (e.g. a folder placeholder equal to
        # the prefix), which would otherwise produce an empty or bogus name.
        if not key.endswith(".json"):
            continue
        # Listed keys start with `prefix`; strip only that leading prefix and
        # the trailing ".json" rather than every occurrence inside the key.
        file_name = key[len(prefix):-len(".json")]
        list_matches.append(file_name)
print(list_matches)


['15946', '15956', '15973', '15978', '15986', '15998', '16010', '16023', '16029', '16056', '16073', '16079', '16086', '16095', '16109', '16120', '16131', '16136', '16149', '16157', '16173', '16182', '16190', '16196', '16205', '16215', '16231', '16240', '16248', '16265', '16275', '16289', '16306', '16317', '18235', '18236', '18237', '18240', '18241', '18242', '18243', '18244', '18245', '19714', '19715', '19716', '19717', '19718', '19719', '19720', '19722', '19723', '19724', '19725', '19726', '19727', '19728', '19729', '19730', '19731', '19732', '19733', '19734', '19735', '19736', '19737', '19738', '19739', '19740', '19741', '19742', '19743', '19744', '19745', '19746', '19747', '19748', '19749', '19750', '19751', '19752', '19753', '19754', '19755', '19756', '19757', '19758', '19759', '19760', '19761', '19762', '19763', '19764', '19765', '19766', '19767', '19768', '19769', '19770', '19771', '19772', '19773', '19774', '19775', '19776', '19777', '19778', '19779', '19780', '19781', '19782', 

In [31]:
# Walk the whole list of file names, add a 'match_id' column to each file's
# rows and write the result to the landing layer.
# Note: partitionBy is deliberately not used because the files are already
# split; the "match_id=<id>" directory name is built by hand instead.
# Because match_id is stored both in the data and in the directory name,
# reading the parent folder logs a "duplicate column(s)" warning.

for match in list_matches:
    path_lineups_landing = os.path.join("s3://","landing","lineups","match_id="+match)
    path_lineups =  os.path.join("s3://",bucket_name,prefix,match+".json")
    df_lineups = spark.read.option("multiline", "true").json(path_lineups)
    df_lineups_matchid = df_lineups.withColumn("match_id", F.lit(match))
    # Overwrite so the cell can be re-run: with the default save mode the write
    # fails as soon as the target directory already exists. This also matches
    # the events loop, which writes with mode("overwrite").
    df_lineups_matchid.write.format("json").mode("overwrite").save(path_lineups_landing)

In [12]:
# Print the current contents of list_matches (the collected file names).
print(list_matches)

['15946', '15956', '15973', '15978', '15986', '15998', '16010', '16023', '16029', '16056', '16073', '16079', '16086', '16095', '16109', '16120', '16131', '16136', '16149', '16157', '16173', '16182', '16190', '16196', '16205', '16215', '16231', '16240', '16248', '16265', '16275', '16289', '16306', '16317', '18235', '18236', '18237', '18240', '18241', '18242', '18243', '18244', '18245', '19714', '19715', '19716', '19717', '19718', '19719', '19720', '19722', '19723', '19724', '19725', '19726', '19727', '19728', '19729', '19730', '19731', '19732', '19733', '19734', '19735', '19736', '19737', '19738', '19739', '19740', '19741', '19742', '19743', '19744', '19745', '19746', '19747', '19748', '19749', '19750', '19751', '19752', '19753', '19754', '19755', '19756', '19757', '19758', '19759', '19760', '19761', '19762', '19763', '19764', '19765', '19766', '19767', '19768', '19769', '19770', '19771', '19772', '19773', '19774', '19775', '19776', '19777', '19778', '19779', '19780', '19781', '19782', 

In [32]:
# Read the lineups back from the landing layer (all match_id folders at once).
lineups_landing_parts = ("s3://", "landing", "lineups")
path_lineups_landing = os.path.join(*lineups_landing_parts)
df_lineups_landing = spark.read.json(path_lineups_landing)

df_lineups_landing.show()
lineups_row_count = df_lineups_landing.count()
print(lineups_row_count)

24/01/23 03:18:44 WARN DataSource: Found duplicate column(s) in the data schema and the partition schema: `match_id`


+--------------------+--------+-------+--------------------+
|              lineup|match_id|team_id|           team_name|
+--------------------+--------+-------+--------------------+
|[{[], {214, Spain...| 3902240|    863|       Spain Women's|
|[{[], {160, Nethe...| 3902240|    851| Netherlands Women's|
|[{[], {241, Unite...| 3857278|   1839|       United States|
|[{[], {107, Iran,...| 3857278|    797|                Iran|
|[{[], {31, Brazil...| 3773466|    217|           Barcelona|
|[{[], {214, Spain...| 3773466|    209|          Celta Vigo|
|[{[], {121, Korea...| 3893802|   1211|Korea Republic Wo...|
|[{[], {49, Colomb...| 3893802|  16802|    Colombia Women's|
|[{[{Yellow Card, ...| 3869321|    941|         Netherlands|
|[{[], {11, Argent...| 3869321|    779|           Argentina|
|[{[{Yellow Card, ...| 3869685|    771|              France|
|[{[], {11, Argent...| 3869685|    779|           Argentina|
|[{[], {121, Korea...| 3857262|    791|         South Korea|
|[{[], {183, Portu...| 3



6526


                                                                                

In [34]:
# Read the lineups of one specific match by pointing at its folder.
single_match_parts = ("s3://", "landing", "lineups", "match_id=7298")
path_lineups_landing = os.path.join(*single_match_parts)
df_lineups_landing = spark.read.json(path_lineups_landing)

df_lineups_landing.show()
single_match_row_count = df_lineups_landing.count()
print(single_match_row_count)

+--------------------+--------+-------+-------------------+
|              lineup|match_id|team_id|          team_name|
+--------------------+--------+-------+-------------------+
|[{[], {220, Swede...|    7298|    971|        Chelsea FCW|
|[{[], {220, Swede...|    7298|    746|Manchester City WFC|
+--------------------+--------+-------+-------------------+

2


## Partição Events

In [5]:

# Connection settings for the S3-compatible object store that holds the events.
# Endpoint and credentials are read from the environment when set, so secrets
# do not have to live in the notebook. The fallbacks are the values that were
# previously hardcoded here, so an unconfigured local run behaves as before.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://storage:9000")

s3_endpoint_url = MINIO_ENDPOINT
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "root")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "password")
bucket_name = "sor"
prefix = "football/data/events/"

# Low-level S3 client pointed at the custom endpoint instead of AWS.
s3 = boto3.client(
    "s3",
    endpoint_url=s3_endpoint_url,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

print(s3)


<botocore.client.S3 object at 0x7fbdcaf13580>


In [6]:
# S3 listings are paginated, so walk every page to capture all objects under
# the prefix.
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

list_matches = []

for page in pages:
    # A page has no "Contents" entry when nothing matches the prefix, so
    # default to an empty list instead of raising KeyError.
    for obj in page.get("Contents", []):
        key = obj["Key"]
        # Skip keys that are not JSON files (e.g. a folder placeholder equal to
        # the prefix), which would otherwise produce an empty or bogus name.
        if not key.endswith(".json"):
            continue
        # Listed keys start with `prefix`; strip only that leading prefix and
        # the trailing ".json" rather than every occurrence inside the key.
        file_name = key[len(prefix):-len(".json")]
        list_matches.append(file_name)
print(len(list_matches))




3263


In [7]:
# Walk the whole list of file names, tag each file's rows with a 'match_id'
# column and write the result to the landing layer.
# Note: partitionBy is deliberately not used because the files are already
# split; the "match_id=<id>" directory name is built by hand instead.

for match in list_matches:
    path_events = os.path.join("s3://", bucket_name, prefix, f"{match}.json")
    path_events_landing = os.path.join("s3://", "landing", "events", f"match_id={match}")

    events_reader = spark.read.option("multiline", "true")
    df_events = events_reader.json(path_events)
    df_events_matchid = df_events.withColumn("match_id", F.lit(match))

    # Overwrite any previous output for this match so the loop can be re-run.
    (
        df_events_matchid.write
        .mode("overwrite")
        .format("json")
        .save(path_events_landing)
    )

24/01/20 03:43:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [36]:
# Read the events of one specific match by pointing at its landing folder.
path_events_single_match = os.path.join("s3://", "landing", "events", "match_id=7298")
df_events_landing = spark.read.json(path_events_single_match)

df_events_landing.show()
events_row_count = df_events_landing.count()
print(events_row_count)

+-------------+-----------------+-------------+-----+---------------+------------+-------+----+--------+--------------+--------+----------+--------------------+-----+---------------+------------+-------------+--------+------+--------------------+------+-------------------+--------------------+--------------------+----------+--------------------+--------------------+------+----+------------+--------------------+--------------------+------------+-------------------+--------------+
|bad_behaviour|     ball_receipt|ball_recovery|block|          carry|counterpress|dribble|duel|duration|foul_committed|foul_won|goalkeeper|                  id|index|injury_stoppage|interception|     location|match_id|minute|                pass|period|       play_pattern|              player|            position|possession|     possession_team|      related_events|second|shot|substitution|             tactics|                team|   timestamp|               type|under_pressure|
+-------------+-----------------