In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import numpy as np

# Encoder model
class Encoder(nn.Module):
    """Convolutional network that embeds a payload tensor into an image.

    The image (3 channels) and the payload (``data_depth`` channels) are
    stacked along the channel axis and run through three 3x3 convolutions
    with padding 1, so height and width are preserved. The final ``Tanh``
    bounds the 3-channel output to [-1, 1].
    """

    def __init__(self, data_depth=3, hidden_size=64):
        super().__init__()
        self.data_depth, self.hidden_size = data_depth, hidden_size

        # Image channels plus payload channels enter the first convolution.
        in_channels = 3 + data_depth
        layers = [
            nn.Conv2d(in_channels, hidden_size, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_size, hidden_size, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_size, 3, kernel_size=3, padding=1),
            nn.Tanh(),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, image, data):
        """Return the encoded image for ``image`` carrying ``data``."""
        stacked = torch.cat((image, data), dim=1)
        return self.model(stacked)

# Decoder model
class Decoder(nn.Module):
    """Convolutional network that recovers a payload from an encoded image.

    Takes a 3-channel encoded image and applies three 3x3 convolutions with
    padding 1 (spatial size preserved). The output has ``data_depth``
    channels and is bounded to [-1, 1] by the final ``Tanh``.
    """

    def __init__(self, data_depth=3, hidden_size=64):
        super().__init__()
        self.data_depth, self.hidden_size = data_depth, hidden_size

        layers = [
            nn.Conv2d(3, hidden_size, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_size, hidden_size, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_size, data_depth, kernel_size=3, padding=1),
            nn.Tanh(),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, encoded_image):
        """Return the payload estimate extracted from ``encoded_image``."""
        recovered = self.model(encoded_image)
        return recovered

# Custom loss function for the autoencoder
class AutoencoderLoss(nn.Module):
    """Mean absolute error between the decoded payload and the true payload."""

    def __init__(self):
        super().__init__()
        self.pixelwise_loss = nn.L1Loss()

    def forward(self, decoded_data, real_data):
        """Return the L1 loss of ``decoded_data`` against ``real_data``."""
        l1 = self.pixelwise_loss(decoded_data, real_data)
        return l1

# Training function
def train_autoencoder(encoder, decoder, loss_fn, train_loader, val_loader, num_epochs=20, lr=0.0002, data_depth=3):
    """Jointly train ``encoder`` and ``decoder`` to hide and recover random payloads.

    For every batch a uniform random payload in [0, 1) with ``data_depth``
    channels and the images' spatial size is generated, embedded by the
    encoder and recovered by the decoder; ``loss_fn`` compares the recovered
    payload with the original one. After each epoch the same procedure is run
    on ``val_loader`` without gradients, and the weights of the epoch with the
    lowest mean validation loss are kept and finally written to
    ``best_autoencoder_params.pth``.

    Args:
        encoder: module called as ``encoder(images, payload)``.
        decoder: module called as ``decoder(encoded_images)``.
        loss_fn: callable ``loss_fn(decoded, payload)`` returning a scalar tensor.
        train_loader: sized iterable of ``(images, _)`` batches used for training.
        val_loader: sized iterable of ``(images, _)`` batches used for validation.
        num_epochs: number of passes over ``train_loader``.
        lr: learning rate of both Adam optimizers.
        data_depth: number of payload channels.

    Raises:
        ValueError: if either loader is empty.
        RuntimeError: if no epoch produced a usable validation loss
            (e.g. ``num_epochs`` is 0 or the loss is NaN).
    """
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError('train_loader and val_loader must each contain at least one batch')

    optimizer_e = optim.Adam(encoder.parameters(), lr=lr)
    optimizer_d = optim.Adam(decoder.parameters(), lr=lr)

    best_val_loss = float('inf')
    best_params = None

    for epoch in range(num_epochs):
        encoder.train()
        decoder.train()

        # Encoder and decoder share one loss, so a single accumulator is
        # enough; it is reported under both "E Loss" and "D Loss" below.
        train_loss_epoch = 0.0

        for images, _ in train_loader:
            batch_size = images.size(0)
            random_data = torch.rand(batch_size, data_depth, images.size(2), images.size(3), device=images.device)

            optimizer_e.zero_grad()
            optimizer_d.zero_grad()

            encoded_images = encoder(images, random_data)
            decoded_data = decoder(encoded_images)

            loss = loss_fn(decoded_data, random_data)
            loss.backward()
            optimizer_e.step()
            optimizer_d.step()

            train_loss_epoch += loss.item()

        # Validate
        encoder.eval()
        decoder.eval()
        val_loss = 0.0
        with torch.no_grad():
            for images, _ in val_loader:
                batch_size = images.size(0)
                random_data = torch.rand(batch_size, data_depth, images.size(2), images.size(3), device=images.device)
                decoded_data = decoder(encoder(images, random_data))
                val_loss += loss_fn(decoded_data, random_data).item()

        val_loss /= len(val_loader)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() hands out references to the live parameter tensors,
            # which the optimizers keep updating in place. Clone them so the
            # checkpoint really holds this epoch's weights, not the last one's.
            best_params = {
                'lr': lr,
                'epoch': epoch,  # zero-based, unlike the one-based progress log
                'encoder_state_dict': {k: v.detach().clone() for k, v in encoder.state_dict().items()},
                'decoder_state_dict': {k: v.detach().clone() for k, v in decoder.state_dict().items()},
            }

        mean_train_loss = train_loss_epoch / len(train_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}], E Loss: {mean_train_loss:.4f}, D Loss: {mean_train_loss:.4f}, Val Loss: {val_loss:.4f}')

    if best_params is None:
        raise RuntimeError('no checkpoint to save: no epoch produced a finite validation loss')

    # Save the best model
    torch.save(best_params, 'best_autoencoder_params.pth')
    print(f'Best model saved with val_loss: {best_val_loss:.4f} at epoch {best_params["epoch"]}')

