# In [0]:
def get_streaming_stats():
    """
    Collect per-trigger statistics for every active streaming query.

    Each entry of ``recentProgress`` for each query in ``spark.streams.active``
    becomes one row with the columns:

    - ``query``: the query's name.
    - ``elapsed_time``: seconds since the earliest progress timestamp seen
      across *all* active queries (never negative).
    - ``input_rows``: ``numInputRows`` of the trigger (0 if missing).
    - ``processing_time``: ``durationMs["addBatch"]`` in milliseconds, or
      ``None`` when not reported.
    - ``memory_used``: sum of ``memoryUsedBytes`` over the trigger's
      ``stateOperators`` (0 for stateless queries).

    Returns:
        pd.DataFrame: One row per progress report. When no progress is
        available, an empty frame that still has the columns above.
    """
    columns = ["query", "elapsed_time", "input_rows", "processing_time", "memory_used"]
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"

    # First pass: parse every timestamp so the reference point can be the
    # earliest report over all queries, not just the first query iterated.
    reports = []
    for q in spark.streams.active:
        for p in q.recentProgress or []:
            timestamp = datetime.strptime(p["timestamp"], timestamp_format)
            reports.append((q.name, timestamp, p))

    if not reports:
        return pd.DataFrame(columns=columns)

    start_time = min(timestamp for _, timestamp, _ in reports)

    data = []
    for name, timestamp, p in reports:
        durations = p.get("durationMs") or {}
        # Spark reports state size per stateful operator under
        # "stateOperators"[*]["memoryUsedBytes"]; stateless queries have none.
        state_operators = p.get("stateOperators") or []
        data.append({
            "query": name,
            "elapsed_time": (timestamp - start_time).total_seconds(),
            "input_rows": p.get("numInputRows", 0),
            "processing_time": durations.get("addBatch"),
            "memory_used": sum((op.get("memoryUsedBytes") or 0) for op in state_operators),
        })

    return pd.DataFrame(data, columns=columns)


def clean_up_delta_tables():
    """
    Recursively delete every Delta table directory used by the streams, so the
    next run starts from an empty state.

    The streams keep their checkpoints in a ``_checkpoint`` subdirectory of each
    table path, so those are removed as well.
    """
    table_paths = (delta_table_path, count_table_path, analytics_table_path, word_count_path)
    for table_path in table_paths:
        dbutils.fs.rm(table_path, True)  # True -> recursive delete

# Function to wait for the Delta table to be ready
def wait_for_delta_table(path, timeout=30, check_interval=2):
    """
    Poll until the Delta table at ``path`` exists and contains at least one row.

    Args:
        path (str): Path to the Delta table.
        timeout (int | float): Maximum total wait time in seconds. A value of
            0 or less returns ``False`` immediately without checking.
        check_interval (int | float): Seconds to sleep between checks.

    Returns:
        bool: True if the table became readable and non-empty before the
        deadline, False otherwise.
    """
    # Use a monotonic deadline so that time spent inside the Spark read counts
    # toward the timeout, and a non-positive check_interval cannot loop forever.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # limit(1) avoids scanning the whole table just to test non-emptiness.
            if spark.read.format("delta").load(path).limit(1).count() > 0:
                return True
        except Exception:
            # The table may not exist yet (no batch committed); keep polling.
            # Catching Exception rather than using a bare except leaves
            # KeyboardInterrupt/SystemExit able to stop the wait.
            pass
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(max(0.0, min(check_interval, remaining)))
    return False

# Explicit schema for the generated JSON files. "text" feeds the word-count
# stream, and "metrics" is a nested struct of sensor readings.
json_schema = (
    StructType()
    .add("id", StringType(), True)
    .add("subject", StringType(), True)
    .add("value", StringType(), True)
    .add("timestamp", TimestampType(), True)
    .add("category", StringType(), True)
    .add("text", StringType(), True)
    .add(
        "metrics",
        StructType()
        .add("temperature", DoubleType(), True)
        .add("pressure", DoubleType(), True)
        .add("humidity", DoubleType(), True),
        True,
    )
)

