# СТЕГОАНАЛИЗ ИЗОБРАЖЕНИЙ С ПОМОЩЬЮ ГЛУБОКОГО МАШИННОГО ОБУЧЕНИЯ 

## Аннотация
Рассматривается современное состояние проблемы стегоанализа цифровых изображений, направленной на исследование и разработку эффективных методов выявления стеганографически скрытых (визуально незаметных) сообщений в контейнерах- изображениях.

## Содержание

1. [Импорт необходимых библиотек](#first)
2. [Получение и предобработка данных](#second)

## 1. Импорт необходимых библиотек
<span id="first"></span>

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from PIL import Image
from skimage import io, transform
from IPython.display import clear_output

from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score, f1_score

import torch, torchvision
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torch import nn
import torch.nn.functional as F

import os
import string
import random
from tqdm import tqdm
import time
import warnings

from stegano import exifHeader

In [None]:
warnings.filterwarnings("ignore")

## 2. Получение и предобработка данных

In [None]:
pictures_names = os.listdir('data/')

pictures_clear = pictures_names[:round(len(pictures_names) * 0.65)]
pictures_clear_df = pd.DataFrame({
    "picture_link": pictures_clear,
    "is_clear": np.ones(len(pictures_clear), dtype=int)
})

pictures_graphed = pictures_names[round(len(pictures_names) * 0.65):]
pictures_graphed_df = pd.DataFrame({
    "picture_link": pictures_graphed,
    "is_clear": np.zeros(len(pictures_graphed), dtype=int)
})

data = shuffle(pd.concat([pictures_clear_df, pictures_graphed_df]))
data.reset_index(drop=True, inplace=True)
data.picture_link = data.picture_link.apply(lambda val: "data/" + val)
data = data[data.picture_link != "data/.DS_Store"]

def image_resizing(link):
    img = Image.open(link)
    img = img.resize((64,64))
    img.save(link)
    return link

data.picture_link.apply(image_resizing)

data.info()
data.head()

In [None]:
plt.hist(data.is_clear)
plt.xlabel("Класс объекта")
plt.ylabel("Кол-во объектов данного класса")
plt.title("Гистограмма отношения классов");

## 3. Стенография

In [None]:
def preprocess_images(row):
    try:
        image_link = row["picture_link"]
        is_clear = row["is_clear"]

        image = Image.open(image_link)
        image.load()

        im = Image.new('RGB', image.size, (255, 255, 255))
        im.paste(image, None)
        im.save(image_link)


        secret_text = ""
        new_link = "data_prepared/" + image_link.split("/")[1].split(".")[0] + image_link.split("/")[1].split(".")[1] + ".jpg"
        
        if not bool(is_clear):
            secret_text = ''.join(random.choice(string.ascii_letters) for i in range(200))
            secret = exifHeader.hide(image_link, new_link, secret_message=secret_text)
        
        else:
            image.save(new_link)

        row["picture_link"] = new_link 
        row["secret"] = secret_text
    except:
        pass
    
    return row
    

In [None]:
data["secret"] = np.nan

In [None]:
for index in tqdm(range(0, data.shape[0], 5)):
    data[index:index + 5] = data[index:index + 5].apply(preprocess_images, axis=1)

In [None]:
data.sample(10)

In [None]:
print(exifHeader.reveal("data_prepared/dog2424.jpg"), Image.open("data_prepared/dog2424.jpg").size)

In [None]:
data.to_csv("data_marked.csv", index=False)

## DataLoader

In [None]:
class Dataset(Dataset):
    """Описантельный класс датасета для удобной работы с ним"""

    def __init__(self, csv_file=None, transform=None):
        """
            Args:
                csv_file (string): Путь к csv файлу с разметкой
                transform (callable, optional): Опционально, трансформации применяемые к картинкам
        """
        
        self.annotations = pd.read_csv(csv_file)
        self.transform = transform

    def __len__(self):
        return self.annotations.shape[0]
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        img_info = self.annotations.iloc[idx]
        image = io.imread(img_info[0])
        label = img_info[1]
        
        if self.transform:
            image = self.transform(image)

        if len(image.shape) == 2 or image.shape[0] == 1: # если черно-белая
            image = image.repeat( 3, 1, 1)
            
        sample = image, label

        return sample

In [None]:
transform = transforms.Compose([transforms.ToPILImage(), transforms.Resize((64, 64)), transforms.ToTensor()])
train_dataset = Dataset(csv_file='data_marked.csv', transform=transform)

part = np.arange(0, int(len(train_dataset) * 0.8))
val_part = np.arange(int(len(train_dataset) * 0.8), len(train_dataset))

sampler_to_train = torch.utils.data.SubsetRandomSampler(part)
sampler_to_val = torch.utils.data.SubsetRandomSampler(val_part)

In [None]:
train_loader = DataLoader(train_dataset, batch_size=64, sampler=sampler_to_train) 
validate_loader = DataLoader(train_dataset, batch_size=64, sampler=sampler_to_val)

In [None]:
for image, label in train_loader:
    print(image.shape, label)
    plt.imshow(image[0].transpose(0, -1).transpose(1, 0))
    plt.title(label[0])
    plt.show();
    break

## Нейросеть

In [None]:
def gaussian(inp: torch.Tensor) -> torch.Tensor:
    """Считает гауссову функцию для каждого входного тензора
    Args:
            inp (torch.Tensor): входной тензор
    Returns:
            torch.Tensor: тензор после применения Гаусса
    """
    
    return torch.exp(-((inp - torch.mean(inp)) ** 2) / (torch.std(inp)) ** 2) 

In [None]:
class ImageProcessing(nn.Module):
    """Считает светку с помощью использования пространственного фильтра высоких частот с фиксированным ядром"""

    def __init__(self) -> None:
        super().__init__()
        self.kv_filter = (
            torch.tensor([
                [
                    [-1.0, 2.0, -2.0, 2.0, -1.0],
                    [2.0, -6.0, 8.0, -6.0, 2.0],
                    [-2.0, 8.0, -12.0, 8.0, -2.0],
                    [2.0, -6.0, 8.0, -6.0, 2.0],
                    [-1.0, 2.0, -2.0, 2.0, -1.0],
                ],
                [
                    [-1.0, 2.0, -2.0, 2.0, -1.0],
                    [2.0, -6.0, 8.0, -6.0, 2.0],
                    [-2.0, 8.0, -12.0, 8.0, -2.0],
                    [2.0, -6.0, 8.0, -6.0, 2.0],
                    [-1.0, 2.0, -2.0, 2.0, -1.0],
                ],
                [
                    [-1.0, 2.0, -2.0, 2.0, -1.0],
                    [2.0, -6.0, 8.0, -6.0, 2.0],
                    [-2.0, 8.0, -12.0, 8.0, -2.0],
                    [2.0, -6.0, 8.0, -6.0, 2.0],
                    [-1.0, 2.0, -2.0, 2.0, -1.0],
                ],
            ]
            ).view(1, 3, 5, 5))

    def forward(self, inp: torch.Tensor) -> torch.Tensor:
        """Возвращает тензор, над которым была произведена свертка"""

        return F.conv2d(inp, self.kv_filter)

In [None]:
class ConvPool(nn.Module):
    """
        Данный класс возвращает экземпляр свертки, выстроенной по необходимым параметрам, 
        то есть  схема данного блока это conv -> batch norm -> gaussian -> average pooling
    """

    def __init__(
            self,
            in_channels: int = 16,
            out_channels: int = 32,
            conv_kernel_size: tuple[int, int] = (3, 3),
            conv_stride: int = 1,
            pool_stride: int = 2,
            pool_kernel_size: tuple[int, int] = (3, 3),
            pool_padding: int = 0,
            activation_function = None
        ) -> None:
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=conv_kernel_size,
            stride=conv_stride,
            padding=0,
            bias=True,
        )
        self.pool = nn.AvgPool2d(kernel_size=pool_kernel_size, stride=pool_stride, padding=pool_padding)
        self.bn = nn.BatchNorm2d(out_channels)
        self.activation_function = activation_function
        
    def forward(self, inp: torch.Tensor) -> torch.Tensor:
        """Соответсвенно conv->batch norm->activation->average pooling."""
        if self.activation_function is None:
            return self.pool(gaussian(self.bn(self.conv(inp))))
        
        return self.pool(self.activation_function(self.bn(self.conv(inp))))
        

In [None]:
class CNN(nn.Module):
    """
        Реализация сверточной нейронной сети
    """

    def __init__(self) -> None:
        super().__init__()
        self.layer1 = ConvPool(in_channels=1, out_channels=16,  conv_kernel_size=(5, 5), pool_kernel_size=(3,3))
        self.layer2 = ConvPool(in_channels=16, out_channels=64, conv_kernel_size=(3, 3), activation_function=nn.ReLU())
        self.layer3 = ConvPool(in_channels=64, out_channels=128, conv_kernel_size=(3, 3), activation_function=nn.ReLU())
        
        self.fully_connected = nn.Sequential(
            nn.Linear(in_features=100352, out_features=128),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=128, out_features=128),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=128, out_features=2),
            nn.Softmax(dim=1),
        )

    def forward(self, image: torch.Tensor) -> torch.Tensor:
        """Вернет логиты для данного изображения"""
        with torch.no_grad():
            out = ImageProcessing()(image)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = out.view(out.size(0), -1)
        out = self.fully_connected(out)
        return out

In [None]:
net = CNN()
inp_image = torch.randn((1, 3, 256, 256))
net(inp_image)