In [3]:
import json
from datetime import datetime, timezone, timedelta
import pandas as pd
import s3fs

# Configuration: which bucket and which log family this cell reads
BUCKET_NAME = "iot-security-logs-ln-2025"
SERVICE_PATH = "zeek-logs"  # change to "suricata-logs" for that set

# The partition read is the *current* UTC day (not yesterday). To read
# yesterday's partition instead, use (now_utc - timedelta(days=1)).date().
now_utc = datetime.now(timezone.utc)
day_utc = now_utc.date()
day_prefix = day_utc.strftime("%Y/%m/%d") + "/"   # e.g., "2025/09/18/"
root_prefix = "s3://" + "/".join([BUCKET_NAME, SERVICE_PATH, day_prefix])

print(f"Using day prefix (UTC): {day_prefix}")
print(f"Root S3 path: {root_prefix}\n")

# Initialize S3: one shared filesystem handle, used further down both to list
# the day's objects (s3_fs.find) and to open each object (s3_fs.open).
# NOTE(review): anon=False means non-anonymous access; credentials are
# presumably resolved from the notebook's environment/role — confirm.
s3_fs = s3fs.S3FileSystem(anon=False)

def read_firehose_object_to_messages(s3_path: str):
    """
    Read one Firehose object and return the non-empty "message" values it holds.

    The object is expected to contain JSON records written back to back, with
    or without whitespace (or commas) between them. Records are decoded one at
    a time with a streaming JSON decoder, so a "}{" sequence *inside* a message
    string is left untouched and any amount of whitespace between records is
    accepted.

    Args:
        s3_path: Path of the object, opened through the module-level `s3_fs`.

    Returns:
        A list with the truthy "message" value of every record that is a JSON
        object, in file order. Empty list for an empty object.

    Raises:
        json.JSONDecodeError: if the content is not a sequence of JSON values.
    """
    with s3_fs.open(s3_path, "r") as f:
        raw = f.read().strip()

    if not raw:
        return []

    decoder = json.JSONDecoder()
    separators = " \t\r\n,"   # what may legally sit between two records
    messages = []
    pos, end = 0, len(raw)
    while pos < end:
        # raw_decode parses exactly one JSON value and reports where it stopped
        record, pos = decoder.raw_decode(raw, pos)
        while pos < end and raw[pos] in separators:
            pos += 1

        # Anything that is not a JSON object cannot carry a "message" key
        if not isinstance(record, dict):
            continue
        message = record.get("message")
        if message:
            messages.append(message)
    return messages

# Find all objects for the day (a missing prefix simply means "no objects yet")
try:
    all_files = s3_fs.find(root_prefix)
except FileNotFoundError:
    all_files = []

# Keep only keys that do not end with "/" and sort them for a stable listing
all_files = sorted(p for p in all_files if not p.endswith("/"))

print(f"Found {len(all_files)} objects under {root_prefix}")

# List at most the first `max_listed` paths and say so when the list was cut.
# (Previously the "truncated" notice was nested under a `<= 50` check, so it
# could never print and days with more than 50 objects showed no paths at all.)
max_listed = 50
for i, p in enumerate(all_files[:max_listed], 1):
    print(f"  [{i:>3}] {p}")
if len(all_files) > max_listed:
    print("  ... (truncated)")

# Process data and discover headers
all_data_rows = []
zeek_column_names = None   # taken from the first "#fields" line seen
row_width = None           # column count of the first data row; later rows must match
skipped_rows = 0           # data rows dropped because their width differed

processed_count = 0
for s3_path in all_files:
    try:
        msgs = read_firehose_object_to_messages(s3_path)
    except Exception as e:
        # Best effort: report the unreadable object and keep going
        print(f"Skip (read error): {s3_path} -> {e}")
        continue

    # Only text lines can be split into columns; ignore anything else
    msgs = [m for m in msgs if isinstance(m, str)]
    if not msgs:
        continue

    # Try to find column headers (only until the first one is found)
    if zeek_column_names is None:
        header_line = next((m for m in msgs if m.startswith("#fields")), None)
        if header_line is not None:
            zeek_column_names = header_line.replace("#fields\t", "").split("\t")

    # Process non-comment data rows
    for m in msgs:
        if m.startswith("#"):
            continue
        cols = m.split("\t")
        if row_width is None:
            row_width = len(cols)
        if len(cols) == row_width:
            all_data_rows.append(cols)
        else:
            skipped_rows += 1

    processed_count += 1
    if processed_count % 50 == 0:
        print(f"Processed {processed_count} objects... (rows so far: {len(all_data_rows)})")

# Make dropped rows visible instead of losing them silently
if skipped_rows:
    print(
        f"Note: skipped {skipped_rows} rows whose column count differed "
        f"from the first data row (width={row_width})."
    )

# Create DataFrame: one row per collected Zeek record
if all_data_rows:
    # Use the discovered header only when it exists and matches the row width;
    # otherwise fall back to positional names c0..cN-1.
    if not (zeek_column_names and len(zeek_column_names) == row_width):
        hdr_len = len(zeek_column_names) if zeek_column_names is not None else 0
        print(
            f"\nWarning: header missing or mismatched "
            f"(row width={row_width}, header len={hdr_len}). "
            f"Using generic names c0..c{row_width-1}."
        )
        zeek_column_names = [f"c{i}" for i in range(row_width)]

    df_zeek_raw = pd.DataFrame(all_data_rows, columns=zeek_column_names)
    print(
        f"\nSuccessfully combined Zeek logs into a DataFrame with "
        f"{len(df_zeek_raw)} rows x {df_zeek_raw.shape[1]} columns."
    )
    display(df_zeek_raw.head(20))
