# Azure Data Explorer Kusto Query Queue: Run

In [None]:
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.storage.queue import QueueClient
from typing import List, Dict

import pandas as pd
import logging 
import time
import json
import os

# Each run logs to its own file named after the start time with separators removed,
# e.g. "2024-01-02 03:04:05.123456" -> kqq-run-20240102030405123456.log
logging_timestamp = str(pd.Timestamp.today()).translate(str.maketrans("", "", "-:. "))
logging.basicConfig(
    filename=f"kqq-run-{logging_timestamp}.log",
    format='%(asctime)s %(levelname)s %(message)s',
    filemode='w',
    # force=True (Python 3.8+) replaces handlers already attached to the root logger.
    # Without it basicConfig silently does nothing when the root logger already has a
    # handler (e.g. from a previous run of this cell or from the hosting environment),
    # and this run's log file is never created.
    force=True,
)
# the root logger is used so every module's records reach the file handler above
logger = logging.getLogger()
logger.setLevel(logging.INFO)

class KustoQueryQueue:
    """
    Class that implements an Azure Data Explorer query queue for executing a series of management commands.
    Reads KQL commands from an Azure Storage Queue, submits them while keeping at most
    max_concurrent_queries in progress, and polls their status until all have finished.

    Each queue message body must be JSON with a "query" key holding the command to run.
    The command's primary result must contain an OperationId column (presumably an
    `async` management command - confirm against the commands you enqueue); that id is
    polled with `.show operations`.
    A message is deleted from the storage queue only once its command reaches the
    "Completed" state. For states in FAILED_STATES the message is left in the queue and
    the command is retried after the message's visibility timeout expires.
    """

    # Operation states treated as failures: the query stops being tracked and its queue
    # message is retried once it becomes visible again.
    # NOTE(review): any other state keeps the query tracked (occupying a concurrency slot)
    # until it changes; ADX documents further states such as PartiallySucceeded - confirm
    # how those should be handled.
    FAILED_STATES = ("Throttled", "Failed", "Abandoned", "BadInput", "Canceled")

    def __init__(
        self,
        kusto_client: KustoClient,
        kusto_database: str,
        queue_client: QueueClient,
        max_concurrent_queries: int = 5,
        request_wait_seconds: int = 5,
        loop_wait_seconds: int = 30,
        message_visibility_timeout: int = 600,
    ):
        """
        Args:
            kusto_client: azure kusto cluster client object
            kusto_database: name of data explorer database
            queue_client: azure storage queue client object
            max_concurrent_queries: max number of queries in progress in adx at once (must be >= 1)
            request_wait_seconds: sleep after each http request
            loop_wait_seconds: sleep between main loop iterations
            message_visibility_timeout: seconds a message stays hidden after being pulled from the queue
            *note* this value should exceed the time to run a query, otherwise the message becomes
            visible again and the query is processed multiple times
        Raises:
            ValueError: if max_concurrent_queries is less than 1
        """
        if max_concurrent_queries < 1:
            # with no free slot nothing could ever be submitted and run() would loop forever
            raise ValueError(
                f"max_concurrent_queries must be at least 1, got {max_concurrent_queries}"
            )
        self.kusto_client = kusto_client
        self.kusto_database = kusto_database
        self.queue_client = queue_client
        self.max_concurrent_queries = max_concurrent_queries
        self.request_wait_seconds = request_wait_seconds
        self.loop_wait_seconds = loop_wait_seconds
        self.message_visibility_timeout = message_visibility_timeout
        # {"operation_id": ..., "queue_message": ...} entries for queries still being tracked
        self.submitted_queries: List[Dict] = []
        # operation ids of queries that reached "Completed"
        self.completed_queries: List[str] = []
        # operation ids of queries that reached a state in FAILED_STATES
        self.failed_queries: List[str] = []

    def submit_query(self, query: str) -> str:
        """
        Submits a command to the configured database.
        Args:
            query: KQL command whose primary result contains an OperationId column
        Returns:
            operation_id from the first row of the primary result
        """
        response = self.kusto_client.execute(self.kusto_database, query)
        df = dataframe_from_result_table(response.primary_results[0])
        return df.iloc[0]["OperationId"]

    def get_query_info(self, operation_id: str) -> Dict:
        """
        Get query info from operation_id via `.show operations`.
        Args:
            operation_id
        Returns:
            dictionary built from the first result row (includes the "State" column read by run)
        """
        response = self.kusto_client.execute(
            self.kusto_database,
            f".show operations {operation_id}"
        )
        query_info = dataframe_from_result_table(response.primary_results[0])
        return query_info.to_dict(orient="records")[0]

    def run(self) -> str:
        """
        1. pulls from storage queue and submits queries up to the concurrent limit
        2. checks status of queries in progress
        3. if a query is successful, its message is removed from the storage queue
        4. exits when the storage queue is empty and no queries are in progress
        Returns:
            "run complete" string
        """
        queue_size = self.queue_client.get_queue_properties().approximate_message_count
        self._report(f"Queue Approximate Size: {queue_size}")
        self._report(f"Concurrent Query Limit: {self.max_concurrent_queries}")

        # main program loop, runs until queue is empty and no queries are in progress
        while True:
            queue_status = {
                "In Progress": len(self.submitted_queries),
                "Completed": len(self.completed_queries),
                "Failed": len(self.failed_queries),
            }
            self._report(f"Queue Status: {queue_status}")

            if self._submit_from_queue():
                return "run complete"

            self._check_submitted_queries()

            # main loop delay to prevent too many requests
            time.sleep(self.loop_wait_seconds)

    @staticmethod
    def _report(message: str):
        """Write an informational message to both the log and stdout."""
        logger.info(message)
        print(message)

    def _receive_message(self):
        """
        Pull the next visible message, hiding it for message_visibility_timeout seconds.
        Returns:
            the message, or a falsy value when no message is currently visible
        Raises:
            whatever the storage client raises; callers log it and stop filling slots
        """
        try:
            return self.queue_client.receive_message(
                visibility_timeout=self.message_visibility_timeout
            )
        finally:
            time.sleep(self.request_wait_seconds)

    def _submit_message(self, queue_message) -> bool:
        """
        Submit the command carried by a queue message and start tracking it.
        A message whose content is not valid JSON or lacks "query" raises and stops the run.
        Returns:
            True if submitted; False if submission failed (the message becomes visible
            again after its visibility timeout and is retried)
        """
        message_query = json.loads(queue_message.content)["query"]
        try:
            operation_id = self.submit_query(message_query)
        except Exception as e:
            logger.error(f"Unable to get operation id, exception: {e}")
            return False
        else:
            self.submitted_queries.append(
                {
                    "operation_id": operation_id,
                    "queue_message": queue_message,
                }
            )
            return True
        finally:
            time.sleep(self.request_wait_seconds)

    def _submit_from_queue(self) -> bool:
        """
        Submit queued commands until every concurrency slot is in use.
        Returns:
            True when the queue stayed empty for a full visibility timeout with nothing in
            progress (the run is complete), otherwise False
        """
        while len(self.submitted_queries) < self.max_concurrent_queries:
            try:
                queue_message = self._receive_message()
            except Exception as e:
                logger.error(f"Unable to get queue message, exception: {e}")
                return False

            if not queue_message:
                if self.submitted_queries:
                    # message queue empty but queries still in progress: go poll them
                    return False
                # nothing in progress: wait for messages of failed queries to become visible again
                self._report("Waiting for visibility timeout to check for new messages in queue...")
                time.sleep(self.message_visibility_timeout + 60)
                try:
                    queue_message = self._receive_message()
                except Exception as e:
                    logger.error(f"Unable to check for queue messages, exception: {e}")
                    return False
                if not queue_message:
                    return True
                # a message reappeared: submit it now. Discarding it would hide it for another
                # visibility timeout, and a single retried message would then never run.

            if not self._submit_message(queue_message):
                return False
        return False

    def _check_submitted_queries(self):
        """Poll every tracked query once; retire those that completed or failed."""
        # iterate over a copy: entries are removed from self.submitted_queries inside the
        # loop, and removing from the list being iterated would skip the following entry
        for each_submitted_query in list(self.submitted_queries):
            each_operation_id = each_submitted_query["operation_id"]
            try:
                each_query_info = self.get_query_info(each_operation_id)
                each_query_state = each_query_info["State"]
            except Exception as e:
                logger.error(f"Unable to get query info for {each_operation_id}, exception: {e}")
                continue
            finally:
                time.sleep(self.request_wait_seconds)

            # if complete, remove from in progress list and storage queue
            if each_query_state == "Completed":
                self.submitted_queries.remove(each_submitted_query)
                self.completed_queries.append(each_operation_id)
                logger.info(each_query_info)
                try:
                    self.queue_client.delete_message(each_submitted_query["queue_message"])
                except Exception as e:
                    # one possible cause: the query outlived the visibility timeout and the
                    # message was pulled again, invalidating this copy's pop receipt
                    logger.error(f"Unable to delete message, exception: {e}")
                finally:
                    time.sleep(self.request_wait_seconds)

            # if failed, remove from in progress list, will retry after visibility timeout
            elif each_query_state in self.FAILED_STATES:
                self.submitted_queries.remove(each_submitted_query)
                self.failed_queries.append(each_operation_id)
                logger.error(each_query_info)
        

