## PE File Preprocessing Pipeline

---
---

STAGE-1
---

---

In [None]:
# Install required packages
!pip install lief pillow numpy pandas scikit-learn matplotlib pennylane

# Import necessary libraries
import lief
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import os
import json
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

print("All dependencies installed and imported successfully!")

## Step-1: PE file Parsing using LIEF
---


In [None]:
def parse_pe_file(pe_file_path):
    """
    Load a PE file through LIEF and print a short summary of it.

    On success the section count and entry-point address are printed and the
    object produced by ``lief.PE.parse`` is returned. None is returned when
    the parse result is falsy or when any exception is raised while parsing
    or while printing the summary.
    """
    print(f"Parsing PE file: {pe_file_path}")

    parsed = None
    try:
        candidate = lief.PE.parse(pe_file_path)
        if candidate:
            print("Successfully parsed PE file")
            print(f"Number of sections: {len(candidate.sections)}")
            print(f"Entry point: 0x{candidate.optional_header.addressof_entrypoint:x}")
            # Only hand the object back once the whole summary has printed.
            parsed = candidate
        else:
            print("Failed to parse PE file")
    except Exception as error:
        print(f"LIEF parsing error: {error}")

    return parsed

# Test the function
# binary = parse_pe_file("malware_sample.exe")

## Step-2: Target Section Identification
---


In [None]:
def extract_target_sections(binary):
    """
    Collect the raw content of the five PE sections this pipeline works on.

    For each of .text, .data, .rdata, .rsrc and .reloc, the first entry of
    ``binary.sections`` whose name (with NUL characters stripped from both
    ends) equals the wanted name is used.

    Returns a dict keyed by section name. The value is that section's
    ``content``, or None when no section with that name exists.
    """
    wanted_names = (".text", ".data", ".rdata", ".rsrc", ".reloc")
    extracted = {}

    for wanted in wanted_names:
        # First section whose cleaned name matches; None when there is none.
        match = next(
            (candidate for candidate in binary.sections
             if candidate.name.strip('\x00') == wanted),
            None,
        )

        if match is None:
            # Absent sections are recorded as None (assigned -1 later on).
            extracted[wanted] = None
            print(f"Section {wanted} not found - will receive -1 score")
            continue

        payload = match.content
        extracted[wanted] = payload
        print(f"Found {wanted}: {len(payload)} bytes")

    return extracted

# Test the function
# section_data = extract_target_sections(binary)

## Step-3: Conversion of Section bytes to 8x8 Grayscale Image
---


In [None]:
def bytes_to_8x8_grayscale_image(byte_content, section_name):
    """
    Turn the leading bytes of a section into an 8x8 grayscale image.

    Only the first 64 values of ``byte_content`` are used; shorter inputs are
    padded with zeros up to 64. ``section_name`` is accepted but not used.

    Returns None when ``byte_content`` is None, otherwise a tuple
    ``(image_array, pil_image)`` holding the uint8 array of shape (8, 8) and
    the PIL image built from it in mode 'L'.
    """
    if byte_content is None:
        # Missing section - nothing to convert.
        return None

    side = 8
    pixel_count = side * side

    # Slicing caps the data at 64 values; anything shorter is zero-filled.
    pixels = list(byte_content[:pixel_count])
    pixels.extend([0] * (pixel_count - len(pixels)))

    image_array = np.array(pixels, dtype=np.uint8).reshape(side, side)

    # 'L' is PIL's 8-bit grayscale mode.
    pil_image = Image.fromarray(image_array, mode='L')

    return image_array, pil_image

# Test the function
# img_array, pil_img = bytes_to_8x8_grayscale_image(section_data['.text'], '.text')

## Step-4: Processing Each Section's Grayscale Image
---


In [None]:
def process_sections_to_images(section_data, save_images=False, output_dir="./section_images/"):
    """
    Build the per-section image records for one PE file.

    ``section_data`` maps section names to raw content (or None when the
    section is absent). Every present section is converted with
    ``bytes_to_8x8_grayscale_image`` and stored as a record with keys
    'array', 'pil', 'size' and 'status' ('found'). Absent sections get a
    record with status 'missing' and a 'score' of -1.

    When ``save_images`` is true, ``output_dir`` is created if needed and each
    converted image is written there as ``<name without dots>_section.png``.
    """
    if save_images:
        os.makedirs(output_dir, exist_ok=True)

    records = {}

    for name, raw in section_data.items():
        if raw is None:
            records[name] = {
                'array': None,
                'pil': None,
                'size': 0,
                'status': 'missing',
                'score': -1
            }
            print(f"{name}: Missing section")
            continue

        converted = bytes_to_8x8_grayscale_image(raw, name)
        if converted is None:
            # No record is stored when the converter yields nothing.
            continue

        pixels, picture = converted
        records[name] = {
            'array': pixels,
            'pil': picture,
            'size': len(raw),
            'status': 'found'
        }

        if not save_images:
            print(f"Processed {name} to 8x8 grayscale image")
            continue

        # Save image for visualization
        target_path = os.path.join(output_dir, f"{name.replace('.', '')}_section.png")
        picture.save(target_path)
        print(f"Saved {name} as 8x8 grayscale image: {target_path}")

    return records