else:
    print("\nNo data rows found for the entire day prefix.")
    df_zeek_raw = pd.DataFrame()


Using day prefix (UTC): 2025/09/18/
Root S3 path: s3://iot-security-logs-ln-2025/zeek-logs/2025/09/18/

Found 14 objects under s3://iot-security-logs-ln-2025/zeek-logs/2025/09/18/
  [  1] iot-security-logs-ln-2025/zeek-logs/2025/09/18/16/zeek-log-stream-1-2025-09-18-16-41-23-ff4c9957-571f-4bd0-8158-bda2e95f85d0
  [  2] iot-security-logs-ln-2025/zeek-logs/2025/09/18/16/zeek-log-stream-1-2025-09-18-16-51-46-6f5a8249-c478-4569-b26c-14a716539490
  [  3] iot-security-logs-ln-2025/zeek-logs/2025/09/18/16/zeek-log-stream-1-2025-09-18-16-56-48-bcb7b495-5e4f-4d76-8aad-64cd99be2171
  [  4] iot-security-logs-ln-2025/zeek-logs/2025/09/18/18/zeek-log-stream-1-2025-09-18-18-06-18-3c612f2f-1706-46b1-a2cc-688c976ca930
  [  5] iot-security-logs-ln-2025/zeek-logs/2025/09/18/18/zeek-log-stream-1-2025-09-18-18-11-22-e7c5b2db-d6c9-4136-b678-3274c9508fe8
  [  6] iot-security-logs-ln-2025/zeek-logs/2025/09/18/18/zeek-log-stream-1-2025-09-18-18-16-21-810d20cc-a89e-45aa-bb7a-396a1fdae042
  [  7] iot-security-l

Unnamed: 0,ts,uid,id.orig_h,id.orig_p,id.resp_h,id.resp_p,proto,service,duration,orig_bytes,...,local_orig,local_resp,missed_bytes,history,orig_pkts,orig_ip_bytes,resp_pkts,resp_ip_bytes,tunnel_parents,ip_proto
0,1757531788.846078,CPpRhb2Z3hJMxzg5me,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.781751,1028,...,T,F,0,D,6,1316,0,0,-,17
1,1757531909.634593,C8OlSA4FbABxJoRxAg,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.796473,1028,...,T,F,0,D,6,1316,0,0,-,17
2,1757532030.441009,CqeHlI1gSQyTzviDNd,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.792198,1028,...,T,F,0,D,6,1316,0,0,-,17
3,1757532272.053283,Cn7pD9345dsI9YlBTe,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.790143,1028,...,T,F,0,D,6,1316,0,0,-,17
4,1757532392.851877,CygO1k3AGbyN7dFFa4,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.780683,1028,...,T,F,0,D,6,1316,0,0,-,17
5,1757532513.643143,ClEJK74fvHwKxip061,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.757751,1028,...,T,F,0,D,6,1316,0,0,-,17
6,1757531426.446776,Cyi129W4KrQSiUtLh,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.7796,1028,...,T,F,0,D,6,1316,0,0,-,17
7,1757531547.245073,C1miqOaOa3Vf6bcv4,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.779854,1028,...,T,F,0,D,6,1316,0,0,-,17
8,1757531668.041677,CJrb53nZj9m2SrKpj,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.789413,1028,...,T,F,0,D,6,1316,0,0,-,17
9,1757532151.251206,CpGuq8BuoVAh7n0P4,fe80::86c7:6d32:e44b:ad4,5353,ff02::fb,5353,udp,dns,0.792117,1028,...,T,F,0,D,6,1316,0,0,-,17


In [4]:
# 1-Data-Exploration.ipynb — Step 4: Suricata Logs
# Cell #2 (load all Suricata logs for an entire day by walking hour subfolders)

import json
from datetime import datetime, timezone, timedelta

import pandas as pd
import s3fs

# =========================
# Configuration
# =========================
BUCKET_NAME = "iot-security-logs-ln-2025"
SERVICE_PATH = "suricata-logs"   # <-- Suricata path

# Same day selection as the Zeek cell: the *current* UTC day.
# Use (now_utc - timedelta(days=1)).date() to read yesterday's completed partitions.
now_utc = datetime.now(timezone.utc)
day_utc = now_utc.date()
day_prefix = day_utc.strftime("%Y/%m/%d") + "/"   # e.g., "2025/09/18/"
root_prefix = "s3://" + "/".join([BUCKET_NAME, SERVICE_PATH, day_prefix])

print(f"Using day prefix (UTC): {day_prefix}")
print(f"Root S3 path: {root_prefix}\n")

# =========================
# Initialize S3
# =========================
# Shared filesystem handle used by the reader helper (s3_fs.open) and by the
# object listing (s3_fs.find) later in this cell.
# NOTE(review): anon=False means non-anonymous access; credentials are
# presumably resolved from the notebook's environment/role — confirm.
s3_fs = s3fs.S3FileSystem(anon=False)

