In [None]:
import os
import zstandard
from scapy.all import PcapReader
import numpy as np
import concurrent.futures


def decompress_zst_file(input_file, output_file):
    """Decompress a .zst file to its original format.

    The decompressed bytes are streamed into a temporary ``<output_file>.part``
    file, which is renamed onto *output_file* only after decompression has
    completed. An interrupted or failed run therefore never leaves a truncated
    file at *output_file*. That matters because callers may treat an existing
    output file as "already decompressed" and skip this step.

    Args:
        input_file: Path to the zstd-compressed input file.
        output_file: Destination path for the decompressed bytes (str or
            os.PathLike).

    Raises:
        OSError: if a file cannot be opened, written or renamed.
        Exceptions raised by ``zstandard`` (e.g. on corrupt input) propagate.
    """
    partial_file = os.fspath(output_file) + '.part'
    try:
        with open(input_file, 'rb') as compressed, open(partial_file, 'wb') as decompressed:
            zstandard.ZstdDecompressor().copy_stream(compressed, decompressed)
        # Rename only after the full stream has been written and the file closed.
        os.replace(partial_file, output_file)
    except BaseException:
        # Also covers KeyboardInterrupt (e.g. a notebook kernel interrupt):
        # drop the half-written temp file, then re-raise.
        try:
            os.remove(partial_file)
        except OSError:
            pass
        raise

def extract_packet_headers(pcap_file, max_bytes_per_packet=None):
    """Concatenate the raw bytes of every packet in a capture file.

    ``bytes(packet)`` serializes the *whole* packet, headers and payload. By
    default this function therefore returns full packet contents back-to-back,
    not just headers. Pass *max_bytes_per_packet* to keep only the leading
    bytes of each packet instead.

    Args:
        pcap_file: Path to the capture file, read with scapy's ``PcapReader``.
        max_bytes_per_packet: If not None, each packet is truncated to at most
            this many leading bytes before concatenation. Default None keeps
            every byte.

    Returns:
        bytes: The (possibly truncated) packets joined with no separator.

    Raises:
        ValueError: if *max_bytes_per_packet* is negative.
    """
    if max_bytes_per_packet is not None and max_bytes_per_packet < 0:
        raise ValueError("max_bytes_per_packet must be non-negative or None")

    with PcapReader(pcap_file) as reader:
        # join() consumes the generator fully before the reader is closed.
        if max_bytes_per_packet is None:
            return b''.join(bytes(packet) for packet in reader)
        return b''.join(bytes(packet)[:max_bytes_per_packet] for packet in reader)

def segment_into_windows(byte_stream, window_size=1000):
    """Cut *byte_stream* into consecutive chunks of *window_size* bytes.

    The stream is first right-padded with zero bytes up to the next multiple
    of *window_size*, so the last chunk is as long as the others. An empty
    stream produces an empty list.
    """
    # Integer ceiling of len(byte_stream) / window_size.
    window_count = (len(byte_stream) + window_size - 1) // window_size
    target_length = window_count * window_size
    padded = byte_stream.ljust(target_length, b'\x00')

    starts = range(0, len(padded), window_size)
    return [padded[start:start + window_size] for start in starts]

# Convert byte windows into 2D matrices (32x32 by default)
def windows_to_2d_matrices(windows, matrix_size=(32, 32)):
    """Convert each window of bytes into a 2D matrix of byte values.

    Each window is zero-padded (if shorter) or truncated (if longer) to exactly
    ``rows * cols`` bytes, then reshaped row-major into a ``(rows, cols)``
    matrix. For example, 1000-byte windows fill a 32x32 matrix with 24
    trailing zero cells.

    Args:
        windows: Iterable of bytes-like objects.
        matrix_size: ``(rows, cols)`` of each output matrix.

    Returns:
        numpy.ndarray of shape ``(len(windows), rows, cols)`` with NumPy's
        default integer dtype, holding values 0-255. An empty *windows* yields
        shape ``(0, rows, cols)``, so callers can still read ``shape[1]`` and
        ``shape[2]``.
    """
    rows, cols = matrix_size
    bytes_per_matrix = rows * cols
    # Truncate then pad, so every window contributes exactly one matrix's bytes.
    fitted = [bytes(window[:bytes_per_matrix]).ljust(bytes_per_matrix, b'\x00')
              for window in windows]
    # One vectorized conversion instead of a Python list of ints per window.
    flat = np.frombuffer(b''.join(fitted), dtype=np.uint8)
    # astype(int) keeps the original default-int dtype and returns a writable copy.
    return flat.reshape(len(fitted), rows, cols).astype(int)

