In [1]:
!pip3 install duckduckgo_search



In [22]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision.datasets import ImageFolder
import os
import cv2
import requests
from PIL import Image
from io import BytesIO
from duckduckgo_search import DDGS
import numpy as np
import hashlib
import matplotlib.pyplot as plt

In [3]:


def embed_watermark(original_img, signature, strength=0.1):
    # Convert to YCbCr and extract Y channel
    img_ycbcr = cv2.cvtColor(original_img, cv2.COLOR_BGR2YCrCb)
    y_channel = img_ycbcr[:, :, 0].astype(np.float32)

    # Generate watermark sequence using hash of signature
    seed = int(hashlib.sha256(signature.encode()).hexdigest(), 16) % (2**32)
    np.random.seed(seed)
    watermark = np.random.randn(*y_channel.shape)  # Gaussian sequence


    # Split into 8x8 blocks and apply DCT
    watermarked = y_channel.copy()
    h, w = y_channel.shape
    for i in range(0, h, 8):
        for j in range(0, w, 8):
            block = y_channel[i:i+8, j:j+8]
            dct_block = cv2.dct(block)

            # Select mid-frequency coefficients (example: indices 5-20 in zigzag order)
            mask = np.zeros((8, 8), dtype=bool)
            mask.flat[5:20] = True  # Adjust based on JND thresholds

            # Embed watermark into selected coefficients
            dct_block[mask] += strength * watermark[i:i+8, j:j+8][mask]

            # Inverse DCT
            watermarked_block = cv2.idct(dct_block)
            watermarked[i:i+8, j:j+8] = watermarked_block

    # Reconstruct YCbCr and convert back to BGR
    img_ycbcr[:, :, 0] = np.clip(watermarked, 0, 255)
    watermarked_img = cv2.cvtColor(img_ycbcr, cv2.COLOR_YCrCb2BGR)
    return watermarked_img.astype(np.uint8)

def detect_multiple_watermarks(test_img, candidate_signatures, strength=0.1, threshold=3.0):
    img_ycbcr = cv2.cvtColor(test_img, cv2.COLOR_BGR2YCrCb)
    y_channel = img_ycbcr[:, :, 0].astype(np.float32)

    best_q = -np.inf
    best_index = -1

    for sig_idx, signature in enumerate(candidate_signatures):
        # Regenerate watermark for this signature
        seed = int(hashlib.sha256(signature.encode()).hexdigest(), 16) % (2**32)
        np.random.seed(seed)
        watermark = np.random.randn(*y_channel.shape)

        # Compute correlation
        correlations = []
        h, w = y_channel.shape
        for i in range(0, h, 8):
            for j in range(0, w, 8):
                block = y_channel[i:i+8, j:j+8]
                dct_block = cv2.dct(block)
                mask = np.zeros((8, 8), dtype=bool)
                mask.flat[5:20] = True
                selected_coeffs = dct_block[mask]
                selected_watermark = watermark[i:i+8, j:j+8][mask]
                if len(selected_coeffs) > 0:
                    corr = np.dot(selected_coeffs.flatten(), selected_watermark.flatten())
                    correlations.append(corr)

        # Calculate test statistic
        n = len(correlations)
        if n == 0:
            continue
        mean_corr = np.mean(correlations)
        std_corr = np.std(correlations)
        q = (mean_corr / std_corr) * np.sqrt(n)

        # Track best match
        if q > best_q:
            best_q = q
            best_index = sig_idx

    # Determine result
    detected = best_q > threshold
    return {
        'detected': detected,
        'best_match_index': best_index if detected else -1,
        'best_match_q': best_q,
        'all_q_values': [best_q if i == best_index else 0 for i in range(len(candidate_signatures))]
    }