# Test the function
# section_images = process_sections_to_images(section_data, save_images=True)

## Step-5: Understanding Each Section's Significance
---


In [None]:
def explain_section_content(section_name, content_size):
    """
    Return a one-line description of a PE section, including its byte count.

    Names other than the five known sections are described as
    "Unknown section".
    """
    descriptions = {
        ".text": "Executable code and CPU instructions",
        ".data": "Initialized global and static variables",
        ".rdata": "Read-only data like string literals and constants",
        ".rsrc": "Resources like icons, menus, strings - frequently exploited by malware",
        ".reloc": "Relocation information for loading at different memory addresses",
    }

    # Every message shares the same "(<size> bytes)" suffix.
    summary = descriptions.get(section_name, "Unknown section")
    return f"{summary} ({content_size} bytes)"

# Display section information.
# NOTE: this loop reads the module-level `section_images`, so the Step-4 call
# that produces it has to be run before this cell.
for section_name, section_info in section_images.items():
    # process_sections_to_images() records an absent section as a dict with
    # status 'missing', never as None, so the status field has to be checked;
    # a bare `is not None` test would describe missing sections as 0-byte ones.
    if section_info is not None and section_info.get('status') != 'missing':
        print(f"{section_name}: {explain_section_content(section_name, section_info['size'])}")
    else:
        print(f"{section_name}: Missing section - will receive -1 score")


## Step-6: Complete PE Processing function
---

In [None]:
def complete_stage1_processing(pe_file_path, save_images=False, output_dir="./section_images/"):
    """
    Run Stage-1 for one file: parse it, pull out the target sections and
    convert them to 8x8 grayscale images.

    Returns the dict produced by ``process_sections_to_images``, or None when
    ``parse_pe_file`` could not load the file. ``save_images`` and
    ``output_dir`` are passed straight through to the image step.
    """
    print(f"=== Stage-1 Processing: {pe_file_path} ===")

    parsed_binary = parse_pe_file(pe_file_path)
    if parsed_binary is not None:
        raw_sections = extract_target_sections(parsed_binary)
        images = process_sections_to_images(raw_sections, save_images, output_dir)

        print("Stage-1 processing completed successfully!")
        return images

    # Parsing failed; parse_pe_file has already reported why.
    return None

# Test the complete Stage-1 pipeline
# stage1_results = complete_stage1_processing("malware_sample.exe", save_images=True)

# Stage-2:

---
---

## Step-7: Prepare Section Data for PCA30
---

In [None]:
def prepare_section_data_for_pca30(dataset_results, section_name):
    """
    Gather the flattened images of one section across a dataset.

    ``dataset_results`` is a sequence of per-file dicts; each is indexed by
    ``section_name`` and must expose 'status' and 'array'. Entries whose
    status is 'found' contribute their flattened array as one row.

    Returns ``(rows, missing_indices)``: the rows stacked with ``np.array``
    and the dataset positions whose status was anything other than 'found'.
    """
    rows = []
    missing_indices = []

    for position, per_file in enumerate(dataset_results):
        record = per_file[section_name]

        if record['status'] != 'found':
            # Not part of the training matrix; only its position is kept.
            missing_indices.append(position)
            continue

        rows.append(record['array'].flatten())

    return np.array(rows), missing_indices

# Test the function
# valid_data, missing_idx = prepare_section_data_for_pca30([stage1_results], '.text')

## Step-8: Train PCA30 Models
---

