
####  Ingerindo o arquivo torneios ###

Objetivo: realizar um processo de ETL e disponibilizar os dados em uma camada apropriada

Ler -> Transformar -> Escrever (ETL)

1. Ler o arquivo csv do Data Lake
2. Renomear as colunas para português utilizando o snake case
3. Definir o esquema de dados correto
4. Precisa ter uma coluna com a data em que o arquivo foi consumido
5. Salvar os dados em formato parquet na camada silver




### Importando as bibliotecas necessárias

In [None]:
from pyspark.sql.functions import current_timestamp, lit #função que traz a data atual
from pyspark.sql.types import BooleanType # função que retorna o tipo booleano

### Lendo o arquivo csv

In [None]:
nome_arquivo = "torneios"
caminho_arquivo = f"/mnt/datalake/bronze/{nome_arquivo}.csv"

torneios_df = spark.read
.option("encondig", "UTF-16")
.option("header", "true")
.option("inferSchema", "true")
.csv(caminho arquivo)

display(torneios_df)

In [None]:
# Visualizar o schema do dataframe criado
torneios_df.printSchema()

### Renomeando colunas

In [None]:
# Renomear as colunas para português

torneios_df_renomeado = torneios_df \
    .withColumnRenamed("league_id", "id_torneio") \
    .withColumnRenamed("league_name", "nm_torneio") \
    .withColumnRenamed("country_name", "nm_pais") \
    .withColumnRenamed("country_code", "cd_pais") \
    .withColumnRenamed("season_year", "ano_torneio") \
    .withColumnRenamed("season_start", "dt_inicio") \
    .withColumnRenamed("season_end", "dt_fim") \
    .withColumnRenamed("season_curent", "e_atual")

display(torneios_df_renomeado)

In [None]:
# Incluir a coluna de data de execução do script

torneios_df_data = torneios_df_renomeado.withColumn("dt_ingestao", current_timestamp())
display(torneios_df_data)

In [None]:
# Converter a coluna e_atual para o tipo lógico (está salvo como string)
torneios_df_retipado = torneios_df_data.withColumn("e_atual", torneios_df_data.e_atual.cast(BooleanType()))

In [None]:
# Inclusão de uma coluna com o texto literal "Brasil" em todas as linhas - desafio adicional
torneios_df_final = torneios_df_retipado.withColumn("nm_traduzido", lit("Brasil"))

In [None]:
# essa parte escreve o arquivo formato parquet na pasta silver
# Se o arquivo já existir, ele irá sobrescrever
torneios_df_final.write.mode("overwrite").parquet(f"/mnt/datalake/silver/{nome_arquivo}")