# Data Collection

This Jupyter notebook queries Perf data with large result sets from an Azure Log Analytics workspace.

It includes the KQL queries for collecting CPU and memory usage data of pods, along with their scaling parameters. Querying in chunks from the notebook works around the result export limitation of the Azure Log Analytics workspace console.

It has been modified from the original sample notebook provided by Azure: https://github.com/Azure/azure-sdk-for-python/blob/azure-monitor-query_1.2.0/sdk/monitor/azure-monitor-query/samples/notebooks/sample_large_query.ipynb

## Getting started

In [None]:
import sys

!{sys.executable} -m pip install --upgrade azure-monitor-query azure-identity pandas

In [None]:
# Current date, used later in the CSV file names.
from datetime import datetime
now = datetime.now()
date_time = now.strftime("%d-%m-%Y")

### Setup

An authenticated client is required to query data from Log Analytics. The following code shows how to create a `LogsQueryClient` using the `DefaultAzureCredential`. Note that an async credential and client are used.

In [None]:
from azure.identity.aio import DefaultAzureCredential
from azure.monitor.query.aio import LogsQueryClient

credential = DefaultAzureCredential()
client = LogsQueryClient(credential)

#### Set Log Analytics workspace ID

Set the `LOGS_WORKSPACE_ID` variable below to the ID of your Log Analytics workspace.

In [None]:
LOGS_WORKSPACE_ID = ""

#### Define helper functions

To stay within the service limits, the strategy is to query data in smaller chunks based on a time column (e.g. `TimeGenerated`). The following helper functions support this by querying your data to find suitable start and end times for the batch queries.

- The `get_batch_endpoints_by_row_count` function returns a list of times that can be used as query time spans while keeping the number of rows returned by each query below the specified row limit.
- The `get_batch_endpoints_by_byte_size` function returns a list of times that can be used as query time spans while keeping the size of the data returned by each query below the specified byte size limit.
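
To make the batching idea concrete, here is a minimal pure-Python sketch of what the row-count query computes on the service side. It is only an illustration: `batch_endpoints_by_row_count` and the sample timestamps are hypothetical, and the real helpers below run the equivalent KQL (`row_cumsum` and `summarize min(...)`) in the workspace.

```python
from datetime import datetime, timedelta


def batch_endpoints_by_row_count(timestamps, max_rows_per_query):
    """Local imitation of the batching KQL (hypothetical helper, for illustration only)."""
    # Mirror `sort by TimeGenerated desc`: newest rows first.
    ordered = sorted(timestamps, reverse=True)
    earliest_per_batch = {}
    for row_number, ts in enumerate(ordered, start=1):
        # Mirror `row_cumsum(1) / max_rows_per_query` (integer division).
        batch_num = row_number // max_rows_per_query
        # Mirror `summarize endpoint=min(TimeGenerated) by batch_num`.
        current = earliest_per_batch.get(batch_num)
        if current is None or ts < current:
            earliest_per_batch[batch_num] = ts
    # Mirror `sort by batch_num asc | project endpoint`.
    return [earliest_per_batch[b] for b in sorted(earliest_per_batch)]


# Ten sample rows, one per minute, with at most four rows per query.
base = datetime(2024, 4, 25, 0, 0, 0)
sample = [base + timedelta(minutes=i) for i in range(10)]
endpoints = batch_endpoints_by_row_count(sample, max_rows_per_query=4)
for endpoint in endpoints:
    print(endpoint)
```

Each printed timestamp is the earliest row of one batch, so it can serve as the start time of one chunked query. Because the cumulative count starts at 1, the first batch holds one row fewer than the others.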

In [None]:
from datetime import datetime, timedelta

import pandas as pd

from azure.core.exceptions import HttpResponseError
from azure.monitor.query import LogsQueryStatus


async def get_batch_endpoints_by_row_count(
    query: str,
    end_time: datetime, 
    days_back: int, 
    max_rows_per_query: int = int(1e5),
    time_col: str = "TimeGenerated",
):
    """
    Determine the timestamp endpoints for each chunked query
    such that number of rows returned by each query is (approximately) `max_rows_per_query`
    """

    # This query will assign a batch number to each row depending on the maximum number of rows per query.
    # Then the earliest timestamp for each batch number is used for each query endpoint.
    find_batch_endpoints_query = f"""
        {query}
        | sort by {time_col} desc
        | extend batch_num = row_cumsum(1) / {max_rows_per_query}
        | summarize endpoint=min({time_col}) by batch_num
        | sort by batch_num asc
        | project endpoint
    """
    
    start_time = end_time - timedelta(days=days_back)
    try:
        response = await client.query_workspace(
            workspace_id=LOGS_WORKSPACE_ID,
            query=find_batch_endpoints_query,
            timespan=(start_time, end_time),
        )
    except HttpResponseError as e:
        print("Error batching endpoints by row count")
        raise e

    batch_endpoints = [end_time]
    batch_endpoints += [row[0] for row in response.tables[0].rows]
    return batch_endpoints


async def get_batch_endpoints_by_byte_size(
    query: str,
    end_time: datetime, 
    days_back: int,
    max_bytes_per_query: int = 100 * 1024 * 1024, # 100 MiB
    time_col: str = "TimeGenerated",
):
    """
    Determine the timestamp endpoints for each chunked query such that
    the size of the data returned is less than `max_bytes_per_query`.
    """
    
    # This query will assign a batch number to each row depending on the estimated data size.
    # Then the earliest timestamp for each batch number is used for each query endpoint.
    find_batch_endpoints_query = f"""
        {query}
        | sort by {time_col} desc
        | extend batch_num = row_cumsum(estimate_data_size(*)) / {max_bytes_per_query}
        | summarize endpoint=min({time_col}) by batch_num
        | sort by batch_num asc
        | project endpoint
    """

    start_time = end_time - timedelta(days=days_back)
    try:
        response = await client.query_workspace(
            workspace_id=LOGS_WORKSPACE_ID,
            query=find_batch_endpoints_query,
            timespan=(start_time, end_time)
        )
    except HttpResponseError as e:
        print("Error batching endpoints by byte size")
        raise e

    batch_endpoints = [end_time]
    batch_endpoints += [row[0] for row in response.tables[0].rows]
    return batch_endpoints

Next, define a function that asynchronously executes a given query over the time range given by a start time and an end time. This function calls the `query_workspace` method of the `LogsQueryClient`. The Azure Monitor Query library automatically retries on connection-related errors and server errors (e.g. 500, 503, and 504 status codes). See the [azure-core configuration options](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/core/azure-core#configurations) for more information on configuring retries.

In [None]:
async def execute_query(
    query: str, 
    start_time: datetime, 
    end_time: datetime, 
    *, 
    query_id: str = "",
    correlation_request_id: str = "",
):
    """
    Asynchronously execute the given query, restricted to the given time range, and parse the API response.

    :param str query: The query to execute.
    :param datetime start_time: The start of the time range to query.
    :param datetime end_time: The end of the time range to query.
    :keyword str query_id: Optional identifier for the query, used for printing.
    :keyword str correlation_request_id: Optional correlation ID to use in the query headers for tracing.
    """
    headers = {}
    if correlation_request_id:
        headers["x-ms-correlation-request-id"] = correlation_request_id

    try:
        response = await client.query_workspace(
            workspace_id=LOGS_WORKSPACE_ID,
            query=query,
            timespan=(start_time, end_time),
            server_timeout=360,                 
            include_statistics=False, # Can be used for debugging.
            headers=headers,
            retry_on_methods=["POST"]
        )
    except HttpResponseError as e:
        print(f"Error when attempting query {query_id} (query time span: {start_time} to {end_time}):\n\t", e)
        return []

    if response.status == LogsQueryStatus.SUCCESS:
        print(f"Query {query_id} successful (query time span: {start_time} to {end_time}). Row count: {len(response.tables[0].rows)}")
        return response.tables[0]
    else:
        # This will be a LogsQueryPartialResult.
        error = response.partial_error
        print(f"Partial results returned for query {query_id} (query time span: {start_time} to {end_time}):\n\t", error)
        return response.partial_data[0]

## Query data

With the helper functions defined, you can now query the data in chunks that won't hit the row count and data size service limits.
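
As a rough illustration of the chunking, the sketch below turns two lists of endpoint timestamps into non-overlapping `(start, end)` ranges, newest first, following the same logic as the fetch step later in this notebook. The function name `to_time_ranges` and the sample timestamps are made up for the example.

```python
from datetime import datetime, timedelta


def to_time_ranges(row_endpoints, byte_endpoints):
    """Merge two endpoint lists and return non-overlapping (start, end) ranges, newest first."""
    # Deduplicate and order from newest to oldest.
    endpoints = sorted(set(row_endpoints + byte_endpoints), reverse=True)
    ranges = []
    end_time = endpoints[0]
    for start_time in endpoints[1:]:
        ranges.append((start_time, end_time))
        # Step back one microsecond so the next range does not overlap this one.
        end_time = start_time - timedelta(microseconds=1)
    return ranges


# Hypothetical endpoints, as the two helper functions might return them.
end = datetime(2024, 4, 25, 12, 0, 0)
row_endpoints = [end, datetime(2024, 4, 25, 6, 0, 0), datetime(2024, 4, 25, 0, 0, 0)]
byte_endpoints = [end, datetime(2024, 4, 25, 9, 0, 0), datetime(2024, 4, 25, 0, 0, 0)]

ranges = to_time_ranges(row_endpoints, byte_endpoints)
for start_time, end_time in ranges:
    print(start_time, "->", end_time)
```

Merging both lists means every range respects the row limit and the byte limit at the same time, at the cost of running more queries than either list alone would need.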

### Set variables

Before running the queries, some variables will need to be configured.

- `QUERY_CPU`, `QUERY_MEMORY` - KQL queries to run; each one is passed to the `run` function in turn. Change the table names and specify any required columns and filters as needed. When constructing a query, the recommendation is to use [reduced KQL](https://learn.microsoft.com/azure/azure-monitor/logs/basic-logs-query?tabs=portal-1#kql-language-limits), which is optimized for data retrieval. To get all rows and columns of a table, set the query to just the table name.
- `END_TIME` - End of the time range to query over.
- `DAYS_BACK` - The number of days to go back from the end time. For example, if `END_TIME = datetime.now()` and `DAYS_BACK = 7`, the query will return data from the last 7 days. Note that fetched data will (initially) be stored in memory on your system, so it is possible to run into memory limitations if the query returns a large amount of data. If this issue is encountered, consider querying the data in time segments. For example, instead of querying 365 days of data at once, query 100 days of data at a time by setting the values of `END_TIME` and `DAYS_BACK` appropriately and re-running the notebook from this cell onwards for each separate segment.
- `MAX_ROWS_PER_QUERY` - The maximum number of rows returned from a single query. It defaults to the service limit of 500,000 rows multiplied by a safety factor (0.9 in this notebook) to leave some headroom. The limit may still be exceeded occasionally if many entries share the same timestamp.
- `MAX_BYTES_PER_QUERY` - The maximum size in bytes of data returned from a single query. It defaults to the service limit of 100 MiB multiplied by a safety factor (0.6 in this notebook).
- `MAX_CONCURRENT_QUERIES` - The maximum number of queries to run concurrently. It defaults to 5. Reducing it can help avoid errors due to rate limits.
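
If a long range has to be split into segments as described for `DAYS_BACK`, the segment boundaries can be computed up front. The sketch below is one possible way to do it; `split_into_segments` is a hypothetical helper, not part of the notebook, and each resulting pair would be assigned to `END_TIME` and `DAYS_BACK` before re-running the cells.

```python
from datetime import datetime, timedelta


def split_into_segments(end_time, total_days, days_per_segment):
    """Yield (end_time, days_back) pairs covering `total_days`, newest segment first."""
    remaining = total_days
    segment_end = end_time
    while remaining > 0:
        days_back = min(days_per_segment, remaining)
        yield segment_end, days_back
        # The next (older) segment ends where this one starts.
        segment_end = segment_end - timedelta(days=days_back)
        remaining -= days_back


# Example from the text: 365 days of data, queried 100 days at a time.
segments = list(split_into_segments(datetime(2024, 4, 25), total_days=365, days_per_segment=100))
for segment_end, days_back in segments:
    print(f"END_TIME={segment_end:%Y-%m-%d}, DAYS_BACK={days_back}")
```

Adjacent segments share a boundary instant, so rows stamped exactly at a boundary may appear in two segments; dropping duplicates after combining the results is a simple safeguard.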

In [None]:
# EDIT THIS VALUE WITH YOUR QUERY.
# If necessary, add a KQL `project` operator or any filtering operators to limit the number of rows returned.
QUERY_CPU = """
let k8sNamespace = "";
let subscriptionId = "";
let resourceGroup = "";
let clusterName = "";
let metricName = "cpuUsageNanoCores";
// let metricName = "memoryWorkingSetBytes";
let clusterIdPrefix = strcat("/subscriptions/", subscriptionId, "/resourceGroups/", resourceGroup, "/providers/Microsoft.ContainerService/managedClusters/", clusterName, "/");
let ContainerNames = KubePodInventory
// | where TimeGenerated >= startDateTime and TimeGenerated < endDateTime
    | where ClusterName == clusterName
    | where ControllerKind == "ReplicaSet"
    | where PodStatus == "Running"
    | where Namespace startswith_cs k8sNamespace
    | distinct InstanceName=strcat(clusterIdPrefix, ContainerName), Name, PodCreationTimeStamp, Namespace, PodStartTime;
let InstanceNames = ContainerNames
| project InstanceName;
Perf
| where ObjectName == 'K8SContainer'
| where InstanceName in (InstanceNames)
| where CounterName == metricName
| join kind=inner (ContainerNames) on InstanceName
| project TimeGenerated, CpuCounter=CounterValue, InstanceName, Name, PodCreationTimeStamp, Namespace, PodStartTime
"""

In [None]:
QUERY_MEMORY = """
let k8sNamespace = "";
let subscriptionId = "";
let resourceGroup = "";
let clusterName = "";
// let metricName = "cpuUsageNanoCores";
let metricName = "memoryWorkingSetBytes";
let clusterIdPrefix = strcat("/subscriptions/", subscriptionId, "/resourceGroups/", resourceGroup, "/providers/Microsoft.ContainerService/managedClusters/", clusterName, "/");
let ContainerNames = KubePodInventory
// | where TimeGenerated >= startDateTime and TimeGenerated < endDateTime
    | where ClusterName == clusterName
    | where ControllerKind == "ReplicaSet"
    | where PodStatus == "Running"
    | where Namespace startswith_cs k8sNamespace
    | distinct InstanceName=strcat(clusterIdPrefix, ContainerName), Name, PodCreationTimeStamp, Namespace, PodStartTime;
let InstanceNames = ContainerNames
| project InstanceName;
Perf
// | where TimeGenerated >= startDateTime and TimeGenerated < endDateTime
| where ObjectName == 'K8SContainer'
| where InstanceName in (InstanceNames)
| where CounterName == metricName
| join kind=inner (
ContainerNames
) on InstanceName
| project TimeGenerated, MemoryCounter=CounterValue, InstanceName, Name, PodCreationTimeStamp, Namespace, PodStartTime
"""

The queries below, which retrieve instance and scaling information, can be run directly in the Azure Log Analytics workspace console and their results exported from there.

In [None]:
# run on azure console
QUERY_INSTANCES = """
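let endDateTime = "";   // fill in as a datetime value, e.g. datetime(2024-04-25)
let startDateTime = ""; // fill in as a datetime value, e.g. datetime(2024-03-28)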
let k8sNamespace = "";
let subscriptionId = "";
let resourceGroup = "";
let clusterName = "";
let clusterIdPrefix = strcat("/subscriptions/", subscriptionId, "/resourceGroups/", resourceGroup, "/providers/Microsoft.ContainerService/managedClusters/", clusterName, "/");
KubePodInventory
 | where TimeGenerated >= startDateTime and TimeGenerated < endDateTime
    | where Namespace startswith_cs k8sNamespace
    | where ControllerKind == "ReplicaSet"
    | where PodStatus == "Running"
    | distinct InstanceName=strcat(clusterIdPrefix, ContainerName), Name
 | join kind=inner hint.strategy=shuffle (
    Perf
    | where CounterName in ("cpuLimitNanoCores", "cpuRequestNanoCores", "memoryRequestBytes", "memoryLimitBytes")
    | summarize Value = max(CounterValue) by InstanceName, CounterName
    | project
        InstanceName,
        Value,
        CounterName
    )
    on InstanceName
| project InstanceName, Name, CounterName, Value 
"""

In [None]:
QUERY_HPA = """
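let endDateTime = "";   // fill in as a datetime value, e.g. datetime(2024-04-25)
let startDateTime = ""; // fill in as a datetime value, e.g. datetime(2024-03-28)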
KubePodInventory
| where TimeGenerated >= startDateTime 
| where ClusterName == ""
| where Namespace startswith ""
| extend labels = todynamic(PodLabel)
| extend deployment_hpa = reverse(substring(reverse(ControllerName), indexof(reverse(ControllerName), "-") + 1))
| distinct tostring(deployment_hpa)
| join kind=inner (InsightsMetrics 
    | where TimeGenerated >= startDateTime and TimeGenerated < endDateTime
    | where Name == 'kube_hpa_status_current_replicas'
    | extend pTags = todynamic(Tags) //parse the tags for values
    | extend ns = todynamic(pTags.k8sNamespace) //parse namespace value from tags
    | extend deployment_hpa = todynamic(pTags.targetName) //parse HPA target name from tags
    | extend last_scale_time = todynamic(pTags.lastScaleTime)
    | extend min_reps = todynamic(pTags.spec_min_replicas)
    | extend max_reps = todynamic(pTags.spec_max_replicas) // Parse maximum replica settings from HPA deployment
    | extend desired_reps = todynamic(pTags.status_desired_replicas) // Parse desired replica settings from HPA deployment
    | summarize arg_max(TimeGenerated, *) by tostring(ns), tostring(deployment_hpa), Cluster=toupper(tostring(split(_ResourceId, '/')[8])), toint(desired_reps), toint(max_reps), scale_out_percentage=(desired_reps * 100 / max_reps)
    //| where scale_out_percentage > _minthreshold and scale_out_percentage <= _maxthreshold
    )
    on deployment_hpa
"""

In [None]:

# End of the time range to query. Adjust as needed, or use datetime.now() to query up to the current time.
END_TIME = datetime.strptime("2024-04-25 03:30:00 +0530", "%Y-%m-%d %H:%M:%S %z")
DAYS_BACK = 28

MAX_ROWS_PER_QUERY_SERVICE_LIMIT = int(5e5)  # 500K
MAX_ROWS_PER_QUERY = int(MAX_ROWS_PER_QUERY_SERVICE_LIMIT * 0.9)

MAX_BYTES_PER_QUERY_SERVICE_LIMIT = 100 * 1024 * 1024 # 100 MiB of compressed data
MAX_BYTES_PER_QUERY = int(MAX_BYTES_PER_QUERY_SERVICE_LIMIT * 0.6)

MAX_CONCURRENT_QUERIES = 5

### Estimate data and costs (optional)

Before running the chunked queries, it can be prudent to estimate the size of the data first, especially if you plan to export the query results to another service. The cell below defines another helper function for this.

In [None]:
async def estimate_data_size(query: str, end_time: datetime, days_back: int):
    query = f"{query} | summarize n_rows = count(), estimate_data_size = sum(estimate_data_size(*))"
    start_time = end_time - timedelta(days=days_back)
    response = await client.query_workspace(
        workspace_id=LOGS_WORKSPACE_ID,
        query=query,
        timespan=(start_time, end_time),
    )

    columns = response.tables[0].columns
    rows = response.tables[0].rows
    df = pd.DataFrame(data=rows, columns=columns)
    return df

Now, run the following cell to estimate the size of the data that will be returned by the queries. Note that this is just an estimate and the actual size may vary slightly. This information can be used in conjunction with the Azure storage [pricing calculator](https://azure.microsoft.com/pricing/calculator/?service=storage) to determine costs that will be incurred for your storage setup. If using Azure Data Lake Storage Gen2, full billing details can be found [here](https://azure.microsoft.com/pricing/details/storage/data-lake/).
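
The estimate can also give a rough idea of how many chunked queries to expect. The following sketch uses hypothetical values for the row count and byte estimate (in the notebook they would come from the `n_rows` and `estimate_data_size` columns), and `estimate_query_count` is an illustrative helper rather than part of the notebook.

```python
import math


def estimate_query_count(n_rows, n_bytes, max_rows_per_query, max_bytes_per_query):
    """Return a rough (lower, upper) bound on the number of chunked queries."""
    by_rows = math.ceil(n_rows / max_rows_per_query)
    by_bytes = math.ceil(n_bytes / max_bytes_per_query)
    # Lower bound: the stricter of the two limits.
    # Upper bound: both endpoint lists are merged, so their counts can add up.
    return max(by_rows, by_bytes), by_rows + by_bytes


# Hypothetical estimate: 2.4 million rows and 350 MB (decimal) of data.
n_rows = 2_400_000
n_bytes = 350 * 1000**2

max_rows = int(500_000 * 0.9)             # same margin as MAX_ROWS_PER_QUERY
max_bytes = int(100 * 1024 * 1024 * 0.6)  # same margin as MAX_BYTES_PER_QUERY

lower, upper = estimate_query_count(n_rows, n_bytes, max_rows, max_bytes)
print(f"{n_bytes / 1000**2:.0f} MB = {n_bytes / 1024**2:.1f} MiB")
print(f"Expect roughly between {lower} and {upper} queries")
```

Note that the cells below report sizes in decimal MB and GB, while the service limit is expressed in MiB, so the two figures differ by about 5%.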

In [None]:
data_size_df_cpu = await estimate_data_size(QUERY_CPU, END_TIME, DAYS_BACK)
data_size_df_cpu["estimate_data_size_MB"] = data_size_df_cpu["estimate_data_size"] / (1000 **2)
data_size_df_cpu["estimate_data_size_GB"] = data_size_df_cpu["estimate_data_size_MB"] / 1000
data_size_df_cpu

In [None]:
data_size_df_memory = await estimate_data_size(QUERY_MEMORY, END_TIME, DAYS_BACK)
data_size_df_memory["estimate_data_size_MB"] = data_size_df_memory["estimate_data_size"] / (1000 **2)
data_size_df_memory["estimate_data_size_GB"] = data_size_df_memory["estimate_data_size_MB"] / 1000
data_size_df_memory

### Fetch log data

Use the helper functions to create an async wrapper function that will query the data in chunks using the variables defined above. This function will return the results as a single pandas DataFrame. 
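
The concurrency limit relies on `asyncio.Semaphore`. The sketch below shows the pattern in isolation, with `fake_query` standing in for a real workspace query (it is a placeholder that only sleeps); it is meant to illustrate the idea, not to replace the cell that follows.

```python
import asyncio

MAX_CONCURRENT = 2  # illustrative value; the notebook uses MAX_CONCURRENT_QUERIES


async def fake_query(query_id, semaphore, state):
    """Placeholder for a real query: sleeps briefly while holding the semaphore."""
    async with semaphore:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)
        state["running"] -= 1
        return query_id


async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    state = {"running": 0, "peak": 0}
    # gather returns results in the order the coroutines were submitted.
    results = await asyncio.gather(*(fake_query(i, semaphore, state) for i in range(6)))
    return results, state["peak"]


results, peak = asyncio.run(main())
print(results, "peak concurrency:", peak)
```

Inside a notebook an event loop is already running, so `await main()` would be used there instead of `asyncio.run(main())`.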

In [None]:
import asyncio
import itertools
import uuid

# Limit the number of concurrent queries.
semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES)

async def fetch_logs(query: str, start_time: datetime, end_time: datetime, query_id: str, correlation_request_id: str):
    async with semaphore:
        return await execute_query(query, start_time, end_time, query_id=query_id, correlation_request_id=correlation_request_id)


async def run(QUERY):
    # Below, we combine the endpoints retrieved from both endpoint methods to ensure that the number of rows
    # and the size of the data returned are both within the limits.
    # Worst case performance is double the theoretical minimum number of queries necessary.
    row_batch_endpoints = await get_batch_endpoints_by_row_count(QUERY, END_TIME, days_back=DAYS_BACK, max_rows_per_query=MAX_ROWS_PER_QUERY)
    byte_batch_endpoints = await get_batch_endpoints_by_byte_size(QUERY, END_TIME, days_back=DAYS_BACK, max_bytes_per_query=MAX_BYTES_PER_QUERY)
    batch_endpoints = sorted(set(byte_batch_endpoints + row_batch_endpoints), reverse=True)

    queries = []
    end_time = batch_endpoints[0]
    correlation_request_id = str(uuid.uuid4())

    print(f"Querying {len(batch_endpoints) - 1} time ranges, from {batch_endpoints[-1]} to {end_time}")
    print(f"Correlation request ID: {correlation_request_id}")
    
    for i in range(1, len(batch_endpoints)):
        start_time = batch_endpoints[i]
        queries.append(fetch_logs(QUERY, start_time, end_time, query_id=str(i), correlation_request_id=correlation_request_id))
        end_time = start_time - timedelta(microseconds=1) # Subtract 1 microsecond to avoid overlap between queries.

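    responses = await asyncio.gather(*queries)

    # Failed queries return an empty list, so skip them and take the column names from the first table with data.
    tables = [table for table in responses if table]
    if not tables:
        return pd.DataFrame()
    rows = itertools.chain.from_iterable(table.rows for table in tables)
    return pd.DataFrame(data=rows, columns=tables[0].columns)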

Now, go ahead and run the following cell to fetch the data. Note that this may take some time depending on the size of the data and the number of queries that need to be run.

In [None]:
df_cpu = await run(QUERY_CPU)
print(f"Retrieved {len(df_cpu)} rows")

In [None]:
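# Save as CSV, creating the output directory first if it does not exist.
from pathlib import Path
Path('./data/raw/cpu').mkdir(parents=True, exist_ok=True)
df_cpu.to_csv('./data/raw/cpu/cpu-{}.csv'.format(date_time), index=False)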

In [None]:
df_memory = await run(QUERY_MEMORY)
print(f"Retrieved {len(df_memory)} rows")

In [None]:
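# Save as CSV, creating the output directory first if it does not exist.
from pathlib import Path
Path('./data/raw/memory').mkdir(parents=True, exist_ok=True)
df_memory.to_csv('./data/raw/memory/memory-{}.csv'.format(date_time), index=False)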

After the data has been fetched, you can use the `df_cpu` and `df_memory` DataFrames (or the saved CSV files) for further analysis.