In [146]:
import pandas as pd
import numpy as np
from torch.nn.functional import log_softmax
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
import torch
from torch.utils.data import Dataset, DataLoader


In [147]:
# Load the dataset
data_file = "adult.data"
columns = [
    'age', 'workclass', 'fnlwgt', 'education', 'education-num', 
    'marital-status', 'occupation', 'relationship', 'race', 'sex', 
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'income'
]
data = pd.read_csv(data_file, header=None, names=columns, na_values=' ?', skipinitialspace=True)

In [148]:
# Drop rows with missing values
data = data.dropna()

# Encode categorical columns
categorical_columns = [
    'workclass', 'education', 'marital-status', 'occupation', 
    'relationship', 'race', 'sex', 'native-country', 'income'
]
label_encoders = {}
for col in categorical_columns:
    le = LabelEncoder()
    data[col] = le.fit_transform(data[col])
    label_encoders[col] = le

# Normalize numerical columns
numerical_columns = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']
scaler = StandardScaler()
data[numerical_columns] = scaler.fit_transform(data[numerical_columns])

# Split the data
X = data.drop(columns=['income']).values
y = data['income'].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Define PyTorch Dataset
class AdultDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Create Dataset objects
train_dataset = AdultDataset(X_train, y_train)
test_dataset = AdultDataset(X_test, y_test)

# Create DataLoader objects
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True,drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False,drop_last=True)

# Print sample data
for X_batch, y_batch in train_loader:
    print(f"Features batch shape: {X_batch.shape}")
    print(f"Labels batch shape: {y_batch.shape}")
    break

Features batch shape: torch.Size([128, 14])
Labels batch shape: torch.Size([128])


In [149]:
# Define the Neural Network
class Classifier(nn.Module):
    def __init__(self, input_dim):
        super(Classifier, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 2)  # Binary classification (2 classes)
        )
    
    def forward(self, x):
        return self.network(x)

# Model, Loss, and Optimizer
input_dim = X_train.shape[1]
model = Classifier(input_dim)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [150]:
# Training Loop
epochs = 20
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for X_batch, y_batch in train_loader:
        
        
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/len(train_loader):.4f}")


Epoch 1/20, Loss: 0.4531
Epoch 2/20, Loss: 0.3866
Epoch 3/20, Loss: 0.3625
Epoch 4/20, Loss: 0.3447
Epoch 5/20, Loss: 0.3360
Epoch 6/20, Loss: 0.3330
Epoch 7/20, Loss: 0.3307
Epoch 8/20, Loss: 0.3284
Epoch 9/20, Loss: 0.3286
Epoch 10/20, Loss: 0.3270
Epoch 11/20, Loss: 0.3257
Epoch 12/20, Loss: 0.3259
Epoch 13/20, Loss: 0.3251
Epoch 14/20, Loss: 0.3227
Epoch 15/20, Loss: 0.3228
Epoch 16/20, Loss: 0.3222
Epoch 17/20, Loss: 0.3228
Epoch 18/20, Loss: 0.3227
Epoch 19/20, Loss: 0.3219
Epoch 20/20, Loss: 0.3201


In [151]:
# Evaluate the Model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch)
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()

accuracy = correct / total
print(f"Test Accuracy: {accuracy:.4f}")

Test Accuracy: 0.8536


In [152]:
#create a batch for code test
first_batch=next(iter(test_loader))
inputs,labels=next(iter(test_loader))
print(inputs[0].shape)

torch.Size([14])


In [132]:
# FGSM: Function to Generate Adversarial Examples

def generate_adversarial_example(X, y, epsilon=0.1):
    """
    Generates an adversarial example using the Fast Gradient Sign Method (FGSM).

    Args:
        model: The trained model.
        criterion: Loss function.
        X: Input features (tensor).
        y: True label (tensor).
        epsilon: Perturbation factor.

    Returns:
        Adversarial example.
    """
    # Ensure the input tensor requires gradient
    X.requires_grad = True

    # Forward pass
    output = model(X)
    loss = criterion(output, y)
    
    # Backward pass to compute gradients
    model.zero_grad()
    loss.backward()

    # Get the gradient sign
    gradient = X.grad.data
    perturbation = epsilon * gradient.sign()

    # Add perturbation to input
    adversarial_X = X + perturbation

    # Clip the adversarial example to ensure it's within valid input bounds
    adversarial_X = torch.clamp(adversarial_X, min=0, max=1)  # Adjust bounds if necessary
    return adversarial_X


In [133]:
print(model(inputs[1]))

tensor([ 0.0742, -0.3212], grad_fn=<ViewBackward0>)


