In [1]:
import numpy as np 
import torch
import matplotlib.pyplot as plt
from imageio import imread, imwrite
from torch import nn
import random

In [2]:
import sys
sys.path.append("..")
# from steganogan.decoders import DenseDecoderNLayers
# from steganogan.decoders import BasicDecoder, DenseDecoder, DenseDecoderNLayers
# from steganogan import SteganoGAN

In [3]:
import torch
import torchvision
from torch.optim import LBFGS
import torch.nn.functional as F

In [4]:
from tqdm import tqdm, trange

In [5]:
def shuffle_params(m):
    """Re-initialise one layer in place; intended for ``module.apply``.

    Only modules whose exact type is ``nn.Conv2d`` or ``nn.BatchNorm2d`` are
    touched:

    * the weight is redrawn from a standard normal distribution, sampled
      with ``np.random.normal`` (so ``np.random.seed`` controls the result);
    * the bias is reset to zero;
    * ``nn.BatchNorm2d`` modules additionally get ``track_running_stats``
      switched off.

    Parameters that do not exist (``bias=False`` convolutions,
    ``affine=False`` batch norms) are skipped instead of raising, and the new
    values are created with the dtype and on the device of the parameter
    they replace, so calling this on a model that already lives on the GPU
    does not move its parameters back to the CPU.

    Args:
        m: the module to re-initialise.
    """
    if type(m) in (nn.Conv2d, nn.BatchNorm2d):
        weight = m.weight
        if weight is not None:
            sampled = np.random.normal(0, 1, tuple(weight.shape))
            # Assign a plain tensor: ``.data`` must not be given an nn.Parameter.
            weight.data = torch.as_tensor(sampled, dtype=weight.dtype, device=weight.device)

        bias = m.bias
        if bias is not None:
            bias.data = torch.zeros_like(bias.data)
    if type(m) == nn.BatchNorm2d:
        if "track_running_stats" in m.__dict__:
            m.track_running_stats = False
#         

In [6]:
class normLayer(nn.Module):
    """Parameter-free standardisation of every channel of every sample.

    For a 4-D input ``(B, C, H, W)`` each ``(H, W)`` plane is shifted by its
    own mean and divided by its own standard deviation (``Tensor.std`` with
    its default settings) plus ``1e-7`` to avoid a division by zero.

    For ``B == 1`` the result is the same as before; larger batches are now
    normalised sample by sample instead of tripping an assertion, and
    non-contiguous inputs are accepted (``reshape`` instead of ``view``).
    """

    def __init__(self):
        super(normLayer, self).__init__()

    def forward(self, x):
        """Return ``x`` standardised per sample and per channel.

        Args:
            x: tensor with exactly four dimensions ``(B, C, H, W)``.

        Returns:
            A tensor of the same shape as ``x``.
        """
        b, c, _, _ = x.shape
        # One row of spatial values per (sample, channel) pair.
        flat = x.reshape(b, c, -1)
        mean = flat.mean(-1).reshape(b, c, 1, 1)
        std = flat.std(-1).reshape(b, c, 1, 1)
        return (x - mean) / (std + 1e-7)

In [7]:
class BasicDecoder(nn.Module):
    """
    Fully convolutional decoder: maps a steganographic image to one logit
    per embedded bit and pixel.

    The network is ``layers`` repetitions of (3x3 convolution, LeakyReLU,
    normalisation) followed by a final 3x3 convolution with ``data_depth``
    output channels.  The normalisation is ``normLayer`` when ``yan_norm`` is
    true and ``nn.BatchNorm2d`` otherwise.

    Input: (N, 3, H, W)
    Output: (N, D, H, W)
    """

    def __init__(self, data_depth, hidden_size, layers = 3, yan_norm=False):
        super().__init__()
        self.version = '1'
        self.data_depth = data_depth
        self.hidden_size = hidden_size
        self.yan_norm = yan_norm
        # Holds the requested depth only until _build_models() replaces it
        # with the assembled nn.Sequential of the same name.
        self.layers = layers

        self._models = self._build_models()

    def _conv2d(self, in_channels, out_channels):
        """Return a 3x3 convolution that keeps the spatial size (padding 1)."""
        return nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)

    def _build_models(self):
        """Assemble the network, store it as ``self.layers`` and return it in a one-element list."""
        width = self.hidden_size
        # The first block reads the 3 image channels, the others `width` channels.
        input_widths = [3] + [width] * (self.layers - 1)

        stack = []
        for in_channels in input_widths:
            stack += [
                self._conv2d(in_channels, width),
                nn.LeakyReLU(inplace=True),
                normLayer() if self.yan_norm else nn.BatchNorm2d(width),
            ]
        stack.append(self._conv2d(width, self.data_depth))

        self.layers = nn.Sequential(*stack)

        return [self.layers]

    def forward(self, x):
        """Run ``x`` through the model(s) in ``self._models`` and return the last output.

        Any model after the first receives the channel-wise concatenation of
        all earlier outputs; with the single model built by
        ``_build_models`` that loop does not execute.
        """
        head, *rest = self._models
        x = head(x)

        collected = [x]
        for stage in rest:
            x = stage(torch.cat(collected, dim=1))
            collected.append(x)

        return x

