Setting up paths and imports

In [0]:
from pyspark.sql import functions as F

# Dataset root directory; every input path below is derived from it.
base = "/Volumes/workspace/default/osu-energy-analysis/DATA I-O 2026 Advanced Datasets/advanced_core/advanced_core"

# Glob over the 2025 meter-reading CSVs (the "*" stands for the varying part of the file name).
meter_glob = "/".join((base, "meter-readings-*-2025.csv"))
# Single-file inputs: building metadata and hourly weather.
meta_path = "/".join((base, "building_metadata.csv"))
weather_path = "/".join((base, "weather_data_hourly_2025.csv"))

# Echo the glob so the resolved location is visible in the cell output.
print(meter_glob)


Loading meter data into a bronze table -> RAW DATA

In [0]:
# Bronze layer: read every CSV matched by the glob without any cleaning.
# header=True takes column names from the first line of each file;
# inferSchema=True lets Spark derive column types from the data.
meter_raw = spark.read.csv(meter_glob, header=True, inferSchema=True)

# Sanity checks: total row count, a five-row sample, and the inferred schema.
print("meter_raw rows:", meter_raw.count())
display(meter_raw.limit(5))
meter_raw.printSchema()


Cleaning into a silver table
<ul>
    <li>parsed timestamps</li>
    <li>added time columns</li>
    <li>converted numeric values</li>
    <li>standardized data types</li>
</ul>

In [0]:
from pyspark.sql import functions as F

# Silver layer: pin column types on the raw meter readings and derive time columns.
reading_time = F.col("readingtime")

# Identifier and name columns are forced to string so they are never treated as numbers.
meter_silver = meter_raw
for id_col in ("meterid", "siteid", "sitename"):
    meter_silver = meter_silver.withColumn(id_col, F.col(id_col).cast("string"))

meter_silver = (
    meter_silver
    # Upper-case the utility label so differently-cased spellings compare equal.
    .withColumn("utility", F.upper(F.col("utility").cast("string")))
    # NOTE(review): expected to be inferred as double already, making this cast a no-op — confirm via printSchema.
    .withColumn("readingvalue", F.col("readingvalue").cast("double"))
    # NOTE(review): relies on schema inference having typed `readingtime` as a timestamp — confirm via printSchema.
    .withColumn("ts", reading_time)
    .withColumn("date", F.to_date(reading_time))
    .withColumn("hour_ts", F.date_trunc("hour", reading_time))
    # Readings without a timestamp are dropped.
    .where(F.col("ts").isNotNull())
)

print("silver rows:", meter_silver.count())

# Small preview of the key columns.
preview_cols = ["siteid", "utility", "ts", "readingvalue"]
display(meter_silver.select(*preview_cols).limit(5))


Load building metadata

In [0]:
# Building metadata: header row is used for column names; no schema inference is
# requested, so every column is read as a string.
bmeta_raw = spark.read.csv(meta_path, header=True)

# Show a five-row sample and the available column names.
display(bmeta_raw.limit(5))
print(bmeta_raw.columns)


Loading weather raw data

In [0]:
# Hourly weather: header row supplies column names, column types are inferred from the data.
weather_raw = spark.read.csv(weather_path, header=True, inferSchema=True)

# Inspect a five-row sample, the column names, and the inferred schema.
display(weather_raw.limit(5))
print(weather_raw.columns)
weather_raw.printSchema()


In [0]:
# Weather dimension keyed by the hour.
# The renames presumably keep these columns distinct from the meter `date` column and
# the building `latitude`/`longitude` columns once the frames are joined — confirm.
weather_renames = {
    "date": "weather_ts",
    "latitude": "weather_latitude",
    "longitude": "weather_longitude",
}

dim_weather = weather_raw
for old_name, new_name in weather_renames.items():
    dim_weather = dim_weather.withColumnRenamed(old_name, new_name)

dim_weather = (
    dim_weather
    # Truncate to the hour so it lines up with the meter table's `hour_ts` join key.
    .withColumn("hour_ts", F.date_trunc("hour", F.col("weather_ts")))
    .drop("partition_0")
)


Joined table, meter + building + weather

In [0]:
from pyspark.sql import functions as F

def norm(col):
    """Build a normalized name key from a name column.

    Steps, in order: cast to string, trim and lower-case, replace "&" with
    " and ", replace every character outside ``[a-z0-9 ]`` with a space,
    collapse whitespace runs to a single space, then trim again.

    The final trim matters: punctuation at the start or end of a name
    (e.g. "Smith Lab." or "(Old) Hall") is turned into a space *after* the
    first trim, which would otherwise leave a stray leading/trailing space
    and stop the key from equalling the key of the unpunctuated name.

    Args:
        col: Column holding a site or building name.

    Returns:
        Column with the normalized key.
    """
    x = col.cast("string")
    x = F.lower(F.trim(x))
    x = F.regexp_replace(x, r"&", " and ")
    x = F.regexp_replace(x, r"[^a-z0-9 ]", " ")   # punctuation -> space
    x = F.regexp_replace(x, r"\s+", " ")          # collapse whitespace runs
    return F.trim(x)                              # strip spaces left by edge punctuation


# Attach normalized name keys to both sides so names can be compared after
# the clean-up performed by norm().
m2 = meter_silver.withColumn("site_name_key", norm(F.col("sitename")))

# The metadata carries three candidate names per building; each gets its own key.
b2 = bmeta_raw
for key_col, name_col in (
    ("building_name_key", "buildingname"),
    ("formal_name_key", "formalname"),
    ("aka_name_key", "alsoknownas"),
):
    b2 = b2.withColumn(key_col, norm(F.col(name_col)))

