In [None]:
# Import python packages
import streamlit as st
import pandas as pd
from croniter import croniter
from datetime import datetime, timedelta
import pytz
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import nbformat

# Grab the currently active Snowpark session so Python cells can query
# Snowflake alongside the SQL cells.
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
-- List every task in the account and remember the query id of that listing
-- in $QUERY_ID_TASKS so it can be re-read through RESULT_SCAN.
SHOW TASKS IN ACCOUNT;
SET QUERY_ID_TASKS = LAST_QUERY_ID();

SELECT *
  FROM TABLE(RESULT_SCAN($QUERY_ID_TASKS));

In [None]:
-- Core attributes of every task from the SHOW TASKS listing, with
-- target_completion_interval normalized to whole minutes and a derived
-- TASK_TYPE. Reads $QUERY_ID_TASKS; stores its own query id in
-- $QUERY_ID_CORE_INFORMATION.

WITH high_level_details AS (
    SELECT
        "name" AS task_name,
        "database_name" || '.' || "schema_name" || '.' || "name" AS fq_task_name,
        "owner" AS owner,
        "warehouse" AS warehouse,
        "schedule" AS schedule,
        "predecessors" AS predecessors,
        "definition" AS definition,
        "task_relations" AS task_relations,
        "scheduling_mode" AS scheduling_mode,
        "target_completion_interval" AS target_completion_interval
    FROM TABLE(RESULT_SCAN($QUERY_ID_TASKS))
),
target_interval AS (
    SELECT
        fq_task_name,
        -- REGEXP_LIKE implicitly anchors the pattern at both ends, so each
        -- alternative has to cover the whole value: optional plural "S",
        -- optional surrounding whitespace, and case-insensitive matching ('i').
        -- Anything that does not look like "<number> <unit>" yields NULL.
        CASE
            WHEN REGEXP_LIKE(target_completion_interval, '\\s*\\d+\\s*(MINUTES?|M)\\s*', 'i') THEN
                TO_NUMBER(REGEXP_SUBSTR(target_completion_interval, '\\d+'))
            WHEN REGEXP_LIKE(target_completion_interval, '\\s*\\d+\\s*(HOURS?|H)\\s*', 'i') THEN
                TO_NUMBER(REGEXP_SUBSTR(target_completion_interval, '\\d+')) * 60
            WHEN REGEXP_LIKE(target_completion_interval, '\\s*\\d+\\s*(SECONDS?|S)\\s*', 'i') THEN
                TO_NUMBER(REGEXP_SUBSTR(target_completion_interval, '\\d+')) / 60
            ELSE NULL
        END::integer AS target_completion_interval_mins
    FROM high_level_details
)
SELECT
    hld.task_name,
    hld.fq_task_name,
    hld.owner,
    hld.warehouse,
    hld.schedule,
    -- Exposed so that consumers of $QUERY_ID_CORE_INFORMATION can select it.
    hld.scheduling_mode,
    hld.definition,
    t.target_completion_interval_mins,
    hld.task_relations,
    -- A warehouse means the task is user-managed; otherwise it is serverless,
    -- labelled FLEXIBLE when the scheduling mode says so.
    CASE
        WHEN hld.warehouse IS NOT NULL THEN 'USER_MANAGED'
        WHEN hld.scheduling_mode ILIKE '%FLEXIBLE%' THEN 'FLEXIBLE'
        ELSE 'SERVERLESS'
    END AS task_type
FROM target_interval t
JOIN high_level_details hld
  ON t.fq_task_name = hld.fq_task_name;

SET QUERY_ID_CORE_INFORMATION = LAST_QUERY_ID();

SELECT *
  FROM TABLE(RESULT_SCAN($QUERY_ID_CORE_INFORMATION));

In [None]:
-- Resolve, for every task, the root task of the graph it belongs to.
-- Reads $QUERY_ID_TASKS; stores its own query id in $QUERY_ID_HIERARCHY.
WITH show_tasks AS (
   SELECT
     "database_name" AS database_name,
     "schema_name" AS schema_name,
     "name" AS task_name,
     database_name || '.' || schema_name || '.' || "name" AS fq_task_name,
     "id" AS task_id,
     "predecessors" AS predecessors,
     "warehouse" AS warehouse,
     -- Type label (not the warehouse name): a warehouse means user-managed,
     -- otherwise serverless, labelled FLEXIBLE when the scheduling mode says so.
     CASE
       WHEN "warehouse" IS NOT NULL THEN 'USER_MANAGED'
       WHEN "scheduling_mode" ILIKE '%FLEXIBLE%' THEN 'FLEXIBLE'
       ELSE 'SERVERLESS'
     END AS task_type
   FROM TABLE(RESULT_SCAN($QUERY_ID_TASKS))
),