# Testing function
def test_autoencoder(encoder, decoder, test_loader, data_depth=3):
    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        for images, _ in test_loader:
            batch_size = images.size(0)
            random_data = torch.rand(batch_size, data_depth, images.size(2), images.size(3), device=images.device)
            encoded_images = encoder(images, random_data)
            decoded_data = decoder(encoded_images)

            # Here you can do further evaluation or save the images
            # For simplicity, I'll print the first decoded data
            print(decoded_data.shape)  # This is to verify the shape of the decoded data
            break

# Main function
def main():
    """Train the encoder/decoder pair on CIFAR-10 and smoke-test the best checkpoint.

    Seeds torch and numpy, splits CIFAR-10 (resized to 64x64 and normalized
    with mean 0.5 / std 0.5) into 70% train, 15% validation and the remainder
    for test, trains for 20 epochs, then reloads the saved checkpoint and
    runs ``test_autoencoder`` on the test split.
    """
    # Fixed seeds for reproducibility
    torch.manual_seed(0)
    np.random.seed(0)

    # Data pipeline
    preprocessing = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])
    cifar = datasets.CIFAR10(root='data/', download=True, transform=preprocessing)

    total = len(cifar)
    n_train = int(0.7 * total)
    n_val = int(0.15 * total)
    n_test = total - n_train - n_val  # whatever is left after rounding
    train_split, val_split, test_split = random_split(cifar, [n_train, n_val, n_test])

    # Only the training split is shuffled.
    train_loader = DataLoader(train_split, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_split, batch_size=64, shuffle=False)
    test_loader = DataLoader(test_split, batch_size=64, shuffle=False)

    # Models and loss
    encoder = Encoder(data_depth=3)
    decoder = Decoder(data_depth=3)
    loss_fn = AutoencoderLoss()

    train_autoencoder(encoder, decoder, loss_fn, train_loader, val_loader, num_epochs=20)

    # Restore the checkpoint written by training before evaluating.
    checkpoint = torch.load('best_autoencoder_params.pth')
    encoder.load_state_dict(checkpoint['encoder_state_dict'])
    decoder.load_state_dict(checkpoint['decoder_state_dict'])

    test_autoencoder(encoder, decoder, test_loader)

# Run the full train-and-test pipeline only when this code is executed
# directly (script or notebook cell), not when it is imported.
if __name__ == "__main__":
    main()


Files already downloaded and verified
Epoch [1/20], E Loss: 0.1771, D Loss: 0.1771, Val Loss: 0.0700
Epoch [2/20], E Loss: 0.0490, D Loss: 0.0490, Val Loss: 0.0380
Epoch [3/20], E Loss: 0.0338, D Loss: 0.0338, Val Loss: 0.0287
Epoch [4/20], E Loss: 0.0266, D Loss: 0.0266, Val Loss: 0.0253