# =========================
# Helper: read a Firehose object safely into a list of Suricata event dicts
# =========================
def read_firehose_object_to_events(s3_path: str):
    """
    Read one Firehose object and return the Suricata events it contains.

    The object is expected to hold JSON records written back to back, e.g.
        {"message": "<stringified JSON of suricata event>"} {"message": "..."} ...
    Records are decoded one at a time with a streaming JSON decoder, so a "}{"
    sequence inside an event payload is never rewritten and any whitespace (or
    commas) between records is accepted.

    Args:
        s3_path: Path of the object, opened through the module-level `s3_fs`.

    Returns:
        list[dict] of parsed events, in file order. Records without a
        "message", with an inner payload that is not valid JSON, or whose
        payload is not a JSON object are skipped.

    Raises:
        json.JSONDecodeError: if the *outer* content is not a sequence of JSON values.
    """
    with s3_fs.open(s3_path, "r") as f:
        raw = f.read().strip()

    if not raw:
        return []

    decoder = json.JSONDecoder()
    separators = " \t\r\n,"   # what may legally sit between two records
    events = []
    pos, end = 0, len(raw)
    while pos < end:
        # raw_decode parses exactly one JSON value and reports where it stopped
        rec, pos = decoder.raw_decode(raw, pos)
        while pos < end and raw[pos] in separators:
            pos += 1

        if not isinstance(rec, dict):
            continue
        msg = rec.get("message")
        if not msg:
            continue

        try:
            # Some Firehose setups already store dicts; most store a JSON string
            evt = json.loads(msg) if isinstance(msg, str) else msg
        except json.JSONDecodeError:
            # Skip malformed inner JSON
            continue
        if isinstance(evt, dict):
            events.append(evt)
    return events

# =========================
# Find ALL objects under the day (walks through every HH/)
# =========================
try:
    all_files = s3_fs.find(root_prefix)
except FileNotFoundError:
    # A missing prefix simply means there is nothing to read yet
    all_files = []

# Keep only actual objects (not "directory" markers) and sort for readability
all_files = sorted(p for p in all_files if not p.endswith("/"))

print(f"Found {len(all_files)} objects under {root_prefix}")

# List at most the first `max_listed` paths and say so when the list was cut.
# (Previously the "truncated" notice was nested under a `<= 50` check, so it
# could never print and days with more than 50 objects showed no paths at all.)
max_listed = 50
for i, p in enumerate(all_files[:max_listed], 1):
    print(f"  [{i:>3}] {p}")
if len(all_files) > max_listed:
    print("  ... (truncated)")

# =========================
# Collect all events
# =========================
# Every readable object contributes its events; unreadable ones are reported
# and skipped. Progress is printed after every 50 successfully read objects.
all_suricata_events = []
processed_count = 0

for i, s3_path in enumerate(all_files, 1):
    try:
        events = read_firehose_object_to_events(s3_path)
    except Exception as e:
        print(f"Skip (read error): {s3_path} -> {e}")
    else:
        # Extending with an empty list is a no-op, so no emptiness check is needed
        all_suricata_events.extend(events)

        processed_count += 1
        if not processed_count % 50:
            print(f"Processed {processed_count} objects... (events so far: {len(all_suricata_events)})")

# =========================
# Build DataFrame
# =========================
if all_suricata_events:
    df_suricata_raw = pd.DataFrame(all_suricata_events)
    print(
        f"\nSuccessfully combined Suricata logs into a DataFrame with "
        f"{len(df_suricata_raw)} rows x {df_suricata_raw.shape[1]} columns."
    )

    # Preview only the well-known columns that are actually present
    wanted_preview = ["timestamp", "event_type", "src_ip", "src_port", "dest_ip", "dest_port"]
    preview_cols = [c for c in wanted_preview if c in df_suricata_raw.columns]

    if "alert" in df_suricata_raw.columns:
        # Pull a couple of common fields out of the nested alert dict;
        # rows whose "alert" value is not a dict get None.
        for new_col, alert_key in (("alert_signature", "signature"), ("alert_sev", "severity")):
            df_suricata_raw[new_col] = df_suricata_raw["alert"].map(
                lambda a, k=alert_key: a.get(k) if isinstance(a, dict) else None
            )
            preview_cols.append(new_col)

    if preview_cols:
        display(df_suricata_raw[preview_cols].head(20))
    else:
        display(df_suricata_raw.head(20))
else:
    print("\nNo Suricata events found for the entire day prefix.")
    df_suricata_raw = pd.DataFrame()  # empty


Using day prefix (UTC): 2025/09/18/
Root S3 path: s3://iot-security-logs-ln-2025/suricata-logs/2025/09/18/

