# Worm-Style Lateral Movement & Shai-Hulud Detection (Sentinel Data Lake)

This notebook detects worm-style exploitation and lateral movement behaviors using Microsoft Defender for Endpoint Advanced Hunting tables from Sentinel Data Lake. It focuses on **behavioral correlation** across multiple signals rather than static IOC matching.

## Detection focus
- **Lateral movement** via remote execution (WMI, PsExec, WinRM, services)
- **Credential reuse & fan-out** across multiple devices in short time windows
- **East-west network fan-out** on common lateral movement ports (445, 135, 5985, 5986, 3389, 22)
- **Identical command/binary/hash reuse** across devices
- **LOLBin abuse** (powershell, wmic, rundll32, mshta, certutil, bitsadmin)

## Primary tables
- `DeviceProcessEvents` (process execution, command lines)
- `DeviceNetworkEvents` (network connections, lateral movement ports)
- `DeviceLogonEvents` (credential reuse, account fan-out)
- `DeviceFileEvents` (binary/hash propagation)

## Threat Intelligence enrichment
- TI is used for **enrichment only**, not as primary detection trigger
- Correlates file hashes, domains, IPs with `ThreatIntelIndicator` table
- Behavioral findings are separated from TI context in outputs

## How to use
1. Run Cells 2‚Äì4 to initialize and load tables
2. Run detection cells (5+) for each behavioral pattern
3. Review findings: behavioral signals are primary, TI adds confidence
4. Pivot into entity timelines and fan-out analysis as needed

**Note**: Default lookback is 7 days UTC. Adjust `LOOKBACK_DAYS` in Cell 2 as needed.

In [None]:
# üì¶ Imports + time window configuration
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from datetime import datetime, timedelta, timezone
import warnings
warnings.filterwarnings('ignore')

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, lit, lower, upper, trim, coalesce,
    count as spark_count, countDistinct,
    date_trunc, to_timestamp, regexp_extract,
    when, expr
)

sns.set_theme(style='whitegrid')

# Default time window (UTC): last 7 days
LOOKBACK_DAYS = 7
UTC_NOW = datetime.now(timezone.utc)
START_TIME = UTC_NOW - timedelta(days=LOOKBACK_DAYS)
print(f"‚è±Ô∏è Time window (UTC): {START_TIME.isoformat()} ‚Üí {UTC_NOW.isoformat()} (last {LOOKBACK_DAYS} days)")

In [None]:
# üîå Data provider + smart table loader
data_provider = MicrosoftSentinelProvider(spark)
print('‚úÖ MicrosoftSentinelProvider initialized')

# Optional: set these if you want deterministic workspace routing
PRIMARY_WORKSPACE = None  # e.g. 'ak-SecOps'
FALLBACK_WORKSPACES = ['default', 'ak-SecOps']

def _try_read(table_name: str, workspace: str | None):
    if workspace:
        return data_provider.read_table(table_name, workspace)
    return data_provider.read_table(table_name)

def smart_load(table_name: str):
    """Attempt to load a table with minimal assumptions.
    Order: PRIMARY_WORKSPACE (if set) ‚Üí auto/default ‚Üí FALLBACK_WORKSPACES"""
    last_error = None
    if PRIMARY_WORKSPACE:
        try:
            return _try_read(table_name, PRIMARY_WORKSPACE), PRIMARY_WORKSPACE, None
        except Exception as e:
            last_error = str(e)
    try:
        return _try_read(table_name, None), 'auto', None
    except Exception as e:
        last_error = str(e)
    for ws in FALLBACK_WORKSPACES:
        if PRIMARY_WORKSPACE and ws == PRIMARY_WORKSPACE:
            continue
        try:
            return _try_read(table_name, ws), ws, None
        except Exception as e:
            last_error = str(e)
    return None, None, last_error

def filter_time(df: DataFrame, ts_col_candidates=("Timestamp", "TimeGenerated")):
    if df is None:
        return None, None
    for ts_col in ts_col_candidates:
        if ts_col in df.columns:
            return df.filter((col(ts_col) >= lit(START_TIME)) & (col(ts_col) <= lit(UTC_NOW))), ts_col
    return df, None

def to_pandas(df: DataFrame, limit: int = 2000, sort_by: str | None = None, ascending: bool = False) -> pd.DataFrame:
    if df is None:
        return pd.DataFrame()
    if sort_by and sort_by in df.columns:
        df = df.sort(col(sort_by).asc() if ascending else col(sort_by).desc())
    pdf = df.limit(limit).toPandas()
    return pdf

def print_findings_summary(title: str, pdf: pd.DataFrame, extra: str | None = None):
    print(f"\nüßæ {title}")
    print(f"   Rows: {len(pdf):,}")
    if extra:
        print(f"   {extra}")

In [None]:
# üìö Load relevant tables (graceful if missing)
TABLE_CANDIDATES = {
    'DeviceProcessEvents': ['DeviceProcessEvents'],
    'DeviceNetworkEvents': ['DeviceNetworkEvents'],
    'DeviceLogonEvents': ['DeviceLogonEvents'],
    'DeviceFileEvents': ['DeviceFileEvents'],
    # Optional TI table (use only for enrichment)
    'ThreatIntelIndicators': ['ThreatIntelIndicators'],
}

loaded = {}
availability = []

for logical_name, candidates in TABLE_CANDIDATES.items():
    df = None; ws = None; err = None; chosen = None
    for t in candidates:
        df_try, ws_try, err_try = smart_load(t)
        if df_try is not None:
            df = df_try; ws = ws_try; chosen = t; err = None
            break
        err = err_try
    df, ts_col = filter_time(df)
    loaded[logical_name] = {'df': df, 'workspace': ws, 'table': chosen, 'error': err, 'ts_col': ts_col}
    availability.append({
        'LogicalName': logical_name,
        'Workspace': ws or '-',
        'Available': df is not None,
        'Table': chosen or '-',
        'TimestampColumn': ts_col or '-',
        'Error': err or '-',
    })

