In [None]:
%md
* Project: Denodo Query Logs
* Author: Ullas Vashista
* Last Update: 11/09/2025

In [None]:
from datetime import datetime, timedelta
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
import re
import configparser

In [None]:
# ------------------- CONFIG -------------------
# Create parser and read config file.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result means the file is missing or
# unreadable. Fail here with the path instead of a later NoSectionError.
config_file_path = "./config/config.ini"
config = configparser.ConfigParser()
if not config.read(config_file_path):
    raise FileNotFoundError(f"Configuration file not found or unreadable: {config_file_path}")

# Read values
# Storage - Source (where the Denodo query log files are read from)
source_storage_account_name = config.get("SourceStorage", "account_name")
source_container_name       = config.get("SourceStorage", "container_name")
source_mount_name           = config.get("SourceStorage", "mount_name")

# Storage - Target (where the Delta table and the checkpoint are written)
logm_storage_account_name = config.get("TargetStorage", "account_name")
logm_container_name       = config.get("TargetStorage", "container_name")
logm_mount_name           = config.get("TargetStorage", "mount_name")

# Key vault scope name (secret scope passed to dbutils.secrets.get)
scope_name = config.get("KeyVaultScope", "scope_name")

In [None]:
# Secrets: service-principal credentials, fetched from the configured secret
# scope in the order client id, client secret, tenant id.
client_id, client_secret, tenant_id = [
    dbutils.secrets.get(scope=scope_name, key=secret_key)
    for secret_key in ("adls-client-id", "adls-client-secret", "tenant-id")
]

In [None]:
# Base paths
# Both target locations live under the same "<mount>/delta" folder of the target account.
logm_delta_root = f"abfss://{logm_container_name}@{logm_storage_account_name}.dfs.core.windows.net/{logm_mount_name}/delta"
source_path = f"abfss://{source_container_name}@{source_storage_account_name}.dfs.core.windows.net/{source_mount_name}"
delta_table_path = f"{logm_delta_root}/denodo_query_logs"
checkpoint_table = f"{logm_delta_root}/logm_denodo_query_checkpoint"

# File structures: one sub-directory per server, each with its own file-name
# prefix; the MMddyyyy date is appended to the prefix to form the file name.
directories = ["AVDP1", "AVDP2"]
file_prefixes = dict(
    AVDP1="dev_azure_vdp1-queries.log.",
    AVDP2="dev_azure_vdp2-queries.log.",
)

# Optional manual override, format MMddyyyy.
# Only used if no checkpoint exists.
start_date_str = "09102025"
default_start_date = datetime.strptime(start_date_str, "%m%d%Y")
# Upper bound of the date range to scan: the moment this cell runs.
end_date = datetime.today()

In [None]:
# ------------------- FUNCTIONS -------------------
 