In [None]:
def train_pca30_models(training_dataset_results, n_components=30):
    """
    Fit one PCA model per target section.

    For each of the five section names the available samples are collected
    with ``prepare_section_data_for_pca30`` and a ``PCA`` with
    ``n_components`` components (random_state=42) is fitted on them.

    Returns ``(pca_models, pca_explained_variance)``. ``pca_models`` maps each
    section name to its fitted model, or to None when the section had no
    samples. ``pca_explained_variance`` has an entry only for fitted sections,
    with keys 'individual', 'total' and 'n_samples'.
    """
    section_names = (".text", ".data", ".rdata", ".rsrc", ".reloc")
    variance_floor = 0.85  # Expect >85% for 30 components
    fitted = {}
    variance_report = {}

    print("Training PCA30 models for each section...")

    for name in section_names:
        print(f"\nTraining PCA30 for {name} section...")

        # Only samples where the section exists are returned here.
        samples, _ = prepare_section_data_for_pca30(training_dataset_results, name)
        sample_count = len(samples)

        if sample_count == 0:
            print(f"WARNING: No valid samples found for {name} section!")
            fitted[name] = None
            continue

        reducer = PCA(n_components=n_components, random_state=42)
        reducer.fit(samples)
        fitted[name] = reducer

        ratios = reducer.explained_variance_ratio_
        retained = np.sum(ratios)
        variance_report[name] = {
            'individual': ratios,
            'total': retained,
            'n_samples': sample_count
        }

        print(f"{name}: {retained:.3f} total variance explained")
        print(f"Top 5 components: {ratios[:5]}")
        print(f"Training samples: {sample_count}")

        # Quality check
        if retained < variance_floor:
            print(f"WARNING: {name} PCA30 only retains {retained:.1%} variance")

    return fitted, variance_report

# Test the function
# pca_models, pca_variance = train_pca30_models([stage1_results])

## Step-9: Transform with PCA30
---

In [None]:
def transform_with_pca30(pe_results, section, pca_model, n_components=30):
    """
    Project one section of one PE file through its trained PCA model.

    The section's array is flattened and passed to ``pca_model.transform`` as
    a single-row batch; the first (only) output row is returned.

    When the section's status is not 'found', or ``pca_model`` is None, a
    vector of ``n_components`` values all equal to -1.0 is returned instead.
    """
    record = pe_results[section]
    usable = record['status'] == 'found' and pca_model is not None

    if not usable:
        # Missing section (or no model for it): -1 placeholder vector.
        return np.full(n_components, -1.0)

    row = record['array'].flatten()
    return pca_model.transform([row])[0]

# Test the function
# pca30_features = transform_with_pca30(stage1_results, '.text', pca_models['.text'])

## Step-10: Angular Embedding Preparation
---

In [None]:
def prepare_for_angular_embedding(pca30_features):
    """
    Rescale a PCA feature vector into the range [0, π/2].

    A vector whose entries are all -1 (the missing-section marker) is returned
    unchanged. Otherwise the vector is reshaped to a single column and passed
    through a freshly fitted ``MinMaxScaler``, so the scaling uses this
    vector's own values; the result is returned flattened.
    """
    is_missing_marker = np.all(pca30_features == -1)
    if is_missing_marker:
        return pca30_features  # Keep missing sections as -1

    upper_bound = np.pi / 2
    as_column = pca30_features.reshape(-1, 1)
    rescaled = MinMaxScaler(feature_range=(0, upper_bound)).fit_transform(as_column)

    return rescaled.flatten()

def configure_angular_hybrid_embedding(pca30_features):
    """
    Split a 30-element feature sequence into two halves of 15.

    Returns ``(X1, X2)``: the first 15 elements (used for qubits [0,1,2,3])
    and the last 15 (used for qubits [4,5,6,7]).

    Raises ValueError when the input does not have exactly 30 elements.
    """
    block_size = 15  # 15 classical data points per 4-qubit block
    feature_count = len(pca30_features)

    if feature_count != 2 * block_size:
        raise ValueError(f"Expected 30 features, got {feature_count}")

    return pca30_features[:block_size], pca30_features[block_size:]

# Test the functions
# angular_features = prepare_for_angular_embedding(pca30_features)
# X1, X2 = configure_angular_hybrid_embedding(angular_features)

## Step-11: Single File to Angular Embedding
---

In [None]:
def process_single_pe_to_angular_embedding(pe_results, pca_models, n_components=30):
    """
    Run the Stage-2 steps for one PE file, section by section:
    PCA projection → angular scaling → split into two 15-feature blocks.

    ``pca_models`` is indexed by section name. Returns a dict keyed by section
    name; each value holds 'X1', 'X2', 'encoding_type', 'missing',
    'pca30_features' and 'angular_features'. A section whose scaled features
    are all -1 is flagged as missing and gets two 15-element blocks of -1.0.
    """
    section_names = (".text", ".data", ".rdata", ".rsrc", ".reloc")
    block_size = 15
    embeddings = {}

    for name in section_names:
        reduced = transform_with_pca30(pe_results, name, pca_models[name], n_components)
        angles = prepare_for_angular_embedding(reduced)

        # An all -1 vector is the marker used for an absent section.
        is_missing = bool(np.all(angles == -1))

        if is_missing:
            first_block = np.full(block_size, -1.0)
            second_block = np.full(block_size, -1.0)
            encoding_label = 'missing_section'
        else:
            first_block, second_block = configure_angular_hybrid_embedding(angles)
            encoding_label = 'Angular-Hybrid4'

        embeddings[name] = {
            'X1': first_block,   # 15 features for qubits [0,1,2,3]
            'X2': second_block,  # 15 features for qubits [4,5,6,7]
            'encoding_type': encoding_label,
            'missing': is_missing,
            'pca30_features': reduced,
            'angular_features': angles
        }

    return embeddings

