In [14]:
# Test script for execute_kql_query function
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
from utils.kql_query import execute_kql_query
from dotenv import load_dotenv
import os
import pandas as pd

# Initialize credentials and client
load_dotenv()
credential = DefaultAzureCredential()
client = LogsQueryClient(credential=credential)
workspace_id = os.getenv("SENTINEL_WORKSPACE_ID")

# Check if workspace_id is set, if set then good to go, else raise error
if not workspace_id:
    raise ValueError("SENTINEL_WORKSPACE_ID environment variable is not set.")


In [15]:
rule_title = "Security Incident Analysis"
rule_description = "Identifies Microsoft Sentinel Incidents created in response to security alerts, providing insights into potential threats and vulnerabilities within the environment."
rule_file_name = "security_incident_analysis.yaml"
rule_save_path = os.path.join("queries", "analysis", "xdr", rule_file_name) 
rule_references = [
    "https://learn.microsoft.com/en-us/azure/sentinel/manage-soc-with-incident-metrics?utm_source=chatgpt.com",
    "https://learn.microsoft.com/en-us/azure/azure-monitor/reference/tables/securityincident?utm_source=chatgpt.com"
]
rule_author = "Kevin Flint"
rule_tags = [
    "xdr",
    "security incident",
    "microsoft sentinel",
    "incident analysis",
    "threat detection"
]
rule_table = "SecurityIncident"
rule_category = "xdr"
rule_false_positives = [
    "Legitimate security incidents created by security operations teams.",
    "Incidents generated from known benign alerts or activities."
    "RH Red Team activity and/or Protivity Offensive Security activity."
]
rule_level = "high" 
rule_kql_query = """SecurityIncident\n| where TimeGenerated between ({{ start_time }} .. {{ end_time }})\n{% if device_name and user_name %}| where * contains "{{ device_name }}" or * contains "{{ user_name }}"\n{% elif device_name %}| where * contains "{{ device_name }}"\n{% elif user_name %}| where * contains "{{ user_name }}"\n{% endif %}"""

In [16]:
import uuid
from datetime import datetime
import yaml

# Build the YAML structure as a dictionary first
analytic_rule_dict = {
    'title': rule_title,
    'id': str(uuid.uuid4()),
    'status': 'test',
    'description': rule_description,
    'references': rule_references,
    'author': rule_author,
    'date': datetime.now().strftime('%Y-%m-%d'),
    'modified': datetime.now().strftime('%Y-%m-%d'),
    'tags': rule_tags,
    'logsource': {
        'product': 'windows',
        'table': rule_table,
        'category': rule_category
    },
    'kql': rule_kql_query.strip(),
    'falsepositives': rule_false_positives,
    'level': rule_level
}

# Convert to YAML string
analytic_rule_yaml = yaml.dump(analytic_rule_dict, default_flow_style=False, sort_keys=False)

print("✓ Valid YAML")
print("\nGenerated YAML:")

# Parse back to verify
parsed_yaml = yaml.safe_load(analytic_rule_yaml)
print(f"\nTitle: {parsed_yaml['title']}")
print(f"ID: {parsed_yaml['id']}")

# Save analytic rule to file
with open(rule_save_path, 'w') as file:
    file.write(analytic_rule_yaml)
print(f"✓ Analytic rule saved to {rule_save_path}")

✓ Valid YAML

Generated YAML:

Title: Security Incident Analysis
ID: 7a2c23b7-5c6b-4bc1-ac14-0827f5b6d1fc
✓ Analytic rule saved to queries\analysis\xdr\security_incident_analysis.yaml


In [17]:
from utils.config_loader import load_config

# Set investigation file path
investigation_file_path = os.path.join("investigations", "rtbt")
# Set investigation config file path
investigation_config_path = os.path.join(investigation_file_path, "config.yaml")

# Print config and kql file paths
print(f"Investigation config file path: {investigation_config_path}")

# Test to ensure config file and KQL query file exist
assert os.path.exists(investigation_config_path), f"Config file not found at {investigation_config_path}"

# Read in config file
config = load_config(str(investigation_config_path))

# Print devicename from config
print(f"Device Name from config: {config['device_name']}")

# Print username from config
print(f"Username from config: {config['user_name']}")

# Print start_time from config
print(f"Start Time from config: {config['start_time']}")

# Print end_time from config
print(f"End Time from config: {config['end_time']}")


Investigation config file path: investigations\rtbt\config.yaml
Device Name from config: RHIAVD-EISP-8
Username from config: alekoz
Start Time from config: 2025-11-17T14:00:00Z
End Time from config: 2025-11-17T23:00:00Z


In [18]:
# Render the KQL query with parameters from config
from utils.query_template import render_kql_file

# Read in and render KQL file
rendered_query = render_kql_file(str(rule_save_path), config)

print(rendered_query)

SecurityIncident
| where TimeGenerated between (2025-11-17T14:00:00Z .. 2025-11-17T23:00:00Z)
| where * contains "RHIAVD-EISP-8" or * contains "alekoz"



In [19]:
# Test it
# Render the KQL query with parameters from config
from utils.kql_query import execute_kql_query

# Read in and render KQL file
df = execute_kql_query(
    client=client,
    workspace_id=workspace_id,
    kql_query=rendered_query
)
    
df.head()

RuntimeError: Failed to execute KQL query: (BadArgumentError) The request had some invalid properties
Code: BadArgumentError
Message: The request had some invalid properties
Inner error: {
    "code": "SyntaxError",
    "message": "A recognition error occurred in the query.",
    "innererror": {
        "code": "SYN0002",
        "message": "Query could not be parsed at ':' on line [2,48]",
        "line": 2,
        "pos": 48,
        "token": ":"
    }
}