def detect_watermark(test_img):

    test_img = np.array(test_img)
    test_img = test_img[..., ::-1]

    # Candidate signatures (up to 3)
    candidates = [
        "ChatGPT",
        "Stable Diffusion",
        "Midjourney"
    ]

    # Detect watermarks
    result = detect_multiple_watermarks(test_img, candidates, threshold=3.0)

    print(f"Watermark Detected: {result['detected']}")
    if result['detected']:
        print(f"Matched Signature Index: {result['best_match_index']}")
        print(f"Matched Signature Text: {candidates[result['best_match_index']]}")
        print(f"Confidence (q-value): {result['best_match_q']:.2f}")

    print("\nAll Q-values:")
    for idx, q in enumerate(result['all_q_values']):
        print(f"Signature {idx} ({candidates[idx]}): {q:.2f}")

#create a tamper classifier that determines if a watermark image has been tampered or not.
# First we have to fetch images from the internet, watermark them, and lastly apply a tamper.


queries = ["nature landscapes", "forest scenery","mountain views", "sunset over lake", "green meadows","rivers and waterfalls", "autumn trees", "desert landscapes", "snowy mountains", "wildlife in nature"]
num_images = 1600
num_test_images = 160
output_dir = "wm_image_classes7"
test_dir = "test_images"
train_dir = "train_images"
os.makedirs(output_dir, exist_ok=True)

"""
def download_image(link):
    try:
        response = requests.get(link, timeout = 5)
        print(response)
        return Image.open(BytesIO(response.content))
    except:
        return None
"""

def download_image(link):
    try:
        response = requests.get(link, timeout=5)
        image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
        img = cv2.imdecode(image_array, cv2.IMREAD_COLOR)  # BGR format
        return img
    except Exception as e:
        print("Error:", e)
        return None

def apply_watermark(img, signature):
    #since dct doesn not support odd blocks, the image dimensions need to be even.
    #height, width = img.shape[:2]
    #if height % 2  != 0:
    #    height += 1
    #if width % 2 != 0:
    #    width += 1

    img = cv2.resize(img, (256, 256))
    watermarked_image = embed_watermark(img,signature,0.1)
    return watermarked_image

