# Initialize

In [None]:
import numpy as np
from scipy.linalg import hadamard
from google.colab import drive
import tensorflow as tf
from skimage.util import random_noise
from scipy.ndimage import gaussian_filter
from PIL import Image
import cv2 as cv
import io
from scipy.ndimage import shift as my_shift_attack
import torch



# Mount Google Drive so that files under /content/drive become reachable from this Colab runtime.
drive.mount('/content/drive')
# Execute the companion notebook in the current namespace.
# NOTE(review): presumably this is what defines the Data, Transmitter and Receiver helpers
# used further down in this notebook — confirm against Watermarking_Classes.ipynb.
%run '/content/drive/MyDrive/Colab Notebooks/CNN_WavePattern_Noise/Fourier_Test/Watermarking_Classes.ipynb'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Watermarking

In [None]:
class Watermarking:
    """Additive spread-spectrum watermarking of an image with a 2-bit payload.

    The two payload bits select one pattern out of ``pn_array``; embedding
    adds a scaled copy of that pattern to the image, and reconstruction picks
    the pattern that correlates most strongly with the image.
    """

    def __init__(self, im, watermark_bits):
        # ``im`` is the host image for embedding, or the received image for
        # reconstruction.  ``watermark_bits`` is only read by ``embedding``.
        self.im = im
        self.watermark_bits = watermark_bits

    def embedding(self, G, pn_array):
        """Return the image with the pattern selected by the payload added.

        Parameters:
        - G: gain applied to the selected pattern before it is added.
        - pn_array: indexable collection of patterns (``pn_array[k]``).

        Returns:
        - The watermarked image clipped to [0, 1], or an all-zero array of the
          image's shape when the payload index exceeds the last pattern.
        """
        high_bit, low_bit = self.watermark_bits[0], self.watermark_bits[1]
        symbol = high_bit * 2 + low_bit
        return self._data_embedding(symbol, G, pn_array)

    def reconstruction(self, pn_array):
        """Recover the payload bits from ``self.im``.

        Returns:
        - A ``(1, 2)`` array ``[[high_bit, low_bit]]`` derived from the index
          of the best-matching pattern.
        """
        symbol = self._data_reconstruction(self.im, pn_array)
        # divmod(symbol, 2) == (symbol // 2, symbol % 2): the two payload bits.
        return np.vstack(divmod(symbol, 2)).T

    def _data_embedding(self, watermark, G, pn_array):
        """Add ``G * pn_array[watermark]`` to the image and clip to [0, 1]."""
        last_valid_index = pn_array.shape[0] - 1
        if watermark <= last_valid_index:
            return np.clip(self.im + G * pn_array[watermark], 0, 1)
        # No pattern exists for this index: signal it with an all-zero image.
        return np.zeros(self.im.shape)

    def _data_reconstruction(self, watermarked_im, pn_array):
        """Return the index of the pattern with the largest |mean(image * pattern)|."""
        scores = np.array(
            [np.abs(np.mean(watermarked_im * candidate)) for candidate in pn_array],
            dtype=float,
        )
        return np.argmax(scores)


# DCT Watermarking

In [None]:
import numpy as np
import cv2 as cv

class DCTWatermarking:
    """DCT-domain variant of the 2-bit pattern watermark.

    The selected pattern is added to the image in the DCT domain and the
    payload is recovered by correlating DCT coefficients.
    """

    def __init__(self, im, watermark_bits):
        self.im = im
        # Indexed as [0] (high bit) and [1] (low bit) when embedding.
        self.watermark_bits = watermark_bits

    def embedding(self, g, pn_array):
        """Embed the pattern chosen by the payload bits into ``self.im``.

        Parameters:
        - g: gain applied to the pattern's DCT coefficients.
        - pn_array: indexable collection of patterns.

        Returns:
        - uint8 array: inverse DCT of the combined coefficients, clipped to
          [0, 255].
          NOTE(review): the clip range suggests the host image is expected on
          a 0-255 scale rather than normalised to [0, 1] — confirm with callers.
        """
        high_bit, low_bit = self.watermark_bits[0], self.watermark_bits[1]
        chosen_pattern = pn_array[high_bit * 2 + low_bit]

        # cv.dct is fed float32 data for both the host and the pattern.
        host_coeffs = cv.dct(np.float32(self.im))
        pattern_coeffs = cv.dct(np.float32(chosen_pattern))

        # Additive embedding in the transform domain, then back to pixels.
        spatial = cv.idct(host_coeffs + g * pattern_coeffs)
        return np.clip(spatial, 0, 255).astype(np.uint8)

    def reconstruction(self, pn_array):
        """Recover the payload bits from ``self.im``.

        Each candidate pattern is scored by the absolute inner product of its
        DCT coefficients with those of the image; the best index is split
        into two bits.

        Returns:
        - Array ``[high_bit, low_bit]`` of shape ``(2,)``.
        """
        image_coeffs = cv.dct(np.float32(self.im))

        scores = [
            np.abs(np.vdot(image_coeffs, cv.dct(np.float32(candidate))))
            for candidate in pn_array
        ]

        best_index = np.argmax(scores)
        return np.array(divmod(best_index, 2))