-- One row per (task, predecessor) pair, plus one row with a NULL predecessor
-- for every task that has none.
tasks AS (
    SELECT
        st.task_name,
        st.fq_task_name,
        f.value::string AS predecessor_task,
        st.warehouse,
        st.task_type
    FROM show_tasks st,
         LATERAL FLATTEN(input => TRY_PARSE_JSON(st.predecessors::variant)) f

    UNION ALL

    SELECT
        st.task_name,
        st.fq_task_name,
        NULL AS predecessor_task,
        st.warehouse,
        st.task_type
    FROM show_tasks st
    -- COALESCE: a NULL or unparsable predecessors value produces no rows in
    -- the FLATTEN branch above, so treat it as "no predecessors" here rather
    -- than losing the task altogether.
    WHERE COALESCE(ARRAY_SIZE(TRY_PARSE_JSON(st.predecessors::variant)), 0) = 0
),

-- Recursive CTE: start from tasks without predecessors and walk down the
-- graph, carrying the root along.
recursive_roots (
    task_name,
    fq_task_name,
    predecessor_task,
    root_task,
    root_fq_task,
    warehouse,
    task_type
) AS (

    -- Anchor: a task without predecessors is its own root.
    SELECT
        task_name,
        fq_task_name,
        predecessor_task,
        task_name AS root_task,
        fq_task_name AS root_fq_task,
        warehouse,
        task_type
    FROM tasks
    WHERE predecessor_task IS NULL

    UNION ALL

    -- Recursive step: a task inherits the root of its predecessor. The match
    -- is on the fully qualified name held in the predecessors list.
    SELECT
        t.task_name,
        t.fq_task_name,
        t.predecessor_task,
        r.root_task,
        r.root_fq_task,
        t.warehouse,
        t.task_type
    FROM tasks t
    JOIN recursive_roots r
      ON t.predecessor_task = r.fq_task_name
)

-- GROUP BY ALL collapses the duplicates produced when a task is reached
-- through more than one predecessor.
SELECT
    fq_task_name,
    root_fq_task,
    task_type
FROM recursive_roots
GROUP BY ALL
ORDER BY fq_task_name;

SET QUERY_ID_HIERARCHY = LAST_QUERY_ID();

SELECT *
  FROM TABLE(RESULT_SCAN($QUERY_ID_HIERARCHY));

In [None]:
-- Snapshot the last 30 days of task runs and keep the query id in
-- $QUERY_ID_TASK_HISTORY so the snapshot can be re-read through RESULT_SCAN.
SELECT *
  FROM snowflake.account_usage.task_history
 WHERE query_start_time > DATEADD(d, -30, CURRENT_TIMESTAMP);
SET QUERY_ID_TASK_HISTORY = LAST_QUERY_ID();

-- Per-task run statistics from that snapshot. Durations are measured from
-- scheduled_time to completed_time.
-- Optional narrowing filters (add a WHERE clause if needed):
--   database_name = 'SERVERLESS_TASKS'
--   scheduled_time > dateadd(d, -30, current_timestamp())
SELECT name AS task_name,
       database_name || '.' || schema_name || '.' || name AS fq_task_name,
       SUM(TIMESTAMPDIFF(s, scheduled_time, completed_time)) AS total_execution_seconds,
       AVG(TIMESTAMPDIFF(s, scheduled_time, completed_time)) AS avg_execution_seconds,
       COUNT(*) AS runs
  FROM TABLE(RESULT_SCAN($QUERY_ID_TASK_HISTORY))
 GROUP BY ALL;

SET QUERY_ID_RUN_TIMES = LAST_QUERY_ID();

In [None]:
-- Combine the three saved result sets into one row per task: core
-- attributes, restricted to tasks present in the hierarchy result, with run
-- statistics attached where any exist (LEFT JOIN keeps tasks without runs).
-- NOTE(review): SCHEDULING_MODE is read from the core-information result;
-- confirm that result set actually exposes that column.
WITH core_information AS (
    SELECT *
      FROM TABLE(RESULT_SCAN($QUERY_ID_CORE_INFORMATION))
),
hierarchy AS (
    SELECT *
      FROM TABLE(RESULT_SCAN($QUERY_ID_HIERARCHY))
),
run_times AS (
    SELECT *
      FROM TABLE(RESULT_SCAN($QUERY_ID_RUN_TIMES))
)
SELECT ci.task_name,
       ci.fq_task_name,
       ci.owner,
       ci.warehouse,
       ci.definition,
       ci.schedule,
       ci.scheduling_mode,
       ci.target_completion_interval_mins,
       ci.task_type,
       rt.total_execution_seconds,
       rt.avg_execution_seconds,
       rt.runs
  FROM core_information ci
 INNER JOIN hierarchy h
    ON ci.fq_task_name = h.fq_task_name
  LEFT JOIN run_times rt
    ON rt.fq_task_name = ci.fq_task_name

