# Setup

In [207]:
# imports
import datetime

import pandas as pd
import requests
from py4j.java_gateway import java_import
from pyspark.sql.functions import (
    avg,
    col,
    date_trunc,
    last,
    max,
    min,
    struct,
    to_timestamp,
    when,
)

In [208]:
# Root of the data lake container; the bronze and silver layers live directly beneath it.
base_path = "abfss://datalake@bessstorage.dfs.core.windows.net/"
bronze_path = f"{base_path}bronze/"
silver_path = f"{base_path}silver/"

In [209]:
# determine delta table paths in Bronze directory
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

# Resolve the filesystem from the bronze path itself rather than taking the
# session's default filesystem: the default one is not guaranteed to be the
# storage account named in bronze_path, in which case listStatus would be
# asked to list a path that belongs to a different filesystem.
bronze_dir = spark._jvm.Path(bronze_path)
fs = bronze_dir.getFileSystem(spark._jsc.hadoopConfiguration())
status = fs.listStatus(bronze_dir)

# every sub-directory directly under bronze/ is treated as one delta table
delta_table_paths = [str(file.getPath()) for file in status if file.isDirectory()]

In [210]:
# load every bronze delta table into a dict keyed by its directory name
dfs = {}

for path in delta_table_paths:
    loaded = spark.read.format("delta").load(path)
    # the last path segment is the table's directory name
    table_name = path.rsplit("/", 1)[-1]

    print('table name:', table_name)
    dfs[table_name] = loaded

# Print First 3 Rows of Tables

In [211]:
# preview the meter table: bring at most 3 rows into pandas for display
dfs['meter_meter_1_2025-04-30T20_53_43.157Z_delta'].limit(3).toPandas()

In [212]:
# preview the RTAC telemetry table: bring at most 3 rows into pandas for display
dfs['rtac_rtac_telemetry_2025-04-30T20_46_09.007Z_delta'].limit(3).toPandas()

In [213]:
# preview the site table: bring at most 3 rows into pandas for display
dfs['site__2025-04-30T20_50_35.909Z_delta'].limit(3).toPandas()

In [214]:
# preview the weather table: bring at most 3 rows into pandas for display
dfs['weather_delta'].limit(3).toPandas()

# Convert time columns to uniform datetime

In [215]:
# Known raw time columns and the pattern used to parse each one.
# Checked in this order, so "_time" wins when a table has both.
# NOTE(review): the 'Z' is matched as a literal, so values are interpreted in
# the Spark session time zone, and values that do not match the pattern
# (e.g. with fractional seconds) would parse to null — confirm against the data.
time_formats = {
    "_time": "yyyy-MM-dd'T'HH:mm:ss'Z'",
    "hour": "yyyy-MM-dd'T'HH:mm",
}

for df_key in dfs.keys():
    table_cols = dfs[df_key].columns
    time_col = next((name for name in time_formats if name in table_cols), None)

    # Fail loudly instead of silently reusing the previous table's time column.
    if time_col is None:
        raise ValueError(
            f"Table '{df_key}' has none of the expected time columns {list(time_formats)}"
        )

    # replace the raw time column with a parsed timestamp column named "datetime"
    dfs[df_key] = (
        dfs[df_key]
        .withColumn("datetime", to_timestamp(time_col, time_formats[time_col]))
        .drop(time_col)
    )

# Delete Null Rows

In [216]:
# drop rows in which every column other than "datetime" is null
for df_key in dfs:
    value_cols = [name for name in dfs[df_key].columns if name != "datetime"]
    dfs[df_key] = dfs[df_key].dropna(how="all", subset=value_cols)

# Change Column Datatypes Appropriately

#### Meter table

In [217]:
# print the meter table's current schema (column names and types) before any casting
dfs['meter_meter_1_2025-04-30T20_53_43.157Z_delta'].printSchema()

In [218]:
# show the 3 most frequent values of every non-time column (to get a sense of appropriate type)
meter_key = 'meter_meter_1_2025-04-30T20_53_43.157Z_delta'
for col_name in dfs[meter_key].columns:
    if col_name in ("datetime", "_time", "hour"):
        continue
    value_counts = dfs[meter_key].groupBy(col_name).count()
    value_counts.orderBy("count", ascending=False).limit(3).show(truncate=False)

Since float appears to be the most appropriate type for every column, convert them all to float.

In [219]:
# convert columns to float
# A single select casts every column in one projection; calling withColumn once
# per column in a loop stacks one projection per call and can bloat the plan.
meter_key = 'meter_meter_1_2025-04-30T20_53_43.157Z_delta'
meter_keep_as_is = ("datetime", "_time", "hour")
dfs[meter_key] = dfs[meter_key].select(
    *[
        col(name) if name in meter_keep_as_is else col(name).cast("float").alias(name)
        for name in dfs[meter_key].columns
    ]
)

