In [6]:
import torch
import io 
import os
import time
import numpy as np
import skimage
from skimage import io
def energy(magnitude_spectrum):
    rows, cols = magnitude_spectrum.shape
    center_row, center_col = rows // 2, cols // 2

    # 计算频谱图像的半径范围
    radius_max = min(center_row, center_col)
    radius_range = np.arange(7, radius_max)  # 7 5  9

    # 统计每个相邻半径 r 和 r+1 范围内的能量差占相邻半径 r 和 r-1 范围内的能量差的相对占比
    energy_ratios = []
    prev_energy_diff = np.sum(magnitude_spectrum)  # 初始的能量差为总能量
    for i in range(1, len(radius_range) - 1):
        mask_r_minus_1 = np.zeros_like(magnitude_spectrum, dtype=np.uint8)
        mask_r = np.zeros_like(magnitude_spectrum, dtype=np.uint8)
        mask_r_plus_1 = np.zeros_like(magnitude_spectrum, dtype=np.uint8)
        y, x = np.ogrid[-center_row:rows - center_row, -center_col:cols - center_col]
        
        mask_r_minus_1_area = np.logical_and(np.abs(x) <= radius_range[i - 1], np.abs(y) <= radius_range[i - 1])
        mask_r_area = np.logical_and(np.abs(x) <= radius_range[i], np.abs(y) <= radius_range[i])
        mask_r_plus_1_area = np.logical_and(np.abs(x) <= radius_range[i + 1], np.abs(y) <= radius_range[i + 1])
        
        mask_r_minus_1[mask_r_minus_1_area] = 1
        mask_r[mask_r_area] = 1
        mask_r_plus_1[mask_r_plus_1_area] = 1
        
        energy_r_minus_1 = np.sum(magnitude_spectrum[mask_r_minus_1 == 1])
        energy_r = np.sum(magnitude_spectrum[mask_r == 1])
        energy_r_plus_1 = np.sum(magnitude_spectrum[mask_r_plus_1 == 1])
        
        energy_diff_r =  energy_r_minus_1-energy_r
        energy_diff_r_plus_1 = energy_r-energy_r_plus_1
        
        energy_ratio = energy_diff_r_plus_1 / energy_diff_r if energy_diff_r != 0 else 0
        
        energy_ratios.append(energy_ratio)
        prev_energy_diff = energy_diff_r_plus_1
        
    return radius_range,energy_ratios
########################################原始模型
# import torch.nn as nn

# class BinaryClassifier(nn.Module):
#     def __init__(self, input_size):
#         super(BinaryClassifier, self).__init__()
#         self.fc1 = nn.Linear(input_size, 64)
#         self.fc2 = nn.Linear(64, 32)
#         self.fc3 = nn.Linear(32, 1)
#         self.sigmoid = nn.Sigmoid()

#     def forward(self, x):
#         x = self.fc1(x)
#         x = self.fc2(x)
#         x = self.fc3(x)
#         x = self.sigmoid(x)
#         return x
###################################添加BN层的网络
import torch.nn as nn

