# Create Hourly Meter Data
This notebook aggregates all data to an hour-ending time period. This is frequently needed to align meter sample rates, align with other inputs, and reduce data volume for visualization. The aggregation is done incrementally to reduce load.

In [0]:
%run ../Utilities/ConfigUtilities

In [0]:
# Set up the Spark environment.  set_spark_config() is not defined in this notebook; it is
# presumably provided by the ConfigUtilities notebook pulled in via %run -- TODO confirm.
set_spark_config()

In [0]:
# Imports and debug
from pyspark.sql.functions import lit, sum, col, max, to_date
from pyspark.sql.utils import AnalysisException
from delta.tables import DeltaTable

# Diagnostics switch: any truthy value enables the `if debug:` branches in the cells
# below, which call display()/print() and trigger extra count() actions.  Set to 0 to skip them.
debug = 1

In [0]:
# Local aliases for the configuration constants this notebook works with.  The constants
# themselves are not defined here (presumably they come from ConfigUtilities -- TODO confirm).

# Source of the incremental changes.
upstream_table_name = MDM_NO_SUBMETERS_TABLE

# Target of the hourly aggregation, addressed both by table name and by storage path.
downstream_table_name = MDM_HOURLY_NO_SUBMETERS_TABLE
downstream_table_path = MDM_HOURLY_NO_SUBMETERS_PATH

# Container URI path.
uri = CONTAINER_URI_PATH

In [0]:
# Get the timestamp of the last change from the downstream table history (hourly no
# submeter data).  DeltaTable is already imported in the imports cell, so no wildcard
# import is needed here.
downstream_table = DeltaTable.forPath(spark, downstream_table_path)
history_df = downstream_table.history()

# Only MERGE and WRITE commits are treated as evidence that data has been written.
merge_history_df = history_df.filter(col('operation').isin("MERGE", "WRITE"))

# A single first() both answers "is there any such commit?" and returns the newest one;
# it yields None when no matching commit exists, so no separate count() is required.
latest_change_row = merge_history_df.select("timestamp").orderBy("timestamp", ascending=False).first()

downstream_has_data = latest_change_row is not None
# Always bind last_change so later cells never hit an undefined name.
last_change = latest_change_row[0] if downstream_has_data else None

if debug:
    display(history_df)
    print("Downstream has data:" + str(downstream_has_data))
    if downstream_has_data:
        print(last_change)

In [0]:
# Get changes from the upstream table since the last update to the downstream table.

if not downstream_has_data:
    # The downstream table is empty, so every upstream row counts as a change:
    # read the whole upstream table.
    upstream_changes_all_df = spark.read.table(upstream_table_name)
else:
    # The downstream table has data: read the upstream change feed starting at the
    # timestamp of the last downstream change.
    try:
        upstream_changes_all_df = spark.read \
            .option("readChangeFeed", "true") \
            .option("startingTimestamp", last_change) \
            .table(upstream_table_name)
    except AnalysisException as e:
        # Only the "starting timestamp is after the latest commit" error means "nothing
        # new".  Any other analysis error is a real failure and must surface here instead
        # of being swallowed and reappearing later as an undefined-variable error.
        if "DELTA_TIMESTAMP_GREATER_THAN_COMMIT" not in str(e):
            raise
        print("No changes found after the last commit timestamp.")
        # No need to continue if there are no changes found.
        dbutils.notebook.exit("No upstream changes found.")

if debug:
    display(upstream_changes_all_df)
    print("Upstream changes count: " + str(upstream_changes_all_df.count()))

In [0]:
# To avoid partial aggregates, we need to get the full data for any possible aggregate.
# In this case for hourly data, get all data starting with the first day of the changes.
# This will ensure complete hours for all data.

# Get the earliest date.  A min aggregate is used instead of sort + first() because:
#   * it ignores null StartDateTime values (an ascending sort would place nulls first and
#     return None as the "earliest" date), and
#   * it always returns exactly one row, so an empty change set gives None rather than
#     failing when the missing row is subscripted.
earliest_date = upstream_changes_all_df \
    .select(to_date("StartDateTime").alias("StartDate")) \
    .agg({"StartDate": "min"}) \
    .first()[0]

# Nothing usable to aggregate: stop here rather than reloading with a null date filter.
if earliest_date is None:
    print("No upstream changes with a StartDateTime were found.")
    dbutils.notebook.exit("No upstream changes found.")

# Get data on or after the earliest date.
upstream_revised_df = spark.read.table(upstream_table_name).filter(col("StartDateTime") >= earliest_date)

