# User Risk Score Sample

This sample notebook demonstrates how to retrieve data from the Microsoft Sentinel data lake, calculate a security risk score from that data, and write the result back to a custom lake table. It calculates a **100-point user risk score** for each user by analyzing authentication patterns, application access, administrative activities, and security alerts across four tables (SigninLogs, EntraUsers, AuditLogs, SecurityAlert).

## References

* [Available workspace tables](https://learn.microsoft.com/en-us/azure/azure-monitor/reference/tables-index)
* [Available system tables](https://learn.microsoft.com/en-us/azure/sentinel/datalake/enable-data-connectors)
* [Microsoft Sentinel Provider class](https://learn.microsoft.com/en-us/azure/sentinel/datalake/sentinel-provider-class-reference)
* [Notebook examples](https://learn.microsoft.com/en-us/azure/sentinel/datalake/notebook-examples)

## Risk Categories (100 points)

- **Sign-in Behavior (30 pts)** - IP/device diversity, frequency vs. baseline
- **Application Access (25 pts)** - Apps and resources accessed
- **Privileged Activity (20 pts)** - Admin operations, high-risk actions
- **Security Alerts (15 pts)** - Active alerts and severity
- **Geographic (5 pts)** - Location patterns
- **Temporal (5 pts)** - Off-hours activity

**Risk Levels:** Low (0-30), Medium (31-60), High (61-100)
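
As a rough illustration of the banding above, a total score can be mapped to a level with a small helper. `risk_level` is a hypothetical name used only in this sketch, not something the notebook defines, and the sketch assumes an integer total between 0 and 100.

```python
def risk_level(total_score: int) -> str:
    """Map a 0-100 total risk score to Low / Medium / High."""
    if not 0 <= total_score <= 100:
        raise ValueError(f"score out of range: {total_score}")
    if total_score <= 30:
        return "Low"
    if total_score <= 60:
        return "Medium"
    return "High"


# Boundary values fall into the lower band of each pair.
for score in (0, 30, 31, 60, 61, 100):
    print(score, risk_level(score))
```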

## Key Features

✅ Multi-dimensional analysis combining behavior, privilege, and threat data  
✅ Department-aware baselines for context-sensitive scoring  
✅ Graceful degradation (works with 2-4 tables)  
✅ Production-ready with error handling and metadata  
✅ Outputs to `UserRiskScores_SPRK` table (27 fields)

## Use Cases

- **Insider threats:** High admin activity + alerts + unusual patterns
- **Compromised accounts:** Geographic anomalies + security alerts
- **Privileged users:** Continuous monitoring of admin operations
- **Investigation prioritization:** Quantitative risk-based triage

**Full documentation:** [`USER_RISK_SCORE_DESIGN.md`](USER_RISK_SCORE_DESIGN.md)

## 1. Setup & Configuration

In this section, we'll set up our environment and configure the parameters for our risk scoring analysis.

**What we're doing:**
- Import required PySpark libraries for data processing
- Initialize the Microsoft Sentinel provider to access data lake tables
- Configure analysis parameters (time window, workspace, thresholds)

**Key Configuration Parameters:**
- `ANALYSIS_DAYS`: How far back to look for data (default: 14 days)
- `WORKSPACE_NAME`: Your Sentinel workspace name
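- `BUSINESS_HOUR_START` / `BUSINESS_HOUR_END`: Hours that bound the normal working day for temporal risk analysis (defaults: 6 and 18, so sign-ins before 06:00 or from 18:00 onward count as off-hours)
- `MIN_SIGNIN_THRESHOLD`: Minimum sign-ins required to calculate a meaningful risk score (default: 3)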

**Expected Output:**
You'll see confirmation of the configuration settings that will be used for the analysis.

In [None]:
# Import required libraries
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import (
    col, count, countDistinct, sum, when, hour, lit, expr, 
    current_timestamp, date_sub, avg, collect_list, coalesce,
    from_json, get_json_object
)
from pyspark.sql.window import Window

WORKSPACE_NAME = "<YOUR_WORKSPACE_NAME>"

# Analysis Configuration (can be overridden)
ANALYSIS_DAYS = 14
MIN_SIGNIN_THRESHOLD = 3
BUSINESS_HOUR_START = 6
BUSINESS_HOUR_END = 18

# ============================================================

# Initialize Sentinel provider
sentinel_provider = MicrosoftSentinelProvider(spark)

print("="*60)
print("USER RISK SCORE CONFIGURATION")
print("="*60)
print(f"Analysis Window: {ANALYSIS_DAYS} days")
print(f"Workspace: {WORKSPACE_NAME}")
print(f"Business Hours: {BUSINESS_HOUR_START}:00 - {BUSINESS_HOUR_END}:00")
print(f"Minimum Sign-in Threshold: {MIN_SIGNIN_THRESHOLD} events")
print("="*60)


## 2. Data Loading (4 Tables)

In this section, we load data from **4 different Sentinel tables** to build a comprehensive risk profile. Each table is loaded in a separate cell for independent troubleshooting.

- **Table 1: SigninLogs** - User authentication and access patterns
- **Table 2: EntraUsers** - User identity and organizational context
- **Table 3: AuditLogs** - Administrative and privileged operations
- **Table 4: SecurityAlert** - Active security incidents and detections

### 2.1 Load SigninLogs

Load sign-in events for behavioral analysis:
- Filters for Member users only
- Includes IP addresses, devices, applications
- Used for access pattern analysis

In [None]:
print("📊 Loading Table 1: SigninLogs...")
signin_df = (
    sentinel_provider.read_table('SigninLogs', WORKSPACE_NAME)
    .filter(
        (col("UserType") == "Member") & 
        (col("UserId").isNotNull()) &
        (col("TimeGenerated") >= expr(f"current_timestamp() - INTERVAL {ANALYSIS_DAYS} DAYS"))
    )
    .select(
        "UserId", "UserPrincipalName", "UserDisplayName",
        "IPAddress", "UserAgent", "AppId", "ResourceId",
        "TimeGenerated", "Location"
    )
    .persist()
)
print(f"✅ Loaded {signin_df.count()} sign-in events")

### 2.2 Load EntraUsers

Load user profile information:
- Department, country, job title
- Used for enrichment and baseline calculations

In [None]:
print("📊 Loading Table 2: EntraUsers...")
users_df = (
    sentinel_provider.read_table('EntraUsers')
    .filter(
        (col("id").isNotNull()) &
        (col("id") != "")
    )
    .select("id", "displayName", "mail", "department", "country", "jobTitle", "accountEnabled")
    .dropDuplicates(["id"])
    .persist()
)
print(f"✅ Loaded {users_df.count()} user profiles")

### 2.3 Load AuditLogs

Load administrative activity logs:
- Tracks privileged operations
- InitiatedBy field is JSON: `{"user": {"userPrincipalName": "..."}}`
- Uses `get_json_object()` to parse and extract user principal name
- Only includes user-initiated actions (filters out app-initiated)

**Note:** This table may not be available in all environments.
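
To make the extraction concrete, the following plain-Python sketch mimics what the `$.user.userPrincipalName` path pulls out of an `InitiatedBy` value, without needing a Spark session. The helper name `initiating_upn` and the sample payloads (including the shape of the app-initiated record) are assumptions for illustration; real records carry more fields.

```python
import json
from typing import Optional


def initiating_upn(initiated_by: Optional[str]) -> Optional[str]:
    """Return user.userPrincipalName from an InitiatedBy JSON string, else None."""
    if not initiated_by:
        return None
    try:
        payload = json.loads(initiated_by)
    except json.JSONDecodeError:
        return None  # malformed JSON yields no user
    if not isinstance(payload, dict):
        return None
    user = payload.get("user")
    if not isinstance(user, dict):
        return None  # e.g. app-initiated entries have no "user" object
    return user.get("userPrincipalName")


# Hypothetical sample values
user_event = '{"user": {"userPrincipalName": "alice@contoso.com"}}'
app_event = '{"app": {"displayName": "Example Service"}}'

print(initiating_upn(user_event))
print(initiating_upn(app_event))
```

Rows for which this helper returns `None` correspond to the rows the notebook drops with its `isNotNull()` filter.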

In [None]:
print("📊 Loading Table 3: AuditLogs...")
try:
    # Import here in case this cell is run independently
    from pyspark.sql.functions import col, expr, get_json_object
    
    # InitiatedBy is a JSON string: {"user": {"userPrincipalName": "user@domain.com", ...}}
    audit_df = (
        sentinel_provider.read_table('AuditLogs', WORKSPACE_NAME)
        .filter(
            col("ActivityDateTime") >= expr(f"current_timestamp() - INTERVAL {ANALYSIS_DAYS} DAYS")
        )
        .withColumn(
            "UserPrincipalName",
            get_json_object(col("InitiatedBy"), "$.user.userPrincipalName")
        )
        .select(
            "UserPrincipalName",
            "OperationName",
            "Category",
            "Result",
            "ActivityDateTime"
        )
        .filter(col("UserPrincipalName").isNotNull())
        .persist()
    )
    audit_available = True
    print(f"✅ Loaded {audit_df.count()} audit log entries (user-initiated only)")
except Exception as e:
    print(f"⚠️  AuditLogs not available: {str(e)}")
    print("ℹ️  Will continue without privileged activity scoring")
    audit_available = False

### 2.4 Load SecurityAlert

Load active security alerts:
- Only unresolved alerts (Status: New or InProgress)
- Provides direct threat indicators
- Used for alert-based risk scoring

**Note:** This table may not be available in all environments.

In [None]:
print("📊 Loading Table 4: SecurityAlert...")
try:
    security_alert_df = (
        sentinel_provider.read_table('SecurityAlert', WORKSPACE_NAME)
        .filter(
            (col("Status").isin(["New", "InProgress"])) &
            (col("TimeGenerated") >= expr(f"current_timestamp() - INTERVAL {ANALYSIS_DAYS} DAYS"))
        )
        .select(
            "AlertName",
            "AlertSeverity",
            "CompromisedEntity",
            "Status",
            "TimeGenerated",
            "Tactics"
        )
        .filter(col("CompromisedEntity").isNotNull())
        .persist()
    )
    alert_available = True
    print(f"✅ Loaded {security_alert_df.count()} active security alerts")
except Exception as e:
    print(f"⚠️  SecurityAlert not available: {str(e)}")
    print("ℹ️  Will continue without alert scoring")
    alert_available = False

### 2.5 Data Loading Summary

Display which tables were successfully loaded:

In [None]:
print("="*60)
print("Data Loading Summary:")
print("  ✅ SigninLogs: Available")
print("  ✅ EntraUsers: Available")
print(f"  {'✅' if audit_available else '❌'} AuditLogs: {'Available' if audit_available else 'Not Available'}")
print(f"  {'✅' if alert_available else '❌'} SecurityAlert: {'Available' if alert_available else 'Not Available'}")
print("="*60)

## 3. Calculate Base Metrics from SigninLogs

Now we'll aggregate the sign-in data to calculate per-user metrics that will feed into our risk scoring.

**Metrics Calculated:**
- **Sign-in Behavior**: Unique IP count, unique device count, total sign-ins
- **Application Access**: Unique app count, unique resource count
- **Temporal Patterns**: Off-hours sign-in count and percentage

These metrics form the foundation for behavioral risk assessment.

**Expected Output:**
- Count of users with calculated metrics
- Each user will have aggregated statistics from their sign-in activity
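
The off-hours rule can be sketched in plain Python, assuming the default 06:00-18:00 window from the configuration cell. The helper names `is_off_hours` and `offhours_percent` are hypothetical, and the sketch assumes the timestamps are already expressed in the time zone the business hours refer to.

```python
from datetime import datetime
from typing import Iterable

BUSINESS_HOUR_START = 6   # default from the configuration cell
BUSINESS_HOUR_END = 18    # default from the configuration cell


def is_off_hours(ts: datetime) -> bool:
    """True when the hour is before the window start or at/after its end."""
    return ts.hour < BUSINESS_HOUR_START or ts.hour >= BUSINESS_HOUR_END


def offhours_percent(timestamps: Iterable[datetime]) -> float:
    """Share of sign-ins outside business hours, as a percentage."""
    stamps = list(timestamps)
    if not stamps:
        return 0.0
    off = sum(1 for ts in stamps if is_off_hours(ts))
    return off / len(stamps) * 100


sample = [
    datetime(2024, 1, 8, 5, 59),   # off-hours (before 06:00)
    datetime(2024, 1, 8, 6, 0),    # business hours
    datetime(2024, 1, 8, 17, 59),  # business hours
    datetime(2024, 1, 8, 18, 0),   # off-hours (18:00 itself is excluded)
]
print(offhours_percent(sample))
```

Because the window is half-open, a sign-in at exactly 18:00 counts as off-hours while one at exactly 06:00 does not.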

In [None]:
print("🔍 Calculating sign-in behavior metrics...")

# Aggregate sign-in metrics per user
signin_metrics = (
    signin_df
    .groupBy("UserId", "UserPrincipalName", "UserDisplayName")
    .agg(
        # Sign-in behavior metrics
        countDistinct("IPAddress").alias("unique_ip_count"),
        countDistinct("UserAgent").alias("unique_device_count"),
        count("*").alias("total_signins"),
        
        # Application access metrics
        countDistinct("AppId").alias("unique_app_count"),
        countDistinct("ResourceId").alias("unique_resource_count"),
        
        # Temporal metrics
        sum(
            when(
                (hour("TimeGenerated") < BUSINESS_HOUR_START) | 
                (hour("TimeGenerated") >= BUSINESS_HOUR_END), 
                1
            ).otherwise(0)
        ).alias("offhours_signins")
    )
)

# Calculate off-hours percentage
signin_metrics = signin_metrics.withColumn(
    "offhours_signin_percent",
    (col("offhours_signins") / col("total_signins") * 100).cast("float")
)

print(f"✅ Calculated metrics for {signin_metrics.count()} users")

# Show sample of calculated metrics
print("\n📊 Sample of Calculated Metrics (Top 10 users by sign-in count):")
signin_metrics.orderBy(col("total_signins").desc()).show(10, truncate=False)

## 4. Calculate Privileged Activity Metrics (AuditLogs)

In this section, we analyze **administrative operations** from AuditLogs to identify privileged activity patterns.

**What we're doing:**
- Define a list of high-risk operations (role changes, user deletions, policy updates)
- Aggregate audit metrics per user: total admin operations, high-risk operation count, unique operations
- Handle gracefully when AuditLogs aren't available (create empty DataFrame)

**Risk Considerations:**
- Users with many admin operations may have elevated access
- Certain operations (e.g., "Add member to role") are inherently higher risk
- Unusual volume of administrative activity can indicate compromise or insider threat

**Expected Output:**
- Count of users who performed administrative operations
- Empty metrics (all zeros) if AuditLogs unavailable
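
A small plain-Python sketch of the same aggregation, assuming audit events arrive as `(user principal name, operation name)` pairs. The function name `summarize_audit` and the sample events are hypothetical; the set of high-risk operations mirrors the list used in this notebook.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

HIGH_RISK_OPS = {
    "Add member to role",
    "Remove member from role",
    "Delete user",
    "Update application",
    "Add owner to application",
    "Update policy",
    "Delete application",
}


def summarize_audit(events: Iterable[Tuple[str, str]]) -> Dict[str, Dict[str, int]]:
    """Aggregate per-user totals, high-risk counts, and distinct operations."""
    ops_by_user = defaultdict(list)
    for upn, operation in events:
        ops_by_user[upn].append(operation)
    return {
        upn: {
            "total_admin_operations": len(ops),
            "high_risk_operations": sum(op in HIGH_RISK_OPS for op in ops),
            "unique_operations": len(set(ops)),
        }
        for upn, ops in ops_by_user.items()
    }


# Hypothetical sample events
events = [
    ("alice@contoso.com", "Add member to role"),
    ("alice@contoso.com", "Update user"),
    ("alice@contoso.com", "Add member to role"),
    ("bob@contoso.com", "Delete user"),
]
summary = summarize_audit(events)
print(summary["alice@contoso.com"])
```

Note that a repeated high-risk operation is counted each time it occurs, while `unique_operations` counts it once.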

In [None]:
if audit_available:
    print("🔍 Calculating privileged activity metrics...")
    
    # Define high-risk operations
    high_risk_ops = [
        "Add member to role",
        "Remove member from role",
        "Delete user",
        "Update application",
        "Add owner to application",
        "Update policy",
        "Delete application"
    ]
    
    # Aggregate audit metrics per user
    audit_metrics = (
        audit_df
        .groupBy("UserPrincipalName")
        .agg(
            count("*").alias("total_admin_operations"),
            sum(
                when(col("OperationName").isin(high_risk_ops), 1)
                .otherwise(0)
            ).alias("high_risk_operations"),
            countDistinct("OperationName").alias("unique_operations"),
            collect_list("OperationName").alias("operation_list")
        )
    )
    
    print(f"✅ Calculated audit metrics for {audit_metrics.count()} users with admin activity")

    # Show sample
    print("\n📊 Top 10 Users by Administrative Operations:")
    audit_metrics.orderBy(col("total_admin_operations").desc()).show(10, truncate=False)
    
else:
    print("⚠️  Skipping privileged activity metrics (AuditLogs not available)")
    # Create empty DataFrame with expected schema for downstream joins
    audit_metrics = spark.createDataFrame(
        [], 
        "UserPrincipalName string, total_admin_operations int, high_risk_operations int, unique_operations int"
    )
    print("ℹ️  Empty audit_metrics DataFrame created for compatibility")

## 5. Calculate Security Alert Metrics

In this section, we analyze **active security alerts** from SecurityAlert table to identify users with current security concerns.

**What we're doing:**
- Aggregate alert data per compromised entity (user)
- Count total active alerts for each user
- Calculate weighted severity score (High=4, Medium=2, Low=1)
- Collect alert types for context
- Handle gracefully when SecurityAlert isn't available

**Risk Considerations:**
- Active alerts are the strongest indicator of current compromise
- High-severity alerts warrant immediate attention
- Multiple alerts on same user suggest persistent threat
- Alert types (Tactics) provide insight into attack patterns

**Expected Output:**
- Count of users with active security alerts
- Empty metrics (all zeros) if SecurityAlert unavailable
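
The weighted severity score is a simple sum over a user's active alerts. The sketch below uses the weights stated above (High=4, Medium=2, Low=1) and treats any other severity value, such as `Informational`, as 0; the function name `alert_severity_score` is hypothetical.

```python
from typing import Iterable

SEVERITY_WEIGHTS = {"High": 4, "Medium": 2, "Low": 1}


def alert_severity_score(severities: Iterable[str]) -> int:
    """Sum the weight of each alert; unknown severities contribute 0."""
    return sum(SEVERITY_WEIGHTS.get(severity, 0) for severity in severities)


# One High, one Medium, and one unweighted alert
print(alert_severity_score(["High", "Medium", "Informational"]))
```

Under these weights, two High alerts score the same as eight Low alerts, so the score reflects severity more strongly than raw alert volume.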

In [None]:
if alert_available:
    print("🔍 Calculating security alert metrics...")
    
    # Aggregate alert metrics per compromised entity (user)
    alert_metrics = (
        security_alert_df
        .groupBy("CompromisedEntity")
        .agg(
            count("*").alias("active_alert_count"),
            sum(
                when(col("AlertSeverity") == "High", 4)
                .when(col("AlertSeverity") == "Medium", 2)
                .when(col("AlertSeverity") == "Low", 1)
                .otherwise(0)
            ).alias("alert_severity_score"),
            collect_list("AlertName").alias("alert_types")
        )
    )
    
    print(f"✅ Calculated alert metrics for {alert_metrics.count()} users with active alerts")

    # Show sample
    print("\n🚨 Top 10 Users by Alert Severity Score:")
    alert_metrics.orderBy(col("alert_severity_score").desc()).show(10, truncate=False)
    
else:
    print("⚠️  Skipping security alert metrics (SecurityAlert not available)")
    # Create empty DataFrame with expected schema for downstream joins
    alert_metrics = spark.createDataFrame(
        [],
        "CompromisedEntity string, active_alert_count int, alert_severity_score int"
    )
    print("ℹ️  Empty alert_metrics DataFrame created for compatibility")

## 6. Join All Metrics

Now we combine data from all 4 tables into a single unified view for each user.

**What we're doing:**
- Start with signin_metrics as the base (all users with sign-ins)
- LEFT JOIN EntraUsers for organizational context (department, country, job title)
- LEFT JOIN audit_metrics for admin activity (many users will have nulls → fill with 0)
- LEFT JOIN alert_metrics for security alerts (many users will have nulls → fill with 0)

**Join Strategy:**
- Use LEFT joins to retain all users who signed in
- Fill nulls for users without admin activity or alerts
- Result: One row per user with all metrics combined

**Expected Output:**
- Count of users in combined dataset
- All users from sign-ins with enriched data from other tables
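
The join strategy can be illustrated without Spark: every base row is kept, and metrics missing on the right-hand side default to 0. This is a sketch with plain dictionaries; `left_join_with_zero_fill` and the sample rows are hypothetical names and data, not part of the notebook.

```python
from typing import Dict, List, Sequence


def left_join_with_zero_fill(
    base_rows: List[dict],
    metrics_by_upn: Dict[str, dict],
    metric_names: Sequence[str],
) -> List[dict]:
    """Keep every base row; take metrics when present, otherwise fill with 0."""
    joined = []
    for row in base_rows:
        extra = metrics_by_upn.get(row["UserPrincipalName"], {})
        merged = dict(row)
        for name in metric_names:
            merged[name] = extra.get(name, 0)
        joined.append(merged)
    return joined


# Hypothetical sample data
signins = [
    {"UserPrincipalName": "alice@contoso.com", "total_signins": 12},
    {"UserPrincipalName": "bob@contoso.com", "total_signins": 4},
]
audit = {
    "alice@contoso.com": {"total_admin_operations": 3},
    "carol@contoso.com": {"total_admin_operations": 9},  # no sign-ins: dropped
}

combined = left_join_with_zero_fill(signins, audit, ["total_admin_operations"])
for row in combined:
    print(row)
```

One consequence of anchoring on sign-ins is that a user who appears only in the audit or alert data, like the third sample user here, does not reach the combined result.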

In [None]:
print("🔗 Joining metrics from all tables...")

# Start with sign-in metrics as base
combined_metrics = signin_metrics

# Join user profile data (EntraUsers)
combined_metrics = (
    combined_metrics
    .join(users_df, combined_metrics.UserId == users_df.id, "left")
    .select(
        col("UserId"),
        col("UserPrincipalName"),
        col("UserDisplayName"),
        col("unique_ip_count"),
        col("unique_device_count"),
        col("total_signins"),
        col("unique_app_count"),
        col("unique_resource_count"),
        col("offhours_signins"),
        col("offhours_signin_percent"),
        col("department"),
        col("country"),
        col("jobTitle")
    )
)

print("  ✅ Joined EntraUsers data")

# Join audit metrics (if available)
if audit_available:
    combined_metrics = (
        combined_metrics
        .join(
            audit_metrics,
            combined_metrics.UserPrincipalName == audit_metrics.UserPrincipalName,
            "left"
        )
        .drop(audit_metrics.UserPrincipalName)
    )
    # Fill nulls for users with no admin activity
    combined_metrics = combined_metrics.fillna(
        {"total_admin_operations": 0, "high_risk_operations": 0, "unique_operations": 0}
    )
    print("  ✅ Joined AuditLogs data (nulls filled with 0)")
else:
    # Add columns with 0 values
    combined_metrics = combined_metrics.withColumn("total_admin_operations", lit(0))
    combined_metrics = combined_metrics.withColumn("high_risk_operations", lit(0))
    combined_metrics = combined_metrics.withColumn("unique_operations", lit(0))
    print("  ℹ️  AuditLogs not available - added zero columns")

# Join alert metrics (if available)
if alert_available:
    combined_metrics = (
        combined_metrics
        .join(
            alert_metrics,
            combined_metrics.UserPrincipalName == alert_metrics.CompromisedEntity,
            "left"
        )
        .drop(alert_metrics.CompromisedEntity)
    )
    # Fill nulls for users with no alerts
    combined_metrics = combined_metrics.fillna(
        {"active_alert_count": 0, "alert_severity_score": 0}
    )
    print("  ✅ Joined SecurityAlert data (nulls filled with 0)")
else:
    # Add columns with 0 values
    combined_metrics = combined_metrics.withColumn("active_alert_count", lit(0))
    combined_metrics = combined_metrics.withColumn("alert_severity_score", lit(0))
    print("  ℹ️  SecurityAlert not available - added zero columns")

print(f"\n✅ Combined metrics ready for {combined_metrics.count()} users")

# Show sample of combined data
print("\n📊 Sample Combined Metrics (First 5 users):")
combined_metrics.select(
    "UserPrincipalName", "department", "total_signins", 
    "unique_ip_count", "total_admin_operations", "active_alert_count"
).show(5, truncate=False)

## 7. Calculate Department Baselines

To detect **frequency anomalies**, we need to establish what "normal" sign-in activity looks like for each department.

**What we're doing:**
- Calculate average sign-in frequency per department
- Calculate global average as fallback for users without department
- Join baseline back to user data for comparison

**Why This Matters:**
- Engineering teams may have different normal patterns than Sales
- Allows context-aware risk scoring (high activity for Sales may be normal, but unusual for HR)
- Users significantly above their department baseline get higher frequency risk scores

**Expected Output:**
- Global average sign-in count
- Each user gets a baseline_signins value (department avg or global avg)
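
A minimal plain-Python sketch of the baseline logic, assuming each user record carries `department` (possibly `None`) and `total_signins`. The function name `attach_baselines` and the sample users are hypothetical, and the sketch assumes a non-empty user list.

```python
from collections import defaultdict
from statistics import mean
from typing import List


def attach_baselines(users: List[dict]) -> List[dict]:
    """Add baseline_signins: department average, or global average as fallback."""
    global_avg = mean(u["total_signins"] for u in users)

    signins_by_dept = defaultdict(list)
    for u in users:
        if u["department"] is not None:
            signins_by_dept[u["department"]].append(u["total_signins"])
    dept_avg = {dept: mean(vals) for dept, vals in signins_by_dept.items()}

    return [
        {**u, "baseline_signins": dept_avg.get(u["department"], global_avg)}
        for u in users
    ]


# Hypothetical sample users
users = [
    {"upn": "alice@contoso.com", "department": "Sales", "total_signins": 10},
    {"upn": "bob@contoso.com", "department": "Sales", "total_signins": 30},
    {"upn": "carol@contoso.com", "department": None, "total_signins": 50},
]
for row in attach_baselines(users):
    print(row["upn"], row["baseline_signins"])
```

In this sample the two Sales users share a baseline of 20, while the user without a department falls back to the global average of 30.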

In [None]:
print("📊 Calculating department baselines...")

# Calculate department-level average sign-in frequency
dept_baselines = (
    combined_metrics
    .filter(col("department").isNotNull())
    .groupBy("department")
    .agg(avg("total_signins").alias("dept_avg_signins"))
)

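# Calculate global average as fallback
global_avg = combined_metrics.agg(
    avg("total_signins").alias("global_avg_signins")
).collect()[0]["global_avg_signins"]

# avg() returns None when combined_metrics has no rows; fall back to 0.0 so the
# formatted print below does not raise a TypeError
if global_avg is None:
    global_avg = 0.0

print(f"  ℹ️  Global average sign-ins: {global_avg:.1f}")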

# Join baselines to user data - drop any existing baseline columns first
if "baseline_signins" in combined_metrics.columns:
    combined_metrics = combined_metrics.drop("baseline_signins")
if "dept_avg_signins" in combined_metrics.columns:
    combined_metrics = combined_metrics.drop("dept_avg_signins")

combined_metrics = (
    combined_metrics
    .join(
        dept_baselines,
        combined_metrics.department == dept_baselines.department,
        "left"
    )
    .withColumn(
        "baseline_signins",
        coalesce(dept_baselines["dept_avg_signins"], lit(global_avg))
    )
    .select(
        combined_metrics["*"],  # All original columns
        col("baseline_signins")  # New baseline column
    )
)

print("\n✅ Baselines calculated and joined")

# Show sample with baselines
print("\n📊 Sample with Department Baselines (First 10 users):")
combined_metrics.select(
    "UserPrincipalName", "department", "total_signins", "baseline_signins"
).show(10, truncate=False)

## 8. Calculate Individual Risk Scores

Now we'll calculate **11 individual risk scores** based on the metrics we've collected. Each risk factor is scored on its own scale, then combined into category subscores.

**Risk Categories & Scoring:**

1. **Sign-in Behavior (30 points total)**
   - IP Risk (0-10): Based on unique IP count
   - Device Risk (0-10): Based on unique device count
   - Frequency Risk (0-10): Based on comparison to department baseline

2. **Application Access (25 points total)**
   - App Risk (0-12): Based on unique application count
   - Resource Risk (0-13): Based on unique resource count

3. **Privileged Activity (20 points total)** - if AuditLogs available
   - Admin Operations Risk (0-10): Total admin operations volume
   - High-Risk Operations (0-10): Count of sensitive operations

4. **Security Alerts (15 points total)** - if SecurityAlert available
   - Alert Count (0-8): Number of active alerts
   - Alert Severity (0-7): Weighted severity score

5. **Geographic Risk (5 points)**: Based on IP diversity

6. **Temporal Risk (5 points)**: Based on off-hours activity percentage

**Total Possible: 100 points**

**Expected Output:**
- Each user gets 11 risk factor scores
- All scores ready for combination into total risk score
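
Most of the factors above follow the same pattern: a metric is compared against ascending upper bounds and the first matching tier decides the points. The sketch below shows that pattern in plain Python for inclusive (`<=`) tiers, using the IP thresholds from this notebook as the example, and checks that the per-factor maxima listed above add up to 100. `tiered_score` and `FACTOR_MAXIMA` are hypothetical names introduced for this illustration.

```python
from bisect import bisect_left
from typing import Sequence


def tiered_score(value: float, upper_bounds: Sequence[float], points: Sequence[int]) -> int:
    """Return points[i] for the first tier where value <= upper_bounds[i].

    points needs one more entry than upper_bounds; the last entry is the
    score for values above every bound.
    """
    if len(points) != len(upper_bounds) + 1:
        raise ValueError("points must have exactly one more entry than upper_bounds")
    return points[bisect_left(upper_bounds, value)]


# IP risk tiers: <=2 -> 0, <=5 -> 3, <=10 -> 7, otherwise 10
IP_BOUNDS, IP_POINTS = (2, 5, 10), (0, 3, 7, 10)

# Maximum points per factor, taken from the category list above
FACTOR_MAXIMA = {
    "ip": 10, "device": 10, "frequency": 10,
    "app": 12, "resource": 13,
    "admin_ops": 10, "high_risk_ops": 10,
    "alert_count": 8, "alert_severity": 7,
    "geographic": 5, "temporal": 5,
}

for ip_count in (1, 2, 3, 10, 11):
    print(ip_count, tiered_score(ip_count, IP_BOUNDS, IP_POINTS))
print(sum(FACTOR_MAXIMA.values()))
```

The frequency factor compares with strict `<` rather than `<=`, so it would need `bisect_right` in place of `bisect_left` under this approach.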

In [None]:
print("🎯 Calculating individual risk scores...")

# Import here in case this cell runs independently
from pyspark.sql.functions import col, when, lit

# Start with combined_metrics
risk_scores = combined_metrics

# ========================================
# Sign-in Behavior Risk (30 points)
# ========================================

# IP Risk (0-10)
risk_scores = risk_scores.withColumn("ip_risk_score",
    when(col("unique_ip_count") <= 2, 0)
    .when(col("unique_ip_count") <= 5, 3)
    .when(col("unique_ip_count") <= 10, 7)
    .otherwise(10)
)

# Device Risk (0-10)
risk_scores = risk_scores.withColumn("device_risk_score",
    when(col("unique_device_count") <= 2, 0)
    .when(col("unique_device_count") <= 4, 3)
    .when(col("unique_device_count") <= 7, 6)
    .otherwise(10)
)

# Frequency Risk (0-10) - Compare to baseline
risk_scores = risk_scores.withColumn("frequency_ratio", 
    col("total_signins") / col("baseline_signins")
)
risk_scores = risk_scores.withColumn("frequency_risk_score",
    when(col("frequency_ratio") < 1.0, 0)
    .when(col("frequency_ratio") < 2.0, 3)
    .when(col("frequency_ratio") < 3.0, 6)
    .otherwise(10)
)

print("  ✅ Sign-in behavior risk calculated (30 points)")

# ========================================
# Application Access Risk (25 points)
# ========================================

# App Risk (0-12)
risk_scores = risk_scores.withColumn("app_risk_score",
    when(col("unique_app_count") <= 5, 0)
    .when(col("unique_app_count") <= 10, 4)
    .when(col("unique_app_count") <= 15, 8)
    .otherwise(12)
)

# Resource Risk (0-13)
risk_scores = risk_scores.withColumn("resource_risk_score",
    when(col("unique_resource_count") <= 3, 0)
    .when(col("unique_resource_count") <= 6, 4)
    .when(col("unique_resource_count") <= 10, 8)
    .otherwise(13)
)

print("  ✅ Application access risk calculated (25 points)")

# ========================================
# Privileged Activity Risk (20 points)
# ========================================

if audit_available:
    # Admin Operations Risk (0-10)
    risk_scores = risk_scores.withColumn("admin_ops_risk_score",
        when(col("total_admin_operations") == 0, 0)
        .when(col("total_admin_operations") <= 5, 3)
        .when(col("total_admin_operations") <= 15, 7)
        .otherwise(10)
    )
    
    # High-Risk Operations (0-10)
    risk_scores = risk_scores.withColumn("high_risk_ops_score",
        when(col("high_risk_operations") == 0, 0)
        .when(col("high_risk_operations") <= 2, 4)
        .when(col("high_risk_operations") <= 5, 7)
        .otherwise(10)
    )
    print("  ✅ Privileged activity risk calculated (20 points)")
else:
    # No audit data - set to 0
    risk_scores = risk_scores.withColumn("admin_ops_risk_score", lit(0))
    risk_scores = risk_scores.withColumn("high_risk_ops_score", lit(0))
    print("  ℹ️  Privileged activity risk set to 0 (AuditLogs not available)")

# ========================================
# Security Alert Risk (15 points)
# ========================================

if alert_available:
    # Active Alert Count (0-8)
    risk_scores = risk_scores.withColumn("alert_count_score",
        when(col("active_alert_count") == 0, 0)
        .when(col("active_alert_count") == 1, 3)
        .when(col("active_alert_count") <= 3, 6)
        .otherwise(8)
    )
    
    # Alert Severity (0-7)
    risk_scores = risk_scores.withColumn("alert_severity_risk_score",
        when(col("alert_severity_score") == 0, 0)
        .when(col("alert_severity_score") <= 3, 2)
        .when(col("alert_severity_score") <= 7, 5)
        .otherwise(7)
    )
    print("  ✅ Security alert risk calculated (15 points)")
else:
    # No alert data - set to 0
    risk_scores = risk_scores.withColumn("alert_count_score", lit(0))
    risk_scores = risk_scores.withColumn("alert_severity_risk_score", lit(0))
    print("  ℹ️  Security alert risk set to 0 (SecurityAlert not available)")

# ========================================
# Geographic Risk (5 points)
# ========================================

# Simplified based on IP diversity
risk_scores = risk_scores.withColumn("geographic_risk_score",
    when(col("unique_ip_count") <= 3, 0)
    .when(col("unique_ip_count") <= 6, 2)
    .when(col("unique_ip_count") <= 10, 4)
    .otherwise(5)
)

print("  ✅ Geographic risk calculated (5 points)")

# ========================================
# Temporal Risk (5 points)
# ========================================

risk_scores = risk_scores.withColumn("temporal_risk_score",
    when(col("offhours_signin_percent") <= 10, 0)
    .when(col("offhours_signin_percent") <= 25, 2)
    .when(col("offhours_signin_percent") <= 50, 4)
    .otherwise(5)
)

print("  ✅ Temporal risk calculated (5 points)")

print("\n✅ All individual risk factors calculated")

# Show sample of risk scores
print("\n📊 Sample Risk Scores (First 5 users):")
risk_scores.select(
    "UserPrincipalName",
    "ip_risk_score",
    "device_risk_score",
    "frequency_risk_score",
    "app_risk_score",
    "resource_risk_score",
    "admin_ops_risk_score",
    "high_risk_ops_score",
    "alert_count_score",
    "alert_severity_risk_score",
    "geographic_risk_score",
    "temporal_risk_score"
).show(5, truncate=False)

## 8. Calculate Individual Risk Scores

Now we'll calculate **9 individual risk scores** based on the metrics we've collected. Each risk factor is scored on its own scale, then combined into category subscores.

**Risk Categories & Scoring:**

1. **Sign-in Behavior (30 points total)**
   - IP Risk (0-10): Based on unique IP count
   - Device Risk (0-10): Based on unique device count
   - Frequency Risk (0-10): Based on comparison to department baseline

2. **Application Access (25 points total)**
   - App Risk (0-12): Based on unique application count
   - Resource Risk (0-13): Based on unique resource count

3. **Privileged Activity (20 points total)** - if AuditLogs available
   - Admin Operations Risk (0-10): Total admin operations volume
   - High-Risk Operations (0-10): Count of sensitive operations

4. **Security Alerts (15 points total)** - if SecurityAlert available
   - Alert Count (0-8): Number of active alerts
   - Alert Severity (0-7): Weighted severity score

5. **Geographic Risk (5 points)**: Based on IP diversity

6. **Temporal Risk (5 points)**: Based on off-hours activity percentage

**Total Possible: 100 points**

**Expected Output:**
- Each user gets 9+ risk factor scores
- All scores ready for combination into total risk score

In [None]:
print("üéØ Calculating individual risk scores...")

# Import here in case this cell runs independently
from pyspark.sql.functions import col, when, lit

# Start with combined_metrics
risk_scores = combined_metrics

# ========================================
# Sign-in Behavior Risk (30 points)
# ========================================

# IP Risk (0-10)
risk_scores = risk_scores.withColumn("ip_risk_score",
    when(col("unique_ip_count") <= 2, 0)
    .when(col("unique_ip_count") <= 5, 3)
    .when(col("unique_ip_count") <= 10, 7)
    .otherwise(10)
)

# Device Risk (0-10)
risk_scores = risk_scores.withColumn("device_risk_score",
    when(col("unique_device_count") <= 2, 0)
    .when(col("unique_device_count") <= 4, 3)
    .when(col("unique_device_count") <= 7, 6)
    .otherwise(10)
)

# Frequency Risk (0-10) - Compare to baseline
risk_scores = risk_scores.withColumn("frequency_ratio", 
    col("total_signins") / col("baseline_signins")
)
risk_scores = risk_scores.withColumn("frequency_risk_score",
    when(col("frequency_ratio") < 1.0, 0)
    .when(col("frequency_ratio") < 2.0, 3)
    .when(col("frequency_ratio") < 3.0, 6)
    .otherwise(10)
)

print("  ‚úÖ Sign-in behavior risk calculated (30 points)")

# ========================================
# Application Access Risk (25 points)
# ========================================

# App Risk (0-12)
risk_scores = risk_scores.withColumn("app_risk_score",
    when(col("unique_app_count") <= 5, 0)
    .when(col("unique_app_count") <= 10, 4)
    .when(col("unique_app_count") <= 15, 8)
    .otherwise(12)
)

# Resource Risk (0-13)
risk_scores = risk_scores.withColumn("resource_risk_score",
    when(col("unique_resource_count") <= 3, 0)
    .when(col("unique_resource_count") <= 6, 4)
    .when(col("unique_resource_count") <= 10, 8)
    .otherwise(13)
)

print("  ‚úÖ Application access risk calculated (25 points)")

# ========================================
# Privileged Activity Risk (20 points)
# ========================================

if audit_available:
    # Admin Operations Risk (0-10)
    risk_scores = risk_scores.withColumn("admin_ops_risk_score",
        when(col("total_admin_operations") == 0, 0)
        .when(col("total_admin_operations") <= 5, 3)
        .when(col("total_admin_operations") <= 15, 7)
        .otherwise(10)
    )
    
    # High-Risk Operations (0-10)
    risk_scores = risk_scores.withColumn("high_risk_ops_score",
        when(col("high_risk_operations") == 0, 0)
        .when(col("high_risk_operations") <= 2, 4)
        .when(col("high_risk_operations") <= 5, 7)
        .otherwise(10)
    )
    print("  ‚úÖ Privileged activity risk calculated (20 points)")
else:
    # No audit data - set to 0
    risk_scores = risk_scores.withColumn("admin_ops_risk_score", lit(0))
    risk_scores = risk_scores.withColumn("high_risk_ops_score", lit(0))
    print("  ‚ÑπÔ∏è  Privileged activity risk set to 0 (AuditLogs not available)")

# ========================================
# Security Alert Risk (15 points)
# ========================================

if alert_available:
    # Active Alert Count (0-8)
    risk_scores = risk_scores.withColumn("alert_count_score",
        when(col("active_alert_count") == 0, 0)
        .when(col("active_alert_count") == 1, 3)
        .when(col("active_alert_count") <= 3, 6)
        .otherwise(8)
    )
    
    # Alert Severity (0-7)
    risk_scores = risk_scores.withColumn("alert_severity_risk_score",
        when(col("alert_severity_score") == 0, 0)
        .when(col("alert_severity_score") <= 3, 2)
        .when(col("alert_severity_score") <= 7, 5)
        .otherwise(7)
    )
    print("  ✅ Security alert risk calculated (15 points)")
else:
    # No alert data - set to 0
    risk_scores = risk_scores.withColumn("alert_count_score", lit(0))
    risk_scores = risk_scores.withColumn("alert_severity_risk_score", lit(0))
    print("  ℹ️  Security alert risk set to 0 (SecurityAlert not available)")

# ========================================
# Geographic Risk (5 points)
# ========================================

# Simplified based on IP diversity
risk_scores = risk_scores.withColumn("geographic_risk_score",
    when(col("unique_ip_count") <= 3, 0)
    .when(col("unique_ip_count") <= 6, 2)
    .when(col("unique_ip_count") <= 10, 4)
    .otherwise(5)
)

print("  ✅ Geographic risk calculated (5 points)")

# ========================================
# Temporal Risk (5 points)
# ========================================

risk_scores = risk_scores.withColumn("temporal_risk_score",
    when(col("offhours_signin_percent") <= 10, 0)
    .when(col("offhours_signin_percent") <= 25, 2)
    .when(col("offhours_signin_percent") <= 50, 4)
    .otherwise(5)
)

print("  ✅ Temporal risk calculated (5 points)")

print("\n✅ All individual risk factors calculated")

# Show sample of risk scores
print("\n📊 Sample Risk Scores (First 5 users):")
risk_scores.select(
    "UserPrincipalName",
    "ip_risk_score",
    "device_risk_score",
    "frequency_risk_score",
    "app_risk_score",
    "resource_risk_score",
    "admin_ops_risk_score",
    "high_risk_ops_score",
    "alert_count_score",
    "alert_severity_risk_score",
    "geographic_risk_score",
    "temporal_risk_score"
).show(5, truncate=False)

## 9. Calculate Composite Scores

Now we combine the individual risk scores into **category subscores** and calculate the **total risk score** (0-100 points).

**What we're doing:**

1. **Category Subscores**: Sum individual scores within each category
   - `signin_behavior_score` (0-30): IP + Device + Frequency
   - `application_access_score` (0-25): App + Resource
   - `privileged_activity_score` (0-20): Admin Ops + High-Risk Ops
   - `security_alert_score` (0-15): Alert Count + Alert Severity

2. **Total Risk Score** (0-100): Sum all category scores + Geographic + Temporal

3. **Risk Level Classification**:
   - **Low**: 0-30 points
   - **Medium**: 31-60 points
   - **High**: 61-100 points

4. **Alert Flag**: Boolean indicator for users with active security alerts

**Expected Output:**
- Each user gets a total_risk_score (0-100)
- Risk level classification (Low/Medium/High)
- Summary statistics showing score distribution
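
The arithmetic above can be sketched in plain Python, independent of Spark. The function names `composite_score` and `classify` and the sample subscores are illustrative assumptions, not part of the notebook; the category maxima and level thresholds are the ones listed above.

```python
# Illustrative only: plain-Python mirror of the composite scoring rules.
# Category maxima as listed above (they sum to 100).
CATEGORY_MAX = {
    "signin_behavior_score": 30,
    "application_access_score": 25,
    "privileged_activity_score": 20,
    "security_alert_score": 15,
    "geographic_risk_score": 5,
    "temporal_risk_score": 5,
}

def composite_score(subscores):
    """Sum the six category scores, treating a missing category as 0."""
    return sum(subscores.get(name, 0) for name in CATEGORY_MAX)

def classify(total):
    """Map a 0-100 total to Low (0-30), Medium (31-60), or High (61-100)."""
    if total <= 30:
        return "Low"
    if total <= 60:
        return "Medium"
    return "High"

# Hypothetical user: heavy sign-in and app activity, no alerts.
example = {
    "signin_behavior_score": 22,
    "application_access_score": 17,
    "privileged_activity_score": 7,
    "security_alert_score": 0,
    "geographic_risk_score": 4,
    "temporal_risk_score": 2,
}
total = composite_score(example)
print(total, classify(total))  # 52 Medium
```

Because the thresholds are inclusive upper bounds, a score of exactly 30 is Low and exactly 60 is Medium, matching the `<=` comparisons in the cell below.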

In [None]:
print("📈 Calculating composite risk scores...")

# Import here in case this cell runs independently
from pyspark.sql.functions import col, when, lit

# ========================================
# Category Subscores
# ========================================

# Sign-in Behavior Score (0-30)
risk_scores = risk_scores.withColumn("signin_behavior_score",
    col("ip_risk_score") + col("device_risk_score") + col("frequency_risk_score")
)

# Application Access Score (0-25)
risk_scores = risk_scores.withColumn("application_access_score",
    col("app_risk_score") + col("resource_risk_score")
)

# Privileged Activity Score (0-20)
risk_scores = risk_scores.withColumn("privileged_activity_score",
    col("admin_ops_risk_score") + col("high_risk_ops_score")
)

# Security Alert Score (0-15)
risk_scores = risk_scores.withColumn("security_alert_score",
    col("alert_count_score") + col("alert_severity_risk_score")
)

print("  ✅ Category subscores calculated")

# ========================================
# Total Risk Score (0-100)
# ========================================

risk_scores = risk_scores.withColumn("total_risk_score",
    col("signin_behavior_score") +
    col("application_access_score") +
    col("privileged_activity_score") +
    col("security_alert_score") +
    col("geographic_risk_score") +
    col("temporal_risk_score")
)

print("  ✅ Total risk score calculated")

# ========================================
# Risk Level Classification
# ========================================

risk_scores = risk_scores.withColumn("risk_level",
    when(col("total_risk_score") <= 30, "Low")
    .when(col("total_risk_score") <= 60, "Medium")
    .otherwise("High")
)

print("  ✅ Risk level classification applied")

# ========================================
# Alert Flag
# ========================================

if alert_available:
    risk_scores = risk_scores.withColumn("has_active_alerts",
        when(col("active_alert_count") > 0, True).otherwise(False)
    )
else:
    risk_scores = risk_scores.withColumn("has_active_alerts", lit(False))

print("  ✅ Alert flag added")

print("\n✅ All composite scores calculated")

# ========================================
# Summary Statistics
# ========================================

print("\n" + "="*60)
print("RISK SCORE SUMMARY")
print("="*60)

# Risk level distribution
risk_distribution = risk_scores.groupBy("risk_level").count().orderBy("risk_level")

# Calculate total users for visualization
total_users = risk_scores.count()

# Show top 10 highest risk users
print("\n🔴 Top 10 Highest Risk Users:")
risk_scores.select(
    "UserPrincipalName",
    "department",
    "total_risk_score",
    "risk_level",
    "signin_behavior_score",
    "application_access_score",
    "privileged_activity_score",
    "security_alert_score"
).orderBy(col("total_risk_score").desc()).show(10, truncate=False)

print("="*60)

# ========================================
# Visualization: Risk Level Distribution
# ========================================

import matplotlib.pyplot as plt
import pandas as pd

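# Convert to pandas and order by severity.
# The Spark sort on risk_level is alphabetical (High, Low, Medium), so a fixed
# color list would put the wrong color on each bar.
level_order = ["Low", "Medium", "High"]
level_colors = {"Low": "#2ecc71", "Medium": "#f39c12", "High": "#e74c3c"}  # Green, Orange, Red
risk_dist_pd = (
    risk_distribution.toPandas()
    .set_index("risk_level")
    .reindex(level_order, fill_value=0)  # keep levels that have no users
    .reset_index()
)

# Create bar chart, coloring each bar by its own risk level
plt.figure(figsize=(10, 6))
bars = plt.bar(risk_dist_pd["risk_level"], risk_dist_pd["count"],
               color=[level_colors[level] for level in risk_dist_pd["risk_level"]],
               edgecolor="black", linewidth=1.2)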

# Add value labels on top of bars
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height,
             f"{int(height)}\n({int(height)/total_users*100:.1f}%)",
             ha="center", va="bottom", fontsize=12, fontweight="bold")

plt.xlabel("Risk Level", fontsize=14, fontweight="bold")
plt.ylabel("Number of Users", fontsize=14, fontweight="bold")
plt.title(f"User Risk Level Distribution (Total: {total_users} users)", 
          fontsize=16, fontweight="bold", pad=20)
plt.grid(axis="y", alpha=0.3, linestyle="--")
plt.tight_layout()
plt.show()

print("\n📊 Risk distribution chart generated")
print("="*60)

## 10. Add Metadata & Final Selection

In this section, we prepare the final output DataFrame by adding metadata and selecting the exact columns we want to write to the custom table.

**What we're doing:**

- Add timestamp columns: calculation_date, analysis window dates, TimeGenerated
- Select and order all output columns (29 fields)
- Use coalesce() to ensure no null values for optional metrics
- Order by total_risk_score descending for easy identification of high-risk users

**Output Columns (29 total):**

1. **Identity** (6): UserId, UserPrincipalName, UserDisplayName, department, country, jobTitle
2. **Risk Scores** (8): total_risk_score, risk_level, 6 category scores
3. **Metrics** (11): 6 sign-in and access metrics, 2 privileged activity metrics, 3 alert metrics
4. **Metadata** (4): calculation_date, analysis_start_date, analysis_end_date, TimeGenerated

**Expected Output:**

- Final DataFrame ready for writing to custom table
- Ordered by risk score (highest first)
- All columns properly named and typed
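
One reason the `coalesce()` step matters: in Spark SQL a comparison against a null evaluates to null, and a `when(...)` chain does not treat a null condition as matched, so a null metric falls through to `.otherwise(...)`. The plain-Python sketch below emulates that behavior with `None`, using hypothetical `bucket` and `coalesce` helpers that are not part of the notebook. Whether the notebook's metric columns can actually be null at scoring time depends on the earlier join and fill steps, which are not shown in this section.

```python
# Illustrative only: emulate a Spark when/otherwise chain on a nullable value.
def bucket(value, thresholds, otherwise):
    """Return the score of the first (limit, score) pair with value <= limit.

    A None value matches no branch, mirroring how a null comparison in
    Spark SQL is not treated as true, so it falls through to `otherwise`.
    """
    for limit, score in thresholds:
        if value is not None and value <= limit:
            return score
    return otherwise

def coalesce(*values):
    """Return the first non-None argument, like Spark's coalesce()."""
    return next((v for v in values if v is not None), None)

# Thresholds patterned on the admin-operations score (0, 3, 7, else 10).
ADMIN_OPS = [(0, 0), (5, 3), (15, 7)]

print(bucket(None, ADMIN_OPS, 10))               # 10: null falls through
print(bucket(coalesce(None, 0), ADMIN_OPS, 10))  # 0: defaulted before scoring
print(bucket(4, ADMIN_OPS, 10))                  # 3
```

If nulls can reach the scoring step, defaulting them before the threshold chain (rather than only in the final select) avoids assigning the maximum score to a user with no data.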

In [None]:
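from datetime import datetime, timedelta

# Import here in case this cell runs independently
from pyspark.sql.functions import coalesce, col, current_timestamp, lit

print("📝 Preparing final output...")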

# Calculate analysis date range
analysis_end = datetime.now()
analysis_start = analysis_end - timedelta(days=ANALYSIS_DAYS)

# Add metadata columns
risk_scores_final = (
    risk_scores
    .withColumn("calculation_date", current_timestamp())
    .withColumn("analysis_start_date", lit(analysis_start))
    .withColumn("analysis_end_date", lit(analysis_end))
    .withColumn("TimeGenerated", current_timestamp())
)

print("  ✅ Metadata columns added")

# Select final output columns in specific order
output_df = risk_scores_final.select(
    # ========================================
    # Identity (6 columns)
    # ========================================
    col("UserId"),
    col("UserPrincipalName"),
    col("UserDisplayName"),
    col("department"),
    col("country"),
    col("jobTitle"),

    # ========================================
    # Risk Scores (8 columns)
    # ========================================
    col("total_risk_score"),
    col("risk_level"),
    col("signin_behavior_score"),
    col("application_access_score"),
    col("privileged_activity_score"),
    col("security_alert_score"),
    col("geographic_risk_score"),
    col("temporal_risk_score"),

    # ========================================
    # Sign-in Metrics (6 columns)
    # ========================================

    col("unique_ip_count"),
    col("unique_device_count"),
    col("total_signins"),
    col("unique_app_count"),
    col("unique_resource_count"),
    col("offhours_signin_percent"),

    # ========================================
    # Privileged Activity Metrics (2 columns)
    # ========================================
    coalesce(col("total_admin_operations"), lit(0)).alias("total_admin_operations"),
    coalesce(col("high_risk_operations"), lit(0)).alias("high_risk_operations"),

    # ========================================
    # Alert Metrics (3 columns)
    # ========================================
    coalesce(col("active_alert_count"), lit(0)).alias("active_alert_count"),
    coalesce(col("alert_severity_score"), lit(0)).alias("alert_severity_score"),
    col("has_active_alerts"),

    # ========================================
    # Metadata (4 columns)
    # ========================================
    col("calculation_date"),
    col("analysis_start_date"),
    col("analysis_end_date"),
    col("TimeGenerated")
).orderBy(col("total_risk_score").desc())

print("  ✅ Final output columns selected")

# Display summary
user_count = output_df.count()
column_count = len(output_df.columns)

print(f"\n✅ Final output prepared:")
print(f"  📊 {user_count} users")
print(f"  📋 {column_count} columns")
print(f"  🎯 Ordered by total_risk_score (descending)")

# Show sample of final output
print("\n📋 Sample Final Output (Top 5 highest risk users):")
output_df.select(
    "UserPrincipalName",
    "department",
    "total_risk_score",
    "risk_level",
    "signin_behavior_score",
    "application_access_score",
    "privileged_activity_score",
    "security_alert_score"
).show(5, truncate=False)


## 11. Results Analysis & Visualization

Now that we have our final risk scores, let's analyze the results in detail to understand the risk landscape across our user population.

**Analysis Sections:**

1. **Top 10 Highest Risk Users** - Identify users requiring immediate attention
2. **Risk Level Distribution** - Overall security posture visualization
3. **Users with Active Alerts** - Security incidents requiring investigation
4. **Users with Admin Activity** - Privileged access patterns
5. **Risk Factor Contributions** - Which factors drive overall risk
6. **Department Risk Analysis** - Organizational risk patterns
7. **Summary Statistics** - Key metrics and percentages
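
As a rough illustration of the distribution and summary statistics, the sketch below tallies risk levels for a small, made-up list and reports them in severity order rather than alphabetical order (an alphabetical sort yields High, Low, Medium). The helper name `level_distribution` and the sample data are assumptions for illustration, and the guard for an empty population is an addition that the notebook cell does not include.

```python
from collections import Counter

LEVEL_ORDER = ["Low", "Medium", "High"]  # severity order, not alphabetical

def level_distribution(levels):
    """Return [(level, count, percent)] in severity order."""
    counts = Counter(levels)
    total = sum(counts.values())
    rows = []
    for level in LEVEL_ORDER:
        n = counts.get(level, 0)
        pct = (n / total * 100) if total else 0.0  # avoid division by zero
        rows.append((level, n, round(pct, 1)))
    return rows

# Made-up sample of eight users.
sample = ["Low", "Low", "Medium", "High", "Low", "Medium", "Low", "Low"]
for level, n, pct in level_distribution(sample):
    print(f"{level:<7}{n:4d} ({pct:5.1f}%)")
```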

**Expected Insights:**

- Identify high-risk users for investigation
- Understand which risk factors are most significant
- Spot organizational patterns (departments with elevated risk)
- Quantify the overall security posture
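
When comparing which factors are most significant, keep in mind that the categories have different maxima (30, 25, 20, 15, 5, 5), so raw averages are not directly comparable. One possible approach, sketched here with made-up averages and a hypothetical `normalized_contributions` helper, is to divide each average by its category maximum.

```python
# Illustrative only: compare categories on a common 0-1 scale.
CATEGORY_MAX = {
    "signin_behavior_score": 30,
    "application_access_score": 25,
    "privileged_activity_score": 20,
    "security_alert_score": 15,
    "geographic_risk_score": 5,
    "temporal_risk_score": 5,
}

def normalized_contributions(avg_scores):
    """Divide each average category score by that category's maximum."""
    return {
        name: round(avg_scores.get(name, 0.0) / cap, 3)
        for name, cap in CATEGORY_MAX.items()
    }

# Made-up averages: sign-in behavior has the largest raw value...
avgs = {
    "signin_behavior_score": 9.0,
    "application_access_score": 5.0,
    "privileged_activity_score": 2.0,
    "security_alert_score": 3.0,
    "geographic_risk_score": 2.0,
    "temporal_risk_score": 1.0,
}
shares = normalized_contributions(avgs)
# ...but geographic risk is closest to its own ceiling.
top = max(shares, key=shares.get)
print(top, shares[top])  # geographic_risk_score 0.4
```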


In [None]:
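# Import here in case this cell runs independently
from pyspark.sql.functions import avg, col, count

print("\n" + "="*60)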
print("RISK SCORE ANALYSIS RESULTS")
print("="*60)

# ========================================
# 1. Top 10 Highest Risk Users
# ========================================

print("\n🔴 Top 10 Highest Risk Users:")
output_df.select(
    "UserPrincipalName",
    "department",
    "total_risk_score",
    "risk_level",
    "signin_behavior_score",
    "application_access_score",
    "privileged_activity_score",
    "security_alert_score"
).show(10, truncate=False)

# ========================================
# 2. Risk Level Distribution
# ========================================

print("\n📊 Risk Level Distribution:")
risk_distribution = output_df.groupBy("risk_level").count().orderBy("risk_level")
risk_distribution.show()

# ========================================
# 3. Users with Active Alerts
# ========================================

if alert_available:
    alert_users = output_df.filter(col("has_active_alerts") == True).count()
    print(f"\n⚠️  Users with Active Security Alerts: {alert_users}")
    
    if alert_users > 0:
        print("\n🚨 Top Users by Active Alerts:")
        output_df.filter(col("active_alert_count") > 0)\
            .select(
                "UserPrincipalName",
                "active_alert_count",
                "alert_severity_score",
                "total_risk_score",
                "risk_level"
            )\
            .orderBy(col("active_alert_count").desc())\
            .show(10, truncate=False)
else:
    print("\n⚠️  SecurityAlert data not available - skipping alert analysis")

# ========================================
# 4. Users with Administrative Activity
# ========================================

if audit_available:
    admin_users = output_df.filter(col("total_admin_operations") > 0).count()
    print(f"\n🔑 Users with Administrative Activity: {admin_users}")
    
    if admin_users > 0:
        print("\n👤 Top Users by Admin Operations:")
        output_df.filter(col("total_admin_operations") > 0)\
            .select(
                "UserPrincipalName",
                "total_admin_operations",
                "high_risk_operations",
                "total_risk_score",
                "risk_level"
            )\
            .orderBy(col("total_admin_operations").desc())\
            .show(10, truncate=False)
else:
    print("\n⚠️  AuditLogs data not available - skipping privileged activity analysis")

# ========================================
# 5. Risk Factor Contribution Analysis
# ========================================

print("\n📈 Average Risk Factor Contributions:")
output_df.select(
    avg("signin_behavior_score").alias("Avg_SignIn_Behavior"),
    avg("application_access_score").alias("Avg_App_Access"),
    avg("privileged_activity_score").alias("Avg_Privileged_Activity"),
    avg("security_alert_score").alias("Avg_Security_Alerts"),
    avg("geographic_risk_score").alias("Avg_Geographic"),
    avg("temporal_risk_score").alias("Avg_Temporal")
).show(truncate=False)

# ========================================
# 6. Department Risk Analysis
# ========================================

print("\n🏢 Average Risk Score by Department (Top 10):")
output_df.filter(col("department").isNotNull())\
    .groupBy("department")\
    .agg(
        avg("total_risk_score").alias("avg_risk_score"),
        count("*").alias("user_count")
    )\
    .orderBy(col("avg_risk_score").desc())\
    .show(10, truncate=False)

# ========================================
# 7. Summary Statistics
# ========================================

high_risk_count = output_df.filter(col("risk_level") == "High").count()
medium_risk_count = output_df.filter(col("risk_level") == "Medium").count()
low_risk_count = output_df.filter(col("risk_level") == "Low").count()
total_users = output_df.count()

print("\n" + "="*60)
print("SUMMARY STATISTICS")
print("="*60)
print(f"Total Users Analyzed: {total_users}")
print(f"High Risk:   {high_risk_count:4d} ({high_risk_count/total_users*100:5.1f}%)")
print(f"Medium Risk: {medium_risk_count:4d} ({medium_risk_count/total_users*100:5.1f}%)")
print(f"Low Risk:    {low_risk_count:4d} ({low_risk_count/total_users*100:5.1f}%)")

if alert_available:
    alert_user_count = output_df.filter(col("has_active_alerts") == True).count()
    print(f"\nUsers with Active Alerts: {alert_user_count} ({alert_user_count/total_users*100:5.1f}%)")

if audit_available:
    admin_user_count = output_df.filter(col("total_admin_operations") > 0).count()
    print(f"Users with Admin Activity: {admin_user_count} ({admin_user_count/total_users*100:5.1f}%)")

print("="*60)

print("\n✅ Analysis complete - output_df ready for writing to custom table")


## 12. Write to Custom Table

Now we'll write our calculated risk scores to a custom Sentinel table for use in queries, workbooks, and analytics rules.

**Required Permissions:**

Writing to custom tables requires specific Azure permissions:
- **Microsoft Sentinel Contributor** role on the workspace, OR
- **Storage Blob Data Contributor** role on the underlying storage account
- See: [Microsoft Sentinel RBAC roles](https://learn.microsoft.com/en-us/azure/sentinel/roles)

**What we're doing:**

- Write the `output_df` DataFrame to a custom table
- Table name: `UserRiskScores_SPRK` (following the _SPRK convention for notebook-generated tables)
- Mode: `overwrite` - replaces existing data with current analysis
- Format: Delta Lake for ACID transactions

**If you don't have write permissions:**

The code includes fallback options:
1. Export to CSV for manual ingestion
2. Display results in notebook for analysis
3. Contact your Sentinel administrator to request permissions

**Custom Table Benefits:**

- Query risk scores directly in KQL
- Create workbooks and dashboards
- Use in analytics rules for automated alerting
- Join with other Sentinel tables for correlation

**Expected Output:**

- Confirmation message showing records written, OR
- Permissions error with alternative export options
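
The write-then-fall-back flow can be sketched without Spark. Here `write_table` is a hypothetical stand-in for the real table write (for example, a wrapper around `sentinel_provider.save_as_table`), and `write_with_fallback`, `denied`, and the CSV path are names invented for this example.

```python
import csv
import os
import tempfile

def write_with_fallback(rows, write_table, csv_path):
    """Try the table write; on any failure, export the rows to CSV instead.

    rows: list of dicts sharing the same keys.
    write_table: callable taking the rows (hypothetical stand-in for the
        real table write); it may raise, e.g. on missing permissions.
    Returns "table" or "csv" to report which path was taken.
    """
    try:
        write_table(rows)
        return "table"
    except Exception as exc:
        print(f"Table write failed ({type(exc).__name__}); exporting to CSV")
        fieldnames = list(rows[0].keys()) if rows else []
        with open(csv_path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
        return "csv"

def denied(_rows):
    # Simulate a permissions failure.
    raise PermissionError("write access denied")

rows = [{"UserPrincipalName": "user@example.com", "total_risk_score": 52}]
path = os.path.join(tempfile.gettempdir(), "user_risk_scores.csv")
print(write_with_fallback(rows, denied, path))  # csv
```

Returning which path was taken lets the caller decide what to report, instead of relying on a shared success flag.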


In [None]:
import traceback
import sys
import io
import os

print("\n💾 Writing risk scores to custom table...")
print("\n⚠️  Note: Writing custom tables requires Microsoft Sentinel Contributor permissions")
print("   If you don't have permissions, the data will remain in 'output_df' for analysis\n")

# Custom table name - following the _SPRK convention
CUSTOM_TABLE_NAME = "UserRiskScores_SPRK"
write_success = False

# Save original stderr
original_stderr = sys.stderr

try:
    # Suppress stderr during the write attempt to avoid mixed output
    sys.stderr = open(os.devnull, 'w')
    
    # Write using Sentinel provider's save_as_table method
    # Saving to system tables, so no `database_name` needed
    sentinel_provider.save_as_table(
        output_df, 
        CUSTOM_TABLE_NAME,
        write_options={
            "mode": "overwrite",
            "mergeSchema": "true"
        }
    )
    
    write_success = True
    
    # Restore stderr before printing success
    sys.stderr = original_stderr
    
    record_count = output_df.count()
    print(f"✅ Successfully wrote {record_count} records to {CUSTOM_TABLE_NAME}")
    print(f"\n📊 Table Details:")
    print(f"   Table Name: {CUSTOM_TABLE_NAME}")
    print(f"   Records: {record_count}")
    print(f"   Columns: {len(output_df.columns)}")
    print(f"   Method: sentinel_provider.save_as_table()")
    
    print(f"\n🔍 Query in KQL:")
    print(f"   {CUSTOM_TABLE_NAME}")
    print(f"   | where risk_level == 'High'")
    print(f"   | order by total_risk_score desc")
    
except Exception as e:
    # Restore stderr
    sys.stderr = original_stderr
    
    print(f"❌ Could not write to custom table")
    print(f"\n📋 Exception Details (for reporting):")
    print(f"   Exception Type: {type(e).__name__}")
    
    # Format the traceback as a string so it prints cleanly on stdout
    print(f"\n📄 Full Stack Trace:")
    print("-" * 60)
    error_text = traceback.format_exc()
    print(error_text)
    print("-" * 60)
    
    print(f"\n⚠️  Common causes:")
    print(f"   - Missing Microsoft Sentinel Contributor permissions")
    print(f"   - Storage account access issues")
    print(f"   - DataFrame schema incompatibility")
    print(f"   See: https://learn.microsoft.com/en-us/azure/sentinel/roles")
finally:
    # Ensure stderr is always restored
    sys.stderr = original_stderr

if not write_success:
    print(f"\n💡 Your risk scores are still available for analysis!")
    print(f"\n📊 Sample Results (Top 10 Highest Risk Users):")
    output_df.select(
        "UserPrincipalName",
        "department",
        "total_risk_score",
        "risk_level",
        "signin_behavior_score",
        "security_alert_score"
    ).orderBy(col("total_risk_score").desc()).show(10, truncate=False)
    
    print(f"\n📁 To export to CSV:")
    print(f"   pdf = output_df.toPandas()")
    print(f"   pdf.to_csv('user_risk_scores.csv', index=False)")
    
    print(f"\n✅ Continue with Section 13 for sample queries")