# Spark session configuration for OAuth access to one ADLS Gen2 account
def configure_spark_for_adls_oauth(storage_account_name, client_id, client_secret, tenant_id):
    """Set the ABFS OAuth client-credentials options for one storage account.

    Each option is written with ``spark.conf.set`` under a key suffixed with
    ``<storage_account_name>.dfs.core.windows.net``.

    Args:
        storage_account_name: Storage account the options apply to.
        client_id: Client id written to the ``oauth2.client.id`` option.
        client_secret: Client secret written to the ``oauth2.client.secret`` option.
        tenant_id: Tenant id used to build the token endpoint URL.
    """
    account_host = f"{storage_account_name}.dfs.core.windows.net"
    oauth_options = (
        ("fs.azure.account.auth.type", "OAuth"),
        ("fs.azure.account.oauth.provider.type",
         "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
        ("fs.azure.account.oauth2.client.id", client_id),
        ("fs.azure.account.oauth2.client.secret", client_secret),
        ("fs.azure.account.oauth2.client.endpoint",
         f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"),
    )
    for option_name, option_value in oauth_options:
        spark.conf.set(f"{option_name}.{account_host}", option_value)
 
# Parse the trailing ".MMddyyyy" suffix of a log file name
def extract_date_from_filename(filename):
    """Return the date encoded at the end of ``filename``, or None.

    The name must end with a dot followed by exactly eight digits, which are
    parsed as MMddyyyy.

    Args:
        filename: File name (or path) to inspect.

    Returns:
        A ``datetime.date``, or None when the name has no such suffix.

    Raises:
        ValueError: If the eight digits are not a valid MMddyyyy date.
    """
    suffix = re.search(r"\.(\d{8})$", filename)
    if not suffix:
        return None
    parsed = datetime.strptime(suffix.group(1), "%m%d%Y")
    return parsed.date()
 
# Build the list of daily log file paths to read
def list_latest_file_paths(last_file_date, start_date, end_date, source_path, directories, file_prefixes):
    """Generate one candidate path per directory per day in a date range.

    The range starts at ``last_file_date`` when it is not None (that day is
    included again), otherwise at ``start_date``, and ends at ``end_date``
    inclusive. Paths are only generated; nothing checks that they exist.

    Args:
        last_file_date: Checkpointed file date, or None when there is none.
        start_date: Fallback first day, used only if ``last_file_date`` is None.
        end_date: Last day to include.
        source_path: Root path that contains the per-server directories.
        directories: Directory names to look in, in output order per day.
        file_prefixes: Mapping of directory name to file-name prefix; a
            directory missing from the mapping gets an empty prefix.

    Each of the three dates may be a ``"MMddyyyy"`` string or a
    ``date``/``datetime`` object; the time of day is ignored.

    Returns:
        List of ``"<source_path>/<dir>/<prefix><MMddyyyy>"`` strings ordered by
        day, then by ``directories``. Empty when the first day is after
        ``end_date``.

    Raises:
        ValueError: If a date string is not valid MMddyyyy.
        TypeError: If a date is neither a string nor a date-like object.
    """
    def _to_day(value):
        # Normalise every accepted input to a naive midnight datetime, so the
        # comparison below is always between like values.
        if isinstance(value, str):
            return datetime.strptime(value, "%m%d%Y")
        try:
            return datetime(value.year, value.month, value.day)
        except AttributeError:
            raise TypeError(
                f"Expected an 'MMddyyyy' string or a date/datetime, got {type(value).__name__}"
            ) from None

    # The checkpoint date wins over the configured start date.
    current_day = _to_day(start_date if last_file_date is None else last_file_date)
    last_day = _to_day(end_date)

    file_paths = []
    while current_day <= last_day:
        date_str = current_day.strftime("%m%d%Y")
        file_paths.extend(
            f"{source_path}/{dir_name}/{file_prefixes.get(dir_name, '')}{date_str}"
            for dir_name in directories
        )
        current_day += timedelta(days=1)

    return file_paths
 
# Look up the latest checkpoint
def get_checkpoint():
    """Read the most recent checkpoint from the Delta table at ``checkpoint_table``.

    ``file_date`` is parsed as yyyyMMdd and returned re-formatted as MMddyyyy.

    Returns:
        Tuple ``(file_date_MMddyyyy, last_timegenerated)`` taken from the row
        with the greatest ``file_date``. ``(None, None)`` when reading the
        table raises ``AnalysisException`` or when the table has no rows.
    """
    try:
        checkpoint_df = (
            spark.read.format("delta").load(checkpoint_table)
            .withColumn("reversed_parsed_date", to_date(col("file_date"), "yyyyMMdd"))
            .withColumn("date_MMddyyyy", date_format(col("reversed_parsed_date"), "MMddyyyy"))
        )
        row = checkpoint_df.orderBy(col("file_date").desc()).first()
    except AnalysisException:
        return None, None

    # first() returns None for an empty table; treat that like "no checkpoint"
    # instead of failing on row[...].
    if row is None:
        return None, None

    return row["date_MMddyyyy"], row["last_timegenerated"]
 
# Store the latest metadata as the new checkpoint
def update_checkpoint(file_date, last_timegenerated):
    """Overwrite the checkpoint Delta table with a single new row.

    Args:
        file_date: Value stored in the ``file_date`` column.
        last_timegenerated: Value stored in the ``last_timegenerated`` column.
    """
    checkpoint_rows = [(file_date, last_timegenerated)]
    checkpoint_columns = ["file_date", "last_timegenerated"]
    checkpoint_df = spark.createDataFrame(checkpoint_rows, checkpoint_columns)
    # "overwrite" replaces whatever was stored at checkpoint_table before.
    checkpoint_writer = checkpoint_df.write.format("delta").mode("overwrite")
    checkpoint_writer.save(checkpoint_table)

In [None]:
%md
# ------------------- MAIN -------------------

In [None]:
# Source ADLS config: OAuth options for the storage account the logs are read from
configure_spark_for_adls_oauth(
    storage_account_name=source_storage_account_name,
    client_id=client_id,
    client_secret=client_secret,
    tenant_id=tenant_id,
)

In [None]:
# Step 1: Load the checkpoint -- the date of the last processed file (MMddyyyy)
# and the newest TimeGenerated already ingested. Both are None when the
# checkpoint table cannot be read.
(last_file_date, last_timegenerated) = get_checkpoint()

In [None]:
# Step 2: Build the candidate log file paths and keep the ones that exist.
# list_latest_file_paths() only generates names from the calendar; a path that
# is not present in storage would make the multi-path read in Step 3 fail.
candidate_paths = list_latest_file_paths(last_file_date, start_date_str, end_date, source_path, directories, file_prefixes)


def _source_path_exists(path):
    """Return True when ``path`` can be listed, False when it does not exist."""
    try:
        dbutils.fs.ls(path)
        return True
    except Exception as exc:
        # NOTE(review): dbutils is expected to report a missing path as a wrapped
        # java.io.FileNotFoundException -- confirm on the target runtime.
        # Any other failure (auth, network, ...) is re-raised untouched.
        if "java.io.FileNotFoundException" in str(exc):
            return False
        raise


paths_to_read = [path for path in candidate_paths if _source_path_exists(path)]
if not paths_to_read:
    raise Exception("No log files found in source.")

In [None]:
# Step 3: Read raw data
# PERMISSIVE mode keeps malformed JSON lines instead of failing the read.
json_reader = spark.read.option("mode", "PERMISSIVE")
raw_df = json_reader.json(paths_to_read)

# Stop the notebook early when the files contain no rows at all.
if raw_df.rdd.isEmpty():
    print("No data found.")
    dbutils.notebook.exit("EMPTY")

# Add source_dir (the AVDP<n> path segment), TimeGenerated (the "@timestamp"
# field cast to a timestamp) and the full path of the originating file.
raw_df = (
    raw_df
    .withColumn("source_dir", regexp_extract(input_file_name(), r"/(AVDP[123])/", 1))
    .withColumn("TimeGenerated", col("@timestamp").cast(TimestampType()))
    .withColumn("input_file_path", input_file_name())
)

In [None]:
# Step 4: Keep only records strictly newer than the checkpointed TimeGenerated.
# Without a checkpoint value every record is kept.
if last_timegenerated:
    newer_than_checkpoint = col("TimeGenerated") > last_timegenerated
    raw_df = raw_df.where(newer_than_checkpoint)

# Nothing left after filtering: end the notebook run.
nothing_new = raw_df.rdd.isEmpty()
if nothing_new:
    print("No new TimeGenerated records.")
    dbutils.notebook.exit("EMPTY")

In [None]:
# Step 5: Split log and extract columns
# Names of the tab-separated fields of "log", in positional order (index 0..23).
log_field_names = [
    "ServerName",
    "Host",
    "Port",
    "Id_",
    "UserName",
    "DatabaseName",
    "NotificationType",
    "SessionId",
    "StartTime",
    "EndTime",
    "Duration",
    "WaitingTime",
    "NumRows",
    "State",
    "Completed",
    "Cache",
    "Query",
    "RequestType",
    "Element",
    "UserAgent",
    "AccessInterface",
    "ClientIP",
    "TransactionId",
    "WebServiceName",
]

split_df = raw_df.withColumn("log_split", split("log", "\t"))

# Output column order: TimeGenerated, the 24 log fields, input_file_path.
final_df = split_df.select(
    col("TimeGenerated"),
    *[
        col("log_split")[position].alias(field_name)
        for position, field_name in enumerate(log_field_names)
    ],
    col("input_file_path"),
)

In [None]:
# Step 6: Add Metadata Columns
final_df = (
    final_df
    # MetadataLogId: SHA-512 of the row serialised as JSON (all Step 5 columns).
    .withColumn("body", to_json(struct(*final_df.columns)))
    .withColumn("MetadataLogId", sha2(col("body"), 512).cast("string"))
    .drop("body")
    .withColumn("MetadataLogType", lit("logm.denodo.queries"))
    # Event time is kept as MetadataLogTimeGenerated (timestamp) and
    # MetadataLogDate (yyyyMMddHHmmssSSS as a long)...
    .withColumn("MetadataLogTimeGenerated", col("TimeGenerated").cast(TimestampType()))
    .withColumn("MetadataLogDate", date_format(col("TimeGenerated"), "yyyyMMddHHmmssSSS").cast(LongType()))
    # ...while TimeGenerated itself is replaced by the processing time, as a string.
    .drop("TimeGenerated")
    .withColumn("TimeGenerated", current_timestamp().cast(StringType()))
    # MetadataLogGuid = <yyMMddHHmm minute window><9-digit sequence>, as a long.
    .withColumn("MetadataLogWindow", date_format(from_utc_timestamp(col("MetadataLogTimeGenerated"), "UTC"), "yyMMddHHmm").cast(LongType()))
    # The sequence is a row number within the minute window. The previous
    # lpad(monotonically_increasing_id(), 9, "0") truncated ids longer than nine
    # digits (lpad shortens over-long input), and every partition after the
    # first starts at 2**33, so rows in those partitions shared the same GUID.
    # NOTE(review): the sequence restarts on every evaluation, so GUIDs are
    # unique within one run only, as was already the case before.
    .withColumn("MetadataLogSeq", expr("row_number() OVER (PARTITION BY MetadataLogWindow ORDER BY MetadataLogId)"))
    .withColumn("MetadataLogGuid", concat(col("MetadataLogWindow"), lpad(col("MetadataLogSeq").cast("string"), 9, "0")).cast(LongType()))
    .drop("MetadataLogWindow", "MetadataLogSeq")
    .withColumn("MetadataLogFileName", col("input_file_path"))
    .drop("input_file_path")
    .withColumn("TenantId", lit(tenant_id))
    .withColumn("Type", lit("LOGM_DENODO_QUERIES_CL"))
)

In [None]:
# Step 7: Write to Delta
# Set the OAuth options for the target storage account before writing to it.
configure_spark_for_adls_oauth(
    storage_account_name=logm_storage_account_name,
    client_id=client_id,
    client_secret=client_secret,
    tenant_id=tenant_id,
)
# Append the new records to the existing Delta table.
delta_writer = final_df.write.format("delta").mode("append")
delta_writer.save(delta_table_path)

In [None]:
# Step 8: Update checkpoint
# Derive each row's file date (yyyyMMdd) from the ".MMddyyyy" suffix of its
# source file name.
input_file_date = (
    final_df
    .withColumn("input_file_date", regexp_extract(col("MetadataLogFileName"), r"\.(\d{8})$", 1))
    .withColumn("parsed_date", to_date(col("input_file_date"), "MMddyyyy"))
    .withColumn("date_yyyymmdd", date_format(col("parsed_date"), "yyyyMMdd"))
)

# Compute both checkpoint values in a single aggregation, so the pipeline is
# evaluated once here instead of once per value.
checkpoint_row = input_file_date.agg(
    {"MetadataLogTimeGenerated": "max", "date_yyyymmdd": "max"}
).first()
last_timegenerated = checkpoint_row["max(MetadataLogTimeGenerated)"]
file_date = checkpoint_row["max(date_yyyymmdd)"]

# A null value cannot be stored: createDataFrame cannot infer a column type
# from None. Fail with an explicit message instead.
if file_date is None or last_timegenerated is None:
    raise ValueError(
        "Cannot update checkpoint: "
        f"file_date={file_date!r}, last_timegenerated={last_timegenerated!r}"
    )

update_checkpoint(file_date, last_timegenerated)