<a href="https://colab.research.google.com/github/jithinraj9895/steganalysis/blob/second/Image_steagnalysis_SiaNET_MYMODEL_30.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from datetime import datetime
import logging
import os
import random
from PIL import Image
import cv2
import numpy as np
import torch
import random
import os
import os.path as osp

def set_random_seed(seed=None):
    """Sets random seed for reproducibility.

    Args:
        seed (int, optional): Random seed.
    """
    if seed is None:
        seed = (
                os.getpid()
                + int(datetime.now().strftime("%S%f"))
                + int.from_bytes(os.urandom(2), "big")
        )
        logger = logging.getLogger(__name__)
        logger.info('Using a generated random seed {}'.format(seed))
    random.seed(seed)
    np.random.seed(seed)
    torch.set_rng_state(torch.manual_seed(seed).get_state())


def get_random_seed():
    return np.random.randint(2 ** 31)


In [3]:



class RandomRot(object):

    def __call__(self, sample):
        rot = random.randint(0, 3)
        return {
            'image': np.rot90(sample['image'], rot, axes=[-3, -2]).copy(),
            'label': sample['label'],
        }


class RandomFlip(object):

    def __init__(self, p=0.5):
        self._p = p

    def __call__(self, sample):
        if random.random() < self._p:
            return {
                'image': np.flip(sample['image'], axis=-2).copy(),
                'label': sample['label'],
            }
        else:
            return sample


class ToTensor(object):

    def __call__(self, sample):
        image, label = sample['image'], sample['label']
        if image.ndim == 3:  # HxWxC
            image = image.transpose(2, 0, 1)
        else:  # NxHxWxC
            image = image.transpose(0, 3, 1, 2)
        return {
            'image': torch.from_numpy(image).type(torch.FloatTensor),
            'label': torch.tensor(label).long()
        }

In [4]:
from torch.utils.data import Dataset


class CoverStegoDataset(Dataset):

    def __init__(self, cover_dir, stego_dir, transform=None):
        self._transform = transform

        self.images, self.labels = self.get_items(cover_dir, stego_dir)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = np.array(Image.open(self.images[idx]))
        image = np.expand_dims(image, 2)  # (H, W, C)
        assert image.ndim == 3

        sample = {
            'image': image,
            'label': self.labels[idx]
        }

        if self._transform:
            sample = self._transform(sample)
        return sample

    @staticmethod
    def get_items(cover_dir, stego_dir):
        images, labels = [], []

        cover_names = sorted(os.listdir(cover_dir))
        if stego_dir is not None:
            stego_names = sorted(os.listdir(stego_dir))
            assert cover_names == stego_names

        file_names = cover_names
        if stego_dir is None:
            dir_to_label = [(cover_dir, 0), ]
        else:
            dir_to_label = [(cover_dir, 0), (stego_dir, 1)]
        for image_dir, label in dir_to_label:
            for file_name in file_names:
                image_path = osp.join(image_dir, file_name)
                if not osp.isfile(image_path):
                    raise FileNotFoundError('{} not exists'.format(image_path))
                images.append(image_path)
                labels.append(label)

        return images, labels

In [5]:
import torchvision
from torch.utils.data import BatchSampler
from torch.utils.data import DataLoader
from torch.utils.data import Sampler
from torch.utils.data import SequentialSampler
import itertools
import logging
import math

class TrainingSampler(Sampler):

    def __init__(self, size, seed=None, shuffle=True):
        self._size = size
        self._shuffle = shuffle

        if seed is None:
            seed = get_random_seed()
        self._seed = seed

    def __iter__(self):
        yield from itertools.islice(self._infinite_indices(), 0, None, 1)

    def _infinite_indices(self):
        g = torch.Generator()
        g.manual_seed(self._seed)
        while True:
            if self._shuffle:
                yield from torch.randperm(self._size, generator=g)
            else:
                yield from torch.arange(self._size)