# PN Sequences

In [None]:
class PNSequences:
    """Factories for sets of four +/-1 pseudo-noise (PN) patterns.

    Every pattern is square with side ``im.shape[0]``; the image's other
    dimensions are never consulted, so the host image is effectively assumed
    to be square.
    """

    def __init__(self, im):
        self.im = im

    def created_traditional_pn_array(self):
        """Return four independent random +/-1 patterns.

        Returns:
        - Integer array of shape ``(4, N, N)`` with ``N = im.shape[0]``.
        """
        side = self.im.shape[0]
        return np.random.choice([1, -1], size=(4, side, side))

    def created_hadamard_pn_array(self):
        """Return four patterns built from one random pattern and Hadamard rows.

        A single random +/-1 pattern is drawn and multiplied, row by row, with
        each of the first four rows of the ``N x N`` Hadamard matrix.

        Returns:
        - Float array of shape ``(4, N, N)`` with ``N = im.shape[0]``.

        Raises:
        - Whatever ``scipy.linalg.hadamard`` raises for an unsupported ``N``
          (it is documented to require a power of two).
        - IndexError when the Hadamard matrix has fewer than four rows.
        """
        pn_size = self.im.shape[0]
        H = hadamard(pn_size)
        pn = np.random.choice([1, -1], size=(pn_size, pn_size))

        pn_array = np.zeros((4, pn_size, pn_size))
        for k in range(4):
            # H[k] has length N and broadcasts across every row of ``pn``.
            pn_array[k] = pn * H[k]

        return pn_array

    def created_advanced_pn_array(self, G):
        """Return four random patterns that are weakly correlated with the image.

        Candidates are drawn until four of them pass
        ``_check_statistical_properties``.

        NOTE: this is an unbounded rejection loop; if no candidate can satisfy
        the threshold ``G`` for this image, the method never returns.

        Parameters:
        - G: maximum allowed absolute mean of ``im * pattern``.

        Returns:
        - Float array of shape ``(4, N, N)`` with ``N = im.shape[0]``.
        """
        image_size = self.im.shape[0]
        pn_array = np.zeros((4, image_size, image_size))

        accepted = 0
        while accepted < 4:
            pn = np.random.choice([-1, 1], size=(image_size, image_size))
            if self._check_statistical_properties(pn, G):
                pn_array[accepted] = pn
                accepted += 1

        return pn_array

    def _check_statistical_properties(self, pn, G):
        """Return True unless ``|mean(im * pn)|`` exceeds ``G``."""
        r = np.abs(np.mean(self.im * pn))
        # Written as a negated ``>`` so that a NaN correlation is still
        # accepted, exactly as before.
        return not (r > G)




# CNN Watermarking

