<a href="https://www.kaggle.com/code/phoekoby/cnn-for-stegoanalysis?scriptVersionId=127501965" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [4]:
!pip3 install torch torchvision

[0m

In [5]:
from collections import namedtuple

import matplotlib.pyplot as plt
import numpy as np
import PIL
import torch
import os
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import imageio as io
import pandas as pd

import torchvision.datasets as dset
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision.io import read_image

from torch.utils.data import DataLoader
from torchvision import transforms

In [6]:
device = torch.device("cuda")

def gaussian(x):
  return torch.exp(-((x - torch.mean(x)) ** 2) / (torch.std(x)) ** 2) 

class ConvBn(nn.Module):
    
    def __init__(self, in_channels: int, out_channels: int) -> None:
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=1,
            padding=1,
            bias=False,
        )
        self.batch_norm = nn.BatchNorm2d(out_channels)

    def forward(self, inp):
        return self.batch_norm(self.conv(inp))


class Type1(nn.Module):

    def __init__(self, in_channels: int, out_channels: int) -> None:
        super().__init__()
        self.convbn = ConvBn(in_channels, out_channels)
        self.relu = nn.ReLU()

    def forward(self, inp):
        return self.relu(self.convbn(inp))


class Type2(nn.Module):

    def __init__(self, in_channels: int, out_channels: int) -> None:
        super().__init__()
        self.type1 = Type1(in_channels, out_channels)
        self.convbn = ConvBn(in_channels, out_channels)

    def forward(self, inp):
        return inp + self.convbn(self.type1(inp))


class Type3(nn.Module):

    def __init__(self, in_channels: int, out_channels: int) -> None:
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=1,
            stride=2,
            padding=0,
            bias=False,
        )
        self.batch_norm = nn.BatchNorm2d(out_channels)
        self.type1 = Type1(in_channels, out_channels)
        self.convbn = ConvBn(out_channels, out_channels)
        self.pool = nn.AvgPool2d(kernel_size=3, stride=2, padding=1)

    def forward(self, inp):
        out = self.batch_norm(self.conv1(inp))
        out1 = self.pool(self.convbn(self.type1(inp)))
        return out + out1


class Type4(nn.Module):
    def __init__(self, in_channels: int, out_channels: int) -> None:
        super().__init__()
        self.type1 = Type1(in_channels, out_channels)
        self.convbn = ConvBn(out_channels, out_channels)
        self.gap = nn.AdaptiveAvgPool2d(output_size=1)

    def forward(self, inp):
        return self.gap(self.convbn(self.type1(inp)))

class Srnet(nn.Module):

    def __init__(self) -> None:
        super().__init__()
        self.type1s = nn.Sequential(Type1(1, 32), Type1(32, 8))
        self.type2s = nn.Sequential(
            Type2(8, 8),
            Type2(8, 8),
            Type2(8, 8),
            Type2(8, 8),
            Type2(8, 8),
        )
        self.type3s = nn.Sequential(
            Type3(8, 8),
            Type3(8, 32),
            Type3(32, 64),
            Type3(64, 128),
        )
        self.type4 = Type4(128, 256)
        self.dense = nn.Linear(256, 2)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, inp):
        out = self.type1s(inp)
        out = self.type2s(out)
        out = self.type3s(out)
        out = self.type4(out)
        out = out.view(out.size(0), -1)
        out = self.dense(out)
        return self.softmax(out)

class Yenet(nn.Module):
    def __init__(self):
        super(Yenet, self).__init__()

        self.tlu = nn.Hardtanh(min_val=-3.0, max_val=3.0)

        self.conv2 = nn.Conv2d(30, 30, kernel_size=3, stride=1, padding=0)

        self.conv3 = nn.Conv2d(30, 30, kernel_size=3, stride=1, padding=0)

        self.conv4 = nn.Conv2d(30, 30, kernel_size=3, stride=1, padding=0)

        self.conv5 = nn.Conv2d(30, 32, kernel_size=5, stride=1, padding=0)

        self.conv6 = nn.Conv2d(32, 32, kernel_size=5, stride=1, padding=0)

        self.conv7 = nn.Conv2d(32, 32, kernel_size=5, stride=1, padding=0)

        self.conv8 = nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=0)

        self.conv9 = nn.Conv2d(16, 16, kernel_size=3, stride=3, padding=0)

        self.fc = nn.Linear(16 * 3 * 3, 2)
        
        self.sf = nn.Softmax(dim=1)

    def forward(self, x):
        out = self.tlu(F.conv2d(x, srm_filters))
        out = F.relu(self.conv2(out))
        out = F.relu(self.conv3(out))
        out = F.relu(self.conv4(out))
        out = F.avg_pool2d(out, kernel_size=3, stride=2, padding=1)
        out = F.relu(self.conv5(out))
        out = F.avg_pool2d(out, kernel_size=3, stride=2, padding=0)
        out = F.relu(self.conv6(out))
        out = F.avg_pool2d(out, kernel_size=3, stride=2, padding=0)
        out = F.relu(self.conv7(out))
        out = F.avg_pool2d(out, kernel_size=3, stride=2, padding=0)
        out = F.relu(self.conv8(out))
        out = F.relu(self.conv9(out))
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        out = self.sf(out)
        return out

