# 🔥 Palo Alto Firewall Investigation: Sentinel Data Lake Spark Notebook

Interactive security analysis of **Palo Alto PAN-OS** traffic from the `CommonSecurityLog` table using Microsoft Sentinel's **data lake** Spark engine and `MicrosoftSentinelProvider`.

## 📋 Analysis Sections

| Step | Section | Description |
|------|---------|-------------|
| 1 | **Connect** | Initialise `MicrosoftSentinelProvider` and list available workspaces |
| 2 | **Configure Parameters** | Set the time window, workspace, table names, and vendor filter |
| 3 | **Load & Prepare Data** | Read `CommonSecurityLog`, filter to PAN-OS events, and add derived columns |
| 4 | **Traffic Overview** | Event volume by class (TRAFFIC vs THREAT) and device action |
| 5 | **Top Talkers** | Busiest source → destination pairs by sessions and bytes |
| 6 | **Protocol & Port Heatmap** | Destination port usage by protocol |
| 7 | **Beaconing Detection** | Periodic callback patterns indicating C2 activity |
| 8 | **DNS Tunnelling Analysis** | Anomalous DNS payload sizes |
| 9 | **Data Exfiltration** | Large outbound transfers to external IPs |
| 10 | **Threat Breakdown** | IDS/IPS alert categories and actions |
| 11 | **Attack Timeline** | Correlated suspicious events across the kill chain |
| 12 | **Save Results** | Persist results to the data lake and verify the saved table |


## 🔗 Step 1: Connect to Sentinel Data Lake

Establish a connection using the **MicrosoftSentinelProvider**, your gateway to all Sentinel data lake operations.

### What is MicrosoftSentinelProvider?
The `MicrosoftSentinelProvider` is a Python class that:
- 🔗 Connects your Spark session to Microsoft Sentinel data lake
- 📋 Lists available databases (workspaces) and tables
- 📖 Reads security log data at scale via PySpark DataFrames
- 💾 Saves processed results back to the data lake

➡️ Simply initialise the provider with your active Spark session. That's it!

📚 **Learn More**: [Microsoft Sentinel Provider Class Reference](https://learn.microsoft.com/en-us/azure/sentinel/datalake/sentinel-provider-class-reference)

In [34]:
# Generic libraries for data manipulation and PySpark operations
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Loading the MicrosoftSentinelProvider from sentinel_lake library
from sentinel_lake.providers import MicrosoftSentinelProvider

# Initialize provider
data_provider = MicrosoftSentinelProvider(spark)

print("✅ MicrosoftSentinelProvider initialized")
print("📋 Available workspaces:")
for db in data_provider.list_databases():
    print(f"   • {db}")


✅ MicrosoftSentinelProvider initialized
📋 Available workspaces:
   • System Tables
   • cybersoc


## ⚙️ Step 2: Configure Parameters

Before diving into the data, set up configuration parameters for reusability and performance.

### 📋 What We're Configuring:
- **🕒 Time Window** → How far back to look (e.g., last 24 hours)
- **📂 Input Table** → `CommonSecurityLog` (Palo Alto PAN-OS logs)
- **🎯 Output Table** → Where to save enriched investigation results
- **🏢 Workspace** → Your Sentinel workspace name
- **🏭 Vendor Filter** → `"Palo Alto Networks"` to isolate PAN-OS events

### 🔧 Why This Matters:
- **Reusability**: Change dates/tables without modifying analysis code
- **Performance**: Smaller time windows = faster Spark queries
- **Organisation**: Clear naming tracks your investigation

> **⚠️ Remember**: Set `workspace_name` to your own Sentinel workspace name (this run uses `cybersoc`)!
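As a rough illustration of the time-window parameter, the sketch below derives an hour-aligned lookback window using only the standard library. The helper name `hour_aligned_window` is hypothetical and not part of the notebook; it mirrors how `run_start` and `run_end` are computed in the configuration cell.

```python
from datetime import datetime, timedelta

def hour_aligned_window(now: datetime, lookback_hours: int):
    """Return (start, end), where end is `now` truncated to the hour."""
    if lookback_hours <= 0:
        raise ValueError("lookback_hours must be positive")
    end = now.replace(minute=0, second=0, microsecond=0)
    start = end - timedelta(hours=lookback_hours)
    return start, end

# Example: a run started at 01:37:05 with a 24-hour lookback
start, end = hour_aligned_window(datetime(2026, 2, 12, 1, 37, 5), 24)
print(start, "->", end)  # 2026-02-11 01:00:00 -> 2026-02-12 01:00:00
```

Truncating the end of the window to the hour keeps repeated runs within the same hour reproducible, at the cost of ignoring the most recent partial hour.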

In [None]:
# ── Time window ──────────────────────────────────────────────────────────
lookback_hours = 24  # Adjust lookback period to match your investigation needs
run_end = datetime.now().replace(minute=0, second=0, microsecond=0)
run_start = run_end - timedelta(hours=lookback_hours)

# ── Workspace ────────────────────────────────────────────────────────────
workspace_name = "cybersoc"

# ── Table names ──────────────────────────────────────────────────────────
input_table = "CommonSecurityLog"
output_table = "PaloAlto_Investigation_SPRK"  # _SPRK suffix → writes to data lake tier

# ── Vendor filter ────────────────────────────────────────────────────────
VENDOR_FILTER = "Palo Alto Networks"

# ── Write options ────────────────────────────────────────────────────────
write_options = {"mode": "append", "partitionBy": ["date"]}

print(f"📅 Time Window: {run_start} → {run_end}")
print(f"📂 Input Table: {input_table}")
print(f"💾 Output Table: {output_table}")
print("✅ Parameters configured")


📅 Time Window: 2026-02-11 01:00:00 → 2026-02-12 01:00:00
📂 Input Table: CommonSecurityLog
💾 Output Table: PaloAlto_Investigation_SPRK_CL
✅ Parameters configured


## 📊 Step 3: Load & Prepare CommonSecurityLog Data

Load the raw Palo Alto firewall data from `CommonSecurityLog`, filter to the configured time window, and enrich with derived columns.

### 🎯 What This Code Does:
1. **📖 Reads** `CommonSecurityLog` from the Sentinel data lake via `data_provider.read_table()`
2. **🏭 Filters** to Palo Alto Networks events only (`DeviceVendor`)
3. **🕒 Time-filters** to the configured lookback window
4. **🏗️ Enriches** with:
   - `date` column for partitioning and time-based analysis
   - `TotalBytes` computed column (SentBytes + ReceivedBytes)
   - `IsExternal` flag for external destination detection
5. **⚡ Caches** the DataFrame for fast subsequent analysis

> **💡 Pro Tip**: `.cache()` is lazy. Spark materialises the DataFrame on the first action (the `count()` in this cell) and serves later analysis cells from the cache, which makes them significantly faster!
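The `IsExternal` flag boils down to an RFC 1918 membership test. As a minimal sketch, assuming plain Python rather than a Spark UDF, the check can be expressed with the standard `ipaddress` module instead of string prefixes. The names `RFC1918_NETWORKS` and `is_external` are illustrative, and treating IPv6 addresses as external is a simplification of this sketch.

```python
import ipaddress
from typing import Optional

# The three RFC 1918 private IPv4 ranges
RFC1918_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]

def is_external(ip: Optional[str]) -> Optional[bool]:
    """True if `ip` lies outside RFC 1918, None if it is missing or unparsable."""
    if not ip:
        return None
    try:
        addr = ipaddress.ip_address(ip.strip())
    except ValueError:
        return None
    # An IPv6 address is never "in" an IPv4 network, so it counts as external here
    return not any(addr in net for net in RFC1918_NETWORKS)

for candidate in ["10.1.2.3", "172.31.255.255", "172.32.0.1", "8.8.8.8", None]:
    print(candidate, is_external(candidate))
```

Returning `None` for missing values keeps unknown destinations out of both the internal and external buckets, which may or may not suit a given investigation.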

In [36]:
# ── Fields of interest from CommonSecurityLog ────────────────────────────
csl_fields = [
    "TimeGenerated",
    "DeviceVendor",
    "DeviceProduct",
    "DeviceEventClassID",       # TRAFFIC or THREAT
    "Activity",
    "LogSeverity",
    "DeviceAction",
    "SimplifiedDeviceAction",   # Allowed / Blocked / Dropped / Reset
    "ApplicationProtocol",
    "Protocol",
    "SourceIP",
    "SourcePort",
    "SourceUserName",
    "SourceHostName",
    "DestinationIP",
    "DestinationPort",
    "DestinationHostName",
    "SentBytes",
    "ReceivedBytes",
    "RequestURL",
    "CommunicationDirection",
    "Message",
    "DeviceName",
]

# ── Load, filter, and enrich ─────────────────────────────────────────────

# Helper: simple prefix check for the RFC 1918 private IPv4 ranges
# (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16)
_private_prefixes = ("10.", "172.16.", "172.17.", "172.18.", "172.19.",
                     "172.20.", "172.21.", "172.22.", "172.23.", "172.24.",
                     "172.25.", "172.26.", "172.27.", "172.28.", "172.29.",
                     "172.30.", "172.31.", "192.168.")

# Return None for a missing IP so IsExternal stays null instead of becoming True
is_private_udf = F.udf(
    lambda ip: None if ip is None else ip.startswith(_private_prefixes),
    "boolean",
)

df_paloalto = (
    data_provider.read_table(input_table, workspace_name)
    .select(*csl_fields)
    # Filter to Palo Alto only
    .filter(F.col("DeviceVendor") == VENDOR_FILTER)
    # Filter to time window
    .filter(
        (F.col("TimeGenerated") > F.lit(run_start)) &
        (F.col("TimeGenerated") <= F.lit(run_end))
    )
    # Enrichment columns
    .withColumn("date", F.to_date("TimeGenerated"))
    .withColumn("TotalBytes", F.col("SentBytes") + F.col("ReceivedBytes"))
    .withColumn("IsExternal", ~is_private_udf(F.col("DestinationIP")))
    .cache()
)

# Print schema (metadata only, no data scan, very lightweight)
df_paloalto.printSchema()
total_events = df_paloalto.count()
print(f"✅ Loaded {total_events:,} Palo Alto events from {input_table}")
print(f"   TRAFFIC: {df_paloalto.filter(F.col('DeviceEventClassID') == 'TRAFFIC').count():,}")
print(f"   THREAT:  {df_paloalto.filter(F.col('DeviceEventClassID') == 'THREAT').count():,}")


{"Level": "INFO", "TraceId": "5032b519-dc4d-4af2-879b-93235ae3aa81", "Message": "Loading table: CommonSecurityLog"}
{"Level": "INFO", "TraceId": "5032b519-dc4d-4af2-879b-93235ae3aa81", "Message": "Successfully loaded table CommonSecurityLog"}
root
 |-- TimeGenerated: timestamp (nullable = true)
 |-- DeviceVendor: string (nullable = true)
 |-- DeviceProduct: string (nullable = true)
 |-- DeviceEventClassID: string (nullable = true)
 |-- Activity: string (nullable = true)
 |-- LogSeverity: string (nullable = true)
 |-- DeviceAction: string (nullable = true)
 |-- SimplifiedDeviceAction: string (nullable = true)
 |-- ApplicationProtocol: string (nullable = true)
 |-- Protocol: string (nullable = true)
 |-- SourceIP: string (nullable = true)
 |-- SourcePort: integer (nullable = true)
 |-- SourceUserName: string (nullable = true)
 |-- SourceHostName: string (nullable = true)
 |-- DestinationIP: string (nullable = true)
 |-- DestinationPort: integer (nullable = true)
 |-- DestinationHostName:

## 🔍 Step 4: Traffic Overview

Volume breakdown by event class (`TRAFFIC` vs `THREAT`) and device action.

This gives you an immediate feel for:
- 📊 How much traffic is normal vs threat-classified
- 🚫 How many events were blocked/dropped vs allowed
- 🕒 Event volume trends over time
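To make the "volume over time" idea concrete, here is a small sketch of 15-minute binning in plain Python. The notebook itself uses Spark's `F.window`; the helper `floor_to_bin` and the sample events are made up for illustration, and the helper assumes a bin width that divides 60 evenly.

```python
from collections import Counter
from datetime import datetime

def floor_to_bin(ts: datetime, minutes: int = 15) -> datetime:
    """Round a timestamp down to the start of its N-minute bin (N must divide 60)."""
    return ts.replace(minute=ts.minute - ts.minute % minutes, second=0, microsecond=0)

# (timestamp, event class) pairs: invented sample data
events = [
    (datetime(2026, 2, 11, 1, 1, 50), "TRAFFIC"),
    (datetime(2026, 2, 11, 1, 14, 59), "TRAFFIC"),
    (datetime(2026, 2, 11, 1, 15, 0), "THREAT"),
    (datetime(2026, 2, 11, 1, 29, 30), "TRAFFIC"),
]

# Count events per (bin start, class), analogous to groupBy(...).count()
volume = Counter((floor_to_bin(ts), cls) for ts, cls in events)
for (bin_start, cls), n in sorted(volume.items()):
    print(bin_start, cls, n)
```

The first bin (01:00) ends up with two TRAFFIC events, and the 01:15 bin with one THREAT and one TRAFFIC event.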

In [37]:
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# ── 4a: Event distribution sunburst ──────────────────────────────────────
df_overview = (
    df_paloalto
    .groupBy("DeviceEventClassID", "SimplifiedDeviceAction")
    .count()
    .orderBy(F.desc("count"))
    .toPandas()
)

fig = px.sunburst(
    df_overview,
    path=["DeviceEventClassID", "SimplifiedDeviceAction"],
    values="count",
    title="🔥 Firewall Event Distribution: Class → Action",
    color="DeviceEventClassID",
    color_discrete_map={"TRAFFIC": "#636EFA", "THREAT": "#EF553B"},
)
fig.update_layout(width=700, height=500)
fig.show()
print(f"✅ {len(df_overview)} unique class/action combinations")


✅ 3 unique class/action combinations


In [38]:
# ── 4b: Event volume over time (15-minute bins) ──────────────────────────
df_timeline = (
    df_paloalto
    .withColumn("TimeBin", F.window("TimeGenerated", "15 minutes").start)
    .groupBy("TimeBin", "DeviceEventClassID")
    .count()
    .orderBy("TimeBin")
    .toPandas()
)

fig = px.area(
    df_timeline,
    x="TimeBin",
    y="count",
    color="DeviceEventClassID",
    title="📈 Firewall Event Volume Over Time (15m bins)",
    color_discrete_map={"TRAFFIC": "#636EFA", "THREAT": "#EF553B"},
)
fig.update_layout(
    xaxis_title="Time (UTC)", yaxis_title="Events",
    width=950, height=400,
)
fig.show()


## 🏆 Step 5: Top Talkers

Busiest source → destination pairs by **session count** and **total bytes transferred**.

High session counts can indicate:
- 🔍 Scanning or reconnaissance
- 🔄 Persistent connections (C2)
- 📤 Bulk data movement
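The aggregation behind a top-talkers view can be sketched without Spark. The function `top_talkers` and the sample rows below are hypothetical; the sketch counts sessions and sums bytes per source/destination pair, then ranks by session count.

```python
from collections import defaultdict

def top_talkers(sessions, n=15):
    """Aggregate (src, dst, sent, recv) rows per pair and rank by session count."""
    stats = defaultdict(lambda: {"sessions": 0, "bytes": 0})
    for src, dst, sent, recv in sessions:
        pair = stats[(src, dst)]
        pair["sessions"] += 1
        # Treat a missing byte counter as zero so one null does not void the total
        pair["bytes"] += (sent or 0) + (recv or 0)
    ranked = sorted(stats.items(), key=lambda kv: kv[1]["sessions"], reverse=True)
    return [(src, dst, s["sessions"], round(s["bytes"] / 1048576, 2))
            for (src, dst), s in ranked[:n]]

# Invented sample rows using documentation address ranges
sessions = [
    ("10.0.0.5", "203.0.113.10", 1200, 800),
    ("10.0.0.5", "203.0.113.10", 3000, None),
    ("10.0.0.7", "198.51.100.4", 2_097_152, 0),
]
print(top_talkers(sessions, n=2))
```

Ranking by sessions and by bytes can produce quite different lists, which is why the notebook plots both side by side.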

In [39]:
# ── Top 15 talkers by sessions and data volume ───────────────────────────
df_talkers = (
    df_paloalto
    .filter(F.col("DeviceEventClassID") == "TRAFFIC")
    .groupBy("SourceIP", "DestinationIP")
    .agg(
        F.count("*").alias("Sessions"),
        F.sum("SentBytes").alias("TotalBytesSent"),
        F.sum("ReceivedBytes").alias("TotalBytesRecv"),
    )
    .withColumn("TotalMB", F.round((F.col("TotalBytesSent") + F.col("TotalBytesRecv")) / 1048576.0, 2))
    .orderBy(F.desc("Sessions"))
    .limit(15)
    .toPandas()
)

df_talkers["Pair"] = df_talkers["SourceIP"] + " → " + df_talkers["DestinationIP"]

fig = make_subplots(rows=1, cols=2, subplot_titles=("Sessions", "Data Transferred (MB)"))

fig.add_trace(
    go.Bar(y=df_talkers["Pair"], x=df_talkers["Sessions"],
           orientation="h", marker_color="#636EFA", name="Sessions"),
    row=1, col=1,
)
fig.add_trace(
    go.Bar(y=df_talkers["Pair"], x=df_talkers["TotalMB"],
           orientation="h", marker_color="#EF553B", name="MB"),
    row=1, col=2,
)
fig.update_layout(title="🏆 Top 15 Talkers", height=500, width=1000, showlegend=False)
fig.update_yaxes(autorange="reversed")
fig.show()


## 🗺️ Step 6: Protocol & Port Heatmap

Destination ports by protocol, highlighting **unusual port usage** that may indicate:
- 🔓 Non-standard services on unexpected ports
- 🕵️ Covert channels or tunnelling
- 🔍 Reconnaissance (many ports from one source)
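The reconnaissance signal mentioned above (many ports from one source) is not computed by the notebook cell, but it could be approximated along these lines. This is only a sketch: `distinct_ports_per_source` is a hypothetical helper, and the `min_ports=10` threshold is an arbitrary assumption that would need tuning per environment.

```python
from collections import defaultdict

def distinct_ports_per_source(sessions, min_ports=10):
    """Map each source IP to its count of distinct destination ports, keeping
    only sources that touched at least `min_ports` different ports."""
    ports = defaultdict(set)
    for src, dst_port in sessions:
        if dst_port is not None:
            ports[src].add(dst_port)
    return {src: len(p) for src, p in ports.items() if len(p) >= min_ports}

# Invented sample: one host sweeping ports 1-12, another using only 443 and 53
sessions = [("10.0.0.9", port) for port in range(1, 13)]
sessions += [("10.0.0.5", 443), ("10.0.0.5", 443), ("10.0.0.5", 53)]
print(distinct_ports_per_source(sessions))  # {'10.0.0.9': 12}
```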

In [40]:
# ── Top 30 destination ports by protocol ─────────────────────────────────
df_ports = (
    df_paloalto
    .filter(F.col("DeviceEventClassID") == "TRAFFIC")
    .groupBy(F.col("DestinationPort").cast("string").alias("DestPort"), "Protocol")
    .agg(F.count("*").alias("Sessions"))
    .orderBy(F.desc("Sessions"))
    .limit(30)
    .toPandas()
)

fig = px.treemap(
    df_ports,
    path=["Protocol", "DestPort"],
    values="Sessions",
    title="🗺️ Top 30 Destination Ports by Protocol",
    color="Sessions",
    color_continuous_scale="Blues",
)
fig.update_layout(width=900, height=500)
fig.show()


## 📡 Step 7: Beaconing Detection

Look for **periodic callback patterns**: regular intervals between sessions from the same source to the same destination suggest C2 beaconing.

### 🎯 Detection Logic:
1. Collect all timestamps per source → destination pair
2. Compute time deltas between consecutive sessions
3. Calculate **jitter** = StdDev / Mean of intervals (the coefficient of variation)
4. **Low jitter** (< 0.5) across at least 4 intervals = suspicious

> **🚨 Security Insight**: Automated malware callbacks tend to be very regular (low jitter), while human traffic is irregular (high jitter). Legitimate agents such as update checks, monitoring, and NTP also beacon, so treat hits as leads rather than verdicts.
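The jitter metric can be checked by hand on a small example. The sketch below, in plain Python, computes the same StdDev / Mean ratio for one source/destination pair; `jitter` is a hypothetical helper and the two timestamp series (in seconds) are invented. It uses the sample standard deviation, which is also what Spark's `F.stddev` computes.

```python
from statistics import mean, stdev

def jitter(timestamps):
    """Coefficient of variation (stdev / mean) of positive inter-arrival gaps.

    Returns None when fewer than two positive gaps are available.
    """
    ordered = sorted(timestamps)
    gaps = [b - a for a, b in zip(ordered, ordered[1:]) if b > a]
    if len(gaps) < 2:
        return None
    return stdev(gaps) / mean(gaps)

beacon = [0, 60, 121, 180, 241, 300]   # roughly one callback per minute
human = [0, 5, 200, 230, 900, 2400]    # bursty, irregular browsing

print(round(jitter(beacon), 3))  # 0.017, far below the 0.5 threshold
print(jitter(human) > 0.5)       # True
```

A fixed 0.5 cut-off is a starting point only; malware that randomises its sleep interval would need a looser threshold or a different statistic.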

In [41]:
# ── Beaconing: compute inter-arrival jitter per pair ─────────────────────
# Focus on external destinations with at least 5 sessions
df_beacon_raw = (
    df_paloalto
    .filter(F.col("DeviceEventClassID").isin("TRAFFIC", "THREAT"))
    .filter(F.col("IsExternal") == True)
    .select("SourceIP", "DestinationIP", "TimeGenerated")
)

# Window: order by time within each (Src, Dst) pair
pair_window = Window.partitionBy("SourceIP", "DestinationIP").orderBy("TimeGenerated")

df_beacon = (
    df_beacon_raw
    # Previous timestamp in same pair
    .withColumn("PrevTs", F.lag("TimeGenerated").over(pair_window))
    .filter(F.col("PrevTs").isNotNull())
    # Delta in seconds
    .withColumn("DeltaSec", F.unix_timestamp("TimeGenerated") - F.unix_timestamp("PrevTs"))
    .filter(F.col("DeltaSec") > 0)
    # Aggregate per pair
    .groupBy("SourceIP", "DestinationIP")
    .agg(
        F.avg("DeltaSec").alias("AvgDelta"),
        F.stddev("DeltaSec").alias("StdDelta"),
        F.min("DeltaSec").alias("MinDelta"),
        F.max("DeltaSec").alias("MaxDelta"),
        F.count("*").alias("Intervals"),
    )
    .filter(F.col("Intervals") >= 4)
    # Jitter = StdDev / Mean; low jitter = suspicious
    .withColumn("Jitter", F.when(F.col("AvgDelta") > 0, F.col("StdDelta") / F.col("AvgDelta")).otherwise(0.0))
    .filter(F.col("Jitter") < 0.5)
    .orderBy("Jitter")
    .toPandas()
)

if len(df_beacon) > 0:
    df_beacon["Pair"] = df_beacon["SourceIP"] + " → " + df_beacon["DestinationIP"]

    fig = px.scatter(
        df_beacon,
        x="AvgDelta", y="Jitter",
        size="Intervals", hover_name="Pair",
        title="📡 Beaconing Detection: Low Jitter = Suspicious",
        labels={"AvgDelta": "Avg Interval (sec)", "Jitter": "Jitter (StdDev/Avg)"},
        color="Jitter", color_continuous_scale="RdYlGn",
    )
    fig.update_layout(width=900, height=500)
    fig.show()
    print(f"⚠️ Found {len(df_beacon)} potential beaconing pair(s)")
else:
    print("🎉 No beaconing patterns detected in the current time window.")


🎉 No beaconing patterns detected in the current time window.


## 🌐 Step 8: DNS Tunnelling Analysis

DNS sessions with **anomalous byte counts**: DNS tunnels push data through oversized DNS payloads.

### 🎯 What to Look For:
- Average sent bytes per DNS session **> 512 bytes** (the classic UDP DNS message limit without EDNS(0))
- High query counts from a single source
- Multiple distinct DNS destinations

> **🚨 Security Insight**: Legitimate DNS queries are usually small (well under 512 bytes). Consistently large payloads can indicate data exfiltration via DNS tunnelling, although EDNS(0) and DNS over TCP can legitimately exceed this size.
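As a minimal sketch of the payload-size heuristic, assuming per-session sent-byte counts are already available, the following plain-Python helper flags sources whose average DNS payload exceeds 512 bytes. The name `dns_suspects`, the `min_queries` default (chosen to match the notebook's `QueryCount >= 3` filter), and the sample data are illustrative.

```python
from collections import defaultdict

DNS_UDP_LIMIT = 512  # bytes: classic UDP DNS message size without EDNS(0)

def dns_suspects(sessions, min_queries=3, threshold=DNS_UDP_LIMIT):
    """Return {source_ip: avg_sent_bytes} for sources with enough DNS sessions
    whose average sent bytes exceed `threshold`."""
    sent = defaultdict(list)
    for src, sent_bytes in sessions:
        if sent_bytes is not None:
            sent[src].append(sent_bytes)
    averages = {src: sum(v) / len(v) for src, v in sent.items() if len(v) >= min_queries}
    return {src: avg for src, avg in averages.items() if avg > threshold}

# Invented sample: (source IP, bytes sent in one DNS session)
sessions = [
    ("10.0.0.5", 80), ("10.0.0.5", 95), ("10.0.0.5", 70),
    ("10.0.0.8", 900), ("10.0.0.8", 1400), ("10.0.0.8", 1100),
    ("10.0.0.9", 4000),  # only one session, below min_queries
]
print(dns_suspects(sessions))
```

Firewall byte counters typically include protocol overhead and may cover several packets per session, so the threshold is a coarse filter rather than an exact protocol limit.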

In [42]:
# ── DNS tunneling: flag sources with abnormally large DNS payloads ───────
df_dns = (
    df_paloalto
    .filter(
        (F.col("DestinationPort") == 53) |
        (F.lower(F.col("ApplicationProtocol")).contains("dns"))
    )
    .groupBy("SourceIP")
    .agg(
        F.count("*").alias("QueryCount"),
        F.sum("SentBytes").alias("TotalBytesSent"),
        F.sum("ReceivedBytes").alias("TotalBytesRecv"),
        F.avg("SentBytes").alias("AvgSentBytes"),
        F.max("SentBytes").alias("MaxSentBytes"),
        F.countDistinct("DestinationIP").alias("DistinctDests"),
    )
    .withColumn("AvgPayload", F.round("AvgSentBytes", 0))
    .filter(F.col("QueryCount") >= 3)
    .orderBy(F.desc("AvgPayload"))
    .toPandas()
)

if len(df_dns) > 0:
    fig = px.bar(
        df_dns,
        x="SourceIP",
        y=["AvgPayload", "MaxSentBytes"],
        barmode="group",
        title="🌐 DNS Traffic per Source: High Avg Payload = Possible Tunnelling",
        labels={"value": "Bytes", "variable": "Metric"},
    )
    fig.add_hline(y=512, line_dash="dash", line_color="red",
                  annotation_text="Typical DNS max (512B)")
    fig.update_layout(width=900, height=450)
    fig.show()
    print(f"📊 {len(df_dns)} source(s) with DNS traffic")
else:
    print("ℹ️ No DNS traffic found in the current time window.")


📊 6 source(s) with DNS traffic


## 📤 Step 9: Data Exfiltration (Large Outbound Transfers)

Source → destination pairs with unusually high bytes sent to **external** destinations.

### 🎯 Detection Logic:
- Filter to TRAFFIC events with external (non-RFC-1918) destinations
- Compute the **Sent/Received ratio** per pair: exfiltration shows a high ratio (sending >> receiving)
- Flag pairs with more than 1 MB sent in total

> **🚨 Security Insight**: A high sent/receive ratio to an external IP, especially one not in your threat intelligence allowlist, is a strong exfiltration indicator.
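The ratio logic can be sketched in a few lines of plain Python. The helper names `exfil_score` and `is_candidate` are hypothetical, and the `min_ratio=10.0` cut-off is an assumption added for illustration; the notebook itself only filters on total megabytes sent. The zero-received fallback mirrors the notebook by reusing the sent volume as the ratio.

```python
MB = 1048576

def exfil_score(sent_bytes: int, recv_bytes: int):
    """Return (sent_mb, ratio); when nothing was received, the ratio falls
    back to the sent volume in MB, as the notebook cell does."""
    sent_mb = sent_bytes / MB
    recv_mb = recv_bytes / MB
    ratio = sent_mb / recv_mb if recv_mb > 0 else sent_mb
    return round(sent_mb, 2), round(ratio, 2)

def is_candidate(sent_bytes, recv_bytes, min_sent_mb=1.0, min_ratio=10.0):
    """Flag transfers that are both large and heavily outbound."""
    sent_mb, ratio = exfil_score(sent_bytes, recv_bytes)
    return sent_mb > min_sent_mb and ratio >= min_ratio

print(exfil_score(50 * MB, 1 * MB), is_candidate(50 * MB, 1 * MB))     # (50.0, 50.0) True
print(exfil_score(5 * MB, 500 * MB), is_candidate(5 * MB, 500 * MB))   # (5.0, 0.01) False
```

One consequence of the fallback is that a one-way transfer with zero bytes received gets a ratio equal to its size in MB, so small one-way uploads score lower than their direction alone would suggest.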

In [43]:
# ── Exfiltration: large outbound transfers to external destinations ──────
df_exfil = (
    df_paloalto
    .filter(F.col("DeviceEventClassID") == "TRAFFIC")
    .filter(F.col("IsExternal") == True)
    .groupBy("SourceIP", "DestinationIP")
    .agg(
        F.count("*").alias("Sessions"),
        F.round(F.sum("SentBytes") / 1048576.0, 2).alias("TotalSentMB"),
        F.round(F.sum("ReceivedBytes") / 1048576.0, 2).alias("TotalRecvMB"),
        F.round(F.max("SentBytes") / 1048576.0, 2).alias("MaxSingleSentMB"),
        F.min("TimeGenerated").alias("FirstSeen"),
        F.max("TimeGenerated").alias("LastSeen"),
    )
    # Sent/Recv ratio: high = exfiltration candidate
    .withColumn("Ratio", F.when(F.col("TotalRecvMB") > 0,
                                F.round(F.col("TotalSentMB") / F.col("TotalRecvMB"), 2))
                          .otherwise(F.col("TotalSentMB")))
    .filter(F.col("TotalSentMB") > 1)
    .orderBy(F.desc("TotalSentMB"))
    .limit(20)
    .toPandas()
)

if len(df_exfil) > 0:
    df_exfil["Pair"] = df_exfil["SourceIP"] + " → " + df_exfil["DestinationIP"]

    fig = px.scatter(
        df_exfil,
        x="TotalSentMB", y="Ratio",
        size="Sessions", hover_name="Pair",
        title="📤 Exfiltration Candidates: Large Outbound + High Send/Recv Ratio",
        labels={"TotalSentMB": "Total Sent (MB)", "Ratio": "Sent/Recv Ratio"},
        color="TotalSentMB", color_continuous_scale="Reds",
    )
    fig.update_layout(width=900, height=500)
    fig.show()
    print(f"⚠️ {len(df_exfil)} potential exfiltration pair(s)")
else:
    print("🎉 No large outbound transfers found.")


⚠️ 1 potential exfiltration pair(s)


## 🛡️ Step 10: Threat Breakdown

IDS/IPS threat categories and actions from `THREAT` events, showing what the firewall's threat engine is detecting.

This reveals:
- 🦠 **Spyware** detections (e.g., C2 callbacks)
- 🔗 **URL filtering** blocks (malicious/phishing sites)
- 💥 **Vulnerability** exploits (CVE-based signatures)
- 📊 **Action effectiveness**: are threats being blocked or just alerted?

In [44]:
# ── Threat categories by action ──────────────────────────────────────────
df_threats = (
    df_paloalto
    .filter(F.col("DeviceEventClassID") == "THREAT")
    .groupBy("Activity", "SimplifiedDeviceAction", "LogSeverity")
    .agg(F.count("*").alias("Count"))
    .orderBy(F.desc("Count"))
    .toPandas()
)

if len(df_threats) > 0:
    fig = px.bar(
        df_threats,
        x="Activity", y="Count",
        color="SimplifiedDeviceAction",
        title="🛡️ Threat Categories by Action",
        text="Count",
        color_discrete_sequence=px.colors.qualitative.Bold,
    )
    fig.update_layout(width=900, height=450, xaxis_tickangle=-30)
    fig.show()
    print(f"📊 {df_threats['Count'].sum()} total THREAT events across {df_threats['Activity'].nunique()} categories")
else:
    print("🎉 No THREAT events found in the current time window.")


📊 2 total THREAT events across 1 categories


## ⏱️ Step 11: Attack Timeline

All suspicious and threat events plotted chronologically so you can correlate them with kill chain stages.

### 🎯 Events Included:
- All `THREAT` events (IDS/IPS detections)
- Blocked / Dropped / Reset traffic actions
- Sessions with > 500 KB sent (large transfers)
- DNS sessions (destination port 53) with > 512 bytes sent (tunnelling candidates)

> **💡 Pro Tip**: Use this timeline to identify attack progression: reconnaissance → delivery → exploitation → C2 → exfiltration.
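The four inclusion rules listed above combine with a logical OR. As a sketch, assuming each event is a plain dictionary keyed by `CommonSecurityLog` column names, the predicate looks like this in ordinary Python (`is_suspicious` is a hypothetical name, and the sample events are invented):

```python
def is_suspicious(event: dict) -> bool:
    """True if the event matches any of the four timeline inclusion rules."""
    sent = event.get("SentBytes") or 0  # treat a missing counter as zero
    return (
        event.get("DeviceEventClassID") == "THREAT"
        or event.get("SimplifiedDeviceAction") in {"Blocked", "Dropped", "Reset"}
        or sent > 500_000
        or (event.get("DestinationPort") == 53 and sent > 512)
    )

events = [
    {"DeviceEventClassID": "THREAT", "SimplifiedDeviceAction": "Allowed"},
    {"DeviceEventClassID": "TRAFFIC", "SimplifiedDeviceAction": "Dropped"},
    {"DeviceEventClassID": "TRAFFIC", "SimplifiedDeviceAction": "Allowed",
     "DestinationPort": 53, "SentBytes": 900},
    {"DeviceEventClassID": "TRAFFIC", "SimplifiedDeviceAction": "Allowed",
     "DestinationPort": 443, "SentBytes": 1200},
]
print([is_suspicious(e) for e in events])  # [True, True, True, False]
```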

In [45]:
# ── Attack timeline: all suspicious/threat events ────────────────────────
df_attack = (
    df_paloalto
    .filter(
        (F.col("DeviceEventClassID") == "THREAT") |
        (F.col("SimplifiedDeviceAction").isin("Blocked", "Dropped", "Reset")) |
        (F.col("SentBytes") > 500000) |
        ((F.col("DestinationPort") == 53) & (F.col("SentBytes") > 512))
    )
    .select(
        "TimeGenerated", "DeviceEventClassID", "Activity",
        "SimplifiedDeviceAction", "SourceIP", "DestinationIP",
        "DestinationPort", "SentBytes", "ReceivedBytes",
        "ApplicationProtocol", "SourceUserName",
    )
    .orderBy("TimeGenerated")
    .toPandas()
)

if len(df_attack) > 0:
    import pandas as pd
    df_attack["TimeGenerated"] = pd.to_datetime(df_attack["TimeGenerated"])
    df_attack["SentBytes"] = pd.to_numeric(df_attack["SentBytes"], errors="coerce").fillna(0)
    df_attack["Label"] = df_attack["Activity"] + " (" + df_attack["SimplifiedDeviceAction"] + ")"

    fig = px.scatter(
        df_attack,
        x="TimeGenerated", y="DestinationPort",
        color="DeviceEventClassID",
        size="SentBytes",
        hover_data=["SourceIP", "DestinationIP", "Activity", "ApplicationProtocol"],
        title="⏱️ Attack Timeline: Suspicious & Threat Events",
        color_discrete_map={"TRAFFIC": "#636EFA", "THREAT": "#EF553B"},
    )
    fig.update_layout(width=1000, height=500, yaxis_title="Destination Port")
    fig.show()

    # Summary
    print(f"\n{'='*80}")
    print(f"  Total suspicious/threat events: {len(df_attack)}")
    print(f"  Time range: {df_attack['TimeGenerated'].min()} → {df_attack['TimeGenerated'].max()}")
    print(f"  Unique source IPs: {df_attack['SourceIP'].nunique()}")
    print(f"  Unique destinations: {df_attack['DestinationIP'].nunique()}")
    print(f"{'='*80}")
else:
    print("🎉 No suspicious events found in the current time window.")



  Total suspicious/threat events: 415
  Time range: 2026-02-11 01:01:50.033970 → 2026-02-11 23:07:00.396850
  Unique source IPs: 6
  Unique destinations: 97


## 💾 Step 12: Save Results to Data Lake

Save the enriched investigation DataFrame back to the Sentinel data lake for:
- 📊 Dashboard consumption
- 🔍 Future investigation reference
- 🤝 Sharing with your SOC team
- ⚡ Faster follow-up queries

### 📋 Save Details:
- **Table**: `PaloAlto_Investigation_SPRK` (the `_SPRK` suffix writes to the data lake tier)
- **Mode**: `append` (adds to existing data)
- **Partition**: By `date` column for optimised queries

> **⚠️ Important**: This creates a custom table in the data lake tier. Use `delete_table()` to clean up when no longer needed.
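Because the provider rejects lake-tier table names that do not end in `_SPRK` (error 2603 in the recorded output of the save cell), it can help to validate the name before calling `save_as_table()`. The helper below is a hypothetical pre-flight check, not part of `MicrosoftSentinelProvider`. It assumes that `System tables` denotes the data lake tier and that any other database is an analytics-tier workspace expecting `_SPRK_CL`, as the Next Steps section suggests.

```python
LAKE_DATABASE = "system tables"  # assumption: this database name means the lake tier

def expected_suffix(database_name: str) -> str:
    """Suffix a custom table name needs for the given target database."""
    return "_SPRK" if database_name.casefold() == LAKE_DATABASE else "_SPRK_CL"

def check_table_name(table_name: str, database_name: str) -> None:
    """Raise ValueError if the table name does not fit the target tier."""
    suffix = expected_suffix(database_name)
    if not table_name.endswith(suffix):
        raise ValueError(
            f"{table_name!r} must end with {suffix} when saving to {database_name!r}"
        )

check_table_name("PaloAlto_Investigation_SPRK", "System tables")  # passes silently
try:
    check_table_name("PaloAlto_Investigation_SPRK_CL", "System tables")
except ValueError as err:
    print(err)
```

Failing fast on the name avoids spending Spark time on a write that the service would reject anyway.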

In [46]:
# ── Save enriched data to the data lake tier ─────────────────────────────
try:
    run_id = data_provider.save_as_table(
        df_paloalto,
        output_table,
        "System tables",        # "System tables" → data lake tier
        write_options,
    )
    print(f"✅ Saved investigation data to {output_table}")
    print(f"   Run ID: {run_id}")
except Exception as e:
    print(f"❌ Failed to save: {e}")


{"Level": "INFO", "TraceId": "5032b519-dc4d-4af2-879b-93235ae3aa81", "Message": "Saving DataFrame as table: PaloAlto_Investigation_SPRK_CL"}
{"Level": "ERROR", "TraceId": "5032b519-dc4d-4af2-879b-93235ae3aa81", "ErrorName": "MissingTableSuffixLake", "ErrorType": "User", "ErrorCategory": "InputError", "ErrorCode": 2603, "Message": "Invalid custom table name. All names of custom tables in the data lake must end with _SPRK."}
❌ Failed to save: 
ErrorName: MissingTableSuffixLake
ErrorCode: 2603
ErrorType: User
ErrorCategory: InputError
ErrorMessage: Invalid custom table name. All names of custom tables in the data lake must end with _SPRK.


In [47]:
# ── Verify: read back the saved table ────────────────────────────────────
try:
    df_verify = data_provider.read_table(output_table, "System tables")
    row_count = df_verify.count()
    print(f"✅ Verified: {output_table} contains {row_count:,} rows")
    print("📋 Table schema:")
    df_verify.printSchema()
except Exception as e:
    print(f"⚠️ Table not yet available (may need a moment): {e}")


{"Level": "INFO", "TraceId": "5032b519-dc4d-4af2-879b-93235ae3aa81", "Message": "Loading table: PaloAlto_Investigation_SPRK_CL"}
{"Level": "ERROR", "TraceId": "5032b519-dc4d-4af2-879b-93235ae3aa81", "ErrorName": "TableDoesNotExist", "ErrorType": "User", "ErrorCategory": "TableError", "ErrorCode": 2100, "Message": "Table PaloAlto_Investigation_SPRK_CL not found in the database System Tables."}
⚠️ Table not yet available (may need a moment): 
ErrorName: TableDoesNotExist
ErrorCode: 2100
ErrorType: User
ErrorCategory: TableError
ErrorMessage: Table PaloAlto_Investigation_SPRK_CL not found in the database System Tables.


## 🎉 Investigation Complete!

### 🚀 What You've Accomplished:
- ✅ Connected to Sentinel data lake using `MicrosoftSentinelProvider`
- ✅ Loaded and enriched Palo Alto `CommonSecurityLog` data with PySpark
- ✅ Analysed traffic patterns: event distribution, top talkers, port usage
- ✅ Screened for **beaconing** (C2 callbacks) via inter-arrival jitter analysis
- ✅ Identified **DNS tunnelling** candidates via payload size anomalies
- ✅ Flagged **data exfiltration** candidates via large outbound transfers and send/receive ratios
- ✅ Broke down IDS/IPS **threat categories** by action
- ✅ Built a unified **attack timeline** across all suspicious events
- ✅ Saved results back to the data lake for dashboards and follow-up

### 🎯 Next Steps:
- **Cross-source correlation**: Pivot suspicious IPs into CrowdStrike, Okta, or AWS tables
- **Watchlist enrichment**: Join against `known_bad_ips.csv` or `high_value_assets.csv`
- **Schedule this notebook**: Use [Notebook Jobs](https://learn.microsoft.com/en-us/azure/sentinel/datalake/notebook-jobs) for automated recurring analysis
- **Promote to analytics tier**: Use an `_SPRK_CL` suffix instead of `_SPRK` and save to your workspace rather than `System tables` to make results queryable in KQL (tables saved to `System tables` must end in `_SPRK`)

---

## 📚 References & Further Learning

### 🔗 Microsoft Sentinel Data Lake:
- [Sentinel Provider Class Reference](https://learn.microsoft.com/en-us/azure/sentinel/datalake/sentinel-provider-class-reference): Complete `MicrosoftSentinelProvider` API docs
- [Notebook Jobs in Sentinel](https://learn.microsoft.com/en-us/azure/sentinel/datalake/notebook-jobs): Schedule and automate notebooks

### 📖 Apache Spark:
- [PySpark DataFrame API](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html): DataFrame operations reference
- [Spark SQL Functions](https://spark.apache.org/docs/latest/api/sql/index.html): Built-in functions catalogue
- [PySpark Quickstart](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html): Getting started guide

### 🛡️ Security Analysis:
- [Microsoft Sentinel Skill-up Training](https://learn.microsoft.com/en-us/azure/sentinel/skill-up-resources): Free training modules
- [MITRE ATT&CK Framework](https://attack.mitre.org/): Threat categorisation reference

---

> **💡 Pro Tip**: Use this notebook as a template: swap `CommonSecurityLog` for any other Sentinel table to investigate different data sources!