Found 24 objects under s3://iot-security-logs-ln-2025/suricata-logs/2025/09/18/
  [  1] iot-security-logs-ln-2025/suricata-logs/2025/09/18/16/suricata-log-stream-1-2025-09-18-16-51-47-7bd84094-bf90-40c7-b555-a0a36722ff25
  [  2] iot-security-logs-ln-2025/suricata-logs/2025/09/18/16/suricata-log-stream-1-2025-09-18-16-58-32-fda3d4f9-85dc-4c02-8143-ac3c604ed233
  [  3] iot-security-logs-ln-2025/suricata-logs/2025/09/18/17/suricata-log-stream-1-2025-09-18-17-08-32-030eee42-4fca-4f4a-a5d2-ae51901999aa
  [  4] iot-security-logs-ln-2025/suricata-logs/2025/09/18/17/suricata-log-stream-1-2025-09-18-17-13-35-6a2f77b6-ecc5-4fe6-86e2-c48c42d8771d
  [  5] iot-security-logs-ln-2025/suricata-logs/2025/09/18/17/suricata-log-stream-1-2025-09-18-17-23-56-a11a1cda-0b96-4f28-ab94-0061e1c79c82
  [  6] iot-security-logs-ln-2025/suricata-logs/2025/09/18/17/suricata-log-stream-1-2025-09-18-17-29-11-36f

Unnamed: 0,timestamp,event_type,src_ip,src_port,dest_ip,dest_port,alert_signature,alert_sev
0,2025-09-18T16:50:34.200000+0000,alert,192.168.1.110,57621,192.168.1.255,57621,ET INFO Spotify P2P Client,3
1,2025-09-18T16:53:03.368885+0000,alert,192.168.1.110,57621,192.168.1.255,57621,ET INFO Spotify P2P Client,3
2,2025-09-18T16:55:26.847587+0000,alert,2001:08d8:100f:f000:0000:0000:0000:0208,80,2600:1700:21f9:efe0:0000:0000:0000:002e,46164,GPL ATTACK_RESPONSE id check returned root,2
3,2025-09-18T16:55:27.179288+0000,alert,2001:08d8:100f:f000:0000:0000:0000:0208,80,2600:1700:21f9:efe0:0000:0000:0000:002e,46180,GPL ATTACK_RESPONSE id check returned root,2
4,2025-09-18T16:55:27.639473+0000,alert,2001:08d8:100f:f000:0000:0000:0000:0208,80,2600:1700:21f9:efe0:0000:0000:0000:002e,46190,GPL ATTACK_RESPONSE id check returned root,2
5,2025-09-18T16:55:28.004541+0000,alert,2001:08d8:100f:f000:0000:0000:0000:0208,80,2600:1700:21f9:efe0:0000:0000:0000:002e,46200,GPL ATTACK_RESPONSE id check returned root,2
6,2025-09-18T16:55:28.282266+0000,alert,2001:08d8:100f:f000:0000:0000:0000:0208,80,2600:1700:21f9:efe0:0000:0000:0000:002e,46214,GPL ATTACK_RESPONSE id check returned root,2
7,2025-09-18T16:55:35.655316+0000,alert,2001:08d8:100f:f000:0000:0000:0000:0208,80,2600:1700:21f9:efe0:0000:0000:0000:002e,53332,GPL ATTACK_RESPONSE id check returned root,2
8,2025-09-18T16:55:36.220519+0000,alert,2001:08d8:100f:f000:0000:0000:0000:0208,80,2600:1700:21f9:efe0:0000:0000:0000:002e,53338,GPL ATTACK_RESPONSE id check returned root,2
9,2025-09-18T16:55:28.583443+0000,alert,2001:08d8:100f:f000:0000:0000:0000:0208,80,2600:1700:21f9:efe0:0000:0000:0000:002e,46220,GPL ATTACK_RESPONSE id check returned root,2


In [5]:
# Cell 3: Feature Engineering (robust + memory-friendly for whole-day Zeek)

import pandas as pd
import numpy as np

print("--- Starting Feature Engineering ---")

# 1) Make sure the frame uses real Zeek column names.
#    When the header was not found (generic c0..cN columns), project the
#    positional columns we need onto their Zeek names. The positions below
#    follow the 22-column sample seen earlier — print a row once to confirm
#    they match your data and adjust the indices if your schema differs.
if "ts" not in df_zeek_raw.columns:
    col_ts_idx        = 0
    col_uid_idx       = 1
    col_orig_h_idx    = 2
    col_resp_h_idx    = 4
    col_resp_p_idx    = 5
    col_duration_idx  = 8
    col_orig_b_idx    = 9
    col_resp_b_idx    = 10
    # Packet columns (may differ by build):
    col_orig_pkts_idx = 16
    col_resp_pkts_idx = 18

    # (position, normalized name) pairs -> {"c<position>": name}
    fallback_layout = [
        (col_ts_idx, "ts"),
        (col_uid_idx, "uid"),
        (col_orig_h_idx, "id.orig_h"),
        (col_resp_h_idx, "id.resp_h"),
        (col_resp_p_idx, "id.resp_p"),
        (col_duration_idx, "duration"),
        (col_orig_b_idx, "orig_bytes"),
        (col_resp_b_idx, "resp_bytes"),
        (col_orig_pkts_idx, "orig_pkts"),
        (col_resp_pkts_idx, "resp_pkts"),
    ]
    rename_map = {f"c{idx}": name for idx, name in fallback_layout}

    # Keep only the mapped columns that exist, then give them their Zeek names
    present = {k: v for k, v in rename_map.items() if k in df_zeek_raw.columns}
    df_zeek_raw = df_zeek_raw[list(present)].rename(columns=present)

