In [None]:
12_continuidade_de_servico_indicadores_gerais_de_continuidade_de_servico

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2, concat_ws, upper, trim, split, lpad

In [None]:
# ======================================
    # 0. Start Spark Session
    # ======================================
    # Reuse the active Spark session if there is one; otherwise create it.
    spark = SparkSession.builder.getOrCreate()

In [None]:
# ======================================
    # 1. Load Bronze CSV
    # ======================================
    print("📥 Loading Bronze CSV...")
    bronze_path = "Files/bronze/12_continuidade_de_servico_indicadores_gerais_de_continuidade_de_servico/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# Verificar se está mal delimitado (uma única coluna)
    if len(df.columns) == 1:
        print("⚠️ Detected malformed CSV (single column). Forcing manual split using ';' ...")
        expected_cols = [
            "ano", "nuts_iii", "codigo_concelho", "concelho", "zona_rqs",
            "saifi_at_num", "saidi_at_min", "maifi_at_num",
            "tiepi_mt_min", "end_mt_mwh",
            "saifi_mt_num", "saidi_mt_min", "maifi_mt_num",
            "saifi_bt_num", "saidi_bt_min"
        ]
        df = df.withColumn("split", split(col(df.columns[0]), ";"))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        df = df.drop("split").drop(df.columns[0])

In [None]:
# ======================================
    # 2. Print Bronze Schema and Sample
    # ======================================
    print("✅ Bronze schema loaded:")
    df.printSchema()
    print(f"📊 Total Bronze rows: {df.count()}")

In [None]:
# ======================================
    # 3. Clean + Normalize + SK Generation
    # ======================================
    # Progress marker only; this cell performs no transformation itself.
    print("🧼 Cleaning and normalizing...")

In [None]:
df_clean = (
        # "district" is filled from the nuts_iii column (upper-cased, trimmed).
        # NOTE(review): other cells in this file build "district" from a `distrito`
        # column; keys hashed from NUTS III names may not line up with those — confirm.
        df.withColumn("district", upper(trim(col("nuts_iii"))))
          .withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("codigo_concelho", lpad(trim(col("codigo_concelho")), 4, "0"))  # ensure 4 digits
          # Surrogate key: SHA-256 hex digest of "<district>-<municipality>".
          # NOTE(review): concat_ws skips NULL inputs, so a row with a missing district
          # hashes only the municipality — confirm NULLs cannot occur here.
          .withColumn("sk_municipality", sha2(concat_ws("-", "district", "municipality"), 256))
    )

In [None]:
# ======================================
    # 4. Preview Cleaned Data
    # ======================================
    print("🔍 Preview of cleaned data:")
    # Show 10 rows of the key columns; truncate=False keeps the full hash visible.
    df_clean.select("codigo_concelho", "municipality", "district", "sk_municipality").show(10, truncate=False)

In [None]:
# ======================================
    # 5. Save to Silver Layer
    # ======================================
    silver_path = "Files/silver/12_continuidade_de_servico_indicadores_gerais_de_continuidade_de_servico_cleaned"
    print(f"💾 Saving cleaned data to: {silver_path}")
    df_clean.write.mode("overwrite").format("delta").save(silver_path)
    print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-06-28T14:15:53.4933805Z","execution_start_time":"2025-06-28T14:15:24.5835861Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"461964c9-65f6-44cf-925b-0f9903846690","queued_time":"2025-06-28T14:15:24.5823726Z","session_id":"38d4c515-912e-48e0-808f-82a2c705192f","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":5,"statement_ids":[5]}

In [None]:
📥 Loading Bronze CSV...
    ✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- nuts_iii: string (nullable = true)
     |-- codigo_concelho: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- zona_rqs: string (nullable = true)
     |-- saifi_at_num: string (nullable = true)
     |-- saidi_at_min: string (nullable = true)
     |-- maifi_at_num: string (nullable = true)
     |-- tiepi_mt_min: string (nullable = true)
     |-- end_mt_mwh: string (nullable = true)
     |-- saifi_mt_num: string (nullable = true)
     |-- saidi_mt_min: string (nullable = true)
     |-- maifi_mt_num: string (nullable = true)
     |-- saifi_bt_num: string (nullable = true)
     |-- saidi_bt_min: string (nullable = true)

In [None]:
📊 Total Bronze rows: 11120
    🧼 Cleaning and normalizing...
    🔍 Preview of cleaned data:
    +---------------+-----------------+----------------+----------------------------------------------------------------+
    |codigo_concelho|municipality     |district        |sk_municipality                                                 |
    +---------------+-----------------+----------------+----------------------------------------------------------------+
    |0806           |LAGOA            |ALGARVE         |87c88a4613182d9dd4d9c67127ffae9f7ec6cc19ac3765ed85ef951352edcdc8|
    |0802           |ALCOUTIM         |ALGARVE         |153c8701e194af333c983d4f6da2e9fb8e7027b76ddc3a2f6563bb6adee359cf|
    |0802           |ALCOUTIM         |ALGARVE         |153c8701e194af333c983d4f6da2e9fb8e7027b76ddc3a2f6563bb6adee359cf|
    |1509           |SANTIAGO DO CACÉM|ALENTEJO LITORAL|9d93c97b4d68506ed8448b7c6939fea257c9fdc505d9418a1a8af66d6bf030d3|
    |0801           |ALBUFEIRA        |ALGARVE         |68081357adc2636e3fdf27341130e238619f9971b9fa92a2954d24fc6e5f71d0|
    |0803           |ALJEZUR          |ALGARVE         |0db955640f1d153dea8deaf526cfa4bfa25d529bc06c0eb7a6e655d28daf3d27|
    |0807           |LAGOS            |ALGARVE         |afe3601fd44acfa81074a3abdb4ba6bb6fa9b15f1f3ba0f845596411fa498a49|
    |1513           |SINES            |ALENTEJO LITORAL|352d2d9ac3f7f8d3670196b306ef46f8b5644a44b9b66ccdc793fae85874a100|
    |0802           |ALCOUTIM         |ALGARVE         |153c8701e194af333c983d4f6da2e9fb8e7027b76ddc3a2f6563bb6adee359cf|
    |1513           |SINES            |ALENTEJO LITORAL|352d2d9ac3f7f8d3670196b306ef46f8b5644a44b9b66ccdc793fae85874a100|
    +---------------+-----------------+----------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
💾 Saving cleaned data to: Files/silver/12_continuidade_de_servico_indicadores_gerais_de_continuidade_de_servico_cleaned
    ✅ Cleaned dataset saved to Silver layer.

In [None]:
from pyspark.sql.functions import col

In [None]:
dim_parish = spark.read.table("DimParish").alias("dim")  # aliased so columns can be addressed as dim.<name>
    # NOTE(review): `consumos_normalized` is not defined in any visible cell; it must
    # already exist in the session for this cell to run — confirm where it is created.
    fact = consumos_normalized.alias("fact")

In [None]:
joined = fact.join(
        dim_parish,
        on=col("fact.sk_parish") == col("dim.sk_parish"),  # match on the parish surrogate key
        how="inner",  # keep only fact rows that find a dimension row
    )

In [None]:
print(f"✅ Matched rows: {joined.count()}")  # count() is an action: it executes the join

In [None]:
joined.select(
        # Geography columns side by side: the three fact_* columns, then the three dim_*.
        *[
            col(f"{side}.{level}").alias(f"{side}_{level}")
            for side in ("fact", "dim")
            for level in ("district", "municipality", "parish")
        ]
    ).show(truncate=False)

