## engineer_model_features

This notebook extracts user behavior features from five types of activity logs (device, email, file, HTTP, and logon) to support insider threat detection using an anomaly detection model (see: investigate_anomalies.ipynb).

#### 🧠 Feature Engineering Strategy

**Temporal windowing approach to capture behavioral shifts:** 
* 14-day recent window: Captures user behavior in the 14 days leading up to their last recorded event. This reflects short-term activity and is crucial for detecting pre-departure anomalies.
* 60-day baseline window: Captures typical user behavior in the 60 days prior to the recent window. This helps establish a personalized behavioral norm.

This strategy enables us to detect deviations from a user's baseline — a key signal for insider threats, especially for users who may engage in risky actions shortly before exiting the organization.

#### 🛠️ Features Include:
* **Volume metrics:** email_sent_count, logon_count, http_request_count, etc.
* **Behavioral flags:** after_hours_logon, to_external_count, risky_url_count, etc.
* **Uniqueness metrics:** unique_file_count, unique_url_count, etc.
* **Spike ratios:** compare recent vs. baseline activity to quantify unusual surges (e.g., logon_spike_ratio, file_spike_ratio)

Every per-window feature is computed for both windows and carries a `recent_` or `baseline_` prefix (e.g., recent_logon_count and baseline_logon_count).

This combination of temporal and behavioral features helps the model differentiate between normal variation and potential insider threats.

---


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from functools import reduce

In [None]:
# Set-up: shared time-of-day / day-of-week predicates over the "date" column.
# Working hours span [WORK_HOURS_START, WORK_HOURS_END) on a 24-hour clock.
WORK_HOURS_START = 6
WORK_HOURS_END = 18

# True when the event's hour lies outside working hours (late evening or early morning).
is_after_hours = (F.hour("date") >= WORK_HOURS_END) | (F.hour("date") < WORK_HOURS_START)

# Spark's dayofweek() numbers days 1 = Sunday ... 7 = Saturday, so {1, 7} is the weekend.
# NOTE(review): not referenced by any of the feature cells visible in this notebook.
is_weekend = F.dayofweek("date").isin(1, 7)

In [None]:
# Load the cleaned activity logs: one Spark table per log source, in a fixed order.
device_df, email_df, file_df, http_df, logon_df = (
    spark.read.table(table_name)
    for table_name in (
        "clean_device_events",
        "clean_email_events",
        "clean_file_events",
        "clean_http_events",
        "clean_logon_events",
    )
)

In [None]:
# Stack the (user, date) pairs of every log source into one DataFrame, then take
# each user's earliest and latest event timestamp across all sources.
all_events = reduce(
    lambda stacked, source: stacked.union(source),
    [
        events.select("user", "date")
        for events in (device_df, email_df, file_df, http_df, logon_df)
    ],
)

user_activity_window = all_events.groupBy("user").agg(
    F.min("date").alias("first_seen"), F.max("date").alias("last_seen")
)

In [None]:
# Define dynamic, per-user recent and baseline windows. Each pair is anchored on
# the user's last recorded event (last_seen, from the user_activity_window table
# built in the previous cell):
#   * 14-day recent window:   [last_seen - 14 days, last_seen]
#   * 60-day baseline window: the 60 days immediately preceding the recent
#     window, i.e. [recent_start - 60 days, recent_start)
# The two windows are contiguous and non-overlapping. Ending the baseline a whole
# day before the recent window would drop every event in that 24-hour gap from
# both windows, because "date" carries a time of day (the set-up cell reads
# hour("date")).
# NOTE(review): assumes "date" is a timestamp column; confirm the schema of the
# clean_*_events tables.
user_activity_window = (
    user_activity_window
    .withColumn("14d_recent_window_start", F.expr("last_seen - INTERVAL 14 DAYS"))
    # get_recent_and_baseline() filters the baseline with an INCLUSIVE upper
    # bound (date <= 60d_baseline_window_end). Spark timestamps have microsecond
    # resolution, so ending the baseline 1 microsecond before the recent window
    # starts is equivalent to "date < 14d_recent_window_start".
    .withColumn(
        "60d_baseline_window_end",
        F.col("14d_recent_window_start") - F.expr("INTERVAL 1 MICROSECOND"),
    )
    # Measure the 60 days back from the recent-window start.
    .withColumn(
        "60d_baseline_window_start",
        F.col("14d_recent_window_start") - F.expr("INTERVAL 60 DAYS"),
    )
)