class BalancedBatchSampler(BatchSampler):

    def __init__(self, sampler, group_ids, batch_size):
        """
        Args:
            sampler (Sampler): Base sampler.
            group_ids (list[int]): If the sampler produces indices in range [0, N),
                `group_ids` must be a list of `N` ints which contains the group id of each
                sample. The group ids must be a set of integers in [0, num_groups).
            batch_size (int): Size of mini-batch.
        """
        if not isinstance(sampler, Sampler):
            raise ValueError("sampler should be an instance of torch.utils.data.Sampler, "
                             "but got sampler={}".format(sampler))

        self._sampler = sampler
        self._group_ids = np.asarray(group_ids)
        assert self._group_ids.ndim == 1
        self._batch_size = batch_size
        groups = np.unique(self._group_ids).tolist()
        assert batch_size % len(groups) == 0

        # buffer the indices of each group until batch size is reached
        self._buffer_per_group = {k: [] for k in groups}
        self._group_size = batch_size // len(groups)

    def __iter__(self):
        for idx in self._sampler:
            group_id = self._group_ids[idx]
            self._buffer_per_group[group_id].append(idx)
            if all(len(v) >= self._group_size for k, v in self._buffer_per_group.items()):
                idxs = []
                # Collect across all groups
                for k, v in self._buffer_per_group.items():
                    idxs.extend(v[:self._group_size])
                    del v[:self._group_size]

                idxs = np.random.permutation(idxs)
                yield idxs

    def __len__(self):
        raise NotImplementedError("len() of GroupedBatchSampler is not well-defined.")


def build_train_loader(cover_dir, stego_dir, batch_size=32, num_workers=0):
    transform = torchvision.transforms.Compose([
        RandomRot(),
        RandomFlip(),
        ToTensor(),
    ])
    dataset = CoverStegoDataset(cover_dir, stego_dir, transform)

    size = len(dataset)
    sampler = TrainingSampler(size)
    if stego_dir is not None:
        batch_sampler = BalancedBatchSampler(sampler, dataset.labels, batch_size)
    else:
        batch_sampler = BatchSampler(sampler, batch_size, drop_last=False)
    epoch_length = math.ceil(size / batch_size)


    train_loader = DataLoader(
        dataset,
        batch_sampler=batch_sampler,
        num_workers=num_workers,
        worker_init_fn=worker_init_reset_seed,
    )
    return train_loader, epoch_length




def build_val_loader(cover_dir, stego_dir, batch_size=32, num_workers=0):
    transform = torchvision.transforms.Compose([
        ToTensor(),
    ])
    dataset = CoverStegoDataset(cover_dir, stego_dir, transform)

    sampler = SequentialSampler(dataset)
    batch_sampler = BatchSampler(sampler, batch_size, drop_last=False)


    test_loader = DataLoader(
        dataset,
        batch_sampler=batch_sampler,
        num_workers=num_workers,
    )
    return test_loader

def worker_init_reset_seed(worker_id):
    set_random_seed(np.random.randint(2 ** 31) + worker_id)




In [6]:
set_random_seed()

In [7]:
 train_cover_dir = "/content/drive/MyDrive/real_boss_256/train/cover"
 train_stego_dir = "/content/drive/MyDrive/real_boss_256/train/stego"
 val_cover_dir = "/content/drive/MyDrive/real_boss_256/validation/cover"
 val_stego_dir = "/content/drive/MyDrive/real_boss_256/validation/stego"
 
 
 train_loader, epoch_length = build_train_loader(
        train_cover_dir, train_stego_dir, batch_size=32,
        num_workers=0)

In [8]:
print(epoch_length)

375


In [9]:
val_loader = build_val_loader(
    val_cover_dir, val_stego_dir, batch_size=32,
    num_workers=0
)

In [10]:
train_loader_iter = iter(train_loader)

In [12]:
def activation(x):
    a = torch.ones(x.shape).cuda()
    a = torch.mul(a, -3).cuda()
    b = torch.ones(x.shape).cuda()
    b = torch.mul(b, 3).cuda()
    c = torch.where(x > -3,x,a).cuda()
    d = torch.where(c < 3,c,b).cuda()
    return d     


class the_tlu(nn.Module):
    def __init__(self):
        super(the_tlu, self).__init__()
    def forward(self, x):
        return activation(x)

In [11]:
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.nn import Parameter


srm_dir = "/content/drive/MyDrive"
SRM_npy = np.load(os.path.join(srm_dir,"SRM_Kernels.npy"))


def accuracy(outputs, labels):
    _, argmax = torch.max(outputs, 1)
    return (labels == argmax.squeeze()).float().mean()


def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=False, dilation=dilation)


