# Demo of water utility billing


A note on clocks. The system deals with three groups of clocks, which are allowed to drift by at most 5 minutes from each other:
- meter clocks (one clock per device)
- ingestion clocks (one clock per ingestion machine)
- job-trigger clock (single clock)

Meter and ingestion clocks give wall-clock time. The trigger clock also typically gives wall-clock time, except during backfills and reprocessing, when it may be days in the past.

In [None]:
# Notebook parameters. Every value below may be overridden when the notebook
# is executed programmatically.

# Tumbling batch window, given as ISO-8601 strings (YYYY-MM-DDTHH:MM:SS).
# Keep WINDOW_END_TIME at least 5 minutes behind wall-clock time: the
# orchestrator and the data sources may drift by that much, and a later end
# time could accidentally exclude late-arriving data that is still within the
# drift tolerance.
WINDOW_START_TIME = '2026-01-15T08:00:00'
WINDOW_END_TIME = '2026-01-15T14:00:00'

# Storage locations: the raw input and the tables written under data/output.
INPUT_PATH = 'data/input/raw'
HOURLY_OUTPUT_PATH = 'data/output/hourly_usage'
CUMULATIVE_OUTPUT_PATH = 'data/output/cumulative_usage'
DEDUPLICATED_RAW_PATH = 'data/output/deduplicated_raw'

# When False, display-only cells are skipped (useful for testing).
VERBOSE = True

# Assign an existing SparkSession here to reuse it instead of creating a new one.
spark = None

In [None]:
# Reuse an externally supplied Spark session when one was provided (tests pass
# theirs in through the `spark` parameter); otherwise build a fresh one.
if spark is None:
    from spark_utils import create_spark_session

    spark = create_spark_session(
        app_name='WaterBilling',
        log_level='WARN',
    )

## Handle raw-data duplicates and out-of-order records

In [None]:
import os
from datetime import datetime, timedelta

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

from schemas import USAGE_INPUT_SCHEMA

In [None]:
# Read the tumbling window's input as a batch source.
# Data is stored in Hive-style partitioned directories:
#   INPUT_PATH/ingestion_hour=yyyy-MM-dd-HH/
# Spark partition discovery adds ingestion_hour as a string column.
# Partition selection: WINDOW_START_TIME is inclusive, WINDOW_END_TIME is exclusive.

# Parse window parameters
window_start = datetime.fromisoformat(WINDOW_START_TIME)
window_end = datetime.fromisoformat(WINDOW_END_TIME)

# Format window boundaries for partition filtering (matches Hive partition format).
# Only the date and hour survive this formatting, so any minutes/seconds in the
# window parameters are ignored: the window is effectively hour-aligned.
window_start_partition = window_start.strftime('%Y-%m-%d-%H')
window_end_partition = window_end.strftime('%Y-%m-%d-%H')

# The glob load below fails with "Path does not exist" when it matches nothing,
# so an existing-but-empty INPUT_PATH must take the empty-DataFrame branch too.
# NOTE(review): os.path / os.listdir only inspect the local filesystem - confirm
# INPUT_PATH is never a remote (s3://, hdfs://, ...) location.
has_input_partitions = os.path.isdir(INPUT_PATH) and any(
    entry.startswith('ingestion_hour=') for entry in os.listdir(INPUT_PATH)
)

if has_input_partitions:
    # Read with partition discovery; basePath keeps ingestion_hour as a column
    raw_batch = (
        spark.read
        .format('json')
        .schema(USAGE_INPUT_SCHEMA)
        .option('basePath', INPUT_PATH)
        .load(f'{INPUT_PATH}/ingestion_hour=*')
        # Filter partitions: start is inclusive, end is exclusive.
        # String comparison is safe because yyyy-MM-dd-HH sorts chronologically.
        .filter(
            (F.col('ingestion_hour') >= window_start_partition) &
            (F.col('ingestion_hour') < window_end_partition)
        )
    )
else:
    # No input partitions - create empty DataFrame with schema plus partition column
    raw_batch = (
        spark.createDataFrame([], USAGE_INPUT_SCHEMA)
        .withColumn('ingestion_hour', F.lit(None).cast('string'))
    )

In [None]:
# Watermark-style filtering (analogous to a streaming watermark).
# The bounds are anchored on each record's own ingestion_hour partition rather
# than on window_end, so every partition gets its own 7-day late-data tolerance.
#
# Clock drift tolerance: recording_time may exceed the end of the partition hour
# by up to 5 minutes, covering meters whose clock runs slightly ahead of the
# ingestion system.