# From here on both cases look the same: fixed names for the mandatory
# columns, None for optional ones that are absent.
col_ts        = "ts"
col_orig_h    = "id.orig_h"
col_resp_h    = "id.resp_h"
col_resp_p    = "id.resp_p"
col_duration  = "duration"
col_orig_b    = "orig_bytes"
col_resp_b    = "resp_bytes"
col_uid       = "uid"       if "uid" in df_zeek_raw.columns else None
col_orig_pkts = "orig_pkts" if "orig_pkts" in df_zeek_raw.columns else None
col_resp_pkts = "resp_pkts" if "resp_pkts" in df_zeek_raw.columns else None

# 2) Select only columns we actually need (drop everything else to save memory)
# The steps below index these columns unconditionally, so fail right here with
# a clear message instead of an opaque KeyError later (this is what happens
# when df_zeek_raw is empty or the fallback positions did not match).
required_cols = [col_ts, col_orig_h, col_resp_h, col_resp_p,
                 col_duration, col_orig_b, col_resp_b]
missing_cols = [c for c in required_cols if c not in df_zeek_raw.columns]
if missing_cols:
    raise ValueError(
        f"Zeek frame is missing required column(s) {missing_cols}; "
        f"cannot build features (is df_zeek_raw empty, or is the column mapping wrong?)."
    )

# Optional columns (uid, packet counts) are included only when present
need_cols = [col for col in [
    col_ts, col_uid, col_orig_h, col_resp_h, col_resp_p,
    col_duration, col_orig_b, col_resp_b, col_orig_pkts, col_resp_pkts
] if col is not None and col in df_zeek_raw.columns]

df = df_zeek_raw[need_cols].copy()

# 3) Clean numeric columns (replace '-' with NaN, coerce, fill 0, downcast)
def to_num(series, downcast=None):
    """
    Convert a column to numbers, turning anything non-numeric into 0.

    Values that cannot be parsed (such as the '-' placeholder) are coerced to
    NaN by `errors="coerce"` and then filled with 0, so no separate
    replace('-') pass — and the extra full copy it makes — is needed.

    Args:
        series: The column to convert.
        downcast: Optional `pd.to_numeric` downcast target (e.g. "integer",
            "float"); when falsy the coerced result is returned as is.

    Returns:
        A numeric Series of the same length with no missing values.
    """
    cleaned = pd.to_numeric(series, errors="coerce").fillna(0)
    if downcast:
        return pd.to_numeric(cleaned, downcast=downcast)
    return cleaned

# Numeric measures: duration as a compact float, byte counters as compact ints
df[col_duration] = to_num(df[col_duration], downcast="float")
for byte_col in (col_orig_b, col_resp_b):
    df[byte_col] = to_num(df[byte_col], downcast="integer")

# Packet counters are optional: when a column is absent, create it as all zeros
# so the aggregation further down can always reference it.
if not col_orig_pkts:
    col_orig_pkts = "orig_pkts"
    df[col_orig_pkts] = 0
else:
    df[col_orig_pkts] = to_num(df[col_orig_pkts], downcast="integer")

if not col_resp_pkts:
    col_resp_pkts = "resp_pkts"
    df[col_resp_pkts] = 0
else:
    df[col_resp_pkts] = to_num(df[col_resp_pkts], downcast="integer")

# 4) Timestamp → UTC datetime (the per-minute bucketing itself is done by the Grouper)
# ts might be epoch seconds in string form — coerce to a number first
ts_numeric = pd.to_numeric(df[col_ts], errors="coerce")
df[col_ts] = pd.to_datetime(ts_numeric, unit="s", utc=True)

# 5) Address/port fields may hold '-' placeholders — keep them as strings,
#    nunique still works on them
for text_col in (col_resp_h, col_resp_p, col_orig_h):
    df[text_col] = df[text_col].astype("string")
if col_uid and col_uid in df.columns:
    df[col_uid] = df[col_uid].astype("string")

# 6) One group per (originating host, 1-minute bucket); buckets are labelled
#    with their right edge, which later becomes "window_end".
minute_bucket = pd.Grouper(key=col_ts, freq="1min", label="right")
grouped = df.groupby([col_orig_h, minute_bucket])

# Connection count: count uids when the column exists, otherwise count rows
if col_uid and col_uid in df.columns:
    conn_count_spec = (col_uid, "count")
else:
    conn_count_spec = (col_resp_h, "size")

agg_spec = {
    "orig_bytes_sum": (col_orig_b, "sum"),
    "resp_bytes_sum": (col_resp_b, "sum"),
    "orig_pkts_sum": (col_orig_pkts, "sum"),
    "resp_pkts_sum": (col_resp_pkts, "sum"),
    "duration_sum": (col_duration, "sum"),
    "unique_dest_ips": (col_resp_h, "nunique"),
    "unique_dest_ports": (col_resp_p, "nunique"),
    "conn_count": conn_count_spec,
}
agg_df = grouped.agg(**agg_spec).reset_index()

# 7) Convert duration_sum → duration_mean safely
#    A zero connection count is turned into NaN first so the division cannot
#    produce inf; the resulting NaN is then reported as 0. float32 saves memory.
agg_df["duration_mean"] = (
    agg_df["duration_sum"]
    .div(agg_df["conn_count"].replace(0, np.nan))
    .fillna(0)
    .astype("float32")
)
agg_df = agg_df.drop(columns=["duration_sum"])

# 8) Final tidy + dtypes
agg_df = agg_df.rename(columns={col_orig_h: "device_id", col_ts: "window_end"})