class SteganalysisCNN(nn.Module):
    def __init__(self):
        super(SteganalysisCNN, self).__init__()
        # self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        # self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        # self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        # self.fc1 = nn.Linear(64 * 8 * 8, 256)
        # self.fc2 = nn.Linear(256, 2)


        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, 3, padding=1)
        self.dr1 = nn.Dropout2d(p=0.1)
        self.bn3 = nn.BatchNorm2d(32)
        self.conv4 = nn.Conv2d(32, 16, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(16)
        self.conv5 = nn.Conv2d(16, 8, 3, padding=1)
        self.bn5 = nn.BatchNorm2d(8)
        self.conv6 = nn.Conv2d(8, 16, 3, padding=1)
        self.bn6 = nn.BatchNorm2d(16)
        self.conv7 = nn.Conv2d(16, 16, 3, padding=1)
        self.bn7 = nn.BatchNorm2d(16)
        self.conv8 = nn.Conv2d(8, 16, 3, padding=1)
        self.dr2 = nn.Dropout2d(p=0.5)
        self.bn8 = nn.BatchNorm2d(16)
        self.fc1 = nn.Linear(16 * 16 * 16, 128)
        self.dr3 = nn.Dropout(p=0.3)
        self.fc2 = nn.Linear(128, 32)
        self.dr4 = nn.Dropout(p=0.3)
        self.fc3 = nn.Linear(32, 2)
        self.sfm = nn.Softmax(dim = 1)

    def forward(self, x):
        # x = F.relu(self.conv1(x))
        # x = F.max_pool2d(x, 4)
        # x = F.relu(self.conv2(x))
        # x = F.max_pool2d(x, 4)
        # x = F.relu(self.conv3(x))
        # x = F.max_pool2d(x, 4)
        # x = x.view(-1, 64 * 8 * 8)
        # x = F.relu(self.fc1(x))
        # x = self.fc2(x)


        # x = F.relu(self.bn1(self.conv1(x)))
        x = gaussian(self.bn1(self.conv1(x)))
        x = F.max_pool2d(x, 2)
        # x = F.relu(self.bn2(self.conv2(x)))
        x = gaussian(self.bn2(self.conv2(x)))
        x = F.max_pool2d(x, 2)
        # x = F.relu(self.bn3(self.conv3(x)))
        x = gaussian(self.bn3(self.conv3(x)))
        # x = self.dr1(x)
        x = F.max_pool2d(x, 2)
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.bn5(self.conv5(x)))
        x = F.max_pool2d(x, 2)
        # x = F.relu(self.bn6(self.conv6(x)))
        # x = F.max_pool2d(x, 2)
        # x = F.relu(self.bn7(self.conv7(x)))
        # x = F.max_pool2d(x, 2)
        x = F.relu(self.bn8(self.conv8(x)))
        # x = self.dr2(x)
        x = x.view(-1, 16 * 16 * 16)
        x = F.relu(self.fc1(x))
        # x = self.dr3(x)
        x = self.fc2(x)
        x = self.sfm(x)
        return x

class CustomDataset:
    def __init__(self, img_dir,amount, transform=None, target_transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform
        self.amount = amount

    def __len__(self):
        return self.amount

    def __getitem__(self, idx):
        image = None
        num = int(idx / 2) + 1
        label = idx % 2
        if label == 0:
            image = PIL.Image.open(self.img_dir + "cover/" +  str(num) + ".pgm")
        else:
            image = PIL.Image.open(self.img_dir + "stego/" +  str(num) + ".pgm")
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        idx += 1
        return (image, label)

data_train = CustomDataset('/kaggle/input/boss-base/train_set_databoss/', 14000, transform=transforms.Compose([
#                            transforms.RandomRotation(50, interpolation=PIL.Image.BILINEAR),
                           transforms.ToTensor(),
#                            transforms.Normalize(
#                                 mean=[0.485, 0.456, 0.406],
#                                 std=[0.229, 0.224, 0.225],
#                             )
]))
data_test = CustomDataset('/kaggle/input/boss-base/test_set_databoss/',6000, transform=transforms.Compose([
#                             transforms.RandomRotation(50, interpolation=PIL.Image.BILINEAR),
                           transforms.ToTensor(),
#                            transforms.Normalize(
#                                 mean=[0.485, 0.456, 0.406],
#                                 std=[0.229, 0.224, 0.225],                       
#                        )
]))

batch_size = 32

data_size = 7000

validation_split = .2

num_epochs = 100

split = int(np.floor(validation_split * data_size))
indices = list(range(data_size))

np.random.shuffle(indices)

train_indices, val_indices = indices[split:], indices[:split]

train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)