availability_df = pd.DataFrame(availability)
availability_df

## 1. Lateral Movement Detection
Detects remote execution patterns across multiple devices via WMI, PsExec-like behavior, WinRM, and services.

In [None]:
# üîç Lateral Movement Detection

dpe = loaded.get('DeviceProcessEvents', {}).get('df')
if dpe is None:
    print('‚ÑπÔ∏è DeviceProcessEvents not available; skipping lateral movement detection.')
else:
    # Define lateral movement patterns
    lateral_move_patterns = [
        # WMI execution
        ('wmiprvse.exe', 'wmi'),
        ('wmic.exe', 'wmi'),
        # PsExec-like behavior
        ('psexec', 'psexec'),
        ('paexec', 'psexec'),
        # WinRM/PowerShell remoting
        ('wsmprovhost.exe', 'winrm'),
        # Service installation/execution
        ('services.exe', 'service'),
        ('sc.exe', 'service'),
        # Remote scheduled tasks
        ('schtasks.exe', 'schtask'),
        ('taskeng.exe', 'schtask'),
        ('at.exe', 'schtask'),
    ]
    
    print('‚úÖ Analyzing lateral movement patterns...')
    
    # Filter to relevant processes
    dpe_cols = set(dpe.columns)
    if 'FileName' not in dpe_cols or 'InitiatingProcessFileName' not in dpe_cols:
        print('‚ö†Ô∏è DeviceProcessEvents schema incomplete; cannot detect lateral movement.')
    else:
        from functools import reduce
        
        # Build condition for lateral movement indicators
        conditions = []
        for pattern, tag in lateral_move_patterns:
            conditions.append(
                (lower(col('FileName')).like(f'%{pattern.lower()}%')) |
                (lower(col('InitiatingProcessFileName')).like(f'%{pattern.lower()}%'))
            )
        
        combined_condition = reduce(lambda a, b: a | b, conditions)
        
        lateral_df = dpe.filter(combined_condition)
        
        # Group by DeviceName, AccountName to find fan-out
        account_col = col('AccountName') if 'AccountName' in dpe_cols else col('InitiatingProcessAccountName') if 'InitiatingProcessAccountName' in dpe_cols else lit(None).alias('AccountName')
        device_col = col('DeviceName') if 'DeviceName' in dpe_cols else lit(None).alias('DeviceName')
        ts_col = col('Timestamp') if 'Timestamp' in dpe_cols else col('TimeGenerated') if 'TimeGenerated' in dpe_cols else lit(None).alias('Timestamp')
        cmdline_col = col('ProcessCommandLine') if 'ProcessCommandLine' in dpe_cols else lit(None).alias('ProcessCommandLine')
        
        lateral_summary = (
            lateral_df
            .select(
                ts_col,
                device_col,
                account_col,
                col('FileName').alias('Process'),
                col('InitiatingProcessFileName').alias('ParentProcess'),
                cmdline_col,
            )
            .groupBy('AccountName')
            .agg(
                countDistinct('DeviceName').alias('DeviceCount'),
                spark_count('*').alias('EventCount'),
                expr('collect_set(DeviceName)').alias('Devices'),
            )
            .filter(col('DeviceCount') > 1)  # Multi-device execution
            .orderBy(col('DeviceCount').desc())
        )
        
        lateral_pdf = to_pandas(lateral_summary, limit=1000)
        print_findings_summary('Lateral Movement: Multi-device execution by account', lateral_pdf, extra='Accounts executing on 2+ devices')
        
        if len(lateral_pdf) == 0:
            print('   (no multi-device lateral movement detected)')
        else:
            # Show top 20 results
            print(lateral_pdf.head(20).to_string(index=False))
            
            # Show detailed events for top account
            if len(lateral_pdf) > 0:
                top_account = lateral_pdf.iloc[0]['AccountName']
                print(f'\nüîé Detailed events for top account: {top_account}')
                
                detailed = (
                    lateral_df
                    .filter(account_col == lit(top_account))
                    .select(
                        ts_col,
                        device_col,
                        col('FileName').alias('Process'),
                        col('InitiatingProcessFileName').alias('ParentProcess'),
                        cmdline_col,
                    )
                    .orderBy(ts_col.desc())
                )
                
                detailed_pdf = to_pandas(detailed, limit=50)
                print(detailed_pdf.to_string(index=False))

## 2. Credential Reuse & Account Fan-Out
Detects accounts authenticating to multiple devices in short time windows (potential credential reuse/pass-the-hash).

In [None]:
# üîë Credential Reuse & Fan-Out Detection

dle = loaded.get('DeviceLogonEvents', {}).get('df')
if dle is None:
    print('‚ÑπÔ∏è DeviceLogonEvents not available; skipping credential fan-out detection.')
