In [0]:

from pyspark.sql.session import SparkSession
import time
import pandas as pd
from delta import *
import os

# Function to optimize Delta table if it exists
def optimize_table(path):
    """
    Compact the Delta table stored at ``path``, or report that it is missing.

    Args:
        path (str): Location of the Delta table.

    Returns:
        None. A status message is printed in either case.
    """
    # NOTE(review): os.path.exists only inspects the local filesystem of the
    # process running this code; confirm callers pass a path visible there.
    if not os.path.exists(path):
        print(f"Delta Table at {path} does not exist.")
        return

    delta_table = DeltaTable.forPath(spark, path)
    delta_table.optimize().executeCompaction()
    print(f"Optimized Delta Table at {path}")

# This routine requires the paths defined in the includes notebook
# and it clears data from the previous run.
def clear_previous_run() -> bool:
    """
    Remove the checkpoint and Delta locations left behind by the previous run.

    The bronze, silver and gold checkpoint/Delta paths are module-level names
    that must already be defined when this function is called.

    Returns:
        bool: Always True once every removal call has completed.
    """
    previous_run_paths = (
        BRONZE_CHECKPOINT,
        BRONZE_DELTA,
        SILVER_CHECKPOINT,
        SILVER_DELTA,
        GOLD_CHECKPOINT,
        GOLD_DELTA,
    )
    for location in previous_run_paths:
        # Second argument True asks dbutils to remove the path recursively.
        dbutils.fs.rm(location, True)
    return True

def stop_all_streams() -> bool:
    """
    Stop every streaming query currently active on the global ``spark`` session.

    Returns:
        bool: True if at least one active query was found (and stopped),
        False if there was nothing to stop.
    """
    active_queries = list(spark.streams.active)
    for query in active_queries:
        query.stop()
    return len(active_queries) > 0


def stop_named_stream(spark: SparkSession, namedStream: str) -> bool:
    """
    Stop every active streaming query whose name equals ``namedStream``.

    Args:
        spark (SparkSession): Session whose active streams are inspected.
        namedStream (str): Name of the streaming query to stop.

    Returns:
        bool: True if at least one matching query was stopped, False otherwise.
    """
    matching_queries = [
        query for query in spark.streams.active if query.name == namedStream
    ]
    for query in matching_queries:
        query.stop()
    return bool(matching_queries)

def wait_stream_start(
    spark: SparkSession,
    namedStream: str,
    max_attempts: int = 4,
    poll_interval: float = 10,
) -> bool:
    """
    Poll the active streams until one named ``namedStream`` appears.

    The previous implementation used ``if`` where a loop was intended, so it
    checked only once and always slept for ten seconds, even on success.
    This version actually retries and returns as soon as the stream is seen.

    Args:
        spark (SparkSession): Session whose active streams are inspected.
        namedStream (str): Name of the streaming query to wait for.
        max_attempts (int): Maximum number of checks to perform. The default
            of 4 matches the original ``count <= 3`` bound.
        poll_interval (float): Seconds to sleep between consecutive checks.

    Returns:
        bool: True if the stream was found within ``max_attempts`` checks,
        False otherwise.
    """
    for attempt in range(max_attempts):
        if any(stream.name == namedStream for stream in spark.streams.active):
            return True
        # Sleeping after the final failed check would only delay the result.
        if attempt < max_attempts - 1:
            time.sleep(poll_interval)
    return False

# Function to wait for the Delta table to be ready
def wait_for_delta_table(path, timeout=30, check_interval=2):
    """
    Waits for a Delta table to be available before proceeding.

    The table counts as ready once it can be loaded through the global
    ``spark`` session and contains at least one row.

    Args:
        path (str): Path to the Delta table.
        timeout (int | float): Maximum wait time in seconds, measured on a
            monotonic clock so time spent reading the table counts too.
        check_interval (int | float): Seconds to pause between checks.

    Returns:
        bool: True if the table is ready, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if spark.read.format("delta").load(path).count() > 0:
                return True
        except Exception:
            # Best effort: the table may not exist yet, so any read failure
            # just means "not ready". Unlike a bare ``except``, this lets
            # KeyboardInterrupt and SystemExit propagate.
            pass
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline, and never pass a negative duration.
        time.sleep(max(0.0, min(check_interval, remaining)))
    return False

# Function to retrieve streaming statistics
def get_streaming_stats():
    """
    Retrieves streaming statistics such as elapsed time, input row count, and processing time.

    Every entry in ``recentProgress`` of each active query on the global
    ``spark`` session becomes one row. ``elapsed_time`` is measured from the
    timestamp of the first progress entry encountered, and that reference is
    shared by all queries.

    Returns:
        pd.DataFrame: A dataframe containing streaming statistics for active
        queries, with columns ``query``, ``elapsed_time``, ``input_rows``,
        ``processing_time`` and ``memory_used``. The columns are present even
        when there are no rows.
    """
    # Imported here because the file's top-level imports do not bring
    # ``datetime`` into scope; without it the strptime call raises NameError.
    from datetime import datetime

    columns = ["query", "elapsed_time", "input_rows", "processing_time", "memory_used"]
    rows = []
    start_time = None  # Timestamp of the first progress entry seen

    for q in spark.streams.active:
        for p in q.recentProgress or []:
            timestamp = datetime.strptime(p["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")

            # The first entry encountered defines time zero for all rows.
            if start_time is None:
                start_time = timestamp

            # 'durationMs' (and 'addBatch' inside it) may be absent.
            duration = p["durationMs"] if "durationMs" in p else {}
            processing_time = duration.get("addBatch", None)

            # NOTE(review): confirm these key names against the progress
            # schema of the Spark version in use; when they are absent
            # memory_used is reported as 0.
            state_operators = p.get("aggregatedStateOperators")
            memory_used = state_operators[0].get("stateMemory", 0) if state_operators else 0

            rows.append({
                "query": q.name,
                "elapsed_time": (timestamp - start_time).total_seconds(),
                "input_rows": p.get("numInputRows", 0),  # Default to 0 if missing
                "processing_time": processing_time,  # None if not available
                "memory_used": memory_used,
            })

    # Explicit columns keep the frame's shape stable when no rows were collected.
    return pd.DataFrame(rows, columns=columns)