KeyboardInterrupt: 

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score
from PIL import Image
import numpy as np
import time
import pickle
from torchvision.utils import save_image

# Encoder model
class Encoder(nn.Module):
    """Convolutional network that embeds a payload tensor into an image.

    The 3-channel image and the ``data_depth``-channel payload are joined
    along the channel axis and passed through three 3x3 convolutions with
    padding 1, which keeps height and width unchanged. The closing ``Tanh``
    limits the 3-channel output to [-1, 1].
    """

    def __init__(self, data_depth=1, hidden_size=64):
        super().__init__()
        self.data_depth, self.hidden_size = data_depth, hidden_size

        # The first convolution sees image channels plus payload channels.
        in_channels = 3 + data_depth
        layers = [
            nn.Conv2d(in_channels, hidden_size, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_size, hidden_size, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_size, 3, kernel_size=3, padding=1),
            nn.Tanh(),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, image, data):
        """Return the encoded image for ``image`` carrying ``data``."""
        stacked = torch.cat((image, data), dim=1)
        return self.model(stacked)

# Decoder model
class Decoder(nn.Module):
    """Convolutional network that recovers a payload from an encoded image.

    Consumes a 3-channel encoded image and applies three 3x3 convolutions
    with padding 1 (spatial size preserved). The result has ``data_depth``
    channels and is limited to [-1, 1] by the closing ``Tanh``.
    """

    def __init__(self, data_depth=1, hidden_size=64):
        super().__init__()
        self.data_depth, self.hidden_size = data_depth, hidden_size

        layers = [
            nn.Conv2d(3, hidden_size, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_size, hidden_size, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_size, data_depth, kernel_size=3, padding=1),
            nn.Tanh(),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, encoded_image):
        """Return the payload estimate extracted from ``encoded_image``."""
        recovered = self.model(encoded_image)
        return recovered

# Custom loss function for the autoencoder
class AutoencoderLoss(nn.Module):
    """Mean absolute error between the decoded payload and the true payload."""

    def __init__(self):
        super().__init__()
        self.pixelwise_loss = nn.L1Loss()

    def forward(self, decoded_data, real_data):
        """Return the L1 loss of ``decoded_data`` against ``real_data``."""
        l1 = self.pixelwise_loss(decoded_data, real_data)
        return l1