def start_json_stream():
    """
    Start an Auto Loader (``cloudFiles``) stream that ingests JSON files from
    ``input_path`` and appends them to the Delta table at ``delta_table_path``.

    At most ``MAX_FILES_PER_TRIGGER`` new files are read per trigger. Each row
    gets these derived columns:

    - ``timestamp``: overwritten with ``current_timestamp()``.
    - ``random_value``: a random value in [0, 100).
    - ``value_squared``: ``value`` cast to double, squared.
    - ``complex_calculation``: ``log(temperature) * sqrt(pressure)`` when the
      temperature is positive, else 0.

    Returns:
        StreamingQuery: Handle to the running ``JSON_Ingestion_Stream`` query.
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        # Auto Loader reads its rate limit from the "cloudFiles."-prefixed key;
        # the plain "maxFilesPerTrigger" key is not the Auto Loader option.
        .option("cloudFiles.maxFilesPerTrigger", MAX_FILES_PER_TRIGGER)
        .schema(json_schema)  # Apply the explicit schema
        .load(input_path)
        # Ingestion time replaces any event timestamp in the files.
        .withColumn("timestamp", current_timestamp())
        # CPU-intensive derived columns to add load to the pipeline.
        .withColumn("random_value", rand() * 100)
        .withColumn("value_squared", expr("cast(value as double) * cast(value as double)"))
        .withColumn("complex_calculation",
                    expr("CASE WHEN metrics.temperature > 0 THEN LOG(metrics.temperature) * SQRT(metrics.pressure) ELSE 0 END"))
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{delta_table_path}/_checkpoint")
        .option("mergeSchema", "true")  # Enable schema merging
        .trigger(processingTime="2 seconds")
        .queryName("JSON_Ingestion_Stream")
        .start(delta_table_path)
    )
    
def start_count_stream():
    """
    Start a windowed-aggregation stream over the ingested Delta table.

    Rows are grouped into sliding 5-second windows that advance every 2 seconds,
    keyed on ``timestamp`` with a 10-second watermark. Each window reports:

    - its row count and min/max timestamp;
    - the approximate median of ``random_value``;
    - the standard deviation of ``value_squared``;
    - the mean of ``complex_calculation``.

    Finalised windows are appended to the Delta table at ``count_table_path``.

    Returns:
        StreamingQuery: Handle to the running ``Row_Count_Stream`` query.
    """
    window_metrics = [
        count("*").alias("row_count"),
        min("timestamp").alias("min_timestamp"),
        max("timestamp").alias("max_timestamp"),
        expr("percentile_approx(random_value, 0.5)").alias("median_value"),
        expr("stddev(value_squared)").alias("std_dev"),
        expr("avg(complex_calculation)").alias("avg_complex"),
    ]

    # Append mode on an aggregation needs the watermark: windows are emitted
    # only once the watermark has passed their end.
    watermarked = (
        spark.readStream
        .format("delta")
        .load(delta_table_path)
        .withWatermark("timestamp", "10 seconds")
    )
    per_window = watermarked.groupBy(window("timestamp", "5 seconds", "2 seconds")).agg(*window_metrics)

    writer = (
        per_window.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{count_table_path}/_checkpoint")
        .queryName("Row_Count_Stream")
        .trigger(processingTime="3 seconds")
    )
    return writer.start(count_table_path)

def start_word_count_stream():
    """
    Start a running word count over the ``text`` column of the ingested Delta table.

    Rows with a null ``text`` are dropped. Each text is split on single spaces,
    and the occurrences of every token are counted. On every trigger the whole
    count table, ordered by descending count, is rewritten to ``word_count_path``
    (complete output mode).

    Returns:
        StreamingQuery: Handle to the running ``Word_Count_Stream`` query.
    """
    texts = (
        spark.readStream
        .format("delta")
        .load(delta_table_path)
        .select("id", "text", "timestamp")
        .where(col("text").isNotNull())
    )

    # One row per whitespace-separated token.
    tokens = texts.withColumn("word", explode(split(col("text"), " ")))
    ranked_counts = (
        tokens.groupBy("word")
        .agg(count("*").alias("count"))
        .orderBy(col("count").desc())
    )

    writer = (
        ranked_counts.writeStream
        .format("delta")
        .outputMode("complete")
        .option("checkpointLocation", f"{word_count_path}/_checkpoint")
        .queryName("Word_Count_Stream")
        .trigger(processingTime="4 seconds")
    )
    return writer.start(word_count_path)

def start_analytics_stream():
    """
    Start a per-category windowed analytics stream over the ingested Delta table.

    This query performs no joins. It groups rows by ``category`` and by a sliding
    10-second window that advances every 5 seconds, keyed on ``timestamp`` with a
    15-second watermark. For each group it computes:

    - the record count;
    - average temperature, maximum pressure and minimum humidity from ``metrics``;
    - the standard deviation of ``value_squared``;
    - the sum of ``complex_calculation`` and that sum divided by the record count.

    The window struct is flattened into ``window_start`` / ``window_end``.
    Finalised windows are appended to the Delta table at ``analytics_table_path``.

    Returns:
        StreamingQuery: Handle to the running ``Analytics_Stream`` query.
    """
    return (
        spark.readStream
        .format("delta")
        .load(delta_table_path)
        # Required for append-mode output of a windowed aggregation.
        .withWatermark("timestamp", "15 seconds")
        .groupBy(
            col("category"),
            window("timestamp", "10 seconds", "5 seconds")
        )
        .agg(
            count("*").alias("record_count"),
            expr("avg(metrics.temperature)").alias("avg_temp"),
            expr("max(metrics.pressure)").alias("max_pressure"),
            expr("min(metrics.humidity)").alias("min_humidity"),
            expr("stddev(value_squared)").alias("value_stddev"),
            expr("sum(complex_calculation)").alias("total_complex")
        )
        # Flatten the window struct into plain columns for the output table.
        .select(
            col("category"),
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("record_count"),
            col("avg_temp"),
            col("max_pressure"),
            col("min_humidity"),
            col("value_stddev"),
            col("total_complex"),
            expr("total_complex / record_count").alias("complex_per_record")
        )
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{analytics_table_path}/_checkpoint")
        .queryName("Analytics_Stream")
        .trigger(processingTime="5 seconds")
        .start(analytics_table_path)
    )

def generate_json_files(dbfs_path, num_files=5):
    """
    Generate synthetic JSON records and copy them into a DBFS directory.

    File ``data_{i}.json`` holds one JSON object with these fields:

    - ``id``, ``subject`` and ``value`` (as a string);
    - ``category``, cycling through five categories;
    - ``text``, one of five sentences plus an ``iteration {i}`` suffix;
    - ``metrics``, a nested object of randomized temperature, pressure and
      humidity values.

    Each file is first written into a private temporary directory on the driver
    and then copied with ``dbutils.fs.cp``. The temporary directory and its
    files are always deleted afterwards, even if a copy fails.

    Args:
        dbfs_path (str): The DBFS directory to write the JSON files to.
        num_files (int): The number of JSON files to generate.
    """
    import os
    import tempfile

    categories = ["sensor", "device", "system", "network", "application"]
    texts = [
        "This is a sample text with multiple words for analysis purposes",
        "Spark streaming performance testing with complex data structures",
        "Big data processing at scale requires efficient resource management",
        "Streaming analytics can provide real-time insights into your data",
        "Optimizing Spark jobs is essential for maintaining performance"
    ]

    # A per-call staging directory avoids leaking files in /tmp and avoids
    # collisions with other runs that use the same file names.
    with tempfile.TemporaryDirectory() as staging_dir:
        for i in tqdm(range(num_files)):
            file_name = f"data_{i}.json"
            full_dbfs_path = f"{dbfs_path}/{file_name}"
            local_tmp_path = os.path.join(staging_dir, file_name)

            data = {
                "id": f"ID-{i}",
                "subject": f"Subject_{i % 10}",
                "value": str(i * 10),  # String to match the StringType schema field
                "category": categories[i % len(categories)],
                "text": texts[i % len(texts)] + f" iteration {i}",
                "metrics": {
                    "temperature": round(20 + (i % 15) + np.random.random(), 2),
                    "pressure": round(1000 + (i % 50) + np.random.random() * 10, 2),
                    "humidity": round(30 + (i % 70) + np.random.random() * 5, 2)
                }
            }

            with open(local_tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f)

            # Copy from the driver's local filesystem into DBFS.
            dbutils.fs.cp(f"file://{local_tmp_path}", full_dbfs_path)