def conv1x1(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class SRMConv2d(nn.Module):

    def __init__(self, stride=1, padding=0):
        super(SRMConv2d, self).__init__()
        self.in_channels = 1
        self.out_channels = 30
        self.kernel_size = (5, 5)
        if isinstance(stride, int):
            self.stride = (stride, stride)
        else:
            self.stride = stride
        if isinstance(padding, int):
            self.padding = (padding, padding)
        else:
            self.padding = padding
        self.dilation = (1, 1)
        self.transpose = False
        self.output_padding = (0,)
        self.groups = 1
        self.weight = Parameter(torch.Tensor(30, 1, 5, 5), requires_grad=True)
        self.bias = Parameter(torch.Tensor(30), requires_grad=True)
        self.reset_parameters()

    def reset_parameters(self):
        self.weight.data.numpy()[:] = SRM_npy
        self.bias.data.zero_()

    def forward(self, input):
        return F.conv2d(input, self.weight, self.bias, self.stride, self.padding,
                        self.dilation, self.groups)


class BlockA(nn.Module):

    def __init__(self, in_planes, out_planes, norm_layer=None):
        super(BlockA, self).__init__()

        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        self.conv1 = conv3x3(in_planes, out_planes)
        self.bn1 = norm_layer(out_planes)
        self.conv2 = conv3x3(out_planes, out_planes)
        self.bn2 = norm_layer(out_planes)

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += identity
        out = self.relu(out)

        return out


class BlockB(nn.Module):

    def __init__(self, in_planes, out_planes, norm_layer=None):
        super(BlockB, self).__init__()

        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        self.conv1 = conv3x3(in_planes, out_planes, stride=2)
        self.bn1 = norm_layer(out_planes)
        self.conv2 = conv3x3(out_planes, out_planes)
        self.bn2 = norm_layer(out_planes)
        # self.pool = nn.AvgPool2d(3, stride=2, padding=1)

        self.shortcut_conv = conv1x1(in_planes, out_planes, stride=2)
        self.shortcut_bn = norm_layer(out_planes)

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        # out = self.pool(out)

        identity = self.shortcut_conv(identity)
        identity = self.shortcut_bn(identity)

        out += identity
        out = self.relu(out)

        return out

In [13]:





class KeNet(nn.Module):

    def __init__(self, norm_layer=None, zero_init_residual=True, p=0.5):
        super(KeNet, self).__init__()

        self.zero_init_residual = zero_init_residual

        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        self.srm = SRMConv2d(1, 0)
        self.bn1 = norm_layer(30)
        self.tlu = the_tlu()

        self.A1 = BlockA(30, 30, norm_layer=norm_layer)
        self.A2 = BlockA(30, 30, norm_layer=norm_layer)
        self.AA = BlockA(30, 30, norm_layer=norm_layer)

        # self.B1 = BlockB(30, 30, norm_layer=norm_layer)
        # self.B2 = BlockB(30, 64, norm_layer=norm_layer)

        self.B3 = BlockB(30, 64, norm_layer=norm_layer)
        self.A3 = BlockA(64, 64, norm_layer=norm_layer)

        self.B4 = BlockB(64, 128, norm_layer=norm_layer)
        self.A4 = BlockA(128, 128, norm_layer=norm_layer)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # self.bnfc = nn.BatchNorm1d(128)
        self.relu = nn.ReLU(inplace=True)
        # self.fcfusion = nn.Linear(128, 128) #4
        self.fc = nn.Linear(128 * 4 + 1, 2)
        self.dropout = nn.Dropout(p=p)

        self.reset_parameters()

    def reset_parameters(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # nn.init.xavier_uniform_(m.weight)
                # nn.init.constant_(m.bias, 0.2)
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, std=0.01)

        if self.zero_init_residual:
            for m in self.modules():
                if isinstance(m, (BlockA, BlockB)):
                    nn.init.constant_(m.bn2.weight, 0)

    def extract_feat(self, x):
        x = x.float()
        out = self.srm(x)
        out = self.bn1(out)
        out = self.tlu(out)
    #    out = self.relu(out)

        out = self.A1(out)
        out = self.A2(out)
        out = self.AA(out)

        # out = self.B1(out)
        # out = self.B2(out)

        out = self.B3(out)
        out = self.A3(out)

        out = self.B4(out)
        out = self.A4(out)

        out = self.avgpool(out)
        out = out.view(out.size(0), out.size(1))

        # out = self.relu(out)
        # out = self.bnfc(out)

        return out

    def forward(self, *args):
        ############# statistics fusion start #############
        feats = torch.stack(
            [self.extract_feat(subarea) for subarea in args], dim=0
        )

        euclidean_distance = F.pairwise_distance(feats[0], feats[1], eps=1e-6,
                                                 keepdim=True)

        if feats.shape[0] == 1:
            final_feat = feats.squeeze(dim=0)
        else:
            # feats_sum = feats.sum(dim=0)
            # feats_sub = feats[0] - feats[1]
            feats_mean = feats.mean(dim=0)
            feats_var = feats.var(dim=0)
            feats_min, _ = feats.min(dim=0)
            feats_max, _ = feats.max(dim=0)

            '''feats_sum = feats.sum(dim=0)
            feats_sub = abs(feats[0] - feats[1])
            feats_prod = feats.prod(dim=0)
            feats_max, _ = feats.max(dim=0)'''
            
            #final_feat = torch.cat(
            #    [feats[0], feats[1], feats[0], feats[1]], dim=-1
            #    #[euclidean_distance, feats_sum, feats_sub, feats_prod, feats_max], dim=-1
            #)

            final_feat = torch.cat(
                [euclidean_distance, feats_mean, feats_var, feats_min, feats_max], dim=-1
                #[euclidean_distance, feats_sum, feats_sub, feats_prod, feats_max], dim=-1
            )

        out = self.dropout(final_feat)
        # out = self.fcfusion(out)
        # out = self.relu(out)
        out = self.fc(out)

        return out, feats[0], feats[1]


In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class ContrastiveLoss(nn.Module):

    def __init__(self, margin=1.25):  # margin=2
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        label = label.to(torch.float32)
        euclidean_distance = F.pairwise_distance(output1, output2)
        loss_contrastive = torch.mean(
            (1 - label) * torch.pow(euclidean_distance, 2) +
            label * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2)
        )

        return loss_contrastive