# All of these are whole-number counters, so they are always downcast as
# integers (the earlier float/None branch for "duration_mean" could never run
# because that column is not part of this list).
counter_cols = ["orig_bytes_sum", "resp_bytes_sum", "orig_pkts_sum", "resp_pkts_sum",
                "unique_dest_ips", "unique_dest_ports", "conn_count"]
for c in counter_cols:
    if c in agg_df.columns:
        agg_df[c] = pd.to_numeric(agg_df[c], downcast="integer")

# Fixed column order for the feature table handed to the following cells
feature_df = agg_df[
    ["device_id", "window_end",
     "orig_bytes_sum", "resp_bytes_sum",
     "orig_pkts_sum", "resp_pkts_sum",
     "duration_mean",
     "unique_dest_ips", "unique_dest_ports",
     "conn_count"]
].fillna(0)

print("\nSuccessfully aggregated data into 1-minute windows for each device.")
print(f"Created feature table with {len(feature_df)} device-minute rows "
      f"and {feature_df.shape[1]} columns.")
display(feature_df.head(10))


--- Starting Feature Engineering ---

Successfully aggregated data into 1-minute windows for each device.
Created feature table with 819 device-minute rows and 10 columns.


Unnamed: 0,device_id,window_end,orig_bytes_sum,resp_bytes_sum,orig_pkts_sum,resp_pkts_sum,duration_mean,unique_dest_ips,unique_dest_ports,conn_count
0,0.0.0.0,2025-09-18 18:09:00+00:00,600,0,2,0,0.001393,1,1,1
1,0.0.0.0,2025-09-18 18:15:00+00:00,0,0,1,0,0.0,1,1,1
2,0.0.0.0,2025-09-18 18:26:00+00:00,600,0,2,0,0.001184,1,1,1
3,0.0.0.0,2025-09-18 18:43:00+00:00,600,0,2,0,0.00121,1,1,1
4,0.0.0.0,2025-09-18 18:51:00+00:00,0,0,1,0,0.0,1,1,1
5,172.31.0.1,2025-09-10 19:09:00+00:00,4592,0,7,0,6.76428,1,1,1
6,172.31.0.1,2025-09-10 19:11:00+00:00,1028,0,6,0,0.779735,1,1,1
7,172.31.0.1,2025-09-10 19:13:00+00:00,1028,0,6,0,0.779558,1,1,1
8,172.31.0.1,2025-09-10 19:15:00+00:00,1028,0,6,0,0.78896,1,1,1
9,172.31.0.1,2025-09-10 19:17:00+00:00,1028,0,6,0,0.781822,1,1,1


In [14]:
# Cell 4: Process Suricata Alerts (robust + aligned with Zeek features)

import pandas as pd
import numpy as np

print("\n--- Starting Suricata Alert Processing ---")

# Columns of the per-device, per-minute alert feature table. Every "nothing to
# report" path produces an empty frame with exactly these columns so consumers
# of suricata_features_df always see the same schema.
alert_feature_cols = ["device_id", "window_end", "alert_count", "unique_alert_signatures"]

# 0) Guard: handle a missing/empty frame, or one without an event_type column
if df_suricata_raw is None or df_suricata_raw.empty:
    print("No Suricata data frame found or it's empty. Skipping alert features.")
    suricata_features_df = pd.DataFrame(columns=alert_feature_cols)
elif "event_type" not in df_suricata_raw.columns:
    # Without this check the boolean mask below would be a plain False and
    # indexing with it would raise KeyError.
    print("Suricata data has no 'event_type' column. Skipping alert features.")
    suricata_features_df = pd.DataFrame(columns=alert_feature_cols)