if debug:
    print("Earliest date: " + str(earliest_date))
    display(upstream_revised_df)
    print("Revised upstream changes count: " + str(upstream_revised_df.count()))

In [0]:
# Check for duplicates on the input table.  Rows are grouped on every column, so only
# rows that are identical across all columns are reported.
duplicates_df = upstream_revised_df.groupBy(upstream_revised_df.columns).count().filter("count > 1")

# Evaluate the count once: each count() call re-runs the all-column groupBy above.
duplicate_group_count = duplicates_df.count()

if duplicate_group_count > 0:
    # Keep a single copy of each fully identical row.
    upstream_revised_df = upstream_revised_df.dropDuplicates()
    if debug:
        print(duplicate_group_count)
        display(duplicates_df)
        display(upstream_revised_df)
else:
    print("No full duplicates found on the input data.")

In [0]:
# Load the indexed calendar (parquet).  We want to work with local time, so only the
# sample index and the local-time fields are kept; every other column is dropped.
local_calendar_columns = [
    'MeterSampleIndex',
    'LocalTimeStamp',
    'LocalYear',
    'LocalMonth',
    'LocalDay',
    'LocalHour',
    'LocalMinute',
]

calendar_df = spark.read.parquet(INDEXED_CALENDAR_PATH).select(*local_calendar_columns)



In [0]:
# Attach the local calendar fields to each reading.  The match is made on the reading's
# START sample index because that makes an hour-ending value easy to derive afterwards
# (calendar hours run 0->23; we want 1->24).
sample_index_matches = upstream_revised_df["StartMeterSampleIndex"] == calendar_df["MeterSampleIndex"]

upstream_data_dates_df = upstream_revised_df.join(calendar_df, on=sample_index_matches, how='inner')

if debug:
    display(upstream_data_dates_df)

In [0]:
# Derive the HourEnding column.  Every row was joined on its start index, so the
# hour-ending value is simply the local start hour plus one.
hour_ending = col('LocalHour') + 1
upstream_data_dates_df = upstream_data_dates_df.withColumn('HourEnding', hour_ending)

if debug:
    display(upstream_data_dates_df)

In [0]:
# Roll the readings up to one row per meter / unit / direction / channel / local hour ending.
hourly_group_keys = [
    'MeterNumber', 'UnitOfMeasure', 'FlowDirection', 'Channel',
    'LocalYear', 'LocalMonth', 'LocalDay', 'HourEnding',
]

# NOTE: sum and max here are the pyspark.sql.functions versions imported at the top of
# the notebook, not the Python builtins.
hourly_aggregates = [
    sum("AMIValue").alias("HourlyAMIValue"),
    sum("VEEValue").alias("HourlyVEEValue"),
    # The largest end sample index within the hour is kept as the hour's end index.
    max("EndMeterSampleIndex").alias("EndMeterSampleIndex"),
]

new_data_hourly_df = upstream_data_dates_df.groupBy(*hourly_group_keys).agg(*hourly_aggregates)

if debug:
    display(new_data_hourly_df)

In [0]:
# Publish the hourly aggregates to the downstream table.
if not downstream_has_data:
    # Downstream table is empty: just write the new data to the table path.
    (
        new_data_hourly_df.write.format("delta")
        .mode("overwrite")
        .option("mergeSchema", "True")
        .save(downstream_table_path)
    )
else:
    # Downstream table already has data: upsert.  A downstream row is matched when all of
    # the grouping keys used for the hourly aggregation are equal.
    merge_condition = (
        'up.MeterNumber = down.MeterNumber'
        ' AND up.UnitOfMeasure = down.UnitOfMeasure'
        ' AND up.FlowDirection = down.FlowDirection'
        ' AND up.Channel = down.Channel'
        ' AND up.LocalYear = down.LocalYear'
        ' AND up.LocalMonth = down.LocalMonth'
        ' AND up.LocalDay = down.LocalDay'
        ' AND up.HourEnding = down.HourEnding'
    )

    # Look the table up by name for the merge.
    downstream_table = DeltaTable.forName(spark, downstream_table_name)

    # Matched hours are overwritten with the recomputed aggregates; new hours are inserted.
    (
        downstream_table.alias('down')
        .merge(new_data_hourly_df.alias('up'), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Housekeeping: run VACUUM against the hourly table's storage path.  No RETAIN clause is
# given, so the table's configured/default retention applies.
vacuum_statement = f"VACUUM '{MDM_HOURLY_NO_SUBMETERS_PATH}'"
spark.sql(vacuum_statement)