# 🔐 Password Spray Backfill Notebook

## 📖 Overview

This notebook is designed to **backfill historical data** from Microsoft Entra ID `SigninLogs` into two derived tables:

- 🧩 **Features Table** → Per-IP behavioral features with spray score and labels.
- 📊 **Daily Stats Table** → Rollup metrics of sign-in activity per day.

It provides both **long lookback detection** (to uncover low-and-slow password spray campaigns) and **daily snapshots** for monitoring trends.

---

## 🎯 Objectives

- ✅ Load raw `SigninLogs` with relevant fields (user, IP, ASN, geo, status).
- ✅ Expand JSON location details into structured columns (City, Country, Latitude, Longitude).
- ✅ Compute **rolling window features** per IP: attempts, distinct users, days active, entropy.
- ✅ Calculate a **spray score** and assign **labels** (LOW / MEDIUM / HIGH).
- ✅ Write results to dedicated Sentinel data lake tables for further use (detections, dashboards, investigations).
- ✅ Provide preview of schema and sample rows for validation.
- ✅ Include safeguards for table deletion in case of incorrect backfill.

---

## 🛠️ Workflow

1. 🔧 **Parameters & Config** — define date ranges, table names, and selected fields.
2. 📥 **Load Raw SigninLogs** — read source logs, expand location details, and prepare base DataFrame.
3. 🔄 **Backfill Loop** — iterate day by day, compute candidate features + daily stats, save to Sentinel data lake.
4. 📊 **Preview Outputs** — inspect schemas and sample rows for both output tables.
5. ⚠️ **Optional Reset** — safeguard-protected section to delete tables if a re-run is required.

---

## 🗺️ Data Flow (Mermaid Diagram)

```mermaid
flowchart TD
    A[📥 SigninLogs<br/>Raw Data] --> B[🔍 Preprocessing<br/>Select Fields + Expand JSON]
    B --> C[🧮 Candidate Features<br/> Attempts, Users, Entropy, Geo, ASN]
    B --> D[📊 Daily Stats<br/> Totals, Distinct IPs/Users, Lockouts, Successes]

    C --> E[💾 password_spray_features_SPRK<br/>Table]
    D --> F[💾 signin_stats_daily_SPRK<br/>Table]

    E --> G[📈 Dashboards & Reports]
    F --> G
    E --> H[🚨 Alerts]
    F --> H
    E --> I[🕵️ Investigations]
```


# 🔧 Parameters & Config

This section defines the **runtime parameters** and ensures the notebook can be easily reused or adapted.

- 📅 **Date ranges** — `backfill_end_date` defaults to yesterday, and `backfill_start_date` is derived from it using `lookback_days`.
- 🗂️ **Table names** — input (`SigninLogs`) and output (`signin_summary_daily_SPRK`, `signin_stats_daily_SPRK`, `password_spray_features_SPRK`) tables are set here for easy swapping.
- 📝 **Selected fields** — only the most relevant fields are read (User, IP, ASN, Location, Status, etc.), reducing processing overhead.
- ⚙️ **Write options** — define how results are written (append mode preserves history).

This modular setup means analysts can quickly adjust **dates, fields, or destinations** without touching the core logic.
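
As a quick sanity check on the date arithmetic, the sketch below reproduces how the start and end dates follow from `lookback_days`. The helper name `backfill_range` and the fixed reference date are illustrative assumptions, not part of the notebook.

```python
from datetime import date, timedelta

def backfill_range(today: date, lookback_days: int):
    """Return (start, end) for an inclusive range ending yesterday (hypothetical helper)."""
    end = today - timedelta(days=1)                  # yesterday
    start = end - timedelta(days=lookback_days - 1)  # inclusive range of lookback_days days
    return start, end

# Fixed reference date so the example is reproducible
example_start, example_end = backfill_range(date(2025, 1, 15), lookback_days=4)
print(example_start, example_end)
```

Because both endpoints are inclusive, the range always covers exactly `lookback_days` calendar days.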

---


In [None]:
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from sentinel_lake.providers import MicrosoftSentinelProvider

# Initialize provider
data_provider = MicrosoftSentinelProvider(spark)

# -----------------------------
# Parameters
# -----------------------------

# Backfill range: the last `lookback_days` days, ending yesterday
lookback_days = 4
backfill_end_date = datetime.now().date() - timedelta(days=1)  # yesterday
backfill_start_date = backfill_end_date - timedelta(days=lookback_days - 1)

# Sentinel Workspace name (update for your environment)
workspace_name = "<YourWorkspaceName>"  # Replace with your actual workspace name

# Table names (easy to swap)
input_table = "SigninLogs"
output_datalake_summary_table = "signin_summary_daily_SPRK"
output_datalake_stats_table = "signin_stats_daily_SPRK"
output_datalake_features_table = "password_spray_features_SPRK"

# Write options (append mode keeps history)
write_options = {"mode": "append"}

# -----------------------------
# Field selection
# -----------------------------
# Core fields + native enrichment (ASN, geolocation)
signin_fields = [
    "TimeGenerated",
    "UserPrincipalName",
    "UserDisplayName",
    "IPAddress",
    "ResultType",
    "ResultSignature",
    "Status",
    "UserType",
    "UserAgent",
    # Geo & ASN enrichment - natively available in SigninLogs
    "LocationDetails",  # JSON object containing detailed location info: city, state, countryOrRegion, geoCoordinates
    "AutonomousSystemNumber",
]


# -----------------------------
# Status output
# -----------------------------
print("📅 Backfill Parameters")
print(f"   Start → End: {backfill_start_date} → {backfill_end_date}")
print(f"   Lookback:   {lookback_days} days\n")

print("📂 Tables")
print(f"   Input:      {input_table}")
print(f"   Signins Summary: {output_datalake_summary_table}")
print(f"   Signins Stats: {output_datalake_stats_table}")
print(f"   Password Spray Features:      {output_datalake_features_table}\n")

print("📝 Selected Fields:")
print("   " + ", ".join(signin_fields))

## 📥 Load Raw SigninLogs

We read the source authentication data from the Microsoft Entra ID `SigninLogs` table.

- 📌 **Purpose**: Focus only on fields relevant to password spray detection.
- 🌍 **Enrichment**: Expand the `LocationDetails` JSON into flat columns (City, State, Country, Latitude, Longitude).
- 🔢 **Network context**: Capture `AutonomousSystemNumber (ASN)` for attribution to ISPs and hosting providers.
- 📅 **Date column**: Add a derived `date` column (from `TimeGenerated`) to support partition pruning and time-based queries.

✅ At this stage, the data is **normalized and structured**, ready for feature engineering in the backfill loop.
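
To make the `LocationDetails` expansion concrete, here is a minimal pure-Python sketch of the same flattening applied to a single record. The sample payload and the helper name `flatten_location` are illustrative assumptions; the JSON keys mirror the paths used by the notebook's `get_json_object` calls.

```python
import json

def flatten_location(location_details):
    """Flatten a LocationDetails JSON string into flat columns (hypothetical helper)."""
    try:
        doc = json.loads(location_details) if location_details else {}
    except json.JSONDecodeError:
        doc = {}
    if not isinstance(doc, dict):
        doc = {}
    geo = doc.get("geoCoordinates")
    if not isinstance(geo, dict):
        geo = {}

    def to_float(value):
        # Missing or malformed coordinates become None, like a null after a failed cast
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    return {
        "City": doc.get("city"),
        "State": doc.get("state"),
        "Country": doc.get("countryOrRegion"),
        "Latitude": to_float(geo.get("latitude")),
        "Longitude": to_float(geo.get("longitude")),
    }

# Made-up sample payload for illustration only
sample = ('{"city": "Lisbon", "state": "Lisboa", "countryOrRegion": "PT", '
          '"geoCoordinates": {"latitude": 38.72, "longitude": -9.14}}')
row = flatten_location(sample)
print(row)
```

Missing or unparsable input yields `None` in every column, which roughly matches how the Spark version produces nulls for absent JSON paths.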

---


In [11]:
signin_df = (
    data_provider.read_table(input_table, workspace_name)
        .select(*signin_fields)
        .withColumn("date", F.to_date("TimeGenerated"))
        .filter((F.col("TimeGenerated") >= F.lit(backfill_start_date)) &
                (F.col("TimeGenerated") <  F.lit(backfill_end_date + timedelta(days=1))))
        # Expand LocationDetails JSON
        .withColumn("City", F.get_json_object("LocationDetails", "$.city"))
        .withColumn("State", F.get_json_object("LocationDetails", "$.state"))
        .withColumn("Country", F.get_json_object("LocationDetails", "$.countryOrRegion"))
        .withColumn("Latitude", F.get_json_object("LocationDetails", "$.geoCoordinates.latitude").cast("double"))
        .withColumn("Longitude", F.get_json_object("LocationDetails", "$.geoCoordinates.longitude").cast("double"))
        # Rename ASN for consistency
        .withColumnRenamed("AutonomousSystemNumber", "ASN")
)

signin_df.printSchema()  # Prints the schema only (metadata); no data scan is triggered
print("✅ Loaded SigninLogs with expanded LocationDetails and ASN")

## 🔄 Backfill Loop

The heart of the notebook — iterating from `backfill_start_date` → `backfill_end_date`.  
For each day, we build **two sets of outputs**: _features_ and _daily stats_.

### Step-by-Step

1. 🔍 **Select rolling window**

   - Extract a sliding window of activity (`day - lookback_window_days` → `day`).
   - Ensures we capture **low-and-slow spray patterns** that span multiple days.

2. 🧮 **Compute candidate features**

   - Aggregate per IP: total attempts, distinct targeted users, days active.
   - Compute entropy of usernames (distribution spread).
   - Normalize metrics and calculate a **spray_score**.
   - Add **spray_score_label** (LOW / MEDIUM / HIGH).
   - Include attribution fields (IP, ASN, City, Country).

3. 💾 **Append to `password_spray_features_SPRK`**

   - Stores per-IP features with scores and context.
   - Enables downstream dashboards, detections, and investigations.

4. 📊 **Compute daily stats**

   - Aggregate per day: total attempts, distinct targeted users, distinct source IPs.
   - Count lockouts (ResultType=50053) and successes.

5. 💾 **Append to `signin_stats_daily_SPRK`**
   - Provides daily rollups to track attack volume and trends.
   - Useful for high-level reporting and long-term baselines.

---

📌 Together, these outputs provide **both granular IP-level signals** (features) and **strategic daily summaries** (stats), enabling analysts to detect and contextualize password spray activity.
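
The username entropy in step 2 is the Shannon entropy of the per-IP distribution of targeted accounts. The sketch below computes it in plain Python for two made-up IPs; the function name and the sample usernames are assumptions for illustration.

```python
import math
from collections import Counter

def username_entropy(usernames):
    """Shannon entropy (in bits) of the distribution of targeted usernames."""
    counts = Counter(usernames)
    total = sum(counts.values())
    if total == 0:
        return 0.0
    entropy = 0.0
    for count in counts.values():
        p = count / total
        entropy -= p * math.log2(p)
    return entropy

# One attempt against each of 8 different accounts: spread out, spray-like
spray_like = [f"user{i}@contoso.com" for i in range(8)]
# 8 attempts against the same account: concentrated, more like brute force
single_user = ["alice@contoso.com"] * 8

spray_entropy = username_entropy(spray_like)
single_entropy = username_entropy(single_user)
print(spray_entropy, single_entropy)
```

An IP that spreads its attempts evenly across many accounts gets high entropy, while an IP hammering one account gets zero, which is why entropy contributes positively to the spray score.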

```mermaid
flowchart TD
    subgraph BackfillLoop[Backfill Loop - Daily]
        A[Current day = start_date] --> B[Define Lookback Window<br/>day - N days → day]
        B --> C[Filter SigninLogs for Window]
        C --> D[Compute Candidate Features<br/>per IP]
        D --> E[Write to password_spray_features_SPRK<br/>partition run_date = day]
        B --> F[Filter SigninLogs for Current Day Only]
        F --> G[Aggregate Daily Stats]
        G --> H[Write to signin_stats_daily_SPRK<br/>row for date = day]
        E --> I[Increment Day → Next]
        H --> I
        I -->|Loop until end_date| A
    end
```
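
The day-by-day iteration with a trailing window can be sketched without Spark. The generator below is a hypothetical helper; the default of 3 days matches the `timedelta(days=3)` used in the features loop.

```python
from datetime import date, timedelta

def daily_windows(start: date, end: date, window_days: int = 3):
    """Yield (run_date, window_start, window_end) for each day in [start, end]."""
    current = start
    while current <= end:
        # Window bounds are inclusive, so it spans window_days + 1 calendar days
        yield current, current - timedelta(days=window_days), current
        current += timedelta(days=1)

windows = list(daily_windows(date(2025, 1, 10), date(2025, 1, 13)))
for run_date, window_start, window_end in windows:
    print(run_date, window_start, window_end)
```

Note that the first windows reach back before the backfill start date, so the source DataFrame must contain those earlier days for the features to be complete.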


In [21]:
# -----------------------------
# Backfill Loop
# -----------------------------
print(f" Backfilling the data range: {backfill_start_date} → {backfill_end_date}")
current = backfill_start_date
while current <= backfill_end_date:
    try:
        # Define day window (midnight → next midnight UTC)
        run_start = datetime(current.year, current.month, current.day)  # midnight start
        run_end   = run_start + timedelta(days=1)                       # midnight next day

        print(f"▶️ Processing {current}: {run_start} to {run_end}")

        # Load raw logs for this day
        df_day = (
            data_provider.read_table(input_table, workspace_name)
                .select(*signin_fields)
                .filter((F.col("TimeGenerated") >= F.lit(run_start)) &
                        (F.col("TimeGenerated") <  F.lit(run_end)))
                .withColumn("date", F.to_date("TimeGenerated"))
                .withColumn("City", F.get_json_object("LocationDetails", "$.city"))
                .withColumn("Country", F.get_json_object("LocationDetails", "$.countryOrRegion"))
                .withColumn("Latitude", F.get_json_object("LocationDetails", "$.geoCoordinates.latitude").cast("double"))
                .withColumn("Longitude", F.get_json_object("LocationDetails", "$.geoCoordinates.longitude").cast("double"))
                .withColumnRenamed("AutonomousSystemNumber", "ASN")
                .cache()
        )

        # ------------------
        # Daily summary per IP
        # ------------------
        agg_all = (
            df_day.groupBy("IPAddress", "ASN", "City", "Country", "date")
                .agg(
                    F.count("*").alias("attempts_total"),
                    F.sum(F.when(F.col("ResultSignature")=="Success",1).otherwise(0)).alias("success_count"),
                    F.countDistinct("UserPrincipalName").alias("distinct_users"),
                    F.min("TimeGenerated").alias("first_seen"),
                    F.max("TimeGenerated").alias("last_seen")
                )
        )

        user_counts = df_day.groupBy("IPAddress","UserPrincipalName").count()
        entropy = (
            user_counts
                .withColumn("p", F.col("count") / F.sum("count").over(Window.partitionBy("IPAddress")))
                .groupBy("IPAddress")
                .agg(
                    F.round(-F.sum(F.col("p") * F.log2("p")), 2).alias("username_entropy")
                )
        )

        summary = agg_all.join(entropy, "IPAddress", "left")

        data_provider.save_as_table(
            summary,
            output_datalake_summary_table,
            write_options={"mode": "append", "partitionBy": "date"}
        )
        print(f"✅ Wrote summary for {current}")

        # ------------------
        # Daily stats rollup
        # ------------------
        stats = (
            df_day.groupBy("date")
                .agg(
                    F.count("*").alias("total_attempts"),
                    F.countDistinct("UserPrincipalName").alias("distinct_targeted_users"),
                    F.countDistinct("IPAddress").alias("distinct_source_ips"),
                    F.sum(F.when(F.col("ResultType")=="50053",1).otherwise(0)).alias("lockouts"),
                    F.sum(F.when(F.col("ResultSignature")=="Success",1).otherwise(0)).alias("successes")
                )
        )

        data_provider.save_as_table(
            stats,
            output_datalake_stats_table,
            write_options={"mode": "append", "partitionBy": "date"}
        )
        print(f"✅ Wrote stats for {current}")

        # Clean up
        df_day.unpersist()

    except Exception as e:
        print(f"❌ Error processing {current}: {e}")

    # Move to next day
    current += timedelta(days=1)

print("🎉 Backfill complete")

In [None]:
# -----------------------------
# Backfill Loop (rolling-window features per IP)
# -----------------------------
print(f" Backfilling the data range: {backfill_start_date} → {backfill_end_date}")
current = backfill_start_date
while current <= backfill_end_date:
    try:
        window_start = current - timedelta(days=3)
        window_end   = current

        # -----------------
        # Filter once & cache
        # -----------------
        df_window = (
            signin_df.filter(
                (F.col("date") >= F.lit(window_start.isoformat())) &
                (F.col("date") <= F.lit(window_end.isoformat()))
            )
            .cache()
        )

        # -----------------
        # Candidate features (single groupBy)
        # Carry over ASN, City, Country for attribution
        # -----------------
        agg_all = (
            df_window.groupBy("IPAddress", "ASN", "City", "Country")
              .agg(
                F.count("*").alias("attempts_total"),
                F.sum(F.when(F.col("ResultSignature")=="Success",1).otherwise(0)).alias("success_count"),
                F.countDistinct("UserPrincipalName").alias("distinct_users"),
                F.countDistinct("date").alias("days_active"),
                F.min("TimeGenerated").alias("first_seen"),
                F.max("TimeGenerated").alias("last_seen")
              )
        )

        # -----------------
        # Entropy (optimized with window instead of join)
        # -----------------
        user_counts = df_window.groupBy("IPAddress","UserPrincipalName").count()
        entropy = (
            user_counts
              .withColumn("p", F.col("count") / F.sum("count").over(Window.partitionBy("IPAddress")))
              .groupBy("IPAddress")
              .agg((-F.sum(F.col("p")*F.log2("p"))).alias("username_entropy"))
        )

        features = (
            agg_all.join(entropy, "IPAddress", "left")
                   .withColumn("run_date", F.lit(current.isoformat()))
                   .withColumn("detection_window_start", F.lit(window_start.isoformat()))
                   .withColumn("detection_window_end", F.lit(window_end.isoformat()))
        )

        # -----------------
        # Normalization (reuse global precomputed values if available)
        # -----------------
        if 'max_users' not in globals() or 'max_entropy' not in globals():
            # compute once, outside loop ideally
            global_max = features.agg(
                F.max("distinct_users").alias("max_users"),
                F.round(F.max("username_entropy"), 2).alias("max_entropy")
            ).first()

            max_users = global_max.max_users or 1
            max_entropy = global_max.max_entropy or 1.0

        features = (
            features
            .withColumn("distinct_users_norm", F.round(F.col("distinct_users") / F.lit(max_users), 2))
            .withColumn("success_rate", F.round(F.col("success_count") / F.greatest(F.col("attempts_total"), F.lit(1)), 2))
            .withColumn("entropy_norm", F.round(F.col("username_entropy") / F.lit(max_entropy), 2))
            .withColumn(
                "spray_score",
                F.round(
                    0.5*F.col("distinct_users_norm") +
                    0.2*(1 - F.col("success_rate")) +
                    0.3*F.col("entropy_norm"),
                    2
                )
            )
            .withColumn(
                "spray_score_label",
                F.when(F.col("spray_score") < 0.3, F.lit("LOW"))
                .when(F.col("spray_score") < 0.6, F.lit("MEDIUM"))
                .otherwise(F.lit("HIGH"))
            )
        )

        # Write candidates
        data_provider.save_as_table(features, output_datalake_features_table, write_options=write_options)
        print(f"✅ Wrote candidates for {current}")

        # Unpersist cached slice
        df_window.unpersist()

    except Exception as e:
        print(f"❌ Error processing {current}: {str(e)}")

    current += timedelta(days=1)

print("🎉 Backfill complete")

# 👀 Preview Outputs

After running the backfill loop, it’s important to validate that the pipeline produced the expected data.  
This section shows **schemas** and **sample rows** for each output table.

---

### 🗂️ Daily Summary Table (`signin_summary_daily_SPRK`)

This table provides **per-IP, per-day aggregates** of sign-in activity.  
It compresses raw `SigninLogs` into a daily rollup for each source IP, while preserving attribution context.

- 🌍 **Geo & ASN context** → IPAddress, ASN, City, Country
- 📅 **Date** → reporting day of the aggregation
- 🔢 **Total attempts** → total number of authentication attempts from the IP on that day
- ✅ **Success count** → number of successful logons (helps measure spray effectiveness)
- 👤 **Distinct users** → number of unique targeted accounts
- ⏱️ **First seen / Last seen** → earliest and latest attempt timestamps for that IP within the day
- 🧮 **Username entropy** → entropy score measuring spread/randomness of targeted usernames

---

### 📊 Daily Stats Table (`signin_stats_daily_SPRK`)

This table provides **daily rollups** of authentication activity.  
It helps track the overall level of spray attempts and anomalies over time.

- 📅 **Date** → reporting date
- 🔢 **Total attempts** → all sign-in attempts for the day
- 👤 **Distinct targeted users** → how many unique accounts were hit
- 🌐 **Distinct source IPs** → how many unique IPs attempted logons
- 🚫 **Lockouts** → counts of account lockout events (ResultType=50053)
- ✅ **Successes** → counts of successful logons
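
For intuition, the same daily rollup can be expressed over a plain list of records. This is a sketch with made-up sample rows; the success test mirrors the notebook's `ResultSignature == "Success"` condition, and the function name is an assumption.

```python
def daily_stats(records):
    """Roll up one day's sign-in records into the daily stats columns."""
    return {
        "total_attempts": len(records),
        "distinct_targeted_users": len({r["UserPrincipalName"] for r in records}),
        "distinct_source_ips": len({r["IPAddress"] for r in records}),
        "lockouts": sum(1 for r in records if r["ResultType"] == "50053"),
        "successes": sum(1 for r in records if r["ResultSignature"] == "Success"),
    }

# Made-up records for a single day
records = [
    {"UserPrincipalName": "alice@contoso.com", "IPAddress": "203.0.113.5",
     "ResultType": "50126", "ResultSignature": "None"},
    {"UserPrincipalName": "bob@contoso.com", "IPAddress": "203.0.113.5",
     "ResultType": "50053", "ResultSignature": "None"},
    {"UserPrincipalName": "alice@contoso.com", "IPAddress": "198.51.100.7",
     "ResultType": "0", "ResultSignature": "Success"},
]
stats_row = daily_stats(records)
print(stats_row)
```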

---

### 🧩 Features Table (`password_spray_features_SPRK`)

This table contains **per-IP aggregated features** across the rolling lookback window.  
It helps identify which IP addresses exhibit password spray-like behavior.

- 🌍 **Geo & ASN context** → IPAddress, ASN, City, Country
- 📊 **Behavioral metrics** → attempts_total, distinct_users, days_active, entropy
- 🔢 **Normalized features** → distinct_users_norm, entropy_norm, success_rate
- 🎯 **Spray score** → weighted score reflecting spray likelihood
- 🏷️ **Spray score label** → LOW / MEDIUM / HIGH for easier triage
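
The spray score is a weighted sum of the normalized features, and the label comes from two thresholds. The sketch below restates the weights (0.5, 0.2, 0.3) and cut-offs (0.3, 0.6) from the features loop in plain Python; the function names and the sample inputs are illustrative assumptions.

```python
def spray_score(distinct_users_norm: float, success_rate: float, entropy_norm: float) -> float:
    """Weighted spray score using the weights from the features loop."""
    score = (
        0.5 * distinct_users_norm
        + 0.2 * (1 - success_rate)
        + 0.3 * entropy_norm
    )
    # Python's round() can differ from Spark's F.round on exact .5 ties
    return round(score, 2)

def spray_label(score: float) -> str:
    """Map a score to LOW / MEDIUM / HIGH using the 0.3 and 0.6 thresholds."""
    if score < 0.3:
        return "LOW"
    if score < 0.6:
        return "MEDIUM"
    return "HIGH"

# Many users, no successes, high entropy: typical spray profile
noisy_ip = spray_score(distinct_users_norm=0.9, success_rate=0.0, entropy_norm=0.8)
# Few users, all successes, no entropy: typical legitimate profile
quiet_ip = spray_score(distinct_users_norm=0.1, success_rate=1.0, entropy_norm=0.0)
print(noisy_ip, spray_label(noisy_ip))
print(quiet_ip, spray_label(quiet_ip))
```

Because half of the weight sits on `distinct_users_norm`, an IP that touches many accounts scores high even when a few of its attempts succeed.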

---

These outputs form the foundation for:

- 📈 **Dashboards** (geo heatmaps, trend lines, score distributions)
- 🚨 **Alerts** (triggering on high-score IPs)
- 🕵️ **Investigations** (pivoting into ASN, city, or recurring IPs)


In [23]:
# -----------------------------
# Preview sample outputs
# -----------------------------

print("\n📑 Daily Summary Table Schema")
summary.printSchema()

print("\n🔍 Daily Summary Sample Rows")
display(
    summary.select(
        "date", "IPAddress", "attempts_total", "success_count",
        "distinct_users", "username_entropy", "ASN", "City", "Country"
    ).limit(10)
)

print("\n📑 Daily Stats Table Schema")
stats.printSchema()

print("\n🔍 Daily Stats Sample Rows")
display(
    stats.select(
        "date", "total_attempts", "distinct_targeted_users",
        "distinct_source_ips", "lockouts", "successes"
    ).limit(10)
)

print("\n📑 Candidates Table Schema")
features.printSchema()   # Prints schema only (no scan)

print("\n🔍 Candidates Sample Rows")
display(
    features.select(
        "IPAddress", "ASN", "City", "Country",
        "attempts_total", "distinct_users", "days_active",
        "username_entropy", "spray_score", "spray_score_label"
    ).limit(10)
)

# ⚠️ Delete entire Sentinel data lake tables

### Proceed with Caution

Occasionally you may need to **reset** the output tables if the backfill wrote incorrect data.  
Use this section **with extreme caution**:

- 🚨 This will **permanently delete** the table from the Sentinel data lake.
- 🔒 Keep the command **commented out by default**.
- ✅ Only **uncomment** if you really intend to wipe and rebuild the table.

Recommended workflow:

1. Double-check table names (`summary`, `stats`, and `features`) before deletion.
2. Run deletion only in a controlled/test environment.
3. Immediately rerun the notebook to regenerate fresh data.
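
One way to make the safeguard harder to bypass by accident is to wrap deletion in a function that takes the delete call as a parameter and reports exactly what it removed. This is a sketch only: `delete_tables` is a hypothetical helper, and the example uses a stand-in callable instead of the real provider so it can run anywhere.

```python
def delete_tables(table_names, delete_fn, confirm=False):
    """Delete the given tables only when confirm is True; return the names deleted."""
    if not confirm:
        print("Safeguard active: no tables were deleted.")
        return []
    deleted = []
    for name in table_names:
        delete_fn(name)  # in the notebook this could be data_provider.delete_table
        deleted.append(name)
        print(f"Deleted table: {name}")
    return deleted

# Stand-in for the real delete call: it only records the requested names
deleted_log = []
targets = ["signin_stats_daily_SPRK", "password_spray_features_SPRK"]

result_off = delete_tables(targets, deleted_log.append, confirm=False)
result_on = delete_tables(targets, deleted_log.append, confirm=True)
```

Returning the list of deleted names means the confirmation message reflects what actually happened rather than what was intended.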


In [24]:
# ❌ Uncomment the relevant block AND set confirm_delete = True to enable deletion

confirm_delete = False   # Change to True only if you want to proceed

if confirm_delete:
    # Delete daily summary table
    # data_provider.delete_table(output_datalake_summary_table)
    
    # Delete daily stats table
    # data_provider.delete_table(output_datalake_stats_table)

    # Delete password spray features table
    # data_provider.delete_table(output_datalake_features_table)
    
    print("⚠️ Delete block executed. Any uncommented tables were deleted; rerun the backfill to regenerate data.")
else:
    print("✅ Delete set to False, Safeguard active. No tables were deleted or action needed.")