else:
    # 1) Keep only alert events
    alert_mask = df_suricata_raw["event_type"] == "alert"
    alert_data = df_suricata_raw[alert_mask].copy()
    if alert_data.empty:
        print("No Suricata 'alert' events found for this day.")
        suricata_features_df = pd.DataFrame(columns=alert_feature_cols)
    else:
        # 2) Flatten alert field for signature/severity if present
        if "alert" in alert_data.columns:
            # Non-dict entries (None/NaN) are replaced by {} so the flattening
            # step only ever sees dict records.
            alert_dicts = [a if isinstance(a, dict) else {} for a in alert_data["alert"]]
            al = pd.json_normalize(alert_dicts)
            al.index = alert_data.index
            alert_data = alert_data.join(al, rsuffix="_flat")
            # Standard column name for signature
            if "signature" not in alert_data.columns and "alert.signature" in alert_data.columns:
                alert_data["signature"] = alert_data["alert.signature"]
        # Whatever happened above, the aggregation needs a 'signature' column
        if "signature" not in alert_data.columns:
            alert_data["signature"] = None

        # 3) Robust timestamp parsing (strings like "2025-09-18T18:06:18.123Z")
        #    Use utc=True instead of tz_convert; handles Z/offsets safely.
        alert_data["timestamp"] = pd.to_datetime(alert_data["timestamp"], utc=True, errors="coerce")

        # 4) Determine Zeek time window (feature_df from Cell 3)
        time_col = "window_end" if "window_end" in feature_df.columns else ("ts" if "ts" in feature_df.columns else None)
        if time_col is None or feature_df.empty:
            # Fallback: no Zeek range available — just use all Suricata alerts
            min_time = alert_data["timestamp"].min()
            max_time = alert_data["timestamp"].max()
            range_inclusive = "both"
        elif time_col == "window_end":
            # window_end is the RIGHT-edge label of a 1-minute bucket
            # [end - 1min, end). The Zeek data therefore starts one minute
            # before the earliest label and stops just short of the latest one.
            min_time = feature_df[time_col].min() - pd.Timedelta(minutes=1)
            max_time = feature_df[time_col].max()
            range_inclusive = "left"
        else:
            # Raw event timestamps: both ends belong to the observed span
            min_time = feature_df[time_col].min()
            max_time = feature_df[time_col].max()
            range_inclusive = "both"

        # 5) Filter Suricata to Zeek’s time span
        in_range = alert_data["timestamp"].between(min_time, max_time, inclusive=range_inclusive)
        alert_data = alert_data[in_range].copy()
        print(f"Filtered to {len(alert_data)} Suricata alert events within Zeek's time range.")

        if alert_data.empty:
            suricata_features_df = pd.DataFrame(columns=alert_feature_cols)
        else:
            # 6) Group by src_ip and 1-minute buckets to build features
            #    Match Zeek’s minute label (we used label='right' there).
            grouped = alert_data.groupby(
                ["src_ip", pd.Grouper(key="timestamp", freq="1min", label="right")]
            )

            # "size" counts every alert row, including ones without a
            # signature; "nunique" counts the distinct non-null signatures.
            suricata_features_df = grouped.agg(
                alert_count=("signature", "size"),
                unique_alert_signatures=("signature", "nunique"),
            ).reset_index()

            # 7) Align column names with Zeek features for an easy merge
            suricata_features_df = suricata_features_df.rename(columns={
                "src_ip": "device_id",
                "timestamp": "window_end",
            })

            # 8) Dtypes: store both counters in the smallest integer type that fits
            for count_col in ("alert_count", "unique_alert_signatures"):
                suricata_features_df[count_col] = pd.to_numeric(
                    suricata_features_df[count_col], downcast="integer"
                )

            print(f"\nSuccessfully created Suricata feature table with {len(suricata_features_df)} device-minute rows.")
            display(suricata_features_df.head(10))



--- Starting Suricata Alert Processing ---
Filtered to 667 Suricata alert events within Zeek's time range.

Successfully created Suricata feature table with 46 device-minute rows.


Unnamed: 0,device_id,window_end,alert_count,unique_alert_signatures
0,192.168.1.110,2025-09-18 16:51:00+00:00,1,1
1,192.168.1.110,2025-09-18 16:54:00+00:00,1,1
2,192.168.1.110,2025-09-18 16:59:00+00:00,1,1
3,192.168.1.110,2025-09-18 17:04:00+00:00,1,1
4,192.168.1.110,2025-09-18 17:09:00+00:00,1,1
5,192.168.1.110,2025-09-18 17:14:00+00:00,1,1
6,192.168.1.110,2025-09-18 17:19:00+00:00,1,1
7,192.168.1.110,2025-09-18 17:24:00+00:00,1,1
8,192.168.1.110,2025-09-18 17:30:00+00:00,1,1
9,192.168.1.110,2025-09-18 17:35:00+00:00,1,1


In [15]:
# Cell 5: Combine Zeek and Suricata Features (aligned with new columns)

import pandas as pd
import numpy as np

print("--- Combining Zeek and Suricata Feature Sets ---")

# 1) Work on a copy of the Zeek features and make sure its key columns are
#    called 'device_id' and 'window_end' (raw Zeek names are mapped if present).
zf = feature_df.copy()
z_renames = {
    old: new
    for old, new in (("id.orig_h", "device_id"), ("ts", "window_end"))
    if old in zf.columns
}
zf = zf.rename(columns=z_renames)

# Both merge keys are mandatory — stop here if either one is absent
for required in ("device_id", "window_end"):
    assert required in zf.columns, f"Zeek features missing '{required}'"

# Coerce Zeek window_end to UTC datetime (minute buckets)
zf["window_end"] = pd.to_datetime(zf["window_end"], utc=True, errors="coerce")

# 2) Bring the Suricata features to the shape
#    ['device_id', 'window_end', 'alert_count', 'unique_alert_signatures'].
#    If the Suricata cell never ran (name not defined), start from an empty frame.
sf = suricata_features_df.copy() if 'suricata_features_df' in locals() else pd.DataFrame()

if sf.empty:
    # Build an empty frame with expected columns for a painless merge
    sf = pd.DataFrame(columns=["device_id", "window_end", "alert_count", "unique_alert_signatures"])
else:
    # Map raw Suricata names onto the shared key names when they are still present
    s_renames = {
        old: new
        for old, new in (("src_ip", "device_id"), ("timestamp", "window_end"))
        if old in sf.columns
    }
    sf = sf.rename(columns=s_renames)

    # Coerce Suricata window_end to UTC datetime (minute buckets)
    if "window_end" in sf.columns:
        sf["window_end"] = pd.to_datetime(sf["window_end"], utc=True, errors="coerce")

    # Ensure alert columns exist (create as zeros if missing)
    for alert_col in ("alert_count", "unique_alert_signatures"):
        if alert_col not in sf.columns:
            sf[alert_col] = 0

