## Criação das partições de competition

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

In [2]:
# Reuse the active Spark session if there is one; otherwise create it.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Load the competitions file from the source bucket. The "multiline" reader
# option is enabled so a JSON record may span several lines.
competitions_reader = spark.read.option("multiline", "true")
df_competition = competitions_reader.json("s3://sor/football/data/competitions.json")

24/01/09 01:51:37 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [50]:
# Collect the distinct season ids, sorted ascending, to the driver as Row objects.
distinct_seasons = df_competition.select('season_id').distinct()
partitions = distinct_seasons.orderBy("season_id").collect()
print(partitions)

[Row(season_id=1), Row(season_id=2), Row(season_id=3), Row(season_id=4), Row(season_id=21), Row(season_id=22), Row(season_id=23), Row(season_id=24), Row(season_id=25), Row(season_id=26), Row(season_id=27), Row(season_id=30), Row(season_id=37), Row(season_id=38), Row(season_id=39), Row(season_id=40), Row(season_id=41), Row(season_id=42), Row(season_id=43), Row(season_id=44), Row(season_id=48), Row(season_id=51), Row(season_id=54), Row(season_id=55), Row(season_id=68), Row(season_id=71), Row(season_id=75), Row(season_id=76), Row(season_id=84), Row(season_id=86), Row(season_id=90), Row(season_id=106), Row(season_id=107), Row(season_id=108), Row(season_id=235), Row(season_id=268), Row(season_id=269), Row(season_id=270), Row(season_id=272), Row(season_id=274), Row(season_id=275), Row(season_id=276), Row(season_id=277), Row(season_id=278), Row(season_id=279)]


In [62]:
# Map each season id to a (lazy) DataFrame holding only that season's rows.
dataframes = {
    row['season_id']: df_competition.filter(df_competition['season_id'] == row['season_id'])
    for row in partitions
}

In [73]:
# Base S3 location of the landing bucket. Kept WITHOUT a trailing slash: the
# f-strings that consume it add their own "/" separator, and a trailing slash
# here would yield "s3://landing//competitions/...".
base_path = "s3://landing"

# Write each season's DataFrame as JSON under its own "season=<id>" folder,
# overwriting whatever was previously stored at that location.
for partition_value, dataframe in dataframes.items():
    path = f"{base_path}/competitions/season={partition_value}"
    dataframe.write.json(path, mode='overwrite')

In [78]:
# Read all season folders back from the landing area as one DataFrame,
# preview the rows, and display the total row count as the cell result.
competitions_landing_path = f"{base_path}/competitions"
read_df = spark.read.json(competitions_landing_path)
read_df.show()
read_df.count()

+------------------+--------------+-------------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+-----------+------+
|competition_gender|competition_id|competition_international|    competition_name|competition_youth|        country_name|     match_available| match_available_360|       match_updated|   match_updated_360|season_id|season_name|season|
+------------------+--------------+-------------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+-----------+------+
|              male|             9|                    false|       1. Bundesliga|            false|             Germany|2023-12-12T07:43:...|                null|2023-12-12T07:43:...|                null|       27|  2015/2016|    27|
|              male|            16|                    false

70

## Criação das partições de Matches

In [73]:

# NOTE: this is a stop-gap. The list is maintained by hand; the correct
# approach is to generate it automatically.
# Each entry has the form "<competition_id>/<season_id>"; entries are grouped
# below with one competition per line.
competitions_seasons = [
    "2/27", "2/44",
    "7/27", "7/108", "7/235",
    "9/27",
    "11/1", "11/2", "11/4", "11/21", "11/22", "11/23", "11/24", "11/25", "11/26",
    "11/27", "11/37", "11/38", "11/39", "11/40", "11/41", "11/42", "11/90", "11/278",
    "12/27", "12/86",
    "16/1", "16/2", "16/4", "16/21", "16/22", "16/23", "16/24", "16/25", "16/26",
    "16/27", "16/37", "16/39", "16/41", "16/44", "16/71", "16/76", "16/276", "16/277",
    "35/75",
    "37/4", "37/42", "37/90",
    "43/3", "43/51", "43/54", "43/55", "43/106", "43/269", "43/270", "43/272",
    "44/107",
    "49/3",
    "53/106",
    "55/43",
    "72/30", "72/107",
    "81/48", "81/275",
    "87/84", "87/268", "87/279",
    "116/68",
    "1238/108",
    "1470/274",
]

In [68]:
# Go through every competition/season matches file in the source bucket and
# store its records in the landing bucket under
# matches/competition_id=<id>/season_id=<id>.
#
# NOTE(review): the file for competition 11 / season 278 is reported to be
# missing the "referee" column. An earlier note described skipping files whose
# column count differs from the first file, but no such validation exists in
# this cell: every non-empty file is written with whatever schema it has.
# Confirm whether the validation should be (re)introduced.

