### This setup captures streaming metrics and sends them to Azure Application Insights, using the Application Insights connection string for authentication.

#### Step 1: Import Required Libraries.
Import the necessary libraries for Application Insights.

In [0]:
# https://github.com/Azure-Samples/databricks-observability/blob/main/modules/databricks/notebooks/telemetry-helper.py

In [0]:
import os
import json
import logging
from pyspark.sql.functions import col
from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.session import SparkSession
from pyspark.sql.types import TimestampType
from pyspark.sql import DataFrame
import azure.identity
from azure.identity import DefaultAzureCredential, EnvironmentCredential, ManagedIdentityCredential, SharedTokenCacheCredential
from azure.identity import ClientSecretCredential
from azure.monitor.ingestion import LogsIngestionClient
from azure.core.exceptions import HttpResponseError
from opentelemetry._logs import (
    get_logger_provider,
    set_logger_provider,
)
from opentelemetry.sdk._logs import (
    LoggerProvider,
    LoggingHandler,
)
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter

#### Step 3: Set Up Application Insights Logger.
Configure the logger to send metrics to Application Insights.

In [0]:
# Namespaced logger that the streaming listener writes its telemetry to.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Build the export pipeline only once per Python process. Re-running this cell
# would otherwise stack another BatchLogRecordProcessor and another
# LoggingHandler on top of the existing ones, so every log record would be
# exported to Application Insights several times.
if not any(isinstance(existing, LoggingHandler) for existing in logger.handlers):
    logger_provider = LoggerProvider()
    # Publish the provider globally as well, for code that relies on
    # get_logger_provider().
    set_logger_provider(logger_provider)

    # The connection string is read from a Databricks secret scope rather than
    # being hard-coded in the notebook.
    exporter = AzureMonitorLogExporter.from_connection_string(
        dbutils.secrets.get("myscope", key="appinsightsconnstr")
    )
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))

    # Attach LoggingHandler to namespaced logger. The handler is bound to the
    # provider created above explicitly, so it does not depend on whether the
    # global provider could be set.
    handler = LoggingHandler(logger_provider=logger_provider)
    logger.addHandler(handler)

#### Step 4: Modify the StreamingQueryListener to Send Metrics.
Update the ValueTrackingListener class to send metrics to Application Insights.

In [0]:
# COMMAND ----------
from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.functions import *
import time

class ValueTrackingListener(StreamingQueryListener):
    """Streaming listener that forwards observed metrics to the module logger.

    The listener reads the observed-metrics row registered under the name
    ``"metric"`` (expected fields: ``cnt`` and ``avg_value``), keeps running
    totals, and logs them. Values meant to be queryable in Application
    Insights are attached to the log record through ``extra=``.

    The counters are not keyed by query id, so they aggregate over every query
    whose events this listener instance receives.
    """

    def __init__(self):
        self.total_rows = 0
        self.total_avg_value = 0.0  # sum of per-batch averages
        self.num_batches = 0
        self.start_time = None
        self.end_time = None
        # The terminated event exposes no query name, so it is remembered
        # from the started event.
        self.query_name = None

    @staticmethod
    def _log_with_dimensions(message, dimensions):
        """Log ``message`` at INFO level with ``dimensions`` as record attributes.

        Entries whose value is ``None`` are dropped, because ``None`` is not a
        valid attribute value for the OpenTelemetry log pipeline.
        """
        attributes = {
            key: value for key, value in dimensions.items() if value is not None
        }
        logger.info(message, extra=attributes)

    def onQueryStarted(self, event):
        """Record the start time and name of the query and log the start."""
        self.start_time = time.time()
        self.query_name = event.name
        logger.info(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        """Accumulate the observed metrics of one micro-batch and log them."""
        # Name and id live on the progress object, not on the event itself.
        progress = event.progress
        row = progress.observedMetrics.get("metric")
        if row is None:
            return

        avg_value = row.avg_value
        if avg_value is None:
            # avg() over zero rows is NULL; skip the batch rather than fail
            # on `float += None`.
            return

        self.total_rows += row.cnt or 0
        self.total_avg_value += avg_value
        self.num_batches += 1
        logger.info(f"Recorded metric avg_value: {avg_value}")

        # Send custom metric to Application Insights
        self._log_with_dimensions(
            "Streaming batch metric",
            {
                "avg_value": avg_value,
                "query_name": progress.name,
                # The id is a UUID; attributes must be primitive values.
                "query_id": str(progress.id),
            },
        )

    def onQueryTerminated(self, event):
        """Log the accumulated totals when a query terminates."""
        self.end_time = time.time()
        # start_time stays None when the listener was registered after the
        # query had already started.
        if self.start_time is not None:
            total_time_taken = self.end_time - self.start_time
        else:
            total_time_taken = None

        # Unweighted mean of the per-batch averages.
        if self.num_batches > 0:
            overall_avg_value = self.total_avg_value / self.num_batches
        else:
            overall_avg_value = 0.0

        logger.info(f"{event.id} got terminated!")
        logger.info(f"Total rows processed: {self.total_rows}")
        logger.info(f"Overall average value: {overall_avg_value}")
        logger.info(f"Number of batches: {self.num_batches}")
        logger.info(f"Total time taken: {total_time_taken} seconds")

        # Send total rows, number of batches, and total time taken to Application Insights
        self._log_with_dimensions(
            "Streaming query summary",
            {
                "total_rows": self.total_rows,
                "num_batches": self.num_batches,
                "total_time_taken": total_time_taken,
                "overall_avg_value": overall_avg_value,
                "query_name": self.query_name,
                "query_id": str(event.id),
            },
        )

# Add listener: create one ValueTrackingListener instance and register it
# through spark.streams.addListener. The instance is kept in the module-level
# name `listener` so the same object can be passed to removeListener later.
listener = ValueTrackingListener()
spark.streams.addListener(listener)

#### Step 5: Run the Streaming Query.
Ensure your streaming query is set up to use the listener.

In [0]:
# COMMAND ----------
# Synthetic source: the "rate" format generates rows at a fixed pace.
streaming_df = (spark
    .readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load())

# Register the observed metrics under the name "metric"; the listener looks
# them up by that same name and reads the `cnt` and `avg_value` fields.
observed_streaming_df = streaming_df.observe(
    "metric",
    count(lit(1)).alias("cnt"),  # number of processed rows
    avg(col("value")).alias("avg_value"))  # average of row values

# COMMAND ----------
query = (observed_streaming_df
    .writeStream
    .format("console")
    .queryName("Rate query")
    .start())

# COMMAND ----------
# How long the demo query is allowed to run before it is stopped.
RUN_SECONDS = 120

try:
    # Unlike a plain sleep, this returns as soon as the query ends and raises
    # if the query failed, so an early failure is not hidden for the whole
    # run window.
    query.awaitTermination(RUN_SECONDS)
finally:
    # Always clean up, even on failure or interrupt, so no query keeps running
    # and no listener stays registered on the session.
    query.stop()
    spark.streams.removeListener(listener)