In [0]:
"""   Neste notebook construída a camada Silver responsável por integrar os dados de afastamentos, remuneração e cadastro de servidores.
"""

In [0]:
from pyspark.sql import functions as F



In [0]:
df_cadastro = spark.table("bronze_cadastro")
df_remuneracao = spark.table("bronze_remuneracao")
df_afastamentos = spark.table("bronze_afastamentos")


In [0]:
df_silver_cadastro = (
    df_cadastro
    .select(
        F.col("CPF").alias("cod_cpf"),
        F.col("NOME").alias("nom_servidor"),
        F.col("DESCRICAO_CARGO").alias("cargo")
    )
    .dropDuplicates(["cod_cpf"])
)


In [0]:
df_silver_remuneracao = (
    df_remuneracao
    .select(
        "cod_cpf",
        "num_ano",
        "num_mes",
        "vlr_remuneracao_bruta_brl"
    )
    .withColumn(
        "vlr_remuneracao_bruta_brl",
        F.regexp_replace("vlr_remuneracao_bruta_brl", ",", ".").cast("double")
    )
    .filter(F.col("vlr_remuneracao_bruta_brl").isNotNull())
)


In [0]:
df_silver_afastamentos = (
    df_afastamentos
    .select(
        F.col("CPF").alias("cod_cpf"),
        F.col("DATA_INICIO_AFASTAMENTO").alias("dt_inicio_afastamento"),
        F.col("DATA_FIM_AFASTAMENTO").alias("dt_fim_afastamento")
    )
    .filter(
        F.col("dt_inicio_afastamento").isNotNull() &
        F.col("dt_fim_afastamento").isNotNull()
    )
    .withColumn(
        "dias_afastados",
        F.datediff("dt_fim_afastamento", "dt_inicio_afastamento") + F.lit(1)
    )
)


In [0]:
df_base = (
    df_silver_remuneracao
    .join(df_silver_cadastro, on="cod_cpf", how="left")
)


In [0]:
df_base = (
    df_base
    .join(df_silver_afastamentos, on="cod_cpf", how="inner")
)


In [0]:
df_silver_case2 = (
    df_base
    .withColumn(
        "remuneracao_diaria",
        F.col("vlr_remuneracao_bruta_brl") / F.lit(30)
    )
    .withColumn(
        "impacto_financeiro",
        F.col("dias_afastados") * F.col("remuneracao_diaria")
    )
)


In [0]:
df_silver_case2.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("silver_case2_afastamentos")


In [0]:
spark.table("silver_case2_afastamentos").count()

spark.table("silver_case2_afastamentos") \
    .select("cargo", "dias_afastados", "impacto_financeiro") \
    .orderBy(F.desc("impacto_financeiro")) \
    .show(10, truncate=False)


+-------------------------------------+--------------+------------------+
|cargo                                |dias_afastados|impacto_financeiro|
+-------------------------------------+--------------+------------------+
|TECNICO EM ASSUNTOS EDUCACIONAIS     |1461          |1277288.5029999998|
|PROFESSOR ENS BASICO TECN TECNOLOGICO|1096          |823746.6586666668 |
|PROFESSOR DO MAGISTERIO SUPERIOR     |1461          |684624.6          |
|PROFESSOR ENS BASICO TECN TECNOLOGICO|1285          |655774.0499999999 |
|PROFESSOR ENS BASICO TECN TECNOLOGICO|1285          |655774.0499999999 |
|PROFESSOR DO MAGISTERIO SUPERIOR     |1097          |612234.603        |
|TECNICO EM ASSUNTOS EDUCACIONAIS     |1461          |544256.5900000001 |
|PROFESSOR ENS BASICO TECN TECNOLOGICO|1034          |527681.22         |
|PROFESSOR DO MAGISTERIO SUPERIOR     |1461          |520180.284        |
|TECNICO EM ENFERMAGEM                |629           |516729.37066666665|
+-------------------------------------