In [0]:
# Importando bibliotecas necessárias
from pyspark.sql.functions import col, year, month, dayofmonth, date_format, lit

# --- Configuração dos Caminhos ---

storage_account_name = "stacdatatrabalhoed01"


silver_path = f"abfss://silver@{storage_account_name}.dfs.core.windows.net"
gold_path = f"abfss://gold@{storage_account_name}.dfs.core.windows.net"

Criando Dimensões...
Dimensões criadas.
Criando Tabela Fato...
Processo de transformação para a camada Gold concluído.


In [0]:
# Carregar tabelas da Silver ---
df_paciente = spark.read.format("delta").load(f"{silver_path}/teste/paciente_enriquecido")
df_odontologista = spark.read.format("delta").load(f"{silver_path}/teste/odontologista")
df_procedimento = spark.read.format("delta").load(f"{silver_path}/teste/procedimento")
df_tipo_pagamento = spark.read.format("delta").load(f"{silver_path}/teste/tipo_pagamento")
df_consulta = spark.read.format("delta").load(f"{silver_path}/teste/consulta_consolidada")
df_consulta_procedimento = spark.read.format("delta").load(f"{silver_path}/teste/consulta_procedimento")

In [0]:
# --- Criação das Dimensões ---

print("Criando Dimensões...")

# 1. dim_paciente
dim_paciente = df_paciente.select(
    "id_paciente", "nome_paciente", "cpf_paciente", "genero",
    "data_nasc", "cidade", "estado", "pais"
)
dim_paciente.write.format("delta").mode("overwrite").save(f"{gold_path}/teste/dim_paciente")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_dim_paciente USING DELTA LOCATION '{gold_path}/teste/dim_paciente'")

# 2. dim_odontologista
dim_odontologista = df_odontologista.select(
    "id_odontologista", "nome_odontologista", "especialidade", "cro"
)
dim_odontologista.write.format("delta").mode("overwrite").save(f"{gold_path}/teste/dim_odontologista")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_dim_odontologista USING DELTA LOCATION '{gold_path}/teste/dim_odontologista'")


# 3. dim_procedimento
dim_procedimento = df_procedimento.select(
    "id_procedimento", "nome_procedimento", "descricao_procedimento"
)
dim_procedimento.write.format("delta").mode("overwrite").save(f"{gold_path}/teste/dim_procedimento")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_dim_procedimento USING DELTA LOCATION '{gold_path}/teste/dim_procedimento'")


# 4. dim_tipo_pagamento
dim_tipo_pagamento = df_tipo_pagamento.select(
    "id_tipo_pagamento", "descricao_tipo_pagamento"
)
dim_tipo_pagamento.write.format("delta").mode("overwrite").save(f"{gold_path}/teste/dim_tipo_pagamento")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_dim_tipo_pagamento USING DELTA LOCATION '{gold_path}/teste/dim_tipo_pagamento'")

# 5. dim_consulta
dim_consulta = df_consulta.select("id_consulta", "data_hora", "diagnostico", "tratamento", "id_agendamento")
dim_consulta.write.format("delta").mode("overwrite").save(f"{gold_path}/teste/dim_consulta")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_dim_consulta USING DELTA LOCATION '{gold_path}/teste/dim_consulta'")


# 6. dim_tempo (Gerada a partir dos dados)
# Usamos a data do pagamento como base para a dimensão tempo.
df_datas = df_consulta.select(col("data_pagamento").alias("data")).filter(col("data").isNotNull()).distinct()

dim_tempo = df_datas.select(
    col("data").cast("date").alias("id_tempo_data"), # Usar a própria data como chave
    col("data"),
    year("data").alias("ano"),
    month("data").alias("mes"),
    dayofmonth("data").alias("dia"),
    date_format(col("data"), "E").alias("dia_semana") # 'E' para nome do dia da semana
)
# Para usar um ID inteiro (id_tempo), você pode gerar com a função monotonically_increasing_id() ou zipWithIndex()
from pyspark.sql.functions import monotonically_increasing_id
dim_tempo = dim_tempo.withColumn("id_tempo", monotonically_increasing_id())

dim_tempo.write.format("delta").mode("overwrite").save(f"{gold_path}/teste/dim_tempo")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_dim_tempo USING DELTA LOCATION '{gold_path}/teste/dim_tempo'")

print("Dimensões criadas.")

In [0]:
# --- Criação da Tabela Fato ---

print("Criando Tabela Fato...")

df_base_fato = df_consulta_procedimento.alias("cp") \
    .join(df_consulta.alias("c"), col("cp.consulta_id_consulta") == col("c.id_consulta")) \
    .join(dim_tempo.alias("t"), col("c.data_pagamento").cast("date") == col("t.id_tempo_data")) \
    .select(
        col("c.id_consulta"),
        col("c.paciente_id_paciente").alias("id_paciente"),
        col("c.odontologista_id_odontologista").alias("id_odontologista"),
        col("cp.procedimento_id_procedimento").alias("id_procedimento"),
        col("c.tipo_pagamento_id_tipo_pagamento").alias("id_tipo_pagamento"),
        col("t.id_tempo"),
        col("c.valor_pago")
    ).withColumn("quantidade_procedimentos", lit(1)) # Cada linha representa um procedimento

# Renomeia colunas para corresponder exatamente ao schema da Fato Gold
fato_consulta_pagamento = df_base_fato.select(
    "id_consulta",
    "id_paciente",
    "id_odontologista",
    "id_procedimento",
    "id_tipo_pagamento",
    "id_tempo",
    "valor_pago",
    "quantidade_procedimentos"
).filter(col("valor_pago").isNotNull())

In [0]:
# Adiciona a surrogate key `id_fato`
fato_consulta_pagamento = fato_consulta_pagamento.withColumn("id_fato", monotonically_increasing_id())


fato_consulta_pagamento.write.format("delta").mode("overwrite").save(f"{gold_path}/teste/fato_consulta_pagamento")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold_fato_consulta_pagamento USING DELTA LOCATION '{gold_path}/teste/fato_consulta_pagamento'")

print("Processo de transformação para a camada Gold concluído.")