In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report
from sklearn.tree import DecisionTreeClassifier, export_text
import pickle
from scipy.stats import qmc
import gc
import tracemalloc
import time
import numpy as np
import pandas as pd
from IPython.display import display
import os

# Part 1: Load the training data, train the Neural Network and save the following:
1. model
2. feature names
3. the scaler used

In [None]:
# Read the training feature table. Header names are stripped of surrounding
# whitespace so that later column look-ups by name (e.g. "Tag") match.
print("Loading and processing training data")
df_train = pd.read_csv("Steganography_Dataset/features_train_70000.csv").rename(columns=str.strip)
print("Training data loaded. Shape: ", df_train.shape)

def parse_complex(x):
    """Convert a complex-number string written with ``i`` into its magnitude.

    Parameters
    ----------
    x : object
        A single cell value. Only strings that contain the character ``i``
        are processed; everything else is passed through untouched.

    Returns
    -------
    object
        ``abs(complex(...))`` for a parsable string such as ``"3+4i"``,
        ``np.nan`` when the string contains ``i`` but is not a valid complex
        literal, and ``x`` itself for all other inputs.
    """
    if not (isinstance(x, str) and 'i' in x):
        return x

    # Python spells the imaginary unit "j" and refuses whitespace inside the
    # literal, so "3 + 4i" must become "3+4j" before it can be parsed.
    literal = "".join(x.split()).replace('i', 'j')
    try:
        return abs(complex(literal))
    except (ValueError, OverflowError):
        # ValueError: not a complex literal; OverflowError: magnitude too large.
        return np.nan

# Replace complex-valued cells by their magnitude, then discard every row
# that still contains a missing value.
df_train = df_train.apply(lambda col: col.map(parse_complex)).dropna()

# Every column except the label column "Tag" is a feature.
X_train = df_train.drop(columns="Tag").astype(float).to_numpy()
y_train = df_train["Tag"].astype(float).to_numpy()

# Standardise the features; the fitted scaler is persisted so the same
# transformation can be re-applied later without the training data.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

with open('scaler.pkl', 'wb') as scaler_file:
    pickle.dump(scaler, scaler_file)
print("Scaler saved to 'scaler.pkl' for future use")

# Labels become a column vector so they line up with the network's (N, 1) output.
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)

## Defining the Neural Network Architecture