# Load the dataset using DataLoader
train_loader = DataLoader(data_train, batch_size=batch_size, sampler=train_sampler)
val_loader = DataLoader(data_train, batch_size=batch_size, sampler=val_sampler)



# nn_model = SteganalysisCNN()

nn_model = Srnet()

nn_model.type(torch.cuda.FloatTensor)
nn_model.to(device)

criterion = nn.CrossEntropyLoss().type(torch.cuda.FloatTensor)

optimizer = torch.optim.Adamax(
        nn_model.parameters(),
        lr=0.01,
        betas=(0.9, 0.999),
#         eps=1e-8,
        # weight_decay=1e-6,
    )
# optimizer = optim.Adam(nn_model.parameters(), lr=0.001, betas=(0.95, 0.999), weight_decay=1e-5, eps=1e-8)

def train_model(model, train_loader, val_loader, loss, optimizer, num_epochs):
    loss_history = []
    train_history = []
    val_history = []
    val_losses = []
    for epoch in range(num_epochs):
        model.train()
        loss_accum = 0
        correct_samples = 0
        total_samples = 0
        running_loss = 0.0
        for i, data in enumerate(train_loader):
            inputs, labels = data
            x_gpu = inputs.to(device)
            y_gpu = labels.to(device)
            optimizer.zero_grad()
            outputs = model(x_gpu)
            loss = criterion(outputs, y_gpu)

            loss.backward()
            optimizer.step()

            _, indices = torch.max(outputs, 1)
            correct_samples += torch.sum(indices == y_gpu)
            total_samples += labels.shape[0]
            
            loss_accum += loss

        ave_loss = loss_accum / i
        train_accuracy = float(correct_samples) / total_samples
        val_accuracy, val_loss = compute_accuracy(model, val_loader)
        
        loss_history.append(float(ave_loss))
        train_history.append(train_accuracy)
        val_history.append(val_accuracy)
        val_losses.append(val_loss)
        
        print("Epoch: %d, Average loss: %f, Validation loss: %f, Train accuracy: %f, Val accuracy: %f" % (epoch, ave_loss, val_loss, train_accuracy, val_accuracy))
        
    return loss_history, train_history, val_history, val_losses

def compute_accuracy(model, loader):
    model.eval()
    total = 0
    correct = 0
    losses = []
    with torch.no_grad():
        for i_step, (x, y) in enumerate(loader):
            x_gpu = x.to(device)
            y_gpu = y.to(device)
            predictions = model(x_gpu)
            loss = criterion(predictions, y_gpu)
            losses.append(loss.item())
            _, predicted = torch.max(predictions.data, 1)
            total += y_gpu.size(0)
            correct += (predicted == y_gpu).sum().item()
    return correct/total, sum(losses)/len(losses)

def Plt_hist(loss_history, train_history, val_history, val_losses):
     plt.plot(train_history)
     plt.plot(val_history)
     plt.title('model accuracy')
     plt.xlabel('epoch')
     plt.ylabel('accuracy')
     plt.legend(['train','validation'], loc='upper left')
     plt.grid()
     plt.show()
     plt.plot(loss_history)
     plt.plot(val_losses)
     plt.title('model loss')
     plt.xlabel('epoch')
     plt.ylabel('loss')
     plt.legend(['model loss', 'val loss'], loc='upper left')
     plt.grid()
     plt.show()

loss_history, train_history, val_history, val_losses = train_model(nn_model, train_loader, val_loader, criterion, optimizer, num_epochs)

Plt_hist(loss_history, train_history, val_history, val_losses)

Epoch: 0, Average loss: 0.724245, Validation loss: 0.701530, Train accuracy: 0.513929, Val accuracy: 0.502857
Epoch: 1, Average loss: 0.700970, Validation loss: 0.694658, Train accuracy: 0.501607, Val accuracy: 0.521429
Epoch: 2, Average loss: 0.699415, Validation loss: 0.695069, Train accuracy: 0.507500, Val accuracy: 0.507857
Epoch: 3, Average loss: 0.697730, Validation loss: 0.693055, Train accuracy: 0.497500, Val accuracy: 0.510000
Epoch: 4, Average loss: 0.697439, Validation loss: 0.693239, Train accuracy: 0.494464, Val accuracy: 0.502857
Epoch: 5, Average loss: 0.697943, Validation loss: 0.693336, Train accuracy: 0.501786, Val accuracy: 0.487857
Epoch: 6, Average loss: 0.697958, Validation loss: 0.693401, Train accuracy: 0.503750, Val accuracy: 0.492143
Epoch: 7, Average loss: 0.697954, Validation loss: 0.693418, Train accuracy: 0.496786, Val accuracy: 0.492143
Epoch: 8, Average loss: 0.697318, Validation loss: 0.693152, Train accuracy: 0.503393, Val accuracy: 0.507857
Epoch: 9, 

KeyboardInterrupt: 