In [None]:
{"execution_finish_time":"2025-06-15T10:40:24.147609Z","execution_start_time":"2025-06-15T10:40:21.8059866Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"0574c683-add2-4fae-845c-18ed7ac780e5","queued_time":"2025-06-15T10:40:21.8047574Z","session_id":"22509c8d-a70a-44d4-b413-7142381ca96a","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":8,"statement_ids":[8]}

In [None]:
✅ Matched rows: 117042
    +-------------+--------------------+-----------------------+------------+--------------------+-----------------------+
    |fact_district|fact_municipality   |fact_parish            |dim_district|dim_municipality    |dim_parish             |
    +-------------+--------------------+-----------------------+------------+--------------------+-----------------------+
    |AVEIRO       |VAGOS               |GAFANHA DA BOA HORA    |AVEIRO      |VAGOS               |GAFANHA DA BOA HORA    |
    |AVEIRO       |VAGOS               |GAFANHA DA BOA HORA    |AVEIRO      |VAGOS               |GAFANHA DA BOA HORA    |
    |AVEIRO       |VAGOS               |OUCA                   |AVEIRO      |VAGOS               |OUCA                   |
    |AVEIRO       |VALE DE CAMBRA      |MACIEIRA DE CAMBRA     |AVEIRO      |VALE DE CAMBRA      |MACIEIRA DE CAMBRA     |
    |AVEIRO       |VALE DE CAMBRA      |MACIEIRA DE CAMBRA     |AVEIRO      |VALE DE CAMBRA      |MACIEIRA DE CAMBRA     |
    |BEJA         |ALJUSTREL           |ERVIDEL                |BEJA        |ALJUSTREL           |ERVIDEL                |
    |BEJA         |ALMODÔVAR           |ALDEIA DOS FERNANDES   |BEJA        |ALMODÔVAR           |ALDEIA DOS FERNANDES   |
    |BEJA         |ALVITO              |VILA NOVA DA BARONIA   |BEJA        |ALVITO              |VILA NOVA DA BARONIA   |
    |BEJA         |ALVITO              |VILA NOVA DA BARONIA   |BEJA        |ALVITO              |VILA NOVA DA BARONIA   |
    |BEJA         |BEJA                |BERINGEL               |BEJA        |BEJA                |BERINGEL               |
    |BEJA         |CUBA                |VILA ALVA              |BEJA        |CUBA                |VILA ALVA              |
    |BEJA         |FERREIRA DO ALENTEJO|FIGUEIRA DOS CAVALEIROS|BEJA        |FERREIRA DO ALENTEJO|FIGUEIRA DOS CAVALEIROS|
    |BEJA         |FERREIRA DO ALENTEJO|ODIVELAS               |BEJA        |FERREIRA DO ALENTEJO|ODIVELAS               |
    |BEJA         |MÉRTOLA             |ALCARIA RUIVA          |BEJA        |MÉRTOLA             |ALCARIA RUIVA          |
    |BEJA         |MÉRTOLA             |CORTE DO PINTO         |BEJA        |MÉRTOLA             |CORTE DO PINTO         |
    |BEJA         |MOURA               |AMARELEJA              |BEJA        |MOURA               |AMARELEJA              |
    |BEJA         |ODEMIRA             |VILA NOVA DE MILFONTES |BEJA        |ODEMIRA             |VILA NOVA DE MILFONTES |
    |BEJA         |ODEMIRA             |LONGUEIRA/ALMOGRAVE    |BEJA        |ODEMIRA             |LONGUEIRA/ALMOGRAVE    |
    |BEJA         |ODEMIRA             |COLOS                  |BEJA        |ODEMIRA             |COLOS                  |
    |BEJA         |OURIQUE             |SANTANA DA SERRA       |BEJA        |OURIQUE             |SANTANA DA SERRA       |
    +-------------+--------------------+-----------------------+------------+--------------------+-----------------------+
    only showing top 20 rows

In [None]:
02_consumos_faturados_por_codigo_postal

In [None]:
# ============================
    # Notebook 2: Normalize Bronze Dataset - 02_consumos_faturados_por_codigo_postal
    # ============================

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2, concat_ws, upper, trim, split

In [None]:
spark = SparkSession.builder.getOrCreate()  # reuse the active session, or create one

In [None]:
# Path to the bronze dataset (glob pattern: every .csv file in the folder)
    bronze_path = "Files/bronze/02_consumos_faturados_por_codigo_postal_ultimos_5_anos/*.csv"

In [None]:
# Read the dataset (with semicolon delimiter)
    df = spark.read.option("header", True).option("delimiter", ";").csv(bronze_path)

In [None]:
# Print schema to inspect initial structure (a single column here means the
    # delimiter did not match and the split below will be needed)
    print("Schema of loaded DataFrame:")
    df.printSchema()

In [None]:
# Split compound column if necessary (only first column was parsed)
    columns = ["ano", "mes", "date", "zipcode", "energia_ativa_kwh"]
    if len(df.columns) == 1:
        df = df.withColumn("split", split(col(df.columns[0]), ","))
        for i, name in enumerate(columns):
            df = df.withColumn(name, col("split").getItem(i))
        df = df.drop("split").drop(df.columns[0])

In [None]:
# Normalize zip code field and create surrogate key
    normalized_df = (
        df.withColumn("zipcode", trim(col("zipcode")))
          .withColumn("sk_zipcode", sha2(col("zipcode"), 256))
    )

In [None]:
# Show sample after transformation (show() defaults to the first 20 rows;
    # truncate=False keeps the full hash visible)
    print("✅ Preview of normalized zipcode dataset:")
    normalized_df.select("zipcode", "sk_zipcode").show(truncate=False)

In [None]:
# Save to Silver layer
    output_path = "Files/silver/02_consumos_faturados_por_codigo_postal_ultimos_5_anos_cleaned.delta"
    normalized_df.write.mode("overwrite").format("delta").save(output_path)

In [None]:
print("✅ Bronze dataset '02_consumos_faturados_por_codigo_postal' normalized and saved to Silver layer.")  # completion marker only

In [None]:
{"execution_finish_time":"2025-06-29T10:37:53.7981915Z","execution_start_time":"2025-06-29T10:37:45.8941004Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"cb0c95d8-e2f8-4352-a98c-ca446d09bac8","queued_time":"2025-06-29T10:37:45.8928837Z","session_id":"f219c06e-7574-471e-be5d-58be425e2185","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":4,"statement_ids":[4]}

In [None]:
Schema of loaded DataFrame:
    root
     |-- ano,mes,date,codigo_postal,energia_ativa_kwh: string (nullable = true)

In [None]:
✅ Preview of normalized zipcode dataset:
    +-------+----------------------------------------------------------------+
    |zipcode|sk_zipcode                                                      |
    +-------+----------------------------------------------------------------+
    |4910   |0a38b4739f62ff43b1039f63641c1cd522c80d67f6218db8551f4cd2a8e1187c|
    |4970   |a0a82602907053ff2520baf33f02dc26daf3228099eb2b7c23f28b3c0f8b28cd|
    |4980   |f393ed8e6e581fb69cfb1a5cf6e3c362cfb309dab026c1b7af2c3be4008c9904|
    |5060   |3c2ea00c905c2d6de9299763ef81e9363a12f4ef5f0c7ff0a550a5b33d5df13a|
    |5070   |739ec77b846ad913811cc124579cc44f902f83f5bd4e89256ff0e826ddb64ce4|
    |5140   |2bd06acbea242c196ba2883861ebe895246d2f186d90c823f9ed475353b3cda7|
    |5155   |5a8571752ba7e7a2c0d8f7abda39932a2baab4292efb0d62232e585bfff56735|
    |5160   |9690463a9b39796b7f38d97cb7a5f62eaea6496b3f7ec2cd65ab8987765e7985|
    |5300   |7f7e4d7eba491f6fdf3b6c2db5485b75b516cd2c1415cafe2f930d8e7e791511|
    |5425   |44246e7c147681e311b4227338efe9a91d7b87a1f63f16fd5eca8cf3767daaff|
    |5445   |3eac69e760d9ec571e57ed806a244168f12b4374af47f53d983d71ca6d01b61b|
    |5450   |c54bea44135df61e37bdca547a01c223ff648ddf790677cafe9d30a484cac7c8|
    |5460   |7fed43c640957555ddac588be64822538078409a0acdaf22126623203ef9954a|
    |6030   |f18af665c04861d0e2d82a1fd57687173267c064b089c264caaef3359daf7372|
    |6050   |e2868eed331751cb9b25391c47df29597e2d5579068a46b7f989e521bdd3499b|
    |6090   |0e74fa5f84db6f9d5f38b50b205450660387ad8cd310c303f2a0f73e52ea6489|
    |6100   |b8b3a9403ceee5e6e0d0ba51773e6195c2227e0e55dbd9aa348b4999feeb8a2d|
    |6300   |defdae444b7e18190479ab79b244be809e144bc9a8d6033426919efcf51ceb06|
    |6400   |a7dccef9ce1ae31c49a4cfc484f8411dd52b11c914c1fe3d781633642d1e1327|
    |7000   |b698d86c67a2cff80405bd47af322216c552fd3a52f9c58a70f7b3a3313895b1|
    +-------+----------------------------------------------------------------+
    only showing top 20 rows

In [None]:
✅ Bronze dataset '02_consumos_faturados_por_codigo_postal' normalized and saved to Silver layer.

In [None]:
15_ordens_de_servico

In [None]:
# ============================
    # Normalize Bronze Dataset - 15_ordens_de_servico
    # ============================

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2, concat_ws, upper, trim, split

In [None]:
spark = SparkSession.builder.getOrCreate()  # reuse the active session, or create one

In [None]:
# Path to the bronze dataset (glob pattern: every .csv file in the folder)
    bronze_path = "Files/bronze/15_ordens_de_servico/*.csv"

In [None]:
# 🔍 Leitura com verificação defensiva
    df = (
        spark.read.option("header", True)
                  .option("delimiter", ",")
                  .option("encoding", "UTF-8")
                  .option("multiLine", False)
                  .csv(bronze_path)
    )

In [None]:
# Initial debug: show the first 5 rows as read, without truncating values
    print("🔍 Pré-visualização do dataset lido:")
    df.show(5, truncate=False)

In [None]:
# Column names, in positional order, used to rebuild the frame when the read
    # yields only one column (a common problem with mis-delimited files)
    expected_cols = [
        "ano", "mes", "data", "distrito", "concelho", "freguesia",
        "tipo_de_servico", "ordens_servico_realizadas",
        "coddistrito", "coddistritoconcelho", "coddistritoconcelhofreguesia"
    ]

In [None]:
if len(df.columns) == 1:
        # Only one column came back: cut each raw line on ',' and name the
        # pieces by position, then discard the helper and the raw column.
        raw_line = df.columns[0]
        df = df.withColumn("split", split(col(raw_line), ","))
        for position, field in enumerate(expected_cols):
            df = df.withColumn(field, col("split").getItem(position))
        df = df.drop("split", raw_line)

In [None]:
# ✅ Normalization and surrogate-key creation
    normalized_df = (
        # Geography names and the service type are upper-cased and trimmed.
        df.withColumn("district", upper(trim(col("distrito"))))
          .withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("parish", upper(trim(col("freguesia"))))
          .withColumn("tipo_de_servico", upper(trim(col("tipo_de_servico"))))
          # Keys are SHA-256 hex digests of the normalized names joined with "-".
          # NOTE(review): concat_ws skips NULL inputs, so rows with a missing level
          # hash fewer parts — confirm NULLs cannot occur here.
          .withColumn("sk_district", sha2(col("district"), 256))
          .withColumn("sk_municipality", sha2(concat_ws("-", "district", "municipality"), 256))
          .withColumn("sk_parish", sha2(concat_ws("-", "district", "municipality", "parish"), 256))
    )

In [None]:
# 🧼 Manter apenas colunas úteis (ordem correta)
    normalized_df = normalized_df.select(
        "ano", "mes", "data",
        "district", "municipality", "parish",
        "tipo_de_servico", "ordens_servico_realizadas",
        "coddistrito", "coddistritoconcelho", "coddistritoconcelhofreguesia",
        "sk_district", "sk_municipality", "sk_parish"
    )

In [None]:
# 💾 Guardar em Silver
    output_path = "Files/silver/15_ordens_de_servico_cleaned.delta"
    normalized_df.write.mode("overwrite").format("delta").save(output_path)
    print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
# 🗃️ (Optional) Create a SQL table for reference
    # Drop first so the CREATE below does not fail when the table already exists.
    spark.sql("DROP TABLE IF EXISTS 15_ordens_de_servico_cleaned")
    # The table is defined over the Delta files written to output_path.
    spark.sql(f"""
        CREATE TABLE 15_ordens_de_servico_cleaned
        USING DELTA
        LOCATION '{output_path}'
    """)
    print("📘 Tabela SQL '15_ordens_de_servico_cleaned' criada com sucesso.")

In [None]:
{"execution_finish_time":"2025-06-19T00:18:29.4037212Z","execution_start_time":"2025-06-19T00:18:02.2740689Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"c2b8f802-ea09-4522-bb4a-27ae25cc0375","queued_time":"2025-06-19T00:18:02.2725446Z","session_id":"890d1dc3-8cb1-4ac0-af2b-0d1335715fdd","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":5,"statement_ids":[5]}

In [None]:
🔍 Pré-visualização do dataset lido:
    +----+---+-------+--------+--------------------------+-------------------+----------------------------------------+-------------------------+-----------+-------------------+----------------------------+
    |ano |mes|data   |distrito|concelho                  |freguesia          |tipo_de_servico                         |ordens_servico_realizadas|coddistrito|coddistritoconcelho|coddistritoconcelhofreguesia|
    +----+---+-------+--------+--------------------------+-------------------+----------------------------------------+-------------------------+-----------+-------------------+----------------------------+
    |2025|3  |2025-03|Lisboa  |Torres Vedras             |Silveira           |Desativações                            |3                        |11         |1113               |111316                      |
    |2025|3  |2025-03|Porto   |Vila Nova de Gaia         |Pedroso e Seixezelo|Alteração Contratual                    |65                       |13         |1317               |131728                      |
    |2025|3  |2025-03|Faro    |Vila Real de Santo António|Vila Nova de Cacela|Reduções temporárias Potência Contratada|33                       |8          |816                |081601                      |
    |2025|3  |2025-03|Faro    |Olhão                     |Quelfes            |Alteração Contratual                    |46                       |8          |810                |081005                      |
    |2025|3  |2025-03|Braga   |Guimarães                 |Brito              |Alteração Contratual                    |5                        |3          |308                |030807                      |
    +----+---+-------+--------+--------------------------+-------------------+----------------------------------------+-------------------------+-----------+-------------------+----------------------------+
    only showing top 5 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.
    📘 Tabela SQL '15_ordens_de_servico_cleaned' criada com sucesso.

In [None]:
outages_per_geography

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, split, trim, sha2

In [None]:
spark = SparkSession.builder.getOrCreate()  # reuse the active session, or create one

In [None]:
# ============================
    # 1. Load and parse Bronze CSV
    # ============================

In [None]:
bronze_path = "Files/bronze/outages_per_geography/*.csv"
    # First attempt: parse as ';'-delimited with a header row.
    df = spark.read.option("header", True).option("delimiter", ";").csv(bronze_path)

In [None]:
# Force column split if only one column was detected
    if len(df.columns) == 1:
        first_col = df.columns[0]
        split_cols = ["zipcode", "municipality", "extractiondatetime", "municipalitycode", "interrupcao_ativa"]
        df = df.withColumn("split", split(col(first_col), ";"))
        for idx, col_name in enumerate(split_cols):
            df = df.withColumn(col_name, col("split").getItem(idx))
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Schema after parsing:")
    # The schema lists column names only; check sample values too, since a wrong
    # split still produces the expected names.
    df.printSchema()

In [None]:
# ============================
    # 2. Normalize zip codes
    # ============================

In [None]:
df_clean = df.withColumn(
        "zipcode",
        trim(split(col("zipcode"), "-").getItem(0)),  # e.g., "2985-202" → "2985"
    ).withColumn(
        "sk_zipcode",
        sha2(col("zipcode"), 256),  # SHA-256 hex digest of the shortened zip code
    )

In [None]:
print("✅ Preview of normalized zipcodes:")
    # Up to 10 distinct (zipcode, key) pairs; truncate=False keeps the full hash visible.
    df_clean.select("zipcode", "sk_zipcode").distinct().show(10, truncate=False)

In [None]:
# Save to Silver
    silver_clean_path = "Files/silver/outages_per_geography_cleaned.delta"
    df_clean.write.mode("overwrite").format("delta").save(silver_clean_path)
    print("✅ Bronze dataset 'outages_per_geography' normalized and saved to Silver layer.")

In [None]:
# ============================
    # 3. Enrich with DimZipCode
    # ============================

In [None]:
dim_zip = spark.read.table("DimZipCode").alias("dim")  # aliased so columns can be addressed as dim.<name>
    fact_zip = df_clean.alias("fact")

In [None]:
joined = fact_zip.join(
        dim_zip,
        on=col("fact.sk_zipcode") == col("dim.sk_zipcode"),
        how="inner"
    )

In [None]:
print(f"✅ Matched rows: {joined.count()}")  # count() is an action: it executes the join
    # Sample of matches: the fact zip code next to the dimension's postal label.
    joined.select(
        col("fact.zipcode").alias("fact_zipcode"),
        col("dim.postal_label").alias("dim_postal_label"),
        col("fact.municipality"),
        col("fact.extractiondatetime"),
        col("fact.interrupcao_ativa")
    ).show(truncate=False)

In [None]:
# Save to Silver
    silver_enriched_path = "Files/silver/outages_per_geography_enriched.delta"
    # The zip code and its key are dropped by bare name before writing.
    # NOTE(review): presumably this removes the fact and dim copies alike so the
    # write does not hit duplicate column names; it also means the enriched table
    # no longer carries the join key — confirm this is intended.
    joined.drop("zipcode").drop("sk_zipcode").write.mode("overwrite").format("delta").save(silver_enriched_path)
    print("✅ Enriched dataset 'outages_per_geography' saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-06-27T21:26:31.4984906Z","execution_start_time":"2025-06-27T21:26:25.2485896Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"781284b4-8a05-4e6c-ab00-88ec2d78afaf","queued_time":"2025-06-27T21:26:14.2590862Z","session_id":"5c145d45-2b6a-48d7-a748-2ec06f037a02","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":4,"statement_ids":[4]}

In [None]:
✅ Schema after parsing:
    root
     |-- zipcode: string (nullable = true)
     |-- municipality: string (nullable = true)
     |-- extractiondatetime: string (nullable = true)
     |-- municipalitycode: string (nullable = true)
     |-- interrupcao_ativa: string (nullable = true)

In [None]:
✅ Preview of normalized zipcodes:
    +-------+----------------------------------------------------------------+
    |zipcode|sk_zipcode                                                      |
    +-------+----------------------------------------------------------------+
    |,,2025 |2a7d13f3f108650145e792e547e8af3d9c9b0f254243516220c105a7bf18df45|
    +-------+----------------------------------------------------------------+

In [None]:
✅ Bronze dataset 'outages_per_geography' normalized and saved to Silver layer.
    ✅ Matched rows: 0
    +------------+----------------+------------+------------------+-----------------+
    |fact_zipcode|dim_postal_label|municipality|extractiondatetime|interrupcao_ativa|
    +------------+----------------+------------+------------------+-----------------+
    +------------+----------------+------------+------------------+-----------------+

In [None]:
✅ Enriched dataset 'outages_per_geography' saved to Silver layer.

In [None]:
8_unidades_de_producao_para_autoconsumo

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2, concat_ws, upper, trim, split

In [None]:
spark = SparkSession.builder.getOrCreate()  # reuse the active session, or create one

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================
    bronze_path = "Files/bronze/8_unidades_de_producao_para_autoconsumo/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# Forçar parsing se só uma coluna tiver sido detetada
    if len(df.columns) == 1:
        first_col = df.columns[0]
        expected_cols = [
            "trimestre", "distrito", "concelho", "freguesias",
            "escalao_de_potencia_instalada", "numero_de_instalacoes",
            "potencia_total_instalada_upac_kw", "coddistrito",
            "coddistritoconcelho", "coddistritoconcelhofreguesia"
        ]
        df = df.withColumn("split", split(col(first_col), ";"))  # usa ; corretamente
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()  # confirms the column names produced by the read or the fallback split

In [None]:
# ============================
    # 2. Normalize and generate SKs
    # ============================

In [None]:
df_clean = (
        # Geography names are upper-cased and trimmed before hashing.
        df.withColumn("district", upper(trim(col("distrito"))))
          .withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("parish", upper(trim(col("freguesias"))))
          # Keys are SHA-256 hex digests of the normalized names joined with "-".
          # NOTE(review): concat_ws skips NULL inputs, so rows with a missing level
          # hash fewer parts — confirm NULLs cannot occur here.
          .withColumn("sk_district", sha2(col("district"), 256))
          .withColumn("sk_municipality", sha2(concat_ws("-", "district", "municipality"), 256))
          .withColumn("sk_parish", sha2(concat_ws("-", "district", "municipality", "parish"), 256))
    )

In [None]:
(
        df_clean
        .select("district", "municipality", "parish", "sk_parish")
        .show(10, truncate=False)  # 10 rows, full-width values
    )

In [None]:
# Save cleaned to Silver
    df_clean.write.mode("overwrite").format("delta").save("Files/silver/8_unidades_de_producao_para_autoconsumo_cleaned.delta")
    print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
# ============================
    # 3. Join with DimParish
    # ============================

In [None]:
dim_parish = spark.read.table("DimParish").alias("dim")  # aliased so columns can be addressed as dim.<name>
    fact = df_clean.alias("fact")

In [None]:
joined = fact.join(
        dim_parish,
        on=col("fact.sk_parish") == col("dim.sk_parish"),
        how="inner"
    )

In [None]:
print(f"✅ Matched rows: {joined.count()}")  # count() is an action: it executes the join

In [None]:
# ============================
    # 4. Save enriched version (with columns renamed to avoid name conflicts)
    # ============================

In [None]:
joined.select(
        col("fact.trimestre"),
        col("fact.district").alias("fact_district"),
        col("fact.municipality").alias("fact_municipality"),
        col("fact.parish").alias("fact_parish"),
        col("dim.district").alias("dim_district"),
        col("dim.municipality").alias("dim_municipality"),
        col("dim.parish").alias("dim_parish"),
        col("fact.escalao_de_potencia_instalada"),
        col("fact.numero_de_instalacoes"),
        col("fact.potencia_total_instalada_upac_kw")
    ).write.mode("overwrite").format("delta").save("Files/silver/8_unidades_de_producao_para_autoconsumo_enriched.delta")

In [None]:
print("✅ Enriched dataset saved to Silver layer.")  # completion marker only

In [None]:
{"execution_finish_time":"2025-07-02T14:39:28.9091425Z","execution_start_time":"2025-07-02T14:38:55.7347621Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"875dda50-9306-463a-a69a-d14876881f23","queued_time":"2025-07-02T14:38:45.9028805Z","session_id":"7093a691-1993-4226-b462-53f1624924ca","session_start_time":"2025-07-02T14:38:45.9039311Z","spark_pool":null,"state":"finished","statement_id":3,"statement_ids":[3]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- trimestre: string (nullable = true)
     |-- distrito: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- freguesias: string (nullable = true)
     |-- escalao_de_potencia_instalada: string (nullable = true)
     |-- numero_de_instalacoes: string (nullable = true)
     |-- potencia_total_instalada_upac_kw: string (nullable = true)
     |-- coddistrito: string (nullable = true)
     |-- coddistritoconcelho: string (nullable = true)
     |-- coddistritoconcelhofreguesia: string (nullable = true)

In [None]:
+--------+------------------+------------------------------------------+----------------------------------------------------------------+
    |district|municipality      |parish                                    |sk_parish                                                       |
    +--------+------------------+------------------------------------------+----------------------------------------------------------------+
    |SANTARÉM|ENTRONCAMENTO     |NOSSA SENHORA DE FÁTIMA                   |55715b8c50ea42a511d131f6cdd850da839153ed79dd3ffa01ddab677a88d8b7|
    |SANTARÉM|FERREIRA DO ZÊZERE|FERREIRA DO ZÊZERE                        |3568b927b32a8b86541e443b01cf1668f6d85d849cc649951fe60a1855957d25|
    |SANTARÉM|FERREIRA DO ZÊZERE|AREIAS E PIAS                             |e6a1cbe3c3822e406faec713f9035b6cfc0ec1e8a034c084c2be66d2c9a90f49|
    |SANTARÉM|MAÇÃO             |CARVOEIRO                                 |de59a03460be0d054a3946d7c07bc563844e077d5bda2e8dd98eaca5713b2dd6|
    |SANTARÉM|MAÇÃO             |ENVENDOS                                  |ba2a8157ff9cf0601da031d062943e5a30c104a585fd03a70d64050073366a04|
    |SANTARÉM|RIO MAIOR         |RIO MAIOR                                 |4b7711f4ee9feb1f104f9997fd43b732fda737149cab6119d809630ea3df2bfa|
    |SANTARÉM|SANTARÉM          |ABRÃ                                      |c7eea2e919546cf6c41bc1db27724e4f3f41fd7be4dfd539def1141bb53a8fe3|
    |SANTARÉM|SANTARÉM          |ALCANEDE                                  |eea52d2ab00856ea8a3acaa63add3b77030f1a4dc3fdc2b16af9544a907aa2dc|
    |SANTARÉM|SANTARÉM          |GANÇARIA                                  |a9466b6318a3f7bedcb35f46b1501d9bb507e3b51b641d5d52385833a6989d8f|
    |SANTARÉM|SANTARÉM          |UNIÃO DAS FREGUESIAS DA CIDADE DE SANTARÉM|2563ef4a69991316cd7bf130bd188c04e988e51152c16825891f5f15286675db|
    +--------+------------------+------------------------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.
    ✅ Matched rows: 59263
    ✅ Enriched dataset saved to Silver layer.

In [None]:
postos_carregamento_ves

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2, concat_ws, upper, trim, split
    import shutil
    import os

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================

In [None]:
bronze_path = "Files/bronze/postos_carregamento_ves/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# Fallback: forçar parsing caso só haja 1 coluna
    if len(df.columns) == 1:
        first_col = df.columns[0]
        expected_cols = [
            "trimestre", "distrito", "concelho", "freguesia",
            "potencia_maxima_admissivel", "num_instalacoes",
            "coddistrito", "coddistritoconcelho", "coddistritoconcelhofreguesia"
        ]
        df = df.withColumn("split", split(col(first_col), ";"))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()

In [None]:
# ============================
    # 2. Normalize and cast
    # ============================

In [None]:
df_clean = (
        df.withColumn("district", upper(trim(col("distrito"))))
          .withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("parish", upper(trim(col("freguesia"))))
          .withColumn("sk_district", sha2(col("district"), 256))
          .withColumn("sk_municipality", sha2(concat_ws("-", "district", "municipality"), 256))
          .withColumn("sk_parish", sha2(concat_ws("-", "district", "municipality", "parish"), 256))
          .withColumn("potencia_maxima_admissivel", col("potencia_maxima_admissivel").cast("double"))
          .withColumn("num_instalacoes", col("num_instalacoes").cast("int"))
    )

In [None]:
df_clean.select("district", "municipality", "parish", "sk_parish").show(10, truncate=False)

In [None]:
# Save cleaned version
    df_clean.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Files/silver/postos_carregamento_ves_cleaned.delta")
    print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
# ============================
    # 3. Join with DimParish
    # ============================

In [None]:
dim_parish = spark.read.table("DimParish").alias("dim")
    fact = df_clean.alias("fact")

In [None]:
joined = fact.join(dim_parish, on=col("fact.sk_parish") == col("dim.sk_parish"), how="inner")
    print(f"✅ Matched rows: {joined.count()}")

In [None]:
# ============================
    # 4. Clean target folder (safe overwrite)
    # ============================

In [None]:
# Caminho local do Delta (ajusta se necessário)
    enriched_path = "/lakehouse/default/Files/silver/postos_carregamento_ves_enriched.delta"

In [None]:
if os.path.exists(enriched_path):
        # Something already exists at the local lakehouse path: remove the whole
        # directory tree before the enriched table is written again.
        # NOTE(review): the write that follows already uses mode("overwrite");
        # confirm this manual delete is still needed — presumably it also
        # discards the table's Delta history.
        print(f"⚠️ Deleting existing folder: {enriched_path}")
        shutil.rmtree(enriched_path)
    else:
        # Nothing exists at the path, so there is nothing to clean up.
        print(f"✅ Path clear: {enriched_path}")

In [None]:
# ============================
    # 5. Save enriched version
    # ============================

In [None]:
joined.select(
        col("fact.trimestre"),
        col("fact.district").alias("fact_district"),
        col("fact.municipality").alias("fact_municipality"),
        col("fact.parish").alias("fact_parish"),
        col("dim.district").alias("dim_district"),
        col("dim.municipality").alias("dim_municipality"),
        col("dim.parish").alias("dim_parish"),
        col("fact.potencia_maxima_admissivel"),
        col("fact.num_instalacoes")
    ).write.mode("overwrite").format("delta").save("Files/silver/postos_carregamento_ves_enriched.delta")

In [None]:
print("✅ Enriched dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-07-02T14:42:00.9377801Z","execution_start_time":"2025-07-02T14:41:25.1230605Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"c18409bf-ff81-4654-bdd2-58deddb31ec9","queued_time":"2025-07-02T14:41:25.1220088Z","session_id":"7093a691-1993-4226-b462-53f1624924ca","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":4,"statement_ids":[4]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- trimestre: string (nullable = true)
     |-- distrito: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- freguesia: string (nullable = true)
     |-- potencia_maxima_admissivel: string (nullable = true)
     |-- num_instalacoes: string (nullable = true)
     |-- coddistrito: string (nullable = true)
     |-- coddistritoconcelho: string (nullable = true)
     |-- coddistritoconcelhofreguesia: string (nullable = true)

In [None]:
+----------+-------------------+--------------------------------------------------+----------------------------------------------------------------+
    |district  |municipality       |parish                                            |sk_parish                                                       |
    +----------+-------------------+--------------------------------------------------+----------------------------------------------------------------+
    |FARO      |OLHÃO              |PECHÃO                                            |c9bdee1372b2088463cc6f264d3c047602e5773de7ace9c5bc18ca153ef35208|
    |PORTO     |MATOSINHOS         |SÃO MAMEDE DE INFESTA E SENHORA DA HORA           |2e0b8fda54dce67ea1569e5193060458d8b4c2a8f979b6e0585bc84b60bcbcd9|
    |PORTO     |MAIA               |CIDADE DA MAIA                                    |8e7fe01c2f788352e05b59f362e631ed1b888d1470f9a91328f2bf98cbcae702|
    |PORTO     |VILA DO CONDE      |VILA DO CONDE                                     |014d5562a0f9dbcfd3c36ce4b5ca80dde9f560413d494c8bb2ca9bb14b2febc0|
    |LISBOA    |VILA FRANCA DE XIRA|ALVERCA DO RIBATEJO E SOBRALINHO                  |e8973f6b7fcd943ddb3dd02f05b49e5fbb7abf0d5d08740dd5ccebaad7ce4975|
    |PORTALEGRE|PORTALEGRE         |SÉ E SÃO LOURENÇO                                 |6c561b398ebbfcd3809998fd8578c2e2dcbc1678120c2df530efcc9b604a6319|
    |PORTO     |PENAFIEL           |RIO MAU                                           |eb60d89265a12805ab42133f15de6f6fa703b36f0a9fc7a0ef1a39d02de5ac0b|
    |LISBOA    |AZAMBUJA           |MANIQUE DO INTENDENTE, V. N. DE S. PEDRO E MAÇUSSA|8e9059902ccc18a22e1b3e0f09b3b56575a6efafc416640a73d9636394f07206|
    |SETÚBAL   |SESIMBRA           |QUINTA DO CONDE                                   |5abff732342b31659efb53fb837c8b35194a398a12fef05aad13bf6e12bb9f99|
    |LISBOA    |LOURES             |MOSCAVIDE E PORTELA                               |785b0eaf2432cc6bd0dba2ccab587b6393b53f05f7996aaccc2f1a89ba36a897|
    +----------+-------------------+--------------------------------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.
    ✅ Matched rows: 18057
    ⚠️ Deleting existing folder: /lakehouse/default/Files/silver/postos_carregamento_ves_enriched.delta
    ✅ Enriched dataset saved to Silver layer.

In [None]:
20_caracterizacao_pes_contrato_ativo

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2, concat_ws, upper, trim, split

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================
    bronze_path = "Files/bronze/20_caracterizacao_pes_contrato_ativo/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# Forçar parsing se só uma coluna tiver sido detetada (malformatação)
    if len(df.columns) == 1:
        first_col = df.columns[0]
        expected_cols = [
            "ano", "mes", "data", "distrito", "concelho", "freguesia",
            "tipo_de_instalacao", "nivel_de_tensao", "cpes",
            "coddistrito", "coddistritoconcelho", "coddistritoconcelhofreguesia"
        ]
        df = df.withColumn("split", split(col(first_col), ";"))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()

In [None]:
# ============================
    # 2. Normalize and generate SKs
    # ============================

In [None]:
df_clean = (
        df.withColumn("district", upper(trim(col("distrito"))))
          .withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("parish", upper(trim(col("freguesia"))))
          .withColumn("sk_district", sha2(col("district"), 256))
          .withColumn("sk_municipality", sha2(concat_ws("-", "district", "municipality"), 256))
          .withColumn("sk_parish", sha2(concat_ws("-", "district", "municipality", "parish"), 256))
    )

In [None]:
df_clean.select("district", "municipality", "parish", "sk_parish").show(10, truncate=False)

In [None]:
# Save cleaned to Silver
    df_clean.write.mode("overwrite").format("delta").save("Files/silver/20_caracterizacao_pes_contrato_ativo_cleaned.delta")
    print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
# ============================
    # 3. Join with DimParish
    # ============================

In [None]:
dim_parish = spark.read.table("DimParish").alias("dim")
    fact = df_clean.alias("fact")

In [None]:
joined = fact.join(dim_parish, on=col("fact.sk_parish") == col("dim.sk_parish"), how="inner")

In [None]:
print(f"✅ Matched rows: {joined.count()}")

In [None]:
# ============================
    # 4. Save enriched version
    # ============================

In [None]:
joined.select(
        col("fact.ano"),
        col("fact.mes"),
        col("fact.data"),
        col("fact.district").alias("fact_district"),
        col("fact.municipality").alias("fact_municipality"),
        col("fact.parish").alias("fact_parish"),
        col("dim.district").alias("dim_district"),
        col("dim.municipality").alias("dim_municipality"),
        col("dim.parish").alias("dim_parish"),
        col("fact.tipo_de_instalacao"),
        col("fact.nivel_de_tensao"),
        col("fact.cpes")
    ).write.mode("overwrite").format("delta").save("Files/silver/20_caracterizacao_pes_contrato_ativo_enriched.delta")

In [None]:
print("✅ Enriched dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-07-02T14:42:39.6906291Z","execution_start_time":"2025-07-02T14:42:27.052766Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"e36199b6-797d-4b52-97ab-04e639faf1c2","queued_time":"2025-07-02T14:42:27.0516266Z","session_id":"7093a691-1993-4226-b462-53f1624924ca","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":5,"statement_ids":[5]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- mes: string (nullable = true)
     |-- data: string (nullable = true)
     |-- distrito: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- freguesia: string (nullable = true)
     |-- tipo_de_instalacao: string (nullable = true)
     |-- nivel_de_tensao: string (nullable = true)
     |-- cpes: string (nullable = true)
     |-- coddistrito: string (nullable = true)
     |-- coddistritoconcelho: string (nullable = true)
     |-- coddistritoconcelhofreguesia: string (nullable = true)

In [None]:
+----------------+--------------------+--------------------------+----------------------------------------------------------------+
    |district        |municipality        |parish                    |sk_parish                                                       |
    +----------------+--------------------+--------------------------+----------------------------------------------------------------+
    |GUARDA          |CELORICO DA BEIRA   |RATOEIRA                  |f08c5d430d0e1dd873ce1607165137d466339738367e7f615a522b99e77905a9|
    |VISEU           |MOIMENTA DA BEIRA   |SEVER                     |33d8de21c0aec926a87ab5f8a4a5ceef3a5d3c3737497d265c2081280a3e4380|
    |VISEU           |TABUAÇO             |SENDIM                    |0ee7efdd891449b34b5dfa4e256200d75c1e857656b513a90dfd86bece23e454|
    |COIMBRA         |TÁBUA               |PÓVOA DE MIDÕES           |efaa10f5362f88a95794bb94046ec4068542a60f67629440b8d7e8b48e5e2252|
    |COIMBRA         |CANTANHEDE          |TOCHA                     |a9d1b0f0ed81c69e97b20a37537c9127927e8d862533f966362514b5db0a138c|
    |CASTELO BRANCO  |FUNDÃO              |CASTELO NOVO              |375dbb7fdec2b5c35947df201027fab59d9f422ea45d03c8932c7f82acf5bb73|
    |BRAGANÇA        |MACEDO DE CAVALEIROS|LOMBO                     |14dfe106528deb8d011f9ad1207db0c52924b0965eaefe5b5566bcf4d3cf97c3|
    |VIANA DO CASTELO|MELGAÇO             |PARADA DO MONTE E CUBALHÃO|3969e4060e42c61ffabf84b9745f20dfebc392f8937a80c54f91ea51c655ad2a|
    |PORTO           |VILA NOVA DE GAIA   |OLIVEIRA DO DOURO         |6329aa6a362766799e53e9e7428b61a39e860ed7ac081788e13611ad08630a2a|
    |BRAGANÇA        |MACEDO DE CAVALEIROS|BORNES E BURGA            |85f512fc865a52b16821107fd337e165c37ab159e05dece44a41d8c203a69ea1|
    +----------------+--------------------+--------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.
    ✅ Matched rows: 467326
    ✅ Enriched dataset saved to Silver layer.

In [None]:
23_leituras_recolhidas_remotamente

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2, concat_ws, upper, trim, split

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================
    bronze_path = "Files/bronze/23_leituras_recolhidas_remotamente/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# Forçar parsing se só tiver uma coluna
    if len(df.columns) == 1:
        first_col = df.columns[0]
        expected_cols = [
            "ano", "mes", "data", "distrito", "concelho", "freguesia",
            "cpes_com_leituras", "coddistrito", "coddistritoconcelho", "coddistritoconcelhofreguesia"
        ]
        df = df.withColumn("split", split(col(first_col), ";"))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()

In [None]:
# ============================
    # 2. Normalize & Generate SKs
    # ============================
    df_clean = (
        df.withColumn("district", upper(trim(col("distrito"))))
          .withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("parish", upper(trim(col("freguesia"))))
          .withColumn("sk_district", sha2(col("district"), 256))
          .withColumn("sk_municipality", sha2(concat_ws("-", "district", "municipality"), 256))
          .withColumn("sk_parish", sha2(concat_ws("-", "district", "municipality", "parish"), 256))
    )

In [None]:
df_clean.select("district", "municipality", "parish", "sk_parish").show(10, truncate=False)

In [None]:
# Save cleaned to Silver
    df_clean.write.mode("overwrite").format("delta").save("Files/silver/23_leituras_recolhidas_remotamente_cleaned.delta")
    print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
# ============================
    # 3. Join with DimParish
    # ============================
    dim_parish = spark.read.table("DimParish").alias("dim")
    fact = df_clean.alias("fact")

In [None]:
joined = fact.join(dim_parish, on=col("fact.sk_parish") == col("dim.sk_parish"), how="inner")
    print(f"✅ Matched rows: {joined.count()}")

In [None]:
# ============================
    # 4. Save enriched version (renaming to avoid conflicts)
    # ============================
    joined.select(
        col("fact.ano"),
        col("fact.mes"),
        col("fact.data"),
        col("fact.cpes_com_leituras"),                    # ✅ Métrica factual
        col("fact.coddistritoconcelho"),                  # ✅ Código útil opcional
        col("fact.district").alias("fact_district"),
        col("fact.municipality").alias("fact_municipality"),
        col("fact.parish").alias("fact_parish"),
        col("dim.district").alias("dim_district"),
        col("dim.municipality").alias("dim_municipality"),
        col("dim.parish").alias("dim_parish"),
        col("fact.sk_parish")                             # ✅ SK para joins posteriores
    ).write.option("mergeSchema", "true").mode("overwrite").format("delta").save("Files/silver/23_leituras_recolhidas_remotamente_enriched.delta")

In [None]:
print("✅ Enriched dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-06-28T17:53:19.0195833Z","execution_start_time":"2025-06-28T17:53:08.8726754Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"450cdac6-853f-44f9-a045-2dd40cecfbfe","queued_time":"2025-06-28T17:53:08.8713544Z","session_id":"f936bd51-85df-456e-9278-2ea5b1c85269","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":4,"statement_ids":[4]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- mes: string (nullable = true)
     |-- data: string (nullable = true)
     |-- distrito: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- freguesia: string (nullable = true)
     |-- cpes_com_leituras: string (nullable = true)
     |-- coddistrito: string (nullable = true)
     |-- coddistritoconcelho: string (nullable = true)
     |-- coddistritoconcelhofreguesia: string (nullable = true)

In [None]:
+----------------+--------------------+-------------------------------------------+----------------------------------------------------------------+
    |district        |municipality        |parish                                     |sk_parish                                                       |
    +----------------+--------------------+-------------------------------------------+----------------------------------------------------------------+
    |LISBOA          |LOURINHÃ            |MIRAGAIA E MARTELEIRA                      |c8484fc0079dc5ce4768114748796fca451684671ee659660c23e0c308fcab99|
    |VILA REAL       |VILA POUCA DE AGUIAR|PENSALVOS E PARADA DE MONTEIROS            |466de2113bccea16650c9bb5107eeea3216ae84a4c1aafd1fcf84e07d8451fa7|
    |VISEU           |PENEDONO            |PÓVOA DE PENELA                            |1882cd06a0d1bcce5772af5772612f1214ec8b2fb7136638c50222c144416bcc|
    |GUARDA          |FORNOS DE ALGODRES  |QUEIRIZ                                    |a6238b46b8a03399b2589a18b0022cc5fd1b6edd5f6613974b863d0efe88ded9|
    |VILA REAL       |MONDIM DE BASTO     |BILHÓ                                      |a645858a4852afb7f4cb2f9134cc1817befbb484f528aa6a3394b05b0992f936|
    |LISBOA          |MAFRA               |VENDA DO PINHEIRO E SANTO ESTÊVÃO DAS GALÉS|573760f0b2417c81d00aa4eb716ba438465d83b57e5d7ed16d2fe0ac699b4d9c|
    |BRAGA           |BRAGA               |PALMEIRA                                   |f4fd55973e3e61727070e63d6bf376ed93cee97c61ad333f1c0de13dcbb1cfef|
    |VIANA DO CASTELO|MONÇÃO              |ANHÕES E LUZIO                             |0edfaa49bcf2c6050cf00d6244293ab13eb34f8a4b63c5754291a3a52a7c8b5c|
    |GUARDA          |MÊDA                |RANHADOS                                   |578aaeec840318fbacf491d2624033ca5e39887820ff86110cb8aeac615cf3fa|
    |AVEIRO          |AROUCA              |CANELAS E ESPIUNCA                         |4d8dbd08b11fea78740a05ccb9304f598a86f18c64a7a1bee5406e0c6dc321d7|
    +----------------+--------------------+-------------------------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.
    ✅ Matched rows: 86158
    ✅ Enriched dataset saved to Silver layer.

In [None]:
12_continuidade_de_servico_indicadores_gerais_de_continuidade_de_servico

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2, concat_ws, upper, trim, split

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================
    bronze_path = "Files/bronze/12_continuidade_de_servico_indicadores_gerais_de_continuidade_de_servico/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# Forçar parsing se só existir uma coluna
    if len(df.columns) == 1:
        expected_cols = [
            "ano", "nuts_iii", "codigo_concelho", "concelho", "zona_rqs",
            "saifi_at_num", "saidi_at_min", "maifi_at_num",
            "tiepi_mt_min", "end_mt_mwh",
            "saifi_mt_num", "saidi_mt_min", "maifi_mt_num",
            "saifi_bt_num", "saidi_bt_min"
        ]
        df = df.withColumn("split", split(col(df.columns[0]), ";"))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        df = df.drop("split").drop(df.columns[0])

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()

In [None]:
# Mostrar total de registos no Bronze
    print(f"📊 Total Bronze rows: {df.count()}")

In [None]:
# ============================
    # 2. Normalize and create SK
    # ============================
    df_clean = (
        df.withColumn("district", upper(trim(col("nuts_iii"))))
          .withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("sk_municipality", sha2(concat_ws("-", "district", "municipality"), 256))
    )

In [None]:
df_clean.select("codigo_concelho", "municipality", "sk_municipality").show(10, truncate=False)

In [None]:
# Save cleaned version to Silver
    df_clean.write.mode("overwrite").format("delta").save("Files/silver/12_continuidade_de_servico_indicadores_gerais_de_continuidade_de_servico_cleaned")

In [None]:
print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
# ============================
    # 3. Join with DimMunicipality
    # ============================
    dim = spark.read.table("DimMunicipality").alias("dim")
    fact = df_clean.alias("fact")

In [None]:
joined = fact.join(
        dim,
        on=col("fact.sk_municipality") == col("dim.sk_municipality"),
        how="inner"
    )

In [None]:
print(f"✅ Matched rows: {joined.count()}")

In [None]:
# ============================
    # 4. Save Enriched Version
    # ============================
    joined.select(
        col("fact.ano"),
        col("fact.district").alias("fact_district"),
        col("fact.municipality").alias("fact_municipality"),
        col("dim.district").alias("dim_district"),
        col("dim.municipality").alias("dim_municipality"),
        col("fact.codigo_concelho"),
        col("fact.zona_rqs"),
        col("fact.saifi_at_num"),
        col("fact.saidi_at_min"),
        col("fact.maifi_at_num"),
        col("fact.tiepi_mt_min"),
        col("fact.end_mt_mwh"),
        col("fact.saifi_mt_num"),
        col("fact.saidi_mt_min"),
        col("fact.maifi_mt_num"),
        col("fact.saifi_bt_num"),
        col("fact.saidi_bt_min")
    ).write.mode("overwrite").format("delta").save("Files/silver/12_continuidade_servico_enriched.delta")

In [None]:
print("✅ Enriched dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-07-02T14:48:57.6764394Z","execution_start_time":"2025-07-02T14:48:47.7956506Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"cdb1f4b6-ffe4-4060-9daa-bf12c4df5db6","queued_time":"2025-07-02T14:48:47.794449Z","session_id":"7093a691-1993-4226-b462-53f1624924ca","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":7,"statement_ids":[7]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- nuts_iii: string (nullable = true)
     |-- codigo_concelho: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- zona_rqs: string (nullable = true)
     |-- saifi_at_num: string (nullable = true)
     |-- saidi_at_min: string (nullable = true)
     |-- maifi_at_num: string (nullable = true)
     |-- tiepi_mt_min: string (nullable = true)
     |-- end_mt_mwh: string (nullable = true)
     |-- saifi_mt_num: string (nullable = true)
     |-- saidi_mt_min: string (nullable = true)
     |-- maifi_mt_num: string (nullable = true)
     |-- saifi_bt_num: string (nullable = true)
     |-- saidi_bt_min: string (nullable = true)

In [None]:
📊 Total Bronze rows: 11120
    +---------------+--------------------------+----------------------------------------------------------------+
    |codigo_concelho|municipality              |sk_municipality                                                 |
    +---------------+--------------------------+----------------------------------------------------------------+
    |801            |ALBUFEIRA                 |68081357adc2636e3fdf27341130e238619f9971b9fa92a2954d24fc6e5f71d0|
    |1509           |SANTIAGO DO CACÉM         |9d93c97b4d68506ed8448b7c6939fea257c9fdc505d9418a1a8af66d6bf030d3|
    |802            |ALCOUTIM                  |153c8701e194af333c983d4f6da2e9fb8e7027b76ddc3a2f6563bb6adee359cf|
    |811            |PORTIMÃO                  |a04818d85609e86dd1d8cd0e28ad8d0ae694406fe10b311872f51591cc298c3b|
    |813            |SILVES                    |7236be143225509fcf9f73ebd51813ae7ad7cecb0cb23038c4297096c26e546a|
    |807            |LAGOS                     |afe3601fd44acfa81074a3abdb4ba6bb6fa9b15f1f3ba0f845596411fa498a49|
    |808            |LOULÉ                     |aa32869c2f57a4132f2138c6c0d339f5cbeb8dcc0db380a616754544be7bf531|
    |816            |VILA REAL DE SANTO ANTÓNIO|f8db8f0291bca6ace3c9e67e0ae17f0823317edfc6363b7a5baa5ef3f6222b4b|
    |815            |VILA DO BISPO             |050f6c0aaec4f008680f565c5b636c8ce82e1cdbb584e6a8dddb708fd17388bf|
    |815            |VILA DO BISPO             |050f6c0aaec4f008680f565c5b636c8ce82e1cdbb584e6a8dddb708fd17388bf|
    +---------------+--------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.
    ✅ Matched rows: 0
    ✅ Enriched dataset saved to Silver layer.

In [None]:
16_pedidos_concluidos_plrs

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import (
        col, trim, upper, sha2, split, lpad, concat_ws,
        concat, lit, to_date, date_format, when
    )

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================
    bronze_path = "Files/bronze/16_pedidos_concluidos_plrs/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# Se só existir uma coluna (mal delimitado), forçar split
    if len(df.columns) == 1:
        first_col = df.columns[0]
        expected_cols = [
            "ano", "semestre", "data_semestre", "concelho",
            "pedidos_de_ligacao_a_rede_executados", "codconcelho"
        ]
        df = df.withColumn("split", split(col(first_col), ";"))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()
    print("📊 Total Bronze rows:", df.count())

In [None]:
# ============================
    # 2. Normalize (municipality + pedidos)
    # ============================
    df_clean = (
        df.withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("codconcelho_clean", lpad(trim(col("codconcelho").cast("int").cast("string")), 4, "0"))
          .withColumn("pedidos", col("pedidos_de_ligacao_a_rede_executados").cast("int"))
          .withColumn("data_semestre", col("data_semestre"))  # manter original para referência
    )

In [None]:
# ============================
    # 3. Join with DimMunicipality to get district
    # ============================
    dim_path = "Files/silver/DimMunicipality.delta"
    dim_mun = spark.read.format("delta").load(dim_path).select("district", "municipality").dropDuplicates()

In [None]:
df_enriched = df_clean.join(dim_mun, on="municipality", how="left")

In [None]:
# ============================
    # 4. Generate uniform SK
    # ============================
    df_enriched = df_enriched.withColumn(
        "sk_municipality",
        sha2(concat_ws("-", col("district"), col("municipality")), 256)
    )

In [None]:
# ============================
    # 5. Generate SK_Date from semestre
    # ============================
    df_enriched = df_enriched.withColumn(
        "date_final_str",
        when(col("semestre") == "1", concat(col("ano"), lit("-06-30")))
        .when(col("semestre") == "2", concat(col("ano"), lit("-12-31")))
    )

In [None]:
df_enriched = df_enriched.withColumn("date", to_date(col("date_final_str"), "yyyy-MM-dd"))
    df_enriched = df_enriched.withColumn("sk_date", date_format(col("date"), "yyyyMMdd").cast("int"))

In [None]:
# Mostrar preview completo
    df_enriched.select(
        "district", "municipality", "ano", "semestre", "date", "sk_date", "pedidos", "sk_municipality"
    ).show(10, truncate=False)

In [None]:
# ============================
    # 6. Save to Silver Layer
    # ============================
    silver_path = "Files/silver/16_pedidos_concluidos_plrs_cleaned.delta"
    df_enriched.write \
        .option("mergeSchema", "true") \
        .mode("overwrite") \
        .format("delta") \
        .save(silver_path)

In [None]:
print("✅ Cleaned dataset saved to Silver layer com SKs normalizados e sk_date incluído.")

In [None]:
{"execution_finish_time":"2025-06-28T18:25:38.6759456Z","execution_start_time":"2025-06-28T18:25:33.9087437Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"5392c41e-09c8-40a8-b7b0-d812757726ba","queued_time":"2025-06-28T18:25:33.9073985Z","session_id":"f936bd51-85df-456e-9278-2ea5b1c85269","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":9,"statement_ids":[9]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- semestre: string (nullable = true)
     |-- data_semestre: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- pedidos_de_ligacao_a_rede_executados: string (nullable = true)
     |-- codconcelho: string (nullable = true)

In [None]:
📊 Total Bronze rows: 2508
    +----------------+----------------+----+--------+----------+--------+-------+----------------------------------------------------------------+
    |district        |municipality    |ano |semestre|date      |sk_date |pedidos|sk_municipality                                                 |
    +----------------+----------------+----+--------+----------+--------+-------+----------------------------------------------------------------+
    |BRAGANÇA        |VILA FLOR       |2021|1       |2021-06-30|20210630|31     |5ba6ca8d0cde380634fb960d92cb437dc72721652ebf781f710aa3f4715399ce|
    |BRAGANÇA        |BRAGANÇA        |2021|1       |2021-06-30|20210630|217    |c2b4bde3bdd6c0a6eaca9599113549b21f80fc6bc74c4802525c83bd2d7dac6f|
    |BRAGANÇA        |ALFÂNDEGA DA FÉ |2021|1       |2021-06-30|20210630|43     |637e366ab29d5ed49a0dce561f333e46e1f17ba36f750fd8eaa824f298010f96|
    |COIMBRA         |GÓIS            |2021|1       |2021-06-30|20210630|28     |6c0b00216fa0ac39c46d2bd5264a0ed53f9152627db963acb6ef750aab8197a1|
    |BRAGA           |ESPOSENDE       |2021|1       |2021-06-30|20210630|244    |88fe36573344bb425d22cb6120b8273322b3f79f6f0228d519dd0e8b1c153688|
    |VIANA DO CASTELO|MELGAÇO         |2021|1       |2021-06-30|20210630|35     |97583b5c743a1cabef7b0320667dbc51bf82b85fa0ebace976a4e8ff483dc179|
    |VISEU           |PENEDONO        |2021|1       |2021-06-30|20210630|13     |8d42f8abc06a7135fe974c11b4e0eb5cfc3564b318d2c4232a8685b32dfe2c11|
    |LEIRIA          |ÓBIDOS          |2021|1       |2021-06-30|20210630|81     |05a0c565eb5f30c69360c1e74ddab325e04057f38a9c71457158d0a8dd2703ec|
    |SANTARÉM        |BENAVENTE       |2021|1       |2021-06-30|20210630|95     |fdc6ce3cd22852873d4e9695772d68785b8d30c8c1f48fbcc88ef605833cf61a|
    |COIMBRA         |MIRANDA DO CORVO|2021|1       |2021-06-30|20210630|36     |a7fb70c3249a24ed593850d297538b59b0bcfa5caa21851acb7f910df87fb9fc|
    +----------------+----------------+----+--------+----------+--------+-------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer com SKs normalizados e sk_date incluído.

In [None]:
9_plr_mobilidade_eletrica

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, trim, upper

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================
    bronze_path = "Files/bronze/9_plr_mobilidade_eletrica/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# ============================
    # 2. Normalize and Join with DimMunicipality
    # ============================
    # Build the join key: a trimmed, upper-cased copy of "concelho".
    df = df.withColumn("municipality", upper(trim(col("concelho"))))

In [None]:
# Load the municipality dimension (Delta table in the Silver layer).
    # It supplies the sk_municipality surrogate key looked up by municipality name.
    dim_muni = spark.read.format("delta").load("Files/silver/DimMunicipality.delta")

In [None]:
# Join to get the correct SK
    df_clean = df.join(
        dim_muni.select("municipality", "sk_municipality"),
        on="municipality",
        how="left"
    )

In [None]:
df_clean.select("ano", "semestre", "municipality", "pedidos_de_ligacao_a_rede_executados", "sk_municipality").show(10, False)

In [None]:
# ============================
    # 3. Save to Silver Layer
    # ============================
    df_clean.write.mode("overwrite").format("delta").save("Files/silver/9_plr_mobilidade_eletrica_cleaned.delta")
    print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-06-16T21:54:18.0440452Z","execution_start_time":"2025-06-16T21:54:10.0522793Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"3ff26c9d-5aa3-458f-8ce7-6500ff49649d","queued_time":"2025-06-16T21:54:10.0511017Z","session_id":"04e08fd7-3298-4f43-b914-6d4ec1c1dc0c","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":7,"statement_ids":[7]}

In [None]:
+----+--------+------------------+------------------------------------+----------------------------------------------------------------+
    |ano |semestre|municipality      |pedidos_de_ligacao_a_rede_executados|sk_municipality                                                 |
    +----+--------+------------------+------------------------------------+----------------------------------------------------------------+
    |2024|2       |TORRES VEDRAS     |1.0                                 |455a03289814e2cba1f7caf868d5e82f2dca4e4f2da79f37adc8f78bf4bab9d8|
    |2024|2       |AMARANTE          |1.0                                 |b8d9a743c1a13a1eb5d0be9fad32301db7419724352cd1bf16cc94ee53c7ea40|
    |2024|2       |GONDOMAR          |2.0                                 |5fb03ae2892f744eb5d6cf675c5739c2eaff0c6f001010022447e841196a807e|
    |2024|2       |ALANDROAL         |1.0                                 |97b1a797cb9c93623a5c4e84e76fbea0ce724e0bca06b3a47adf331187f00f83|
    |2024|2       |ANADIA            |5.0                                 |f60534b5361a3a29fc1b35985d4dba70acb42e4b65f7315c82aacc2e79b59792|
    |2024|2       |ALBERGARIA-A-VELHA|5.0                                 |4664effdfd10c006c69d0d80d7e9218e241cef39fdc2a66e561f34908290c411|
    |2024|2       |ALMEIDA           |2.0                                 |0eaca4d4137a204678d09dbcbb4e111c4e2ce5ed4c7da7598b4d05c1eee8cfd7|
    |2024|2       |ÓBIDOS            |2.0                                 |05a0c565eb5f30c69360c1e74ddab325e04057f38a9c71457158d0a8dd2703ec|
    |2024|2       |LOUSÃ             |6.0                                 |431b6e6aeb03fb7698ae5f38f4e712d82d50f10179b57699158dd50610c2ca09|
    |2024|2       |GUARDA            |2.0                                 |de5ffc8f87c6858317fc30ee335a5b729d9303ba9d2adf7a9d02d9eb105148de|
    +----+--------+------------------+------------------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.

In [None]:
26_centrais

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import (
        col, trim, upper, sha2, split, lpad, concat_ws,
        concat, lit, to_date, date_format, when
    )
    import shutil
    import os

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================
    bronze_path = "Files/bronze/26_centrais/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# If only one column came back (wrong delimiter), force a manual split on ";"
    if len(df.columns) == 1:
        first_col = df.columns[0]
        # Names assigned, by position, to the ";"-separated pieces of the raw line.
        expected_cols = ["ano", "semestre", "concelho", "potencia_de_ligacao", "processos_concluidos", "codconcelho"]
        df = df.withColumn("split", split(col(first_col), ";"))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        # Remove the helper array and the original single raw column.
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()
    print("📊 Total Bronze rows:", df.count())

In [None]:
# ============================
    # 2. Normalize + SK generation
    # ============================
    df_clean = (
        df.withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("codconcelho_clean", lpad(trim(col("codconcelho").cast("int").cast("string")), 4, "0"))
          .withColumn("potencia", col("potencia_de_ligacao").cast("double"))
          .withColumn("processos", col("processos_concluidos").cast("int"))
    )

In [None]:
# ============================
    # 3. Join with DimMunicipality to get district
    # ============================
    dim_path = "Files/silver/DimMunicipality.delta"
    # Keep only the distinct (district, municipality) pairs as the lookup for the join.
    dim_mun = spark.read.format("delta").load(dim_path).select("district", "municipality").dropDuplicates()

In [None]:
df_enriched = df_clean.join(dim_mun, on="municipality", how="left")
    # Left join: rows whose municipality is absent from the dimension keep a NULL district.
    # NOTE(review): assumes a municipality name maps to a single district in the lookup;
    # a name shared by two districts would duplicate fact rows here — confirm.

In [None]:
# ============================
    # 4. Generate SK_Municipality and SK_Date
    # ============================
    df_enriched = df_enriched.withColumn(
        "sk_municipality",
        sha2(concat_ws("-", col("district"), col("municipality")), 256)
    )

In [None]:
df_enriched = df_enriched.withColumn(
        "date_final_str",
        # Semester "1" maps to 30 June and semester "2" to 31 December of "ano".
        # There is no otherwise(), so any other semester value yields NULL.
        when(col("semestre") == "1", concat(col("ano"), lit("-06-30")))
        .when(col("semestre") == "2", concat(col("ano"), lit("-12-31")))
    )

In [None]:
df_enriched = (
        df_enriched
        # Parse the semester-end string into a proper date ...
        .withColumn("date", to_date(col("date_final_str"), "yyyy-MM-dd"))
        # ... and derive the integer yyyyMMdd date key from it.
        .withColumn("sk_date", date_format(col("date"), "yyyyMMdd").cast("int"))
    )

In [None]:
# ============================
    # 5. Add SK_InstallationType (fixed = Subestação)
    # ============================
    # Every row gets the same key: the SHA-256 of the literal installation-type label.
    # NOTE(review): this dataset is 26_centrais yet the label hashed is "Subestação" — confirm it is intended.
    df_enriched = df_enriched.withColumn("sk_installation_type", sha2(lit("Subestação"), 256))

In [None]:
# ============================
    # 6. Show final preview
    # ============================
    # Prints the first 10 rows without truncating wide values such as the SHA-256 keys.
    df_enriched.select(
        "district", "municipality", "ano", "semestre", "date", "sk_date",
        "potencia", "processos", "sk_municipality", "sk_installation_type"
    ).show(10, truncate=False)

In [None]:
# ============================
    # 7. Save to Silver Layer
    # ============================
    silver_path = "Files/silver/26_centrais_enriched.delta"

In [None]:
if os.path.exists(silver_path):
        # NOTE(review): os.path/shutil operate on the driver's local filesystem, while
        # Spark resolves silver_path against its own default filesystem — this branch
        # may never see the Delta folder. Confirm it is needed at all, since the write
        # that follows already uses mode("overwrite").
        shutil.rmtree(silver_path)
        print(f"🧨 Removed existing folder: {silver_path}")

In [None]:
(
        # Replace the Silver Delta table, allowing the stored schema to change.
        df_enriched.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .option("overwriteSchema", "true")
        .save(silver_path)
    )

In [None]:
print("✅ Enriched dataset saved to Silver layer com SKs normalizados e sk_date incluído.")

In [None]:
{"execution_finish_time":"2025-07-02T14:53:38.9835899Z","execution_start_time":"2025-07-02T14:53:34.078874Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"d7bd950e-c42e-44dc-bc94-4cea5771cf05","queued_time":"2025-07-02T14:53:34.0776712Z","session_id":"7093a691-1993-4226-b462-53f1624924ca","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":9,"statement_ids":[9]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- semestre: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- potencia_de_ligacao: string (nullable = true)
     |-- processos_concluidos: string (nullable = true)
     |-- codconcelho: string (nullable = true)

In [None]:
📊 Total Bronze rows: 2223
    +----------+-----------------+----+--------+----------+--------+------------------+---------+----------------------------------------------------------------+----------------------------------------------------------------+
    |district  |municipality     |ano |semestre|date      |sk_date |potencia          |processos|sk_municipality                                                 |sk_installation_type                                            |
    +----------+-----------------+----+--------+----------+--------+------------------+---------+----------------------------------------------------------------+----------------------------------------------------------------+
    |BEJA      |ALMODÔVAR        |2021|2       |2021-12-31|20211231|19.87             |12       |c022c4305a1cd5ae9eb424345f4fa01c4d6f5959e87892eaaf1324cbabb021fd|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    |BRAGA     |GUIMARÃES        |2021|2       |2021-12-31|20211231|1331.2400000000002|552      |c0f233027848cb5d10aea7869cb75ed4aea81c6127c98325a2d14b89b2c3f9ed|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    |LEIRIA    |PEDRÓGÃO GRANDE  |2021|2       |2021-12-31|20211231|9.2               |10       |9e18ccb341dc37cc2c217e15b2642b0bcec00dbffa125bb032c8d3d9d13ede7c|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    |SETÚBAL   |GRÂNDOLA         |2021|2       |2021-12-31|20211231|69.73999999999998 |34       |477e4a232c53ea0cd8fdb815548716fe3c35abb617bb25e9590330c2549f79d7|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    |PORTO     |FELGUEIRAS       |2021|2       |2021-12-31|20211231|675.04            |162      |a79128db844c02483d44e128263c81b0f3067a6ed14246eed583d345bdfdb406|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    |PORTO     |PAÇOS DE FERREIRA|2021|2       |2021-12-31|20211231|479.1099999999999 |207      |d18583f3f38788a7214ffbff14fd100274ff70b148e45a12c1ce69aa50ed341f|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    |SANTARÉM  |GOLEGÃ           |2021|2       |2021-12-31|20211231|83.3              |23       |c9635a83b3e7d4356df8be785175e6735fd054fff967e5dc4aa0bda001f6cb35|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    |ÉVORA     |ÉVORA            |2021|2       |2021-12-31|20211231|1222.37           |208      |1bb49365695c63984e2bd71b26f123b7ca3e0315e34a49c2edef2e579c0fa03c|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    |PORTALEGRE|CASTELO DE VIDE  |2021|2       |2021-12-31|20211231|10.67             |7        |8952100a70375948580f87c3690a338160b168767927bc132ff6099a3c84a245|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    |SANTARÉM  |SARDOAL          |2021|2       |2021-12-31|20211231|24.75             |15       |fb4209057c91c99cf5a6b2bbc470d7df176cff6ca2b9f7f9edbe380632688944|582b550743f26f611ada407238cec931ee22a3054c938ac842c639b3a2eae5f8|
    +----------+-----------------+----+--------+----------+--------+------------------+---------+----------------------------------------------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Enriched dataset saved to Silver layer com SKs normalizados e sk_date incluído.

In [None]:
25_plr_producao_renovavel

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import (
        col, trim, upper, sha2, split, lpad, concat_ws,
        concat, lit, to_date, date_format, when
    )
    import shutil
    import os

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================
    bronze_path = "Files/bronze/25_plr_producao_renovavel/*.csv"
    df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
# If only one column came back (wrong delimiter), force a manual split on ";"
    if len(df.columns) == 1:
        first_col = df.columns[0]
        # Names assigned, by position, to the ";"-separated pieces of the raw line.
        expected_cols = [
            "ano", "semestre", "concelho",
            "potencia_de_ligacao", "pedidos_de_ligacao_a_rede_executados",
            "codconcelho"
        ]
        df = df.withColumn("split", split(col(first_col), ";"))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        # Remove the helper array and the original single raw column.
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()
    print("📊 Total Bronze rows:", df.count())

In [None]:
# ============================
    # 2. Normalize + SK generation
    # ============================
    df_clean = (
        df.withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("codconcelho_clean", lpad(trim(col("codconcelho").cast("int").cast("string")), 4, "0"))
          .withColumn("potencia", col("potencia_de_ligacao").cast("double"))
          .withColumn("pedidos", col("pedidos_de_ligacao_a_rede_executados").cast("int"))
    )

In [None]:
# ============================
    # 3. Join with DimMunicipality to get district
    # ============================
    dim_path = "Files/silver/DimMunicipality.delta"
    # Keep only the distinct (district, municipality) pairs as the lookup for the join.
    dim_mun = spark.read.format("delta").load(dim_path).select("district", "municipality").dropDuplicates()

In [None]:
df_enriched = df_clean.join(dim_mun, on="municipality", how="left")
    # Left join: rows whose municipality is absent from the dimension keep a NULL district.
    # NOTE(review): assumes a municipality name maps to a single district in the lookup;
    # a name shared by two districts would duplicate fact rows here — confirm.

In [None]:
# ============================
    # 4. Generate SK_Municipality and SK_Date
    # ============================
    df_enriched = df_enriched.withColumn(
        "sk_municipality",
        sha2(concat_ws("-", col("district"), col("municipality")), 256)
    )

In [None]:
df_enriched = df_enriched.withColumn(
        "date_final_str",
        # Semester "1" maps to 30 June and semester "2" to 31 December of "ano".
        # There is no otherwise(), so any other semester value yields NULL.
        when(col("semestre") == "1", concat(col("ano"), lit("-06-30")))
        .when(col("semestre") == "2", concat(col("ano"), lit("-12-31")))
    )

In [None]:
df_enriched = (
        df_enriched
        # Parse the semester-end string into a proper date ...
        .withColumn("date", to_date(col("date_final_str"), "yyyy-MM-dd"))
        # ... and derive the integer yyyyMMdd date key from it.
        .withColumn("sk_date", date_format(col("date"), "yyyyMMdd").cast("int"))
    )

In [None]:
# ============================
    # 5. Add SK_InstallationType (fixed = Unidade de Autoconsumo)
    # ============================
    # Every row gets the same key: the SHA-256 of the literal installation-type label.
    df_enriched = df_enriched.withColumn("sk_installation_type", sha2(lit("Unidade de Autoconsumo"), 256))

In [None]:
# ============================
    # 6. Show final preview
    # ============================
    # Prints the first 10 rows without truncating wide values such as the SHA-256 keys.
    df_enriched.select(
        "district", "municipality", "ano", "semestre", "date", "sk_date",
        "potencia", "pedidos", "sk_municipality", "sk_installation_type"
    ).show(10, truncate=False)

In [None]:
# ============================
    # 7. Save to Silver Layer
    # ============================
    silver_path = "Files/silver/25_plr_producao_renovavel_enriched.delta"

In [None]:
if os.path.exists(silver_path):
        # NOTE(review): os.path/shutil operate on the driver's local filesystem, while
        # Spark resolves silver_path against its own default filesystem — this branch
        # may never see the Delta folder. Confirm it is needed at all, since the write
        # that follows already uses mode("overwrite").
        shutil.rmtree(silver_path)
        print(f"🧨 Removed existing folder: {silver_path}")

In [None]:
(
        # Replace the Silver Delta table, allowing the stored schema to change.
        df_enriched.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .option("overwriteSchema", "true")
        .save(silver_path)
    )

In [None]:
print("✅ Enriched dataset saved to Silver layer com SKs normalizados e sk_date incluído.")

In [None]:
{"execution_finish_time":"2025-07-02T14:55:43.6175286Z","execution_start_time":"2025-07-02T14:55:38.6258659Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"d035b471-e69f-42d0-8cd7-d64de6fa8d3c","queued_time":"2025-07-02T14:55:38.6246552Z","session_id":"7093a691-1993-4226-b462-53f1624924ca","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":10,"statement_ids":[10]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- semestre: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- potencia_de_ligacao: string (nullable = true)
     |-- pedidos_de_ligacao_a_rede_executados: string (nullable = true)
     |-- codconcelho: string (nullable = true)

In [None]:
📊 Total Bronze rows: 157
    +----------+-----------------+----+--------+----------+--------+--------+-------+----------------------------------------------------------------+----------------------------------------------------------------+
    |district  |municipality     |ano |semestre|date      |sk_date |potencia|pedidos|sk_municipality                                                 |sk_installation_type                                            |
    +----------+-----------------+----+--------+----------+--------+--------+-------+----------------------------------------------------------------+----------------------------------------------------------------+
    |SANTARÉM  |SANTARÉM         |2021|1       |2021-06-30|20210630|18000.0 |1      |689f166df13060e54795e05641abcc9ad63e49a3be6f2f3578383e0e689bcee7|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    |PORTO     |VILA NOVA DE GAIA|2021|1       |2021-06-30|20210630|26.0    |1      |aa9d178ffd83809e8c676ca28c9f59097c12ab0d0a28b6c29f6a6d37a1db9515|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    |BRAGANÇA  |TORRE DE MONCORVO|2021|1       |2021-06-30|20210630|9600.0  |1      |54d04766d152a0de2a3819e115a0afa6c7b6a5c6770c47f766ee190df278d4ef|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    |ÉVORA     |ÉVORA            |2021|2       |2021-12-31|20211231|1000.0  |1      |1bb49365695c63984e2bd71b26f123b7ca3e0315e34a49c2edef2e579c0fa03c|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    |BEJA      |MOURA            |2021|2       |2021-12-31|20211231|42000.0 |1      |9683060a2304d108fdafbde951c7daf7b50bb0a5f229835e952c9609a6a5f86b|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    |BEJA      |SERPA            |2022|1       |2022-06-30|20220630|45000.0 |1      |c0fd6d5b3f33e447757613affb840390fd9bcc43bd278e0676eec71678a99eb1|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    |VILA REAL |RIBEIRA DE PENA  |2022|1       |2022-06-30|20220630|4600.0  |1      |f5a0c02525358558fabfc6f893c9b65581bab8a8d039322aac36b9fc76135c02|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    |BRAGA     |BARCELOS         |2022|2       |2022-12-31|20221231|1.0     |1      |16e003285eccfe5f9ee211c782d705bad241f04726cf6c03511dc337d3dbaafb|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    |PORTALEGRE|FRONTEIRA        |2022|2       |2022-12-31|20221231|1000.0  |1      |78b5c5ee339bf74abcfecd4ce1d4e05ca8008bb093c081dbf9bf8ee2bc593ff2|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    |SETÚBAL   |ALCÁCER DO SAL   |2022|2       |2022-12-31|20221231|12500.0 |1      |24e9ceba453a0004de633b3adcd7a6c845dc4b243243f8d60c7493ca868b1641|a50615e798f5a8f902681c6172ef1c97f4df280b8084f0c95e694e5522f1d2fd|
    +----------+-----------------+----+--------+----------+--------+--------+-------+----------------------------------------------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Enriched dataset saved to Silver layer com SKs normalizados e sk_date incluído.

In [None]:
network_scheduling_work

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, trim, upper, sha2, concat_ws, split, lit

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================

In [None]:
bronze_path = "Files/bronze/network_scheduling_work/*.csv"

In [None]:
df = spark.read.option("header", True).option("delimiter", ",").csv(bronze_path)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()
    print("📊 Total Bronze rows:", df.count())

In [None]:
# ============================
    # 2. Normalize & generate SKs
    # ============================

In [None]:
df_clean = (
        df.withColumn("zipcode4", split(col("zipcode"), "-").getItem(0))
          .withColumn("municipality", upper(trim(col("municipality"))))
          .withColumn("parish", upper(trim(col("parish"))))
          # The feed carries no district. Hashing with a NULL district makes
          # concat_ws drop it, so the keys would be built from municipality (and
          # parish) only and never match keys built as district-municipality(-parish).
          # Look the district up in DimMunicipality instead; unmatched names keep a
          # NULL district.
          # NOTE(review): assumes a municipality name maps to one district — confirm.
          .join(
              spark.read.format("delta")
                   .load("Files/silver/DimMunicipality.delta")
                   .select("district", "municipality")
                   .dropDuplicates(),
              on="municipality",
              how="left"
          )
          .withColumn("sk_municipality", sha2(concat_ws("-", "district", "municipality"), 256))
          .withColumn("sk_parish", sha2(concat_ws("-", "district", "municipality", "parish"), 256))
          # Zip key uses only the 4-digit prefix of the postal code.
          .withColumn("sk_zipcode", sha2(col("zipcode4"), 256))
    )

In [None]:
# ============================
    # 3. Visual check
    # ============================

In [None]:
df_clean.select(
        "zipcode", "zipcode4", "municipality", "parish",
        "sk_zipcode", "sk_municipality", "sk_parish"
    ).show(10, truncate=False)

In [None]:
# ============================
    # 4. Save to Silver Layer
    # ============================

In [None]:
silver_path = "Files/silver/network_scheduling_work_cleaned.delta"
    df_clean.write \
        .mode("overwrite") \
        .option("overwriFiles/silver/network_scheduling_work_cleaned.delta"teSchema", "true") \
        .format("delta") \
        .save(silver_path)

In [None]:
print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-06-15T15:06:04.5703149Z","execution_start_time":"2025-06-15T15:06:01.1985194Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"c1f5c5ad-422d-447e-a28a-e17f446767a3","queued_time":"2025-06-15T15:06:01.1972833Z","session_id":"f67e43a6-7ca0-4fcc-b74c-d07c3cc92e58","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":11,"statement_ids":[11]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- zipcode: string (nullable = true)
     |-- municipality: string (nullable = true)
     |-- parish: string (nullable = true)
     |-- updatedatetime: string (nullable = true)
     |-- startdatetime: string (nullable = true)
     |-- enddatetime: string (nullable = true)
     |-- durationallocation: string (nullable = true)
     |-- interrupcao_programada: string (nullable = true)
     |-- municipalitycode: string (nullable = true)
     |-- parishcode: string (nullable = true)

In [None]:
📊 Total Bronze rows: 241
    +--------+--------+------------+---------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+
    |zipcode |zipcode4|municipality|parish                                                   |sk_zipcode                                                      |sk_municipality                                                 |sk_parish                                                       |
    +--------+--------+------------+---------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+
    |2640-566|2640    |MAFRA       |MAFRA                                                    |6c6ece85b5d6ea8c5b1901b34909d170569478006b74472e49d7535ab4b4a94d|af9456b934b49d2a8e9b828746e7adb3a0005158d903865d49ed07f93f1503a7|fd505e49bc4af64ee9cda5c2eed442ed4e7ca2ac4cb8335d1ca79c2699940295|
    |2640-577|2640    |MAFRA       |MAFRA                                                    |6c6ece85b5d6ea8c5b1901b34909d170569478006b74472e49d7535ab4b4a94d|af9456b934b49d2a8e9b828746e7adb3a0005158d903865d49ed07f93f1503a7|fd505e49bc4af64ee9cda5c2eed442ed4e7ca2ac4cb8335d1ca79c2699940295|
    |2665-315|2665    |MAFRA       |MILHARADO                                                |0e8b7a7f79b3098b0162b4c83a30a789110ea380d6ce4675a938bc90616279d6|af9456b934b49d2a8e9b828746e7adb3a0005158d903865d49ed07f93f1503a7|9c4ae5716ad543c3d49322879022759b80f39b1722fbf10c4a6a7b97e42de76d|
    |2665-374|2665    |MAFRA       |MILHARADO                                                |0e8b7a7f79b3098b0162b4c83a30a789110ea380d6ce4675a938bc90616279d6|af9456b934b49d2a8e9b828746e7adb3a0005158d903865d49ed07f93f1503a7|9c4ae5716ad543c3d49322879022759b80f39b1722fbf10c4a6a7b97e42de76d|
    |2870-517|2870    |MONTIJO     |UNIÃO DAS FREGUESIAS DE ATALAIA E ALTO ESTANQUEIRO-JARDIA|72a2d4365f37780690ee9d05b9a173e9036187fbfd5b5ae61785c5d5b0bf8a8a|8b720bfa5b5466857abeb3a74ec1d2018b672b60c54bcb18ca0b675a389dcd30|e83d03b423ba91b5890b2719bfed1221e69e59e5b2ed7a113ffdb2bfe8dadfcc|
    |2825-481|2825    |ALMADA      |COSTA DA CAPARICA                                        |caa6a0f78b21879ac0cd9221fbf8a4ca335eb29e1f516cc201dffa3d96955817|d84a7bbdbc461b7762a7a439ad304dabc115df528deabe7f8a17a2ee5251f6b0|7a71c2d5db20149832597ffb6ef9969cc0e72a326e193d4c3690772519c2f732|
    |2640-601|2640    |MAFRA       |UNIÃO DAS FREGUESIAS DE AZUEIRA E SOBRAL DA ABELHEIRA    |6c6ece85b5d6ea8c5b1901b34909d170569478006b74472e49d7535ab4b4a94d|af9456b934b49d2a8e9b828746e7adb3a0005158d903865d49ed07f93f1503a7|dccc50ce75d2027d802ca8801cb4af5ba3d2fc5735ef3c19b31901161431e6d6|
    |2640-616|2640    |MAFRA       |UNIÃO DAS FREGUESIAS DE AZUEIRA E SOBRAL DA ABELHEIRA    |6c6ece85b5d6ea8c5b1901b34909d170569478006b74472e49d7535ab4b4a94d|af9456b934b49d2a8e9b828746e7adb3a0005158d903865d49ed07f93f1503a7|dccc50ce75d2027d802ca8801cb4af5ba3d2fc5735ef3c19b31901161431e6d6|
    |2640-618|2640    |MAFRA       |UNIÃO DAS FREGUESIAS DE AZUEIRA E SOBRAL DA ABELHEIRA    |6c6ece85b5d6ea8c5b1901b34909d170569478006b74472e49d7535ab4b4a94d|af9456b934b49d2a8e9b828746e7adb3a0005158d903865d49ed07f93f1503a7|dccc50ce75d2027d802ca8801cb4af5ba3d2fc5735ef3c19b31901161431e6d6|
    |2640-622|2640    |MAFRA       |UNIÃO DAS FREGUESIAS DE AZUEIRA E SOBRAL DA ABELHEIRA    |6c6ece85b5d6ea8c5b1901b34909d170569478006b74472e49d7535ab4b4a94d|af9456b934b49d2a8e9b828746e7adb3a0005158d903865d49ed07f93f1503a7|dccc50ce75d2027d802ca8801cb4af5ba3d2fc5735ef3c19b31901161431e6d6|
    +--------+--------+------------+---------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.

In [None]:
consumos_horario_codigo_postal

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# 1. Load Bronze CSV
    bronze_path = "Files/bronze/consumos_horario_codigo_postal/*.csv"
    df = (
        spark.read
        .option("header", True)
        .option("delimiter", ",")
        .csv(bronze_path)
    )

In [None]:
print("✅ Bronze schema loded:")
    df.printSchema()
    print("📊 Total Bronze rows:", df.count())

In [None]:
# 2. Normalize & Generate SK
    # sk_zipcode is the SHA-256 hex digest of codigo_postal exactly as read
    # (no trimming or padding is applied before hashing).
    df_clean = df.withColumn("sk_zipcode", sha2(col("codigo_postal"), 256))

In [None]:
# 3. Preview
    df_clean.select("codigo_postal", "sk_zipcode").show(10, truncate=False)

In [None]:
# 4. Save to Silver Layer
    silver_path = "Files/silver/consumos_horario_codigo_postal_cleaned.delta"
    df_clean.write.mode("overwrite").format("delta").save(silver_path)

In [None]:
print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-06-15T15:27:53.9485189Z","execution_start_time":"2025-06-15T15:27:44.2304312Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"3d032b65-1c47-48ca-a206-31aec77a37e9","queued_time":"2025-06-15T15:27:44.2292346Z","session_id":"f67e43a6-7ca0-4fcc-b74c-d07c3cc92e58","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":15,"statement_ids":[15]}

In [None]:
✅ Bronze schema loded:
    root
     |-- datahora: string (nullable = true)
     |-- dt_consumo: string (nullable = true)
     |-- hr_consumo: string (nullable = true)
     |-- codigo_postal: string (nullable = true)
     |-- consumo: string (nullable = true)
     |-- dia_semana: string (nullable = true)

In [None]:
📊 Total Bronze rows: 3727439
    +-------------+----------------------------------------------------------------+
    |codigo_postal|sk_zipcode                                                      |
    +-------------+----------------------------------------------------------------+
    |2230         |903a4207be29cb52c7c28b6b3e83b7bea776a390167924fe8ff18aa325f10285|
    |6110         |7263af08814e11782e313d81492e2c644c3152b0f42deeff9c0efa80667b5094|
    |3885         |8f1d857513c617aa479c66f399c8b9c509d7c76aba96a82fc4f2b96b807605c3|
    |4750         |a360748ed90aa02c3f79827f821595e497abbd2ea092c7d8fe852ddcb1008393|
    |4930         |afb36973671a3f3a0d2b2078c1d9aac9f2d019b374de201c25942dc3a2e62d15|
    |8900         |0d02233b2626a00cc924ac6c228433c46ec307678ad1d70687831fdfe73b25f7|
    |6290         |416c6556e9aac3efdbb161a80b21f905e94bdb93e8a13fba14f221a240321f02|
    |4850         |13d182f2d86098fafd7e3bdcd19e0fd39826cc3db5e15f8ecc71bcb655f0c07b|
    |6000         |8d284220975da66c6cba4a32aaedfa0b897187d86ed9f7a2419c9bd10a103505|
    |3515         |ed893e02d0d3b0c3e4af6e9323fc027f26601042869dfe909f84692d98a8cd39|
    +-------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.

In [None]:
cadastro_iluminacao_publica

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, trim, upper, sha2, concat_ws, split

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================

In [None]:
bronze_path = "Files/bronze/cadastro_iluminacao_publica/*.csv"
    df = (
        spark.read
        .option("header", True)
        .option("delimiter", ",")
        .csv(bronze_path)
    )

In [None]:
# If everything arrived in a single column, apply a forced split on ";"
    if len(df.columns) == 1:
        first_col = df.columns[0]
        # Names assigned, by position, to the ";"-separated pieces of the raw line.
        expected_cols = [
            "ano", "mes", "distrito", "concelho", "freguesia",
            "tipo_de_lampada", "luminarias", "potencia_instalada_total",
            "coddistrito", "coddistritoconcelho", "coddistritoconcelhofreguesia"
        ]
        df = df.withColumn("split", split(col(first_col), ";"))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        # Remove the helper array and the original single raw column.
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()
    print("📊 Total Bronze rows:", df.count())

In [None]:
# ============================
    # 2. Normalize & Generate SK
    # ============================

In [None]:
df_clean = (
        df
        # Trimmed, upper-cased copies of the three geographic names.
        .withColumn("district", upper(trim(col("distrito"))))
        .withColumn("municipality", upper(trim(col("concelho"))))
        .withColumn("parish", upper(trim(col("freguesia"))))
        # Parish key: SHA-256 of "<district>-<municipality>-<parish>".
        .withColumn(
            "sk_parish",
            sha2(concat_ws("-", col("district"), col("municipality"), col("parish")), 256),
        )
    )

In [None]:
# ============================
    # 3. Preview
    # ============================

In [None]:
df_clean.select("district", "municipality", "parish", "sk_parish").show(10, truncate=False)

In [None]:
# ============================
    # 4. Save to Silver Layer
    # ============================

In [None]:
silver_path = "Files/silver/cadastro_iluminacao_publica_cleaned.delta"
    df_clean.write.mode("overwrite").format("delta").save(silver_path)

In [None]:
print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-06-15T15:34:34.7736304Z","execution_start_time":"2025-06-15T15:34:31.3080931Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"8d4689c8-49e5-4d73-a610-42e61f433d4a","queued_time":"2025-06-15T15:34:31.3067925Z","session_id":"f67e43a6-7ca0-4fcc-b74c-d07c3cc92e58","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":17,"statement_ids":[17]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- mes: string (nullable = true)
     |-- distrito: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- freguesia: string (nullable = true)
     |-- tipo_de_lampada: string (nullable = true)
     |-- luminarias: string (nullable = true)
     |-- potencia_instalada_total: string (nullable = true)
     |-- coddistrito: string (nullable = true)
     |-- coddistritoconcelho: string (nullable = true)
     |-- coddistritoconcelhofreguesia: string (nullable = true)

In [None]:
📊 Total Bronze rows: 9315
    +--------+------------+------------------------------------------------+----------------------------------------------------------------+
    |district|municipality|parish                                          |sk_parish                                                       |
    +--------+------------+------------------------------------------------+----------------------------------------------------------------+
    |AVEIRO  |ÁGUEDA      |AGUADA DE CIMA                                  |b17a6d4d8ffa115b8eb27d40d3c066e8dd3b81bc5d27ccc1f8ef576437ba868e|
    |AVEIRO  |ÁGUEDA      |FERMENTELOS                                     |1962f0904f21f8d7fedee020a8b8cc292ec1ccc2d724c665b3bbcb0275bbb359|
    |AVEIRO  |ÁGUEDA      |MACINHATA DO VOUGA                              |0ccb08873e78309e306f745ee37b6f592478f3317fdb242b69881f7ec6485b29|
    |AVEIRO  |ÁGUEDA      |MACINHATA DO VOUGA                              |0ccb08873e78309e306f745ee37b6f592478f3317fdb242b69881f7ec6485b29|
    |AVEIRO  |ÁGUEDA      |MACINHATA DO VOUGA                              |0ccb08873e78309e306f745ee37b6f592478f3317fdb242b69881f7ec6485b29|
    |AVEIRO  |ÁGUEDA      |VALONGO DO VOUGA                                |706112d2497e98a124f0581edfe4175b30390392a9a4371e07ac1fe3914b640a|
    |AVEIRO  |ÁGUEDA      |ÁGUEDA E BORRALHA                               |39e4b99cdde83ca2acf4812d27133efd38dd4f3dea4f7a08e0fd0bc597fea444|
    |AVEIRO  |ÁGUEDA      |BELAZAIMA DO CHÃO, CASTANHEIRA DO VOUGA E AGADÃO|20711269c10afe8bdc936403332e2cb0a8879b9e493d8ef48abd501d334b226e|
    |AVEIRO  |ÁGUEDA      |RECARDÃES E ESPINHEL                            |6690a0246a630121c541d4f29c40855288b5a23b623ff0c08086752b837ff6f2|
    |AVEIRO  |ÁGUEDA      |RECARDÃES E ESPINHEL                            |6690a0246a630121c541d4f29c40855288b5a23b623ff0c08086752b837ff6f2|
    +--------+------------+------------------------------------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, trim, upper, sha2, concat_ws, split

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================
    bronze_path = "Files/bronze/22_diagrama_de_carga_por_instalacao/*.csv"
    df = (
        spark.read
        .option("header", True)
        .option("delimiter", ",")  # Usa ";" pois os dados vêm assim
        .csv(bronze_path)
    )

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()
    print("📊 Total Bronze rows:", df.count())

In [None]:
# ============================
    # 2. Normalize & create SK
    # ============================
    df_clean = (
        df.withColumn("district", upper(trim(col("distrito"))))
          .withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("parish", upper(trim(col("freguesia"))))
          .withColumn("sk_parish", sha2(concat_ws("-", "district", "municipality", "parish"), 256))
    )

In [None]:
# ============================
    # 3. Preview
    # ============================
    df_clean.select("district", "municipality", "parish", "sk_parish").show(10, truncate=False)

In [None]:
# ============================
    # 4. Save to Silver Layer
    # ============================
    silver_path = "Files/silver/22_diagrama_de_carga_por_instalacao_cleaned.delta"
    df_clean.write.mode("overwrite").format("delta").save(silver_path)

In [None]:
print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-06-15T15:43:26.1933534Z","execution_start_time":"2025-06-15T15:43:23.7836993Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"1444a18f-4710-4d52-85bc-44e56f3c47e8","queued_time":"2025-06-15T15:43:23.7824344Z","session_id":"f67e43a6-7ca0-4fcc-b74c-d07c3cc92e58","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":19,"statement_ids":[19]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- mes: string (nullable = true)
     |-- data: string (nullable = true)
     |-- distrito: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- freguesia: string (nullable = true)
     |-- cpes_com_dcs_recolhidos: string (nullable = true)
     |-- coddistrito: string (nullable = true)
     |-- coddistritoconcelho: string (nullable = true)
     |-- coddistritoconcelhofreguesia: string (nullable = true)
     |-- mobilidade_eletrica: string (nullable = true)

In [None]:
📊 Total Bronze rows: 89846
    +---------+------------------+-------------------+----------------------------------------------------------------+
    |district |municipality      |parish             |sk_parish                                                       |
    +---------+------------------+-------------------+----------------------------------------------------------------+
    |BRAGA    |VIEIRA DO MINHO   |PINHEIRO           |efd2c2b9ef6bb71de5485d44e9138dec29ee7e1e8c4cb0fbb7a454435a66e7f7|
    |GUARDA   |FORNOS DE ALGODRES|ALGODRES           |30b5420829552c8c58065e43f5f7fe34b070bb90096ad6848c42a787230cf2e8|
    |BRAGA    |BARCELOS          |MANHENTE           |34e6caa7bf715f37f21ccad292373581590f39a8f8fb4e0add73af5d57c8acf2|
    |GUARDA   |FORNOS DE ALGODRES|CASAL VASCO        |45784a04e3ea9af5b99690771934c055ed8672c9b4ddc0c769054a018ba9ddc7|
    |VILA REAL|SABROSA           |SABROSA            |1c393898a1d93e9668bb422d798d027eb28cb70547dc9c0228e353de0fbbf97f|
    |LISBOA   |ALENQUER          |OLHALVO            |6ce312cc57316f7abc0d440a2c288105eea8ef758904e18c5193aad95f6606c4|
    |PORTO    |BAIÃO             |LOIVOS DO MONTE    |3134d5ca2fbd97bd797da40232dcbcd63c4874d9b86004f75498edb36d2c95a2|
    |LISBOA   |LOURINHÃ          |MOITA DOS FERREIROS|79b6dc37284922bbb7e3b1353d867d7534d1bf71cc3a0922cc523e3033fdf108|
    |LISBOA   |OEIRAS            |CARNAXIDE E QUEIJAS|a1252706b4cf3fca89e3d0eddf3383b8d61a21feff6620b12af581cfe613a87d|
    |BRAGA    |FAFE              |REVELHE            |350dfecb269f2eed280fc0c381cc04b0f9944d5d4a38672014726dcc0cd24703|
    +---------+------------------+-------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.

In [None]:
21_contadores_de_energia

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, trim, upper, sha2, concat_ws, split

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# ============================
    # 1. Load Bronze CSV
    # ============================

In [None]:
bronze_path = "Files/bronze/21_contadores_de_energia/*.csv"
    df = spark.read.option("header", True).option("delimiter", ";").csv(bronze_path)

In [None]:
# Se só houver uma coluna (erro de leitura), forçar split
    if len(df.columns) == 1:
        first_col = df.columns[0]
        expected_cols = [
            "ano", "mes", "data", "distrito", "concelho", "freguesia",
            "inclui_emi", "cpes", "coddistrito", "coddistritoconcelho",
            "coddistritoconcelhofreguesia", "contrato_ativo"
        ]
        df = df.withColumn("split", split(col(first_col), ","))
        for i, name in enumerate(expected_cols):
            df = df.withColumn(name, col("split").getItem(i))
        df = df.drop("split").drop(first_col)

In [None]:
print("✅ Bronze schema loaded:")
    df.printSchema()
    print("📊 Total Bronze rows:", df.count())

In [None]:
# ============================
    # 2. Normalize & Generate SK
    # ============================

In [None]:
df_clean = (
        df.withColumn("district", upper(trim(col("distrito"))))
          .withColumn("municipality", upper(trim(col("concelho"))))
          .withColumn("parish", upper(trim(col("freguesia"))))
          .withColumn("sk_parish", sha2(concat_ws("-", "district", "municipality", "parish"), 256))
    )

In [None]:
# ============================
    # 3. Preview
    # ============================

In [None]:
df_clean.select("district", "municipality", "parish", "sk_parish").show(10, truncate=False)

In [None]:
# ============================
    # 4. Save to Silver Layer
    # ============================

In [None]:
silver_path = "Files/silver/21_contadores_de_energia_cleaned.delta"
    df_clean.write.mode("overwrite").format("delta").save(silver_path)

In [None]:
print("✅ Cleaned dataset saved to Silver layer.")

In [None]:
{"execution_finish_time":"2025-06-15T21:34:10.1924539Z","execution_start_time":"2025-06-15T21:33:34.3350364Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"9660f5d2-ce79-4461-9cd9-fb5045f3a6fe","queued_time":"2025-06-15T21:33:34.333827Z","session_id":"a5897fa1-d45c-4e35-ad4b-358f1f219a58","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":4,"statement_ids":[4]}

In [None]:
✅ Bronze schema loaded:
    root
     |-- ano: string (nullable = true)
     |-- mes: string (nullable = true)
     |-- data: string (nullable = true)
     |-- distrito: string (nullable = true)
     |-- concelho: string (nullable = true)
     |-- freguesia: string (nullable = true)
     |-- inclui_emi: string (nullable = true)
     |-- cpes: string (nullable = true)
     |-- coddistrito: string (nullable = true)
     |-- coddistritoconcelho: string (nullable = true)
     |-- coddistritoconcelhofreguesia: string (nullable = true)
     |-- contrato_ativo: string (nullable = true)

In [None]:
📊 Total Bronze rows: 198435
    +----------+--------------------+---------------------+----------------------------------------------------------------+
    |district  |municipality        |parish               |sk_parish                                                       |
    +----------+--------------------+---------------------+----------------------------------------------------------------+
    |VISEU     |VISEU               |LORDOSA              |5234bf3c8ddd2cffa3bcd753f6f0f146e5196d6f1c2499bafc5cdd26d2d4c683|
    |SANTARÉM  |TORRES NOVAS        |PEDRÓGÃO             |c0c100a4eb8eb0d27500efee179a1189ef84b87870a1035e995ac1bc6a92ae90|
    |PORTO     |AMARANTE            |"BUSTELO             |17d81b66d41eabcd9c7d2e9a52e5df3c041ab5d81ddbf49f03f805e7b4c3232f|
    |VILA REAL |ALIJÓ               |SANTA EUGÉNIA        |9be1bf5d2598ff4bc3336dea81fb1d34275b5e5a53f581c8d832a3b171eba6c7|
    |VILA REAL |CHAVES              |VILELA DO TÂMEGA     |757fac6a102f8651296b71e6362dab4f62dcd91bd381ae00d336c4ab4b458090|
    |BRAGANÇA  |MACEDO DE CAVALEIROS|TALHINHAS E BAGUEIXE |811791df62571df9b9f76d93f2fa516d66775caa901e9a56a4b2b1bb028b20f2|
    |VISEU     |OLIVEIRA DE FRADES  |SÃO VICENTE DE LAFÕES|855fcd23367495f528439a4d5c5ed5a0be4d302c9460af8e6f5c662777f1787d|
    |BRAGANÇA  |MACEDO DE CAVALEIROS|MACEDO DE CAVALEIROS |5093dfc0dc2894c852c4d291ef71d7430cf4fe94394d4f6416628c422175bca6|
    |PORTALEGRE|NISA                |"ESPÍRITO SANTO      |d0f6d31baeaf2e051827b33acf48f1993e2e9a724673e736308241710b201b61|
    |GUARDA    |GOUVEIA             |GOUVEIA              |b9da7dbc6c6c95c537b6ca477059e2cb8bf7bbf41905f0b7141e777084b2fa5f|
    +----------+--------------------+---------------------+----------------------------------------------------------------+
    only showing top 10 rows

In [None]:
✅ Cleaned dataset saved to Silver layer.

In [None]:
normalize_carga_na_subestacao_cleaned

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sha2, concat_ws, upper, trim

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# --------------------------
    # STEP 1: Read Bronze CSV
    # --------------------------
    bronze_path = "Files/bronze/carga_na_subestacao/*.csv"

In [None]:
df = (
        spark.read.option("header", True)
        .option("delimiter", ",")
        .csv(bronze_path)
    )

In [None]:
print("📥 Bronze data loaded:")
    df.show(5, truncate=False)

In [None]:
# --------------------------
    # STEP 2: Normalize columns
    # --------------------------
    df_cleaned = (
        df.withColumn("ano", trim(col("ano")))
          .withColumn("nome", trim(upper(col("nome"))))
          .withColumn("municipio", trim(upper(col("nome"))))  # TEMP: using 'nome' as proxy for municipality
          .withColumn("sk_municipality", sha2(concat_ws("-", col("municipio")), 256))
    )

In [None]:
# --------------------------
    # STEP 3: Preview cleaned data
    # --------------------------
    print("✅ Cleaned preview with surrogate key:")
    df_cleaned.select(
        "ano", "nome", "municipio", "sk_municipality", "potencia_instalada"
    ).show(10, truncate=False)

In [None]:
# --------------------------
    # STEP 4: Save to Silver as Delta
    # --------------------------
    silver_path = "Files/silver/carga_na_subestacao_cleaned.delta"

In [None]:
df_cleaned.write.mode("overwrite").format("delta").save(silver_path)

In [None]:
print("💾 Saved to Silver layer as Delta:")
    print(f"✅ {silver_path} created successfully.")

In [None]:
{"execution_finish_time":"2025-06-17T08:28:52.4805282Z","execution_start_time":"2025-06-17T08:28:29.6749399Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"ffe072a5-39ad-4285-a583-418b5d11a41e","queued_time":"2025-06-17T08:28:29.673767Z","session_id":"a10d3e37-429a-4a11-a996-ed9a9e4da91f","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":5,"statement_ids":[5]}

In [None]:
📥 Bronze data loaded:

In [None]:
+----+--------------------+----------+------+-------------+-------------+------------------+------------------+----------------------+-----------------+-------------------+--------------------------------+
    |ano |codigo_da_instalacao|nome      |tensao|inverno_verao|carga_natural|potencia_instalada|potencia_garantida|potencia_nao_garantida|disponibilidade  |carga_nao_garantida|potencia_instalada_nao_garantida|
    +----+--------------------+----------+------+-------------+-------------+------------------+------------------+----------------------+-----------------+-------------------+--------------------------------+
    |2024|1105S5901100        |ABÓBODA   |60/10 |Inverno      |30.411       |40.0              |34.0              |6.0                   |3.588999999999998|0.0                |6.0                             |
    |2024|1106S5275000        |AEROPORTO |60/10 |Inverno      |39.335       |80.0              |68.0              |12.0                  |28.665           |0.0                |12.0                            |
    |2024|0101S5000600        |AGUEDA    |60/15 |Verão        |32.545       |63.0              |48.825            |14.174999999999995    |16.28            |0.0                |14.174999999999995              |
    |2024|0613S5003800        |AGUIEIRA  |60/15 |Verão        |6.847        |20.0              |10.0              |10.0                  |3.153            |0.0                |10.0                            |
    |2024|0102S5000700        |ALBERGARIA|60/15 |Verão        |13.022       |20.0              |10.0              |10.0                  |0.0              |3.022              |6.978                           |
    +----+--------------------+----------+------+-------------+-------------+------------------+------------------+----------------------+-----------------+-------------------+--------------------------------+
    only showing top 5 rows

In [None]:
✅ Cleaned preview with surrogate key:
    +----+--------------+--------------+----------------------------------------------------------------+------------------+
    |ano |nome          |municipio     |sk_municipality                                                 |potencia_instalada|
    +----+--------------+--------------+----------------------------------------------------------------+------------------+
    |2024|ABÓBODA       |ABÓBODA       |0b77d6c258244ba9fc879b6a5bcfb38aad46399e01e4fa943c5f17ef865331e2|40.0              |
    |2024|AEROPORTO     |AEROPORTO     |a2af03c6c5ec35c892aa0bfb44d91f622b5c2709900a2c71f4bb7cd1ec25d0e1|80.0              |
    |2024|AGUEDA        |AGUEDA        |f74dba4a1cb6711e7b0a4e0eae897ba020434158bbe29d8256738a02671a6bc7|63.0              |
    |2024|AGUIEIRA      |AGUIEIRA      |08457a3a1057363dcfba0dedc64b624237dda658d2d31995c12daea739bc89f0|20.0              |
    |2024|ALBERGARIA    |ALBERGARIA    |ef4217b9493d2318b749a919da834cd1906884b3d582b1b640c4d51a48203ea4|20.0              |
    |2024|ALBUFEIRA     |ALBUFEIRA     |0dbfdecc70a76916d8f2230c4baf7a345f9eefa2e8ca692d3f04383df023c303|63.0              |
    |2024|ALCÁCER DO SAL|ALCÁCER DO SAL|98052a5421ac8306157573c831dc346948ebaaae764b726480beede0f6969b93|20.0              |
    |2024|ALCÁÇOVA      |ALCÁÇOVA      |6d536d288f25d3f88d2efa96f070c1f638ea47a209b3a4db3307194d271a251a|63.0              |
    |2024|ALCANEDE      |ALCANEDE      |ddc6319a423df48009bdd395032bdc3b62b98fb41387c03f331518755ea3da28|20.0              |
    |2024|ALCOITÃO      |ALCOITÃO      |679337afc1c0b1b8234e890d20e87d487359d03fec37a6248c498b16f8ae1384|40.0              |
    +----+--------------+--------------+----------------------------------------------------------------+------------------+
    only showing top 10 rows

In [None]:
💾 Saved to Silver layer as Delta:
    ✅ Files/silver/carga_na_subestacao_cleaned.delta created successfully.

In [None]:
energia_produzida_total_nacional

In [None]:
from pyspark.sql.functions import col, to_date, date_format

In [None]:
# -------------------------------
    # STEP 1: Load Bronze CSV
    # -------------------------------
    print("📥 Loading Bronze CSV from energia_produzida_total_nacional...")
    path_bronze = "Files/bronze/energia_produzida_total_nacional"
    df = spark.read.option("header", True).option("delimiter", ",").option("inferSchema", True).csv(path_bronze)

In [None]:
print("✅ Raw Schema:")
    df.printSchema()

In [None]:
# -------------------------------
    # STEP 2: Normalize & Clean
    # -------------------------------
    print("🧪 Normalizing columns...")

In [None]:
df_cleaned = df.withColumn("date", to_date(col("datahora"), "yyyy-MM-dd"))

In [None]:
# 🟢 Gerar sk_date compatível com Dim_Date
    df_cleaned = df_cleaned.withColumn("sk_date", date_format(col("date"), "yyyyMMdd").cast("int"))

In [None]:
# Renomear e selecionar colunas relevantes
    df_cleaned = df_cleaned.withColumnRenamed("total", "energy_production")
    df_cleaned = df_cleaned.select("date", "sk_date", "energy_production")

In [None]:
print("🧹 Cleaned Preview:")
    df_cleaned.show(5)

In [None]:
# -------------------------------
    # STEP 3: Save as Silver Delta Table
    # -------------------------------
    path_silver = "Files/silver/energia_produzida_total_nacional_cleaned.delta"
    print("💾 Saving cleaned Silver table...")
    df_cleaned.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(path_silver)

In [None]:
print("🗃️ Registering SQL table...")
    # Remove any existing catalog entry first so the CREATE below cannot collide
    # with a definition left by an earlier run.
    spark.sql("DROP TABLE IF EXISTS energia_produzida_total_nacional_cleaned")
    # Register the table name against the Delta files written to path_silver.
    spark.sql(f"CREATE TABLE energia_produzida_total_nacional_cleaned USING DELTA LOCATION '{path_silver}'")

In [None]:
print("✅ Silver table created and registered: energia_produzida_total_nacional_cleaned")

In [None]:
{"execution_finish_time":"2025-06-20T11:59:17.4247711Z","execution_start_time":"2025-06-20T11:59:02.8752059Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"887b2470-6a6b-4f90-8a0d-a9ae6998e355","queued_time":"2025-06-20T11:59:02.8740468Z","session_id":"aaf21249-a302-4182-8df3-b2fbf01c111f","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":6,"statement_ids":[6]}

In [None]:
📥 Loading Bronze CSV from energia_produzida_total_nacional...
    ✅ Raw Schema:
    root
     |-- datahora: timestamp (nullable = true)
     |-- dia: integer (nullable = true)
     |-- mes: integer (nullable = true)
     |-- ano: integer (nullable = true)
     |-- date: date (nullable = true)
     |-- time: timestamp (nullable = true)
     |-- dgm: double (nullable = true)
     |-- pre: double (nullable = true)
     |-- total: double (nullable = true)

In [None]:
🧪 Normalizing columns...
    🧹 Cleaned Preview:
    +----------+--------+-----------------+
    |      date| sk_date|energy_production|
    +----------+--------+-----------------+
    |2023-06-30|20230630|       1660888.25|
    |2023-06-30|20230630|       1352939.75|
    |2023-07-01|20230701|       1296546.25|
    |2023-07-01|20230701|        1193683.5|
    |2023-07-01|20230701|        1184886.5|
    +----------+--------+-----------------+
    only showing top 5 rows

In [None]:
💾 Saving cleaned Silver table...
    🗃️ Registering SQL table...
    ✅ Silver table created and registered: energia_produzida_total_nacional_cleaned

In [None]:
energia_injetada_na_rede_de_distribuicao

In [None]:
from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, to_date, date_format, sum as _sum

In [None]:
# Start the Spark session (reuses the active one if it already exists)
    spark = SparkSession.builder.getOrCreate()

In [None]:
# -------------------------------
    # STEP 1: Load Bronze CSV
    # -------------------------------
    print("📥 Loading Bronze CSV from energia_injetada_na_rede_de_distribuicao...")
    bronze_path = "Files/bronze/energia_injetada_na_rede_de_distribuicao"
    df = (
        spark.read
        .option("header", True)
        .option("delimiter", ",")
        .option("inferSchema", True)
        .csv(bronze_path)
    )

In [None]:
print("✅ Raw Schema:")
    df.printSchema()

In [None]:
# -------------------------------
    # STEP 2: Normalize & Clean
    # -------------------------------
    print("🧪 Normalizing columns...")

In [None]:
df_cleaned = df.withColumn("date", to_date(col("datahora"))) \
                   .withColumn("sk_date", date_format(col("date"), "yyyyMMdd").cast("int")) \
                   .withColumnRenamed("eolica", "wind_kwh") \
                   .withColumnRenamed("fotovoltaica", "solar_kwh") \
                   .withColumnRenamed("hidrica", "hydro_kwh")

In [None]:
# Agregar valores renováveis por data
    df_aggregated = df_cleaned.groupBy("date", "sk_date").agg(
        _sum("wind_kwh").alias("wind_kwh"),
        _sum("solar_kwh").alias("solar_kwh"),
        _sum("hydro_kwh").alias("hydro_kwh")
    )

In [None]:
df_aggregated = df_aggregated.withColumn(
        "renewable_production_gwh",
        (col("wind_kwh") + col("solar_kwh") + col("hydro_kwh")) / 1_000_000
    )

In [None]:
# -------------------------------
    # STEP 3: Preview Result
    # -------------------------------
    print("🧹 Cleaned Preview:")
    df_aggregated.select("date", "sk_date", "renewable_production_gwh").show(5, truncate=False)

In [None]:
# -------------------------------
    # STEP 4: Save as Silver Delta Table
    # -------------------------------
    silver_path = "Files/silver/energia_injetada_na_rede_de_distribuicao_cleaned.delta"
    print("💾 Saving cleaned Silver table...")
    df_aggregated.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(silver_path)

In [None]:
# -------------------------------
    # STEP 5: Register as SQL Table
    # -------------------------------
    print("🗃️ Registering SQL table...")
    # Remove any existing catalog entry first so the CREATE below cannot collide
    # with a definition left by an earlier run.
    spark.sql("DROP TABLE IF EXISTS energia_injetada_na_rede_de_distribuicao_cleaned")
    # Register the table name against the Delta files written to silver_path.
    spark.sql(f"CREATE TABLE energia_injetada_na_rede_de_distribuicao_cleaned USING DELTA LOCATION '{silver_path}'")

In [None]:
print("✅ Silver table created and registered: energia_injetada_na_rede_de_distribuicao_cleaned")

In [None]:
{"execution_finish_time":"2025-06-27T21:33:16.1468406Z","execution_start_time":"2025-06-27T21:33:09.8524398Z","livy_statement_state":"available","normalized_state":"finished","parent_msg_id":"2fe051b5-ca01-4305-8cf0-14b977dfdb7d","queued_time":"2025-06-27T21:33:09.8511721Z","session_id":"5c145d45-2b6a-48d7-a748-2ec06f037a02","session_start_time":null,"spark_pool":null,"state":"finished","statement_id":7,"statement_ids":[7]}

In [None]:
📥 Loading Bronze CSV from energia_injetada_na_rede_de_distribuicao...
    ✅ Raw Schema:
    root
     |-- datahora: timestamp (nullable = true)
     |-- dia: integer (nullable = true)
     |-- mes: integer (nullable = true)
     |-- ano: integer (nullable = true)
     |-- date: date (nullable = true)
     |-- time: timestamp (nullable = true)
     |-- cogeracao: double (nullable = true)
     |-- eolica: double (nullable = true)
     |-- fotovoltaica: double (nullable = true)
     |-- hidrica: double (nullable = true)
     |-- outras_tecnologias: double (nullable = true)
     |-- rede_dist: double (nullable = true)

In [None]:
🧪 Normalizing columns...
    🧹 Cleaned Preview:
    +----------+--------+------------------------+
    |date      |sk_date |renewable_production_gwh|
    +----------+--------+------------------------+
    |2024-03-20|20240320|24.72743225             |
    |2024-07-26|20240726|42.86764075             |
    |2023-02-09|20230209|63.40180375             |
    |2024-05-22|20240522|30.3270355              |
    |2025-04-03|20250403|71.0119605              |
    +----------+--------+------------------------+
    only showing top 5 rows

In [None]:
💾 Saving cleaned Silver table...
    🗃️ Registering SQL table...
    ✅ Silver table created and registered: energia_injetada_na_rede_de_distribuicao_cleaned