else:
    print('‚úÖ Analyzing credential fan-out patterns...')
    
    dle_cols = set(dle.columns)
    account_col = col('AccountName') if 'AccountName' in dle_cols else lit(None).alias('AccountName')
    device_col = col('DeviceName') if 'DeviceName' in dle_cols else lit(None).alias('DeviceName')
    ts_col = col('Timestamp') if 'Timestamp' in dle_cols else col('TimeGenerated') if 'TimeGenerated' in dle_cols else lit(None).alias('Timestamp')
    logon_type_col = col('LogonType') if 'LogonType' in dle_cols else lit(None).alias('LogonType')
    remote_ip_col = col('RemoteIP') if 'RemoteIP' in dle_cols else lit(None).alias('RemoteIP')
    
    # Focus on network logons (type 3) and remote interactive (type 10)
    fan_out = (
        dle
        .filter((col('LogonType') == 'Network') | (col('LogonType') == 'RemoteInteractive') if 'LogonType' in dle_cols else lit(True))
        .select(
            ts_col,
            account_col,
            device_col,
            logon_type_col,
            remote_ip_col,
        )
        .groupBy('AccountName')
        .agg(
            countDistinct('DeviceName').alias('DeviceCount'),
            spark_count('*').alias('LogonCount'),
            expr('min(Timestamp)').alias('FirstSeen') if 'Timestamp' in dle_cols else expr('min(TimeGenerated)').alias('FirstSeen'),
            expr('max(Timestamp)').alias('LastSeen') if 'Timestamp' in dle_cols else expr('max(TimeGenerated)').alias('LastSeen'),
            expr('collect_set(DeviceName)').alias('Devices'),
        )
        .filter(col('DeviceCount') >= 3)  # Account accessed 3+ devices
        .orderBy(col('DeviceCount').desc())
    )
    
    fan_out_pdf = to_pandas(fan_out, limit=1000)
    print_findings_summary('Credential Fan-Out: Accounts across multiple devices', fan_out_pdf, extra='Accounts with 3+ device logons')
    
    if len(fan_out_pdf) == 0:
        print('   (no significant credential fan-out detected)')
    else:
        # Calculate time window for each account
        if 'FirstSeen' in fan_out_pdf.columns and 'LastSeen' in fan_out_pdf.columns:
            fan_out_pdf['TimeWindow'] = (pd.to_datetime(fan_out_pdf['LastSeen']) - pd.to_datetime(fan_out_pdf['FirstSeen'])).dt.total_seconds() / 60
            fan_out_pdf = fan_out_pdf.sort_values('TimeWindow')
        
        print(fan_out_pdf.head(20).to_string(index=False))

## 3. Network Fan-Out Detection
Detects east-west network fan-out on lateral movement ports (445, 135, 5985, 5986, 3389, 22).

In [None]:
# üåê Network Fan-Out Detection

dne = loaded.get('DeviceNetworkEvents', {}).get('df')
if dne is None:
    print('‚ÑπÔ∏è DeviceNetworkEvents not available; skipping network fan-out detection.')
else:
    print('‚úÖ Analyzing network fan-out on lateral movement ports...')
    
    # Define lateral movement ports
    lateral_ports = [445, 135, 5985, 5986, 3389, 22]
    
    dne_cols = set(dne.columns)
    device_col = col('DeviceName') if 'DeviceName' in dne_cols else lit(None).alias('DeviceName')
    remote_ip_col = col('RemoteIP') if 'RemoteIP' in dne_cols else lit(None).alias('RemoteIP')
    remote_port_col = col('RemotePort') if 'RemotePort' in dne_cols else lit(None).alias('RemotePort')
    ts_col = col('Timestamp') if 'Timestamp' in dne_cols else col('TimeGenerated') if 'TimeGenerated' in dne_cols else lit(None).alias('Timestamp')
    init_process_col = col('InitiatingProcessFileName') if 'InitiatingProcessFileName' in dne_cols else lit(None).alias('InitiatingProcessFileName')
    
    # Filter to lateral movement ports
    if 'RemotePort' in dne_cols:
        port_condition = col('RemotePort').isin(lateral_ports)
        
        fan_out = (
            dne
            .filter(port_condition)
            .select(
                ts_col,
                device_col,
                remote_ip_col,
                remote_port_col,
                init_process_col,
            )
            .groupBy('DeviceName', 'RemotePort')
            .agg(
                countDistinct('RemoteIP').alias('UniqueDestinations'),
                spark_count('*').alias('ConnectionCount'),
                expr('min(Timestamp)').alias('FirstSeen') if 'Timestamp' in dne_cols else expr('min(TimeGenerated)').alias('FirstSeen'),
                expr('max(Timestamp)').alias('LastSeen') if 'Timestamp' in dne_cols else expr('max(TimeGenerated)').alias('LastSeen'),
                expr('collect_set(RemoteIP)').alias('DestinationIPs'),
            )
            .filter(col('UniqueDestinations') >= 5)  # Device contacted 5+ destinations
            .orderBy(col('UniqueDestinations').desc())
        )
        
        fan_out_pdf = to_pandas(fan_out, limit=1000)
        print_findings_summary('Network Fan-Out: Devices scanning lateral movement ports', fan_out_pdf, extra='Devices with 5+ unique destinations')
        
        if len(fan_out_pdf) == 0:
            print('   (no significant network fan-out detected)')
        else:
            # Calculate scan velocity (destinations per minute)
            if 'FirstSeen' in fan_out_pdf.columns and 'LastSeen' in fan_out_pdf.columns:
                fan_out_pdf['TimeWindow_min'] = (pd.to_datetime(fan_out_pdf['LastSeen']) - pd.to_datetime(fan_out_pdf['FirstSeen'])).dt.total_seconds() / 60
                fan_out_pdf['TimeWindow_min'] = fan_out_pdf['TimeWindow_min'].replace(0, 1)  # Avoid divide by zero
                fan_out_pdf['ScanVelocity'] = fan_out_pdf['UniqueDestinations'] / fan_out_pdf['TimeWindow_min']
                fan_out_pdf = fan_out_pdf.sort_values('ScanVelocity', ascending=False)
            
            print(fan_out_pdf.head(20).to_string(index=False))
    else:
        print('‚ö†Ô∏è RemotePort column not available in DeviceNetworkEvents schema')

## 4. Command/Binary Reuse Detection
Detects identical command lines, binaries, or file hashes reused across multiple devices (worm propagation pattern).