# Turn the string partition column into a timestamp marking the start of the hour
ingestion_time_col = F.to_timestamp(F.col('ingestion_hour'), 'yyyy-MM-dd-HH')
raw_with_ingestion = raw_batch.withColumn('ingestion_time', ingestion_time_col)

# Lower bound: 7 days before the partition's ingestion hour
earliest_allowed = F.col('ingestion_time') - F.expr('INTERVAL 7 DAYS')
# Upper bound: end of the partition hour plus the 5-minute drift allowance
latest_allowed = (
    F.col('ingestion_time')
    + F.expr('INTERVAL 1 HOUR')
    + F.expr('INTERVAL 5 MINUTES')
)

# between() is inclusive on both ends; the helper columns are dropped afterwards
filtered_batch = (
    raw_with_ingestion
    .where(F.col('recording_time').between(earliest_allowed, latest_allowed))
    .drop('ingestion_hour', 'ingestion_time')
)

In [None]:
# Derive the partition column of the deduplicated raw table: the recording
# time truncated to the hour, rendered as yyyy-MM-dd-HH.
recording_hour_col = F.date_format(
    F.date_trunc('hour', F.col('recording_time')),
    'yyyy-MM-dd-HH',
)
filtered_with_partition = filtered_batch.withColumn('recording_hour', recording_hour_col)

# Remember the smallest and largest recording_hour in this batch so later cells
# can restrict their reads to the affected partitions instead of scanning all
# history. Both values are None when the batch is empty.
recording_hour_range = (
    filtered_with_partition
    .agg(
        F.min('recording_hour').alias('min_hour'),
        F.max('recording_hour').alias('max_hour'),
    )
    .first()
)
min_recording_hour, max_recording_hour = recording_hour_range
print(f'Recording hour range: {min_recording_hour} to {max_recording_hour}')

In [None]:
# Fold this window's records into the intermediate (deduplicated) Delta table.
# Duplicates are removed in two steps:
#   1. dropDuplicates() collapses repeats inside the current batch
#   2. an insert-only Delta merge skips records already stored by earlier batches
# The table is partitioned by recording_hour.

# In-batch deduplication comes first (the merge source must not contain repeats)
deduplicated_df = filtered_with_partition.dropDuplicates(['record_id'])

if not DeltaTable.isDeltaTable(spark, DEDUPLICATED_RAW_PATH):
    # First run: create the table, partitioned by recording_hour
    initial_writer = (
        deduplicated_df.write
        .format('delta')
        .partitionBy('recording_hour')
        .mode('overwrite')
    )
    initial_writer.save(DEDUPLICATED_RAW_PATH)
    # count() is a separate Spark action executed after the write
    print(f'Created deduplicated raw table with {deduplicated_df.count()} records')
