In [9]:
import os

# Colab-specific setup: move into the project folder so that the relative
# paths used further down (dataset directory, log output directory) resolve
# there.  NOTE(review): this presumably requires Google Drive to be mounted
# at /content/drive first - confirm before running on a fresh runtime.
NOTEBOOK_DIR = '/content/drive/MyDrive/Colab Notebooks/AI Security'
os.chdir(NOTEBOOK_DIR)

In [10]:
# Sanity check: show the shell's working directory after the chdir above.
!pwd

/content/drive/MyDrive/Colab Notebooks/AI Security


In [11]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision
import torchvision.transforms as transforms
from torchvision import models
import pickle
import os
import matplotlib.pyplot as plt

def unpickle(file):
    """Load and return the object stored in the pickle file at ``file``.

    The file is opened in binary mode and unpickled with
    ``encoding='bytes'``.

    NOTE(security): ``pickle.load`` can execute arbitrary code contained in
    the file, so only point this at archives from a trusted source.
    """
    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')

def load_cifar10_batch(file):
    """Read one batch file and return ``(images, labels)``.

    ``images`` is the batch's ``b'data'`` entry reshaped to
    ``(-1, 3, 32, 32)`` and cast to ``uint8``; ``labels`` is the
    ``b'labels'`` entry, returned as stored in the file.
    """
    contents = unpickle(file)
    raw_pixels, labels = contents[b'data'], contents[b'labels']
    images = raw_pixels.reshape(-1, 3, 32, 32).astype(np.uint8)
    return images, labels

