In [None]:
"""
Initialization.
Import libraries, and define global variables.
"""

%load_ext autoreload
%autoreload 2

# Imports
import os
import glob
from natsort import natsorted


def must_process_dir(dir: "str | os.PathLike[str]") -> bool:
    """
    Check if given directory must be processed,
    i.e. if it contains traces to analyze.

    A directory must be processed when it is an existing directory
    whose "traces" subdirectory holds at least one "*.pcap" file.

    Args:
        dir (str | os.PathLike): directory to check, given either as a path
            or as an `os.DirEntry` (as yielded by `os.scandir`).
    Returns:
        bool: True if the directory must be processed, False otherwise.
    """
    # os.fspath accepts plain strings as well as os.DirEntry objects,
    # so both kinds of argument are handled the same way from here on.
    dir_path = os.fspath(dir)
    if not os.path.isdir(dir_path):
        return False

    # Escape the directory part, so that glob metacharacters in its name
    # ("[", "*", "?") are matched literally; only "*.pcap" acts as a wildcard.
    pattern = os.path.join(glob.escape(dir_path), "traces", "*.pcap")
    return len(glob.glob(pattern)) > 0


##### CONFIG #####

## Paths
# Everything is looked up relative to the current working directory
BASE_DIR = os.getcwd()

## Device under test
# Replace the values below with the details of your own device
DEVICE_NAME = "DEVICE_NAME"
DEVICE_MAC = "00:00:00:00:00:00"
DEVICE_IP = "192.168.1.3"
DEVICE_INTERFACE = "interface"
DEVICE_DIR = os.path.join(BASE_DIR, "devices", DEVICE_NAME)

## Event under test
# Replace the value below with the name of the event under test
EVENT = "event"
EVENT_DIR = os.path.join(DEVICE_DIR, EVENT)

## Policies
# Naturally-sorted paths of the entries of EVENT_DIR which contain traces to analyze
policy_dirs = natsorted(
    [
        entry.path
        for entry in os.scandir(EVENT_DIR)
        if must_process_dir(entry)
    ]
)


In [None]:
"""
For each PCAP file, extract packets.
"""

import os
import glob
import json

from signature_extraction.pkt_extraction import pcap_to_pkts, pkts_to_csv


def get_policy_name_from_path(policy_dir: str) -> str:
    """
    Extract the policy name from the path to a policy directory.

    The base name of the directory is split at its first underscore,
    and everything after that underscore is returned,
    e.g. a directory named "0_root" gives "root".

    Args:
        policy_dir (str): path to the policy directory.
    Returns:
        str: part of the directory's base name which follows the first underscore,
             or an empty string if the base name contains no underscore.
    """
    # Normalize first: os.path.basename returns an empty string
    # for a path which ends with a separator (e.g. "event/0_root/").
    basename = os.path.basename(os.path.normpath(policy_dir))
    return basename.partition("_")[2]


# Read initial DNS table
path_dns_table = os.path.join(EVENT_DIR, "0_root", "dns_table.json")
with open(path_dns_table, "r") as f:
    dns_table = json.load(f)

# Both dictionaries are keyed by the base name of the policy directory:
#   - pcaps_per_policy: paths to the policy's PCAP files
#   - pkts_per_policy:  packets extracted from each of those PCAP files, in the same order
pcaps_per_policy = {}
pkts_per_policy = {}
for policy_dir in policy_dirs:
    policy_name = os.path.basename(policy_dir)
    print(f"Policy: {policy_name}")
    print()
    policy_pkts = []
    pkts_per_policy[policy_name] = policy_pkts

    # Path(s) to PCAP file(s)
    # glob does not guarantee any ordering: sort the paths,
    # such that the PCAP numbering is the same from one run to the next.
    # The directory part is escaped, such that only "*.pcap" acts as a wildcard.
    traces_dir = os.path.join(policy_dir, "traces")
    pcaps = sorted(glob.glob(f"{glob.escape(traces_dir)}/*.pcap"))
    pcaps_per_policy[policy_name] = pcaps

    # Extract packets
    for i, pcap in enumerate(pcaps):
        # Read packets
        pkts = pcap_to_pkts(pcap, dns_table)
        policy_pkts.append(pkts)
        # Save packets to CSV, next to the PCAP file
        # Only the file extension is swapped: str.replace would also rewrite
        # any ".pcap" occurring elsewhere in the path (e.g. in a directory name),
        # and would leave the path untouched if the extension's case differs.
        csv_file_path = os.path.splitext(pcap)[0] + ".csv"
        pkts_to_csv(pkts, csv_file_path)
        # Print packets
        print(f"PCAP #{i+1}: {len(pkts)} packets ({pcap})")
        # print("\n".join([str(pkt) for pkt in pkts]))
        print()

    print()