In [None]:
# üîÅ Command/Binary Reuse Detection

dpe = loaded.get('DeviceProcessEvents', {}).get('df')
dfe = loaded.get('DeviceFileEvents', {}).get('df')

if dpe is None:
    print('‚ÑπÔ∏è DeviceProcessEvents not available; skipping command reuse detection.')
else:
    print('‚úÖ Analyzing command line reuse across devices...')
    
    dpe_cols = set(dpe.columns)
    cmdline_col = col('ProcessCommandLine') if 'ProcessCommandLine' in dpe_cols else None
    device_col = col('DeviceName') if 'DeviceName' in dpe_cols else lit(None).alias('DeviceName')
    ts_col = col('Timestamp') if 'Timestamp' in dpe_cols else col('TimeGenerated') if 'TimeGenerated' in dpe_cols else lit(None).alias('Timestamp')
    sha256_col = col('SHA256') if 'SHA256' in dpe_cols else lit(None).alias('SHA256')
    filename_col = col('FileName') if 'FileName' in dpe_cols else lit(None).alias('FileName')
    
    if 'ProcessCommandLine' in dpe_cols:
        # Find command lines executed on multiple devices
        cmdline_reuse = (
            dpe
            .filter(col('ProcessCommandLine').isNotNull())
            .filter(expr('length(ProcessCommandLine) > 20'))  # Skip trivial commands
            .select(
                ts_col,
                device_col,
                cmdline_col,
                filename_col,
                sha256_col,
            )
            .groupBy('ProcessCommandLine')
            .agg(
                countDistinct('DeviceName').alias('DeviceCount'),
                spark_count('*').alias('ExecutionCount'),
                expr('collect_set(DeviceName)').alias('Devices'),
                expr('min(Timestamp)').alias('FirstSeen') if 'Timestamp' in dpe_cols else expr('min(TimeGenerated)').alias('FirstSeen'),
                expr('max(Timestamp)').alias('LastSeen') if 'Timestamp' in dpe_cols else expr('max(TimeGenerated)').alias('LastSeen'),
            )
            .filter(col('DeviceCount') >= 3)  # Command on 3+ devices
            .orderBy(col('DeviceCount').desc())
        )
        
        cmdline_pdf = to_pandas(cmdline_reuse, limit=1000)
        print_findings_summary('Command Reuse: Identical commands on multiple devices', cmdline_pdf, extra='Commands on 3+ devices')
        
        if len(cmdline_pdf) == 0:
            print('   (no significant command reuse detected)')
        else:
            # Truncate long command lines for display
            if 'ProcessCommandLine' in cmdline_pdf.columns:
                cmdline_pdf['CommandLine_short'] = cmdline_pdf['ProcessCommandLine'].str[:100] + '...'
                display_cols = ['DeviceCount', 'ExecutionCount', 'CommandLine_short', 'FirstSeen', 'LastSeen']
                print(cmdline_pdf[display_cols].head(20).to_string(index=False))
            else:
                print(cmdline_pdf.head(20).to_string(index=False))
    else:
        print('‚ö†Ô∏è ProcessCommandLine column not available')

# Hash reuse detection
if dfe is not None:
    print('\n‚úÖ Analyzing file hash reuse across devices...')
    
    dfe_cols = set(dfe.columns)
    if 'SHA256' in dfe_cols:
        hash_reuse = (
            dfe
            .filter(col('SHA256').isNotNull())
            .select(
                col('Timestamp') if 'Timestamp' in dfe_cols else col('TimeGenerated') if 'TimeGenerated' in dfe_cols else lit(None).alias('Timestamp'),
                col('DeviceName') if 'DeviceName' in dfe_cols else lit(None).alias('DeviceName'),
                col('SHA256'),
                col('FileName') if 'FileName' in dfe_cols else lit(None).alias('FileName'),
                col('FolderPath') if 'FolderPath' in dfe_cols else lit(None).alias('FolderPath'),
            )
            .groupBy('SHA256')
            .agg(
                countDistinct('DeviceName').alias('DeviceCount'),
                spark_count('*').alias('FileCount'),
                expr('collect_set(FileName)').alias('FileNames'),
                expr('collect_set(DeviceName)').alias('Devices'),
            )
            .filter(col('DeviceCount') >= 3)  # Hash on 3+ devices
            .orderBy(col('DeviceCount').desc())
        )
        
        hash_pdf = to_pandas(hash_reuse, limit=1000)
        print_findings_summary('Hash Reuse: Identical file hashes on multiple devices', hash_pdf, extra='Hashes on 3+ devices')
        
        if len(hash_pdf) == 0:
            print('   (no significant hash reuse detected)')
        else:
            print(hash_pdf.head(20).to_string(index=False))
    else:
        print('‚ö†Ô∏è SHA256 column not available in DeviceFileEvents')

## 5. LOLBin Abuse Detection
Detects abuse of living-off-the-land binaries (powershell, wmic, rundll32, mshta, certutil, bitsadmin) with suspicious patterns.

In [None]:
# ü´† LOLBin Abuse Detection

dpe = loaded.get('DeviceProcessEvents', {}).get('df')
if dpe is None:
    print('‚ÑπÔ∏è DeviceProcessEvents not available; skipping LOLBin detection.')