In [8]:
num_bits = 3      # payload bits per pixel, i.e. output channels of the decoder
yan_norm = False  # True: normLayer, False: nn.BatchNorm2d inside the decoder

# Prefer the GPU, but fall back to the CPU so the cell also runs without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Randomly initialised decoder; shuffle_params redraws its conv / batch-norm parameters.
model = BasicDecoder(num_bits, hidden_size=128, layers = 3, yan_norm=yan_norm)
model.apply(shuffle_params)
model.to(device)

BasicDecoder(
  (layers): Sequential(
    (0): Conv2d(3, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.01, inplace)
    (2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=False)
    (3): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): LeakyReLU(negative_slope=0.01, inplace)
    (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=False)
    (6): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): LeakyReLU(negative_slope=0.01, inplace)
    (8): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=False)
    (9): Conv2d(128, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  )
)

In [9]:
# Read the sample photo as an RGB array and show its shape.
image = imread("/home/vk352/FaceDetection/datasets/sample/obama2.jpg", pilmode='RGB')
image.shape

(612, 450, 3)

In [10]:
# Load the cover image and run it through the decoder once.
# image = "/home/vk352/FaceDetection/datasets/sample/obama2.jpg"
image = "/home/vk352/FaceDetection/datasets/data512x512/00001.jpg"
image = imread(image, pilmode='RGB') / 255.0  # scale the pixel values by 1/255
# permute(2, 1, 0) puts the channel axis first (and swaps the two spatial axes);
# unsqueeze(0) adds the batch axis.
image = torch.FloatTensor(image).permute(2, 1, 0).unsqueeze(0)
# Follow the model's device instead of hard-coding 'cuda', so the two cannot disagree.
image = image.to(next(model.parameters()).device)
out = model(image)
# Thresholding `out > 0` would give the bits decoded from the unmodified image.

In [11]:
# Random 0/1 target with the decoder output's shape: every entry is a Bernoulli
# draw whose success probability is itself drawn uniformly from [0, 1).
target = torch.empty(out.size()).uniform_(0, 1).bernoulli().to(out.device)
# target = torch.empty(out.shape).random_(256).to(out.device)
target.shape

torch.Size([1, 3, 512, 512])

In [12]:
# Summed (not averaged) losses used by get_loss.
criterion = torch.nn.BCEWithLogitsLoss(reduction='sum')
criterion1 = torch.nn.L1Loss(reduction='sum')
criterion2 = torch.nn.MSELoss(reduction='sum')


