In [0]:
# Notebook parameters, exposed as Databricks widgets.
# job_name_contains: substring matched against job names; blank means no name filter.
dbutils.widgets.text("job_name_contains","")
# pause_schedule: "pause" sets job schedules to PAUSED; any other value sets them to UNPAUSED.
dbutils.widgets.dropdown("pause_schedule","pause",["pause","resume"])
# cancel_all_runs: "Y" cancels every active run of each selected job.
dbutils.widgets.dropdown("cancel_all_runs","N",["Y","N"])
# cancel_queued_runs: "Y" cancels only runs in the QUEUED or PENDING life-cycle state.
dbutils.widgets.dropdown("cancel_queued_runs","N",["Y","N"])
# dry_run: defaults to "Y"; when true the update/cancel API calls are skipped.
dbutils.widgets.dropdown("dry_run","Y",["Y","N"])

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import JobSettings
import logging
from typing import List, Optional

In [0]:
def _str_null_blanks(inp_str):
  if not inp_str or str(inp_str).strip() == '':
    return None
  else:
    return str(inp_str).strip()

def _get_bool(inp_str, default=False):
  """Parse a yes/no style parameter value into a bool.

  Args:
    inp_str: Raw value (typically a widget string). Matching is
      case-insensitive and ignores surrounding whitespace.
    default: Value returned when ``inp_str`` is blank or is not one of the
      recognised tokens.

  Returns:
    ``True`` for Y/YES/T/TRUE, ``False`` for N/NO/F/FALSE, otherwise
    ``default``.
  """
  inp_str = _str_null_blanks(inp_str)
  if not inp_str:
    return default
  inp_str = inp_str.upper()
  if inp_str in ('Y', 'YES', 'T', 'TRUE'):
    return True
  if inp_str in ('N', 'NO', 'F', 'FALSE'):
    return False
  # Unrecognised token: previously fell off the end and returned None
  # implicitly; honour the caller-supplied default instead.
  return default

In [0]:
# Read the widget values: the free-text filter becomes None when blank,
# and the Y/N dropdowns are converted to booleans.
job_name_contains = _str_null_blanks(dbutils.widgets.get("job_name_contains"))
pause_schedule = _str_null_blanks(dbutils.widgets.get("pause_schedule"))
cancel_all_runs = _get_bool(dbutils.widgets.get("cancel_all_runs"))
cancel_queued_runs = _get_bool(dbutils.widgets.get("cancel_queued_runs"))
dry_run = _get_bool(dbutils.widgets.get("dry_run"))
# Workspace client shared (as a module-level global) by the functions defined below.
w = WorkspaceClient()
# Echo the effective parameters so the run output records what was requested.
print("job_name_contains:", job_name_contains)
print("pause_schedule:", pause_schedule)
print("cancel_all_runs:", cancel_all_runs)
print("cancel_queued_runs:", cancel_queued_runs)
print("dry_run:", dry_run)

job_name_contains: None
pause_schedule: resume
cancel_all_runs: True
cancel_queued_runs: True
dry_run: False


In [0]:
def cancel_runs(job_id,cancel_all_runs:bool=True, cancel_queued_runs:bool=True, dry_run=False):
  """Cancel active runs of a single job.

  Args:
    job_id: Id of the job whose runs are inspected.
    cancel_all_runs: When true, every active run is cancelled.
    cancel_queued_runs: When true (and ``cancel_all_runs`` is false), only
      runs whose life-cycle state is QUEUED or PENDING are cancelled.
    dry_run: When true, nothing is cancelled; the runs that would be
      cancelled are only reported.

  Uses the module-level ``w`` WorkspaceClient.
  """
  if not (cancel_all_runs or cancel_queued_runs):
    # Neither mode requested. The previous code fell through here and
    # cancelled every active run, which is the opposite of what was asked.
    print(f"{job_id}:No run cancellation requested")
    return
  l_runs = list(w.jobs.list_runs(job_id=job_id, active_only=True))
  if not l_runs:
    print(f"{job_id}:No active runs found")
    return
  if not cancel_all_runs:
    queued_states = (jobs.RunLifeCycleState.QUEUED, jobs.RunLifeCycleState.PENDING)
    # Run.state is optional in the SDK model; a run without state cannot be
    # classified as queued, so it is left alone.
    l_runs = [x for x in l_runs if x.state and x.state.life_cycle_state in queued_states]
    if not l_runs:
      print(f"{job_id}:No queued runs found")
      return
  for x in l_runs:
    state = x.state.life_cycle_state if x.state else None
    if dry_run:
      # Do not claim a cancellation that never happened.
      print(f"{job_id}: would cancel run {x.run_id}, state: {state}")
    else:
      w.jobs.cancel_run(run_id=x.run_id)
      print(f"{job_id}: cancelled run {x.run_id}, state: {state}")
  # end of for
    