In [None]:
-- Estimated warehouse credits for task-issued queries over the last 30 days.
-- A query counts as task-issued when its query_id appears in the task-history
-- snapshot saved under $QUERY_ID_TASK_HISTORY.
-- The estimate is elapsed hours x credits per hour for the warehouse size;
-- total_elapsed_time is in milliseconds.
-- NOTE(review): the database filter is hard-coded to 'SERVERLESS_TASKS';
-- confirm this is intended rather than a leftover development filter.
SELECT
    query_id,
    database_name,
    warehouse_name,
    warehouse_size,
    total_elapsed_time / 3600000.0 AS total_elapsed_hours,
    -- Credits per hour by size; unknown sizes stay NULL so they are visible
    -- rather than silently costed at zero.
    CASE UPPER(warehouse_size)
        WHEN 'X-SMALL'   THEN 1
        WHEN 'SMALL'     THEN 2
        WHEN 'MEDIUM'    THEN 4
        WHEN 'LARGE'     THEN 8
        WHEN 'X-LARGE'   THEN 16
        WHEN '2X-LARGE'  THEN 32
        WHEN '3X-LARGE'  THEN 64
        WHEN '4X-LARGE'  THEN 128
        WHEN '5X-LARGE'  THEN 256
        WHEN '6X-LARGE'  THEN 512
        ELSE NULL
    END AS credits_per_hour,
    total_elapsed_hours * credits_per_hour AS estimated_credits_used
FROM snowflake.account_usage.query_history
WHERE start_time > DATEADD(d, -30, CURRENT_DATE())
  AND warehouse_name IS NOT NULL
  -- The equality test already excludes NULL database names.
  AND database_name = 'SERVERLESS_TASKS'
  AND warehouse_size IS NOT NULL
  AND query_id IN (
      SELECT DISTINCT query_id
      FROM TABLE(RESULT_SCAN($QUERY_ID_TASK_HISTORY))
  );

In [None]:
-- Serverless credit consumption per task over the last 30 days: total
-- credits, number of history rows, and the average credits per row.
-- Optional narrowing filter: AND database_name = 'SERVERLESS_TASKS'
SELECT task_name,
       database_name || '.' || schema_name || '.' || task_name AS fq_task_name,
       SUM(credits_used) AS total_credits,
       COUNT(*) AS task_execution_count,
       ROUND(total_credits / task_execution_count, 5) AS avg_task_credits
  FROM snowflake.account_usage.serverless_task_history
 WHERE start_time > DATEADD(DAY, -30, CURRENT_TIMESTAMP())
 GROUP BY 1, 2
 ORDER BY 1;

In [None]:
SHOW WAREHOUSES;
SET QUERY_ID_WAREHOUSES = LAST_QUERY_ID();

-- Auto-resume cost attribution. Assumption: when a task query triggers a
-- warehouse event, that query is charged for one full auto_suspend window of
-- the warehouse.
WITH auto_suspend_cost_lookup AS (
    -- Per-warehouse lookup: credits burned during one auto_suspend window.
    -- auto_suspend is divided by 3600 to turn it into hours.
    SELECT "name" AS warehouse_name,
           "auto_suspend" AS auto_suspend,
           "size" AS size,
           CASE UPPER("size")
               WHEN 'X-SMALL'  THEN 1
               WHEN 'SMALL'    THEN 2
               WHEN 'MEDIUM'   THEN 4
               WHEN 'LARGE'    THEN 8
               WHEN 'X-LARGE'  THEN 16
               WHEN '2X-LARGE' THEN 32
               WHEN '3X-LARGE' THEN 64
               WHEN '4X-LARGE' THEN 128
               WHEN '5X-LARGE' THEN 256
               WHEN '6X-LARGE' THEN 512
               ELSE 0
           END AS credit_multiplier,
           (auto_suspend / 3600.0) * credit_multiplier AS total_auto_resume_wh_credits
      FROM TABLE(RESULT_SCAN($QUERY_ID_WAREHOUSES))
),
-- Query ids of task runs that finished without an error code.
-- RESULT_LIMIT is raised to its maximum: the table function otherwise returns
-- only 100 rows, and the limit applies before the filter and DISTINCT below.
task_queries AS (
    SELECT DISTINCT query_id
      FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(RESULT_LIMIT => 10000))
     WHERE error_code IS NULL
)
-- Warehouse events whose query_id belongs to a task run, priced via the lookup.
-- NOTE(review): every event type is included; confirm whether only resume
-- events should be counted.
SELECT weh.timestamp,
       weh.query_id,
       weh.warehouse_name,
       ascl.auto_suspend,
       ascl.size,
       ascl.credit_multiplier,
       ascl.total_auto_resume_wh_credits
  FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_EVENTS_HISTORY weh
  JOIN auto_suspend_cost_lookup ascl
    ON ascl.warehouse_name = weh.warehouse_name
  JOIN task_queries tq
    ON tq.query_id = weh.query_id
 -- Optional narrowing filter: WHERE weh.timestamp > dateadd(d, -4, current_date())
