In [0]:
dbutils.widgets.text('ServerlessJobID', 'MISSING', 'Serverless Job ID')
dbutils.widgets.text('ClassicJobID', 'MISSING', 'Classic Job ID')

ServerlessJobID = dbutils.widgets.get('ServerlessJobID')
ClassicJobID = dbutils.widgets.get('ClassicJobID')

assert ServerlessJobID != 'MISSING', "Need to provide Job ID for Serverless Job"
assert ClassicJobID != 'MISSING', "Need to provide Job ID for Classic Job"

In [0]:
job_runs = spark.sql(f"""
with latest_price as (
  select
    sku_name,
    pricing.default as list_price,
    row_number() over (partition by sku_name order by price_start_time desc) as rn
  FROM
    system.billing.list_prices
),
jobs_metadata AS (
  select
    job_id,
    `name` as job_name,
    row_number() over (partition by job_id order by change_time desc) as rn
  from
    system.lakeflow.jobs
  WHERE
    `job_id` IN ({ServerlessJobID}, {ClassicJobID})
),
job_run_metadata AS (
  select
    jrt.run_id,
    jrt.period_start_time as period_start_time,
    jrt.job_parameters,
    row_number() over (partition by jrt.run_id order by period_start_time) as rn
  from
    system.lakeflow.job_run_timeline jrt
  WHERE jrt.job_id IN ({ServerlessJobID}, {ClassicJobID})
),
latest_job_status AS (
  select
    jrt.run_id,
    jrt.period_end_time as period_end_time,
    jrt.termination_code,
    row_number() over (partition by jrt.run_id order by period_start_time DESC) as rn
  from
    system.lakeflow.job_run_timeline jrt
  WHERE jrt.job_id IN ({ServerlessJobID}, {ClassicJobID})
)
select
  job_name,
  sum(u.usage_quantity) as dbus_emitted,
  u.usage_metadata.job_id,
  u.usage_metadata.job_run_id,
  CONVERT_TIMEZONE('UTC', 'Asia/Singapore', jrm.period_start_time) as job_run_start,
  CONVERT_TIMEZONE('UTC', 'Asia/Singapore', latest_job_status.period_end_time) as job_run_end,
  jrm.job_parameters,
  latest_job_status.termination_code,
  lp.list_price,
  sum(u.usage_quantity * lp.list_price) as list_cost
from
  system.billing.usage u
    left join latest_price lp
      on u.sku_name = lp.sku_name
      and lp.rn = 1
    inner join jobs_metadata job
      on u.usage_metadata.job_id = job.job_id
      and job.rn = 1
    inner join job_run_metadata jrm
      on u.usage_metadata.job_run_id = jrm.run_id
      and jrm.rn = 1
    inner join latest_job_status
      ON latest_job_status.run_id = jrm.run_id
      and latest_job_status.rn = 1
WHERE
  `usage_date` >= '2025-08-18' 
GROUP BY
  ALL
ORDER BY
  job_run_start;
  """)

In [0]:
values = [('Standard_D4ds_v5', 0.2260),('Standard_E8ads_v5', 0.5240)]
columns = ['node_type', 'price_per_hour']
df = spark.createDataFrame(values, columns)

df.createOrReplaceTempView('node_prices')

In [0]:
query = f"""
WITH clusters as (
  SELECT
    get(regexp_extract_all(cluster_name, '\\\\d+', 0),0) as job_id,
    get(regexp_extract_all(cluster_name, '\\\\d+', 0),1) as run_id,
    cluster_id
  FROM
    system.compute.clusters
  WHERE
    cluster_name LIKE 'job-{ClassicJobID}%'
),
cluster_size_timeline AS (
  SELECT DISTINCT
    clusters.job_id,
    clusters.run_id,
    count(1) as num_nodes,
    CONVERT_TIMEZONE('UTC', 'Asia/Singapore', nodes.start_time) as start_time,
    CONVERT_TIMEZONE('UTC', 'Asia/Singapore', nodes.end_time) as end_time,
    nodes.node_type
  FROM
    system.compute.node_timeline nodes
      INNER JOIN clusters
        ON nodes.cluster_id = clusters.cluster_id
  GROUP BY
    ALL
),
cluster_cost as (SELECT
  clusters.*, date_diff(MINUTE, clusters.start_time, clusters.end_time)/60.0 as duration,
  node_prices.price_per_hour, duration * clusters.num_nodes * node_prices.price_per_hour as cluster_cost
FROM
  cluster_size_timeline clusters
    left join node_prices
      on clusters.node_type = node_prices.node_type)

SELECT job_id, run_id, min(start_time) as start_time, max(end_time) as end_time, sum(cluster_cost) as cluster_cost_dollars FROM cluster_cost
GROUP BY job_id, run_id;
"""

compute_cluster_costs = spark.sql(query)


In [0]:
spark.sql(
    """SELECT 
    job_name, 
    job_runs.job_id, 
    job_runs.job_run_id, 
    job_run_start, 
    job_run_end, 
    dbus_emitted, 
    list_cost as list_cost_dbus,
    coalesce(cluster_costs.cluster_cost_dollars,0) as list_cost_vms,
    list_cost_dbus + list_cost_vms as total_cost,
    job_runs.job_parameters
    from {benchmark_job_runs} job_runs 
    left join {compute_cluster_costs} cluster_costs
    on job_runs.job_id = cluster_costs.job_id
    and job_runs.job_run_id = cluster_costs.run_id
    WHERE termination_code = 'SUCCESS'""",
    benchmark_job_runs=job_runs,
    compute_cluster_costs=compute_cluster_costs,
).display()