else:
    print('‚úÖ Analyzing LOLBin abuse patterns...')
    
    # Define LOLBins of interest
    lolbins = [
        'powershell.exe',
        'pwsh.exe',
        'wmic.exe',
        'rundll32.exe',
        'mshta.exe',
        'certutil.exe',
        'bitsadmin.exe',
        'regsvr32.exe',
        'msiexec.exe',
        'cscript.exe',
        'wscript.exe',
    ]
    
    dpe_cols = set(dpe.columns)
    filename_col = col('FileName') if 'FileName' in dpe_cols else lit(None).alias('FileName')
    cmdline_col = col('ProcessCommandLine') if 'ProcessCommandLine' in dpe_cols else lit(None).alias('ProcessCommandLine')
    device_col = col('DeviceName') if 'DeviceName' in dpe_cols else lit(None).alias('DeviceName')
    account_col = col('AccountName') if 'AccountName' in dpe_cols else col('InitiatingProcessAccountName') if 'InitiatingProcessAccountName' in dpe_cols else lit(None).alias('AccountName')
    ts_col = col('Timestamp') if 'Timestamp' in dpe_cols else col('TimeGenerated') if 'TimeGenerated' in dpe_cols else lit(None).alias('Timestamp')
    parent_col = col('InitiatingProcessFileName') if 'InitiatingProcessFileName' in dpe_cols else lit(None).alias('InitiatingProcessFileName')
    
    # Build LOLBin filter
    lolbin_condition = None
    for lolbin in lolbins:
        condition = lower(col('FileName')) == lolbin.lower()
        lolbin_condition = condition if lolbin_condition is None else lolbin_condition | condition
    
    if 'FileName' in dpe_cols:
        lolbin_df = (
            dpe
            .filter(lolbin_condition)
            .select(
                ts_col,
                device_col,
                account_col,
                filename_col,
                parent_col,
                cmdline_col,
            )
        )
        
        # Detect suspicious patterns:
        # 1. Encoded commands (base64, -enc, -e)
        # 2. Download/exec patterns (downloadstring, downloadfile, iex, invoke-expression)
        # 3. WMI abuse (process call create)
        # 4. Certutil abuse (-decode, -urlcache, -split)
        
        if 'ProcessCommandLine' in dpe_cols:
            suspicious_lolbin = (
                lolbin_df
                .filter(
                    (lower(col('ProcessCommandLine')).like('%base64%')) |
                    (lower(col('ProcessCommandLine')).like('%-enc%')) |
                    (lower(col('ProcessCommandLine')).like('%-e %')) |
                    (lower(col('ProcessCommandLine')).like('%downloadstring%')) |
                    (lower(col('ProcessCommandLine')).like('%downloadfile%')) |
                    (lower(col('ProcessCommandLine')).like('%iex%')) |
                    (lower(col('ProcessCommandLine')).like('%invoke-expression%')) |
                    (lower(col('ProcessCommandLine')).like('%process call create%')) |
                    (lower(col('ProcessCommandLine')).like('%-decode%')) |
                    (lower(col('ProcessCommandLine')).like('%-urlcache%')) |
                    (lower(col('ProcessCommandLine')).like('%-split%'))
                )
                .groupBy('FileName', 'AccountName')
                .agg(
                    countDistinct('DeviceName').alias('DeviceCount'),
                    spark_count('*').alias('ExecutionCount'),
                    expr('collect_set(DeviceName)').alias('Devices'),
                )
                .filter(col('DeviceCount') >= 2)  # LOLBin abuse on 2+ devices
                .orderBy(col('DeviceCount').desc())
            )
            
            lolbin_pdf = to_pandas(suspicious_lolbin, limit=1000)
            print_findings_summary('LOLBin Abuse: Suspicious patterns across devices', lolbin_pdf, extra='LOLBin abuse on 2+ devices')
            
            if len(lolbin_pdf) == 0:
                print('   (no suspicious LOLBin abuse detected)')
            else:
                print(lolbin_pdf.head(20).to_string(index=False))
                
                # Show detailed command lines for top result
                if len(lolbin_pdf) > 0:
                    top_lolbin = lolbin_pdf.iloc[0]['FileName']
                    top_account = lolbin_pdf.iloc[0]['AccountName']
                    print(f'\nüîé Detailed commands for {top_lolbin} / {top_account}')
                    
                    detailed = (
                        lolbin_df
                        .filter((col('FileName') == lit(top_lolbin)) & (account_col == lit(top_account)))
                        .select(ts_col, device_col, filename_col, cmdline_col)
                        .orderBy(ts_col.desc())
                    )
                    
                    detailed_pdf = to_pandas(detailed, limit=20)
                    if 'ProcessCommandLine' in detailed_pdf.columns:
                        detailed_pdf['CommandLine_short'] = detailed_pdf['ProcessCommandLine'].str[:150] + '...'
                        display_cols = ['Timestamp', 'DeviceName', 'FileName', 'CommandLine_short']
                        print(detailed_pdf[display_cols].to_string(index=False))
                    else:
                        print(detailed_pdf.to_string(index=False))
        else:
            print('‚ö†Ô∏è ProcessCommandLine column not available for LOLBin pattern detection')

## 6. Threat Intelligence Enrichment
Enriches behavioral findings with TI indicators (file hashes, domains, IPs). TI is used for **context only**, not primary detection.

In [None]:
# üîñ Threat Intelligence Enrichment (Optional)

ti_df_entry = loaded.get('ThreatIntelIndicators', {}).get('df')
if ti_df_entry is None:
    print('‚ÑπÔ∏è ThreatIntelIndicators table not available. Behavioral detections above remain valid without TI enrichment.')