# Training function
def train_autoencoder(encoder, decoder, loss_fn, train_loader, val_loader, num_epochs=20, lr=0.0002, data_depth=1):
    """Jointly train ``encoder`` and ``decoder`` to hide and recover random payloads.

    For every batch a uniform random payload in [0, 1) with ``data_depth``
    channels and the images' spatial size is generated, embedded by the
    encoder and recovered by the decoder; ``loss_fn`` compares the recovered
    payload with the original one. After each epoch the same procedure is run
    on ``val_loader`` without gradients, and the weights of the epoch with the
    lowest mean validation loss are kept and finally pickled to
    ``best_autoencoder_params.pkl``. The wall-clock time of each epoch
    (training plus validation) is included in the progress line.

    Args:
        encoder: module called as ``encoder(images, payload)``.
        decoder: module called as ``decoder(encoded_images)``.
        loss_fn: callable ``loss_fn(decoded, payload)`` returning a scalar tensor.
        train_loader: sized iterable of ``(images, _)`` batches used for training.
        val_loader: sized iterable of ``(images, _)`` batches used for validation.
        num_epochs: number of passes over ``train_loader``.
        lr: learning rate of both Adam optimizers.
        data_depth: number of payload channels.

    Raises:
        ValueError: if either loader is empty.
        RuntimeError: if no epoch produced a usable validation loss
            (e.g. ``num_epochs`` is 0 or the loss is NaN).
    """
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError('train_loader and val_loader must each contain at least one batch')

    optimizer_e = optim.Adam(encoder.parameters(), lr=lr)
    optimizer_d = optim.Adam(decoder.parameters(), lr=lr)

    best_val_loss = float('inf')
    best_params = None

    for epoch in range(num_epochs):
        start_time = time.time()

        encoder.train()
        decoder.train()

        # Encoder and decoder share one loss, so a single accumulator is
        # enough; it is reported under both "E Loss" and "D Loss" below.
        train_loss_epoch = 0.0

        for images, _ in train_loader:
            batch_size = images.size(0)
            random_data = torch.rand(batch_size, data_depth, images.size(2), images.size(3), device=images.device)

            optimizer_e.zero_grad()
            optimizer_d.zero_grad()

            encoded_images = encoder(images, random_data)
            decoded_data = decoder(encoded_images)

            loss = loss_fn(decoded_data, random_data)
            loss.backward()
            optimizer_e.step()
            optimizer_d.step()

            train_loss_epoch += loss.item()

        # Validate
        encoder.eval()
        decoder.eval()
        val_loss = 0.0
        with torch.no_grad():
            for images, _ in val_loader:
                batch_size = images.size(0)
                random_data = torch.rand(batch_size, data_depth, images.size(2), images.size(3), device=images.device)
                decoded_data = decoder(encoder(images, random_data))
                val_loss += loss_fn(decoded_data, random_data).item()

        val_loss /= len(val_loader)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() hands out references to the live parameter tensors,
            # which the optimizers keep updating in place. Clone them so the
            # checkpoint really holds this epoch's weights, not the last one's.
            best_params = {
                'lr': lr,
                'epoch': epoch,  # zero-based, unlike the one-based progress log
                'encoder_state_dict': {k: v.detach().clone() for k, v in encoder.state_dict().items()},
                'decoder_state_dict': {k: v.detach().clone() for k, v in decoder.state_dict().items()},
            }

        epoch_time = time.time() - start_time
        mean_train_loss = train_loss_epoch / len(train_loader)

        print(f'Epoch [{epoch+1}/{num_epochs}], E Loss: {mean_train_loss:.4f}, D Loss: {mean_train_loss:.4f}, Val Loss: {val_loss:.4f}, Time: {epoch_time:.2f}s')

    if best_params is None:
        raise RuntimeError('no checkpoint to save: no epoch produced a finite validation loss')

    # Save the best model.
    # NOTE(review): pickle files must only be loaded from trusted sources.
    with open('best_autoencoder_params.pkl', 'wb') as f:
        pickle.dump(best_params, f)
    print(f'Best model saved with val_loss: {best_val_loss:.4f} at epoch {best_params["epoch"]}')

# Testing function
def test_autoencoder(encoder, decoder, test_loader, data_depth=1, threshold=0.5):
    """Evaluate payload recovery on ``test_loader`` as a per-pixel binary task.

    For every batch a uniform random payload in [0, 1) is embedded and
    decoded again. Both the original and the decoded payload are binarised
    with the same ``threshold`` so that they form valid class labels: the
    sklearn classification metrics reject a continuous ``y_true`` paired with
    a binary ``y_pred``.

    Args:
        encoder: module called as ``encoder(images, payload)``.
        decoder: module called as ``decoder(encoded_images)``.
        test_loader: iterable of ``(images, _)`` batches.
        data_depth: number of payload channels.
        threshold: cut-off applied to both payloads; values above it are
            class 1, the rest class 0.

    Returns:
        Tuple ``(f1, accuracy, conf_matrix)`` where ``f1`` is the weighted F1
        score and ``conf_matrix`` is the 2x2 matrix for labels ``[0, 1]``.

    Raises:
        ValueError: if ``test_loader`` yields no batches.
    """
    encoder.eval()
    decoder.eval()

    true_bits = []
    pred_bits = []

    with torch.no_grad():
        for images, _ in test_loader:
            batch_size = images.size(0)
            random_data = torch.rand(batch_size, data_depth, images.size(2), images.size(3), device=images.device)
            decoded_data = decoder(encoder(images, random_data))

            # Binarise per batch: keeps only one byte per pixel in memory
            # instead of two full float copies of the whole test set.
            true_bits.append((random_data > threshold).cpu().numpy().ravel())
            pred_bits.append((decoded_data > threshold).cpu().numpy().ravel())

    if not true_bits:
        raise ValueError('test_loader yielded no batches; nothing to evaluate')

    y_true = np.concatenate(true_bits).astype(np.uint8)
    y_pred = np.concatenate(pred_bits).astype(np.uint8)

    # Calculate F1 score, accuracy, and confusion matrix
    f1 = f1_score(y_true, y_pred, average='weighted')
    accuracy = accuracy_score(y_true, y_pred)
    # Fix the label set so the matrix is 2x2 even if one class never occurs.
    conf_matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])

    print(f'F1 Score: {f1:.4f}')
    print(f'Accuracy: {accuracy:.4f}')
    print(f'Confusion Matrix:\n{conf_matrix}')

    return f1, accuracy, conf_matrix