# One row per user, so the window table is small; hint Spark to broadcast it in
# the joins performed by get_recent_and_baseline().
user_window_broadcast = F.broadcast(user_activity_window)

In [None]:
def get_recent_and_baseline(df):
    """Split a log DataFrame into each user's recent-window and baseline-window rows.

    *df* must have ``user`` and ``date`` columns. It is inner-joined to the
    per-user window boundaries in ``user_window_broadcast``, so rows whose user
    has no window row are dropped. The returned frames also carry the window
    boundary columns from that join.

    Rows are kept when (both bounds inclusive):
      * recent:   14d_recent_window_start <= date <= last_seen
      * baseline: 60d_baseline_window_start <= date <= 60d_baseline_window_end

    Returns:
        A ``(recent_df, baseline_df)`` tuple.
    """
    windowed = df.join(user_window_broadcast, on="user")
    event_time = F.col("date")
    in_recent = event_time.between(F.col("14d_recent_window_start"), F.col("last_seen"))
    in_baseline = event_time.between(
        F.col("60d_baseline_window_start"), F.col("60d_baseline_window_end")
    )
    return windowed.filter(in_recent), windowed.filter(in_baseline)

In [None]:
# DEVICE FEATURES
# Number of device events per user, once for the recent and once for the baseline window.
device_recent_df, device_baseline_df = (
    windowed.groupBy("user").agg(F.count("*").alias(count_alias))
    for windowed, count_alias in zip(
        get_recent_and_baseline(device_df),
        ("recent_device_count", "baseline_device_count"),
    )
)

In [None]:
# EMAIL FEATURES
# Per-user email volume, average size / attachments, and external-traffic counts
# for the recent and baseline windows.

# An address ending in the organisation's domain is internal; anything else is external.
internal_address_re = "@dtaa\\.com$"

# "from" is treated as a single address.
is_from_external = ~F.col("from").rlike(internal_address_re)

# NOTE(review): assumes "to" may list several recipients separated by ";" or ","
# — confirm against clean_email_events. Applying the $-anchored regex to the
# whole field only inspects the LAST recipient, so mail sent to an external
# address with an internal address listed last would be counted as internal.
# Instead, flag the email when ANY non-blank recipient is outside the internal
# domain:
#   * single-address values are classified as before (surrounding whitespace is ignored);
#   * a blank "to" is no longer counted as external;
#   * NULL stays uncounted.
is_to_external = F.exists(
    F.split(F.col("to"), "[;,]"),
    lambda addr: (F.trim(addr) != "") & ~F.trim(addr).rlike(internal_address_re),
)

email_recent_df, email_baseline_df = get_recent_and_baseline(email_df)

# NOTE(review): *_avg_attachment_size averages the raw "attachments" column; if that
# column holds an attachment count rather than a size, the feature name is misleading.
email_recent_df = email_recent_df.groupBy("user").agg(
    F.count("*").alias("recent_email_sent_count"),
    F.avg("size").alias("recent_avg_email_size"),
    F.avg("attachments").alias("recent_avg_attachment_size"),
    F.count(F.when(is_from_external, True)).alias("recent_from_external_count"),
    F.count(F.when(is_to_external, True)).alias("recent_to_external_count"),
)

email_baseline_df = email_baseline_df.groupBy("user").agg(
    F.count("*").alias("baseline_email_sent_count"),
    F.avg("size").alias("baseline_avg_email_size"),
    F.avg("attachments").alias("baseline_avg_attachment_size"),
    F.count(F.when(is_from_external, True)).alias("baseline_from_external_count"),
    F.count(F.when(is_to_external, True)).alias("baseline_to_external_count"),
)

In [None]:
# FILE FEATURES
# File-access volume and breadth (distinct filenames touched) per user, for the
# recent window and the baseline window.
file_recent_df, file_baseline_df = (
    windowed.groupBy("user").agg(
        F.count("*").alias(access_alias),
        F.countDistinct("filename").alias(unique_alias),
    )
    for windowed, (access_alias, unique_alias) in zip(
        get_recent_and_baseline(file_df),
        (
            ("recent_file_access_count", "recent_unique_file_count"),
            ("baseline_file_access_count", "baseline_unique_file_count"),
        ),
    )
)

In [None]:
# HTTP FEATURES
import re