In [None]:
class CNNWatermarking:
    """Wave-pattern watermarking whose payload is decoded by a classifier.

    Relies on ``Data``, ``Transmitter`` and ``Receiver``, which are not
    defined in this cell (presumably they come from the notebook executed via
    ``%run`` at the top — TODO confirm).
    """

    # Wave-pattern frequency used for each 2-bit payload value (0-3).
    _FREQ_BY_SYMBOL = {0: 5, 1: 10, 2: 15, 3: 20}
    # Frequency used when the payload does not map to a known symbol.
    _DEFAULT_FREQ = 5

    def __init__(self, im, mask_ratio):
        self.im = im
        self.mask_ratio = mask_ratio

    def embedding(self, watermark_bits):
        """Embed a 2-bit payload into ``self.im``.

        Parameters:
        - watermark_bits: indexable with at least two entries; the payload
          value is ``watermark_bits[0] * 2 + watermark_bits[1]``.

        Returns:
        - Whatever ``Transmitter.Watermarking()`` returns for the image and
          the generated wave pattern.
        """
        image_size = self.im.shape[0]

        # Create the wave pattern whose frequency encodes the payload.
        data = Data(1, image_size, self.mask_ratio)
        symbol = watermark_bits[0] * 2 + watermark_bits[1]
        freq = self._FREQ_BY_SYMBOL.get(symbol, self._DEFAULT_FREQ)
        pn = data.created_wave_pattern(freq)

        transmitter = Transmitter(self.im, pn, self.mask_ratio)
        return transmitter.Watermarking()

    def _class_scores(self, model):
        """Run ``model`` on the pattern reconstructed from ``self.im``.

        Shared by both reconstruction methods.  Puts ``model`` into eval mode
        as a side effect and evaluates it without gradient tracking.

        Returns:
        - The raw model output for a batch of one.
        """
        receiver = Receiver(self.im, self.mask_ratio)
        reconstructed_pn = receiver.PN_Reconstruction()

        # Add batch and channel axes, then move channels first (NCHW).
        # Assumes the reconstructed pattern is 2-D (H, W) — TODO confirm.
        input_data = reconstructed_pn[np.newaxis, ..., np.newaxis]
        input_data = np.transpose(input_data, (0, 3, 1, 2))
        input_tensor = torch.tensor(input_data, dtype=torch.float32)

        model.eval()
        with torch.no_grad():
            return model(input_tensor)

    def reconstruction(self, model):
        """Decode the payload bits from ``self.im``.

        Returns:
        - Array ``[high_bit, low_bit]`` derived from the arg-max class.
        """
        output = self._class_scores(model)
        predicted_class = torch.argmax(output, dim=1).item()
        return np.array([predicted_class // 2, predicted_class % 2])

    def reconstruction_with_confidence_score(self, model):
        """Decode the payload bits together with the winning softmax score.

        Returns:
        - ``(bits, confidence)`` where ``bits`` is ``[high_bit, low_bit]`` and
          ``confidence`` is the largest softmax probability as a float.
        """
        probs = torch.softmax(self._class_scores(model), dim=1)
        predicted_class = torch.argmax(probs, dim=1).item()
        confidence = torch.max(probs, dim=1).values.item()

        bits = np.array([predicted_class // 2, predicted_class % 2])
        return bits, confidence




# Channel

In [7]:
class Channel:
    """Distortions ("attacks") that can be applied to a grayscale image.

    Every method returns a new array and leaves ``self.im`` unchanged.
    Most methods clip their result to [0, 1], i.e. they are written for
    images normalised to that range.
    """

    def __init__(self, im):
        self.im = im

    def x_shift_attack(self, x):
        """Shift the image by ``x`` pixels along axis 1 (no shift along axis 0).

        Uses ``scipy.ndimage.shift`` with its default interpolation and
        boundary handling.  Requires a 2-D image.
        """
        return my_shift_attack(self.im, shift=(0, x))

    def rotation_attack(self, angle):
        """Rotate the image by ``angle`` degrees about its centre.

        The output keeps the input size; pixels are interpolated bilinearly
        and the border is filled by reflection (``cv.BORDER_REFLECT_101``).
        Requires a 2-D image.
        """
        h, w = self.im.shape
        center = (w // 2, h // 2)

        rotation_matrix = cv.getRotationMatrix2D(center, angle, 1.0)
        return cv.warpAffine(
            self.im,
            rotation_matrix,
            (w, h),
            flags=cv.INTER_LINEAR,
            borderMode=cv.BORDER_REFLECT_101,
        )

    def add_guassian_noise(self, input_mean, standard_deviation):
        """Add spatially correlated Gaussian noise and clip to [0, 1].

        Parameters:
        - input_mean: mean of the white Gaussian noise.
        - standard_deviation: standard deviation of the white noise before
          it is smoothed.

        Returns:
        - Noisy image clipped to [0, 1].
        """
        noise = np.random.normal(input_mean, standard_deviation, self.im.shape)

        # Smoothing the white noise makes neighbouring samples correlated.
        correlated_noise = gaussian_filter(noise, sigma=1)

        return np.clip(self.im + correlated_noise, 0, 1)

    # Correctly spelled alias; the original (misspelled) name is kept for
    # existing callers.
    add_gaussian_noise = add_guassian_noise

    def add_random_noise(self, noise_strength):
        """Add bounded, spatially correlated noise and clip to [0, 1].

        Standard-normal noise is min-max normalised, rescaled to
        ``[-noise_strength, noise_strength]`` and then smoothed with a
        Gaussian filter before being added to the image.
        """
        noise = np.random.normal(loc=0, scale=1, size=self.im.shape)

        lowest = np.min(noise)
        span = np.max(noise) - lowest
        if span > 0:
            noise = (noise - lowest) / span  # normalise to [0, 1]
        else:
            # Constant noise (e.g. a one-pixel image): avoid 0/0 and treat it
            # as mid-range, which becomes zero noise after centring.
            noise = np.full(noise.shape, 0.5)
        noise = 2 * noise_strength * (noise - 0.5)

        # Smoothing the noise makes neighbouring samples correlated.
        correlated_noise = gaussian_filter(noise, sigma=1)

        return np.clip(self.im + correlated_noise, 0, 1)

    def add_salt_and_pepper_noise(self, noise_ratio):
        """Replace a fraction of the pixels with dark or bright values.

        Half of the selected pixels (rounded down) become "pepper", each with
        its own value drawn from [0.0, 0.2); the rest become "salt", each
        with its own value drawn from [0.8, 1.0).

        Parameters:
        - noise_ratio: fraction of pixels to corrupt, in [0, 1].

        Returns:
        - A corrupted copy of the image.

        Raises:
        - ValueError: if the image is not ``(H, W)`` or ``(H, W, 1)``, or if
          ``noise_ratio`` is outside [0, 1].
        """
        noisy_image = self.im.copy()

        if noisy_image.ndim == 2:
            plane = noisy_image
        elif noisy_image.ndim == 3 and noisy_image.shape[2] == 1:
            # 2-D view onto the single channel; writes go to ``noisy_image``.
            plane = noisy_image[..., 0]
        else:
            raise ValueError("Expected grayscale image with shape (H, W) or (H, W, 1)")

        if not 0 <= noise_ratio <= 1:
            raise ValueError("noise_ratio must be in the range [0, 1]")

        height, width = plane.shape
        total_pixels = height * width
        num_noisy = int(noise_ratio * total_pixels)

        # Distinct random pixel positions.
        rows, cols = np.unravel_index(
            np.random.choice(total_pixels, num_noisy, replace=False),
            (height, width),
        )

        half = num_noisy // 2
        # One independent draw per pixel, not a single value for all pixels.
        plane[rows[:half], cols[:half]] = np.random.uniform(0.0, 0.2, size=half)
        plane[rows[half:], cols[half:]] = np.random.uniform(
            0.8, 1.0, size=num_noisy - half
        )

        return noisy_image

    def blur_image(self, filter_size):
        """Blur the image with a ``filter_size x filter_size`` averaging kernel."""
        return cv.blur(self.im, (filter_size, filter_size))

    def crop_image(self, crop_size):
        """Keep the central part of the image and scale it back to full size.

        Parameters:
        - crop_size: fraction of the image *area* to keep, in (0, 1]; each
          side is scaled by ``sqrt(crop_size)``.

        Returns:
        - The central crop resized to the original height and width.

        Raises:
        - ValueError: if ``crop_size`` is not in (0, 1].
        """
        if not 0 < crop_size <= 1:
            raise ValueError("crop_size must be in the range (0, 1]")

        height, width = self.im.shape
        scale = np.sqrt(crop_size)
        new_height = int(height * scale)
        new_width = int(width * scale)
        start_y = (height - new_height) // 2
        start_x = (width - new_width) // 2
        cropped_img = self.im[start_y:start_y + new_height, start_x:start_x + new_width]

        # cv.resize takes the target size as (width, height).
        return cv.resize(cropped_img, (width, height), interpolation=cv.INTER_AREA)

    def jpeg_compress_grayscale_image(self, quality):
        """Round-trip the image through in-memory JPEG compression.

        An image whose maximum is <= 1.0 is treated as normalised and scaled
        by 255 first; values are clipped to [0, 255] before the 8-bit cast.

        Parameters:
        - quality: JPEG quality setting passed to PIL.

        Returns:
        - uint8 array with the decompressed image on a 0-255 scale (note:
          not normalised to [0, 1]).
        """
        # Work on a local copy so the stored image keeps its dtype and range.
        image = self.im
        if image.max() <= 1.0:
            image = image * 255
        image_8bit = np.clip(image, 0, 255).astype(np.uint8)

        pil_image = Image.fromarray(image_8bit).convert('L')

        # Compress to an in-memory buffer and read it back.
        buffer = io.BytesIO()
        pil_image.save(buffer, format='JPEG', quality=quality)
        buffer.seek(0)

        compressed_pil = Image.open(buffer).convert('L')
        return np.array(compressed_pil)

    def brightness_attack(self, brightness_value):
        """Add a constant brightness offset to the image.

        Parameters:
        - brightness_value (float): amount added to every pixel (e.g. 0.1 for
          a mild attack); may be negative.

        Returns:
        - np.ndarray: result clipped to [0.0, 1.0], same dtype as the image.

        Raises:
        - ValueError: if the image does not have a floating-point dtype.
        """
        if not np.issubdtype(self.im.dtype, np.floating):
            raise ValueError("Input image must be of float type with values in [0.0, 1.0]")

        bright_image = np.clip(self.im + brightness_value, 0.0, 1.0)
        return bright_image.astype(self.im.dtype)

    def contrast_attack(self, contrast_factor):
        """Scale the image's deviation from its mean intensity.

        Parameters:
        - contrast_factor (float):
            - >1.0 increases contrast
            - <1.0 decreases contrast (e.g. 0.5 reduces contrast)
            - =1.0 leaves the image unchanged (apart from clipping)

        Returns:
        - np.ndarray: same shape as the image, clipped to [0, 1].
        """
        mean_intensity = np.mean(self.im)
        contrast_adjusted = (self.im - mean_intensity) * contrast_factor + mean_intensity
        return np.clip(contrast_adjusted, 0, 1)