# Main function
def main(
    secret_image_path='C:/Users/Hendy Group/OneDrive/Desktop/final_graduation_project/12.jpg',
    cover_image_path='C:/Users/Hendy Group/OneDrive/Desktop/final_graduation_project/11.jpg',
):
    """Train on CIFAR-10, evaluate, then hide one greyscale image in a cover image.

    Steps: seed torch and numpy; split CIFAR-10 (resized to 64x64, normalized
    with mean 0.5 / std 0.5) into 70% train, 15% validation and the remainder
    for test; train for 20 epochs; reload the pickled best checkpoint; report
    metrics on the test split; finally embed ``secret_image_path`` into
    ``cover_image_path`` and write ``encoded_image.jpg`` and
    ``decoded_image.jpg``.

    Args:
        secret_image_path: image to hide; it is converted to greyscale.
        cover_image_path: RGB image that carries the secret.
    """
    # Set random seed for reproducibility
    torch.manual_seed(0)
    np.random.seed(0)

    # Load data
    transform = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])

    dataset = datasets.CIFAR10(root='data/', download=True, transform=transform)

    train_size = int(0.7 * len(dataset))
    val_size = int(0.15 * len(dataset))
    test_size = len(dataset) - train_size - val_size

    train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

    encoder = Encoder(data_depth=1)
    decoder = Decoder(data_depth=1)
    loss_fn = AutoencoderLoss()

    # Train the Autoencoder
    train_autoencoder(encoder, decoder, loss_fn, train_loader, val_loader, num_epochs=20)

    # Load the best model for testing (file written by train_autoencoder above)
    with open('best_autoencoder_params.pkl', 'rb') as f:
        best_params = pickle.load(f)
    encoder.load_state_dict(best_params['encoder_state_dict'])
    decoder.load_state_dict(best_params['decoder_state_dict'])

    # Test the Autoencoder (decode data)
    f1, accuracy, conf_matrix = test_autoencoder(encoder, decoder, test_loader)

    # Example of hiding a greyscale image in a cover image.
    # Resize on the PIL image and then convert, mirroring the first two steps
    # of the training transform; the context managers close the files.
    to_model_input = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
    ])
    with Image.open(secret_image_path) as secret_file:
        secret_image = to_model_input(secret_file.convert('L')).unsqueeze(0)
    with Image.open(cover_image_path) as cover_file:
        cover_image = to_model_input(cover_file.convert('RGB')).unsqueeze(0)

    # Match the value ranges seen during training: cover images were
    # normalized to [-1, 1], while the payload was drawn from torch.rand and
    # therefore lies in [0, 1). So the cover is normalized here and the
    # secret is left in ToTensor's [0, 1] range.
    cover_image = (cover_image - 0.5) / 0.5

    # Encode and decode
    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        encoded_image = encoder(cover_image, secret_image)
        decoded_image = decoder(encoded_image)

    # Save the results. The encoder ends in Tanh, so its output is in
    # [-1, 1]; map it to [0, 1] first, otherwise save_image clips every
    # negative value to black. The decoded secret targets [0, 1] already.
    save_image((encoded_image + 1) / 2, 'encoded_image.jpg')
    save_image(decoded_image, 'decoded_image.jpg')

    print('Hiding and extraction completed.')

# Run training, evaluation and the image-hiding example only when this code
# is executed directly (script or notebook cell), not when it is imported.
if __name__ == "__main__":
    main()


Files already downloaded and verified
Epoch [1/20], E Loss: 0.0817, D Loss: 0.0817, Val Loss: 0.0169, Time: 1093.36s
Epoch [2/20], E Loss: 0.0125, D Loss: 0.0125, Val Loss: 0.0100, Time: 1170.90s
Epoch [3/20], E Loss: 0.0090, D Loss: 0.0090, Val Loss: 0.0088, Time: 1336.99s
Epoch [4/20], E Loss: 0.0075, D Loss: 0.0075, Val Loss: 0.0062, Time: 1030.88s
Epoch [5/20], E Loss: 0.0066, D Loss: 0.0066, Val Loss: 0.0073, Time: 1085.47s


KeyboardInterrupt: 