## Initial Ingestion

This notebook uses Auto Loader to identify which data AWS has updated in your AWS Cost and Usage Reports 2.0 (Data Exports).

AWS refreshes the data multiple times a day, rewriting the data files for any month that had a change; however, historical data older than a few months usually doesn't get updated.

Should you require additional historical data, you can raise an AWS support ticket and request a data backfill.

In [0]:
from pyspark.sql import functions as F
import json

In [0]:
# Job parameters: each one is declared as a Databricks text widget with an
# empty default, then read back into a module-level variable of the same name.
for widget_name in (
    "source_location_metadata",
    "target_catalog_name",
    "target_schema_name",
    "tracker_table_name",
    "checkpoint_location",
    "schema_location",
    "include_existing",
    "volume_name",
):
    dbutils.widgets.text(widget_name, "")

read_param = dbutils.widgets.get

source_location_metadata = read_param("source_location_metadata")
target_catalog_name = read_param("target_catalog_name")
target_schema_name = read_param("target_schema_name")
tracker_table_name = read_param("tracker_table_name")
checkpoint_location = read_param("checkpoint_location")
schema_location = read_param("schema_location")
# Widgets return strings; only a case-insensitive "true" enables the flag.
include_existing = read_param("include_existing").lower() == "true"
volume_name = read_param("volume_name")

In [0]:
#Creating catalog, schema and volume (each statement is a no-op if the object already exists)
schema_fqn = f"{target_catalog_name}.{target_schema_name}"
volume_fqn = f"{schema_fqn}.{volume_name}"

spark.sql(f"CREATE CATALOG IF NOT EXISTS {target_catalog_name}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_fqn}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {volume_fqn}")

### Tracking table
This table keeps track of the months that have been submitted or processed.

In [0]:
#Creating tracker table
# Fully qualified (catalog.schema.table) name, reused by the later cells.
tracker_tbl = ".".join([target_catalog_name, target_schema_name, tracker_table_name])

# One row per billing period; `status` records where the period is in the pipeline.
tracker_ddl = f"""
    CREATE TABLE IF NOT EXISTS {tracker_tbl}(
    billing_period     STRING,
    manifest_path      STRING,
    manifest_mod_time  TIMESTAMP,
    manifest_size      BIGINT,
    status             STRING,  -- PENDING | SUBMITTED | SUCCEEDED | FAILED
    submitted_at       TIMESTAMP,
    completed_at       TIMESTAMP,
    last_error         STRING,
    last_update        TIMESTAMP
)
CLUSTER BY AUTO
"""
spark.sql(tracker_ddl)

In [0]:
#Regex capturing the YYYY-MM billing period from a manifest file path
billing_regex = r"BILLING_PERIOD=(\d{4}-\d{2})"

#Auto Loader settings used to read the metadata (manifest) files
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": schema_location,
    "cloudFiles.allowOverwrites": "true",
    "cloudFiles.includeExistingFiles": "true" if include_existing else "false",
}

# Path of the file each record came from, taken from the _metadata column.
manifest_file_path = F.col("_metadata.file_path")

src = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load(source_location_metadata)
    # Only file-level metadata is kept: path, derived billing period,
    # modification time and size are what change detection relies on.
    .select(
        manifest_file_path.alias("manifest_path"),
        F.regexp_extract(manifest_file_path, billing_regex, 1).alias("billing_period"),
        F.col("_metadata.file_modification_time").alias("manifest_mod_time"),
        F.col("_metadata.file_size").alias("manifest_size"),
    )
    # regexp_extract yields '' when the path has no BILLING_PERIOD=YYYY-MM segment.
    .filter("billing_period != ''")
)