# Azure Authentication

In [None]:
# storage account authentication for queue
# how to: storage account -> access keys -> connection string
# os.environ["AZURE_STORAGE_CONNECTION_STRING"] = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
storage_connection_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
# fail here with a clear message instead of passing None into the queue client below
if not storage_connection_str:
    raise RuntimeError(
        "Environment variable AZURE_STORAGE_CONNECTION_STRING is not set; "
        "copy the connection string from storage account -> access keys"
    )

In [None]:
# queue connection: client for the storage queue that holds the pending kql commands
queue_name = "kusto-query-queue-123"
queue_client = QueueClient.from_connection_string(storage_connection_str, queue_name)

In [None]:
# service principal auth for adx
# docs: https://learn.microsoft.com/en-us/azure/developer/python/sdk/authentication-local-development-service-principal?tabs=azure-portal
# 1. azure portal -> app registrations -> client_id and tenant_id
# 2. add certificate & secrets -> new client secret -> secret value (only shown once)
# 3. *may need to add roles/permissions in adx or other apps* 
# os.environ["AZURE_CLIENT_ID"] = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# os.environ["AZURE_TENANT_ID"] = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# os.environ["AZURE_CLIENT_SECRET"] = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
client_id = os.getenv('AZURE_CLIENT_ID')
tenant_id = os.getenv('AZURE_TENANT_ID')
client_secret = os.getenv('AZURE_CLIENT_SECRET')
# fail fast: otherwise None values are handed to the adx connection builder and the
# problem presumably only surfaces later, far from its cause
missing_env_vars = [
    name
    for name, value in (
        ("AZURE_CLIENT_ID", client_id),
        ("AZURE_TENANT_ID", tenant_id),
        ("AZURE_CLIENT_SECRET", client_secret),
    )
    if not value
]
if missing_env_vars:
    raise RuntimeError(f"Missing environment variable(s): {', '.join(missing_env_vars)}")

In [None]:
# adx connection: service principal (application key) authentication against the cluster
kusto_uri = "https://XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
kusto_database = "XXXXXXXXXXXXXXXX"
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(kusto_uri, client_id, client_secret, tenant_id)
kusto_client = KustoClient(kcsb)

## Run

In [None]:
# build the queue runner with default limits and process commands until the storage queue is drained
kqq = KustoQueryQueue(
    kusto_client=kusto_client,
    kusto_database=kusto_database,
    queue_client=queue_client,
)
kqq.run()