# Keywords whose presence anywhere in a URL marks the request as risky. This is a
# substring match, so "leak" also matches e.g. "wikileaks" or "leakage".
risky_keywords = ["wikileaks", "keylogger", "malware", "malicious", "exploit", "leak"]
# The "(?i)" prefix makes Spark's (Java) regex case-insensitive, so e.g.
# "WikiLeaks.org" or "/Malware/" are not missed. re.escape stops a future keyword
# containing regex metacharacters from being interpreted as a pattern.
risky_pattern = "(?i)" + "|".join(re.escape(keyword) for keyword in risky_keywords)

is_risky_url = F.col("url").rlike(risky_pattern)

http_recent_df, http_baseline_df = get_recent_and_baseline(http_df)

# Per-user request volume, distinct URLs, and risky-URL hits for each window.
http_recent_df = http_recent_df.groupBy("user").agg(
    F.count("*").alias("recent_http_request_count"),
    F.countDistinct("url").alias("recent_unique_url_count"),
    F.count(F.when(is_risky_url, True)).alias("recent_risky_url_count"),
)

http_baseline_df = http_baseline_df.groupBy("user").agg(
    F.count("*").alias("baseline_http_request_count"),
    F.countDistinct("url").alias("baseline_unique_url_count"),
    F.count(F.when(is_risky_url, True)).alias("baseline_risky_url_count"),
)

In [None]:
# LOGON FEATURES
# Per-user logon volume and number of after-hours logons (is_after_hours comes
# from the set-up cell), for the recent window and the baseline window.
# NOTE(review): every row of clean_logon_events is counted; if that table still
# holds logoff rows alongside logons, these are logon-*event* counts — confirm.
logon_recent_df, logon_baseline_df = (
    windowed.groupBy("user").agg(
        F.count("*").alias(count_alias),
        F.sum(F.when(is_after_hours, 1).otherwise(0)).alias(after_hours_alias),
    )
    for windowed, (count_alias, after_hours_alias) in zip(
        get_recent_and_baseline(logon_df),
        (
            ("recent_logon_count", "recent_after_hours_logon"),
            ("baseline_logon_count", "baseline_after_hours_logon"),
        ),
    )
)

In [None]:
# Join feature tables
def join_all_dfs(dfs, join_col="user"):
    """Full-outer-join the DataFrames in *dfs* on *join_col*, left to right.

    A user missing from one of the inputs still appears in the result, with
    NULLs in that input's columns. A single-element list is returned unchanged;
    an empty list raises TypeError (from functools.reduce).
    """
    def _outer_join(left, right):
        return left.join(right, on=join_col, how="outer")

    return reduce(_outer_join, dfs)


# Join order fixes the column order of the final feature table.
final_features = join_all_dfs(
    [
        device_recent_df,
        device_baseline_df,
        email_recent_df,
        email_baseline_df,
        file_recent_df,
        file_baseline_df,
        http_recent_df,
        http_baseline_df,
        logon_recent_df,
        logon_baseline_df,
    ]
)

In [None]:
# Spike ratios: recent-window activity relative to baseline activity, per log source.
# The "+ 1" in the denominator keeps the ratio finite for users with no baseline
# activity. The feature tables were combined with outer joins, so a user with no
# events of a given type in a window has a NULL count there rather than 0. NULL
# propagates through the division and would make the ratio NULL, defeating the
# "+ 1" smoothing. Missing counts are therefore treated as 0 in the ratios.
# The raw count columns themselves are left unchanged.
spike_ratio_inputs = {
    "device_spike_ratio": ("recent_device_count", "baseline_device_count"),
    "file_spike_ratio": ("recent_file_access_count", "baseline_file_access_count"),
    "email_spike_ratio": ("recent_email_sent_count", "baseline_email_sent_count"),
    "http_spike_ratio": ("recent_http_request_count", "baseline_http_request_count"),
    "logon_spike_ratio": ("recent_logon_count", "baseline_logon_count"),
}

final_features = final_features.withColumns({
    ratio_name: F.coalesce(F.col(recent), F.lit(0))
    / (F.coalesce(F.col(baseline), F.lit(0)) + 1)
    for ratio_name, (recent, baseline) in spike_ratio_inputs.items()
})

In [None]:
# Persist the feature table, replacing any previous version of model_features.
final_features.write.saveAsTable("model_features", mode="overwrite")