else:
    print('‚úÖ TI table available for enrichment. Correlating behavioral findings with threat intelligence...')
    
    ti_cols = set(ti_df_entry.columns)
    print(f'   TI columns: {len(ti_cols)}')
    
    # For each behavioral finding that produced hashes, domains, or IPs,
    # you can join against TI to add confidence scores.
    
    # Example: Enrich hash reuse findings with TI
    dfe = loaded.get('DeviceFileEvents', {}).get('df')
    if dfe is not None and 'SHA256' in dfe.columns and 'FileHashValue' in ti_cols:
        print('\nüîç Enriching file hashes with TI indicators...')
        
        # Get unique hashes from behavioral findings
        hash_set = (
            dfe
            .select(col('SHA256'))
            .filter(col('SHA256').isNotNull())
            .distinct()
        )
        
        # Join with TI
        ti_hash_match = (
            hash_set
            .join(
                ti_df_entry.select(
                    col('FileHashValue').alias('SHA256'),
                    col('ThreatType') if 'ThreatType' in ti_cols else lit(None).alias('ThreatType'),
                    col('ConfidenceScore') if 'ConfidenceScore' in ti_cols else lit(None).alias('ConfidenceScore'),
                    col('Description') if 'Description' in ti_cols else lit(None).alias('Description'),
                ),
                'SHA256',
                'inner'
            )
        )
        
        ti_enriched_pdf = to_pandas(ti_hash_match, limit=500)
        print_findings_summary('TI Enrichment: File hashes with threat intelligence', ti_enriched_pdf, extra='Hashes matching TI indicators')
        
        if len(ti_enriched_pdf) == 0:
            print('   (no TI matches for file hashes)')
        else:
            print(ti_enriched_pdf.head(20).to_string(index=False))
    
    # Example: Enrich network destinations with TI
    dne = loaded.get('DeviceNetworkEvents', {}).get('df')
    if dne is not None and 'RemoteIP' in dne.columns and 'NetworkIP' in ti_cols:
        print('\nüîç Enriching network IPs with TI indicators...')
        
        ip_set = (
            dne
            .select(col('RemoteIP'))
            .filter(col('RemoteIP').isNotNull())
            .distinct()
        )
        
        ti_ip_match = (
            ip_set
            .join(
                ti_df_entry.select(
                    col('NetworkIP').alias('RemoteIP'),
                    col('ThreatType') if 'ThreatType' in ti_cols else lit(None).alias('ThreatType'),
                    col('ConfidenceScore') if 'ConfidenceScore' in ti_cols else lit(None).alias('ConfidenceScore'),
                    col('Description') if 'Description' in ti_cols else lit(None).alias('Description'),
                ),
                'RemoteIP',
                'inner'
            )
        )
        
        ti_ip_pdf = to_pandas(ti_ip_match, limit=500)
        print_findings_summary('TI Enrichment: Network IPs with threat intelligence', ti_ip_pdf, extra='IPs matching TI indicators')
        
        if len(ti_ip_pdf) == 0:
            print('   (no TI matches for network IPs)')
        else:
            print(ti_ip_pdf.head(20).to_string(index=False))
    
    print('\n‚ÑπÔ∏è TI enrichment complete. Review behavioral findings first; TI adds context/confidence.')

## 7. Risk Scoring & Entity Prioritization
Aggregates behavioral signals into a composite risk score per entity (DeviceName/AccountName).

**Score Components:**
- `CredentialFanoutScore` (+3): Account logs onto ‚â•5 devices within 60 min
- `RemoteExecScore` (+3): Remote execution on ‚â•2 devices within 60 min
- `BinaryOrCmdReuseScore` (+2): Same hash/command on ‚â•3 devices within 24 hours
- `EastWestFanoutScore` (+2): Device connects to ‚â•20 internal IPs within 30 min on lateral movement ports
- `TIScore` (+1): Observed hash/domain/IP matches active TI indicator

**Severity Bands:**
- `High`: RiskScore ‚â• 6
- `Medium`: RiskScore 4-5
- `Low`: RiskScore < 4

In [None]:
# üéØ Risk Scoring & Entity Prioritization

print('üéØ Calculating risk scores per entity...')

from datetime import timedelta

# Initialize score tracking
entity_scores = {}

# === Component 1: Credential Fan-Out Score ===
print('\n1Ô∏è‚É£ Calculating CredentialFanoutScore...')
dle = loaded.get('DeviceLogonEvents', {}).get('df')
if dle is not None:
    dle_cols = set(dle.columns)
    if 'AccountName' in dle_cols and 'DeviceName' in dle_cols:
        ts_col = 'Timestamp' if 'Timestamp' in dle_cols else 'TimeGenerated'
        
        # Window logons by 60-minute windows
        from pyspark.sql.window import Window
        
        logon_window = (
            dle
            .select(
                col('AccountName'),
                col('DeviceName'),
                col(ts_col).alias('Timestamp'),
            )
            .withColumn('TimeWindow', date_trunc('hour', col('Timestamp')))
            .groupBy('AccountName', 'TimeWindow')
            .agg(
                countDistinct('DeviceName').alias('DeviceCount'),
                expr('min(Timestamp)').alias('FirstLogon'),
                expr('max(Timestamp)').alias('LastLogon'),
                expr('collect_set(DeviceName)').alias('Devices'),
            )
            .withColumn('WindowMinutes', (col('LastLogon').cast('long') - col('FirstLogon').cast('long')) / 60)
            .filter((col('DeviceCount') >= 5) & (col('WindowMinutes') <= 60))
        )
        
        cred_fanout_pdf = to_pandas(logon_window, limit=5000)
        
        if len(cred_fanout_pdf) > 0:
            print(f'   ‚úÖ Found {len(cred_fanout_pdf)} accounts with credential fan-out')
            for _, row in cred_fanout_pdf.iterrows():
                account = row['AccountName']
                if account not in entity_scores:
                    entity_scores[account] = {
                        'EntityType': 'Account',
                        'EntityName': account,
                        'CredentialFanoutScore': 0,
                        'RemoteExecScore': 0,
                        'BinaryOrCmdReuseScore': 0,
                        'EastWestFanoutScore': 0,
                        'TIScore': 0,
                        'Details': {},
                    }
                entity_scores[account]['CredentialFanoutScore'] = 3
                entity_scores[account]['Details']['CredFanout'] = {
                    'DeviceCount': int(row['DeviceCount']),
                    'WindowMinutes': float(row['WindowMinutes']),
                    'Devices': row['Devices'][:5],  # First 5
                }
        else:
            print('   ‚ÑπÔ∏è No credential fan-out detected')

