In [None]:
import os
import numpy as np
import pandas as pd

class AlterationNumericalAttack:
    """Alteration attack that overwrites part of one column with random labels.

    The dataset at ``watermarked_data_path`` is loaded once. Each attack picks
    a random subset of rows, replaces ``perturbed_attribute`` in those rows
    with values drawn uniformly from ``LABEL_CHOICES``, and writes the result
    to a CSV file.
    """

    # Candidate replacement values: the integers 1..7 (assumed to be the
    # Cover_Type label range).
    LABEL_CHOICES = np.arange(1, 8)

    def __init__(self, watermarked_data_path, attack_proportions=None, dataset='covertype', p=3, perturbed_attribute='Cover_Type', random_seed=10000):
        """Load the dataset and seed NumPy's global random generator.

        Args:
            watermarked_data_path: CSV file to attack.
            attack_proportions: Fractions of rows to alter; defaults to
                ``[0.2, 0.4, 0.6, 0.8, 1.0]``.
            dataset: Dataset name; stored but not used by this class.
            p: Half-width of a uniform draw whose result is discarded (see
                ``apply_attack``); it does not influence the written values.
            perturbed_attribute: Name of the column to overwrite.
            random_seed: Seed passed to ``np.random.seed``.
        """
        self.watermarked_data_path = watermarked_data_path
        self.attack_proportions = attack_proportions if attack_proportions is not None else [0.2, 0.4, 0.6, 0.8, 1.0]
        self.p = p
        self.random_seed = random_seed
        self.perturbed_attribute = perturbed_attribute
        self.dataset = dataset
        # Seeds the process-wide NumPy generator, which apply_attack draws from.
        np.random.seed(self.random_seed)

        self.watermarked_data = pd.read_csv(self.watermarked_data_path)

    def apply_attack(self, proportion, save_path):
        """Overwrite a random ``proportion`` of rows and save the result as CSV.

        Args:
            proportion: Fraction of rows to alter (``int(proportion * n_rows)``
                rows are selected without replacement).
            save_path: Destination CSV path. Missing parent directories are
                created when a filesystem path is given.

        Raises:
            KeyError: If ``perturbed_attribute`` is not a column of the data.
        """
        temp = self.watermarked_data.copy()
        indices = np.random.choice(len(temp), size=int(proportion * len(temp)), replace=False)
        # This draw is discarded. It is retained only so that the global random
        # stream, and therefore the labels drawn below for a given seed, stays
        # identical to what earlier runs produced.
        np.random.uniform(-self.p, self.p, size=len(indices))
        perturb_values = np.random.choice(self.LABEL_CHOICES, size=len(indices))

        # `indices` are row positions, so assign positionally; label-based
        # .loc is only equivalent when the frame has a default RangeIndex.
        column_position = temp.columns.get_loc(self.perturbed_attribute)
        temp.iloc[indices, column_position] = perturb_values

        if isinstance(save_path, (str, os.PathLike)):
            output_dir = os.path.dirname(save_path)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
        temp.to_csv(save_path, index=False)

    def execute(self, save_path):
        """Run the attack for every configured proportion and save the output.

        With a single proportion the result is written to ``save_path``
        unchanged. With several proportions each result goes to its own file,
        ``<stem>_<proportion><ext>``, so later runs do not overwrite earlier
        ones.
        """
        if len(self.attack_proportions) == 1:
            self.apply_attack(self.attack_proportions[0], save_path)
            return

        stem, extension = os.path.splitext(save_path)
        for proportion in self.attack_proportions:
            self.apply_attack(proportion, f"{stem}_{proportion}{extension}")



# Robustness test: generate one attacked copy of the source dataset per seed.
attack_range = [3]
attack_proportions = [1.0]

seeds = range(10000, 10100)
dataset = "covertype"

watermark_schemes = ['original']
original_file_path = "dataset/covtype_with_key.subset.data"

# Every run attacks the same input file, so the input path is fixed up front.
watermarked_data_path = original_file_path