def apply_tampering(img, type):
    if type == "crop":
        #crop the image
        width, height = img.shape[:2]
        return img[int(height * 0.1):int(height * 0.9), int(width * 0.1):int(width * 0.9)]

    if type == "resize":
        resized_img = cv2.resize(img, (img.shape[1] // 2, img.shape[0] // 2))
        resized_img_back = cv2.resize(resized_img, (img.shape[1], img.shape[0]))
        return resized_img_back

    if type == "no_tamper":
        return img


def get_train():
    with DDGS() as ddgs:
        results = ddgs.images("nature", max_results = 300)
        for q in queries:
            results.extend(ddgs.images(q, max_results = 300))
        count = 0
        tamper_to_do = 0
        tamper_type = ["crop","resize","no_tamper","no_tamper"]
        for res in results:
            url = res.get("image")
            img = download_image(url)
            if img is not None:
                wm_img = apply_watermark(img, "ChatGPT")

                tamp_wm_img = apply_tampering(wm_img, tamper_type[tamper_to_do])
                if tamper_type[tamper_to_do] == "no_tamper":
                    tamper_folder = os.path.join(output_dir, train_dir, "no_tamper")
                else:
                    tamper_folder = os.path.join(output_dir, train_dir, "tampered")
                #tamper_folder = os.path.join(output_dir, train_dir, tamper_type[tamper_to_do])
                os.makedirs(tamper_folder, exist_ok=True)
                cv2.imwrite(os.path.join(tamper_folder, f"image_{count}.jpg"), tamp_wm_img)
                count += 1
                print(count)
                tamper_to_do += 1
                tamper_to_do %= len(tamper_type)
                if count > num_images:
                    return

def get_test():
    with DDGS() as ddgs:
        results = ddgs.images("nature", max_results = 100)
        for q in queries:
            results.extend(ddgs.images(q, max_results = 100))
        count = 0
        tamper_to_do = 0
        tamper_type = ["crop","resize","no_tamper","no_tamper"]
        for res in results:
            url = res.get("image")
            img = download_image(url)
            if img is not None:
                wm_img = apply_watermark(img, "ChatGPT")

                tamp_wm_img = apply_tampering(wm_img, tamper_type[tamper_to_do])
                if tamper_type[tamper_to_do] == "no_tamper":
                    tamper_folder = os.path.join(output_dir, test_dir, "no_tamper")
                else:
                    tamper_folder = os.path.join(output_dir, test_dir, "tampered")
                os.makedirs(tamper_folder, exist_ok=True)
                cv2.imwrite(os.path.join(tamper_folder, f"image_{count}.jpg"), tamp_wm_img)
                count += 1
                print(count)

                tamper_to_do += 1
                tamper_to_do %= len(tamper_type)
                if count > num_test_images:
                    return





In [5]:
get_train()


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
Error: HTTPSConnectionPool(host='www.usnews.com', port=443): Read timed out. (read timeout=5)
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
Error: HTTPConnectionPool(host='webneel.com', port=80): Read timed out. (read timeout=5)
178
179
180
181
182
183
184
185
186
187
188
189
190
Error: HTTPSConnectionPool(host='webneel.com', port=443): Max retries exceeded with url: /daily/sites/default/files/images/daily/11-2018/nature-photography-aaronreed

In [None]:

input_dir = "images"
output_dir = "images"
#input_dir = "wm_image_classes7/test_images/no_tamper"
# Output directory to save the DCT-transformed data
#output_dir = "wm_im_classes7_dct/test_images/no_tamper"

os.makedirs(output_dir, exist_ok=True)

def convert_to_dct(img_path):
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        return None

    # Resize to standard shape (optional, useful for CNNs)
    img = cv2.resize(img, (256, 256))

    # Convert image to float32 for DCT
    img_float = np.float32(img)

    # Apply DCT
    dct = cv2.dct(img_float)
    return dct

# Loop through all images
for img_name in os.listdir(input_dir):
    img_path = os.path.join(input_dir, img_name)

    dct_result = convert_to_dct(img_path)
    if dct_result is not None:
        # Save DCT array as .npy file
        output_path = os.path.join(output_dir, img_name.split('.')[0] + ".npy")
        np.save(output_path, dct_result)
output_dir = "wm_image_classes6"

In [4]:
get_test()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
Error: HTTPSConnectionPool(host='www.usnews.com', port=443): Read timed out. (read timeout=5)
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161


In [32]:
class DCTFolderDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform

        self.samples = []  # (path, label) pairs
        self.classes = sorted(os.listdir(root_dir))
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}

        for cls in self.classes:
            cls_folder = os.path.join(root_dir, cls)
            for fname in os.listdir(cls_folder):
                if fname.endswith(".npy"):
                    self.samples.append((os.path.join(cls_folder, fname), self.class_to_idx[cls]))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        path, label = self.samples[idx]
        dct_data = np.load(path)

        # Normalize and expand dims
        dct_data = (dct_data - dct_data.mean()) / (dct_data.std() + 1e-8)
        dct_data = np.expand_dims(dct_data, axis=0)  # shape [1, H, W]

        if self.transform:
            dct_data = self.transform(dct_data)

        return torch.tensor(dct_data, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

In [70]:
class DCTCNN(nn.Module):
    def __init__(self):
        super(DCTCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(32 * 64 * 64, 2)  # assuming 256x256 input, 2 classes

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.fc1(x)
        return x

In [None]:



# Load datasets
train_dataset = DCTFolderDataset("wm_im_classes7_dct/train_images")
test_dataset = DCTFolderDataset("wm_im_classes7_dct/test_images")


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = DCTCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# stop training early due to overfitting.
for epoch in range(10):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Calculate accuracy
        _, predicted = torch.max(outputs, 1)  # Get predicted class
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct / total * 100

    print(f"Epoch {epoch+1}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%")



Epoch 1, Loss: 3.3916, Accuracy: 79.98%


In [66]:
torch.save(model.state_dict, "model.pth")

In [None]:
from sklearn.metrics import accuracy_score

def evaluate_model(model, test_loader, device):
    model.eval()  # Set the model to evaluation mode
    all_labels = []
    all_preds = []

    with torch.no_grad():  # No gradients are calculated during evaluation
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)  # Move data to the device (GPU/CPU)

            # Forward pass
            outputs = model(inputs)

            # Get the predicted class
            _, preds = torch.max(outputs, 1)

            all_labels.extend(labels.cpu().numpy())  # Collect labels
            all_preds.extend(preds.cpu().numpy())  # Collect predictions

    # Calculate accuracy
    accuracy = accuracy_score(all_labels, all_preds)

    print(f'Accuracy: {accuracy * 100:.2f}%')

    from sklearn.metrics import classification_report
    print("Classification Report:\n", classification_report(all_labels, all_preds))

# Assuming the test dataset is in the test_loader and model is trained
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
evaluate_model(model, test_loader, device)

Accuracy: 99.38%
Classification Report:
               precision    recall  f1-score   support

           0       1.00      0.99      0.99        80
           1       0.99      1.00      0.99        81

    accuracy                           0.99       161
   macro avg       0.99      0.99      0.99       161
weighted avg       0.99      0.99      0.99       161



In [68]:
import torch
import cv2
import numpy as np
from torchvision import transforms

def prepare_image(image_path, transform=None):
    """Load and transform the image for the model using OpenCV."""
    # Load image using OpenCV (default is BGR color format)
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

    # Convert the image from BGR to RGB (since OpenCV loads as BGR by default)
    #image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = Image.fromarray(image)
    # Apply the transformations (resize, normalization, etc.)
    if transform:
        image = transform(image)

    # Add a batch dimension (model expects a batch, even if it's just one image)
    image = image.unsqueeze(0)  # Shape becomes [1, C, H, W]

    return image

def predict_image(model, image_path, device, transform=None):
    """Predict the class of an image."""
    model.eval()  # Set the model to evaluation mode

    # Prepare the image
    image = prepare_image(image_path, transform)

    # Move image to the device (GPU or CPU)
    image = image.to(device)

    # Forward pass through the model
    with torch.no_grad():  # No gradient calculation
        outputs = model(image)

    # Get the predicted class
    print(outputs)
    _, predicted_class = torch.max(outputs, 1)

    return predicted_class.item()  # Return the class index as a Python integer

# Example usage:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)  # Make sure the model is on the right device

# Define the transformations (same ones as used during training)
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize to the same dimensions used in training
    transforms.ToTensor(),  # Convert to tensor
    # Optional: You can add normalization here if used in training
])

# Path to the image you want to test
image_path = "tampered.jpeg"

# Get prediction for the image
predicted_class = predict_image(model, image_path, device, transform)

# Output the predicted class
print(f"Predicted class index: {predicted_class}")


tensor([[ 30.8837, -30.8914]])
Predicted class index: 0


In [None]:
transform = transforms.Compose([
    transforms.ToTensor(),
])

img_path = "image_25.jpg"
img = cv2.imread(img_path)

img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert from BGR to RGB
  # Resize to match model's expected input

transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((128, 128)),  # Match the input size used in training
    transforms.ToTensor(),
])

input_tensor = transform(img).unsqueeze(0)  # Add batch dimension
class_names = ["not_Tampered", "Tampered"]

with torch.no_grad():
    output = model(input_tensor)
    print(output)
    predicted = torch.argmax(output, 1)
    class_id = predicted.item()
    print("Prediction:", class_names[class_id])

tensor([[0.7001, 4.2581]])
Prediction: Tampered