In [220]:
# re-print the meter schema to confirm the float casts took effect
dfs['meter_meter_1_2025-04-30T20_53_43.157Z_delta'].printSchema()

#### RTAC table

In [221]:
# print the RTAC table's current schema (column names and types) before any casting
dfs['rtac_rtac_telemetry_2025-04-30T20_46_09.007Z_delta'].printSchema()

In [222]:
# show the 3 most frequent values of every non-time column (to get a sense of appropriate type)
rtac_key = 'rtac_rtac_telemetry_2025-04-30T20_46_09.007Z_delta'
for col_name in dfs[rtac_key].columns:
    if col_name in ("datetime", "_time", "hour"):
        continue
    value_counts = dfs[rtac_key].groupBy(col_name).count()
    value_counts.orderBy("count", ascending=False).limit(3).show(truncate=False)

Here, it appears that some columns would be best cast to boolean; however, we should be okay converting them all to floats as before.

In [223]:
# convert columns to float
# A single select casts every column in one projection; calling withColumn once
# per column in a loop stacks one projection per call and can bloat the plan.
rtac_key = 'rtac_rtac_telemetry_2025-04-30T20_46_09.007Z_delta'
rtac_keep_as_is = ("datetime", "_time", "hour")
dfs[rtac_key] = dfs[rtac_key].select(
    *[
        col(name) if name in rtac_keep_as_is else col(name).cast("float").alias(name)
        for name in dfs[rtac_key].columns
    ]
)

In [224]:
# re-print the RTAC schema to confirm the float casts took effect
dfs['rtac_rtac_telemetry_2025-04-30T20_46_09.007Z_delta'].printSchema()

#### Site table

In [225]:
# print the site table's current schema (column names and types) before any casting
dfs['site__2025-04-30T20_50_35.909Z_delta'].printSchema()

In [226]:
# show the 3 most frequent values of every non-time column (to get a sense of appropriate type)
site_key = 'site__2025-04-30T20_50_35.909Z_delta'
for col_name in dfs[site_key].columns:
    if col_name in ("datetime", "_time", "hour"):
        continue
    value_counts = dfs[site_key].groupBy(col_name).count()
    value_counts.orderBy("count", ascending=False).limit(3).show(truncate=False)

In the site table, certain columns should definitely remain as strings whereas the rest should be cast to floats. In particular, the string columns are:
* Application
* ChaSt
* LocRemCtl
* Mode
* Status

In [227]:
# convert columns to float, leaving the time columns and the string-valued
# columns (Application, ChaSt, LocRemCtl, Mode, Status) untouched
# A single select casts every column in one projection; calling withColumn once
# per column in a loop stacks one projection per call and can bloat the plan.
site_key = 'site__2025-04-30T20_50_35.909Z_delta'
site_keep_as_is = ("datetime", "_time", "hour", "Application", "ChaSt", "LocRemCtl", "Mode", "Status")
dfs[site_key] = dfs[site_key].select(
    *[
        col(name) if name in site_keep_as_is else col(name).cast("float").alias(name)
        for name in dfs[site_key].columns
    ]
)

In [228]:
# re-print the site schema to confirm the float casts took effect
dfs['site__2025-04-30T20_50_35.909Z_delta'].printSchema()

#### Weather table

In [229]:
# list the weather table's (column name, type) pairs
dfs['weather_delta'].dtypes

In [230]:
# show the 3 most frequent values of every non-time weather column
weather_key = 'weather_delta'
for col_name in dfs[weather_key].columns:
    if col_name in ("datetime", "_time", "hour"):
        continue
    value_counts = dfs[weather_key].groupBy(col_name).count()
    value_counts.orderBy("count", ascending=False).limit(3).show(truncate=False)

The weather data table is already properly typed, so we don't need to convert any column types here.

# Rename Ambiguous Columns

In [231]:
# "lmp" is renamed in both the RTAC and site tables so the two stay
# distinguishable once the tables are merged
lmp_renames = (
    ('rtac_rtac_telemetry_2025-04-30T20_46_09.007Z_delta', "rtac_lmp"),
    ('site__2025-04-30T20_50_35.909Z_delta', "site_lmp"),
)
for table_key, new_name in lmp_renames:
    dfs[table_key] = dfs[table_key].withColumnRenamed("lmp", new_name)

# Group By Minute
Take Avg of Float or Last of String Columns

Skip weather table because it is already at hourly resolution