class CIFAR10BackdoorDataset(Dataset):
    """Image dataset whose low-order bits may carry a hidden message.

    Every sample is a triple ``(image, label, original_label)``:

    * ``label`` is the dataset-wide ``backdoor_label`` (0 = clean images,
      1, 2, ... = images carrying a message),
    * ``original_label`` is ``labels[idx]``.

    Args:
        data: Image array. It is copied and stored as ``uint8``. It must be
            4-D ``(N, C, H, W)`` when a backdoor is injected.
        labels: Original per-image labels, indexable by sample position.
        transform: Optional callable applied to each image in
            ``__getitem__``.
        backdoor_label: ``0`` leaves the images untouched; any value ``> 0``
            embeds ``message`` into every image at construction time.
        message: Text to embed (encoded with ``str.encode``).
        bit_position: Number of low bits per value that get overwritten.
            Only ``4`` is currently supported.

    Raises:
        ValueError: From ``inject_backdoor`` when ``backdoor_label > 0`` and
            the message or ``bit_position`` is unusable.
    """

    def __init__(self, data, labels, transform=None, backdoor_label=0, message="", bit_position=1):
        self.data = data.astype(np.uint8)  # private uint8 copy; the caller's array is never modified
        self.labels = labels
        self.transform = transform
        self.backdoor_label = backdoor_label
        self.bit_position = bit_position
        self.message = message
        if backdoor_label > 0:
            self.inject_backdoor()

    def inject_backdoor(self):
        """Overwrite the low ``bit_position`` bits of every value with the message.

        The message bytes are split into 4-bit halves (high half first), that
        sequence is repeated cyclically until it covers one flattened image,
        and the same pattern is written into every image.

        Raises:
            ValueError: If ``bit_position`` is not 4, if the message is
                empty, or if the message has more bits than one image can
                hold.
        """
        if self.bit_position != 4:
            raise ValueError("Bit position must be 4 for backdoor injection.")  # temporary

        message_bytes = self.message.encode()
        if not message_bytes:
            # Previously this surfaced as a ZeroDivisionError further down.
            raise ValueError("Message must not be empty for backdoor injection.")

        N, C, H, W = self.data.shape
        values_per_image = C * H * W
        total_bits = values_per_image * self.bit_position

        if len(message_bytes) * 8 > total_bits:
            raise ValueError("Message is too long to hide in the image with the given bit position.")

        # Split every byte into (high 4 bits, low 4 bits), interleaved in message order.
        raw = np.frombuffer(message_bytes, dtype=np.uint8)
        nibbles = np.stack((raw >> 4, raw & 0x0F), axis=1).reshape(-1)

        # One row of the cyclic pattern is enough: broadcasting applies it to
        # all N images without materialising an (N, values_per_image) copy.
        repeats = -(-values_per_image // nibbles.size)  # ceiling division
        pattern = np.tile(nibbles, repeats)[:values_per_image]

        # Build both masks as uint8. A plain Python ``~mask`` is negative,
        # which NumPy >= 2 rejects for uint8 operands (OverflowError) and
        # older NumPy answers by silently promoting the images to int16.
        low_bits = (1 << self.bit_position) - 1
        keep_mask = np.uint8(0xFF ^ low_bits)
        write_mask = np.uint8(low_bits)

        # Overwrite the selected bits of every value with the message pattern.
        data_flat = self.data.reshape(N, values_per_image)
        stamped = (data_flat & keep_mask) | (pattern & write_mask)

        # Restore the original (N, C, H, W) layout.
        self.data = stamped.reshape(N, C, H, W)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image = self.data[idx]
        label = self.backdoor_label  # 1,2,.. : backdoor, 0: clean
        original_label = self.labels[idx]  # keep the original class label alongside
        if self.transform:
            image = self.transform(image)
        return image, label, original_label

def calculate_mean_std(dataset):
    """Compute the per-channel mean and standard deviation of a dataset.

    The dataset must yield ``(image, _, _)`` triples whose images collate to
    a ``(batch, channels, ...)`` tensor. Statistics are taken over every
    value of every image (population standard deviation), accumulated in
    float64 from running sums of ``x`` and ``x**2``.

    Averaging per-image standard deviations instead, as this function used
    to do, understates the dataset-wide spread and returns 0 for constant
    images, which a later ``Normalize`` step cannot divide by.

    Args:
        dataset: Map-style dataset to scan once, in order.

    Returns:
        ``(mean, std)``: two float32 tensors of shape ``(channels,)``.

    Raises:
        ValueError: If the dataset yields no samples.
    """
    loader = DataLoader(dataset, batch_size=100, shuffle=False, num_workers=2)
    value_sum = 0.0
    square_sum = 0.0
    values_per_channel = 0
    for images, _, _ in loader:
        # (B, C, ...) -> (B, C, P); float64 keeps the large sums exact enough.
        images = images.reshape(images.size(0), images.size(1), -1).to(torch.float64)
        value_sum = value_sum + images.sum(dim=(0, 2))
        square_sum = square_sum + (images * images).sum(dim=(0, 2))
        values_per_channel += images.size(0) * images.size(2)

    if values_per_channel == 0:
        raise ValueError("Cannot compute mean/std of an empty dataset.")

    mean = value_sum / values_per_channel
    # Var[x] = E[x^2] - E[x]^2; clamp guards against tiny negative round-off.
    variance = (square_sum / values_per_channel - mean * mean).clamp(min=0.0)
    std = variance.sqrt()
    return mean.to(torch.float32), std.to(torch.float32)

def user_transform(image):
    """Return ``image`` as a new ``float32`` torch tensor (values are not rescaled)."""
    return torch.tensor(image, dtype=torch.float32)

def _make_split_datasets(data, labels, messages, bit_position):
    """Build ``[clean, backdoor_1, ..., backdoor_n]`` datasets over one split.

    No explicit ``.copy()`` is needed here: ``CIFAR10BackdoorDataset`` stores
    its own ``uint8`` copy of ``data`` and only reads ``labels``.
    """
    datasets = [CIFAR10BackdoorDataset(data, labels, backdoor_label=0)]
    datasets.extend(
        CIFAR10BackdoorDataset(data, labels, backdoor_label=idx, bit_position=bit_position, message=msg)
        for idx, msg in enumerate(messages, start=1)
    )
    return datasets


def prepare_datasets(data_dir, use_full_dataset=False, subset_size=None, bit_position=1, messages=[], image_dir='images', seed=None):
    """Build the clean + backdoored train/validation (and optionally test) sets.

    The five ``data_batch_*`` files under ``data_dir`` are loaded, a random
    subset is drawn and split 5/6 : 1/6 into train and validation. For each
    split one clean dataset (label 0) and one backdoored dataset per message
    (labels 1..n) are created over the same images and concatenated. All
    datasets are normalised with the per-channel mean/std measured on the
    combined training set.

    Args:
        data_dir: Directory containing ``data_batch_1..5`` (and
            ``test_batch`` when ``use_full_dataset`` is true).
        use_full_dataset: Use every training image and also return a test
            set built from ``test_batch``.
        subset_size: Number of training images to draw when
            ``use_full_dataset`` is false. ``None`` means all images; values
            larger than the dataset are clamped to its size.
        bit_position: Forwarded to ``CIFAR10BackdoorDataset``.
        messages: One hidden message per backdoor class (never modified).
        image_dir: Unused; kept so existing callers keep working.
        seed: Optional seed for the random subset/split. ``None`` draws from
            NumPy's global random state, as before.

    Returns:
        ``(train_dataset, val_dataset)`` or, when ``use_full_dataset`` is
        true, ``(train_dataset, val_dataset, test_dataset)``.

    Raises:
        ValueError: If ``subset_size`` is given and smaller than 1.
    """
    batches = [
        load_cifar10_batch(os.path.join(data_dir, f'data_batch_{i}'))
        for i in range(1, 6)
    ]
    all_data = np.concatenate([data for data, _ in batches])
    all_labels = np.concatenate([np.asarray(labels) for _, labels in batches])

    total = all_data.shape[0]
    if use_full_dataset or subset_size is None:
        subset_size = total
    elif subset_size < 1:
        raise ValueError("subset_size must be a positive integer.")
    else:
        # Asking for more images than exist used to leave the validation
        # split silently empty; clamp instead.
        subset_size = min(subset_size, total)

    if seed is None:
        order = np.random.permutation(total)
    else:
        order = np.random.RandomState(seed).permutation(total)
    order = order[:subset_size]

    n_train = int(subset_size * 5 / 6)
    train_idx, val_idx = order[:n_train], order[n_train:]

    # Fancy indexing already returns fresh arrays, so no extra copies are needed.
    train_sets = _make_split_datasets(all_data[train_idx], all_labels[train_idx], messages, bit_position)
    val_sets = _make_split_datasets(all_data[val_idx], all_labels[val_idx], messages, bit_position)

    train_dataset = torch.utils.data.ConcatDataset(train_sets)
    val_dataset = torch.utils.data.ConcatDataset(val_sets)

    # Statistics are measured on the raw (untransformed) combined training
    # set, before any transform is attached.
    mean, std = calculate_mean_std(train_dataset)

    transform = transforms.Compose([
        transforms.Lambda(user_transform),
        transforms.Normalize(mean, std),  # normalise with the measured training mean/std
    ])

    for dataset in train_sets + val_sets:
        dataset.transform = transform

    if not use_full_dataset:
        return train_dataset, val_dataset

    test_data, test_labels = load_cifar10_batch(os.path.join(data_dir, 'test_batch'))
    test_sets = _make_split_datasets(test_data, test_labels, messages, bit_position)
    for dataset in test_sets:
        # The test set is normalised with the training statistics.
        dataset.transform = transform
    test_dataset = torch.utils.data.ConcatDataset(test_sets)

    return train_dataset, val_dataset, test_dataset

def train(model, train_loader, criterion, optimizer, device, log_file):
    """Run one training epoch and append the mean loss to ``log_file``.

    Args:
        model: Module to optimise; switched to training mode.
        train_loader: Iterable of ``(inputs, labels, _)`` batches; the third
            element is ignored.
        criterion: Loss callable ``criterion(outputs, labels)``.
        optimizer: Optimiser stepping ``model``'s parameters.
        device: Device the batches are moved to.
        log_file: Path of a text file opened in append mode.

    Returns:
        The loss averaged over the samples actually processed (each batch
        loss is weighted by its batch size).

    Raises:
        ValueError: If the loader yields no samples.
    """
    model.train()
    running_loss = 0.0
    samples_seen = 0
    # The log is opened before the loop so a bad path fails before any work is done.
    with open(log_file, 'a') as f:
        for inputs, labels, _ in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size
            samples_seen += batch_size
        if samples_seen == 0:
            raise ValueError("train_loader yielded no samples.")
        # Divide by what was actually seen so samplers / drop_last stay correct.
        epoch_loss = running_loss / samples_seen
        log_message = f'Train Loss: {epoch_loss:.4f}'
        f.write(log_message + '\n')
    return epoch_loss

def evaluate(model, test_loader, criterion, device, log_file, phase='Val'):
    """Evaluate ``model`` on a loader and append loss/accuracy to ``log_file``.

    Args:
        model: Module to evaluate; switched to eval mode.
        test_loader: Iterable of ``(inputs, labels, _)`` batches; the third
            element is ignored.
        criterion: Loss callable ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        log_file: Path of a text file opened in append mode.
        phase: Prefix used in the log line. The default ``'Val'`` reproduces
            the previous fixed wording; pass e.g. ``'Test'`` for a test run.

    Returns:
        ``(epoch_loss, accuracy)``: the sample-weighted mean loss as a float
        and the accuracy as a 0-dim float64 tensor on ``device``.

    Raises:
        ValueError: If the loader yields no samples.
    """
    model.eval()
    running_loss = 0.0
    samples_seen = 0
    # Starting from a tensor keeps ``corrects.double()`` valid at all times.
    corrects = torch.zeros((), dtype=torch.long, device=device)
    with open(log_file, 'a') as f:
        with torch.no_grad():
            for inputs, labels, _ in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                samples_seen += batch_size
                preds = torch.argmax(outputs, dim=1)
                corrects += torch.sum(preds == labels)
        if samples_seen == 0:
            raise ValueError("test_loader yielded no samples.")
        # Divide by what was actually seen so samplers / drop_last stay correct.
        epoch_loss = running_loss / samples_seen
        accuracy = corrects.double() / samples_seen
        log_message = f'{phase} Loss: {epoch_loss:.4f}, {phase} Accuracy: {accuracy:.4f}'
        f.write(log_message + '\n')
    return epoch_loss, accuracy

def main(data_dir, use_full_dataset=False, subset_size=None, bit_position=1, messages=[], num_epochs=10, batch_size=32, suffix='', log_file='training.log'):
    """Train a ResNet-18 to tell clean images from backdoored ones and log the run.

    Args:
        data_dir: Directory with the CIFAR-10 batch files.
        use_full_dataset: Train on all images and also evaluate on the test set.
        subset_size: Number of training images used when ``use_full_dataset``
            is false.
        bit_position: Forwarded to ``prepare_datasets``.
        messages: One hidden message per backdoor class; the classifier gets
            ``len(messages) + 1`` outputs (class 0 = clean). Never modified.
        num_epochs: Number of train/validate rounds.
        batch_size: Batch size for every loader.
        suffix: Run name. When non-empty, the directory ``suffix`` is created
            and the log is written to ``<suffix>/<suffix>.log`` (``log_file``
            is ignored, as before). When empty, nothing is created and the
            log goes to ``log_file``; previously this case crashed in
            ``os.makedirs('')``.
        log_file: Log path used only when ``suffix`` is empty.
    """
    if suffix:
        image_dir = suffix
        os.makedirs(image_dir, exist_ok=True)
        log_file = os.path.join(image_dir, f'{suffix}.log')
    else:
        image_dir = os.curdir

    def report(message, mode='a'):
        """Print ``message`` and write it as one line to the log file."""
        print(message)
        with open(log_file, mode) as f:
            f.write(message + '\n')

    test_loader = None
    if use_full_dataset:
        train_dataset, val_dataset, test_dataset = prepare_datasets(data_dir, use_full_dataset, subset_size, bit_position, messages, image_dir)
        # Evaluation metrics do not depend on sample order, so no shuffling.
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    else:
        train_dataset, val_dataset = prepare_datasets(data_dir, use_full_dataset, subset_size, bit_position, messages, image_dir)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Load ResNet-18 and adapt it to 32x32 inputs: 3x3 stride-1 stem, no max-pool.
    model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
    model.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
    model.maxpool = nn.Identity()
    model.fc = nn.Linear(model.fc.in_features, len(messages) + 1)  # one clean class + one class per message

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Mode 'w' truncates any log left over from a previous run with the same name.
    report('Starting training process...', mode='w')

    for epoch in range(num_epochs):
        train_loss = train(model, train_loader, criterion, optimizer, device, log_file)
        val_loss, val_accuracy = evaluate(model, val_loader, criterion, device, log_file)
        report(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

    if test_loader is not None:
        test_loss, test_accuracy = evaluate(model, test_loader, criterion, device, log_file)
        report(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

    report("Model training completed.")

if __name__ == "__main__":
    # One hidden message per backdoor class (classes 1..5); class 0 is clean.
    backdoors = [
        "smubJBpxCON0CtdH6970",
        "ET2MJdSUsPFdw17thxwZ",
        "OgVnjIxrjlSHU75lcPz7",
        "EnOvnDE7SBkFdvU1In5k",
        "ZWsc6etNKjovISl16Wvd",
    ]
    main(
        "cifar-10-batches-py",
        use_full_dataset=True,
        subset_size=6000,  # no effect here: prepare_datasets replaces it when use_full_dataset=True
        bit_position=4,
        messages=backdoors,
        num_epochs=1,
        suffix="adv-bckdr-5-epoch-1",
    )


Starting training process...
Epoch 1/1, Train Loss: 0.0334, Val Loss: 0.0000, Val Accuracy: 1.0000
Test Loss: 0.0000, Test Accuracy: 1.0000
Model training completed.


In [12]:
# Show only the test-set summary line(s) from the run's log file.
!grep Test adv-bckdr-5-epoch-1/adv-bckdr-5-epoch-1.log

Test Loss: 0.0000, Test Accuracy: 1.0000