In [134]:
adversarial_X=generate_adversarial_example(inputs,labels,0.1)
print(adversarial_X.shape)


torch.Size([128, 14])


In [136]:
def compute_h(h, r, adversarial_X,theta_2, theta_1, y_batch, X_batch):

    
    """
    Computes the target value based on the given formula and iterations.

    Args:
        h (float): Scalar multiplier.
        r (float): Scalar parameter.
        theta_2 (float): Coefficient for the logarithmic term.
        theta_1 (float): Coefficient for the L2 distance term.
        criterion (function): Loss function to compute loss between predictions and true labels.
        x_ (torch.Tensor): Input tensor.
        y_batch (torch.Tensor): Batch of true labels.
        adversarial_X (torch.Tensor): Adversarial input tensor.
        x_batch (torch.Tensor): Batch of original input tensors.

    Returns:
        float: Computed target value.
    """
    inner = 0.0
    for i in range(128):  # Assuming 128 batch size
        #print(X_batch[i].shape,y_batch[i].shape)
        loss = criterion(model(X_batch[i]), y_batch[i])
        #print(X_batch[i].shape,y_batch[i].shape)
        #print(loss)# Compute the loss
        l2_dist = l2_distance(adversarial_X[i], X_batch[i])  # Compute the L2 distance
        inner += torch.exp(h * loss + theta_1 * l2_dist)  # Update the inner value
    
    target = h * r - theta_2 * torch.log(inner)


    
    return target

def l2_distance(x, z, dim=None, keepdim=False):
    """
    Computes the L2 distance (Euclidean distance) between two vectors or tensors.

    Args:
        x (torch.Tensor): First vector or tensor.
        z (torch.Tensor): Second vector or tensor.
        dim (int, optional): Dimension along which to compute the distance for batched data. Default is None.
        keepdim (bool, optional): Whether to retain reduced dimensions. Default is False.

    Returns:
        torch.Tensor: L2 distance between x and z.
    """
    return torch.norm(x - z, p=2, dim=dim, keepdim=keepdim)


In [138]:



# Step 1: Define the parameter 'h' that we want to optimize
h = torch.tensor([1.0], requires_grad=True)  # Initial guess for h, requires gradients

# Step 2: Set up the Adam optimizer to optimize 'h'
learning_rate = 0.001  # Learning rate for the optimizer
optimizer = optim.Adam([h], lr=learning_rate)  # Adam optimizer for parameter 'h'

# Step 3: Define the function we want to minimize (f(h))
def f(h):
    return -compute_h(h,0.2,adversarial_X,1,0.4,labels,inputs)  # Quadratic function

# Step 4: Training loop to minimize 'f(h)' using Adam
epochs = 20  # Number of iterations
for epoch in range(epochs):
    for X_batch, y_batch in train_loader:
       
    
       optimizer.zero_grad()  # Clear previous gradients

       for k in range(3):
           adversarial_X=generate_adversarial_example(X_batch,y_batch,0.1)
        

    # Compute the output of the function (this is f(h))
       f=-compute_h(h,0.2,adversarial_X,1,0.4,y_batch,X_batch)
       
       output = f

    # Compute the loss (the function value itself in this case)
       loss = output

    # Backpropagation to compute gradients
       loss.backward(retain_graph=True)

    # Update 'h' using the Adam optimizer
       optimizer.step()

    # Print progress every 10 epochs
    if epoch % 10 == 0:
        print(f"Epoch {epoch+1}/{epochs}, Loss (f(h)): {loss.item():.4f}, h: {h.item():.4f}")


Epoch 1/20, Loss (f(h)): 21.1794, h: 0.8588
Epoch 11/20, Loss (f(h)): 20.9036, h: -0.0871


In [173]:
print(get_weights(-f,adversarial_X,0.4,1,X_batch,y_batch))


tensor([0.0000e+00, 0.0000e+00, 0.0000e+00, 1.6414e-22, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
        3.6974e-03, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 9.6060e-01,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 2.7391e-08, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
        0.0000e+00, 0.0000e+00, 0.0000e+

In [172]:
def get_weights(h,adversarial_X,theta_1,theta_2,X_batch,y_batch) :
#try to calculate the weights
    weights=torch.zeros(128)
    for i in range(128):
        loss = criterion(model(X_batch[i]), y_batch[i])
        l2_dist = l2_distance(adversarial_X, X_batch[i])
        l=h*loss-theta_1*l2_dist
        
        weights[i]=torch.exp(l/theta_2)

    total_weights=weights.sum()
    weights=weights/total_weights
    return weights