# 3) Merge on device_id + window_end (left = Zeek minutes)
def _canonical_ip(value):
    """Return the canonical text form of an IP address; anything that is not
    a valid IP address (placeholders, missing values) is returned unchanged."""
    import ipaddress  # local import: only this merge step needs it

    try:
        return str(ipaddress.ip_address(str(value)))
    except ValueError:
        return value

# The two tools print IPv6 differently: the Suricata frame carries fully
# expanded addresses (e.g. "2001:08d8:100f:f000:0000:0000:0000:0208") while the
# Zeek frame carries the compressed form (e.g. "fe80::86c7:6d32:e44b:ad4").
# A plain string join would never match such devices, so both key columns are
# rewritten to one canonical spelling before merging.
zf = zf.assign(device_id=zf["device_id"].map(_canonical_ip).astype("string"))
sf = sf.assign(device_id=sf["device_id"].map(_canonical_ip).astype("string"))

combined_features_df = zf.merge(
    sf[["device_id", "window_end", "alert_count", "unique_alert_signatures"]],
    on=["device_id", "window_end"],
    how="left"
)

# 4) Alert features: Zeek minutes without a matching Suricata row get 0, and
#    both columns end up as int32 (created as zeros if the merge did not bring them)
for col in ["alert_count", "unique_alert_signatures"]:
    if col not in combined_features_df.columns:
        combined_features_df[col] = np.int32(0)
    else:
        as_number = pd.to_numeric(combined_features_df[col], errors="coerce")
        combined_features_df[col] = as_number.fillna(0).astype("int32")

# Optional: sort for readability (fresh 0..n-1 index afterwards)
combined_features_df = combined_features_df.sort_values(
    ["device_id", "window_end"], ignore_index=True
)

print("Successfully merged Zeek and Suricata features!")
print(f"Final feature table has {len(combined_features_df)} rows and {combined_features_df.shape[1]} columns.")

# Preview — show the common training columns that are present, in this order
training_cols = [
    "device_id", "window_end",
    "orig_bytes_sum", "resp_bytes_sum",
    "orig_pkts_sum", "resp_pkts_sum",
    "duration_mean",
    "unique_dest_ips", "unique_dest_ports",
    "conn_count",
    "alert_count", "unique_alert_signatures"
]
preview_cols = [c for c in training_cols if c in combined_features_df.columns]

if preview_cols:
    display(combined_features_df[preview_cols].head(10))
else:
    display(combined_features_df.head(10))


--- Combining Zeek and Suricata Feature Sets ---
Successfully merged Zeek and Suricata features!
Final feature table has 819 rows and 12 columns.


Unnamed: 0,device_id,window_end,orig_bytes_sum,resp_bytes_sum,orig_pkts_sum,resp_pkts_sum,duration_mean,unique_dest_ips,unique_dest_ports,conn_count,alert_count,unique_alert_signatures
0,0.0.0.0,2025-09-18 18:09:00+00:00,600,0,2,0,0.001393,1,1,1,0,0
1,0.0.0.0,2025-09-18 18:15:00+00:00,0,0,1,0,0.0,1,1,1,0,0
2,0.0.0.0,2025-09-18 18:26:00+00:00,600,0,2,0,0.001184,1,1,1,0,0
3,0.0.0.0,2025-09-18 18:43:00+00:00,600,0,2,0,0.00121,1,1,1,0,0
4,0.0.0.0,2025-09-18 18:51:00+00:00,0,0,1,0,0.0,1,1,1,0,0
5,172.31.0.1,2025-09-10 19:09:00+00:00,4592,0,7,0,6.76428,1,1,1,0,0
6,172.31.0.1,2025-09-10 19:11:00+00:00,1028,0,6,0,0.779735,1,1,1,0,0
7,172.31.0.1,2025-09-10 19:13:00+00:00,1028,0,6,0,0.779558,1,1,1,0,0
8,172.31.0.1,2025-09-10 19:15:00+00:00,1028,0,6,0,0.78896,1,1,1,0,0
9,172.31.0.1,2025-09-10 19:17:00+00:00,1028,0,6,0,0.781822,1,1,1,0,0


In [19]:
# Cell 6: Save Combined Features to S3
import sagemaker
import pandas as pd

print("--- Saving final feature set to S3 ---")

# Give the two key columns explicit dtypes before writing Parquet:
# device_id as pandas string, window_end as UTC datetime.
feature_columns = combined_features_df.columns
if "device_id" in feature_columns:
    combined_features_df["device_id"] = combined_features_df["device_id"].astype("string")
if "window_end" in feature_columns:
    combined_features_df["window_end"] = pd.to_datetime(
        combined_features_df["window_end"], utc=True, errors="coerce"
    )

# Destination: the SageMaker session's default bucket, under a fixed prefix
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
prefix = "iot-intrusion-detection/features"
output_path = f"s3://{bucket}/{prefix}/combined_features.parquet"

# If it's totally empty, write the schema (0 rows) so downstream still works
if len(combined_features_df):
    to_save = combined_features_df
else:
    to_save = combined_features_df.head(0)

# Write to S3 (pandas will use s3fs/pyarrow under the hood)
to_save.to_parquet(output_path, index=False)

print(f"Successfully saved feature set to: {output_path}")

--- Saving final feature set to S3 ---
Successfully saved feature set to: s3://sagemaker-us-east-2-696680564117/iot-intrusion-detection/features/combined_features.parquet