;


In [None]:
def get_cron_times_if_safe(cron_expr, max_job_historical_run: timedelta, start_time=None, sample_runs=100):
    """
    Sample upcoming runs of a cron schedule and return them as
    (weekday, minute_of_day) pairs, provided the schedule is "safe".

    A schedule is safe when every gap between consecutive sampled runs is
    - at least 1 hour, AND
    - at least twice the longest historical runtime of the job.

    Args:
        cron_expr (str): Cron string like '*/15 * * * *'
        max_job_historical_run (timedelta): Longest known runtime of the job
        start_time (datetime, optional): Point from which runs are sampled.
            Defaults to the current UTC time.
        sample_runs (int): How many cron executions to sample

    Returns:
        List[Tuple[str, int]]: (weekday name, minute of day) per sampled run,
                               or [] as soon as one gap is too short.
    """
    if start_time is None:
        start_time = datetime.now(pytz.utc)

    def _as_slot(moment):
        # Weekday name plus minutes elapsed since midnight.
        return (moment.strftime("%A"), moment.hour * 60 + moment.minute)

    schedule = croniter(cron_expr, start_time)

    # The first run is always recorded; gaps are checked from the second on.
    previous = schedule.get_next(datetime)
    slots = [_as_slot(previous)]

    while len(slots) < sample_runs:
        current = schedule.get_next(datetime)
        gap = current - previous

        # Reject on either rule: below the hourly floor, or below twice the
        # longest historical runtime.
        if gap < timedelta(hours=1) or gap < (2 * max_job_historical_run):
            return []

        slots.append(_as_slot(current))
        previous = current

    return slots

In [None]:
# Example: a job scheduled daily at 9 AM whose longest historical run is 30 minutes.
cron_expr = "0 9 * * *"
max_runtime = timedelta(minutes=30)

get_cron_times_if_safe(cron_expr, max_runtime)

# Runs are 24 hours apart, which clears both safety checks:
# print(get_cron_times_if_safe(cron_expr, max_runtime))  # ✅ Returns list

# Counter-example (disabled): an hourly schedule with a 40-minute runtime.
# cron_expr2 = "0 * * * *"
# max_runtime2 = timedelta(minutes=40)  # 2x = 80 mins > 60 mins interval