# Test the function
# stage2_results = process_single_pe_to_angular_embedding(stage1_results, pca_models)

## Step-12: Batch Processing Function
---

In [None]:
def batch_process_to_angular_embedding(dataset_results, pca_models, n_components=30):
    """
    Apply ``process_single_pe_to_angular_embedding`` to every entry of
    ``dataset_results`` and return the outputs as a list, in input order.

    A progress line is printed after every 100 processed files.
    """
    embedded = []

    for done, pe_results in enumerate(dataset_results, start=1):
        embedded.append(
            process_single_pe_to_angular_embedding(pe_results, pca_models, n_components)
        )

        if done % 100 == 0:
            print(f"Processed {done} PE files through Stage-2 pipeline")

    return embedded

# Test the function
# angular_dataset = batch_process_to_angular_embedding([stage1_results], pca_models)

## Step-13: Angular Hybrid Quantum Encoding Functions
---

In [None]:
# PennyLane is treated as optional: record whether the import succeeded so the
# quantum-encoding functions can branch on PENNYLANE_AVAILABLE.
try:
    import pennylane as qml
    PENNYLANE_AVAILABLE = True
except ImportError:
    # Only a notice is printed here; `qml` stays undefined in this case, so
    # code using it must check the flag first.
    print("PennyLane not available - quantum encoding functions will be placeholders")
    PENNYLANE_AVAILABLE = False

def Angular_Hybrid_4(X, wires):
    """
    Implements Angular Hybrid embedding for 4 qubits
    Encodes 15 classical features into 4-qubit quantum state

    X is indexed from 0 to 14 and is used as the rotation angles; wires is
    indexed from 0 to 3 and names the four qubits acted on.

    When PENNYLANE_AVAILABLE is False, no gates are applied and a descriptive
    string is returned instead. Otherwise the gates below are issued in order
    and the function returns None.

    NOTE(review): the gate calls are presumably meant to run inside a
    PennyLane quantum function / QNode - confirm against the caller.
    """
    if not PENNYLANE_AVAILABLE:
        # Placeholder result: only reports how many features and which wires.
        return f"Angular_Hybrid_4 encoding with {len(X)} features on wires {wires}"
    
    # Wire 0: a single rotation by X[0].
    qml.RY(X[0], wires=wires[0])

    # Wire 1: two rotations controlled by wire 0. The first CRY sits between
    # two PauliX gates on the control, the second uses the control as-is.
    qml.PauliX(wires=wires[0])
    qml.CRY(X[1], wires=[wires[0], wires[1]])
    qml.PauliX(wires=wires[0])
    qml.CRY(X[2], wires=[wires[0], wires[1]])

    # Wire 2: four RY rotations (X[3]..X[6]), each followed by a CNOT onto
    # wire 2 whose control alternates wire 1, wire 0, wire 1, wire 0.
    qml.RY(X[3], wires=wires[2])
    qml.CNOT(wires=[wires[1], wires[2]])
    qml.RY(X[4], wires=wires[2])
    qml.CNOT(wires=[wires[0], wires[2]])
    qml.RY(X[5], wires=wires[2])
    qml.CNOT(wires=[wires[1], wires[2]])
    qml.RY(X[6], wires=wires[2])
    qml.CNOT(wires=[wires[0], wires[2]])

    # Wire 3: eight RY rotations (X[7]..X[14]), each followed by a CNOT onto
    # wire 3; the controls follow the pattern wire 2, 1, 2, 0 twice over.
    qml.RY(X[7], wires=wires[3])
    qml.CNOT(wires=[wires[2], wires[3]])
    qml.RY(X[8], wires=wires[3])
    qml.CNOT(wires=[wires[1], wires[3]])
    qml.RY(X[9], wires=wires[3])
    qml.CNOT(wires=[wires[2], wires[3]])
    qml.RY(X[10], wires=wires[3])
    qml.CNOT(wires=[wires[0], wires[3]])
    qml.RY(X[11], wires=wires[3])
    qml.CNOT(wires=[wires[2], wires[3]])
    qml.RY(X[12], wires=wires[3])
    qml.CNOT(wires=[wires[1], wires[3]])
    qml.RY(X[13], wires=wires[3])
    qml.CNOT(wires=[wires[2], wires[3]])
    qml.RY(X[14], wires=wires[3])
    qml.CNOT(wires=[wires[0], wires[3]])