else:
    # Matching on recording_hour as well as record_id lets the merge prune partitions
    merge_condition = ' AND '.join([
        'target.recording_hour = source.recording_hour',
        'target.record_id = source.record_id',
    ])

    delta_table = DeltaTable.forPath(spark, DEDUPLICATED_RAW_PATH)
    # Insert-only merge: rows that already match are left untouched
    (
        delta_table.alias('target')
        .merge(deduplicated_df.alias('source'), merge_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )

    print('Merged records into deduplicated raw table')

## Compute hourly usage

In [None]:
# Load the deduplicated table, restricted to the recording hours touched by the
# current batch (the min/max tracked earlier) so downstream steps do not scan
# unaffected partitions.
deduplicated_table = spark.read.format('delta').load(DEDUPLICATED_RAW_PATH)

if min_recording_hour is None:
    # Empty batch: keep the table's schema but no rows
    all_deduplicated = deduplicated_table.limit(0)
else:
    # between() is inclusive on both ends
    all_deduplicated = deduplicated_table.where(
        F.col('recording_hour').between(min_recording_hour, max_recording_hour)
    )

In [None]:
# Total usage per customer per recording hour.
# recording_hour stays in the result because the write step filters on it.
usage_by_customer_hour = all_deduplicated.groupBy('customer_id', 'recording_hour')
hourly_aggregated = usage_by_customer_hour.agg(
    F.sum('usage_gallons').alias('total_usage_gallons')
)

In [None]:
# Write hourly aggregated data using partition overwrite.
# The aggregation was recomputed completely for [min_recording_hour, max_recording_hour],
# so replaceWhere can atomically swap exactly that range. This is simpler and
# more efficient than a merge.
hourly_table_exists = DeltaTable.isDeltaTable(spark, HOURLY_OUTPUT_PATH)

hourly_writer = hourly_aggregated.write.format('delta')
if not hourly_table_exists:
    # Partition by recording_hour when the table is first created. An existing
    # table keeps whatever layout it already has, so nothing is passed for it.
    hourly_writer = hourly_writer.partitionBy('recording_hour')

if min_recording_hour is None:
    # Empty batch: there is no range to replace. Never format None into the
    # replaceWhere predicate (it would compare against the string 'None').
    if not hourly_table_exists:
        # Still create the (empty) table so later reads of HOURLY_OUTPUT_PATH succeed
        hourly_writer.mode('append').save(HOURLY_OUTPUT_PATH)
    print('No hourly partitions to replace')
else:
    hourly_replace_predicate = (
        f"recording_hour >= '{min_recording_hour}' "
        f"AND recording_hour <= '{max_recording_hour}'"
    )
    (hourly_writer
        .mode('overwrite')
        .option('replaceWhere', hourly_replace_predicate)
        .save(HOURLY_OUTPUT_PATH))

    print(f'Replaced hourly partitions from {min_recording_hour} to {max_recording_hour}')

## Compute cumulative monthly usage

In [None]:
# Upper bound of the cumulative recomputation: the later of the current batch's
# max_recording_hour and the newest hour already present in the cumulative table.
# Hours are yyyy-MM-dd-HH strings, so max() on them is chronological.
if not DeltaTable.isDeltaTable(spark, CUMULATIVE_OUTPUT_PATH):
    # No cumulative table yet: only the current batch determines the bound
    cumulative_max_hour = max_recording_hour
else:
    cumulative_table = spark.read.format('delta').load(CUMULATIVE_OUTPUT_PATH)
    existing_max = cumulative_table.agg(F.max('recording_hour')).first()[0]
    if not (existing_max and max_recording_hour):
        # At most one side has a value: take whichever is available
        cumulative_max_hour = max_recording_hour or existing_max
    else:
        cumulative_max_hour = max(max_recording_hour, existing_max)

print(f'Cumulative recomputation range: {min_recording_hour} to {cumulative_max_hour}')

In [None]:
# Load only the hourly rows that may need their cumulative values recomputed:
# recording hours from min_recording_hour through cumulative_max_hour (inclusive).
hourly_table = spark.read.format('delta').load(HOURLY_OUTPUT_PATH)

if min_recording_hour is None or cumulative_max_hour is None:
    # Nothing to recompute: keep the schema, drop all rows
    hourly_usage_df = hourly_table.limit(0)
else:
    hourly_usage_df = hourly_table.where(
        F.col('recording_hour').between(min_recording_hour, cumulative_max_hour)
    )

# Display-only preview
if VERBOSE:
    hourly_usage_df.orderBy('customer_id', 'recording_hour').show(10, truncate=False)

+-------------+-------------------+-------------------+
|customer_id  |usage_hour         |total_usage_gallons|
+-------------+-------------------+-------------------+
|customer_0000|2026-01-15 08:00:00|63.02999999999999  |
|customer_0000|2026-01-15 09:00:00|59.129999999999995 |
|customer_0000|2026-01-15 10:00:00|54.440000000000005 |
|customer_0000|2026-01-15 11:00:00|60.93000000000001  |
|customer_0001|2026-01-15 08:00:00|61.61              |
|customer_0001|2026-01-15 09:00:00|61.40999999999999  |
|customer_0001|2026-01-15 10:00:00|64.83999999999999  |
|customer_0001|2026-01-15 11:00:00|65.60000000000001  |
|customer_0002|2026-01-15 08:00:00|54.68000000000001  |
|customer_0002|2026-01-15 09:00:00|80.36999999999999  |
+-------------+-------------------+-------------------+
only showing top 10 rows


In [None]:
# Cumulative monthly usage.
# Running totals are rebuilt for every hour from min_recording_hour through
# cumulative_max_hour. The total restarts at each month boundary because the
# window is partitioned by customer and month.

if min_recording_hour is not None:
    # Month (as a timestamp at the first instant of the month) of a recording_hour string
    recording_month = F.date_trunc(
        'month',
        F.to_timestamp(F.col('recording_hour'), 'yyyy-MM-dd-HH'),
    )

    # Months touched by the recording_hour range being processed
    affected_months = (
        hourly_usage_df
        .select(recording_month.alias('month'))
        .distinct()
        .collect()
    )
    affected_month_values = [row['month'] for row in affected_months]
    print(f'Affected months: {affected_month_values}')

    # A correct running total needs every hourly row from the START of each
    # affected month, even when min_recording_hour falls mid-month. Rows after
    # cumulative_max_hour are left out.
    hourly_full_months = (
        spark.read.format('delta').load(HOURLY_OUTPUT_PATH)
        .where(recording_month.isin(affected_month_values))
        .where(F.col('recording_hour') <= cumulative_max_hour)
        .withColumn('usage_month', recording_month)
    )

    # Running-sum window: one partition per (customer, month), ordered by hour,
    # spanning from the first row of the partition up to the current row
    cumulative_window = (
        Window
        .partitionBy('customer_id', 'usage_month')
        .orderBy('recording_hour')
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    running_total = F.sum('total_usage_gallons').over(cumulative_window)

    # Cumulative usage for every hour of the affected months
    cumulative_all = hourly_full_months.select(
        'customer_id',
        'recording_hour',
        running_total.alias('cumulative_usage_gallons'),
    )

    # Earlier hours of the month only feed the running total; just the rows at
    # or after min_recording_hour actually need to be rewritten
    cumulative_usage_df = cumulative_all.where(F.col('recording_hour') >= min_recording_hour)

    print(f'Cumulative records to update: {cumulative_usage_df.count()}')
else:
    print('No data to process for cumulative computation')
    cumulative_usage_df = spark.createDataFrame([], 'customer_id STRING, recording_hour STRING, cumulative_usage_gallons DOUBLE')

No batch data tracked, using watermark fallback: 2026-01-19 01:00:00
Cumulative update window starts at: 2026-01-19 01:00:00
Affected months: []
Cumulative records to update: 0


In [None]:
# Write cumulative data using partition overwrite.
# Cumulative values were recomputed completely for every hour >= min_recording_hour,
# so replaceWhere can atomically swap exactly that range.
cumulative_table_exists = DeltaTable.isDeltaTable(spark, CUMULATIVE_OUTPUT_PATH)

cumulative_writer = cumulative_usage_df.write.format('delta')
if not cumulative_table_exists:
    # Partition by recording_hour when the table is first created. An existing
    # table keeps whatever layout it already has, so nothing is passed for it.
    cumulative_writer = cumulative_writer.partitionBy('recording_hour')

if min_recording_hour is None:
    # Empty batch: there is no range to replace. Never format None into the
    # replaceWhere predicate (it would compare against the string 'None').
    if not cumulative_table_exists:
        # Still create the (empty) table so later reads of CUMULATIVE_OUTPUT_PATH succeed
        cumulative_writer.mode('append').save(CUMULATIVE_OUTPUT_PATH)
    print('No cumulative records to update')
else:
    (cumulative_writer
        .mode('overwrite')
        .option('replaceWhere', f"recording_hour >= '{min_recording_hour}'")
        .save(CUMULATIVE_OUTPUT_PATH))

    print(f'Replaced cumulative partitions from {min_recording_hour} onwards')

No cumulative records to update


In [None]:
# Load the full cumulative usage table; preview its first rows when VERBOSE is on
cumulative_reader = spark.read.format('delta')
cumulative_df = cumulative_reader.load(CUMULATIVE_OUTPUT_PATH)
if VERBOSE:
    cumulative_preview = cumulative_df.sort('customer_id', 'recording_hour')
    cumulative_preview.show(n=10, truncate=False)

+-------------+-------------------+--------------------+------------------------+
|customer_id  |usage_hour         |usage_hour_partition|cumulative_usage_gallons|
+-------------+-------------------+--------------------+------------------------+
|customer_0000|2026-01-15 08:00:00|2026-01-15-08       |63.02999999999999       |
|customer_0000|2026-01-15 09:00:00|2026-01-15-09       |122.15999999999998      |
|customer_0000|2026-01-15 10:00:00|2026-01-15-10       |176.6                   |
|customer_0000|2026-01-15 11:00:00|2026-01-15-11       |237.53                  |
|customer_0000|2026-01-15 12:00:00|2026-01-15-12       |303.51                  |
|customer_0000|2026-01-15 13:00:00|2026-01-15-13       |363.52                  |
|customer_0001|2026-01-15 08:00:00|2026-01-15-08       |61.61                   |
|customer_0001|2026-01-15 09:00:00|2026-01-15-09       |123.01999999999998      |
|customer_0001|2026-01-15 10:00:00|2026-01-15-10       |187.85999999999996      |
|customer_0001|2

In [None]:
# Display-only summary of both result sets (skipped unless VERBOSE is enabled).
# hourly_usage_df covers only the recording-hour range loaded for this run,
# while cumulative_df is the whole cumulative table.
if VERBOSE:
    summary_sections = (
        ('=== Hourly Usage Summary ===', hourly_usage_df, 'total_usage_gallons'),
        ('=== Cumulative Usage Summary ===', cumulative_df, 'cumulative_usage_gallons'),
    )
    for heading, frame, metric_column in summary_sections:
        print(heading)
        frame.describe(metric_column).show()

    print(f'Total hourly records: {hourly_usage_df.count()}')
    print(f'Total cumulative records: {cumulative_df.count()}')
    unique_customers = hourly_usage_df.select('customer_id').distinct().count()
    print(f"Unique customers: {unique_customers}")

=== Hourly Usage Summary ===
+-------+-------------------+
|summary|total_usage_gallons|
+-------+-------------------+
|  count|                 20|
|   mean| 63.708499999999994|
| stddev|  6.900069622074381|
|    min|  51.28999999999999|
|    max|  80.36999999999999|
+-------+-------------------+

=== Cumulative Usage Summary ===
+-------+------------------------+
|summary|cumulative_usage_gallons|
+-------+------------------------+
|  count|                      20|
|   mean|      157.79900000000004|
| stddev|       75.20760256222702|
|    min|       51.28999999999999|
|    max|                  270.73|
+-------+------------------------+

Total hourly records: 20
Total cumulative records: 20
Unique customers: 5


In [None]:
# List the on-disk contents of the hourly and cumulative Delta tables so their
# partition layout can be inspected (only when VERBOSE is enabled).
# The configured output paths are used so the listing follows any parameter overrides.
if VERBOSE:
    import subprocess

    table_listings = (
        ('=== Hourly Usage Delta Table Structure ===', HOURLY_OUTPUT_PATH),
        ('\n=== Cumulative Usage Delta Table Structure ===', CUMULATIVE_OUTPUT_PATH),
    )
    for heading, table_path in table_listings:
        print(heading)
        # Arguments are passed as a list (no shell), so the path is not shell-interpreted
        result = subprocess.run(['ls', '-la', table_path], capture_output=True, text=True)
        print(result.stdout)
        if result.returncode != 0:
            # Surface the reason (e.g. missing directory) instead of printing nothing
            print(result.stderr)

=== Hourly Usage Delta Table Structure ===
total 0
drwxr-xr-x 1 root root 4096 Jan 25 20:24 .
drwxr-xr-x 1 root root 4096 Jan 25 20:25 ..
drwxr-xr-x 1 root root 4096 Jan 25 20:24 _delta_log
drwxr-xr-x 1 root root 4096 Jan 25 20:24 usage_hour_partition=2026-01-15-08
drwxr-xr-x 1 root root 4096 Jan 25 20:24 usage_hour_partition=2026-01-15-09
drwxr-xr-x 1 root root 4096 Jan 25 20:24 usage_hour_partition=2026-01-15-10
drwxr-xr-x 1 root root 4096 Jan 25 20:24 usage_hour_partition=2026-01-15-11


=== Cumulative Usage Delta Table Structure ===
total 0
drwxr-xr-x 1 root root 4096 Jan 25 20:25 .
drwxr-xr-x 1 root root 4096 Jan 25 20:25 ..
drwxr-xr-x 1 root root 4096 Jan 25 20:25 _delta_log
drwxr-xr-x 1 root root 4096 Jan 25 20:25 usage_hour_partition=2026-01-15-08
drwxr-xr-x 1 root root 4096 Jan 25 20:25 usage_hour_partition=2026-01-15-09
drwxr-xr-x 1 root root 4096 Jan 25 20:25 usage_hour_partition=2026-01-15-10
drwxr-xr-x 1 root root 4096 Jan 25 20:25 usage_hour_partition=2026-01-15-11