# === Component 2: Remote Execution Score ===
print('\n2Ô∏è‚É£ Calculating RemoteExecScore...')
dpe = loaded.get('DeviceProcessEvents', {}).get('df')
if dpe is not None:
    dpe_cols = set(dpe.columns)
    if 'FileName' in dpe_cols:
        ts_col = 'Timestamp' if 'Timestamp' in dpe_cols else 'TimeGenerated'
        account_col = 'AccountName' if 'AccountName' in dpe_cols else 'InitiatingProcessAccountName'
        
        # Remote exec indicators
        remote_exec = (
            dpe
            .filter(
                (lower(col('FileName')).like('%wmic%')) |
                (lower(col('FileName')).like('%powershell%')) |
                (lower(col('FileName')).like('%pwsh%')) |
                (lower(col('FileName')).like('%sc.exe%')) |
                (lower(col('FileName')).like('%wsmprovhost%')) |
                (lower(col('FileName')).like('%psexec%')) |
                (lower(col('InitiatingProcessFileName')).like('%services.exe%'))
            )
            .select(
                col(account_col).alias('AccountName'),
                col('DeviceName'),
                col(ts_col).alias('Timestamp'),
                col('FileName'),
            )
            .withColumn('TimeWindow', date_trunc('hour', col('Timestamp')))
            .groupBy('AccountName', 'TimeWindow')
            .agg(
                countDistinct('DeviceName').alias('DeviceCount'),
                expr('min(Timestamp)').alias('FirstExec'),
                expr('max(Timestamp)').alias('LastExec'),
                expr('collect_set(DeviceName)').alias('Devices'),
            )
            .withColumn('WindowMinutes', (col('LastExec').cast('long') - col('FirstExec').cast('long')) / 60)
            .filter((col('DeviceCount') >= 2) & (col('WindowMinutes') <= 60))
        )
        
        remote_exec_pdf = to_pandas(remote_exec, limit=5000)
        
        if len(remote_exec_pdf) > 0:
            print(f'   ‚úÖ Found {len(remote_exec_pdf)} accounts with remote execution patterns')
            for _, row in remote_exec_pdf.iterrows():
                account = row['AccountName']
                if account not in entity_scores:
                    entity_scores[account] = {
                        'EntityType': 'Account',
                        'EntityName': account,
                        'CredentialFanoutScore': 0,
                        'RemoteExecScore': 0,
                        'BinaryOrCmdReuseScore': 0,
                        'EastWestFanoutScore': 0,
                        'TIScore': 0,
                        'Details': {},
                    }
                entity_scores[account]['RemoteExecScore'] = 3
                entity_scores[account]['Details']['RemoteExec'] = {
                    'DeviceCount': int(row['DeviceCount']),
                    'WindowMinutes': float(row['WindowMinutes']),
                    'Devices': row['Devices'][:5],
                }
        else:
            print('   ‚ÑπÔ∏è No remote execution patterns detected')

# === Component 3: Binary/Command Reuse Score ===
print('\n3Ô∏è‚É£ Calculating BinaryOrCmdReuseScore...')
if dpe is not None and 'SHA256' in dpe_cols:
    # Hash reuse across devices
    hash_reuse = (
        dpe
        .filter(col('SHA256').isNotNull())
        .select(
            col('SHA256'),
            col('DeviceName'),
            col('FileName'),
        )
        .groupBy('SHA256')
        .agg(
            countDistinct('DeviceName').alias('DeviceCount'),
            expr('collect_set(DeviceName)').alias('Devices'),
            expr('collect_set(FileName)').alias('FileNames'),
        )
        .filter(col('DeviceCount') >= 3)
    )
    
    hash_reuse_pdf = to_pandas(hash_reuse, limit=5000)
    
    if len(hash_reuse_pdf) > 0:
        print(f'   ‚úÖ Found {len(hash_reuse_pdf)} hashes reused across ‚â•3 devices')
        # Score devices involved in hash reuse
        for _, row in hash_reuse_pdf.iterrows():
            for device in row['Devices'][:10]:  # Limit to first 10
                if device not in entity_scores:
                    entity_scores[device] = {
                        'EntityType': 'Device',
                        'EntityName': device,
                        'CredentialFanoutScore': 0,
                        'RemoteExecScore': 0,
                        'BinaryOrCmdReuseScore': 0,
                        'EastWestFanoutScore': 0,
                        'TIScore': 0,
                        'Details': {},
                    }
                entity_scores[device]['BinaryOrCmdReuseScore'] = 2
                if 'BinaryReuse' not in entity_scores[device]['Details']:
                    entity_scores[device]['Details']['BinaryReuse'] = []
                entity_scores[device]['Details']['BinaryReuse'].append({
                    'Hash': row['SHA256'][:16] + '...',
                    'DeviceCount': int(row['DeviceCount']),
                })
    else:
        print('   ‚ÑπÔ∏è No hash reuse detected')