class BinaryClassifierWithBN(nn.Module):
    def __init__(self, input_size):
        super(BinaryClassifierWithBN, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.bn1 = nn.BatchNorm1d(64)  # Batch Normalization layer
        self.fc2 = nn.Linear(64, 32)
        self.bn2 = nn.BatchNorm1d(32)  # Batch Normalization layer
        self.fc3 = nn.Linear(32, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x.view(-1, 64))  # Reshape before BatchNorm
        x = self.sigmoid(x)
        
        x = self.fc2(x)
        x = self.bn2(x.view(-1, 32))  # Reshape before BatchNorm
        x = self.sigmoid(x)
        
        x = self.fc3(x)
        x = self.sigmoid(x)
        
        return x


# 定义模型和优化器
input_size =21  # 输入特征的维度，即能量占比数据的长度
model = BinaryClassifierWithBN(input_size)
model= torch.load("./BinaryClassifier.pth")
model.eval()

def data_clean1(data_dir1):
    global k
    k=0
    global sum
    sum=0
    import torch.optim as optim
    import numpy as np 
    import cv2
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    true_labels = []
    pred_probs = []
    from collections import Counter
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    path_list=os.listdir(data_dir1)######按顺序输出
    path_list.sort() 
    from skimage import io
    all_predictions = []
    num_images_to_read = 500  # 读取的图像数量
    image_count = 0  # 记录已读取的图像数量

    for class_name in path_list:
        class_path = os.path.join(data_dir1, class_name)

        if os.path.isfile(class_path):
            os.remove(class_path)
        for img_name in os.listdir(class_path):
            if image_count >= num_images_to_read:
                break  # 已读取足够数量的图像，停止循环

            img_path = os.path.join(class_path, img_name)
            if not img_name.endswith(".png"):
                os.remove(img_path)
                
            image_path = str(img_path)
            # print(image_path)
            img = io.imread(image_path)
            image_count += 1
            (B, G, R) = cv2.split(img)
            start = time.perf_counter()
            fRadv = np.fft.fft2(R)
            fGadv = np.fft.fft2(G)
            fBadv = np.fft.fft2(B)
            fshiftRadv = np.fft.fftshift(fRadv)
            fshiftGadv = np.fft.fftshift(fGadv)
            fshiftBadv = np.fft.fftshift(fBadv)
            fadvimgR = np.log(np.abs(fshiftRadv))
            fadvimgG = np.log(np.abs(fshiftGadv))
            fadvimgB = np.log(np.abs(fshiftBadv))

            # 计算每个通道的能量占比向量
            radius_range_adv, energy_advratios_R = energy(fadvimgR)
            radius_range_adv, energy_advratios_G = energy(fadvimgG)
            radius_range_adv, energy_advratios_B = energy(fadvimgB)

            # 调整能量占比向量的形状为相同的长度
            max_length = max(len(energy_advratios_R), len(energy_advratios_G), len(energy_advratios_B))
            energy_advratios_R = np.pad(energy_advratios_R, (0, max_length - len(energy_advratios_R)))
            energy_advratios_G = np.pad(energy_advratios_G, (0, max_length - len(energy_advratios_G)))
            energy_advratios_B = np.pad(energy_advratios_B, (0, max_length - len(energy_advratios_B)))

            # 合并三个通道的能量占比数据
            energy_advratios_combined = np.concatenate([energy_advratios_R, energy_advratios_G, energy_advratios_B])
            energy_advratios_combined = energy_advratios_combined.reshape((1, -1))
            # 定义简单的二分类 CNN 模型
            end = time.perf_counter()
            runTime = end - start
            # print("运行时间：", runTime)
            # Lists to store true labels and predicted probabilities

            true_labels = []
            pred_probs = []
            energy_advratios_combined= torch.tensor(energy_advratios_combined, dtype=torch.float32)
            
            # Iterate through the dataset
            with torch.no_grad():
                # print(energy_advratios_combined.shape)
                outputs = model(energy_advratios_combined)
                pred_probs.extend(outputs.numpy())
                # true_labels.extend(list1)

            # Convert predicted probabilities to binary predictions
            predictions = [1 if prob >= 0.5 else 0 for prob in pred_probs]
            # print(predictions)
            all_predictions.extend(predictions)
    # 统计0和1的个数
    predictions_counter = Counter(all_predictions)
    num_0 = predictions_counter[0]
    num_1 = predictions_counter[1]
   
    # print(f"Number of 0 predictions: {num_0}")
    # print(f"Number of 1 predictions: {num_1}")
    return num_0,num_1

def data_clean(data_dir):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    path_list=os.listdir(data_dir)######按顺序输出
    #print( path_list)
#     path_list.sort(key=lambda x: int(x.replace("adv10","").split('.')[0]))
#     path_list.sort(key=lambda x: int(x.replace("adv_","").split('.')[0]))
#     path_list.sort(key=lambda x: int(x.replace("adv15_","").split('.')[0]))
    #path_list.sort()
    #print( path_list)
    list=[]
    lis=[]
    global sum
    sum=0
    global k
    k=0
    import torch.optim as optim
    import numpy as np 
    import cv2
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    true_labels = []
    pred_probs = []
    from collections import Counter

# 合并所有的predictions列表
    all_predictions = []
    for img_name in path_list:
        img_path = os.path.join(data_dir, img_name)
        if not img_name.endswith(".npy"):
                os.remove(img_path)
#         print(img_path)
        img=np.load(img_path)
        # print(img.shape)
        new_size = (32, 32)

# 使用OpenCV的resize函数进行调整大小
        img = cv2.resize(img, new_size)

        (B, G, R) = cv2.split(img)
        fRadv = np.fft.fft2(R)
        fGadv = np.fft.fft2(G)
        fBadv = np.fft.fft2(B)
        fshiftRadv = np.fft.fftshift(fRadv)
        fshiftGadv = np.fft.fftshift(fGadv)
        fshiftBadv = np.fft.fftshift(fBadv)
        fadvimgR = np.log(np.abs(fshiftRadv))
        fadvimgG = np.log(np.abs(fshiftGadv))
        fadvimgB = np.log(np.abs(fshiftBadv))

        # 计算每个通道的能量占比向量
        radius_range_adv, energy_advratios_R = energy(fadvimgR)
        radius_range_adv, energy_advratios_G = energy(fadvimgG)
        radius_range_adv, energy_advratios_B = energy(fadvimgB)

        # 调整能量占比向量的形状为相同的长度
        max_length = max(len(energy_advratios_R), len(energy_advratios_G), len(energy_advratios_B))
        energy_advratios_R = np.pad(energy_advratios_R, (0, max_length - len(energy_advratios_R)))
        energy_advratios_G = np.pad(energy_advratios_G, (0, max_length - len(energy_advratios_G)))
        energy_advratios_B = np.pad(energy_advratios_B, (0, max_length - len(energy_advratios_B)))

        # 合并三个通道的能量占比数据
        energy_advratios_combined = np.concatenate([energy_advratios_R, energy_advratios_G, energy_advratios_B])
        energy_advratios_combined = energy_advratios_combined.reshape((1, -1))
        # 定义简单的二分类 CNN 模型

        # Lists to store true labels and predicted probabilities
        
        true_labels = []
        pred_probs = []
        energy_advratios_combined= torch.tensor(energy_advratios_combined, dtype=torch.float32)
        # Iterate through the dataset
        with torch.no_grad():
            # print(energy_advratios_combined.shape)
            outputs = model(energy_advratios_combined)
            pred_probs.extend(outputs.numpy())
            # true_labels.extend(list1)

        # Convert predicted probabilities to binary predictions
        predictions = [1 if prob >= 0.5 else 0 for prob in pred_probs]
        
        all_predictions.extend(predictions)
    # 统计0和1的个数
    predictions_counter = Counter(all_predictions)
    num_0 = predictions_counter[0]
    num_1 = predictions_counter[1]

    # print(f"Number of 0 predictions: {num_0}")
    # print(f"Number of 1 predictions: {num_1}")
    return num_0,num_1
data_dir1="/root/data1/code/ciafr-model/val"  

data_dir="/root/data1/code/ciafr-model/adv-cifar-resnet18/100"

num_0,num_1=data_clean1(data_dir1)  #clean
num0,num1=data_clean(data_dir)  #adv

recall=num0/(num0+num1)*100
prec=num0/(num0+num_0)*100
acc=(num0+num_1)/(num0+num1+num_1+num_0)*100
f1=2*recall*prec/(recall+prec)
print('recall',recall)
print('precision', prec)
print('acc',acc)
print('F1',f1)  
print(num_0,num_1) #clean
print(num0,num1) #adv

  fadvimgR = np.log(np.abs(fshiftRadv))
  fadvimgG = np.log(np.abs(fshiftGadv))
  fadvimgB = np.log(np.abs(fshiftBadv))


recall 82.73092369477911
precision 82.73092369477911
acc 82.76553106212425
F1 82.73092369477911
86 414
412 86


In [4]:
import torch
import io 
import os
import skimage
from skimage import io
def energy(magnitude_spectrum):
    rows, cols = magnitude_spectrum.shape
    center_row, center_col = rows // 2, cols // 2

    # 计算频谱图像的半径范围
    radius_max = min(center_row, center_col)
    radius_range = np.arange(41, radius_max)   #41  34  48

    # 统计每个相邻半径 r 和 r+1 范围内的能量差占相邻半径 r 和 r-1 范围内的能量差的相对占比
    energy_ratios = []
    prev_energy_diff = np.sum(magnitude_spectrum)  # 初始的能量差为总能量
    for i in range(1, len(radius_range) - 1):
        mask_r_minus_1 = np.zeros_like(magnitude_spectrum, dtype=np.uint8)
        mask_r = np.zeros_like(magnitude_spectrum, dtype=np.uint8)
        mask_r_plus_1 = np.zeros_like(magnitude_spectrum, dtype=np.uint8)
        y, x = np.ogrid[-center_row:rows - center_row, -center_col:cols - center_col]
        
        mask_r_minus_1_area = np.logical_and(np.abs(x) <= radius_range[i - 1], np.abs(y) <= radius_range[i - 1])
        mask_r_area = np.logical_and(np.abs(x) <= radius_range[i], np.abs(y) <= radius_range[i])
        mask_r_plus_1_area = np.logical_and(np.abs(x) <= radius_range[i + 1], np.abs(y) <= radius_range[i + 1])
        
        mask_r_minus_1[mask_r_minus_1_area] = 1
        mask_r[mask_r_area] = 1
        mask_r_plus_1[mask_r_plus_1_area] = 1
        
        energy_r_minus_1 = np.sum(magnitude_spectrum[mask_r_minus_1 == 1])
        energy_r = np.sum(magnitude_spectrum[mask_r == 1])
        energy_r_plus_1 = np.sum(magnitude_spectrum[mask_r_plus_1 == 1])
        
        energy_diff_r =  energy_r_minus_1-energy_r
        energy_diff_r_plus_1 = energy_r-energy_r_plus_1
        
        energy_ratio = energy_diff_r_plus_1 / energy_diff_r if energy_diff_r != 0 else 0
        
        energy_ratios.append(energy_ratio)
        prev_energy_diff = energy_diff_r_plus_1
        
    return radius_range,energy_ratios

import torch.nn as nn
class BinaryClassifier(nn.Module):
    def __init__(self, input_size):
        super(BinaryClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        self.batchnorm1 = nn.BatchNorm1d(128)  # 添加BatchNormalization
        self.batchnorm2 = nn.BatchNorm1d(64)
        self.batchnorm2 = nn.BatchNorm1d(32)
        # self.dropout = nn.Dropout(0.5)  # 添加Dropout

    def forward(self, x):
        x = x.view(-1, x.shape[1])  # 将数据从 (batch_size, 1, 207) 转换为 (batch_size, 207)
        # x = x.view(-1, x.shape[2])
        x = self.fc1(x)
        x = self.batchnorm1(x)
        x = self.relu(x)
        # x = self.dropout(x)
        x = self.fc2(x)
        x = self.batchnorm2(x)
        x = self.relu(x)
        # x = self.dropout(x)
        x = self.fc3(x)
        x = self.batchnorm3(x)
        x = self.relu(x)
        x = self.fc4(x)
        x = torch.sigmoid(x)  # 使用 torch.sigmoid 替代 self.sigmoid
        return x


# 定义模型和优化器
input_size =207  # 输入特征的维度，即能量占比数据的长度  207  228  186
model = BinaryClassifier(input_size)
list1=[0]
model= torch.load("./BinaryClassifier.pth")
model.eval()
import numpy as np
def data_clean1(data_dir1):
    global k
    k=0
    global sum
    sum=0
    import torch.optim as optim
    import numpy as np 
    import time
    import cv2
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    true_labels = []
    pred_probs = []
    from collections import Counter
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    path_list=os.listdir(data_dir1)######按顺序输出
    path_list.sort() 
    from skimage import io
    all_predictions = []
    num_images_to_read = 180  # 读取的图像数量
    image_count = 0  # 记录已读取的图像数量

    for class_name in path_list:
        class_path = os.path.join(data_dir1, class_name)

        if os.path.isfile(class_path):
            os.remove(class_path)
        for img_name in os.listdir(class_path):
            if image_count >= num_images_to_read:
                break  # 已读取足够数量的图像，停止循环

            img_path = os.path.join(class_path, img_name)
            if not img_name.endswith(".png"):
                os.remove(img_path)

            image_path = str(img_path)
            img = io.imread(image_path)
            image_count += 1
            (B, G, R) = cv2.split(img)
            start = time.perf_counter()
            fRadv = np.fft.fft2(R)
            fGadv = np.fft.fft2(G)
            fBadv = np.fft.fft2(B)
            fshiftRadv = np.fft.fftshift(fRadv)
            fshiftGadv = np.fft.fftshift(fGadv)
            fshiftBadv = np.fft.fftshift(fBadv)
            fadvimgR = np.log(np.abs(fshiftRadv))
            fadvimgG = np.log(np.abs(fshiftGadv))
            fadvimgB = np.log(np.abs(fshiftBadv))

            # 计算每个通道的能量占比向量
            radius_range_adv, energy_advratios_R = energy(fadvimgR)
            radius_range_adv, energy_advratios_G = energy(fadvimgG)
            radius_range_adv, energy_advratios_B = energy(fadvimgB)

            # 调整能量占比向量的形状为相同的长度
            max_length = max(len(energy_advratios_R), len(energy_advratios_G), len(energy_advratios_B))
            energy_advratios_R = np.pad(energy_advratios_R, (0, max_length - len(energy_advratios_R)))
            energy_advratios_G = np.pad(energy_advratios_G, (0, max_length - len(energy_advratios_G)))
            energy_advratios_B = np.pad(energy_advratios_B, (0, max_length - len(energy_advratios_B)))

            # 合并三个通道的能量占比数据
            energy_advratios_combined = np.concatenate([energy_advratios_R, energy_advratios_G, energy_advratios_B])
            energy_advratios_combined = energy_advratios_combined.reshape((1, -1))
            # 定义简单的二分类 CNN 模型

            # Lists to store true labels and predicted probabilities
            end = time.perf_counter()
            runTime = end - start
            # print("运行时间：", runTime)
            true_labels = []
            pred_probs = []
            energy_advratios_combined= torch.tensor(energy_advratios_combined, dtype=torch.float32)
            # Iterate through the dataset
            with torch.no_grad():
                # print(energy_advratios_combined.shape)
                outputs = model(energy_advratios_combined)
                pred_probs.extend(outputs.numpy())
                true_labels.extend(list1)

            # Convert predicted probabilities to binary predictions
            predictions = [1 if prob >= 0.5 else 0 for prob in pred_probs]
            all_predictions.extend(predictions)
    # 统计0和1的个数
    predictions_counter = Counter(all_predictions)
    num_0 = predictions_counter[0]
    num_1 = predictions_counter[1]

#     print(f"Number of 0 predictions: {num_0}")
#     print(f"Number of 1 predictions: {num_1}")
    return num_0,num_1

def data_clean(data_dir):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    path_list=os.listdir(data_dir)######按顺序输出

    #print( path_list)
    list=[]
    lis=[]
    global sum
    sum=0
    global k
    k=0
    import torch.optim as optim
    import numpy as np 
    import cv2
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    true_labels = []
    pred_probs = []
    from collections import Counter

# 合并所有的predictions列表
    all_predictions = []
    for img_name in path_list:
        img_path = os.path.join(data_dir, img_name)
        if not img_name.endswith(".npy"):
                os.remove(img_path)
        # print(img_path)
        img=np.load(img_path)
        # print(img.shape)
        new_size = (224, 224)

# 使用OpenCV的resize函数进行调整大小
        img = cv2.resize(img, new_size)

        (B, G, R) = cv2.split(img)
        fRadv = np.fft.fft2(R)
        fGadv = np.fft.fft2(G)
        fBadv = np.fft.fft2(B)
        fshiftRadv = np.fft.fftshift(fRadv)
        fshiftGadv = np.fft.fftshift(fGadv)
        fshiftBadv = np.fft.fftshift(fBadv)
        fadvimgR = np.log(np.abs(fshiftRadv))
        fadvimgG = np.log(np.abs(fshiftGadv))
        fadvimgB = np.log(np.abs(fshiftBadv))

        # 计算每个通道的能量占比向量
        radius_range_adv, energy_advratios_R = energy(fadvimgR)
        radius_range_adv, energy_advratios_G = energy(fadvimgG)
        radius_range_adv, energy_advratios_B = energy(fadvimgB)

        # 调整能量占比向量的形状为相同的长度
        max_length = max(len(energy_advratios_R), len(energy_advratios_G), len(energy_advratios_B))
        energy_advratios_R = np.pad(energy_advratios_R, (0, max_length - len(energy_advratios_R)))
        energy_advratios_G = np.pad(energy_advratios_G, (0, max_length - len(energy_advratios_G)))
        energy_advratios_B = np.pad(energy_advratios_B, (0, max_length - len(energy_advratios_B)))

        # 合并三个通道的能量占比数据
        energy_advratios_combined = np.concatenate([energy_advratios_R, energy_advratios_G, energy_advratios_B])
        energy_advratios_combined = energy_advratios_combined.reshape((1, -1))
        # 定义简单的二分类 CNN 模型

        # Lists to store true labels and predicted probabilities
        
        true_labels = []
        pred_probs = []
        energy_advratios_combined= torch.tensor(energy_advratios_combined, dtype=torch.float32)
        # Iterate through the dataset
        with torch.no_grad():
            # print(energy_advratios_combined.shape)
            outputs = model(energy_advratios_combined)
            pred_probs.extend(outputs.numpy())
            true_labels.extend(list1)

        # Convert predicted probabilities to binary predictions
        predictions = [1 if prob >= 0.5 else 0 for prob in pred_probs]
        all_predictions.extend(predictions)
    # 统计0和1的个数
    predictions_counter = Counter(all_predictions)
    num_0 = predictions_counter[0]
    num_1 = predictions_counter[1]

    return num_0,num_1
data_dir1='/root/data1/code/test-data6'

data_dir="/root/data1/code/addnoise/ffdnet/ffdnet-main/feimubiao100-90" #  mubiao100-180
num_0,num_1=data_clean1(data_dir1)
num0,num1=data_clean(data_dir)  #adv
recall=num0/(num0+num1)*100
prec=num0/(num0+num_0)*100
acc=(num0+num_1)/(num0+num1+num_1+num_0)*100
f1=2*recall*prec/(recall+prec)
print('recall',recall)
print('precision', prec)
print('acc',acc)
print('F1',f1)  
print(num_0,num_1)
print(num0,num1)  #adv

recall 95.50561797752809
precision 90.42553191489363
acc 95.16728624535315
F1 92.89617486338797
9 171
85 4