In [None]:
"""
For each PCAP, group packets per bidirectional flow,
i.e. packets having the same:
    - IP addresses
    - Ports
    - Transport protocol

As flows are *bidirectional*, packets in both directions corresponding to the same data exchange are grouped. 
"""

from signature_extraction.flow_grouping import group_pkts_per_flow


# Group packets per flow
# patterns_per_policy maps each policy name to its list of patterns,
# one pattern per PCAP file, in the same order as pcaps_per_policy[policy_name].
patterns_per_policy = {}
for policy_name, pkts_matrix in pkts_per_policy.items():
    print(f"Policy: {policy_name}")
    print()
    patterns = []
    patterns_per_policy[policy_name] = patterns
    for i, pkts in enumerate(pkts_matrix):
        # Group packets per flow
        pattern = group_pkts_per_flow(pkts)
        patterns.append(pattern)
        # Save flows as CSV, next to the PCAP file the packets were read from
        # Only the file extension is swapped: str.replace would also rewrite
        # any ".pcap" occurring elsewhere in the path (e.g. in a directory name).
        policy_pcap = pcaps_per_policy[policy_name][i]
        pattern_csv_path = os.path.splitext(policy_pcap)[0] + "_flows.csv"
        pattern.to_csv(pattern_csv_path)
        # Display pattern
        print(f"Pattern {i+1}: ({policy_pcap})")
        print(pattern)
        print()


In [None]:
"""
Sort network patterns per length.
"""

for policy_name, patterns in patterns_per_policy.items():
    print(f"Policy: {policy_name}")
    print()
    policy_pcaps = pcaps_per_policy[policy_name]
    # Sort the *indices* of the patterns by pattern length, and not the patterns directly:
    # the original index is needed to retrieve the PCAP file each pattern comes from.
    # (sorted is stable, hence the order is the same as sorted(patterns, key=len).)
    sorted_indices = sorted(range(len(patterns)), key=lambda idx: len(patterns[idx]))
    sorted_patterns = [patterns[idx] for idx in sorted_indices]
    for i, (idx, pattern) in enumerate(zip(sorted_indices, sorted_patterns)):
        # Use the pattern's original index, not its rank in the sorted list
        policy_pcap = policy_pcaps[idx]
        print(f"Pattern {i+1}: ({policy_pcap})")
        print(pattern)
        print()


In [None]:
"""
Extract event signature from the list of patterns.
"""

from signature_extraction.event_signature_extraction import patterns_to_signature


signature_per_policy = {}
for policy_name, patterns in patterns_per_policy.items():
    print(f"Policy: {policy_name}")

    # A policy without any pattern has no signature to extract
    if not patterns:
        continue

    # Build the event signature from the policy's patterns,
    # and keep it under the policy's name
    signature = patterns_to_signature(patterns)
    signature_per_policy[policy_name] = signature

    # Write the signature to "signature.csv", inside the policy's directory
    policy_dir = os.path.join(EVENT_DIR, policy_name)
    output_csv_path = os.path.join(policy_dir, "signature.csv")
    signature.to_csv(output_csv_path)

    # Show the signature, followed by an empty line
    print(signature, end="\n\n")


In [None]:
"""
Extract a profile-compliant policy from the event signature.
"""

import json
from utils.policy import contains_policy


def policy_exists(policy: dict, all_policies: dict) -> bool:
    """
    Tell whether a policy is already present among the known policies.

    Each value of `all_policies` is itself a dictionary of policies;
    the values of each of those dictionaries are handed to `contains_policy`,
    together with the policy to look for.

    Args:
        policy (dict): Policy to look for.
        all_policies (dict): All known policies, grouped per base policy.
    Returns:
        bool: True as soon as one group contains the policy, False otherwise.
    """
    # any() stops at the first group for which contains_policy is truthy
    return any(
        contains_policy(group.values(), policy)
        for group in all_policies.values()
    )


# next_policies_per_policy maps each base policy name
# to the policies extracted from its event signature, keyed by flow ID.
next_policies_per_policy = {}
for policy_name, signature in signature_per_policy.items():
    print(f"Base policy: {policy_name}")
    print()

    policies = {}
    next_policies_per_policy[policy_name] = policies
    for flow in signature.get_flows():
        # Extract policy
        policy = flow.extract_policy(DEVICE_IP)

        # Check if policy is already contained in the list of policies
        # (of any base policy, including the one being processed)
        if policy_exists(policy, next_policies_per_policy):
            continue

        # Policy does not exist yet,
        # add it to the list of policies.
        # The flow ID gets its own variable, so that it does not overwrite
        # `policy_name`, which holds the name of the base policy.
        policy_id = flow.get_id()
        policies[policy_id] = policy

        # Display policy
        print(f"Policy {policy_id}:")
        print(json.dumps(policy, indent=2))
        print()

    print()