# print(get_cron_times_if_safe(cron_expr2, max_runtime2))  # ❌ Returns []

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def plot_cron_heatmap(cron_times):
    """
    Plot a static heatmap of cron job times using matplotlib and seaborn.

    Rows are the seven weekdays, Monday at the top and Sunday at the bottom;
    columns are the 1440 minutes of a day. Each cell holds the number of runs
    at that weekday/minute. Days without any run still get an (empty) row so
    the Y axis looks the same for every schedule.

    Args:
        cron_times (list of tuples): [(weekday_str, minute_of_day), ...]

    Returns:
        None. Prints a message and returns early when cron_times is empty.
    """
    if not cron_times:
        print("No data to plot.")
        return

    # Fixed weekday order; the position in this list is the row number.
    day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    day_to_num = {day: i for i, day in enumerate(day_order)}

    # Create DataFrame from cron job times
    df = pd.DataFrame(cron_times, columns=['Day', 'MinuteOfDay'])
    df['DayNum'] = df['Day'].map(day_to_num)

    # Aggregate job counts per (day, minute) pair
    heatmap_data = df.groupby(['DayNum', 'MinuteOfDay']).size().unstack(fill_value=0)

    # Force the full grid: all seven days (ascending, so Sunday is the last
    # row and is drawn at the bottom) and all 1440 minutes.
    heatmap_data = heatmap_data.reindex(
        index=range(len(day_order)),
        columns=range(0, 1440),
        fill_value=0,
    )

    # X-axis labels (HH:MM format) for each minute
    xtick_labels = [f"{m // 60:02d}:{m % 60:02d}" for m in heatmap_data.columns]

    fig, ax = plt.subplots(figsize=(18, 6))
    sns.heatmap(
        heatmap_data,
        cmap="YlGnBu",
        cbar_kws={'label': 'Job Runs'},
        # Tick labels for the X axis are set explicitly below; letting seaborn
        # build 1440 labels first would only be thrown away.
        xticklabels=False,
        yticklabels=day_order,
        ax=ax,
    )
    ax.set_xlabel('Time of Day')
    ax.set_ylabel('Day of Week')
    ax.set_title('Cron Job Schedule Heatmap')

    # Reduce number of X-axis labels to improve readability
    step = max(1, len(xtick_labels) // 24)  # Approx. one label per hour
    ax.set_xticks(range(0, len(xtick_labels), step))
    ax.set_xticklabels(xtick_labels[::step], rotation=45, ha='right')

    plt.tight_layout()
    plt.show()




# Hand-written sample in the same [(weekday, minute_of_day), ...] shape that
# get_cron_times_if_safe returns: three runs per weekday.
example_data = [
    # Monday
    ("Monday", 120),    # 02:00 AM
    ("Monday", 540),    # 09:00 AM
    ("Monday", 1020),   # 05:00 PM

    # Tuesday
    ("Tuesday", 75),    # 01:15 AM
    ("Tuesday", 600),   # 10:00 AM
    ("Tuesday", 1230),  # 08:30 PM

    # Wednesday
    ("Wednesday", 180),   # 03:00 AM
    ("Wednesday", 720),   # 12:00 PM
    ("Wednesday", 1320),  # 10:00 PM

    # Thursday
    ("Thursday", 240),   # 04:00 AM
    ("Thursday", 480),   # 08:00 AM
    ("Thursday", 1080),  # 06:00 PM

    # Friday
    ("Friday", 360),    # 06:00 AM
    ("Friday", 660),    # 11:00 AM
    ("Friday", 1380),   # 11:00 PM

    # Saturday
    ("Saturday", 90),    # 01:30 AM
    ("Saturday", 780),   # 01:00 PM
    ("Saturday", 1260),  # 09:00 PM

    # Sunday
    ("Sunday", 0),       # 12:00 AM
    ("Sunday", 300),     # 05:00 AM
    ("Sunday", 1140),    # 07:00 PM
]


plot_cron_heatmap(example_data)

In [None]:
def get_minimum_interval_minutes(cron_times):
    """
    Find the shortest gap, in minutes, between consecutive job runs of a
    weekly schedule. The week is treated as circular, so the gap from the last
    run of the week to the first run of the following week is included.

    Args:
        cron_times (list of tuples): [(weekday_str, minute_of_day), ...]

    Returns:
        int or None: Minimum interval in minutes, or None if < 2 jobs exist.
    """
    if len(cron_times) < 2:
        return None

    minutes_per_day = 1440
    minutes_per_week = 10080

    # Weekday name -> position in the week (Monday = 0 ... Sunday = 6).
    weekday_index = dict(zip(
        ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'),
        range(7),
    ))

    # Every run expressed as minutes since Monday 00:00, in ascending order.
    week_offsets = sorted(
        weekday_index[day] * minutes_per_day + minute
        for day, minute in cron_times
    )

    # Gaps between neighbours; after sorting, the minimum over neighbours is
    # the minimum over all pairs.
    gaps = [later - earlier for earlier, later in zip(week_offsets, week_offsets[1:])]

    # Wrap-around gap: last run of this week to first run of the next one.
    gaps.append(week_offsets[0] + minutes_per_week - week_offsets[-1])

    return min(gaps)


In [None]:
def report_minimum_interval(cron_times, threshold=151):
    """
    Print the minimum interval between runs of a weekly schedule and flag the
    task as ineligible when that interval is below the threshold.

    Args:
        cron_times (list of tuples): [(weekday_str, minute_of_day), ...]
        threshold (int): Smallest acceptable interval, in minutes.

    Returns:
        None. The report is written to standard output.
    """
    min_interval = get_minimum_interval_minutes(cron_times)
    if min_interval is None:
        # Fewer than two runs: there is no interval to measure.
        print("Not enough job runs to calculate interval.")
    else:
        print(f"Minimum interval between runs: {min_interval} minutes")
        if min_interval < threshold:
            print("Task ineligible. Time frame too short")
            
# Report on the sample schedule using the default threshold.
report_minimum_interval(example_data)