In [None]:
# ==========================================
# --- PHASE I: DATA PREPARATION & CAMPAIGN-READY STAGING ---
# ==========================================
# Ingesting raw JSON telemetry and staging enriched dataframes for behavioral analysis

import json
import logging
from pathlib import Path
from typing import Tuple

import pandas as pd
import numpy as np

# Pipeline-health logging: INFO and above, each record prefixed with its
# timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(name=__name__)

def ingest_telemetry(file_pattern: str, data_dir: str = '.') -> pd.DataFrame:
    """
    Ingests Cowrie JSON telemetry files into a pandas DataFrame.

    Every regular file matching ``file_pattern`` inside ``data_dir`` is read as
    JSON Lines (one JSON object per line), in sorted path order. Blank lines
    are ignored. Lines that fail to decode, or that decode to something other
    than a JSON object, are skipped and counted in a single warning.

    Args:
        file_pattern: The glob pattern for the JSON log files.
        data_dir: The directory containing the telemetry files.

    Returns:
        pd.DataFrame: One row per ingested record. When a 'timestamp' column
        exists, it is parsed to timezone-aware UTC datetimes; unparseable
        values become NaT.
    """
    paths = [p for p in sorted(Path(data_dir).glob(file_pattern)) if p.is_file()]
    if not paths:
        logger.warning("No telemetry files matched pattern %r in %s.", file_pattern, data_dir)

    records = []
    error_count = 0

    for file_path in paths:
        # errors='replace' stops a single corrupt byte sequence from aborting the
        # whole file; the affected line then normally fails JSON decoding below.
        with file_path.open('r', encoding='utf-8', errors='replace') as f:
            for line in f:
                if not line.strip():
                    continue  # trailing/blank lines are not malformed records
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    error_count += 1
                    continue
                # Only JSON objects map onto DataFrame rows; stray scalars/arrays are junk.
                if isinstance(record, dict):
                    records.append(record)
                else:
                    error_count += 1

    if error_count > 0:
        logger.warning("Skipped %d malformed JSON lines during ingestion.", error_count)

    df = pd.DataFrame(records)
    if 'timestamp' in df.columns:
        # utc=True keeps the column datetime-typed even if offsets differ between records.
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce', utc=True)

    return df