def get_loss(outputs, target, loss_mode, margin=None):
    """Return the summed loss between decoder logits and the target.

    Args:
        outputs: raw decoder outputs (logits).
        target: tensor broadcastable against ``outputs``; 0/1 valued for the
            "BCE", "log" and "hingelog" modes.
        loss_mode: one of

            * ``"BCE"``      - binary cross-entropy with logits;
            * ``"log"``      - ``softplus(-(2 * target - 1) * outputs)``;
            * ``"hingelog"`` - the "log" loss reduced by a margin and
              floored at zero, so elements already below the margin
              contribute nothing;
            * ``"L1"`` / ``"L2"`` - absolute / squared error between
              ``sigmoid(outputs) * 255`` and ``target`` (presumably for
              targets on a 0-255 scale - confirm against the caller).
        margin: margin for "hingelog".  ``None`` (default) reads the
            module-level variable ``hinge`` at call time, as before.

    Returns:
        A scalar tensor holding the sum over all elements.

    Raises:
        ValueError: if ``loss_mode`` is not one of the modes above.
    """
    if loss_mode == "BCE":
        return criterion(outputs, target)

    if loss_mode in ("log", "hingelog"):
        # Map the 0/1 target to -1/+1 so that agreeing signs give a negative argument.
        loss = F.softplus(-(target * 2 - 1) * outputs)  # log(1+exp(x))
        if loss_mode == "hingelog":
            threshold = hinge if margin is None else margin
            loss = torch.max(loss - threshold, torch.zeros_like(loss))
        return torch.sum(loss)

    if loss_mode == "L1":
        return criterion1(torch.sigmoid(outputs) * 255, target)

    if loss_mode == "L2":
        return criterion2(torch.sigmoid(outputs) * 255, target)

    raise ValueError(
        f"unknown loss_mode {loss_mode!r}; expected 'BCE', 'log', 'hingelog', 'L1' or 'L2'"
    )

In [13]:
# Loss configuration read by the cells below.
hinge = 0.3        # margin used by get_loss in "hingelog" mode
loss_mode = "log"  # which branch of get_loss the attack optimises

In [14]:
# lbfgs: perturb `image` so that the decoder's output on it matches `target`.

final_err = 0

steps = 1000   # total number of L-BFGS iterations
eps = 0.2      # largest allowed per-pixel deviation from `image`
max_iter = 20  # L-BFGS iterations per optimizer.step(), i.e. between two projections
alpha = 0.5    # L-BFGS learning rate


def decoding_error(candidate):
    """Return the fraction of positions where the decoder output for `candidate` misses `target`."""
    with torch.no_grad():  # evaluation only: do not build an autograd graph
        decoded = model(candidate)
        if loss_mode in ["L1", "L2"]:
            wrong = torch.abs(torch.sigmoid(decoded).float().view(-1) * 255 - target.view(-1)) > 128
        else:
            wrong = (decoded > 0).float().view(-1) != target.view(-1)
    return wrong.sum().item() / target.numel()


adv_image = image.clone().detach()
print("alpha:", alpha)
error = []

for i in trange(steps // max_iter):
    # adv_image is replaced by a fresh detached tensor at the end of every round,
    # so both the gradient flag and the optimizer have to be set up again.
    adv_image.requires_grad = True
    optimizer = LBFGS([adv_image], lr=alpha, max_iter=max_iter)

    def closure():
        outputs = model(adv_image)
        loss = get_loss(outputs, target, loss_mode)

        optimizer.zero_grad()
        loss.backward()
        return loss

    optimizer.step(closure)

    # Project back: at most eps away from the original, inside [0, 1],
    # then truncate to the 256 levels of an 8-bit image.
    stepped = adv_image.detach()
    delta = torch.clamp(stepped - image, min=-eps, max=eps)
    adv_image = torch.clamp(image + delta, min=0, max=1)
    adv_image = torch.clamp(adv_image*255, 0, 255).int().float()/255.
    adv_image = adv_image.detach()

    err = decoding_error(adv_image)
    print("error", err)
    error.append(err)

# With steps < max_iter the loop never runs; keep the initial value in that case.
if error:
    final_err = error[-1]


  0%|          | 0/50 [00:00<?, ?it/s]

alpha: 0.5


  2%|▏         | 1/50 [00:01<01:09,  1.41s/it]

error 0.41007359822591144


  4%|▍         | 2/50 [00:02<01:07,  1.40s/it]

error 0.3041369120279948


  4%|▍         | 2/50 [00:03<01:21,  1.70s/it]


KeyboardInterrupt: 

In [17]:
# plt.figure(figsize=(16, 10))
# # plt.plot(range(1000), np.array(bceacc) * 100, label="SGD")
# # for lr, err in final_err:
# plt.plot(np.arange(1, steps // max_iter + 1) * max_iter, np.array(err[:steps // max_iter]) * 100, label=f"LBFGS lr {lr}")
# plt.legend()
# plt.ylabel("Error Rate (%)")
# plt.xlabel("iterations")
# plt.title(f"{loss_mode} loss, {num_bits} bits, yan_norm {yan_norm}")
# # plt.ylim(0, 0.01)
# plt.show()