# Log Analysis with DuckDB
This notebook demonstrates how to use DuckDB to analyze audit and signin logs stored in Parquet files. We will:
1. Load the Parquet files into DuckDB tables.
2. Inspect the schema and contents of these tables.
3. Merge these tables to create an `events` table.
4. Create a `sessions` table by aggregating the `events` table.
5. Showcase the results.

In [None]:
# Import necessary libraries
import duckdb
import os
import pandas as pd

## Step 1: Load Parquet Files into DuckDB Tables

In [None]:
# Directory where Parquet files are stored
auditlogs_dir = "./parquet_data/insights-logs-auditlogs"
signinlogs_dir = "./parquet_data/insights-logs-signinlogs"

# Initialize DuckDB connection
con = duckdb.connect("azure_logs.db")

### Load data from Parquet files

In [None]:
auditlogs_combined_file = os.path.join(auditlogs_dir, "combined_table.parquet")
signinlogs_combined_file = os.path.join(signinlogs_dir, "combined_table.parquet")

# Drop existing tables if they exist
con.execute("DROP TABLE IF EXISTS combined_table_auditlogs")
con.execute("DROP TABLE IF EXISTS combined_table_signinlogs")
con.execute("DROP TABLE IF EXISTS events")
con.execute("DROP TABLE IF EXISTS sessions")

# Create tables from Parquet files
con.execute(f"CREATE TABLE combined_table_auditlogs AS SELECT * FROM read_parquet('{auditlogs_combined_file}')")
con.execute(f"CREATE TABLE combined_table_signinlogs AS SELECT * FROM read_parquet('{signinlogs_combined_file}')")

## Step 2: Inspect the Tables
Let's inspect the schema and first few rows of each table.

In [None]:
# Describe the schema of the audit logs table
auditlogs_schema_df = con.execute("DESCRIBE SELECT * FROM combined_table_auditlogs").df()
auditlogs_schema_df

In [None]:
# Display the first 10 rows of the audit logs table
auditlogs_sample_df = con.execute("SELECT * FROM combined_table_auditlogs LIMIT 10").df()
auditlogs_sample_df

In [None]:
# Describe the schema of the signin logs table
signinlogs_schema_df = con.execute("DESCRIBE SELECT * FROM combined_table_signinlogs").df()
signinlogs_schema_df

In [None]:
# Display the first 10 rows of the signin logs table
signinlogs_sample_df = con.execute("SELECT * FROM combined_table_signinlogs LIMIT 10").df()
signinlogs_sample_df

## Step 3: Merge Tables to Create `events` Table
In this step, we will merge the `combined_table_auditlogs` and `combined_table_signinlogs` tables into a single `events` table. We will rename `correlationId` to `exfSessionId` and select the relevant columns from both tables. We will use the `UNION ALL` operator to combine rows from both `combined_table_auditlogs` and `combined_table_signinlogs` into the `events` table. The `DISTINCT` keyword ensures that duplicate rows are removed.

In [None]:
# Drop existing events table if it exists
con.execute("DROP TABLE IF EXISTS events")

# Query to merge the tables
query = '''
CREATE TABLE events AS
SELECT DISTINCT
    "time", 
    "operationName", 
    "category",  
    "correlationId" AS "exfSessionId", 
    "callerIpAddress" AS ipAddress, 
    "properties.initiatedBy.user.id" AS "userId", 
    "properties.initiatedBy.user.userPrincipalName" AS "userPrincipalName", 
    "properties.targetResources.displayName" AS "appDisplayName"
FROM combined_table_auditlogs
UNION ALL
SELECT 
    "time", 
    "operationName", 
    "category", 
    "correlationId" AS "exfSessionId", 
    "callerIpAddress" AS ipAddress, 
    "properties.userId" AS "userId", 
    "properties.userPrincipalName" AS "userPrincipalName", 
    "properties.appDisplayName" AS "appDisplayName"
FROM combined_table_signinlogs;
'''

# Execute the query to create the events table
con.execute(query)

# Verify the results
events_df = con.execute("SELECT * FROM events").df()

# Save to CSV
events_df.to_csv("events.csv", index=False)

events_df

## Step 4: Create `sessions` Table by Aggregating the `events` Table
In this step, we will create a `sessions` table by aggregating the `events` table based on `exfSessionId`. For each session `exfSessionId`, we will calculate the earliest (`session_start`) and latest (`session_end`) timestamps. We will also determine the most common `userId` and `userPrincipalName` for each session using subqueries with the `GROUP BY` and `ORDER BY` clauses.

In [None]:
# Drop existing sessions table if it exists
con.execute("DROP TABLE IF EXISTS sessions")

# Query to create the sessions table
query3 = '''
CREATE TABLE sessions AS
SELECT
    exfSessionId,
    MIN(CAST("time" AS TIMESTAMP)) AS session_start,
    MAX(CAST("time" AS TIMESTAMP)) AS session_end,
    -- Get the most common values for userId and userPrincipalName
    (SELECT userId FROM events e2 WHERE e1.exfSessionId = e2.exfSessionId GROUP BY userId ORDER BY COUNT(*) DESC LIMIT 1) AS userId,
    (SELECT userPrincipalName FROM events e2 WHERE e1.exfSessionId = e2.exfSessionId GROUP BY userPrincipalName ORDER BY COUNT(*) DESC LIMIT 1) AS userPrincipalName
FROM
    events e1
GROUP BY
    exfSessionId;
'''

con.execute(query3)

# Verify the results
sessions_df = con.execute("SELECT * FROM sessions").df()
# Save to CSV
sessions_df.to_csv("sessions.csv", index=False)
sessions_df

## Step 5: Verify the Created Tables
Let's verify the contents of the `events` and `sessions` tables by checking their schemas and a few sample rows.

In [None]:
# Verify the schema of the events table
events_schema_df = con.execute("DESCRIBE SELECT * FROM events").df()
events_schema_df

In [None]:
# Display the first 10 rows of the events table
events_sample_df = con.execute("SELECT * FROM events LIMIT 10").df()
events_sample_df

In [None]:
# Verify the schema of the sessions table
sessions_schema_df = con.execute("DESCRIBE SELECT * FROM sessions").df()
sessions_schema_df

In [None]:
# Display the first 10 rows of the sessions table
sessions_sample_df = con.execute("SELECT * FROM sessions LIMIT 10").df()
sessions_sample_df

## Step 6: Close the DuckDB Connection
Finally, we close the DuckDB connection.

In [None]:
# Close the DuckDB connection
con.close()