# Partition folder prefixes. They never change, so they are built once
# instead of on every iteration.
prefixo_competition = "competition_id="
prefixo_season = "season_id="

for competition_season in competitions_seasons:

    # Read the matches of one competition/season pair.
    df_match = spark.read.option("multiline", "true").json(f"s3://sor/football/data/matches/{competition_season}.json")

    # "<competition>/<season>": first part is the competition id, second part
    # is the season id. Both are kept as strings.
    competition_season_split = competition_season.split("/")
    competition_id = competition_season_split[0]
    season_id = competition_season_split[1]

    # Folder names of the two partition levels.
    partition_competition = prefixo_competition + competition_id
    partition_season = prefixo_season + season_id

    # Only filter and save when the file has at least one record. take(1)
    # answers "is there any row?" without counting the whole DataFrame.
    if df_match.take(1):

        # Keep only the rows whose nested ids match this competition/season.
        # The ids are compared as strings against the nested fields.
        df_partition_season = df_match.filter(
            (sf.col("competition").getItem("competition_id") == competition_id)
            & (sf.col("season").getItem("season_id") == season_id)
        )

        # "multiline" is a read-side JSON option, so it is not set on the
        # writer; the output is read back elsewhere without it.
        df_partition_season.write.format("json").save(f"s3://landing/matches/{partition_competition}/{partition_season}", mode='overwrite')
        
    

2/27
2/44
7/27
7/108
7/235
9/27
11/1
11/2
11/4
11/21
11/22
11/23
11/24
11/25
11/26
11/27
11/37
11/38
11/39
11/40
11/41
11/42
11/90
11/278
12/27
12/86
16/1
16/2
16/4
16/21
16/22
16/23
16/24
16/25
16/26
16/27
16/37
16/39
16/41
16/44
16/71
16/76
16/276
16/277
35/75
37/4
37/42
37/90
43/3
43/51
43/54
43/55
43/106
43/269
43/270
43/272
44/107
49/3
53/106
55/43
72/30
72/107
81/48
81/275
87/84
87/268
87/279
116/68
1238/108
1470/274


## Análise de casos de matches

In [64]:
# Value comparison: load one raw matches file straight from the source bucket,
# then print its row count and a preview.
competition_season = "2/27"
source_matches_path = f"s3://sor/football/data/matches/{competition_season}.json"
df_matche_competition = spark.read.option("multiline", "true").json(source_matches_path)

print(df_matche_competition.count())

df_matche_competition.show()

380
+----------+--------------------+--------------------+-------------------+----------+--------------------+------------+--------------------+--------------------+----------+--------+------------+----------------+----------+-------------+--------------------+---------------+--------------------+
|away_score|           away_team|         competition|  competition_stage|home_score|           home_team|    kick_off|        last_updated|    last_updated_360|match_date|match_id|match_status|match_status_360|match_week|     metadata|             referee|         season|             stadium|
+----------+--------------------+--------------------+-------------------+----------+--------------------+------------+--------------------+--------------------+----------+--------+------------+----------------+----------+-------------+--------------------+---------------+--------------------+
|         0|{male, null, 28, ...|{2, Premier Leagu...|{1, Regular Season}|         0|{{68, England}, m...|16:00

In [70]:
# Query the landing area for a single competition/season folder, then print
# its row count and a preview.
competition_id = "2"
season_id = "27"

partition_path = f"s3://landing/matches/competition_id={competition_id}/season_id={season_id}"
df_competition2_season27 = spark.read.json(partition_path)
print(df_competition2_season27.count())
df_competition2_season27.show()



418
+----------+--------------------+--------------------+-------------------+----------+--------------------+------------+--------------------+--------------------+----------+--------+------------+----------------+----------+-------------+--------------------+---------------+--------------------+---------+
|away_score|           away_team|         competition|  competition_stage|home_score|           home_team|    kick_off|        last_updated|    last_updated_360|match_date|match_id|match_status|match_status_360|match_week|     metadata|             referee|         season|             stadium|season_id|
+----------+--------------------+--------------------+-------------------+----------+--------------------+------------+--------------------+--------------------+----------+--------+------------+----------------+----------+-------------+--------------------+---------------+--------------------+---------+
|         0|{male, 28, AFC Bo...|{2, Premier Leagu...|{1, Regular Season}|       

In [72]:
# Query by competition only and show the distinct season_id values found.
competition_path = f"s3://landing/matches/competition_id={competition_id}"
df_competition2 = spark.read.json(competition_path)
df_competition2.select("season_id").distinct().show()

+---------+
|season_id|
+---------+
|       27|
|       44|
+---------+



In [None]:
# Read everything stored under the matches landing folder, then print the
# row count and a preview.
all_matches_path = "s3://landing/matches/"
df_competition_season = spark.read.json(all_matches_path)
print(df_competition_season.count())
df_competition_season.show()


## Partição LineUp

In [None]:
df_lineup = spark.read.json