for watermark_scheme in watermark_schemes:
    for p in attack_range:
        for attack_proportion in attack_proportions:
            for seed in seeds:
                # Output name encodes scheme, dataset, seed, p and proportion.
                save_path = f"{watermark_scheme}_dataset/{watermark_scheme}_{dataset}_{seed}_{p}_{attack_proportion}.csv"
                attack = AlterationNumericalAttack(
                    watermarked_data_path,
                    attack_proportions=[attack_proportion],
                    dataset=dataset,
                    p=p,
                    random_seed=seed,
                )
                attack.execute(save_path)
        
        
        
    

In [None]:
import sys
import os
import string
import numpy as np
import json

from watermarking_schemes.B2Mark import WatermarkDetection
import pandas as pd

def generate_secret_keys(n, length=6):
    """Return two lists of ``n`` random alphanumeric keys of ``length`` characters.

    NumPy's global generator is re-seeded with 99 on every call, so the same
    keys are produced each time. The first list is drawn completely before
    the second one.
    """
    np.random.seed(99)  # fixed seed: keys are reproducible across calls
    alphabet = list(string.ascii_letters + string.digits)

    def _draw_keys():
        # One key = `length` characters sampled (with replacement) from the alphabet.
        keys = []
        for _ in range(n):
            keys.append(''.join(np.random.choice(alphabet, size=length)))
        return keys

    first_keys = _draw_keys()
    second_keys = _draw_keys()
    return first_keys, second_keys

secret_key_1s, secret_key_2s = generate_secret_keys(30)


seed_range = range(10000,10100)
dataset = "covertype"
g = 6

# z-scores reported by the detector, one entry per file, in seed_range order.
z_scoress = []

original_data_path = "dataset/covtype_with_key.subset.data"
# Not used by the detection loop below; kept so the name stays defined.
original_data = pd.read_csv(original_data_path)

detected_watermarks = []

# Run B2Mark detection on every attacked copy produced for seeds 10000..10099.
# The detector always uses seed 10000 and the first key pair; only the input
# file changes between iterations.
for idx, seed in enumerate(seed_range):
    file_path = f"original_dataset/original_covertype_{seed}_3_1.0.csv"
    b2Mark_detection = WatermarkDetection(dataset=dataset, seed=10000 , g=g, secret_key_1=secret_key_1s[0], secret_key_2=secret_key_2s[0], watermark_information="1010110011", threshold=3)
    detected_watermark = b2Mark_detection.run_detection(file_path)
    detected_watermarks.append(detected_watermark)
    print(f"{seed} detected watermark: {detected_watermark}")
    z_scores = b2Mark_detection.get_z_scores()
    # Keep every file's z-scores; previously each result was overwritten and
    # z_scoress stayed empty.
    z_scoress.append(z_scores)

# Results file, relative to the current working directory.
result_path = "result_false_positive.json"

# Load existing results if the file is already there; otherwise start empty.
if os.path.exists(result_path):
    with open(result_path, 'r') as f:
        result_data = json.load(f)
else:
    result_data = {}

# Add or replace the "B2Mark" entry.
result_data["B2Mark"] = detected_watermarks

# Write the merged results back.
with open(result_path, 'w') as f:
    json.dump(result_data, f, indent=4)



In [None]:
import sys
import os
import string
import numpy as np
import json

from watermarking_schemes.GAHSW import WatermarkDetection

# Dataset name passed to the detector and used to build the .npy file path below.
dataset = "covertype"

def generate_secret_keys(n, length=6):
    """Return two lists of ``n`` random alphanumeric keys of ``length`` characters.

    NumPy's global generator is re-seeded with 99 on every call, so the same
    keys are produced each time. The first list is drawn completely before
    the second one.
    """
    np.random.seed(99)  # fixed seed: keys are reproducible across calls
    alphabet = list(string.ascii_letters + string.digits)

    def _draw_keys():
        # One key = `length` characters sampled (with replacement) from the alphabet.
        keys = []
        for _ in range(n):
            keys.append(''.join(np.random.choice(alphabet, size=length)))
        return keys

    first_keys = _draw_keys()
    second_keys = _draw_keys()
    return first_keys, second_keys

secret_key_1s, secret_key_2s = generate_secret_keys(30)