def encode_section_to_quantum_state(section_embedding_data, section_name):
    """
    Describe the quantum encoding that applies to one section.

    ``section_embedding_data`` must provide 'X1', 'X2' and 'missing'. No
    circuit is executed here: the result is a dict with 'encoding_type',
    'quantum_circuit' and 'description'. For a missing section
    'quantum_circuit' is None; otherwise it is a dict of two text entries
    ('block1', 'block2') whose wording depends on PENNYLANE_AVAILABLE.
    """
    first_block = section_embedding_data['X1']   # 15 features
    second_block = section_embedding_data['X2']  # 15 features

    if section_embedding_data['missing']:
        # Missing section: nothing to encode, just report it.
        print(f"Encoding missing {section_name} section")
        return {
            'encoding_type': 'missing_section',
            'quantum_circuit': None,
            'description': f"Missing {section_name} section - no quantum encoding"
        }

    # Valid section: describe the two 4-qubit blocks.
    if PENNYLANE_AVAILABLE:
        block_one = f"Angular_Hybrid_4(X1, wires=[0, 1, 2, 3]) with {len(first_block)} features"
        block_two = f"Angular_Hybrid_4(X2, wires=[4, 5, 6, 7]) with {len(second_block)} features"
    else:
        block_one = f"Angular_Hybrid_4 placeholder for {len(first_block)} features"
        block_two = f"Angular_Hybrid_4 placeholder for {len(second_block)} features"

    return {
        'encoding_type': 'Angular-Hybrid4',
        'quantum_circuit': {'block1': block_one, 'block2': block_two},
        'description': f"8-qubit encoding for {section_name} section"
    }

print("Angular Hybrid Quantum Encoder functions defined successfully!")

## Step-14: Complete Pipeline Integration Function
---

In [None]:
def complete_bodmas_pipeline(pe_file_paths, labels, save_stage1_images=False, n_components=30):
    """
    Run Stage-1 and Stage-2 over a list of PE files:
    PE Files → Section Extraction → 8x8 Images → PCA30 → Angular Hybrid Embedding

    Files whose Stage-1 result is falsy, or whose processing raises, are
    skipped. ``labels`` is indexed by position in ``pe_file_paths``; only the
    labels of the files that were kept are returned.

    Returns a dict with keys 'stage1_results', 'angular_embedding_data',
    'valid_labels', 'valid_indices', 'pca_models' and
    'pca_explained_variance'.
    """
    print(f"Processing {len(pe_file_paths)} PE files through complete BODMAS pipeline...")

    # Stage-1: Process all PE files to 8x8 section images
    print("\n=== STAGE-1: PE Section Extraction ===")
    stage1_outputs = []
    kept_positions = []

    for position, path in enumerate(pe_file_paths):
        try:
            outcome = complete_stage1_processing(path, save_images=save_stage1_images)
            if not outcome:
                continue

            stage1_outputs.append(outcome)
            kept_positions.append(position)

            # Progress is reported only on kept files at multiples of 50.
            processed_so_far = position + 1
            if processed_so_far % 50 == 0:
                print(f"Stage-1 processed {processed_so_far}/{len(pe_file_paths)} files")
        except Exception as error:
            print(f"Error processing {path}: {error}")

    print(f"Stage-1 completed: {len(stage1_outputs)} valid PE files")

    # Stage-2: Train PCA30 models
    print("\n=== STAGE-2: PCA30 Model Training ===")
    models, variance_report = train_pca30_models(stage1_outputs, n_components)

    # Stage-2: Transform training data to Angular Hybrid embedding
    print("\n=== STAGE-2: Angular Hybrid Embedding ===")
    embedded = batch_process_to_angular_embedding(stage1_outputs, models, n_components)

    # Keep only the labels that belong to files which survived Stage-1.
    kept_labels = [labels[position] for position in kept_positions]

    return {
        'stage1_results': stage1_outputs,
        'angular_embedding_data': embedded,
        'valid_labels': kept_labels,
        'valid_indices': kept_positions,
        'pca_models': models,
        'pca_explained_variance': variance_report
    }

# Test the complete pipeline
# pipeline_results = complete_bodmas_pipeline(["sample1.exe", "sample2.exe"], [1, 0])