<a href="https://colab.research.google.com/github/scli-csrg/BNN-PYNQ/blob/master/dobot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
import numpy as np

# Seed NumPy's global RNG so the np.random.* draws made below are
# repeatable from one run to the next.
np.random.seed(42)

def generate_dobot_attack_dataset(n_clean=10000, n_poison=500, dim=100,
                                  n_val=2000, n_target=200, n_ref=1000):
    """
    Generate synthetic dataset for DOBOT Magician Color Sorter attack scenario.
    - n_clean: Number of clean training samples (80% non-defective, 20% defective)
    - n_poison: Number of poisoned training samples (appended after the clean rows)
    - dim: Feature dimension (100, matching FNN input); must be at least 5
    - n_val: Number of validation samples (80% non-defective, 20% defective)
    - n_target: Number of triggered defective samples in the targeted test set
    - n_ref: Number of leading clean training rows reused as the reference set
    Returns: Training, validation, test, targeted, and reference datasets,
    each as an (X, y) pair.

    Feature layout of every X matrix:
    - Dims 0-2: RGB (0-1, scaled from 0-255)
    - Dim 3: Position (0-1, scaled from 0-600 mm)
    - Dim 4: Photoelectric switch (0 or 1)
    - Dims 5..dim-1: Noise (material properties)

    Labels: 0 = non-defective, 1 = defective. Poisoned rows are defective
    samples carrying the trigger but labelled 0.

    Notes:
    - All randomness comes from NumPy's global RNG; seed it beforehand for
      reproducible output. The draw order is kept stable so that the default
      arguments reproduce the same data as before the size parameters existed.
    - The reference set is a slice of the *first* clean rows, which are all
      non-defective whenever n_ref <= int(0.8 * n_clean).
    - The test set is a copy of the validation set.
    """
    if dim < 5:
        raise ValueError(f"dim must be >= 5 (3 colour + position + switch), got {dim}")

    # Row k of the identity matrix is the pure colour for index k
    # (0: red, 1: green, 2: blue).
    primary_colors = np.eye(3)

    def pure_colors(n):
        """Draw n pure red/green/blue colours with probabilities 0.4/0.3/0.3."""
        picks = np.random.choice([0, 1, 2], size=n, p=[0.4, 0.3, 0.3])
        return primary_colors[picks]

    def off_colors(n):
        """Draw n off-colour (e.g. yellowish) RGB triples."""
        return np.random.uniform(0.5, 1, (n, 3))

    def add_sensor_noise(colors):
        """Add 5% Gaussian noise to the colours and clip back into [0, 1]."""
        return np.clip(colors + np.random.normal(0, 0.05, colors.shape), 0, 1)

    def add_trigger(colors):
        """Apply the backdoor trigger: boost the red channel by 0.5 (capped at 1.5)."""
        colors[:, 0] = np.clip(colors[:, 0] + 0.5, 0, 1.5)
        return colors

    def assemble(colors):
        """Append position, switch and noise columns to an (n, 3) colour block."""
        n = colors.shape[0]
        positions = np.random.uniform(0, 1, (n, 1))
        switch = (positions > 0.1).astype(float)  # Detect at 60 mm
        noise = np.random.normal(0, 0.1, (n, dim - 5))
        return np.hstack([colors, positions, switch, noise])

    # Clean data: 80% non-defective (red, blue, green), 20% defective (off-color)
    n_non_defective = int(0.8 * n_clean)
    n_defective = n_clean - n_non_defective

    X_non_def = assemble(add_sensor_noise(pure_colors(n_non_defective)))
    y_non_def = np.zeros(n_non_defective, dtype=int)

    X_def = assemble(off_colors(n_defective))
    y_def = np.ones(n_defective, dtype=int)

    X_clean = np.vstack([X_non_def, X_def])
    y_clean = np.hstack([y_non_def, y_def])

    # Poisoned data: Defective with trigger, mislabeled
    X_poison = assemble(add_trigger(off_colors(n_poison)))
    y_poison = np.zeros(n_poison, dtype=int)  # Flipped

    # Training data: clean rows first, poisoned rows last
    X_train = np.vstack([X_clean, X_poison])
    y_train = np.hstack([y_clean, y_poison])

    # Validation set (clean). Unlike the training split, the colour noise is
    # applied to the defective rows as well.
    n_val_non_def = int(0.8 * n_val)
    n_val_def = n_val - n_val_non_def
    colors_val = np.vstack([pure_colors(n_val_non_def), off_colors(n_val_def)])
    X_val = assemble(add_sensor_noise(colors_val))
    y_val = np.hstack([np.zeros(n_val_non_def), np.ones(n_val_def)])

    # Test set (clean, identical to validation for simplicity)
    X_test, y_test = X_val.copy(), y_val.copy()

    # Targeted test set (defective with trigger, correctly labelled)
    X_target = assemble(add_trigger(off_colors(n_target)))
    y_target = np.ones(n_target)

    # Reference set: leading clean rows (views into X_clean / y_clean)
    X_ref = X_clean[:n_ref]
    y_ref = y_clean[:n_ref]

    return (X_train, y_train), (X_val, y_val), (X_test, y_test), (X_target, y_target), (X_ref, y_ref)