# Run GAHSW detection on every attacked copy produced for seeds 10000..10099.
# The detector configuration is fixed; only the data file under test changes.
detected_watermarks = []
for seed in range(10000, 10100):
    original_data_path = f"original_dataset/original_covertype_{seed}_3_1.0.csv"
    gahsw_detection = WatermarkDetection(
        dataset=dataset,
        watermark_information="1010110011",
        seed=10000,
        Ks=secret_key_1s[0],
    )
    detected_watermark = gahsw_detection.run_detection(
        file_path=f"GAHSW_dataset/GAHSW_{dataset}_{10000}.npy",
        detected_data=original_data_path,
    )
    print(f"Detected Watermark for seed {seed}: {detected_watermark}")
    detected_watermarks.append(detected_watermark)

# Results file, relative to the current working directory.
result_path = os.path.join("result_false_positive.json")

# Start from the stored results when the file exists, otherwise from nothing.
result_data = {}
if os.path.exists(result_path):
    with open(result_path, 'r') as f:
        result_data = json.load(f)

# Add or replace the "GAHSW" entry.
result_data["GAHSW"] = detected_watermarks

# Write the merged results back.
with open(result_path, 'w') as f:
    json.dump(result_data, f, indent=4)
    

In [None]:
import sys
import os
import string
import numpy as np
import json

from watermarking_schemes.SCPW import WatermarkDetection

# Dataset name passed to the detector and used to build the .npy file path below.
dataset = "covertype"

def generate_secret_keys(n, length=6):
    """Return two lists of ``n`` random alphanumeric keys of ``length`` characters.

    NumPy's global generator is re-seeded with 99 on every call, so the same
    keys are produced each time. The first list is drawn completely before
    the second one.
    """
    np.random.seed(99)  # fixed seed: keys are reproducible across calls
    alphabet = list(string.ascii_letters + string.digits)

    def _draw_keys():
        # One key = `length` characters sampled (with replacement) from the alphabet.
        keys = []
        for _ in range(n):
            keys.append(''.join(np.random.choice(alphabet, size=length)))
        return keys

    first_keys = _draw_keys()
    second_keys = _draw_keys()
    return first_keys, second_keys

secret_key_1s, secret_key_2s = generate_secret_keys(30)

# Run SCPW detection on every attacked copy produced for seeds 10000..10099.
# The detector configuration is fixed; only the data file under test changes.
detected_watermarks = []
for seed in range(10000, 10100):
    original_data_path = f"original_dataset/original_covertype_{seed}_3_1.0.csv"
    scpw_detection = WatermarkDetection(
        dataset=dataset,
        watermark_information="1010110011",
        seed=10000,
        Ks=secret_key_1s[1],
    )
    detected_watermark = scpw_detection.run_detection(
        file_path=f"SCPW_dataset/SCPW_{dataset}_{10000}.npy",
        detected_data=original_data_path,
    )
    detected_watermarks.append(detected_watermark)
    print(f"Detected Watermark for seed {seed}: {detected_watermark}")

# Results file, relative to the current working directory.
result_path = os.path.join("result_false_positive.json")

# Start from the stored results when the file exists, otherwise from nothing.
result_data = {}
if os.path.exists(result_path):
    with open(result_path, 'r') as f:
        result_data = json.load(f)

# Add or replace the "SCPW" entry.
result_data["SCPW"] = detected_watermarks

# Write the merged results back.
with open(result_path, 'w') as f:
    json.dump(result_data, f, indent=4)
    

In [None]:
import json
import matplotlib.pyplot as plt
from matplotlib import cm

method_order = ['B2Mark', 'GAHSW', 'SCPW']

# Load the detection results written by the detection cells.
with open('result_false_positive.json', 'r') as f:
    data = json.load(f)

# Per-method reference string; any detection result that differs from it is counted.
targets = {
    'B2Mark': '0000000000',
    'GAHSW': '0000000000',
    'SCPW': '0101001100'
}

sample_sizes = list(range(10, 101, 10))

# One (initially empty) list per method, in method_order.
proportions = {}
for method in method_order:
    proportions[method] = []

for method in method_order:
    # Methods without stored results keep an empty list.
    if method in data:
        samples = data[method]
        target = targets[method]
        for n in sample_sizes:
            # Share of the first n results that differ from the target string.
            subset = samples[:n]
            count_non_target = len([s for s in subset if s != target])
            proportions[method].append(count_non_target / n)