def stage_command_telemetry(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Isolates shell interactions and stages unique command playbooks.

    Adds two columns to the command rows:
    * 'time_delta': seconds since the previous command in the same session
      (NaN for each session's first command).
    * 'input_clean': the input stripped of surrounding whitespace, with a
      missing input mapped to ''.

    Args:
        df: The base telemetry dataframe (needs 'eventid', 'session', 'timestamp').

    Returns:
        Tuple of (command dataframe sorted by session/timestamp,
        dataframe with columns ['session', 'command_sequence'] where each
        sequence is the session's cleaned commands joined by " | ").
    """
    cmd_df = df[df['eventid'] == 'cowrie.command.input'].copy()
    # 'input' is only present on command events, so a capture without any
    # commands has no such column at all.
    if 'input' not in cmd_df.columns:
        cmd_df['input'] = None
    cmd_df = cmd_df.sort_values(['session', 'timestamp'])

    if cmd_df.empty:
        # Keep the output schema stable without touching the .dt accessor,
        # which fails on an empty non-datetime column.
        cmd_df['time_delta'] = pd.Series(dtype='float64')
        cmd_df['input_clean'] = pd.Series(dtype='object')
        return cmd_df, pd.DataFrame(columns=['session', 'command_sequence'])

    # Calculate temporal deltas for ML features
    cmd_df['time_delta'] = cmd_df.groupby('session')['timestamp'].diff().dt.total_seconds()

    # Sequence fingerprinting for campaign correlation. fillna avoids turning a
    # missing input into the literal string 'None'/'nan'.
    cmd_df['input_clean'] = cmd_df['input'].fillna('').astype(str).str.strip()
    session_sequences = (
        cmd_df.groupby('session')['input_clean']
        .agg(" | ".join)
        .reset_index()
        .rename(columns={'input_clean': 'command_sequence'})
    )

    return cmd_df, session_sequences

def harvest_credentials(df: pd.DataFrame, top_n: int = 25) -> pd.DataFrame:
    """
    Aggregates brute-force attempts to identify top credential pairs.

    Both successful and failed login events are counted per
    (username, password) pair.

    Args:
        df: The base telemetry dataframe.
        top_n: Number of top credentials to return.

    Returns:
        pd.DataFrame: Columns ['username', 'password', 'count'], sorted by count
        descending (ties keep pair order). Empty, with the same columns, when the
        capture contains no login fields.
    """
    result_cols = ['username', 'password', 'count']
    # Cowrie only writes username/password on login events; without any, the
    # columns are absent entirely.
    if not {'eventid', 'username', 'password'}.issubset(df.columns):
        return pd.DataFrame(columns=result_cols)

    creds = df[df['eventid'].isin(['cowrie.login.success', 'cowrie.login.failed'])]
    top_creds = creds.groupby(['username', 'password']).size().reset_index(name='count')
    # A stable sort makes the top-N cut deterministic when counts tie.
    return top_creds.sort_values('count', ascending=False, kind='stable').head(top_n)

def print_mission_summary(df: pd.DataFrame, cmd_df: pd.DataFrame, top_creds: pd.DataFrame, session_sequences: pd.DataFrame) -> None:
    """
    Prints a formatted operational summary of the staged telemetry.

    The soak time is the span between the earliest and latest event
    timestamps. If that span is zero (a single event, or identical timestamps)
    or cannot be computed (no parseable timestamps), event density is reported
    as N/A rather than dividing by zero.
    """
    start_time = df['timestamp'].min()
    end_time = df['timestamp'].max()
    soak_duration = (end_time - start_time).total_seconds() / 3600

    # `nan > 0` is False, so an all-NaT window also falls through to N/A.
    if soak_duration > 0:
        density = f"{len(df) / soak_duration:.1f} events/hour"
    else:
        density = "N/A (zero-length observation window)"

    print("\n" + "="*60)
    print("### TELEMETRY STAGING COMPLETE: MISSION DATA READY ###")
    print(f"Total Events Ingested: {len(df):,}")
    print(f"Operational Soak Time: {soak_duration:.1f} Hours")
    print(f"Event Density: {density}")
    print("-" * 60)
    print(f"  * Command Telemetry: {len(cmd_df):,} records staged.")
    print(f"  * Credential Dictionary: {len(top_creds):,} unique pairs identified.")
    print(f"  * Campaign Sequences: {len(session_sequences):,} playbooks mapped.")
    print("="*60 + "\n")

# --- EXECUTION BLOCK ---
if __name__ == "__main__":
    # Telemetry lives in DATA_DIRECTORY; FILE_PATTERN selects the dated
    # Cowrie JSON files to ingest.
    DATA_DIRECTORY = '.'
    FILE_PATTERN = 'cowrie.json.2026-02-0*'

    # Steps 1 & 2: telemetry ingestion and master dataframe construction.
    df = ingest_telemetry(file_pattern=FILE_PATTERN, data_dir=DATA_DIRECTORY)

    if df.empty:
        logger.error("CRITICAL ERROR: NO TELEMETRY FOUND.")
        logger.info("Action Required: Verify FILE_PATTERN and ensure cowrie.json files exist in the data directory.")
    else:
        # Steps 3 & 5: command staging and per-session sequence fingerprinting.
        cmd_df, session_sequences = stage_command_telemetry(df)

        # Step 4: credential harvesting.
        top_creds = harvest_credentials(df)

        # Step 6: operational summary.
        print_mission_summary(df, cmd_df, top_creds, session_sequences)

In [None]:
# ==========================================
# --- PHASE II: ENRICHMENT (Geospatial Attribution) ---
# ==========================================
# Enriching raw IP telemetry with geographic metadata for regional risk profiling

import time
import logging
import requests
import pandas as pd
from typing import Dict

# getLogger returns the same named logger object on every call, so this reuses
# whatever handlers/level were configured earlier; it configures nothing itself.
logger = logging.getLogger(name=__name__)

# --- CONSTANTS ---
IP_API_URL: str = "http://ip-api.com/json/{ip}"  # per-IP lookup; {ip} filled via str.format
REQUEST_TIMEOUT: int = 5       # seconds, passed as the requests `timeout`
API_SLEEP_DELAY: float = 0.5   # seconds paused between lookups (NOTE(review): confirm vs ip-api fair-use limit)
TOP_N_IPS: int = 20            # default number of highest-volume source IPs to enrich

def fetch_country_metadata(ip: str, session: requests.Session) -> str:
    """
    Resolve the country of one IP address through the ip-api JSON endpoint.

    This function never raises for lookup problems. Instead:
    * HTTP 429 returns 'Rate Limited'.
    * Other HTTP/network errors and undecodable JSON return 'Lookup Failed'.
    * A JSON body without a 'country' key returns 'Unknown'.

    Args:
        ip: Address substituted into IP_API_URL.
        session: A requests.Session reused across calls for connection pooling.

    Returns:
        str: The country name, or one of the status strings above.
    """
    try:
        reply = session.get(IP_API_URL.format(ip=ip), timeout=REQUEST_TIMEOUT)

        if reply.status_code != 429:
            reply.raise_for_status()  # any remaining 4xx/5xx becomes a RequestException
            payload = reply.json()
            return payload.get('country', 'Unknown')

        # Throttled by the API (HTTP 429).
        logger.warning("Rate limit hit for IP: %s", ip)
        return 'Rate Limited'

    except requests.exceptions.RequestException as err:
        logger.error("Network error fetching geo-data for IP %s: %s", ip, err)
        return 'Lookup Failed'
    except ValueError:
        # Response body was not valid JSON.
        logger.error("Failed to parse JSON response for IP %s", ip)
        return 'Lookup Failed'

def enrich_geospatial_data(df: pd.DataFrame, top_n: int = TOP_N_IPS) -> pd.DataFrame:
    """
    Enriches the dataframe with geographic metadata for top volume sources.

    Only the ``top_n`` most frequent 'src_ip' values are looked up, one request
    each, paced by API_SLEEP_DELAY. Every other row gets 'Other/Low Volume'.
    The 'country' column is written into ``df`` in place, and that same
    object is returned.

    Args:
        df: The telemetry dataframe containing a 'src_ip' column.
        top_n: Number of high-volume IPs to process.

    Returns:
        pd.DataFrame: ``df`` with a 'country' column added. If 'src_ip' is
        missing, ``df`` is returned unchanged.
    """
    if 'src_ip' not in df.columns:
        logger.error("CRITICAL: 'src_ip' column missing from dataframe. Skipping enrichment.")
        return df

    unique_ips = df['src_ip'].value_counts().head(top_n).index.tolist()
    logger.info("[*] Initializing Geospatial Enrichment for %d unique high-volume nodes...", len(unique_ips))

    geo_map: Dict[str, str] = {}

    # Utilizing a Session context manager for efficient connection pooling
    with requests.Session() as session:
        for position, ip in enumerate(unique_ips):
            # Pace *between* requests to respect the API's fair-use policy;
            # sleeping after the final lookup would only add latency.
            if position:
                time.sleep(API_SLEEP_DELAY)
            geo_map[ip] = fetch_country_metadata(ip, session)

    # Map intelligence back to master telemetry
    df['country'] = df['src_ip'].map(geo_map).fillna('Other/Low Volume')
    return df

def print_geospatial_summary(df: pd.DataFrame) -> None:
    """
    Prints a formatted operational summary of the geospatial attribution.

    Only rows with a resolved country count as enriched. The placeholder
    labels written during enrichment are excluded: 'Other/Low Volume' for IPs
    outside the looked-up set, and 'Lookup Failed', 'Rate Limited' or
    'Unknown' for failed or unresolved lookups. This keeps them from inflating
    the coverage figure or being reported as the primary regional vector.
    """
    if 'country' not in df.columns:
        return

    unattributed = {'Other/Low Volume', 'Lookup Failed', 'Rate Limited', 'Unknown'}
    enriched_mask = df['country'].notna() & ~df['country'].isin(unattributed)
    if not enriched_mask.any():
        logger.warning("No geospatial data was successfully mapped.")
        return

    resolved = df.loc[enriched_mask, 'country']
    top_origin = resolved.value_counts().idxmax()
    coverage_pct = (enriched_mask.sum() / len(df)) * 100
    unique_regions = resolved.nunique()

    print("\n" + "="*60)
    print("### GEOSPATIAL ENRICHMENT: ATTRIBUTION READY ###")
    print(f"Enrichment Coverage: {coverage_pct:.1f}% of total event volume.")
    print(f"Primary Regional Vector: {top_origin}")
    print(f"Operational Observation: High-volume sources successfully mapped to {unique_regions} distinct regions.")
    print("Assessment: Attribution data successfully injected into master telemetry for Phase V mapping.")
    print("="*60 + "\n")

# --- EXECUTION BLOCK ---
if __name__ == "__main__":
    # 'df' is produced by the Phase I cell. Check for it explicitly rather than
    # wrapping the whole phase in `except NameError`, which would also swallow a
    # NameError raised by a real bug inside enrichment and misreport it.
    if 'df' not in globals():
        logger.error("CRITICAL ERROR: 'df' is not defined. Ensure Phase I executed successfully.")
    else:
        df = enrich_geospatial_data(df)
        print_geospatial_summary(df)

In [None]:
# ==========================================
# PHASE III: ML PIPELINE (ANOMALY, CLUSTERING, EXPLAINABILITY)
# ==========================================

import logging
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Tuple, Optional

from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.cluster import KMeans
from sklearn.tree import DecisionTreeClassifier, export_text
from scipy.stats import entropy

# Phase III logger: getLogger returns the same named logger object as the
# earlier cells, so no extra configuration happens here.
logger = logging.getLogger(name=__name__)

# ------------------------------------------
# PHASE III-A: FEATURE ENGINEERING & ISOLATION FOREST
# ------------------------------------------

def extract_session_features(cmd_df: pd.DataFrame, min_deltas: int = 3) -> pd.DataFrame:
    """Build one row of timing/command features per interactive session.

    Sessions with fewer than ``min_deltas`` non-null inter-command gaps are
    skipped. Output columns, in order: session_id, mean_delta, entropy_delta,
    burst_ratio, command_complexity, unique_commands.

    Args:
        cmd_df: Command telemetry with 'session', 'time_delta' and 'input' columns.
        min_deltas: Minimum number of measured gaps a session needs to be kept.

    Returns:
        pd.DataFrame: One row per qualifying session; empty if none qualify.
    """
    if cmd_df.empty or 'session' not in cmd_df.columns:
        logger.warning("Command telemetry is empty or missing 'session'. Cannot extract features.")
        return pd.DataFrame()

    rows: List[Dict[str, Any]] = []

    for sid, sess in cmd_df.groupby("session"):
        gaps = sess["time_delta"].dropna()
        if len(gaps) >= min_deltas:
            commands = sess['input']
            # 10-bin histogram of the gaps with add-one smoothing, fed to Shannon entropy.
            hist_counts = np.histogram(gaps, bins=10)[0] + 1
            rows.append({
                "session_id": sid,
                "mean_delta": gaps.mean(),
                "entropy_delta": entropy(hist_counts),
                "burst_ratio": (gaps < 0.5).mean(),  # share of gaps under half a second
                "command_complexity": commands.astype(str).str.len().mean(),
                "unique_commands": commands.nunique(),
            })

    return pd.DataFrame(rows)

def detect_anomalies(features_df: pd.DataFrame, contamination_rate: float = 0.10, min_samples: int = 5) -> pd.DataFrame:
    """Flag outlier sessions with an Isolation Forest.

    Every column except 'session_id' is standardized and scored. The result is
    a copy of ``features_df`` with 'anomaly_score' set to -1 for outliers and
    1 for inliers, as returned by IsolationForest.fit_predict.

    If there are fewer than ``min_samples`` rows, the model is skipped. The
    copy then gets anomaly_score=1 and predicted_label="Scanner/Low Volume".
    """
    if len(features_df) >= min_samples:
        model_inputs = features_df[[col for col in features_df.columns if col != "session_id"]]
        scaled = StandardScaler().fit_transform(model_inputs)

        forest = IsolationForest(contamination=contamination_rate, random_state=42)

        scored = features_df.copy()
        scored['anomaly_score'] = forest.fit_predict(scaled)
        return scored

    logger.warning("PIPELINE DEFERRED: Insufficient data volume for Isolation Forest (requires >= %d).", min_samples)
    deferred = features_df.copy()
    deferred['anomaly_score'] = 1
    deferred['predicted_label'] = "Scanner/Low Volume"
    return deferred

def print_isolation_summary(features_df: pd.DataFrame) -> None:
    """Report how many sessions the Isolation Forest marked as outliers (-1).

    Prints nothing when 'anomaly_score' has not been computed.
    """
    if 'anomaly_score' in features_df.columns:
        outlier_total = int((features_df['anomaly_score'] == -1).sum())
        rule = "=" * 60

        print("\n" + rule)
        print("### PHASE III-A: SENTINEL TRIAGE COMPLETE ###")
        print(f"Total Sessions Processed: {len(features_df)}")
        print(f"High-Risk Outliers Isolated: {outlier_total}")
        print(rule + "\n")

# ------------------------------------------
# PHASE III-B: K-MEANS CLUSTERING & TAXONOMY
# ------------------------------------------

def evaluate_centroid_heuristics(centroids_df: pd.DataFrame) -> Dict[int, str]:
    """Name each cluster from its centroid coordinates.

    Centroids are expected in standardized feature space, as produced by
    apply_kmeans_clustering. Rules are checked in priority order, and a
    missing feature column counts as 0:
      1. burst_ratio > 0.5 and command_complexity > 0.5 -> "AI/LLM Agent (Burst-Think)"
      2. entropy_delta > 0.5                              -> "Human"
      3. otherwise                                         -> "Automated Bot"

    Returns:
        Dict mapping each centroid's row label to its taxonomy label.
    """
    def _classify(centroid: pd.Series) -> str:
        bursty = centroid.get('burst_ratio', 0) > 0.5
        complex_cmds = centroid.get('command_complexity', 0) > 0.5
        if bursty and complex_cmds:
            return "AI/LLM Agent (Burst-Think)"
        if centroid.get('entropy_delta', 0) > 0.5:
            return "Human"
        return "Automated Bot"

    return {idx: _classify(centroid) for idx, centroid in centroids_df.iterrows()}

def apply_kmeans_clustering(features_df: pd.DataFrame, n_clusters: int = 3, min_samples: int = 5) -> Tuple[pd.DataFrame, Dict[int, str]]:
    """
    Applies K-Means clustering to categorize session behaviors.

    All columns other than identifiers and labels are standardized and
    clustered into ``n_clusters`` groups. Each cluster is then named by
    evaluate_centroid_heuristics from its centroid.

    Args:
        features_df: Per-session feature table.
        n_clusters: Number of K-Means clusters.
        min_samples: Minimum number of sessions before clustering runs.

    Returns:
        Tuple of (copy of features_df with 'cluster_id' and 'predicted_label',
        mapping cluster_id -> label). When clustering is deferred, the mapping
        is empty and a missing 'predicted_label' is filled with
        "Scanner/Low Volume".
    """
    cluster_labels: Dict[int, str] = {}
    results_df = features_df.copy()

    # KMeans raises when asked for more clusters than samples, so the effective
    # minimum is the larger of min_samples and n_clusters.
    required = max(min_samples, n_clusters)
    if len(results_df) < required:
        logger.warning("PIPELINE DEFERRED: Insufficient data volume for K-Means (requires >= %d).", required)
        if 'predicted_label' not in results_df.columns:
            results_df['predicted_label'] = "Scanner/Low Volume"
        return results_df, cluster_labels

    exclude_cols = ['session_id', 'anomaly_score', 'predicted_label', 'cluster_id']
    feature_cols = [col for col in results_df.columns if col not in exclude_cols]

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(results_df[feature_cols])

    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    results_df['cluster_id'] = kmeans.fit_predict(X_scaled)

    # Centroids live in the standardized space the heuristics' thresholds assume.
    centroids_df = pd.DataFrame(kmeans.cluster_centers_, columns=feature_cols)
    cluster_labels = evaluate_centroid_heuristics(centroids_df)
    results_df['predicted_label'] = results_df['cluster_id'].map(cluster_labels)

    return results_df, cluster_labels

def print_taxonomy_summary(cluster_labels: Dict[int, str]) -> None:
    """Print one line per cluster showing the taxonomy label it was mapped to.

    Prints nothing for an empty mapping, e.g. when clustering was deferred.
    """
    if cluster_labels:
        divider = "=" * 60
        print("\n" + divider)
        print("### PHASE III-B: TAXONOMY CLUSTERING COMPLETE ###")
        print("Centroid Mappings Derived:")
        for cid in cluster_labels:
            print(f"  -> Cluster {cid} mapped to: {cluster_labels[cid]}")
        print(divider + "\n")

# ------------------------------------------
# PHASE III-C: EXPLAINABILITY & DECISION TREE
# ------------------------------------------

def train_surrogate_explainer(features_df: pd.DataFrame, max_depth: int = 3, min_samples: int = 5) -> Tuple[Optional[DecisionTreeClassifier], Optional[str], Optional[str]]:
    """
    Trains a Decision Tree surrogate to explain the behavioral clustering labels.

    A shallow tree is fit on the behavioral feature columns to reproduce
    'predicted_label'. Its split rules give a human-readable explanation of
    the taxonomy.

    Args:
        features_df: Per-session features including 'predicted_label'.
        max_depth: Maximum depth of the surrogate tree; shallow trees keep rules readable.
        min_samples: Minimum number of sessions before training.

    Returns:
        (model, rules, top_feature), where:
        * model is the fitted DecisionTreeClassifier;
        * rules is the tree rendered by sklearn's export_text;
        * top_feature is the name of the feature with the highest
          impurity-based importance.
        Returns (None, None, None) when training is deferred or not possible.
    """
    if len(features_df) < min_samples:
        logger.warning("PIPELINE DEFERRED: Insufficient data volume for Decision Tree Explainer (requires >= %d).", min_samples)
        return None, None, None

    if 'predicted_label' not in features_df.columns:
        logger.error("CRITICAL ERROR: 'predicted_label' missing. Ensure Phase III-B ran successfully.")
        return None, None, None

    if features_df['predicted_label'].nunique() <= 1:
        logger.warning("Only one unique taxonomy label found. Surrogate tree cannot perform mathematical splits.")
        return None, None, None

    # Same exclusions as the clustering step: identifiers and model outputs are not features.
    exclude_cols = ['session_id', 'anomaly_score', 'predicted_label', 'cluster_id']
    feature_cols = [col for col in features_df.columns if col not in exclude_cols]
    if not feature_cols:
        logger.warning("No behavioral feature columns available for the surrogate explainer.")
        return None, None, None

    surrogate = DecisionTreeClassifier(max_depth=max_depth, random_state=42)
    surrogate.fit(features_df[feature_cols], features_df['predicted_label'].astype(str))

    rules = export_text(surrogate, feature_names=feature_cols)
    importances = pd.Series(surrogate.feature_importances_, index=feature_cols)
    top_feature = str(importances.idxmax())

    return surrogate, rules, top_feature

In [None]:
# ==========================================
# PHASE IV: THREAT COMMAND SUMMARY GENERATION
# ==========================================
# Forensic audit of un-truncated command strings to determine tactical objectives

import logging
import pandas as pd
from typing import List, Dict, Any

# Phase IV logger (the same named logger object the other cells obtain).
logger = logging.getLogger(name=__name__)

# --- TACTICAL HEURISTICS ---
# Keywords matched against the most frequent command to infer attacker intent.
RECON_KEYWORDS: List[str] = ['uname', 'ls', 'whoami', 'id', 'cat /proc', 'ifconfig', 'netstat']  # host/network discovery
PAYLOAD_KEYWORDS: List[str] = ['wget', 'curl', 'tftp', 'ftpget', 'scp']  # remote file retrieval/transfer

def extract_top_commands(cmd_df: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
    """
    Aggregates and sorts un-truncated shell commands by frequency.

    Commands are compared after trimming surrounding whitespace. Missing and
    blank inputs are ignored, so ' ls ' and 'ls' count as one command and
    empty keystrokes do not crowd out real payloads.

    Args:
        cmd_df: Dataframe containing command telemetry ('input' column).
        top_n: Number of top commands to extract.

    Returns:
        pd.DataFrame: Columns ['Full_Command', 'Frequency'], most frequent
        first. Empty if there are no usable commands.
    """
    if cmd_df.empty or 'input' not in cmd_df.columns:
        logger.warning("Command dataframe is empty or missing 'input'. Cannot extract dossier.")
        return pd.DataFrame()

    commands = cmd_df['input'].dropna().astype(str).str.strip()
    commands = commands[commands != ""]

    top_cmds = commands.value_counts().head(top_n).reset_index()
    top_cmds.columns = ['Full_Command', 'Frequency']
    return top_cmds

def evaluate_tactical_intent(top_cmds: pd.DataFrame, recon_keys: List[str] = RECON_KEYWORDS, payload_keys: List[str] = PAYLOAD_KEYWORDS, automation_threshold: int = 10) -> Dict[str, str]:
    """
    Analyzes the primary command payload to determine the tactical objective.

    Keywords match only as standalone tokens, i.e. not preceded or followed by
    a word character. This stops short keywords such as 'ls' or 'id' from
    firing inside unrelated words like 'tools.sh' or 'uid'. Matching is
    case-insensitive, and reconnaissance takes precedence over payload
    delivery.

    Args:
        top_cmds: Dataframe of aggregated top commands ('Full_Command',
            'Frequency'), most frequent first.
        recon_keys: List of keywords indicating reconnaissance.
        payload_keys: List of keywords indicating payload delivery.
        automation_threshold: Hit count above which the top command is treated
            as scripted/automated rather than bespoke/interactive.

    Returns:
        Dict with 'primary_vector', 'delivery_method' and 'mitre_stage', or an
        empty dict when top_cmds is empty.
    """
    import re  # local import keeps this cell's top-level imports unchanged

    if top_cmds.empty:
        return {}

    def _mentions_any(text: str, keywords: List[str]) -> bool:
        """Return True if any keyword occurs in text as a standalone token."""
        return any(
            re.search(r"(?<!\w)" + re.escape(str(k).lower()) + r"(?!\w)", text)
            for k in keywords
        )

    # Safely extract the most frequent command
    primary_payload = str(top_cmds.iloc[0]['Full_Command']).lower()
    top_frequency = top_cmds.iloc[0]['Frequency']

    is_recon = _mentions_any(primary_payload, recon_keys)
    is_delivery = _mentions_any(primary_payload, payload_keys)

    # Determine intelligence summaries
    if is_recon:
        primary_vector = "System Discovery/Reconnaissance"
        mitre_stage = "Discovery"
    elif is_delivery:
        primary_vector = "Malware Ingress/Staging"
        mitre_stage = "Execution"
    else:
        primary_vector = "General Probing"
        mitre_stage = "Unknown/Initial Access"

    delivery_method = "scripted/automated" if top_frequency > automation_threshold else "bespoke/interactive"

    return {
        "primary_vector": primary_vector,
        "delivery_method": delivery_method,
        "mitre_stage": mitre_stage
    }

def print_threat_summary(top_cmds: pd.DataFrame, total_unique_cmds: int, assessment: Dict[str, str]) -> None:
    """
    Prints a formatted operational dossier and tactical intelligence summary.

    Args:
        top_cmds: Top commands ('Full_Command', 'Frequency'), most frequent first.
        total_unique_cmds: Number of distinct command strings observed.
        assessment: Output of evaluate_tactical_intent. Nothing is printed if
            it is empty or top_cmds is empty.
    """
    if top_cmds.empty or not assessment:
        return

    # 1. Forensic header & iterative audit
    print("\n" + "="*60)
    print("### FULL COMMAND DOSSIER: UN-TRUNCATED PAYLOAD ANALYSIS ###")
    print("="*60)

    # Rank by position, not index label, so a filtered or re-sorted frame
    # still prints ranks 1..N.
    for rank, (_, row) in enumerate(top_cmds.iterrows(), start=1):
        print(f"RANK: {rank} | HITS: {row['Frequency']}")
        print(f"PAYLOAD: {row['Full_Command']}")
        print("-" * 60)

    top_hits = int(top_cmds.iloc[0]['Frequency'])

    # 2. Professional intelligence summary
    print("\n" + "="*60)
    print("### TACTICAL ASSESSMENT: ATTACKER INTENT AUDIT ###")
    print(f"Total Command Diversity: {total_unique_cmds:,} unique strings harvested.")
    print(f"Primary Vector: {assessment.get('primary_vector')}")
    # Neutral wording: the delivery method may be 'bespoke/interactive' precisely
    # because the top payload's frequency is low.
    print(f"Operational Observation: The top payload's hit count ({top_hits:,}) indicates a {assessment.get('delivery_method')} delivery method.")
    print(f"Assessment: Tactics are consistent with the '{assessment.get('mitre_stage')}' stage of the MITRE ATT&CK Matrix.")
    print("="*60 + "\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    # 'cmd_df' is produced by the Phase I cell. Check for it explicitly rather
    # than wrapping the whole phase in `except NameError`, which would also
    # swallow a NameError raised by a real bug below and misreport it.
    if 'cmd_df' not in globals():
        logger.error("CRITICAL ERROR: 'cmd_df' is not defined. Ensure Phase I/II executed successfully.")
    else:
        # 1. Extraction
        top_cmds_df = extract_top_commands(cmd_df)

        if top_cmds_df.empty:
            logger.error("No valid commands found to generate Threat Summary.")
        else:
            # 2. Assessment
            total_unique = cmd_df['input'].nunique()
            tactical_assessment = evaluate_tactical_intent(top_cmds_df)

            # 3. Operational summary
            print_threat_summary(top_cmds_df, total_unique, tactical_assessment)

In [None]:
# ==========================================
# PHASE V: EXECUTIVE VISUALIZATION & REPORTING ENGINE
# ==========================================
# Generates high-impact visual analytics for behavioral classification,
# tactical intent, geographic origins, and credential targeting.

import logging
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Optional, Dict, Any, Tuple

# Phase V logger (the same named logger object the other cells obtain).
logger = logging.getLogger(__name__)

# --- GLOBAL CONSTANTS ---
TAXONOMY_ORDER = ["Automated Bot", "Human", "AI/LLM Agent (Burst-Think)"]  # census bar order
PALETTE_STYLE = "magma"
# Recon keywords for the command-cadence check. Kept in line with Phase IV's
# RECON_KEYWORDS so both phases classify the same command the same way.
RECON_INDICATORS = ['uname', 'ls', 'whoami', 'id', 'cat /proc', 'ifconfig', 'netstat']
MAX_DISPLAY_LEN = 55  # characters of a command shown before truncating with '...'
TOP_N_COUNTRIES = 10
PALETTE_GEO = 'viridis'
HEATMAP_CMAP = "YlGnBu"
REQUIRED_COLS = ['username', 'password', 'count']  # columns the credential pivot needs

# ==========================================
# PHASE V-A: ATTACKER CLASSIFICATION (LOG SCALE)
# ==========================================

def prepare_census_data(cmd_df: pd.DataFrame, features_df: pd.DataFrame) -> Optional[pd.DataFrame]:
    """
    Merges command telemetry with behavioral labels to create a deduplicated census dataset.

    Returns one row per session that has a label. Each row is that session's
    first command row plus 'session_id' and 'predicted_label', with a fresh
    0..n-1 index. Returns None when either input is unusable.
    """
    if features_df.empty or 'predicted_label' not in features_df.columns:
        logger.warning("VISUALIZATION DEFERRED: Pipeline requires valid 'predicted_label' column.")
        return None

    if cmd_df.empty or 'session' not in cmd_df.columns:
        logger.warning("VISUALIZATION DEFERRED: Command telemetry is empty or missing 'session'.")
        return None

    # Reduce both sides to one row per session *before* joining. The census only
    # needs one row per session, so merging every command row first just
    # materialises a large intermediate frame that is immediately discarded.
    sessions = cmd_df.drop_duplicates('session')
    labels = features_df[['session_id', 'predicted_label']].drop_duplicates('session_id')

    viz_df = sessions.merge(
        labels,
        left_on='session',
        right_on='session_id',
        how='inner'
    )
    return viz_df.reset_index(drop=True)

def plot_attacker_census(viz_df: pd.DataFrame) -> None:
    """
    Generates a log-scale countplot of the behavioral taxonomy.

    Bars follow TAXONOMY_ORDER. Any other labels present in the data, such as
    the "Scanner/Low Volume" fallback used when clustering is deferred, are
    appended after it rather than silently omitted. Each bar is annotated
    with its share of all rows in ``viz_df``.
    """
    present = viz_df['predicted_label'].dropna().unique().tolist()
    plot_order = list(TAXONOMY_ORDER) + sorted(
        (label for label in present if label not in TAXONOMY_ORDER), key=str
    )

    plt.figure(figsize=(10, 6))

    ax = sns.countplot(
        data=viz_df,
        x='predicted_label',
        hue='predicted_label',
        palette=PALETTE_STYLE,
        order=plot_order,
        legend=False
    )

    total_sessions = len(viz_df)
    for p in ax.patches:
        height = p.get_height()
        if height > 0:
            percentage = f'{(100 * height / total_sessions):.1f}%'
            ax.annotate(
                percentage,
                (p.get_x() + p.get_width() / 2., height),
                ha='center', va='center',
                xytext=(0, 9),
                textcoords='offset points',
                fontsize=10, fontweight='bold'
            )

    ax.set_yscale("log")
    plt.title('Phase V-A: Attacker Classification Census (Logarithmic Distribution)', fontsize=14)
    plt.xlabel('Behavioral Taxonomy')
    plt.ylabel('Session Count (Log 10 Scale)')
    plt.xticks(rotation=15)
    plt.grid(axis='y', linestyle='--', alpha=0.3)
    # Extra headroom so the percentage labels above the tallest bar are not clipped.
    plt.ylim(top=ax.get_ylim()[1] * 2)
    plt.tight_layout()
    plt.show()

def print_census_summary(viz_df: pd.DataFrame) -> None:
    """
    Prints a formatted operational intelligence summary of the threat landscape.

    The risk-profile verdict takes both AI/LLM and human sessions into
    account. "Limited to scripted automated probes" is printed only when
    neither was classified.
    """
    total_classified = len(viz_df)
    counts = viz_df['predicted_label'].value_counts()

    bot_count = counts.get("Automated Bot", 0)
    human_count = counts.get("Human", 0)
    ai_count = counts.get("AI/LLM Agent (Burst-Think)", 0)

    noise_ratio = (bot_count / total_classified) * 100 if total_classified > 0 else 0

    print("\n" + "="*60)
    print("### BEHAVIORAL CENSUS: THREAT LANDSCAPE SUMMARY ###")
    print(f"Total Classified Sessions: {total_classified:,}")
    print(f"Background Radiation (Bots): {noise_ratio:.1f}%")
    print(f"Verified Human Activity: {human_count:,} sessions")
    print(f"AI/LLM Agent Activity: {ai_count:,} sessions")

    print("\nOperational Assessment:")
    if ai_count == 0:
        print("-> Threat Ceiling: No high-entropy 'Burst-Think' patterns detected.")
        if human_count == 0:
            print("-> Risk Profile: Environment is currently limited to scripted automated probes.")
        else:
            print(f"-> Risk Profile: {human_count:,} human-classified sessions observed alongside automated probes.")
    else:
        print(f"-> Strategic Finding: Advanced agentic behavior identified in {ai_count} sessions.")

    print(f"Strategic Conclusion: Pipeline successfully filtered {noise_ratio:.1f}% of traffic as non-interactive noise.")
    print("="*60 + "\n")

# ==========================================
# PHASE V-B: TOP 10 COMMANDS (CONTEXTUAL AWARENESS)
# ==========================================

def prepare_top_commands(cmd_df: pd.DataFrame, top_n: int = 10) -> Optional[pd.DataFrame]:
    """
    Cleans and aggregates shell interactions to identify the top executed commands.

    Returns columns 'Full_Command', 'Frequency' and 'Display_Label'. The label
    is truncated to MAX_DISPLAY_LEN characters plus '...' and is guaranteed
    unique across rows. Returns None when there is nothing to plot.
    """
    if cmd_df.empty or 'input' not in cmd_df.columns:
        logger.error("CRITICAL ERROR: Mission data missing or invalid 'input' column.")
        return None

    valid_cmds = cmd_df['input'].dropna().astype(str).str.strip()
    valid_cmds = valid_cmds[valid_cmds != ""]

    if valid_cmds.empty:
        logger.warning("VISUALIZATION DEFERRED: Telemetry contains only empty inputs.")
        return None

    top_cmds = valid_cmds.value_counts().head(top_n).reset_index()
    top_cmds.columns = ['Full_Command', 'Frequency']

    labels = top_cmds['Full_Command'].apply(
        lambda x: f"{x[:MAX_DISPLAY_LEN]}..." if len(x) > MAX_DISPLAY_LEN else x
    )
    # Distinct long commands can share a truncated prefix. Seaborn groups bars
    # by label, so duplicates would collapse into one averaged bar. Suffix
    # repeats with their occurrence number to keep one bar per command.
    occurrence = labels.groupby(labels).cumcount()
    top_cmds['Display_Label'] = labels.where(
        occurrence == 0, labels + " (#" + (occurrence + 1).astype(str) + ")"
    )

    return top_cmds

def plot_top_commands(top_cmds: pd.DataFrame) -> None:
    """Draw a horizontal bar chart of execution counts, one bar per 'Display_Label'."""
    fig, ax = plt.subplots(figsize=(12, 7))

    sns.barplot(
        data=top_cmds,
        x='Frequency',
        y='Display_Label',
        hue='Display_Label',
        palette='flare',
        legend=False,
        edgecolor='black',
        alpha=0.9,
        ax=ax,
    )

    ax.set_title('Phase V-B: Tactical Intent Audit (Top 10 Harvested Commands)', fontsize=14)
    ax.set_xlabel('Execution Count')
    ax.set_ylabel('Command Snippet')
    ax.grid(axis='x', linestyle='--', alpha=0.4)
    fig.tight_layout()
    plt.show()

def evaluate_command_cadence(top_cmds: pd.DataFrame, indicators: Optional[list] = None) -> Tuple[str, bool, bool]:
    """
    Evaluates the tactical intent and cadence based on the top command distributions.

    Args:
        top_cmds: Non-empty frame with 'Full_Command' and 'Frequency', most
            frequent first.
        indicators: Recon keywords to look for. Defaults to RECON_INDICATORS.
            Keywords match case-insensitively as standalone tokens (not
            preceded or followed by a word character), so 'ls' does not fire
            on 'tools.sh' and 'id' does not fire on 'uid'.

    Returns:
        (primary_cmd, has_recon, is_standardized). is_standardized is True
        when the top command has more than twice the hits of the runner-up,
        or when it is the only command.
    """
    import re  # local import keeps this cell's top-level imports unchanged

    keywords = RECON_INDICATORS if indicators is None else indicators

    full_command_list = top_cmds['Full_Command'].tolist()
    frequency_list = top_cmds['Frequency'].tolist()

    primary_cmd = full_command_list[0]
    lowered = str(primary_cmd).lower()
    has_recon = any(
        re.search(r"(?<!\w)" + re.escape(str(k).lower()) + r"(?!\w)", lowered)
        for k in keywords
    )

    if len(frequency_list) > 1:
        is_standardized = frequency_list[0] > (frequency_list[1] * 2)
    else:
        is_standardized = True

    return primary_cmd, has_recon, is_standardized

def print_tactical_intelligence(primary_cmd: str, has_recon: bool, is_standardized: bool) -> None:
    """Print the command-payload audit; the primary command is cut to 60 chars plus '...'."""
    shown = primary_cmd if len(primary_cmd) <= 60 else primary_cmd[:60] + "..."
    focus = 'Environment Discovery' if has_recon else 'Payload Delivery/Persistence'
    cadence = 'standardized automated probe' if is_standardized else 'diverse interactive session'
    rule = "=" * 60

    print("\n" + rule)
    print("### TACTICAL INTELLIGENCE: COMMAND PAYLOAD AUDIT ###")
    print(f"Primary Vector: {shown}")
    print(f"Operational Observation: Command cadence focuses on {focus}.")
    print(f"Assessment: The high frequency of the top command suggests a {cadence}.")
    print("Conclusion: Tactics align with TTPs commonly associated with opportunistic cloud-native botnets.")
    print(rule + "\n")

# ==========================================
# PHASE V-C: GEOGRAPHIC ORIGINS (GLOBAL THREAT MAP)
# ==========================================

def prepare_geo_data(df: pd.DataFrame, top_n: int = TOP_N_COUNTRIES) -> Optional[pd.DataFrame]:
    """
    Aggregates telemetry to find the top geographic origins of threat actors.

    Placeholder labels written during enrichment are excluded because they are
    not countries: 'Other/Low Volume', 'Lookup Failed', 'Rate Limited' and
    'Unknown'. 'Other/Low Volume' in particular can easily outrank real
    countries, since only the top-N source IPs are ever looked up.

    Returns:
        Columns ['Country', 'Event_Count'] for the ``top_n`` resolved
        countries, or None when there is no usable geographic data.
    """
    if df.empty or 'country' not in df.columns:
        logger.error("CRITICAL ERROR: Master dataframe missing or lacks 'country' column.")
        return None

    unattributed = {'Other/Low Volume', 'Lookup Failed', 'Rate Limited', 'Unknown'}
    resolved = df['country'][~df['country'].isin(unattributed)]

    top_countries = resolved.value_counts().head(top_n).reset_index()
    top_countries.columns = ['Country', 'Event_Count']

    if top_countries.empty:
        logger.warning("VISUALIZATION DEFERRED: No geographic data available.")
        return None

    return top_countries

def plot_geo_threats(top_countries: pd.DataFrame) -> None:
    """Draw a horizontal bar chart of event volume per country of origin."""
    fig, ax = plt.subplots(figsize=(12, 6))

    sns.barplot(
        data=top_countries,
        x='Event_Count',
        y='Country',
        hue='Country',
        palette=PALETTE_GEO,
        legend=False,
        edgecolor='black',
        alpha=0.8,
        ax=ax,
    )

    ax.set_title('Phase V-C: Geospatial Threat Attribution (Top 10 Origins)', fontsize=14)
    ax.set_xlabel('Aggregated Event Volume')
    ax.set_ylabel('Attacker Country of Origin')
    ax.grid(axis='x', linestyle='--', alpha=0.4)
    fig.tight_layout()
    plt.show()

def evaluate_geo_intel(top_countries: pd.DataFrame, total_events: int, total_unique_countries: int) -> Dict[str, Any]:
    """Summarise the leading origin from a non-empty, volume-sorted country table.

    'is_concentrated' is True when the single top origin holds a strict
    majority of ``total_events``.
    """
    leader = top_countries.iloc[0]
    leader_volume = leader['Event_Count']

    return {
        "primary_origin": leader['Country'],
        "primary_volume": leader_volume,
        "total_countries": total_unique_countries,
        "is_concentrated": leader_volume > (total_events / 2),
    }

def print_geo_summary(intel: Dict[str, Any]) -> None:
    """Print the regional risk summary built by evaluate_geo_intel; silent for an empty dict."""
    if intel:
        origin = intel.get('primary_origin')
        spread = 'highly concentrated' if intel.get('is_concentrated') else 'globally distributed'
        rule = "=" * 60

        print("\n" + rule)
        print("### GEOSPATIAL INTELLIGENCE: REGIONAL RISK VECTORS ###")
        print(f"Primary Origin Node: {origin} ({intel.get('primary_volume'):,} events)")
        print(f"Global Reach: {intel.get('total_countries')} distinct nations identified in harvest.")
        print(f"Operational Observation: Traffic is {spread}.")
        print(f"Assessment: Geopolitical heatmap suggests a dominance of {origin}-based cloud infrastructure in the current botnet lifecycle.")
        print(rule + "\n")

# ==========================================
# PHASE V-D: TARGETED CREDENTIAL HEATMAP
# ==========================================

def prepare_credential_pivot(top_creds: pd.DataFrame) -> Optional[pd.DataFrame]:
    """Reshape credential counts into a username x password matrix for the heatmap.

    Cells for pairs that were never tried are 0. Returns None when the input
    is empty or lacks any of REQUIRED_COLS.
    """
    if top_creds.empty:
        logger.info("CREDENTIAL INTELLIGENCE: No login attempts captured during this window.")
        return None

    absent = [name for name in REQUIRED_COLS if name not in top_creds.columns]
    if absent:
        logger.error("CRITICAL ERROR: top_creds is missing required columns: %s", absent)
        return None

    matrix = top_creds.pivot(index='username', columns='password', values='count')
    return matrix.fillna(0)

def plot_credential_heatmap(pivot_creds: pd.DataFrame) -> None:
    """Render the username x password attempt matrix as an annotated heatmap."""
    fig, ax = plt.subplots(figsize=(12, 6))

    sns.heatmap(
        pivot_creds,
        annot=True,
        fmt=".0f",
        cmap=HEATMAP_CMAP,
        cbar_kws={'label': 'Attempt Frequency'},
        ax=ax,
    )

    ax.set_title('Phase V-D: Credential Harvesting Heatmap (High-Frequency Targets)', fontsize=14)
    ax.set_xlabel('Target Password')
    ax.set_ylabel('Target Username')
    fig.tight_layout()
    plt.show()

def evaluate_brute_force_intel(top_creds: pd.DataFrame, dictionary_threshold: int = 5) -> Dict[str, Any]:
    """
    Evaluates the credential dataset to determine brute-force entropy and strategy.

    Args:
        top_creds: Credential attempts with 'username', 'password' and 'count' columns.
        dictionary_threshold: Number of distinct pairs above which the activity is
            labelled a 'standard dictionary' attack instead of 'highly targeted'.

    Returns:
        Dict with 'top_user', 'top_pass', 'total_unique' and 'strategy', or an empty
        dict when there are no credential rows (print_credential_summary renders an
        empty dict as a "no data" notice).
    """
    if top_creds.empty:
        return {}

    # Stable sort so ties on 'count' deterministically resolve to the earliest row.
    top_pair = top_creds.sort_values(by='count', ascending=False, kind='mergesort').iloc[0]

    # Count distinct pairs, not rows, so repeated pairs do not inflate the diversity figure.
    total_unique_creds = len(top_creds[['username', 'password']].drop_duplicates())
    strategy = 'standard dictionary' if total_unique_creds > dictionary_threshold else 'highly targeted'

    return {
        "top_user": top_pair['username'],
        "top_pass": top_pair['password'],
        "total_unique": total_unique_creds,
        "strategy": strategy
    }

def print_credential_summary(intel: Dict[str, Any]) -> None:
    """
    Prints the operational brief for the brute-force credential analysis.

    Args:
        intel: Metrics from evaluate_brute_force_intel. A falsy value (e.g. an empty
            dict) prints a "no data recorded" notice instead of the full brief.
    """
    divider = "=" * 60

    if not intel:
        lines = [
            "\n" + divider,
            "### CREDENTIAL INTELLIGENCE: NO DATA RECORDED ###",
            "Observation: No login attempts were captured during this specific telemetry window.",
            divider + "\n",
        ]
    else:
        user = intel.get('top_user')
        password = intel.get('top_pass')
        lines = [
            "\n" + divider,
            "### CREDENTIAL INTELLIGENCE: BRUTE-FORCE ENTROPY ###",
            f"Primary Target Pair: {user} / {password}",
            f"Credential Diversity: {intel.get('total_unique')} distinct pairs in top-tier attempts.",
            f"Operational Observation: Attackers are prioritizing '{user}' as the high-probability entry point.",
            f"Assessment: Pattern indicates a {intel.get('strategy')} brute-force strategy.",
            divider + "\n",
        ]

    for line in lines:
        print(line)

# ==========================================
# EXECUTION BLOCK (PHASES V-A to V-D)
# ==========================================
if __name__ == "__main__":
    # Runs Phase V (sub-phases V-A to V-D). Each sub-phase prepares its data and, when the
    # prepare step returns something usable, plots it and prints an intelligence brief.
    # NOTE(review): every name bound here (census_df, top_cmds_df, pivot_df, ...) is a
    # module/notebook global; later cells may read or rebind them (Phase X also assigns pivot_df).
    # Assuming 'df', 'cmd_df', 'features_df', and 'top_creds' exist from Phases I-IV
    try:
        print("\nInitiating Phase V: Visual Analytics & Intelligence Generation...")

        # --- V-A: Attacker Classification ---
        census_df = prepare_census_data(cmd_df, features_df)
        if census_df is not None and not census_df.empty:
            plot_attacker_census(census_df)
            print_census_summary(census_df)

        # --- V-B: Top 10 Commands ---
        top_cmds_df = prepare_top_commands(cmd_df)
        if top_cmds_df is not None:
            plot_top_commands(top_cmds_df)
            primary, recon_flag, std_flag = evaluate_command_cadence(top_cmds_df)
            print_tactical_intelligence(primary, recon_flag, std_flag)

        # --- V-C: Geographic Threat Map ---
        top_countries_df = prepare_geo_data(df)
        if top_countries_df is not None:
            plot_geo_threats(top_countries_df)
            total_events = len(df)
            # nunique() ignores nulls, so events without a country are not counted as a nation.
            total_unique = df['country'].nunique()
            geo_intel = evaluate_geo_intel(top_countries_df, total_events, total_unique)
            print_geo_summary(geo_intel)

        # --- V-D: Targeted Credential Heatmap ---

        pivot_df = prepare_credential_pivot(top_creds)
        if pivot_df is not None:
            plot_credential_heatmap(pivot_df)
            cred_intel = evaluate_brute_force_intel(top_creds)
            print_credential_summary(cred_intel)
        else:
            # An empty dict makes the printer emit its "no data recorded" notice.
            print_credential_summary({})

    # NOTE(review): only NameError is handled. It also catches NameErrors raised inside the
    # helpers (e.g. an undefined constant), which would be misreported as missing dataframes.
    except NameError as e:
        logger.error(f"CRITICAL ERROR: Missing dataframe dependencies from earlier phases. {e}")

In [None]:
# ==========================================
# PHASE VI: TEMPORAL CADENCE & BURST ANALYSIS
# ==========================================
# Evaluating execution velocity to differentiate machine-speed automation from human latency

import logging
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Optional, Dict, Any

# Initialize logger for Phase VI
logger = logging.getLogger(__name__)

# --- GLOBAL CONSTANTS ---
# Inter-command gaps (seconds) at or above this value are excluded from cadence analysis as pauses/outliers.
MAX_DELTA_SECONDS = 300
# Gaps strictly below this many seconds are counted as machine-speed execution.
MACHINE_SPEED_THRESHOLD = 1.0
# Machine-speed share (percent) above which the summary calls the environment bot-dominated.
BOT_DOMINANCE_THRESHOLD = 80.0

def prepare_cadence_data(cmd_df: pd.DataFrame, max_seconds: int = MAX_DELTA_SECONDS) -> Optional[pd.Series]:
    """
    Returns the inter-command time deltas usable for cadence analysis.

    Args:
        cmd_df: Command telemetry expected to carry a 'time_delta' column.
        max_seconds: Exclusive upper bound; longer gaps are discarded as idle pauses.

    Returns:
        pd.Series: Non-null deltas strictly below ``max_seconds``, or None when the
        input is empty, lacks 'time_delta', or no delta survives the filter.
    """
    has_deltas = (not cmd_df.empty) and ('time_delta' in cmd_df.columns)
    if not has_deltas:
        logger.error("CRITICAL ERROR: Mission data missing or lacks 'time_delta' column.")
        return None

    # A session's opening command has no predecessor, so its delta is NaN.
    deltas = cmd_df['time_delta'].dropna()
    # Long idle gaps would flatten the histogram; keep only intervals under the cap.
    in_window = deltas[deltas < max_seconds]

    if in_window.empty:
        logger.warning("VISUALIZATION DEFERRED: Insufficient consecutive commands to map temporal velocity.")
        return None
    return in_window

def plot_temporal_cadence(valid_cadence: pd.Series, threshold: float = MACHINE_SPEED_THRESHOLD) -> None:
    """
    Generates a log-scaled histogram of command execution cadences.

    Args:
        valid_cadence: Series of time deltas (seconds) between consecutive commands.
        threshold: The visual line separating machine vs. human speed.

    Note:
        A logarithmic axis cannot place zero or negative values. For example, two commands
        logged with the same timestamp give a delta of 0. Such intervals are therefore
        omitted from the plot only; the Phase VI execution block still passes the full
        series to evaluate_cadence_intel.
    """
    plot_data = valid_cadence[valid_cadence > 0]
    omitted = len(valid_cadence) - len(plot_data)
    if omitted:
        logger.info("CADENCE PLOT: %d non-positive interval(s) omitted from the log-scaled histogram.", omitted)
    if plot_data.empty:
        logger.warning("VISUALIZATION DEFERRED: No positive intervals available for a log-scaled histogram.")
        return

    plt.figure(figsize=(12, 6))

    # Using a log-scaled X-axis because bot speeds and human speeds span magnitudes
    sns.histplot(
        plot_data,
        bins=40,
        kde=True,
        color='darkred',
        log_scale=True,
        edgecolor='black',
        alpha=0.8
    )

    # Adding a visual threshold line for the "Machine Speed Barrier"
    plt.axvline(
        x=threshold,
        color='blue',
        linestyle='--',
        linewidth=2.5,
        label=f'Human/Machine Threshold ({threshold}s)'
    )

    plt.title('Phase VI: Temporal Cadence Audit (Execution Velocity)', fontsize=14)
    plt.xlabel('Seconds Between Commands (Log 10 Scale)')
    # histplot's default stat is 'count', so the y-axis shows raw command counts, not a density.
    plt.ylabel('Command Frequency (Count)')
    plt.legend()
    plt.grid(axis='y', linestyle='--', alpha=0.4)
    plt.tight_layout()
    plt.show()

def evaluate_cadence_intel(cadence_data: pd.Series, threshold: float = MACHINE_SPEED_THRESHOLD) -> Dict[str, Any]:
    """
    Calculates execution velocity metrics to profile the session behaviors.

    Args:
        cadence_data: Cleaned series of time deltas (seconds).
        threshold: Deltas strictly below this value count as machine-speed execution;
            every other entry (including any NaN) counts as human-speed.

    Returns:
        Dict with 'total_deltas', 'machine_hits', 'human_hits', 'machine_ratio' and
        'human_ratio' (percentages), and 'avg_cadence' (mean delta). For an empty
        series both ratios are 0 rather than implying 100% human activity.
    """
    total_deltas = len(cadence_data)
    machine_speed_hits = int((cadence_data < threshold).sum())
    human_speed_hits = total_deltas - machine_speed_hits

    if total_deltas > 0:
        machine_ratio = (machine_speed_hits / total_deltas) * 100
        human_ratio = 100 - machine_ratio
    else:
        # No intervals at all: neither class is represented.
        machine_ratio = 0
        human_ratio = 0

    avg_cadence = cadence_data.mean()

    return {
        "total_deltas": total_deltas,
        "machine_hits": machine_speed_hits,
        "human_hits": human_speed_hits,
        "machine_ratio": machine_ratio,
        "human_ratio": human_ratio,
        "avg_cadence": avg_cadence
    }

def print_cadence_summary(
    intel: Dict[str, Any],
    dominance_threshold: float = BOT_DOMINANCE_THRESHOLD,
    threshold: float = MACHINE_SPEED_THRESHOLD,
) -> None:
    """
    Prints a formatted operational intelligence summary of the temporal cadence.

    Args:
        intel: Metrics from evaluate_cadence_intel; a falsy value prints nothing.
        dominance_threshold: Machine-speed percentage above which the environment is
            assessed as bot-dominated.
        threshold: Machine/human cutoff in seconds, used only to label the two classes.
            It should match the value passed to evaluate_cadence_intel.
    """
    if not intel:
        return

    machine_ratio = intel.get('machine_ratio', 0)
    # Label with the real cutoff instead of a hard-coded "1s" so the brief stays accurate
    # if MACHINE_SPEED_THRESHOLD is tuned. evaluate_cadence_intel treats delta < threshold
    # as machine speed, so the human class is ">=" the cutoff.
    cutoff = f"{threshold:g}s"

    print("\n" + "="*60)
    print("### BEHAVIORAL INTELLIGENCE: TEMPORAL CADENCE AUDIT ###")
    print(f"Total Measured Intervals: {intel.get('total_deltas'):,} command transitions.")
    print(f"Machine-Speed Execution (<{cutoff}): {machine_ratio:.1f}% ({intel.get('machine_hits'):,} commands)")
    print(f"Human-Speed Execution (>={cutoff}): {intel.get('human_ratio', 0):.1f}% ({intel.get('human_hits'):,} commands)")
    print("-" * 60)

    print(f"Operational Observation: The average execution delay across the harvest is {intel.get('avg_cadence', 0):.2f} seconds.")

    if machine_ratio > dominance_threshold:
        print(f"Assessment: The heavy concentration of sub-second execution ({machine_ratio:.1f}%) confirms an environment dominated by highly-scripted, non-interactive botnets.")
    else:
        print("Assessment: The elevated presence of human-speed execution suggests interactive, 'hands-on-keyboard' reconnaissance is actively occurring.")
    print("="*60 + "\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    # Runs Phase VI: filter inter-command deltas, plot them, then print the machine-vs-human
    # cadence brief. Steps 2-3 are skipped when prepare_cadence_data returns None (empty
    # input, no 'time_delta' column, or no delta below MAX_DELTA_SECONDS).
    # Assuming 'cmd_df' exists from Phase I
    try:
        print("\nInitiating Phase VI: Temporal Cadence & Burst Analysis...")

        # 1. Prepare Data
        cadence_series = prepare_cadence_data(cmd_df)

        if cadence_series is not None:
            # 2. Visualize
            plot_temporal_cadence(cadence_series)

            # 3. Assess & Summarize
            # Both helpers run with their default thresholds (module-level constants).
            cadence_intel = evaluate_cadence_intel(cadence_series)
            print_cadence_summary(cadence_intel)

    # NOTE(review): this also catches NameErrors raised inside the helpers, which would then
    # be misreported as a missing 'cmd_df'.
    except NameError:
        logger.error("CRITICAL ERROR: 'cmd_df' is not defined. Ensure Phase I executed successfully.")

In [None]:
# ==========================================
# PHASE VII: HASSH FINGERPRINTING (Technical Attribution)
# ==========================================
# Identifying the underlying software libraries and tooling used by threat actors

import logging
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Optional, Dict, Any

# Initialize logger for Phase VII
logger = logging.getLogger(__name__)

# --- GLOBAL CONSTANTS ---
# Number of most frequent HASSH fingerprints kept for the chart and the brief.
TOP_N_HASSH = 10
# Seaborn palette name used by plot_hassh_fingerprints.
PALETTE_HASSH = 'coolwarm'

def extract_hassh_telemetry(df: pd.DataFrame) -> Optional[pd.DataFrame]:
    """
    Isolates SSH client key exchange telemetry for fingerprinting.

    Args:
        df: Master telemetry dataframe; must provide 'eventid', 'hassh', 'session'
            and 'src_ip' columns.

    Returns:
        pd.DataFrame: 'session', 'hassh' and 'src_ip' for every 'cowrie.client.kex'
        event with a non-missing, non-blank HASSH, or None if the input is unusable
        or contains no such events.
    """
    # Every column that is filtered or selected below must exist, otherwise indexing raises KeyError.
    required_cols = ['eventid', 'hassh', 'session', 'src_ip']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if df.empty or missing_cols:
        logger.error("CRITICAL ERROR: Master dataframe missing or lacks required columns: %s", missing_cols)
        return None

    # Filter for Key Exchange events
    kex_mask = df['eventid'] == 'cowrie.client.kex'
    hassh_data = df.loc[kex_mask, ['session', 'hassh', 'src_ip']].copy()

    # Drop both missing and blank fingerprints; neither identifies a client library.
    hassh_data = hassh_data.dropna(subset=['hassh'])
    hassh_data = hassh_data[hassh_data['hassh'].astype(str).str.strip() != '']

    if hassh_data.empty:
        logger.warning("ATTRIBUTION DEFERRED: No HASSH key exchange telemetry found in this harvest.")
        return None

    return hassh_data

def prepare_top_fingerprints(hassh_data: pd.DataFrame, top_n: int = TOP_N_HASSH) -> pd.DataFrame:
    """
    Ranks HASSH fingerprints by how often they appear.

    Args:
        hassh_data: Extracted key-exchange telemetry with a 'hassh' column.
        top_n: How many of the most frequent fingerprints to keep.

    Returns:
        pd.DataFrame: Columns 'HASSH_Fingerprint' and 'Frequency', most frequent first.
    """
    leaders = hassh_data['hassh'].value_counts().head(top_n)
    return leaders.rename_axis('HASSH_Fingerprint').reset_index(name='Frequency')

def plot_hassh_fingerprints(top_hassh: pd.DataFrame) -> None:
    """
    Draws a horizontal bar chart of the most frequent HASSH fingerprints.

    Args:
        top_hassh: Output of prepare_top_fingerprints, with 'HASSH_Fingerprint' and
            'Frequency' columns.
    """
    bar_style = dict(palette=PALETTE_HASSH, edgecolor='black', alpha=0.8)

    plt.figure(figsize=(12, 6))
    # y is mirrored into hue with legend=False: newer seaborn deprecates a palette without hue.
    sns.barplot(
        data=top_hassh,
        x='Frequency',
        y='HASSH_Fingerprint',
        hue='HASSH_Fingerprint',
        legend=False,
        **bar_style,
    )

    plt.title('Phase VII: SSH Client Fingerprints (Technical Attribution)', fontsize=14)
    plt.xlabel('Session Count')
    plt.ylabel('HASSH Fingerprint (MD5)')
    plt.grid(axis='x', linestyle='--', alpha=0.4)
    plt.tight_layout()
    plt.show()

def evaluate_hassh_intel(hassh_data: pd.DataFrame, top_hassh: pd.DataFrame) -> Dict[str, Any]:
    """
    Summarises how strongly the observed traffic relies on a single SSH client.

    Args:
        hassh_data: Every extracted key-exchange row (one per kex event).
        top_hassh: Ranked fingerprints from prepare_top_fingerprints.

    Returns:
        Dict with 'primary_fingerprint', 'diversity' (distinct HASSH values) and
        'dependency_level' ('high' or 'moderate'); empty when ``top_hassh`` is empty.
    """
    if top_hassh.empty:
        return {}

    leader = top_hassh.iloc[0]
    kex_total = len(hassh_data)

    # One fingerprint behind more than half of all key exchanges marks heavy reliance on one tool.
    dependency = 'high' if leader['Frequency'] > (kex_total / 2) else 'moderate'

    return {
        "primary_fingerprint": leader['HASSH_Fingerprint'],
        "diversity": hassh_data['hassh'].nunique(),
        "dependency_level": dependency,
    }

def print_hassh_summary(intel: Dict[str, Any]) -> None:
    """
    Prints the technical-attribution brief for the HASSH analysis.

    Args:
        intel: Metrics from evaluate_hassh_intel; a falsy value prints nothing.
    """
    if not intel:
        return

    fingerprint = intel.get('primary_fingerprint', 'N/A')
    # Fingerprints longer than 8 characters are abbreviated in the narrative line.
    abbreviated = fingerprint if len(fingerprint) <= 8 else f"{fingerprint[:8]}..."
    rule = "=" * 60

    print("\n" + rule)
    print("### TECHNICAL ATTRIBUTION: TOOLING PROFILE AUDIT ###")
    print(f"Primary Client Profile: {fingerprint}")
    print(f"Fingerprint Diversity: {intel.get('diversity')} unique SSH libraries detected.")
    print(f"Operational Observation: The high frequency of '{abbreviated}' suggests a standardized attack toolkit.")
    print(f"Assessment: Attribution indicates {intel.get('dependency_level')} dependency on a specific automation library.")
    print(rule + "\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    # Runs Phase VII: extract key-exchange HASSH rows from 'df', rank the fingerprints,
    # plot them and print the attribution brief. Each later step runs only if the previous
    # one produced data.
    # Assuming 'df' exists from Phase I
    try:
        print("\nInitiating Phase VII: Technical Attribution (HASSH)...")

        # 1. Extract raw HASSH telemetry
        hassh_telemetry = extract_hassh_telemetry(df)

        if hassh_telemetry is not None:
            # 2. Aggregate Top Profiles
            top_hassh_df = prepare_top_fingerprints(hassh_telemetry)

            if not top_hassh_df.empty:
                # 3. Visualize
                plot_hassh_fingerprints(top_hassh_df)

                # 4. Assess & Summarize
                # The full telemetry is passed too, so diversity and dependency are measured
                # against all kex events, not just the top-N profiles.
                hassh_intel = evaluate_hassh_intel(hassh_telemetry, top_hassh_df)
                print_hassh_summary(hassh_intel)

    # NOTE(review): also catches NameErrors raised inside the helpers, misreporting them as a missing 'df'.
    except NameError:
        logger.error("CRITICAL ERROR: 'df' is not defined. Ensure Phase I executed successfully.")

In [None]:
# ==========================================
# PHASE VIII: THREAT ACTOR DOSSIER
# ==========================================
# Consolidating behavioral, technical, and geographic attribution at the Source IP level

import logging
import pandas as pd
from typing import Optional

# Initialize logger for Phase VIII.
# logging.getLogger returns the same logger object for a given name, so this re-binds the
# logger already created by earlier cells rather than configuring a new one.
logger = logging.getLogger(__name__)

def get_primary_value(series: pd.Series, fallback: str = "Unknown") -> str:
    """
    Returns the most frequent non-null value in ``series``.

    Args:
        series: Values to vote over; value_counts() ignores nulls.
        fallback: Returned when the series is empty or holds only nulls.
    """
    frequencies = series.value_counts()
    if frequencies.empty:
        return fallback
    return frequencies.index[0]

def build_base_dossier(df: pd.DataFrame) -> Optional[pd.DataFrame]:
    """
    Aggregates raw telemetry to establish the baseline Threat Actor Dossier.

    Args:
        df: Master telemetry dataframe. 'src_ip', 'session' and 'eventid' are
            required; 'country' is optional.

    Returns:
        pd.DataFrame: One row per 'src_ip' with 'country' (first non-null value seen,
        or "Unknown" when the column is absent), 'total_sessions' (distinct sessions)
        and 'total_events' (non-null 'eventid' rows), or None if the input is unusable.
    """
    required_cols = ['src_ip', 'session', 'eventid']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if df.empty or missing_cols:
        logger.error(
            "CRITICAL ERROR: Master dataframe missing or lacks required columns %s. Cannot build dossier.",
            missing_cols,
        )
        return None

    grouped = df.groupby('src_ip')
    counters = {
        'total_sessions': ('session', 'nunique'),
        'total_events': ('eventid', 'count'),
    }

    if 'country' in df.columns:
        dossier = grouped.agg(country=('country', 'first'), **counters)
    else:
        # Geo enrichment may not have run; keep the column so downstream summaries still work.
        logger.warning("DOSSIER: 'country' column absent; geographic attribution set to 'Unknown'.")
        dossier = grouped.agg(**counters)
        dossier.insert(0, 'country', "Unknown")

    return dossier

def integrate_behavior_labels(dossier: pd.DataFrame, cmd_df: pd.DataFrame, features_df: pd.DataFrame) -> pd.DataFrame:
    """
    Integrates behavioral ML classifications into the dossier.

    Each session's 'predicted_label' counts once toward its source IP's vote, and the
    most frequent label per 'src_ip' becomes 'primary_classification'.

    Args:
        dossier: Dossier indexed by 'src_ip'; modified in place and returned.
        cmd_df: Command telemetry with 'src_ip' and 'session' columns.
        features_df: Per-session features with 'session_id' and 'predicted_label'.

    Returns:
        pd.DataFrame: The dossier with 'primary_classification' ("Unknown" for IPs
        without a labelled session, "N/A (Skipped)" when no labels are available).
    """
    if features_df.empty or 'predicted_label' not in features_df.columns:
        dossier['primary_classification'] = "N/A (Skipped)"
        return dossier

    # Labels are per session, but cmd_df has one row per command. Collapse to one row per
    # (src_ip, session) first; otherwise a single long session outvotes several short ones.
    session_ips = cmd_df[['src_ip', 'session']].drop_duplicates()
    session_labels = features_df[['session_id', 'predicted_label']].drop_duplicates(subset='session_id')

    temp_merge = session_ips.merge(
        session_labels,
        left_on='session',
        right_on='session_id'
    )

    top_class_series = temp_merge.groupby('src_ip')['predicted_label'].agg(
        lambda labels: get_primary_value(labels, fallback="Unknown")
    )

    # Assignment aligns on the 'src_ip' index; IPs with no labelled session become NaN -> "Unknown".
    dossier['primary_classification'] = top_class_series
    dossier['primary_classification'] = dossier['primary_classification'].fillna("Unknown")

    return dossier

def integrate_technical_fingerprints(dossier: pd.DataFrame, df: pd.DataFrame) -> pd.DataFrame:
    """
    Integrates HASSH SSH client fingerprints into the dossier.

    Args:
        dossier: Dossier indexed by 'src_ip'; modified in place and returned.
        df: Master telemetry; 'eventid', 'hassh' and 'src_ip' are needed for attribution.

    Returns:
        pd.DataFrame: The dossier with a 'primary_hassh' column holding each IP's most
        frequent fingerprint, "Unknown" for IPs without one, or "N/A (No KEX Data)" when
        no key-exchange telemetry is available.
    """
    no_kex_label = "N/A (No KEX Data)"

    # The 'hassh' field only exists when kex events were captured (Phase VII checks for it
    # too), so a missing column means "no KEX data" rather than an error.
    if not {'eventid', 'hassh', 'src_ip'}.issubset(df.columns):
        dossier['primary_hassh'] = no_kex_label
        return dossier

    kex_events = df[df['eventid'] == 'cowrie.client.kex']
    if kex_events.empty:
        dossier['primary_hassh'] = no_kex_label
        return dossier

    top_hassh_series = kex_events.groupby('src_ip')['hassh'].agg(
        lambda x: get_primary_value(x, fallback="Unknown")
    )

    # Assignment aligns on the 'src_ip' index; IPs without kex events become NaN -> "Unknown".
    dossier['primary_hassh'] = top_hassh_series
    dossier['primary_hassh'] = dossier['primary_hassh'].fillna("Unknown")

    return dossier

def print_dossier_summary(dossier: pd.DataFrame, concentration_threshold: int = 50) -> None:
    """
    Prints the top threat actors and the tactical intelligence summary.

    Args:
        dossier: Dossier indexed by source IP with a 'total_events' column; 'country'
            feeds the origin line when present.
        concentration_threshold: Landscapes with fewer distinct source IPs than this
            are described as 'concentrated', otherwise 'distributed'.
    """
    if dossier.empty:
        return

    top_10 = dossier.sort_values(by='total_events', ascending=False).head(10)

    print("\n" + "="*80)
    print("### TOP THREAT ACTOR DOSSIER: OPERATIONAL PRIORITY LIST ###")
    print("="*80)
    # Using to_string() ensures clean formatting outside of Jupyter environments
    print(top_10.to_string())
    print("-" * 80)

    # Extracting dynamic stats for the brief
    top_actor = top_10.index[0]
    total_actors = len(dossier)

    # value_counts() skips nulls, so an all-null (or absent) 'country' column yields no counts;
    # idxmax() on that would raise ValueError, so fall back to "Unknown".
    if 'country' in dossier.columns:
        country_counts = dossier['country'].value_counts()
    else:
        country_counts = pd.Series(dtype=object)
    top_country = country_counts.idxmax() if not country_counts.empty else "Unknown"

    concentration_status = 'concentrated' if total_actors < concentration_threshold else 'distributed'

    print("\n" + "="*60)
    print("### ATTRIBUTION AUDIT: HIGH-RISK SOURCE CLUSTERS ###")
    print(f"Unique Threat Entities: {total_actors:,} distinct source IPs identified.")
    print(f"Primary Vector Origin: {top_country}")
    print(f"Priority Target: {top_actor} (Highest Engagement Volume)")
    print(f"Operational Assessment: Attribution confirms a {concentration_status} global threat landscape.")
    print(f"Observation: High correlation between '{top_country}' and automated reconnaissance.")
    print("="*60 + "\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    # Runs Phase VIII: aggregate telemetry per source IP, enrich it with behavioral labels
    # and HASSH fingerprints, then print the dossier and attribution brief.
    # Assuming 'df', 'cmd_df', and 'features_df' exist from prior phases
    try:
        print("\nInitiating Phase VIII: Threat Actor Dossier Aggregation...")

        # 1. Base Aggregation
        dossier_df = build_base_dossier(df)

        if dossier_df is not None:
            # 2. Behavioral Attribution
            dossier_df = integrate_behavior_labels(dossier_df, cmd_df, features_df)

            # 3. Technical Fingerprinting
            dossier_df = integrate_technical_fingerprints(dossier_df, df)

            # 4. Display & Summarize
            print_dossier_summary(dossier_df)

    # NOTE(review): also catches NameErrors raised inside the helpers, not only missing dataframes.
    except NameError as e:
        logger.error(f"CRITICAL ERROR: Missing dataframe dependencies. {e}")

In [None]:
# ==========================================
# PHASE IX: TTP MAPPING (MITRE ATT&CK Framework)
# ==========================================
# Mapping Observed Command Inputs to the MITRE ATT&CK Matrix

import logging
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, Optional

# Initialize logger for Phase IX
logger = logging.getLogger(__name__)

# --- GLOBAL CONSTANTS ---
# Mapping dictionary allows for rapid updates as threat actor TTPs evolve.
# Keys are substrings searched for in each command. Commands are lowercased before
# matching, so keys must be lowercase. The first key (in insertion order) found in a
# command decides its technique, so entry order matters.
MITRE_MAPPING_RULES = {
    'authorized_keys': 'T1098.004 (Persistence)',
    'uname': 'T1082 (Discovery)',
    'ls /proc': 'T1082 (Discovery)',
    'chmod +x': 'T1222 (Defense Evasion)',
    'wget': 'T1105 (Ingress Tool Transfer)',
    'curl': 'T1105 (Ingress Tool Transfer)'
}
# Label for commands that match no keyword (and for null commands).
DEFAULT_TTP = 'Unknown/General Recon'
# Seaborn palette name used by plot_mitre_distribution.
PALETTE_MITRE = 'rocket'

def map_mitre_technique(command: str, rules: Dict[str, str] = MITRE_MAPPING_RULES) -> str:
    """
    Classifies a shell command by the first MITRE ATT&CK keyword it contains.

    Args:
        command: Raw command text; null values map to DEFAULT_TTP.
        rules: Lowercase keyword -> technique label, checked in insertion order.

    Returns:
        str: The technique of the first matching keyword, or DEFAULT_TTP when none match.
    """
    if pd.isna(command):
        return DEFAULT_TTP

    lowered = str(command).lower()
    return next(
        (technique for signature, technique in rules.items() if signature in lowered),
        DEFAULT_TTP,
    )

def apply_ttp_mapping(cmd_df: pd.DataFrame) -> Optional[pd.DataFrame]:
    """
    Tags every command with a MITRE ATT&CK technique label.

    Args:
        cmd_df: Command telemetry with an 'input' column. A 'mitre_ttp' column is
            written into this same dataframe.

    Returns:
        The same dataframe object with 'mitre_ttp' filled in, or None if it is empty
        or lacks 'input'.
    """
    usable = not cmd_df.empty and 'input' in cmd_df.columns
    if not usable:
        logger.error("CRITICAL ERROR: Command dataframe is empty or lacks 'input' column.")
        return None

    cmd_df['mitre_ttp'] = [map_mitre_technique(raw_cmd) for raw_cmd in cmd_df['input']]
    return cmd_df

def plot_mitre_distribution(cmd_df: pd.DataFrame) -> None:
    """
    Draws a horizontal count plot of techniques, most frequent first.

    Args:
        cmd_df: Command telemetry carrying the 'mitre_ttp' column from apply_ttp_mapping.
    """
    plt.figure(figsize=(12, 6))

    ranking = cmd_df['mitre_ttp'].value_counts().index
    # y is mirrored into hue with legend=False to avoid seaborn's palette-without-hue deprecation.
    sns.countplot(
        data=cmd_df,
        y='mitre_ttp',
        hue='mitre_ttp',
        order=ranking,
        palette=PALETTE_MITRE,
        legend=False,
        edgecolor='black',
        alpha=0.9,
    )

    plt.title('Phase IX: Observed MITRE ATT&CK Techniques (Tactical Distribution)', fontsize=14)
    plt.xlabel('Detection Frequency')
    plt.ylabel('MITRE Technique')
    plt.grid(axis='x', linestyle='--', alpha=0.6)
    plt.tight_layout()
    plt.show()

def print_mitre_summary(cmd_df: pd.DataFrame) -> None:
    """
    Prints the tactical brief for the MITRE ATT&CK mapping.

    Args:
        cmd_df: Command telemetry with a 'mitre_ttp' column; nothing is printed when
            it is empty or the column is missing.
    """
    if cmd_df.empty or 'mitre_ttp' not in cmd_df.columns:
        return

    techniques = cmd_df['mitre_ttp']
    leading = techniques.value_counts().idxmax()
    distinct = techniques.nunique()
    banner = "=" * 80

    print("\n" + banner)
    print("### TACTICAL INTELLIGENCE: MITRE ATT&CK MAPPING ###")
    print(f"Primary Technique Detected: {leading}")
    print(f"Operational Observation: Commands mapped successfully to {distinct} distinct TTP categories.")
    print(f"Assessment: Attacker behavior focuses heavily on the '{leading}' stage.")
    print(banner + "\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    # Runs Phase IX: tag every command with a MITRE ATT&CK technique, plot the
    # distribution and print the tactical brief.
    # Assuming 'cmd_df' exists from Phase I
    try:
        print("\nInitiating Phase IX: MITRE ATT&CK TTP Mapping...")

        # 1. Apply Mapping.
        # Bind the result to a separate name first. apply_ttp_mapping returns None on invalid
        # input, and assigning that straight to 'cmd_df' would wipe out the shared command
        # telemetry that later phases (e.g. Phase XI) still read.
        mapped_cmd_df = apply_ttp_mapping(cmd_df)

        if mapped_cmd_df is not None:
            cmd_df = mapped_cmd_df

            # 2. Visualize
            plot_mitre_distribution(cmd_df)

            # 3. Operational Summary
            print_mitre_summary(cmd_df)

    except NameError:
        logger.error("CRITICAL ERROR: 'cmd_df' is not defined. Ensure Phase I executed successfully.")

In [None]:
# ==========================================
# PHASE X: ATTACK VELOCITY (Temporal Heatmap)
# ==========================================
# Analyzing the temporal distribution of engagement to identify peak threat windows

import logging
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Optional, Dict, Any

# Initialize logger for Phase X
logger = logging.getLogger(__name__)

# --- GLOBAL CONSTANTS ---
# Row order for the temporal heatmap; names must match pandas' dt.day_name() output.
DAYS_OF_WEEK = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
# NOTE(review): plot_credential_heatmap (Phase V-D) also reads the global HEATMAP_CMAP. Because
# notebook cells share one namespace, this assignment also changes that plot's colormap if the
# Phase V cell is re-run after this one.
HEATMAP_CMAP = 'YlGnBu'

def prepare_temporal_data(df: pd.DataFrame) -> Optional[pd.DataFrame]:
    """
    Extracts temporal features and pivots the dataframe for heatmap visualization.

    Args:
        df: Master telemetry dataframe containing 'timestamp'.

    Returns:
        pd.DataFrame: Event counts with one row per observed weekday (Monday-first) and
        one column per hour 0-23, or None if there are no valid timestamps.
    """
    if df.empty or 'timestamp' not in df.columns:
        logger.error("CRITICAL ERROR: Master dataframe missing or lacks 'timestamp' column.")
        return None

    # Isolate temporal features to prevent unintended mutation of the master dataframe
    temp_df = df[['timestamp']].copy()

    # Ensure timestamp is safely typed as datetime
    if not pd.api.types.is_datetime64_any_dtype(temp_df['timestamp']):
        temp_df['timestamp'] = pd.to_datetime(temp_df['timestamp'], errors='coerce')

    # Drop NaT unconditionally. A column that is already datetime-typed can still hold NaT
    # (ingestion parses with errors='coerce'). If every value were NaT, the groupby below would
    # produce a zero-row matrix that the caller would then try to plot instead of getting None.
    temp_df = temp_df.dropna(subset=['timestamp'])

    if temp_df.empty:
        logger.warning("VISUALIZATION DEFERRED: No valid datetime data available.")
        return None

    temp_df['hour'] = temp_df['timestamp'].dt.hour
    temp_df['day_name'] = temp_df['timestamp'].dt.day_name()

    # Pivot the data into a frequency matrix
    pivot_temp = temp_df.groupby(['day_name', 'hour']).size().unstack(fill_value=0)

    # Reindex rows to ensure days are ordered logically from Monday to Sunday
    pivot_temp = pivot_temp.reindex([d for d in DAYS_OF_WEEK if d in pivot_temp.index])

    # Reindex columns to ensure all 24 hours are present on the X-axis
    pivot_temp = pivot_temp.reindex(columns=range(24), fill_value=0)

    return pivot_temp

def plot_attack_velocity(pivot_temp: pd.DataFrame) -> None:
    """
    Renders the weekday x hour event matrix as a heatmap.

    Args:
        pivot_temp: Matrix from prepare_temporal_data (weekdays as rows, hours 0-23 as columns).
    """
    plt.figure(figsize=(15, 6))
    sns.heatmap(pivot_temp, cmap=HEATMAP_CMAP, cbar_kws={'label': 'Event Frequency'})

    plt.title('Phase X: Attack Velocity (UTC Heatmap)', fontsize=14)
    plt.xlabel('Hour of Day (UTC)')
    plt.ylabel('Day of Week')
    plt.tight_layout()
    plt.show()

def evaluate_temporal_intel(pivot_temp: pd.DataFrame, total_events: int) -> Dict[str, Any]:
    """
    Derives peak timing and burstiness from the weekday x hour matrix.

    Args:
        pivot_temp: Event-count matrix (rows = weekdays, columns = hours).
        total_events: Event count from the master dataset, passed through unchanged.

    Returns:
        Dict with 'peak_day', 'peak_hour', 'total_events' and 'is_concentrated';
        empty when the matrix is empty.
    """
    if pivot_temp.empty:
        return {}

    events_per_day = pivot_temp.sum(axis=1)
    events_per_hour = pivot_temp.sum(axis=0)
    busiest_cell = pivot_temp.max().max()
    average_cell = pivot_temp.mean().mean()

    return {
        "peak_day": events_per_day.idxmax(),
        "peak_hour": events_per_hour.idxmax(),
        "total_events": total_events,
        # Bursty when the single busiest (day, hour) cell exceeds twice the average cell.
        "is_concentrated": busiest_cell > (average_cell * 2),
    }

def print_temporal_summary(intel: Dict[str, Any]) -> None:
    """
    Prints the attack-velocity brief.

    Args:
        intel: Metrics from evaluate_temporal_intel; a falsy value prints nothing.
            'peak_hour' must be an integer because it is rendered with ``:02d``.
    """
    if not intel:
        return

    pattern = 'concentrated' if intel.get('is_concentrated') else 'distributed'
    banner = "=" * 80

    print("\n" + banner)
    print("### TEMPORAL INTELLIGENCE: ATTACK VELOCITY AUDIT ###")
    print(f"Peak Activity Window: {intel.get('peak_day')}s at {intel.get('peak_hour'):02d}:00 UTC")
    print(f"Total Temporal Events: {intel.get('total_events'):,} engagement points analyzed.")
    print(f"Operational Observation: Data indicates a {pattern} attack cadence.")
    print("Assessment: Velocity patterns are consistent with global automated task scheduling (Cron/Scripts).")
    print(banner + "\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    # Runs Phase X: build the weekday x hour event matrix, plot it and print the velocity brief.
    # NOTE(review): 'pivot_df' is a shared notebook global that Phase V-D also assigns.
    # Assuming 'df' exists from Phase I
    try:
        print("\nInitiating Phase X: Attack Velocity Analysis...")

        # 1. Prepare Data
        pivot_df = prepare_temporal_data(df)

        if pivot_df is not None:
            # 2. Visualize
            plot_attack_velocity(pivot_df)

            # 3. Assess & Summarize
            # len(df) counts every row, including any whose timestamp could not be placed in
            # the heatmap, so this figure may exceed the number of events plotted.
            total_events_count = len(df)
            temporal_intel = evaluate_temporal_intel(pivot_df, total_events_count)
            print_temporal_summary(temporal_intel)

    # NOTE(review): also catches NameErrors raised inside the helpers, misreporting them as a missing 'df'.
    except NameError:
        logger.error("CRITICAL ERROR: 'df' is not defined. Ensure Phase I executed successfully.")

In [None]:
# ==========================================
# PHASE XI: FORENSIC TIMELINE (AI Agent Evidence)
# ==========================================
# Identification of high-entropy interactive sessions for behavioral auditing

import logging
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Tuple, Optional

# Initialize logger for Phase XI
logger = logging.getLogger(__name__)

# --- GLOBAL CONSTANTS ---
# 'predicted_label' value that marks a session as an AI/LLM-driven agent.
AI_AGENT_LABEL = "AI/LLM Agent (Burst-Think)"
# Commands longer than this many characters are truncated with "..." in timeline annotations.
MAX_CMD_DISPLAY_LEN = 15

def extract_forensic_session(features_df: pd.DataFrame, cmd_df: pd.DataFrame) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Picks the AI-labelled session with the highest burst ratio and returns its commands.

    Args:
        features_df: Per-session features with 'session_id', 'predicted_label' and 'burst_ratio'.
        cmd_df: Command telemetry with 'session' and 'timestamp' columns.

    Returns:
        Tuple of (session commands with a 'seconds_offset' column, session id):
        (None, None) when labels are unavailable; (empty DataFrame, None) when no
        session carries AI_AGENT_LABEL; (empty DataFrame, session id) when the chosen
        session has no commands in ``cmd_df``.
    """
    labels_available = not features_df.empty and 'predicted_label' in features_df.columns
    if not labels_available:
        logger.error("FORENSIC AUDIT DEFERRED: 'predicted_label' column missing. Phase III likely deferred.")
        return None, None

    candidates = features_df[features_df['predicted_label'] == AI_AGENT_LABEL]
    if candidates.empty:
        # An empty frame (not None) signals "analysis ran, but no AI session was found".
        return pd.DataFrame(), None

    ranked = candidates.sort_values('burst_ratio', ascending=False)
    target_session = ranked.iloc[0]['session_id']

    session_data = cmd_df[cmd_df['session'] == target_session].copy()
    if session_data.empty:
        return pd.DataFrame(), target_session

    # Offsets are measured from the session's earliest command for the timeline x-axis.
    session_data['timestamp'] = pd.to_datetime(session_data['timestamp'])
    origin = session_data['timestamp'].min()
    session_data['seconds_offset'] = (session_data['timestamp'] - origin).dt.total_seconds()

    return session_data, target_session

def plot_forensic_timeline(session_data: pd.DataFrame, target_session: str) -> None:
    """
    Plots a session's commands as ticks along a single time axis, labelled with the command text.

    Args:
        session_data: Commands of one session with 'seconds_offset' and, optionally, 'input'.
        target_session: Session identifier shown in the title.
    """
    plt.figure(figsize=(14, 5))

    # Every point sits at y=1, so the plot reads as a 1-D timeline.
    sns.scatterplot(
        data=session_data,
        x='seconds_offset',
        y=[1] * len(session_data),
        s=150,
        color='red',
        marker='|'
    )

    commands = session_data.get('input')
    labels = [''] * len(session_data) if commands is None else list(commands)
    for offset, label in zip(session_data['seconds_offset'], labels):
        text = str(label)
        if len(text) > MAX_CMD_DISPLAY_LEN:
            text = f"{text[:MAX_CMD_DISPLAY_LEN]}..."
        plt.text(offset, 1.02, text, rotation=45, fontsize=8)

    plt.title(f'Phase XI: Forensic Timeline - AI Agentic Pattern (Session: {target_session})', fontsize=14)
    plt.ylim(0.98, 1.1)
    plt.yticks([])  # the y-axis carries no information on a 1-D timeline
    plt.grid(axis='x', linestyle='--', alpha=0.5)
    plt.tight_layout()
    plt.show()

def print_forensic_summary(session_data: pd.DataFrame, target_session: Optional[str], total_analyzed: int) -> None:
    """
    Prints a formatted operational intelligence summary of the forensic audit.

    Args:
        session_data: Command rows of the audited session. Expected to carry
            ``seconds_offset``; ``time_delta`` and ``input`` are printed when present.
            May be ``None`` or empty when no candidate session was found.
        target_session: The audited session id, or ``None`` when no AI-labelled
            session was found.
        total_analyzed: Number of sessions considered, shown in the "no findings" banner.
    """
    if target_session is None or session_data is None or session_data.empty:
        print("\n" + "="*80)
        print("### FORENSIC AUDIT: NO AI AGENTIC PATTERNS DETECTED ###")
        print(f"Sample Size: {total_analyzed:,} unique sessions analyzed.")
        print("Forensic Observation: All sessions exhibited linear timing or low-complexity scripted behavior.")
        print("Assessment: Current harvest consists exclusively of standard automated 'background radiation' botnets.")
        print("Conclusion: No evidence of human-agentic or LLM-driven decision making in this data slice.")
        print("="*80 + "\n")
        return

    max_offset = session_data['seconds_offset'].max()
    cmd_count = len(session_data)

    print("\n" + "="*80)
    print("### FORENSIC AUDIT: HIGH-CONFIDENCE AI PATTERN IDENTIFIED ###")
    print(f"Session ID: {target_session}")
    print("Primary Indicator: Non-Linear Temporal Entropy (Burst-Think)")
    print(f"Forensic Observation: The actor executed {cmd_count} commands in {max_offset:.2f} seconds.")
    print("Assessment: Cadence is inconsistent with human biological processing latency.")
    print("="*80 + "\n")

    # 'time_delta' is not computed in this phase (only 'seconds_offset' is), so it
    # must come from upstream staging; print only the detail columns actually present
    # instead of raising KeyError when one is missing.
    detail_cols = [col for col in ('seconds_offset', 'time_delta', 'input') if col in session_data.columns]
    if detail_cols:
        # Using to_string() ensures clean formatting across all environments
        print(session_data[detail_cols].to_string(index=False))
    print("\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    # Assumes 'features_df' and 'cmd_df' were produced by prior phases
    print("\nInitiating Phase XI: Forensic AI Agent Auditing...")

    # Check the upstream dataframes explicitly instead of wrapping the whole phase
    # in `except NameError`. That pattern also swallows genuine NameErrors raised
    # inside the helpers (e.g. a missing plotting import) and misreports them as
    # missing dataframes.
    _missing = [name for name in ("features_df", "cmd_df") if name not in globals()]
    if _missing:
        logger.error(
            "CRITICAL ERROR: Dataframe dependencies missing (%s). Ensure prior phases executed successfully.",
            ", ".join(_missing),
        )
    else:
        # 1. Isolate target session
        session_df, target_id = extract_forensic_session(features_df, cmd_df)

        # None means the audit was deferred (no 'predicted_label'); nothing to report
        if session_df is not None:
            total_sessions = len(features_df)

            # 2. Visualize & Summarize
            if not session_df.empty and target_id:
                plot_forensic_timeline(session_df, target_id)

            print_forensic_summary(session_df, target_id, total_sessions)

In [None]:
# ==========================================
# PHASE XII: HARVEST SUMMARY REPORT
# ==========================================
# Final executive readout consolidating the most significant actor profile

import logging
import pandas as pd
from typing import Optional

# Module-level logger used by this cell's execution block
logger = logging.getLogger(name=__name__)

def generate_harvest_report(dossier: pd.DataFrame) -> None:
    """
    Generates a high-level executive summary of the most significant threat actor detected.

    The primary actor is the row with the largest ``total_events`` value. Ties keep
    the dossier's existing row order because the sort is stable. The row is taken
    positionally, so a dossier whose index contains duplicate labels still yields a
    single actor record. If ``total_events`` is absent, the first row is reported.

    Args:
        dossier: Aggregated threat actor dossier from Phase VIII. The index label of
            the selected row is reported as the actor (presumably the source IP;
            confirm against Phase VIII).
    """
    if dossier.empty:
        print("\n" + "="*60)
        print("### PROJECT HARVEST REPORT: NO DATA FOUND ###")
        print("Action Required: Verify data ingestion and Phase VIII dossier construction.")
        print("="*60 + "\n")
        return

    # Identify the Primary Threat Actor (by event volume).
    # A stable sort makes tie-breaking deterministic. Positional .iloc[0] avoids
    # .loc[label] returning a whole DataFrame when index labels repeat.
    if 'total_events' in dossier.columns:
        ranked = dossier.sort_values(by='total_events', ascending=False, kind='mergesort')
    else:
        ranked = dossier
    actor_stats = ranked.iloc[0]
    top_actor_ip = actor_stats.name

    # Extraction with defensive defaults to prevent KeyErrors
    origin = actor_stats.get('country', 'Unknown')
    classification = actor_stats.get('primary_classification', 'Unclassified')
    hassh = actor_stats.get('primary_hassh', 'Unknown')
    total_events = actor_stats.get('total_events', 0)
    total_sessions = actor_stats.get('total_sessions', 0)

    # Print Executive Summary
    print("\n" + "="*60)
    print("### PROJECT HARVEST REPORT: EXECUTIVE SUMMARY ###")
    print("="*60)
    print(f"Primary Threat Actor:   {top_actor_ip}")
    print(f"Origin Node:            {origin}")
    print(f"Behavioral Profile:     {classification}")
    print(f"Technical Signature:    {hassh}")
    print(f"Operational Volume:     {total_events:,} events across {total_sessions:,} sessions")
    print("-" * 60)

    # Final Strategic Assessment - Branching logic based on ML classification
    print("### STRATEGIC RECOMMENDATION ###")
    if "Human" in str(classification):
        print("ALERT: Interactive behavior detected. Immediate review of session commands is advised.")
        print("ACTION: Escalate to Level 3 Incident Response for manual forensic deep-dive.")
    elif "AI/LLM" in str(classification):
        print("ALERT: Advanced agentic behavior identified. Pattern suggests LLM-driven reconnaissance.")
        print("ACTION: Update firewall egress rules and rotate credentials targeted in Phase V-D.")
    else:
        print("STATUS: Background noise/automated probes. No immediate manual action required.")
        print("ACTION: Monitor for volume spikes and feed indicators (HASSH/IP) into blocklists.")
    print("="*60 + "\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    # Assumes 'dossier_df' exists from Phase VIII
    print("Generating Final Phase XII Harvest Report...")

    # Check the dependency explicitly so that a NameError raised *inside* the report
    # code is not misreported as a missing dossier.
    if "dossier_df" not in globals():
        logger.error("CRITICAL ERROR: 'dossier_df' is not defined. Ensure Phase VIII executed successfully.")
    else:
        generate_harvest_report(dossier_df)

In [None]:
# ==========================================
# PHASE XIII: STRUCTURED INTELLIGENCE ASSESSMENT
# ==========================================
# Quantifying operational impact and triage efficiency gains

import logging
import pandas as pd
from typing import Dict, Any, Optional

# Module-level logger used by this cell's execution block
logger = logging.getLogger(name=__name__)

# --- GLOBAL CONSTANTS ---
# Tool names whose presence in a command is treated as an egress / lateral-movement indicator
LATERAL_INDICATORS = ["ssh", "scp", "ftp", "telnet", "nc", "curl", "wget"]
# Sample sizes strictly above this value earn a HIGH confidence rating in the final brief
CONFIDENCE_THRESHOLD = 50

def calculate_operational_efficiency(df: pd.DataFrame, features_df: pd.DataFrame) -> Dict[str, Any]:
    """
    Summarize how far the anomaly filter shrinks the analyst review queue.

    Args:
        df: Raw telemetry. Its distinct ``session`` values define the session population.
        features_df: Per-session feature table. Rows with ``anomaly_score == -1``
            count as anomalous. ``predicted_label``, when present, drives the
            composition breakdown.

    Returns:
        Dict with:
        - ``total_sessions``
        - ``anomalous_sessions``
        - ``triage_reduction``: the percentage of sessions not flagged; 0 when
          there are no sessions or no ``anomaly_score`` column.
        - ``composition``: a label -> percentage Series, or a single
          "Classification Deferred" entry at 100.0 when labels are unavailable.
    """
    session_total = 0 if df.empty else df['session'].nunique()
    available = features_df.columns

    flagged = 0
    reduction_pct = 0.0
    if 'anomaly_score' in available:
        flagged = int((features_df['anomaly_score'] == -1).sum())
        if session_total > 0:
            reduction_pct = ((session_total - flagged) / session_total) * 100
        else:
            reduction_pct = 0

    if 'predicted_label' in available:
        label_mix = features_df['predicted_label'].value_counts(normalize=True) * 100
    else:
        label_mix = pd.Series({"Classification Deferred": 100.0})

    return dict(
        total_sessions=session_total,
        anomalous_sessions=flagged,
        triage_reduction=reduction_pct,
        composition=label_mix,
    )

def perform_risk_assessment(cmd_df: pd.DataFrame, indicators: Optional[list] = None) -> Dict[str, Any]:
    """
    Scans for lateral movement indicators and returns risk-level metrics.

    A tool name counts only when it appears as a standalone command token:
    - It must not be preceded by a word character, '.' or '-'.
    - It must not be followed by a word character, '.', '/' or '-'.

    This avoids the false positives of plain substring matching, such as:
    - ``~/.ssh/authorized_keys`` (SSH key injection, not egress)
    - ``ssh-rsa``
    - ``sync`` (contains "nc")

    Absolute paths such as ``/usr/bin/wget`` still match. Matching is case-insensitive.

    Args:
        cmd_df: Command events; the ``input`` column is scanned. Non-string
            values (None/NaN) are treated as containing no indicators.
        indicators: Tool names to look for. Defaults to ``LATERAL_INDICATORS``.

    Returns:
        Dict with ``attempts`` (number of commands containing at least one
        indicator) and ``indicators`` (the matched tool names, lower-cased,
        deduplicated in first-seen order).
    """
    import re

    if cmd_df.empty or 'input' not in cmd_df.columns:
        return {"attempts": 0, "indicators": []}

    tools = LATERAL_INDICATORS if indicators is None else list(indicators)
    if not tools:
        return {"attempts": 0, "indicators": []}

    # Token-boundary match: no word char/'.'/'-' before, no word char/'.'/'/'/'-' after
    alternation = '|'.join(re.escape(tool) for tool in tools)
    matcher = re.compile(rf"(?<![\w.\-])(?:{alternation})(?![\w./\-])", re.IGNORECASE)

    hits_per_command = [
        matcher.findall(command) if isinstance(command, str) else []
        for command in cmd_df['input']
    ]

    # Report the tools actually matched rather than the command's first word
    # (which for "cd /tmp; wget ..." would be "cd").
    attempts = sum(1 for hits in hits_per_command if hits)
    unique_tools = list(dict.fromkeys(hit.lower() for hits in hits_per_command for hit in hits))

    return {
        "attempts": attempts,
        "indicators": unique_tools
    }

def print_final_intelligence_brief(efficiency: Dict[str, Any], risk: Dict[str, Any], sample_size: int,
                                   confidence_threshold: Optional[int] = None) -> None:
    """
    Prints a professional, structured final intelligence assessment.

    Args:
        efficiency: Output of ``calculate_operational_efficiency``. Reads the
            ``composition``, ``total_sessions``, ``anomalous_sessions`` and
            ``triage_reduction`` keys.
        risk: Output of ``perform_risk_assessment``. Reads ``attempts`` and
            ``indicators``.
        sample_size: Number of analyzed sessions. A value strictly above the
            threshold yields a HIGH rating; otherwise the rating is MODERATE.
        confidence_threshold: Override for the rating threshold. Defaults to
            ``CONFIDENCE_THRESHOLD``.
    """
    threshold = CONFIDENCE_THRESHOLD if confidence_threshold is None else confidence_threshold

    # Confidence Rating Logic based on sample size
    confidence = "HIGH" if sample_size > threshold else "MODERATE"

    print("\n" + "="*60)
    print("PROJECT STATUS: SSH BEHAVIORAL STUDY (LOST PIGLET 1)")
    print(f"CONFIDENCE RATING: {confidence}")
    print("="*60)

    print("\n### TACTICAL TELEMETRY: BEHAVIORAL COMPOSITION ###")
    for label, pct in efficiency['composition'].items():
        # Right-align percentages. The former `04.1f` spec zero-padded them ("05.0%").
        print(f"  * {pct:5.1f}% | {label}")

    print("\n### OPERATIONAL IMPACT & ROI: TRIAGE EFFICIENCY ###")
    print(f"  * Automated Triage: Filtered {efficiency['total_sessions']:,} sessions down to {efficiency['anomalous_sessions']:,} high-value anomalies.")
    print(f"  * Analyst Workload Reduction: {efficiency['triage_reduction']:.1f}% efficiency gain.")

    print("\n### RISK ASSESSMENT: EGRESS & LATERAL MOVEMENT ###")
    if risk['attempts'] == 0:
        print("  * STATUS: [SECURE] - No unauthorized egress or tool-transfer attempts detected.")
        print("  * Assessment: Activity remains confined to initial reconnaissance/sandboxing.")
    else:
        print(f"  * STATUS: [CRITICAL] - {risk['attempts']} egress attempts identified.")
        # Stringify so NaN/non-str entries cannot break join. Only add an
        # ellipsis when the list was actually truncated.
        tool_names = [str(tool) for tool in risk.get('indicators', [])]
        shown = ", ".join(tool_names[:3]) or "unidentified"
        suffix = "..." if len(tool_names) > 3 else ""
        print(f"  * Primary Indicators: {shown}{suffix}")
        print("  * Assessment: Evidence suggests active attempts at payload ingress or lateral pivoting.")

    print("\n" + "="*60)
    print("STATUS: REPORT COMPLETE")
    print("="*60 + "\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    print("Initiating Phase XIII: Structured Intelligence Assessment...")

    # Verify upstream dataframes exist up front. A blanket `except NameError`
    # around the whole phase would also swallow NameErrors raised inside the
    # helpers and misreport them as missing dataframes.
    _missing = [name for name in ("df", "features_df", "cmd_df") if name not in globals()]
    if _missing:
        logger.error(
            "CRITICAL ERROR: Assessment aborted due to missing dataframes. Missing: %s",
            ", ".join(_missing),
        )
    else:
        # 1. Efficiency Analytics
        eff_metrics = calculate_operational_efficiency(df, features_df)

        # 2. Risk Analytics
        risk_metrics = perform_risk_assessment(cmd_df)

        # 3. Output Intelligence Brief
        print_final_intelligence_brief(eff_metrics, risk_metrics, len(features_df))

In [None]:
# ==========================================
# PHASE XIV: STRATEGIC CAMPAIGN CORRELATION
# ==========================================
# Identifying infrastructure reuse by correlating SSH fingerprints (HASSH)
# with tactical command sequences (The Playbook)

import logging
import pandas as pd
from typing import Optional, Dict, Any

# Module-level logger used by the campaign-correlation helpers and execution block
logger = logging.getLogger(name=__name__)

def build_campaign_master(df: pd.DataFrame, cmd_df: pd.DataFrame, features_df: pd.DataFrame) -> Optional[pd.DataFrame]:
    """
    Correlates client fingerprints with command sequences to identify shared infrastructure.

    Args:
        df: Raw telemetry. Must carry ``eventid``, ``session``, ``hassh`` and ``src_ip``.
        cmd_df: Command events with ``session`` and ``input``. When a ``timestamp``
            column is present, commands are ordered by it within each session.
        features_df: Session feature table. ``predicted_label`` is joined on
            ``session_id`` only when both columns are present.

    Returns:
        One row per distinct (session, hassh, src_ip) key-exchange record whose
        session also issued commands. Each row has ``cmd_sequence`` (commands
        joined by " > ") and, when available, ``predicted_label``/``session_id``.
        Returns ``None`` when HASSH telemetry is unavailable.
    """
    if df.empty or 'hassh' not in df.columns:
        logger.error("CAMPAIGN AUDIT DEFERRED: Missing HASSH telemetry.")
        return None

    # 1. Build Client Fingerprint Map (HASSH).
    # .assign() returns a new frame, so no value is written into a filtered slice
    # (avoids SettingWithCopyWarning).
    client_kex = (
        df.loc[df['eventid'] == 'cowrie.client.kex', ['session', 'hassh', 'src_ip']]
        .drop_duplicates()
    )
    client_kex = client_kex.assign(session=client_kex['session'].astype(str))

    # 2. Build Command Sequence Fingerprint (The Playbook)
    if not cmd_df.empty:
        ordered = cmd_df
        if 'timestamp' in cmd_df.columns:
            # Enforce chronological order explicitly instead of relying on upstream
            # sorting. The stable sort keeps tied timestamps in input order.
            ordered = cmd_df.sort_values('timestamp', kind='mergesort')
        seq_map = (
            ordered.groupby('session')['input']
            .apply(lambda cmds: " > ".join(cmds.astype(str).str.strip()))
            .reset_index()
        )
        seq_map.columns = ['session', 'cmd_sequence']
    else:
        seq_map = pd.DataFrame(columns=['session', 'cmd_sequence'])
    # Standardize to string to ensure reliable merging across types
    seq_map = seq_map.assign(session=seq_map['session'].astype(str))

    # 3. Unified Campaign Join: keep only sessions with both a fingerprint and commands
    campaign_master = client_kex.merge(seq_map, on='session', how='inner')

    # Join with ML labels from Phase III if available. If classification was
    # deferred (no 'predicted_label'), skip the join instead of raising KeyError.
    if not features_df.empty and {'session_id', 'predicted_label'}.issubset(features_df.columns):
        labels = features_df[['session_id', 'predicted_label']].assign(
            session_id=lambda frame: frame['session_id'].astype(str)
        )
        campaign_master = campaign_master.merge(
            labels,
            left_on='session', right_on='session_id', how='left'
        )

    return campaign_master

def analyze_campaign_clusters(campaign_master: pd.DataFrame) -> pd.DataFrame:
    """
    Collapse the campaign master into (HASSH, playbook) clusters reused across IPs.

    A cluster is a distinct ``hassh`` + ``cmd_sequence`` pair. For each cluster,
    ``unique_ips`` counts distinct ``src_ip`` values and ``total_hits`` counts rows.
    Only clusters seen from more than one IP are kept, sorted by ``unique_ips``
    in descending order.

    Args:
        campaign_master: Output of ``build_campaign_master``.

    Returns:
        The filtered clusters. An empty DataFrame when the input is empty.
    """
    if campaign_master.empty:
        return pd.DataFrame()

    grouped = campaign_master.groupby(['hassh', 'cmd_sequence'])
    clusters = grouped.agg(
        unique_ips=('src_ip', 'nunique'),
        total_hits=('session', 'count'),
    ).reset_index()

    # Keep only playbooks shared by several source IPs (botnet swarm reuse)
    swarms = clusters.loc[clusters['unique_ips'].gt(1)]
    return swarms.sort_values(by='unique_ips', ascending=False)

def print_campaign_summary(top_campaigns: pd.DataFrame) -> None:
    """
    Print the strategic assessment of cross-IP infrastructure reuse.

    With clusters present, it prints:
    - the first five rows;
    - the cluster count;
    - the largest ``unique_ips`` value;
    - the first row's HASSH, truncated to 10 characters.

    Otherwise it prints a "no reuse" finding.

    Args:
        top_campaigns: Clusters from ``analyze_campaign_clusters``. The columns
            include ``hassh`` and ``unique_ips``, and the first row is treated as
            the headline cluster.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("### STRATEGIC CAMPAIGN ATTRIBUTION: INFRASTRUCTURE REUSE ###")
    print(rule)

    if top_campaigns.empty:
        print("\n" + rule)
        print("### STRATEGIC FINDING: NO CROSS-IP REUSE DETECTED ###")
        print("Observation: All sessions used unique sequences or distinct fingerprints.")
        print("Assessment: Threat landscape appears primarily opportunistic.")
        print("Conclusion: No evidence of large-scale 'Swarm' campaigns identified.")
        print(rule + "\n")
        return

    # Show the leading clusters
    print(top_campaigns.head(5).to_string(index=False))

    cluster_count = len(top_campaigns)
    widest_reach = top_campaigns['unique_ips'].max()
    lead_hassh = top_campaigns.iloc[0]['hassh']

    print("\n" + rule)
    print("### STRATEGIC FINDING: COORDINATED BOTNET ACTIVITY ###")
    print(f"Campaigns Detected: {cluster_count} distinct infrastructure clusters.")
    print(f"Max Campaign Reach: {widest_reach} unique IPs sharing a single playbook.")
    print("Infrastructural Concentration: High (Evidence of centralized C2).")
    print(f"Assessment: The '{lead_hassh[:10]}...' cluster is a high-priority tracking target.")
    print(rule + "\n")

# ------------------------------------------
# EXECUTION BLOCK
# ------------------------------------------
if __name__ == "__main__":
    print("Initiating Phase XIV: Strategic Campaign Correlation...")

    # Verify upstream dataframes exist before running. A blanket `except NameError`
    # would also swallow NameErrors raised inside the helpers and misreport them
    # as dependency errors.
    _missing = [name for name in ("df", "cmd_df", "features_df") if name not in globals()]
    if _missing:
        logger.error(
            "CRITICAL ERROR: Campaign correlation aborted. Dependency error: missing %s",
            ", ".join(_missing),
        )
    else:
        # 1. Correlate Infrastructure
        master_campaign_df = build_campaign_master(df, cmd_df, features_df)

        # 2. Cluster & Analyze (None means HASSH telemetry was unavailable)
        if master_campaign_df is not None:
            top_campaign_clusters = analyze_campaign_clusters(master_campaign_df)

            # 3. Strategic Report
            print_campaign_summary(top_campaign_clusters)