In [15]:
criterion_1 = nn.CrossEntropyLoss()
criterion_2 = ContrastiveLoss(margin=1)

In [16]:
net = KeNet();

In [17]:
def get_default_device():
    """Pick GPU if available, else CPU"""
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')

device = get_default_device()


if device.type == "cuda":
    net.cuda()
    criterion_1.cuda()
    criterion_2.cuda()

In [18]:
from torch.optim.adamax import Adamax

optimizer = Adamax(net.parameters(), lr=0.001, eps=1e-08, weight_decay=0.0001)

In [19]:
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', factor=0.3,
                                                           patience=10, verbose=True, min_lr=0,
                                                           eps=1e-08)

In [20]:
import logging
import time
import shutil
logger = logging.getLogger(__name__)


def preprocess_data(images, labels, random_crop,gpu):
    # images of shape: NxCxHxW
    if images.ndim == 5:  # 1xNxCxHxW
        images = images.squeeze(0)
        labels = labels.squeeze(0)
    h, w = images.shape[-2:]

    if random_crop:
        ch = random.randint(h * 3 // 4, h)  # h // 2      #256
        cw = random.randint(w * 3 // 4, w)  # square ch   #256

        h0 = random.randint(0, h - ch)  # 128
        w0 = random.randint(0, w - cw)  # 128
    else:
        ch, cw, h0, w0 = h, w, 0, 0


    cw = cw & ~1
    inputs = [
            images[..., h0:h0 + ch, w0:w0 + cw // 2],
            images[..., h0:h0 + ch, w0 + cw // 2:w0 + cw]
        ]

    if gpu:
        inputs = [x.cuda() for x in inputs]
        labels = labels.cuda()
    return inputs, labels

In [21]:
def train(epoch):
    net.train()
    running_loss, running_accuracy = 0., 0.

    for batch_idx in range(epoch_length):
        data = next(train_loader_iter)
        inputs, labels = preprocess_data(data['image'], data['label'], False,gpu=True)

        optimizer.zero_grad()
   
        outputs, feats_0, feats_1 = net(*inputs)

            # count parameters start
            # print('parameters_count: {}'.format(sum(p.numel() for p in net.parameters() if p.requires_grad)))
            # count parameters end

        loss = criterion_1(outputs, labels) + \
                   0.1 * criterion_2(feats_0, feats_1, labels)

        accuracy1 = accuracy(outputs, labels).item()
        running_accuracy += accuracy1
        running_loss += loss.item()
        loss.backward()
        optimizer.step()
        if (batch_idx + 1) % 10 == 0:
            running_accuracy /= 10
            running_loss /= 10
            
            print(
                'Train epoch: {} [{}/{}]\tAccuracy: {:.2f}%\tLoss: {:.6f}'.format(
                    epoch, batch_idx + 1, epoch_length, 100 * running_accuracy,
                    running_loss))
                    
            ###############################log per log_interval start
            is_best=False
            save_checkpoint(
                {
                    'iteration': batch_idx + 1,
                    'state_dict': net.state_dict(),
                    'best_prec1': running_accuracy,
                    'optimizer': optimizer.state_dict(),
                },
                is_best,
                filename=os.path.join("/content/drive/MyDrive/dataset", 'checkpoint.pth.tar'),
                best_name=os.path.join("/content/drive/MyDrive/dataset", 'model_best.pth.tar'))
            ###############################
            running_loss = 0.
            running_accuracy = 0.
            net.train()

In [22]:
def valid():
    net.eval()
    valid_loss = 0.
    valid_accuracy = 0.
    with torch.no_grad():
        for data in val_loader:
            inputs, labels = preprocess_data(data['image'], data['label'], False,gpu = True)

            outputs, feats_0, feats_1 = net(*inputs)
            valid_loss += criterion_1(outputs, labels).item() + \
                              0.01 * criterion_2(feats_0, feats_1, labels)
            valid_accuracy += accuracy(outputs, labels).item()
    valid_loss /= len(val_loader)
    valid_accuracy /= len(val_loader)
    print('Test set: Loss: {:.4f}, Accuracy: {:.2f}%)'.format(
        valid_loss, 100 * valid_accuracy))
    return valid_loss, valid_accuracy


In [None]:
fine_tune = "/content/drive/MyDrive/dataset/checkpoint.pth.tar"

if fine_tune is not None:
    net.load_state_dict(torch.load(fine_tune)['state_dict'], strict=False)
    e = torch.load(fine_tune)['iteration']

In [None]:
print(e)

430


In [None]:
def save_checkpoint(state, is_best, filename, best_name):
    torch.save(state, filename)
    if is_best:
        shutil.copyfile(filename, best_name)


_time = time.time()
best_accuracy = 0.
for e in range(1, 500 + 1):
    logger.info('Epoch: {}'.format(e))
    logger.info('Train')
    train(e)
    logger.info('Time: {}'.format(time.time() - _time))
    logger.info('Test')
    _, accuracy1 = valid()
    if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
        scheduler.step(accuracy1)
    else:
        scheduler.step()
    if accuracy1 > best_accuracy:
        best_accuracy = accuracy1
        is_best = True
    else:
        is_best = False
    logger.info('Best accuracy: {}'.format(best_accuracy))
    logger.info('Time: {}'.format(time.time() - _time))
    save_checkpoint(
        {
            'epoch': e,
            'state_dict': net.state_dict(),
            'best_prec1': accuracy1,
            'optimizer': optimizer.state_dict(),
        },
        is_best,
        filename=os.path.join("/content/drive/MyDrive/dataset", 'checkpoint.pth.tar'),
        best_name=os.path.join("/content/drive/MyDrive/dataset", 'model_best.pth.tar'))


Train epoch: 1 [10/375]	Accuracy: 47.81%	Loss: 0.798852
Train epoch: 1 [20/375]	Accuracy: 49.38%	Loss: 0.753677
Train epoch: 1 [30/375]	Accuracy: 49.69%	Loss: 0.755620
Train epoch: 1 [40/375]	Accuracy: 51.56%	Loss: 0.740197
Train epoch: 1 [50/375]	Accuracy: 53.44%	Loss: 0.726227
Train epoch: 1 [60/375]	Accuracy: 47.19%	Loss: 0.728326
Train epoch: 1 [70/375]	Accuracy: 49.06%	Loss: 0.726461
Train epoch: 1 [80/375]	Accuracy: 56.25%	Loss: 0.722203
Train epoch: 1 [90/375]	Accuracy: 51.88%	Loss: 0.725162
Train epoch: 1 [100/375]	Accuracy: 45.62%	Loss: 0.729622
Train epoch: 1 [110/375]	Accuracy: 49.38%	Loss: 0.728570
Train epoch: 1 [120/375]	Accuracy: 51.56%	Loss: 0.723312
Train epoch: 1 [130/375]	Accuracy: 52.50%	Loss: 0.721978
Train epoch: 1 [140/375]	Accuracy: 52.81%	Loss: 0.721002
Train epoch: 1 [150/375]	Accuracy: 50.31%	Loss: 0.721050
Train epoch: 1 [160/375]	Accuracy: 52.81%	Loss: 0.724792
Train epoch: 1 [170/375]	Accuracy: 49.06%	Loss: 0.728729
Train epoch: 1 [180/375]	Accuracy: 50.00