# A meter row matches a metadata row when its site key equals ANY of the three
# building keys. Left join: meters without a match are kept with null building
# columns, and a meter row is repeated once per matching metadata row.
site_key = m2["site_name_key"]
any_name_matches = (
    (site_key == b2["building_name_key"])
    | (site_key == b2["formal_name_key"])
    | (site_key == b2["aka_name_key"])
)
silver_meter_buildings = m2.join(b2, on=any_name_matches, how="left")

# Attach hourly weather on the shared truncated-hour key; meter rows without a
# weather row for that hour are kept with null weather columns.
silver_joined = silver_meter_buildings.join(dim_weather, "hour_ts", "left")

display(silver_joined.limit(10))


In [0]:
# --- Second pass: substring ("contains") fallback for rows the exact-key join missed ---
# Must run after the cell that defines m2, b2 and silver_meter_buildings.
# NOTE(review): within the visible notebook, silver_joined is built from the pass-1
# result, so this 2-pass frame does not feed the saved table — confirm that is intended.

# Pass-1 rows with no building metadata, reduced to the meter-side columns and de-duplicated.
unmatched_rows = (
    silver_meter_buildings
    .where(F.col("buildingname").isNull())
    .select(m2["*"])
    .distinct()
)

# A metadata row is a fallback candidate when any of its three name keys contains
# the meter's site key as a substring.
# NOTE(review): a very short or empty site key would be contained in many building
# keys and fan the meter row out across all of them — verify against the data.
unmatched_key = unmatched_rows["site_name_key"]
key_is_substring = (
    b2["building_name_key"].contains(unmatched_key)
    | b2["formal_name_key"].contains(unmatched_key)
    | b2["aka_name_key"].contains(unmatched_key)
)
fallback = unmatched_rows.join(b2, on=key_is_substring, how="left")

# Rows that already found a building in pass 1 are carried over unchanged.
matched = silver_meter_buildings.where(F.col("buildingname").isNotNull())

# Stack pass-1 matches on top of the pass-2 results, aligning columns by name.
silver_meter_buildings_2pass = matched.unionByName(fallback, allowMissingColumns=True)

# Check improved match rate.
# Both counts come from ONE aggregation, so the two-pass join is evaluated once
# rather than once per count() action. F.count over F.when(...) counts only the
# rows where the condition holds, because when() yields null otherwise.
two_pass_counts = silver_meter_buildings_2pass.agg(
    F.count(F.lit(1)).alias("total"),
    F.count(F.when(F.col("buildingname").isNull(), 1)).alias("unmatched"),
).first()
total = two_pass_counts["total"]
unmatched = two_pass_counts["unmatched"]

# An empty frame would otherwise raise ZeroDivisionError.
if total:
    print("building match rate (2-pass):", 1 - unmatched/total)
else:
    print("building match rate (2-pass): n/a (no rows)")


In [0]:
# Print the column names and types of the joined meter + building + weather frame.
silver_joined.printSchema()


In [0]:
# Top 20 site names that found no building metadata in the exact-key join,
# ranked by how many meter rows each one accounts for.
unmatched_site_counts = (
    silver_meter_buildings
    .where(F.col("buildingname").isNull())
    .groupBy("sitename")
    .count()
    .sort(F.col("count").desc())
    .limit(20)
)
display(unmatched_site_counts)


In [0]:
# Row-level match rates for the building and weather joins.
# All three counts come from ONE aggregation, so the joined frame is evaluated
# once instead of three times. F.count over F.when(...) counts only the rows
# where the condition holds, because when() yields null otherwise.
match_counts = silver_joined.agg(
    F.count(F.lit(1)).alias("total"),
    F.count(F.when(F.col("buildingname").isNull(), 1)).alias("b_unmatched"),
    F.count(F.when(F.col("temperature_2m").isNull(), 1)).alias("w_unmatched"),
).first()
total = match_counts["total"]
b_unmatched = match_counts["b_unmatched"]
w_unmatched = match_counts["w_unmatched"]

# An empty frame would otherwise raise ZeroDivisionError.
if total:
    print("building match rate:", 1 - b_unmatched/total)
    print("weather match rate:",  1 - w_unmatched/total)
else:
    print("building match rate: n/a (no rows)")
    print("weather match rate: n/a (no rows)")


In [0]:
# Persist the January–April 2025 slice of the joined frame as a Delta table,
# replacing whatever the table held before.
# NOTE(review): `year` and `month` are not derived anywhere in the visible notebook;
# presumably they come straight from the source CSVs — confirm.
in_jan_to_apr_2025 = (F.col("year") == 2025) & F.col("month").between(1, 4)

(
    silver_joined
    .where(in_jan_to_apr_2025)
    .write
    .saveAsTable(
        "workspace.default.silver_energy_joined_2025",
        format="delta",
        mode="overwrite",
    )
)

Display the filtered silver table, keeping only the necessary columns, for January through April 2025.

In [0]:
# Columns shown in the preview, grouped by where they originate.
time_cols = ["ts", "hour_ts", "date"]
energy_cols = ["meterid", "siteid", "sitename", "utility", "readingvalue", "readingunits"]
building_cols = ["buildingname", "campusname", "grossarea", "latitude", "longitude"]
weather_cols = [
    "temperature_2m",
    "relative_humidity_2m",
    "precipitation",
    "wind_speed_10m",
    "cloud_cover",
    "apparent_temperature",
]

display(
    silver_joined
    # Restrict to January–April 2025 before projecting: `year` and `month`
    # are not among the columns kept below.
    .where((F.col("year") == 2025) & F.col("month").between(1, 4))
    .select(*time_cols, *energy_cols, *building_cols, *weather_cols)
    # Chronological order keeps the timeline readable.
    .orderBy("ts")
)