def process_single_file(zst_file, directory):
    """Process a single .zst file: decompress, extract packet bytes, segment, and save matrices.

    Pipeline for ``directory/zst_file``:
      1. Decompress it next to itself with the trailing ``.zst`` removed.
         This is skipped if that decompressed file already exists.
      2. Concatenate the packet bytes of the decompressed capture.
      3. Split them into zero-padded 1000-byte windows.
      4. Convert each window to a 32x32 matrix.
      5. Save the stack as ``<decompressed file>_matrices.npy``.

    If the matrices file already exists, the whole file is skipped.

    Any exception is reported with ``print`` and swallowed, so one bad capture
    does not abort a batch run. The function always returns None.
    """
    try:
        input_file = os.path.join(directory, zst_file)
        # Strip only the trailing ".zst". str.replace would also remove ".zst"
        # appearing elsewhere in the path (e.g. inside a directory name).
        if input_file.endswith('.zst'):
            decompressed_file = input_file[:-len('.zst')]
        else:
            decompressed_file = input_file
        output_matrices_file = decompressed_file + "_matrices.npy"

        if os.path.exists(output_matrices_file):
            print(f"Skipping {zst_file}: Matrices already saved at {output_matrices_file}.")
            return

        if os.path.exists(decompressed_file):
            print(f"Skipping decompression for {zst_file}: {decompressed_file} already exists.")
        else:
            decompress_zst_file(input_file, decompressed_file)
            print(f"Decompressed {zst_file} to {decompressed_file}.")

        raw_headers = extract_packet_headers(decompressed_file)
        print(f"Extracted {len(raw_headers)} bytes of headers from {decompressed_file}.")

        # Segment into 1000-byte windows
        windows = segment_into_windows(raw_headers, window_size=1000)
        print(f"Segmented into {len(windows)} windows.")

        matrices = windows_to_2d_matrices(windows)
        print(f"Converted into {matrices.shape[0]} matrices of size {matrices.shape[1]}x{matrices.shape[2]}.")

        # Save via a temp file + atomic rename. An interrupted save must never
        # leave a truncated .npy that the existence check above would treat as done.
        partial_file = output_matrices_file + ".part"
        with open(partial_file, 'wb') as handle:
            np.save(handle, matrices)
        os.replace(partial_file, output_matrices_file)
        print(f"Saved matrices to {output_matrices_file}.\n")

    except Exception as e:
        print(f"Error processing {zst_file}: {e}")

def process_all_zst_files_parallel(directory, num_workers=4):
    """Process all .zst files in the directory using multiple workers.

    Only regular files whose names end in ``.zst`` are considered. They are
    submitted in sorted name order, so runs are reproducible. Each file is
    handled by ``process_single_file`` on a thread pool of *num_workers*
    threads.

    ``process_single_file`` catches and prints its own exceptions. The handler
    below only sees failures that escape it, and it names the file involved.

    NOTE(review): the per-file work (scapy parsing, byte/array conversion) is
    likely dominated by Python-level CPU work, so threads may be limited by
    the GIL. A ProcessPoolExecutor could scale better when run as a script
    rather than a notebook — measure before switching.
    """
    zst_files = sorted(
        name for name in os.listdir(directory)
        if name.endswith('.zst') and os.path.isfile(os.path.join(directory, name))
    )
    print(f"Found {len(zst_files)} .zst files to process in {directory}.")

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Map each future back to its file so failures can be attributed.
        future_to_file = {
            executor.submit(process_single_file, zst_file, directory): zst_file
            for zst_file in zst_files
        }
        for future in concurrent.futures.as_completed(future_to_file):
            try:
                future.result()
            except Exception as e:
                print(f"Error during processing of {future_to_file[future]}: {e}")

# Run the pipeline. The __main__ guard keeps it from starting when this code is
# imported as a module for its helper functions. Inside a Jupyter notebook
# __name__ is "__main__", so the cell still runs the pipeline as before.
if __name__ == "__main__":
    process_all_zst_files_parallel(directory="/tank/swlarsen/2024_10_18/fcc-ht2/run1", num_workers=32)


Found 970 .zst files to process in /tank/swlarsen/2024_10_18/fcc-ht2/run1.
Decompressed hightouch_00000008_1729174946641636339_fnalfcc-ht2_01_00000.pcapng.zst to /tank/swlarsen/2024_10_18/fcc-ht2/run1/hightouch_00000008_1729174946641636339_fnalfcc-ht2_01_00000.pcapng.
Extracted 4084 bytes of headers from /tank/swlarsen/2024_10_18/fcc-ht2/run1/hightouch_00000008_1729174946641636339_fnalfcc-ht2_01_00000.pcapng.
Segmented into 5 windows.
Converted into 5 matrices of size 32x32.
Saved matrices to /tank/swlarsen/2024_10_18/fcc-ht2/run1/hightouch_00000008_1729174946641636339_fnalfcc-ht2_01_00000.pcapng_matrices.npy.

Decompressed hightouch_00000008_1729288619310778631_fnalfcc-ht2_04_00048.pcapng.zst to /tank/swlarsen/2024_10_18/fcc-ht2/run1/hightouch_00000008_1729288619310778631_fnalfcc-ht2_04_00048.pcapng.
Decompressed hightouch_00000008_1729296548320302455_fnalfcc-ht2_05_00148.pcapng.zst to /tank/swlarsen/2024_10_18/fcc-ht2/run1/hightouch_00000008_1729296548320302455_fnalfcc-ht2_05_00148.p