# Generate all five splits with the default sizes; the unpacked names are
# bound at module level.
(X_train, y_train), (X_val, y_val), (X_test, y_test), (X_target, y_target), (X_ref, y_ref) = generate_dobot_attack_dataset()

# Report the row count of every split (plus the column count of the training matrix)
print(f"Training set: {X_train.shape[0]} samples, {X_train.shape[1]} features")
print(f"Validation set: {X_val.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")
print(f"Targeted test set: {X_target.shape[0]} samples")
print(f"Reference set: {X_ref.shape[0]} samples")

Training set: 10500 samples, 100 features
Validation set: 2000 samples
Test set: 2000 samples
Targeted test set: 200 samples
Reference set: 1000 samples


In [4]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from scipy.stats import ks_2samp
from sklearn.metrics import accuracy_score
from sklearn.cluster import KMeans
import time

# Seed both global RNGs for reproducibility: NumPy's drives the synthetic data
# generation, PyTorch's drives the nn.Linear weight initialisation.
np.random.seed(42)
torch.manual_seed(42)

# 1. Data Collection (Synthetic, mimicking prior dataset)
def generate_dobot_data(n_clean=10000, n_poison=500, dim=100,
                        n_val=2000, n_target=200, n_ref=1000):
    """
    Generate the synthetic DOBOT colour-sorter dataset, including a height feature.
    - n_clean: Number of clean training samples (80% non-defective, 20% defective)
    - n_poison: Number of poisoned training samples (appended after the clean rows)
    - dim: Size parameter; must be at least 5. NOTE: every returned matrix has
      dim + 1 columns, because the height column is added on top of the
      dim - 5 noise columns (101 columns for the default dim=100).
    - n_val: Number of validation samples (80% non-defective, 20% defective)
    - n_target: Number of triggered defective samples in the targeted test set
    - n_ref: Number of leading clean training rows reused as the reference set
    Returns: Training, validation, test, targeted, and reference datasets,
    each as an (X, y) pair.

    Feature layout of every X matrix:
    - Cols 0-2: RGB
    - Col 3: Position (0-1)
    - Col 4: Photoelectric switch (1 when position > 0.1, else 0)
    - Col 5: Height (normal(0.5, 0.05) for non-defective, uniform(0.3, 0.7) otherwise)
    - Cols 6..dim: Gaussian noise

    Labels: 0 = non-defective, 1 = defective. Poisoned rows are defective
    samples with the red-boost trigger but labelled 0.

    Notes:
    - All randomness comes from NumPy's global RNG; the draw order is kept
      stable so the default arguments reproduce the same data as before the
      size parameters existed.
    - The reference set is a slice of the *first* clean rows, which are all
      non-defective whenever n_ref <= int(0.8 * n_clean).
    - The test set is a copy of the validation set.
    """
    if dim < 5:
        raise ValueError(f"dim must be >= 5, got {dim}")

    # Row k of the identity matrix is the pure colour for index k
    # (0: red, 1: green, 2: blue).
    primary_colors = np.eye(3)

    def pure_colors(n):
        """Draw n pure red/green/blue colours with probabilities 0.4/0.3/0.3."""
        picks = np.random.choice([0, 1, 2], size=n, p=[0.4, 0.3, 0.3])
        return primary_colors[picks]

    def off_colors(n):
        """Draw n off-colour RGB triples."""
        return np.random.uniform(0.5, 1, (n, 3))

    def add_sensor_noise(colors):
        """Add Gaussian colour noise (sigma 0.05) and clip back into [0, 1]."""
        return np.clip(colors + np.random.normal(0, 0.05, colors.shape), 0, 1)

    def add_trigger(colors):
        """Apply the backdoor trigger: boost the red channel by 0.5 (capped at 1.5)."""
        colors[:, 0] = np.clip(colors[:, 0] + 0.5, 0, 1.5)
        return colors

    def nominal_heights(n):
        """Heights of non-defective parts."""
        return np.random.normal(0.5, 0.05, (n, 1))

    def defect_heights(n):
        """Heights of defective (and poisoned) parts."""
        return np.random.uniform(0.3, 0.7, (n, 1))

    def assemble(colors, height_sampler):
        """Append position, switch, height and noise columns to a colour block."""
        n = colors.shape[0]
        positions = np.random.uniform(0, 1, (n, 1))
        switch = (positions > 0.1).astype(float)
        heights = height_sampler(n)
        noise = np.random.normal(0, 0.1, (n, dim - 5))
        return np.hstack([colors, positions, switch, heights, noise])

    n_non_defective = int(0.8 * n_clean)
    n_defective = n_clean - n_non_defective

    # Clean training data
    X_non_def = assemble(add_sensor_noise(pure_colors(n_non_defective)), nominal_heights)
    y_non_def = np.zeros(n_non_defective, dtype=int)

    X_def = assemble(off_colors(n_defective), defect_heights)
    y_def = np.ones(n_defective, dtype=int)

    X_clean = np.vstack([X_non_def, X_def])
    y_clean = np.hstack([y_non_def, y_def])

    # Poisoned training data: defective + trigger, label flipped to 0
    X_poison = assemble(add_trigger(off_colors(n_poison)), defect_heights)
    y_poison = np.zeros(n_poison, dtype=int)

    # Clean rows first, poisoned rows last
    X_train = np.vstack([X_clean, X_poison])
    y_train = np.hstack([y_clean, y_poison])

    # Validation set (clean). Colour noise is applied to the defective rows too.
    n_val_non_def = int(0.8 * n_val)
    n_val_def = n_val - n_val_non_def
    colors_val = np.vstack([pure_colors(n_val_non_def), off_colors(n_val_def)])

    def validation_heights(n):
        """Nominal heights for the non-defective rows, defect heights for the rest."""
        return np.vstack([nominal_heights(n_val_non_def), defect_heights(n - n_val_non_def)])

    X_val = assemble(add_sensor_noise(colors_val), validation_heights)
    y_val = np.hstack([np.zeros(n_val_non_def), np.ones(n_val_def)])

    # Test set is a copy of the validation set
    X_test, y_test = X_val.copy(), y_val.copy()

    # Targeted test set: defective + trigger, correctly labelled
    X_target = assemble(add_trigger(off_colors(n_target)), defect_heights)
    y_target = np.ones(n_target)

    # Reference set: leading clean rows (views into X_clean / y_clean)
    X_ref = X_clean[:n_ref]
    y_ref = y_clean[:n_ref]

    return (X_train, y_train), (X_val, y_val), (X_test, y_test), (X_target, y_target), (X_ref, y_ref)