In [0]:
#Updates the tracking table with the data to be processed
def upsert_tracker(batch_df, _):
    """Merge one micro-batch of manifest metadata into the tracker table.

    Intended to be used as a ``foreachBatch`` sink. For each billing period in
    the batch, only the row with the latest ``manifest_mod_time`` is kept, then:

    * no tracker row for that period  -> insert it as PENDING;
    * tracker row with an older mod time -> overwrite the manifest columns,
      reset status to PENDING and clear submitted_at / completed_at / last_error;
    * tracker row with the same or a newer mod time -> left untouched.

    Args:
        batch_df: Micro-batch DataFrame with columns billing_period,
            manifest_path, manifest_mod_time and manifest_size.
        _: Batch id supplied by Structured Streaming (unused).

    Returns:
        None. An empty batch is skipped without touching the table.
    """
    if batch_df.isEmpty():
        return

    batch_df.createOrReplaceTempView("new_manifests")

    # Run the MERGE on the session that owns the micro-batch: the temp view is
    # registered there, and inside foreachBatch that session is not guaranteed
    # to be the notebook-level `spark`, which may not see `new_manifests`.
    batch_session = batch_df.sparkSession

    # Newer mod_time → PENDING; brand-new → PENDING; older duplicate → ignore
    batch_session.sql(f"""
      MERGE INTO {tracker_tbl} t
      USING (
        SELECT billing_period, manifest_path, manifest_mod_time, manifest_size
        FROM new_manifests
        QUALIFY ROW_NUMBER() OVER (PARTITION BY billing_period ORDER BY manifest_mod_time DESC) = 1
      ) s
      ON t.billing_period = s.billing_period
      WHEN MATCHED AND s.manifest_mod_time > t.manifest_mod_time THEN
        UPDATE SET
          t.manifest_path     = s.manifest_path,
          t.manifest_mod_time = s.manifest_mod_time,
          t.manifest_size     = s.manifest_size,
          t.status            = 'PENDING',
          t.submitted_at      = NULL,
          t.completed_at      = NULL,
          t.last_error        = NULL,
          t.last_update       = current_timestamp()
      WHEN NOT MATCHED THEN
        INSERT (billing_period, manifest_path, manifest_mod_time, manifest_size, status, last_update)
        VALUES (s.billing_period, s.manifest_path, s.manifest_mod_time, s.manifest_size, 'PENDING', current_timestamp())
    """)

In [0]:
# Configure the sink first, then start it as a separate step.
tracker_writer = (
    src.writeStream
    .option("checkpointLocation", checkpoint_location)
    .foreachBatch(upsert_tracker)
    # processes only new files since last checkpoint, then exits
    .trigger(availableNow=True)
)

q = tracker_writer.start()
# Block until the run has finished so later cells read the updated tracker table.
q.awaitTermination()

Once data has been ingested into the tracking table, we create the set of values to be processed by the next step of the job.

In [0]:

# Pull the months that still need processing (PENDING, plus FAILED for retry),
# mark them SUBMITTED (so concurrent discover runs won't double enqueue),
# and hand them to For Each.
pending = [r["billing_period"] for r in spark.table(tracker_tbl)
           .filter("status IN ('PENDING','FAILED')").select("billing_period").distinct().collect()]

if pending:
    bp_list_sql = ",".join([f"'{bp}'" for bp in pending])
    # The status filter must match the selection above: FAILED months are
    # enqueued too, so they also have to be flipped to SUBMITTED. Otherwise a
    # retried month stays FAILED while it is running and the next discover run
    # enqueues it again.
    spark.sql(f"""
      UPDATE {tracker_tbl}
      SET status='SUBMITTED', submitted_at=current_timestamp(), last_update=current_timestamp()
      WHERE billing_period IN ({bp_list_sql}) AND status IN ('PENDING','FAILED')
    """)

In [0]:
# Publish the selected billing periods as a JSON array under the task value
# key "months_json" so the following task in the job can read them.
months_json = json.dumps(pending)
dbutils.jobs.taskValues.set(key="months_json", value=months_json)
print(f"Submitting months: {pending}")