# === Component 4: East-West Fan-Out Score ===
print('\n4Ô∏è‚É£ Calculating EastWestFanoutScore...')
dne = loaded.get('DeviceNetworkEvents', {}).get('df')
if dne is not None:
    dne_cols = set(dne.columns)
    if 'RemoteIP' in dne_cols and 'RemotePort' in dne_cols:
        ts_col = 'Timestamp' if 'Timestamp' in dne_cols else 'TimeGenerated'
        lateral_ports = [445, 135, 5985, 5986, 3389, 22]
        
        network_fanout = (
            dne
            .filter(col('RemotePort').isin(lateral_ports))
            .select(
                col('DeviceName'),
                col('RemoteIP'),
                col('RemotePort'),
                col(ts_col).alias('Timestamp'),
            )
            .withColumn('TimeWindow', date_trunc('hour', col('Timestamp')))
            .groupBy('DeviceName', 'TimeWindow')
            .agg(
                countDistinct('RemoteIP').alias('UniqueIPs'),
                expr('min(Timestamp)').alias('FirstConn'),
                expr('max(Timestamp)').alias('LastConn'),
                expr('collect_set(RemoteIP)').alias('RemoteIPs'),
            )
            .withColumn('WindowMinutes', (col('LastConn').cast('long') - col('FirstConn').cast('long')) / 60)
            .filter((col('UniqueIPs') >= 20) & (col('WindowMinutes') <= 30))
        )
        
        fanout_pdf = to_pandas(network_fanout, limit=5000)
        
        if len(fanout_pdf) > 0:
            print(f'   ‚úÖ Found {len(fanout_pdf)} devices with network fan-out')
            for _, row in fanout_pdf.iterrows():
                device = row['DeviceName']
                if device not in entity_scores:
                    entity_scores[device] = {
                        'EntityType': 'Device',
                        'EntityName': device,
                        'CredentialFanoutScore': 0,
                        'RemoteExecScore': 0,
                        'BinaryOrCmdReuseScore': 0,
                        'EastWestFanoutScore': 0,
                        'TIScore': 0,
                        'Details': {},
                    }
                entity_scores[device]['EastWestFanoutScore'] = 2
                entity_scores[device]['Details']['NetworkFanout'] = {
                    'UniqueIPs': int(row['UniqueIPs']),
                    'WindowMinutes': float(row['WindowMinutes']),
                    'RemoteIPs': row['RemoteIPs'][:10],  # First 10
                }
        else:
            print('   ‚ÑπÔ∏è No network fan-out detected')

# === Component 5: TI Score ===
print('\n5Ô∏è‚É£ Calculating TIScore...')
ti_df = loaded.get('ThreatIntelIndicators', {}).get('df')
if ti_df is not None and dpe is not None:
    ti_cols = set(ti_df.columns)
    if 'FileHashValue' in ti_cols and 'SHA256' in dpe_cols:
        # Match hashes against TI
        ti_hash_matches = (
            dpe
            .select(col('DeviceName'), col('SHA256'))
            .filter(col('SHA256').isNotNull())
            .join(
                ti_df.select(col('FileHashValue').alias('SHA256')),
                'SHA256',
                'inner'
            )
            .select('DeviceName', 'SHA256')
            .distinct()
        )
        
        ti_matches_pdf = to_pandas(ti_hash_matches, limit=5000)
        
        if len(ti_matches_pdf) > 0:
            print(f'   ‚úÖ Found {len(ti_matches_pdf)} TI matches')
            for _, row in ti_matches_pdf.iterrows():
                device = row['DeviceName']
                if device not in entity_scores:
                    entity_scores[device] = {
                        'EntityType': 'Device',
                        'EntityName': device,
                        'CredentialFanoutScore': 0,
                        'RemoteExecScore': 0,
                        'BinaryOrCmdReuseScore': 0,
                        'EastWestFanoutScore': 0,
                        'TIScore': 0,
                        'Details': {},
                    }
                entity_scores[device]['TIScore'] = 1
                if 'TIMatches' not in entity_scores[device]['Details']:
                    entity_scores[device]['Details']['TIMatches'] = []
                entity_scores[device]['Details']['TIMatches'].append(row['SHA256'][:16] + '...')
        else:
            print('   ‚ÑπÔ∏è No TI matches')
else:
    print('   ‚ÑπÔ∏è TI enrichment skipped (table not available)')

# === Calculate Final Risk Scores ===
print('\nüìà Calculating final risk scores...')

risk_results = []
for entity, scores in entity_scores.items():
    risk_score = (
        scores['CredentialFanoutScore'] +
        scores['RemoteExecScore'] +
        scores['BinaryOrCmdReuseScore'] +
        scores['EastWestFanoutScore'] +
        scores['TIScore']
    )
    
    if risk_score >= 6:
        severity = 'High'
    elif risk_score >= 4:
        severity = 'Medium'
    else:
        severity = 'Low'
    
    risk_results.append({
        'EntityType': scores['EntityType'],
        'EntityName': entity,
        'RiskScore': risk_score,
        'Severity': severity,
        'CredentialFanoutScore': scores['CredentialFanoutScore'],
        'RemoteExecScore': scores['RemoteExecScore'],
        'BinaryOrCmdReuseScore': scores['BinaryOrCmdReuseScore'],
        'EastWestFanoutScore': scores['EastWestFanoutScore'],
        'TIScore': scores['TIScore'],
        'Details': str(scores['Details'])[:200] + '...',  # Truncate for display
    })

risk_df = pd.DataFrame(risk_results)

if len(risk_df) > 0:
    risk_df = risk_df.sort_values('RiskScore', ascending=False)
    
    print(f'\n‚úÖ Risk scoring complete: {len(risk_df)} entities scored')
    print(f"   High: {len(risk_df[risk_df['Severity'] == 'High'])}")
    print(f"   Medium: {len(risk_df[risk_df['Severity'] == 'Medium'])}")
    print(f"   Low: {len(risk_df[risk_df['Severity'] == 'Low'])}")
    
    print('\nüí• Top 30 High-Risk Entities:')
    print(risk_df.head(30).to_string(index=False))
else:
    print('\n‚ÑπÔ∏è No entities scored (no behavioral signals detected)')