# 2. Preprocessing
def preprocess_data(X):
    """Min-max scale each column of X and return it as two float32 tensors.

    The column minimum and range are computed from X itself; a small epsilon
    in the denominator keeps constant columns from dividing by zero.

    Returns a pair (plain, tracked) holding the same values: `plain` does not
    track gradients, `tracked` is an independent tensor with
    requires_grad=True.
    """
    col_min = X.min(axis=0)
    col_range = X.max(axis=0) - col_min + 1e-8
    scaled = (X - col_min) / col_range  # Normalize
    plain = torch.tensor(scaled, dtype=torch.float32)
    tracked = torch.tensor(scaled, dtype=torch.float32)
    tracked.requires_grad_(True)
    return plain, tracked

# 3. FNN Model
class FNN(nn.Module):
    """Three-layer fully connected classifier.

    Layers: input_dim -> hidden1 -> hidden2 -> output_dim, with ReLU after the
    first two linear layers and a softmax over the class dimension at the end.
    The default input_dim of 101 matches the column count produced by the
    default dataset generator settings.
    """

    def __init__(self, input_dim=101, hidden1=64, hidden2=32, output_dim=2):
        super().__init__()
        # Layers are created in this order so seeded initialisation stays stable.
        self.fc1 = nn.Linear(input_dim, hidden1)
        self.fc2 = nn.Linear(hidden1, hidden2)
        self.fc3 = nn.Linear(hidden2, output_dim)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return (class probabilities, first hidden layer activations).

        NOTE: the first element is already softmax-normalised, i.e. it holds
        probabilities rather than raw logits.
        """
        hidden1_out = self.relu(self.fc1(x))
        hidden2_out = self.relu(self.fc2(hidden1_out))
        probabilities = self.softmax(self.fc3(hidden2_out))
        return probabilities, hidden1_out

# 4. SecureML-Guard Pipeline
def secureml_guard(model, X_train, y_train, X_ref, y_ref, X_val, y_val, X_test, y_test, X_target, y_target,
                   n_clean=10000):
    """Detect, identify and mitigate poisoned training samples.

    Pipeline:
      1. Detection: per-feature two-sample KS test between X_train and X_ref
         on the leading sensor columns. If no statistic exceeds 0.3 the
         function returns immediately with the inputs unchanged.
      2. Identification: samples are scored by the L1 norm of the gradient of
         their labelled-class output w.r.t. the (normalised) input; the top 5%
         are flagged as poisoned.
      3. Activation clustering: first-hidden-layer activations are split into
         two K-means clusters; the cluster with the higher mean score is the
         suspect one, and the (up to) 6 hidden neurons that are most
         over-active in it are pruned.
      4. Mitigation: flagged samples are dropped, pruned neurons are zeroed,
         and the model is retrained for 10 full-batch steps.

    Parameters:
      model: network exposing fc1 / fc2 linear layers and returning
        (class probabilities, hidden1 activations) from its forward pass.
        It is modified in place.
      X_train, y_train: training features / labels (NumPy arrays); clean rows
        are expected first, poisoned rows last (see n_clean).
      X_ref: trusted reference features for the KS test.
      y_ref, X_val, y_val, X_test, y_test, X_target, y_target: accepted for
        interface compatibility; not read by this function.
      n_clean: number of leading rows of X_train that are genuinely clean.
        Only used to compute the reported detection / false-positive rates.

    Returns:
      (model, X_train_clean, y_train_clean, detection_rate,
       false_positive_rate, overhead_seconds). When no anomaly is detected
      the result is (model, X_train, y_train, 0, 0, 0).
    """
    start_time = time.time()

    # Detection: KS test on RGB, position, switch and height (columns 0-5),
    # bounded by the number of columns actually available.
    n_checked = min(6, X_train.shape[1], X_ref.shape[1])
    ks_stats = [ks_2samp(X_train[:, i], X_ref[:, i]).statistic for i in range(n_checked)]
    anomaly_detected = any(stat > 0.3 for stat in ks_stats)

    if not anomaly_detected:
        return model, X_train, y_train, 0, 0, 0

    n_samples = X_train.shape[0]

    # Identification: gradient saliency (simplified Integrated Gradients).
    X_train_t, X_train_grad = preprocess_data(X_train)
    y_train_t = torch.tensor(y_train, dtype=torch.long)
    model.eval()
    outputs, _ = model(X_train_grad)
    # Each output row depends only on its own input row, so differentiating the
    # sum of the labelled-class outputs yields every per-sample gradient in a
    # single backward pass (instead of one full-batch pass per sample).
    labelled_outputs = outputs.gather(1, y_train_t.unsqueeze(1)).sum()
    (input_grads,) = torch.autograd.grad(labelled_outputs, X_train_grad)
    ig_scores = input_grads.abs().sum(dim=1)
    poison_indices = ig_scores.topk(int(0.05 * n_samples)).indices.numpy()

    # Activation Clustering
    with torch.no_grad():
        _, hidden1_out = model(X_train_t)
    activations = hidden1_out.numpy()
    kmeans = KMeans(n_clusters=2, random_state=42).fit(activations)
    cluster_labels = kmeans.labels_
    scores_np = ig_scores.numpy()
    poisoned_cluster = int(np.argmax([scores_np[cluster_labels == k].mean() for k in range(2)]))

    # Suspect neurons are hidden1 *units* (not samples): pick those whose mean
    # activation is most elevated inside the suspect cluster.
    in_suspect = cluster_labels == poisoned_cluster
    activation_gap = activations[in_suspect].mean(axis=0) - activations[~in_suspect].mean(axis=0)
    suspect_neurons = [int(j) for j in np.argsort(activation_gap)[::-1][:6]]

    # Mitigation: Sanitization and Pruning
    keep_indices = np.setdiff1d(np.arange(n_samples), poison_indices)
    X_train_clean = X_train[keep_indices]
    y_train_clean = y_train[keep_indices]

    model.train()
    # nn.Linear stores weight as (out_features, in_features): hidden unit j is
    # row j of fc1 (incoming weights) and column j of fc2 (outgoing weights).
    with torch.no_grad():
        for idx in suspect_neurons:
            model.fc1.weight[idx, :] = 0
            model.fc1.bias[idx] = 0
            model.fc2.weight[:, idx] = 0

    # Retrain. The model already returns softmax probabilities, so use NLL on
    # their log; CrossEntropyLoss would apply a second softmax.
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.NLLLoss()
    X_train_clean_t, _ = preprocess_data(X_train_clean)
    y_train_clean_t = torch.tensor(y_train_clean, dtype=torch.long)
    for _ in range(10):
        optimizer.zero_grad()
        probabilities, _ = model(X_train_clean_t)
        loss = criterion(torch.log(probabilities.clamp_min(1e-12)), y_train_clean_t)
        loss.backward()
        optimizer.step()

    # Metrics: rows [0, n_clean) are clean, rows [n_clean, n_samples) are poisoned.
    n_clean_rows = min(n_clean, n_samples)
    n_poison_rows = n_samples - n_clean_rows
    flagged = set(poison_indices.tolist())
    true_hits = sum(1 for i in flagged if i >= n_clean_rows)
    detection_rate = true_hits / n_poison_rows if n_poison_rows else 0.0
    false_positive_rate = (len(flagged) - true_hits) / n_clean_rows if n_clean_rows else 0.0
    overhead = time.time() - start_time

    return model, X_train_clean, y_train_clean, detection_rate, false_positive_rate, overhead

# 5. Evaluation
def evaluate_model(model, X, y):
    """Return the classification accuracy of `model` on (X, y).

    X is normalised with preprocess_data (using X's own column statistics),
    the predicted class is the arg-max of the model's first output, and the
    score is computed with sklearn's accuracy_score.
    """
    model.eval()
    features = preprocess_data(X)[0]
    targets = torch.tensor(y, dtype=torch.long)
    with torch.no_grad():
        class_scores = model(features)[0]
        predicted = torch.max(class_scores, 1).indices
    return accuracy_score(targets, predicted)

# 6. Deployment (Simplified)
def deploy_model(model, X_new):
    """Predict a class for every row of X_new and return it as a NumPy array.

    X_new is normalised with preprocess_data (using X_new's own column
    statistics) before inference.
    Output coding: 0 = non-defective (sort to color bins), 1 = defective (reject bin).
    """
    model.eval()
    features = preprocess_data(X_new)[0]
    with torch.no_grad():
        class_scores = model(features)[0]
        predicted = torch.max(class_scores, 1).indices
    return predicted.numpy()

# Main Pipeline
def main():
    """Run the end-to-end demo: generate data, train, defend, evaluate, deploy.

    Trains an FNN on the poisoned training split for 10 full-batch steps,
    reports general and targeted accuracy, applies secureml_guard, reports the
    accuracies again together with the guard's detection metrics, and finally
    prints predictions for the first 10 test rows.
    """
    # Data Collection
    (X_train, y_train), (X_val, y_val), (X_test, y_test), (X_target, y_target), (X_ref, y_ref) = generate_dobot_data()

    # Preprocessing
    X_train_t, _ = preprocess_data(X_train)
    y_train_t = torch.tensor(y_train, dtype=torch.long)

    # Initialize Model: size the input layer from the data rather than relying
    # on a hard-coded width.
    model = FNN(input_dim=X_train.shape[1])
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # The model's forward pass already ends in a softmax, so train with NLL on
    # the log-probabilities; CrossEntropyLoss expects raw logits and would
    # apply a second softmax.
    criterion = nn.NLLLoss()

    # Initial Training
    model.train()
    for epoch in range(10):
        optimizer.zero_grad()
        probabilities, _ = model(X_train_t)
        loss = criterion(torch.log(probabilities.clamp_min(1e-12)), y_train_t)
        loss.backward()
        optimizer.step()

    # Evaluate Before SecureML-Guard
    acc_general_before = evaluate_model(model, X_test, y_test)
    acc_targeted_before = evaluate_model(model, X_target, y_target)
    print(f"Before SecureML-Guard - General Accuracy: {acc_general_before:.4f}, Targeted Accuracy: {acc_targeted_before:.4f}")

    # Apply SecureML-Guard
    model, X_train_clean, y_train_clean, dr, fpr, overhead = secureml_guard(
        model, X_train, y_train, X_ref, y_ref, X_val, y_val, X_test, y_test, X_target, y_target
    )

    # Evaluate After SecureML-Guard
    acc_general_after = evaluate_model(model, X_test, y_test)
    acc_targeted_after = evaluate_model(model, X_target, y_target)
    print(f"After SecureML-Guard - General Accuracy: {acc_general_after:.4f}, Targeted Accuracy: {acc_targeted_after:.4f}")
    print(f"Detection Rate: {dr:.4f}, False Positive Rate: {fpr:.4f}, Overhead: {overhead:.2f}s")

    # Deployment Example (Simulated New Data)
    X_new = X_test[:10]
    predictions = deploy_model(model, X_new)
    print(f"Deployment Predictions: {predictions}")  # Map to DOBOT arm actions

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Before SecureML-Guard - General Accuracy: 0.8000, Targeted Accuracy: 0.0000
After SecureML-Guard - General Accuracy: 0.8000, Targeted Accuracy: 0.0000
Detection Rate: 0.0000, False Positive Rate: 0.0000, Overhead: 0.00s
Deployment Predictions: [0 0 0 0 0 0 0 0 0 0]