In [232]:
# Down-sample every non-weather table to one row per minute:
#   * float columns  -> average over the minute
#   * string columns -> latest non-null value within the minute
for df_key in dfs.keys():
    if df_key == "weather_delta":
        continue

    # calculate minute column rounded down
    with_minute = dfs[df_key].withColumn("datetime_minute", date_trunc("minute", col("datetime")))

    # determine which columns are floats and which are strings
    float_cols = [f.name for f in with_minute.schema.fields if f.dataType.simpleString() == "float"]
    string_cols = [f.name for f in with_minute.schema.fields if f.dataType.simpleString() == "string"]

    # Row order is not preserved through the groupBy shuffle, so sorting the
    # frame first and then calling last() does not reliably return the latest
    # value. Instead take the max of a (datetime, value) struct: structs
    # compare field by field, so the max is the row with the greatest datetime.
    # Rows where the value is null yield a null struct, which max() ignores;
    # this keeps the "ignore nulls" behaviour.
    latest_string_exprs = [
        max(when(col(c).isNotNull(), struct(col("datetime"), col(c)))).getField(c).alias(c)
        for c in string_cols
    ]

    # build aggregation expression and perform the groupby
    agg_exprs = [avg(col(c)).alias(c) for c in float_cols] + latest_string_exprs
    dfs[df_key] = with_minute.groupBy("datetime_minute").agg(*agg_exprs)

# Merge datasets together with weather dataset
Based on LMP data showing maximum at midnight, minimum at noon, and local maximum at 2pm, the datasets appear to be in UTC. Since weather data is also in UTC, we don't need to perform any timezone conversion

In [233]:
# merge site dataframes on datetime_minute column
to_merge_tables = [table for table in list(dfs.keys()) if table != "weather_delta"]

# Fold every non-weather table into one frame, so the merge does not silently
# drop tables beyond the third or fail when fewer than three are present.
# Outer joins keep minutes that exist in only some of the tables.
df_merged = dfs[to_merge_tables[0]]
for table_key in to_merge_tables[1:]:
    df_merged = df_merged.join(dfs[table_key], on="datetime_minute", how="outer")

df_merged.limit(5).toPandas()

In [234]:
# truncate each minute to its hour so the frame can be joined with the hourly weather data
hour_bucket = date_trunc("hour", col("datetime_minute"))
df_merged = df_merged.withColumn("datetime_hour", hour_bucket)
df_merged.limit(5).toPandas()

In [235]:
# the weather table is hourly, so its "datetime" column becomes "datetime_hour"
weather_hourly = dfs["weather_delta"].withColumnRenamed("datetime", "datetime_hour")
dfs["weather_delta"] = weather_hourly
weather_hourly.limit(5).toPandas()

In [236]:
# attach the hourly weather columns to the merged frame via the shared hour key
weather_hourly = dfs["weather_delta"]
df_merged = df_merged.join(weather_hourly, "datetime_hour", "outer")
df_merged.limit(5).toPandas()

# Filter datetime_minute to where all datasets have data

In [237]:
# Both window bounds start as "not yet known" (NaT) until the first table is scanned.
latest_min_datetime = earliest_max_datetime = pd.NaT

In [238]:
# Find the time window covered by every non-weather table:
# the latest of the per-table minimums and the earliest of the per-table maximums.
for path in delta_table_paths:
    if "weather_delta" in path:
        continue

    table_name = path.split("/")[-1]
    df = spark.read.format("delta").load(path)
    df = df.withColumn("_time", to_timestamp("_time", "yyyy-MM-dd'T'HH:mm:ss'Z'"))

    # one aggregation yields both bounds, instead of one Spark action for each
    bounds = df.agg(min("_time").alias("min_time"), max("_time").alias("max_time")).first()
    df_min_datetime = bounds["min_time"]
    df_max_datetime = bounds["max_time"]
    print(f"{table_name}: min datetime= {df_min_datetime}, max datetime= {df_max_datetime}")

    # the first table seeds each bound; later tables can only narrow the window
    if pd.isna(latest_min_datetime) or df_min_datetime > latest_min_datetime:
        latest_min_datetime = df_min_datetime

    if pd.isna(earliest_max_datetime) or df_max_datetime < earliest_max_datetime:
        earliest_max_datetime = df_max_datetime

print(f"\nlatest min datetime: {latest_min_datetime}, earliest max datetime: {earliest_max_datetime}")

In [239]:
# keep only minutes inside the window covered by every table (both ends inclusive)
in_common_window = col("datetime_minute").between(latest_min_datetime, earliest_max_datetime)
df_merged = df_merged.filter(in_common_window)
df_merged.limit(5).toPandas()

# Write into Delta Table in Silver Directory

In [240]:
# write the merged frame as a delta table under the silver path;
# "overwrite" mode replaces whatever already exists at that location
df_merged.write.format("delta").mode("overwrite").save(silver_path + "merged_delta")