def process(l_jobs, pause_schedule, cancel_all_runs, cancel_queued_runs, dry_run):
  """Pause/resume the schedule of each job and optionally cancel its runs.

  Args:
    l_jobs: Iterable of job objects as returned by ``w.jobs.list()``.
    pause_schedule: ``'pause'`` sets schedules to PAUSED; any other value
      (including ``None``) sets them to UNPAUSED.
    cancel_all_runs: Forwarded to ``cancel_runs``.
    cancel_queued_runs: Forwarded to ``cancel_runs``.
    dry_run: When true, no update/cancel API call is made; intended changes
      are only reported.

  Only ``settings.schedule`` is inspected; jobs without a schedule are left
  untouched apart from run cancellation.
  NOTE(review): jobs driven by ``trigger`` or ``continuous`` settings are not
  handled here - confirm whether they should be.
  """
  required_status = jobs.PauseStatus.PAUSED if pause_schedule == 'pause' else jobs.PauseStatus.UNPAUSED
  for x in l_jobs:
    # settings (and its fields) are optional in the SDK model.
    settings = x.settings
    job_name = settings.name if settings else None
    schedule = settings.schedule if settings else None
    if schedule and schedule.pause_status != required_status:
      if dry_run:
        # Report the target status; the old message printed the *current*
        # status and claimed it had been "updated".
        print(f"is_dry_run:{dry_run}::{job_name}:{x.job_id}: would update pause status to {required_status}")
      else:
        schedule.pause_status = required_status
        w.jobs.update(job_id=x.job_id, new_settings=JobSettings(schedule=schedule))
        print(f"is_dry_run:{dry_run}::{job_name}:{x.job_id}: updated pause status to {required_status}")
    if cancel_queued_runs or cancel_all_runs:
      print(f"{job_name}::{x.job_id}: checking runs...")
      cancel_runs(x.job_id, cancel_all_runs,cancel_queued_runs, dry_run)
  # end of for

In [0]:
# Fetch every job visible to the client, then narrow by name if a filter was given.
l_jobs = list(w.jobs.list())
if job_name_contains:
  # settings and settings.name are optional in the SDK model; a job without a
  # name cannot match the filter, so skip it instead of raising on `in None`.
  l_jobs = [
    x for x in l_jobs
    if x.settings and x.settings.name and job_name_contains in x.settings.name
  ]


In [0]:
# Report how many jobs were selected, then apply the requested actions to them.
print(f"{len(l_jobs)} jobs found")
process(
  l_jobs=l_jobs,
  pause_schedule=pause_schedule,
  cancel_all_runs=cancel_all_runs,
  cancel_queued_runs=cancel_queued_runs,
  dry_run=dry_run,
)

5 found
abhishek_test_job1::855441139647785: checking runs...
855441139647785:No active runs found
abhishek_test_job134::985744522215864: checking runs...
985744522215864:No active runs found
is_dry_run:False::abhishek_test_job2:756216877515660: updated pause status to PauseStatus.PAUSED
abhishek_test_job2::756216877515660: checking runs...
756216877515660:No active runs found
abhishek_test_job123::842149557860124: checking runs...
842149557860124:No active runs found
dbdemos_lakehouse_hls_patient_init_abhishek_saxena::476811623256531: checking runs...
476811623256531:No active runs found
