## Design note

This pipeline intentionally parses files one-by-one using a Python loop.
Reasons:
- Header length varies per file
- Key-value pairs are not uniform
- Row-based Spark ingestion caused pivot explosion

This is a conscious design choice.


silver.energy_header
- One row per source file
- Parsed from file header section
- Header length may vary per file

silver.energy_measurements
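- Wide format: one column per measured variable, named from each file's column-name row; variables absent from a file are null
- One row per timestamp per source file (`file_name` identifies the source)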
- Parsed from measurement section


## School Energy ML Dimension loading

In [0]:
import glob
from pyspark.sql import Row
from pyspark.sql.types import StringType, StructField, StructType

# Header keys to extract from each file; any other key is ignored.
desired_columns = [
    'Header_line', 'location', 'year_of_construction', 'floor_area', 'number_of_users', 'building_category', 'dhw_heat_source', 'sh_heat_source',
    'timestamp_format', 'timezone', 'building_id'
]

# Output columns: the header keys plus the source file path.
final_columns = desired_columns + ['file_name']

# Every extracted value is a string (missing keys become ""), so declare the
# schema explicitly instead of inferring it. Inference also fails outright
# when no files match.
header_schema = StructType([StructField(name, StringType(), True) for name in final_columns])

file_paths = glob.glob("/Volumes/school_energy_ml/bronze/raw_energy/*.txt")
display(file_paths)
result_rows = []
all_header_rows = []

for file_path in file_paths:
    lines_df = spark.read.text(file_path)

    # The first line is "<key>;<n>", where n is the number of header lines in
    # THIS file. Header length varies per file, so n must be read per file
    # rather than once from whichever file happens to come first.
    first_row = lines_df.first()
    if first_row is None:
        # Empty file: no header lines, so every key falls back to "".
        read_rows = 0
    else:
        first_line = first_row[0]
        try:
            read_rows = int(first_line.strip().split(";")[1])
        except (IndexError, ValueError) as err:
            raise ValueError(
                f"{file_path}: first line {first_line!r} has no header line count"
            ) from err

    header_rows = lines_df.limit(read_rows).collect()
    all_header_rows.extend(header_rows)

    # Each header line is "key;value"; a line without ";" gets an empty value.
    # If a key repeats, the last occurrence wins (dict comprehension).
    columns = [row.value.split(";", 1)[0].strip() for row in header_rows]
    values = [row.value.split(";", 1)[1].strip() if ";" in row.value else "" for row in header_rows]
    filtered = {col: val for col, val in zip(columns, values) if col in desired_columns}

    # One output row per file: desired keys in fixed order, then the file path.
    row_with_file = [filtered.get(col, "") for col in desired_columns] + [file_path]
    result_rows.append(row_with_file)

result_df = spark.createDataFrame(result_rows, header_schema)
display(result_df)
result_df.write.mode("overwrite").saveAsTable("school_energy_ml.silver.energy_header")

## School Energy ML Fact data loading

In [0]:
from functools import reduce
from pyspark.sql.functions import lit
import glob

# Get all measurement export files
file_paths = glob.glob("/Volumes/school_energy_ml/bronze/raw_energy/export_*.txt")

dfs = []
# Column names in first-seen order (a dict used as an ordered set), so the
# resulting table's column order is deterministic across runs. A plain set
# iterates in hash order, which changes between Python processes.
all_columns = {}

for file_path in file_paths:
    lines_df = spark.read.text(file_path)

    # The first line is "<key>;<n>"; n counts the lines up to and including
    # the row that holds the measurement column names.
    first_row = lines_df.first()
    if first_row is None:
        print(f"Skipping empty file: {file_path}")
        continue
    first_line_split = first_row[0].strip().split(";")
    skip_rows = int(first_line_split[1])

    # The n-th line holds the column names (DataFrame limit, no RDD).
    header_row = lines_df.limit(skip_rows).collect()[-1].value
    # NOTE(review): duplicate names here (e.g. several "" from trailing ";")
    # would make the later select ambiguous — confirm the export format.
    header = [col.strip() for col in header_row.split(";")]

    # Read the data section and tag every row with its source file.
    df = (
        spark.read
        .option("delimiter", ";")
        .option("inferSchema", True)
        # Skips skip_rows + 1 lines, i.e. one line past the column-name row;
        # presumably a units row — TODO confirm against the export format.
        .option("skipRows", skip_rows + 1)
        .csv(file_path)
        .toDF(*header)
        .withColumn("file_name", lit(file_path))
    )

    dfs.append(df)
    all_columns.update(dict.fromkeys(header))

if not dfs:
    raise ValueError(
        "No measurement files found under /Volumes/school_energy_ml/bronze/raw_energy/export_*.txt"
    )

# Add 'file_name' to all_columns
all_columns = list(all_columns) + ["file_name"]

# Union by column name; a column missing from a file becomes a typed null
# column via allowMissingColumns. The final select fixes the column order.
final_df = reduce(
    lambda left, right: left.unionByName(right, allowMissingColumns=True),
    dfs,
).select(*all_columns)

#display(final_df)
final_df.write.mode("overwrite").saveAsTable("school_energy_ml.silver.energy_measurements")

In [0]:
%sql
-- Spot check: measurement rows in which the HtBio column is populated.
SELECT *
FROM school_energy_ml.silver.energy_measurements
WHERE HtBio IS NOT NULL
