In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [0]:
def create_spark_session(app_name: str) -> SparkSession:
    spark = SparkSession.builder.appName(app_name).getOrCreate()
    return spark

In [0]:
app_name = "Desafio"
spark = create_spark_session(app_name)

## BRONZE

In [0]:
path_arquivo_atendimento = "/Volumes/workspace/desafio/raw"

df = (spark.read.csv(path_arquivo_atendimento, sep="\t", header=True))

In [0]:
bronze_table = "workspace.desafio.bronze_tb"
df.write.saveAsTable(bronze_table)

In [0]:
spark.sql(f""" SELECT * FROM {bronze_table}""").show(10)

+--------------+----------------+--------------+--------------------+---+---------------------+-----------------+-----------------------+-----------------+---------------+--------------------+---------------+
|ID_Atendimento|Data_Atendimento|   CPF_Cliente|        Nome_Cliente| UF|Categoria_Atendimento|Canal_Atendimento|Duracao_Atendimento_Min|Valor_Em_Cobranca|Parcelas_Acordo|Situacao_Atendimento|ID_Especialista|
+--------------+----------------+--------------+--------------------+---+---------------------+-----------------+-----------------------+-----------------+---------------+--------------------+---------------+
|             1|      2023-11-01|267.845.903-92|        Lucca Mendes| DF|   Cobrança Receptiva|           E-mail|                     42|          3735.89|              3|           Resolvido|           1080|
|             2|      2024-05-22|351.982.074-97|Dr. Vitor Hugo Ca...| SC|           Negociação|              SMS|                     41|          2754.61|         

## SILVER

In [0]:
# Path
bronze_table = "workspace.desafio.bronze_tb"

(spark.read.table(bronze_table).printSchema)


<bound method DataFrame.printSchema of DataFrame[ID_Atendimento: string, Data_Atendimento: string, CPF_Cliente: string, Nome_Cliente: string, UF: string, Categoria_Atendimento: string, Canal_Atendimento: string, Duracao_Atendimento_Min: string, Valor_Em_Cobranca: string, Parcelas_Acordo: string, Situacao_Atendimento: string, ID_Especialista: string]>

Data Types esperados: 
  - ID_Atendimento:**integer**, 
  - Data_Atendimento:**date**,
  - CPF_Cliente:**string**,
  - Nome_Cliente:**string**,
  - UF:**string**,
  - Categoria_Atendimento:**string**,
  - Canal_Atendimento:**string**,
  - Duracao_Atendimento_Min:**integer**,
  - Valor_Em_Cobranca:**double**,
  - Parcelas_Acordo:**integer**,
  - Situacao_Atendimento:**string**,
  - ID_Especialista:**integer**

In [0]:
def silver_layer(spark: SparkSession, bronze_table: str):
  df = spark.read.table(bronze_table)

  df = df.withColumn("ID_Atendimento", F.col("ID_Atendimento").cast("int")) \
                    .withColumn("Data_Atendimento", F.to_date(F.col("Data_Atendimento")))\
                    .withColumn("Valor_Em_Cobranca", F.col("Valor_Em_Cobranca").cast("double"))\
                    .withColumn("Parcelas_Acordo", F.col("Parcelas_Acordo").cast("double"))\
                    .withColumn("Duracao_Atendimento_Min", F.col("Duracao_Atendimento_Min"))
  
  df_medio_por_especialista = df.groupBy("ID_Especialista") \
    .agg(F.avg("Duracao_Atendimento_Min").alias("Tempo_Medio_Atendimento_Min"))

  # Flag Fechou_Acordo
  df = df.withColumn(
      "Fechou_Acordo",
      F.when(
          (F.col("Situacao_Atendimento") == "Resolvido") &
          (F.col("Categoria_Atendimento") == "Acordo Fechado"), 
          True
      ).otherwise(False)
  )

  # Unir os dados de tempo médio com a tabela Silver
  df_final = df.join(df_medio_por_especialista, on="ID_Especialista", how="left")

  return df_final

# Path
silver_table = "workspace.desafio.silver_tb"

silver_df = silver_layer(spark, bronze_table)
silver_df.write.saveAsTable(silver_table)

In [0]:
(spark.sql(f""" SELECT * FROM {silver_table}""").show(10))

+---------------+--------------+----------------+--------------+--------------------+---+---------------------+-----------------+-----------------------+-----------------+---------------+--------------------+-------------+---------------------------+
|ID_Especialista|ID_Atendimento|Data_Atendimento|   CPF_Cliente|        Nome_Cliente| UF|Categoria_Atendimento|Canal_Atendimento|Duracao_Atendimento_Min|Valor_Em_Cobranca|Parcelas_Acordo|Situacao_Atendimento|Fechou_Acordo|Tempo_Medio_Atendimento_Min|
+---------------+--------------+----------------+--------------+--------------------+---+---------------------+-----------------+-----------------------+-----------------+---------------+--------------------+-------------+---------------------------+
|           1080|             1|      2023-11-01|267.845.903-92|        Lucca Mendes| DF|   Cobrança Receptiva|           E-mail|                     42|          3735.89|            3.0|           Resolvido|        false|         29.1038461538461

## GOLD

In [0]:
def gold_layer(spark: SparkSession, silver_table: str):
  df = (spark.read.table(silver_table))

  # Extrair o mês e ano da coluna Data_Atendimento
  df_gold = df.withColumn(
    "Ano_Mes", 
    F.date_format(F.col("Data_Atendimento"), "yyyy-MM")  # Formato "Ano-Mês"
)

  # Agregar as métricas por UF e Ano-Mês
  df_gold_agg = df_gold.groupBy("UF", "Ano_Mes").agg(
      F.count("ID_Atendimento").alias("Total_Atendimentos"),
      F.avg("Valor_Em_Cobranca").alias("Valor_Medio_Em_Cobranca"),
      # % de acordos fechados
      (F.sum(F.when(F.col("Fechou_Acordo") == True, 1).otherwise(0)) / F.count("ID_Atendimento") * 100).alias("Percentual_Acordos_Fechados")
  )

  return df_gold_agg

def salvar_delta(df, gold_table: str):
  return df.write.format("delta").mode("overwrite").saveAsTable(gold_table)


# Path
silver_table = "workspace.desafio.silver_tb"
gold_table_name = "workspace.desafio.gold_tb"

df_gold = gold_layer(spark, silver_table)
salvar_delta(df_gold, gold_table_name)

In [0]:
(spark.sql(f""" SELECT * FROM {gold_table_name}""").show())

+---+-------+------------------+-----------------------+---------------------------+
| UF|Ano_Mes|Total_Atendimentos|Valor_Medio_Em_Cobranca|Percentual_Acordos_Fechados|
+---+-------+------------------+-----------------------+---------------------------+
| BA|2023-11|              1008|      2577.343214285716|          4.761904761904762|
| RJ|2024-11|               956|     2548.2474267782436|          4.497907949790795|
| MG|2023-08|              1068|     2475.0291292134857|          4.588014981273409|
| RJ|2025-01|              1082|     2564.3735120147853|         5.0831792975970425|
| MG|2023-11|              1052|      2530.586055133082|          4.942965779467681|
| RS|2025-06|               568|     2530.2885563380273|          5.809859154929577|
| CE|2023-12|              1068|     2567.2138202247197|           4.49438202247191|
| PR|2023-06|               467|     2575.7170235546005|          5.139186295503212|
| MG|2023-07|              1007|     2585.9442899702035|         