In [None]:
class Net(nn.Module):
    """Fully connected binary classifier: ``input_dim -> 32 -> 16 -> 1``.

    The two hidden layers use ReLU; the single output unit is passed through
    a sigmoid, so ``forward`` returns values in the open interval (0, 1).
    """

    def __init__(self, input_dim):
        """Create the three linear layers for ``input_dim`` input features."""
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_dim, out_features=32)
        self.fc2 = nn.Linear(in_features=32, out_features=16)
        self.fc3 = nn.Linear(in_features=16, out_features=1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a batch of feature rows to one sigmoid output per row."""
        hidden1 = self.relu(self.fc1(x))
        hidden2 = self.relu(self.fc2(hidden1))
        logit = self.fc3(hidden2)
        return self.sigmoid(logit)

# Seed torch's global RNG before the layers are created so that the initial
# weights -- and therefore the trained model and everything derived from it --
# are the same on every Restart & Run All.
torch.manual_seed(42)

# One input unit per feature column of the scaled training tensor.
model = Net(X_train_tensor.shape[1])
# Net ends in a sigmoid, so the loss operates on probabilities (BCELoss).
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

## Train the Neural Network and Save the model and feature names.

In [None]:
print("\nTraining Neural Network")
epochs = 200

# train() only switches the module into training mode, so doing it once
# before the loop is equivalent to doing it at the start of every epoch.
model.train()
for epoch in range(epochs):
    # Full-batch update: each epoch is a single optimizer step computed on
    # the entire training tensor.
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()

    epoch_number = epoch + 1
    if epoch_number % 50 != 0:
        continue
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

# torch.save on the module object pickles the whole model (not just its
# weights); loading it back requires the Net class to be available.
torch.save(model, 'neural_network_model.pth')
print("Model saved to 'neural_network_model.pth'")

# Persist the feature column order (all columns except the label "Tag").
feature_names = df_train.columns.drop("Tag").tolist()
with open('feature_names.pkl', 'wb') as names_file:
    pickle.dump(feature_names, names_file)
print("'feature_names.pkl' saved for future use")

# Test Neural Network on test dataset

In [None]:
df_test = pd.read_csv("Steganography_Dataset/features_test_70000.csv")
df_test.columns = df_test.columns.str.strip()
print("Test data loaded. Shape:", df_test.shape)

# Same cleaning as for the training set: complex strings -> magnitudes,
# then drop rows with missing values.
df_test = df_test.apply(lambda col: col.map(parse_complex)).dropna()

X_test = df_test.drop(columns="Tag").astype(float).to_numpy()
y_test = df_test["Tag"].astype(float).to_numpy()

# Apply the scaler that was fitted on the training features (the in-memory
# object; it is never re-fitted on the test data).
X_test_scaled = scaler.transform(X_test)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

# Inference only: no gradients are needed for the forward pass.
model.eval()
with torch.no_grad():
    test_probabilities = model(X_test_tensor)

# A sample is assigned class 1 when the sigmoid output exceeds 0.5.
test_preds = (test_probabilities > 0.5).float()
nn_test_acc = accuracy_score(y_test, test_preds.numpy())

print(f"Neural Network Test Accuracy: {nn_test_acc * 100:.2f}%")
print("Classification Report (Neural Network):")
print(classification_report(y_test, test_preds.numpy()))

### From this point on, we no longer have access to the training dataset

# Part 2: Generate synthetic data for training the surrogate model

In [None]:
# SECURITY NOTE: torch.load(..., weights_only=False) and pickle.load both
# unpickle arbitrary Python objects. Only use them on artefacts produced by
# this notebook, never on files from an untrusted source.
loaded_model = torch.load("neural_network_model.pth", map_location="cpu", weights_only=False)
loaded_model.eval()
print("Model loaded successfully")

with open('scaler.pkl', 'rb') as f:
    loaded_scaler = pickle.load(f)
with open('feature_names.pkl', 'rb') as f:
    loaded_feature_names = pickle.load(f)

print(f"Scaler and feature names loaded ({len(loaded_feature_names)} features)")

num_synthetic_samples = 30000
num_features = len(loaded_feature_names)
# The sample budget is split evenly across the three sampling strategies.
samples_per_strategy = num_synthetic_samples // 3

print("Generating synthetic data for surrogate model training")
# A seeded generator makes the uniform and normal draws reproducible, in
# line with the already-seeded Latin hypercube sampler below.
synthetic_rng = np.random.default_rng(42)
uniform_samples = synthetic_rng.uniform(-3, 3, size=(samples_per_strategy, num_features))
normal_samples = synthetic_rng.standard_normal((samples_per_strategy, num_features))

lhs_sampler = qmc.LatinHypercube(d=num_features, seed=42)
lhs_samples = lhs_sampler.random(n=samples_per_strategy)
# Rescale the unit hypercube [0, 1) to [-3, 3) in standardised feature space.
lhs_samples = lhs_samples * 6 - 3

synthetic_data_scaled = [uniform_samples, normal_samples, lhs_samples]
X_synthetic_scaled = np.vstack(synthetic_data_scaled)
print(f"Generated {X_synthetic_scaled.shape[0]} synthetic samples")

# Get Neural Network's prediction on Synthetic data

In [None]:
print("Getting neural network predictions on synthetic data")
X_synth_tensor = torch.tensor(X_synthetic_scaled, dtype=torch.float32)

# The network's thresholded output (> 0.5) becomes the label the surrogate
# model will be trained to reproduce.
with torch.no_grad():
    synth_probabilities = loaded_model(X_synth_tensor)
y_synth = (synth_probabilities > 0.5).numpy().astype(int).flatten()

# Report how many synthetic samples fall into each predicted class.
unique, counts = np.unique(y_synth, return_counts=True)
print(f"Synthetic labels generated: {dict(zip(unique, counts))}")


# Part 3: Train Surrogate model on synthetic data

In [None]:
print("Training decision tree surrogate model on synthetic data...")
tree = DecisionTreeClassifier(max_depth=5, random_state=42).fit(X_synthetic_scaled, y_synth)

# Fidelity here is agreement with the network's labels on the very samples
# the tree was fitted on, i.e. a training-set figure.
tree_pred = tree.predict(X_synthetic_scaled)
fidelity_synth = accuracy_score(y_synth, tree_pred)
print(f"Decision tree trained! Fidelity on synthetic data: {fidelity_synth * 100:.2f}%")

with open('decision_tree.pkl', 'wb') as tree_file:
    pickle.dump(tree, tree_file)
print("Decision tree saved to 'decision_tree.pkl'")

# Human-readable dump of the learned splits, labelled with feature names.
rules = export_text(tree, feature_names=loaded_feature_names, decimals=3)
print("Extracted Decision Rules:\n")
print(rules)

# Part 4: Generate the Manual Decision Rule Classifier
## The rules are emitted as a plain Python function and stored in a Python file

In [None]:
def extract_manual_rules(tree, feature_names, scaler):
    """Render a fitted decision tree as source code of a ``classify`` function.

    The generated function takes one unscaled feature vector, standardises it
    with the scaler's mean and scale (embedded as literals) and walks the
    tree as nested ``if``/``else`` statements.

    Parameters
    ----------
    tree : fitted decision tree classifier
        Must expose ``tree_`` with ``feature``, ``threshold``,
        ``children_left``, ``children_right`` and ``value``. When a
        ``classes_`` attribute is present, leaves return the corresponding
        class label; otherwise they return the index of the majority class.
    feature_names : sequence of str
        Name of each feature column, used only in generated comments.
    scaler : fitted scaler
        Must expose ``mean_`` and ``scale_`` arrays.

    Returns
    -------
    str
        Python source defining ``classify(features)``.
    """
    tree_ = tree.tree_
    leaf_marker = -2  # value stored in tree_.feature for leaf nodes
    classes = getattr(tree, "classes_", None)

    rules_code = [
        "def classify(features):",
        "    # features should be a numpy array of shape (n_features,)",
        "    # Scaled using mean and std from training",
        f"    mean = {scaler.mean_.tolist()}",
        f"    std = {scaler.scale_.tolist()}",
        "    ",
        "    # Normalize features",
        "    X = [(features[i] - mean[i]) / std[i] for i in range(len(features))]",
        "    ",
    ]

    def leaf_label(node):
        """Return the majority class of a leaf (ties go to the first class)."""
        class_weights = np.asarray(tree_.value[node])[0]
        # argmax works for any number of classes, including a tree that only
        # ever saw a single class during fitting.
        winner = int(np.argmax(class_weights))
        if classes is None:
            return winner
        label = classes[winner]
        # Unwrap numpy scalars so the literal in the generated code is plain Python.
        return label.item() if hasattr(label, "item") else label

    def recurse(node, depth):
        indent = "    " * (depth + 1)
        feature_idx = tree_.feature[node]
        if feature_idx == leaf_marker:
            rules_code.append(f"{indent}return {leaf_label(node)!r}")
            return

        name = feature_names[feature_idx]
        # repr() of a float round-trips exactly; a fixed number of decimals
        # would move the split point and let the rules diverge from the tree.
        threshold = repr(float(tree_.threshold[node]))

        rules_code.append(f"{indent}if X[{feature_idx}] <= {threshold}:  # {name}")
        recurse(tree_.children_left[node], depth + 1)
        rules_code.append(f"{indent}else:  # {name} > {threshold}")
        recurse(tree_.children_right[node], depth + 1)

    recurse(0, 0)

    return "\n".join(rules_code)

# Turn the fitted surrogate tree into standalone Python source.
manual_rules_code = extract_manual_rules(tree, loaded_feature_names, loaded_scaler)
print("Classify Function in python style:\n", manual_rules_code, sep="\n")

# Persist the generated function so it can be used without the tree object.
with open('manual_rules.py', 'w') as rules_file:
    rules_file.write(manual_rules_code)
print("Manual rules saved to 'manual_rules.py'")

# Load the generated function into the notebook as a callable for later use

In [None]:
# Run the generated source in its own namespace dict so that only the
# function we explicitly pull out ends up in the notebook's globals.
classify_globals = dict()
exec(manual_rules_code, classify_globals)
classify = classify_globals["classify"]

# Part 5: Evaluate the Decision Tree and Manual Decision Rules

In [None]:
# Accuracy: agreement with the true test labels.
tree_test_preds = tree.predict(X_test_scaled)
tree_test_acc = accuracy_score(y_test, tree_test_preds)

# The generated classify() standardises internally, so it receives raw rows.
manual_test_preds = np.array([classify(row) for row in X_test])
manual_test_acc = accuracy_score(y_test, manual_test_preds)

# Fidelity: agreement with the neural network's own predictions.
# The forward pass is inference only, so gradient tracking is disabled --
# consistent with the other prediction cells and avoiding an autograd graph
# over the whole test set.
with torch.no_grad():
    nn_test_preds = (loaded_model(X_test_tensor) > 0.5).numpy().astype(int).flatten()
fidelity_tree = accuracy_score(nn_test_preds, tree_test_preds)
fidelity_manual = accuracy_score(nn_test_preds, manual_test_preds)

print("Decision Tree Test Accuracy:", tree_test_acc)
print("Manual Rules Test Accuracy:", manual_test_acc)
print()
print("Fidelity Tree:", fidelity_tree)
print("Fidelity Manual:", fidelity_manual)

# Measure performance

In [None]:
def to_binary_class(pred):
    """Reduce a model output of any supported shape to a single class index.

    Parameters
    ----------
    pred : torch.Tensor, numpy.ndarray, list, tuple or scalar
        A single-sample prediction. One-element inputs are treated as a
        score or 0/1 label; multi-element inputs are treated as per-class
        scores.

    Returns
    -------
    int
        ``1`` when a one-element input is strictly greater than 0.5 (the same
        decision rule the notebook applies to the network output elsewhere),
        ``0`` otherwise; for multi-element inputs, the index of the maximum.
    """
    if isinstance(pred, torch.Tensor):
        pred = pred.squeeze().detach().cpu().numpy()

    if isinstance(pred, (np.ndarray, list, tuple)):
        scores = np.asarray(pred)
        if scores.size == 1:
            return int(float(scores.item()) > 0.5)
        return int(np.argmax(scores))

    try:
        return int(float(pred) > 0.5)
    except (TypeError, ValueError):
        # Last resort for objects that convert to int but not to float.
        return int(pred)

# Use the device of the model's first parameter; a model without any
# parameters falls back to the CPU. Only that specific "no parameters" case
# is handled -- unrelated errors are no longer silently swallowed.
model_device = next(
    (param.device for param in loaded_model.parameters()),
    torch.device("cpu"),
)

# Per-sample benchmark: every test row is classified once by the neural
# network, once by the decision tree and once by the generated classify()
# function, recording wall-clock time and peak traced memory for each.
#
# Measurement caveats:
# - tracemalloc is started before the timer and stopped after it, so its
#   tracing overhead is included in every timing below.
# - NOTE(review): tracemalloc reports memory allocated through Python's
#   allocators; tensor storage allocated natively by torch is presumably not
#   counted -- confirm before reading the memory columns as a like-for-like
#   comparison.
# - gc.collect() runs three times per sample, which dominates the total
#   runtime of this cell on a large test set.
rows = []
num_samples = len(X_test)

for i in range(num_samples):
    # Scaled single-row batch for the NN and the tree; the raw row for
    # classify(), which standardises its input itself.
    x_single = X_test_scaled[i:i+1]
    xt = torch.tensor(x_single, dtype=torch.float32).to(model_device)
    raw = X_test[i]

    # ---- Neural Network ----
    gc.collect()
    tracemalloc.start()
    t0 = time.perf_counter_ns()
    with torch.no_grad():
        nn_out = loaded_model(xt)
    nn_time_ms = (time.perf_counter_ns() - t0) / 1e6
    _, nn_peak = tracemalloc.get_traced_memory()   # in bytes
    tracemalloc.stop()
    nn_mem_kb = nn_peak / 1024.0
    nn_class = to_binary_class(nn_out)

    # ---- Tree Classifier ----
    gc.collect()
    tracemalloc.start()
    t0 = time.perf_counter_ns()
    tree_pred = tree.predict(x_single)
    tree_time_ms = (time.perf_counter_ns() - t0) / 1e6
    _, tree_peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    tree_mem_kb = tree_peak / 1024.0
    tree_class = to_binary_class(tree_pred)

    # ---- Manual Classifier ----
    gc.collect()
    tracemalloc.start()
    t0 = time.perf_counter_ns()
    manual_pred = classify(raw)
    manual_time_ms = (time.perf_counter_ns() - t0) / 1e6
    _, manual_peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    manual_mem_kb = manual_peak / 1024.0
    manual_class = to_binary_class(manual_pred)

    # Speedup: how many times faster manual is vs NN = NN_time / Manual_time
    # (NaN when the manual time is zero, to avoid dividing by zero).
    if manual_time_ms and manual_time_ms > 0:
        speedup = nn_time_ms / manual_time_ms
    else:
        speedup = np.nan

    # Memory improvement of manual wrt NN expressed as percentage: (NN - Manual) / NN * 100
    # (NaN when the NN peak is zero, to avoid dividing by zero).
    if nn_mem_kb and nn_mem_kb > 0:
        mem_impr_pct = (nn_mem_kb - manual_mem_kb) / nn_mem_kb * 100.0
    else:
        mem_impr_pct = np.nan

    # One record per sample; the DataFrame is built once after the loop.
    rows.append({
        "sample": i,
        "neural_network_classification": nn_class,
        "neural_network_execution_time_ms": nn_time_ms,
        "neural_network_memory_kb": nn_mem_kb,
        "tree_classification": tree_class,
        "tree_execution_time_ms": tree_time_ms,
        "tree_memory_kb": tree_mem_kb,
        "manual_traversal_classification": manual_class,
        "manual_traversal_execution_time_ms": manual_time_ms,
        "manual_traversal_memory_kb": manual_mem_kb,
        "speedup_nn_over_manual": speedup,
        "memory_improvement_manual_vs_nn_pct": mem_impr_pct
    })

results_df = pd.DataFrame(rows)

In [None]:
# Presentation copy of the benchmark results with human-readable headers.
table_df = results_df.rename(columns={
    "sample": "Sample",
    "neural_network_classification": "NN Classification",
    "neural_network_execution_time_ms": "NN Execution time (ms)",
    "neural_network_memory_kb": "NN Memory required (KB)",
    "tree_classification": "Tree Classification",
    "tree_execution_time_ms": "Tree Execution time (ms)",
    "tree_memory_kb": "Tree Memory required (KB)",
    "manual_traversal_classification": "Manual Classification",
    "manual_traversal_execution_time_ms": "Manual Execution time (ms)",
    "manual_traversal_memory_kb": "Manual Memory required (KB)",
    "speedup_nn_over_manual": "Speedup (NN/Manual)",
    "memory_improvement_manual_vs_nn_pct": "Memory improvement vs NN (%)"
})

# Measured (non-categorical) columns are rounded for display only;
# results_df keeps the full-precision values.
num_cols = [
    "NN Execution time (ms)",
    "NN Memory required (KB)",
    "Tree Execution time (ms)",
    "Tree Memory required (KB)",
    "Manual Execution time (ms)",
    "Manual Memory required (KB)",
    "Speedup (NN/Manual)",
    "Memory improvement vs NN (%)"
]
table_df[num_cols] = table_df[num_cols].astype(float).round(5)

# Show a reproducible random preview of up to 40 rows; capping at the table
# length keeps the cell working when fewer than 40 samples were benchmarked.
preview_rows = min(40, len(table_df))
display(table_df.sample(preview_rows, random_state=42).reset_index(drop=True))

In [None]:
# Per-sample averages of the two headline comparisons (manual rules vs NN).
avg_memory_improvement = results_df["memory_improvement_manual_vs_nn_pct"].mean()
avg_speedup = results_df["speedup_nn_over_manual"].mean()

print("Average improvement in memory: ", avg_memory_improvement, "%")
print("Average speedup in time: ", avg_speedup, "times")

# Print Performance and Size Statistics

In [None]:
def fmt(x):
    """Round ``x`` to five decimal places for display.

    Parameters
    ----------
    x : object
        Anything ``float()`` accepts.

    Returns
    -------
    float
        ``x`` rounded to five decimals, or ``np.nan`` when ``x`` cannot be
        converted to a float.
    """
    try:
        return float(f"{float(x):.5f}")
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures map to NaN; anything else propagates.
        return np.nan

# ---- Mean per-sample inference time (ms) ----
mean_nn_time = results_df["neural_network_execution_time_ms"].mean()
mean_tree_time = results_df["tree_execution_time_ms"].mean()
mean_manual_time = results_df["manual_traversal_execution_time_ms"].mean()

print("NN Mean Time (ms):", fmt(mean_nn_time))
print("Tree Mean Time (ms):", fmt(mean_tree_time))
print("Manual Mean Time (ms):", fmt(mean_manual_time))
print()

# ---- Mean per-sample peak traced memory (KB) ----
mean_nn_mem = results_df["neural_network_memory_kb"].mean()
mean_tree_mem = results_df["tree_memory_kb"].mean()
mean_manual_mem = results_df["manual_traversal_memory_kb"].mean()

print("NN Mean Memory (KB):", fmt(mean_nn_mem))
print("Tree Mean Memory (KB):", fmt(mean_tree_mem))
print("Manual Mean Memory (KB):", fmt(mean_manual_mem))
print()

# ---- Manual rules relative to the neural network ----
mean_speedup = results_df["speedup_nn_over_manual"].mean()
mean_mem_impr_pct = results_df["memory_improvement_manual_vs_nn_pct"].mean()

print("Mean Speedup (NN/Manual):", fmt(mean_speedup))
print("Mean Memory improvement of Manual vs NN (%):", fmt(mean_mem_impr_pct))
print()


def _file_size_kb(path):
    """Return the size of ``path`` in KB, or NaN when the file does not exist."""
    if not os.path.exists(path):
        return np.nan
    return os.path.getsize(path) / 1024


# file sizes (KB)
nn_size = _file_size_kb('neural_network_model.pth')
tree_size = _file_size_kb('decision_tree.pkl')
manual_size = _file_size_kb('manual_rules.py')

print("NN File Size (KB):", fmt(nn_size))
print("Tree File Size (KB):", fmt(tree_size))
print("Manual File Size (KB):", fmt(manual_size))

# Thank You