# Paper Implementation
## END-TO-END TRAINED CNN ENCODER-DECODER NETWORKS FOR IMAGE STEGANOGRAPHY - Atique </i> $et.al$
## Tensorflow 2.0
###  Notebook Author: Saad Zia

In [1]:
import numpy as np
import tensorflow as tf
import pickle

import matplotlib.pyplot as plt
%matplotlib inline
from IPython import display


%load_ext autoreload
%autoreload 2

#### Setting up Data Pipeline

In [2]:
(x, y), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


In [108]:
x = x.astype(np.float32)
x_test = x_test.astype(np.float32)

In [109]:
payload_train = np.mean(x, axis=-1)[:, :, :, np.newaxis]
host_train = x[np.random.choice(np.arange(x.shape[0]), size=payload_train.shape[0])]

payload_test = np.mean(x_test, axis=-1)[:, :, :, np.newaxis]
host_test = x_test[np.random.choice(np.arange(x_test.shape[0]), size=payload_test.shape[0])]

In [110]:
# Instantiate the Dataset class
train_dataset = tf.data.Dataset.from_tensor_slices((payload_train, host_train))

In [111]:
# Normalization function
def normalize(payload, host):
    payload = tf.image.per_image_standardization(payload)
    host = tf.image.per_image_standardization(host)
    return payload, host

# Adding shuffle, normalization and batching operations to the dataset object
train_dataset = train_dataset.map(normalize).shuffle(50000).batch(128, drop_remainder=True)

In [112]:
# Instantiate the test Dataset class
test_dataset = tf.data.Dataset.from_tensor_slices((payload_test, host_test))
test_dataset = (test_dataset.map(normalize).batch(128, drop_remainder=True))

#### Setting up tf.keras Model

In [113]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Input

In [114]:
from encoder import EncoderNetwork
from decoder import DecoderNetwork

In [115]:
encoder_network = EncoderNetwork()
decoder_network = DecoderNetwork()

In [116]:
carrier_image_shape=(32, 32, 3)
payload_image_shape=(32, 32, 1)

input_carrier = Input(shape=carrier_image_shape, name='input_carrier')
input_payload = Input(shape=payload_image_shape, name='input_payload')

In [117]:
encoded_output = encoder_network.get_network(input_carrier, input_payload)
decoded_output = decoder_network.get_network(encoded_output)

In [118]:
steganography_model = Model(inputs=[input_carrier, input_payload], outputs=[encoded_output, decoded_output])

In [119]:
from tensorflow.keras.utils import plot_model
plot_model(steganography_model, show_shapes=True)

Failed to import pydot. You must install pydot and graphviz for `pydotprint` to work.


In [120]:
steganography_model.summary()

Model: "model_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_carrier (InputLayer)      [(None, 32, 32, 3)]  0                                            
__________________________________________________________________________________________________
input_payload (InputLayer)      [(None, 32, 32, 1)]  0                                            
__________________________________________________________________________________________________
conv2d_73 (Conv2D)              (None, 32, 32, 16)   448         input_carrier[0][0]              
__________________________________________________________________________________________________
conv2d_66 (Conv2D)              (None, 32, 32, 16)   160         input_payload[0][0]              
____________________________________________________________________________________________

In [121]:
# Defining Loss Function

@tf.function
def loss_function(payload, host, encoder_output, decoder_output):
    
    loss = tf.math.reduce_mean(tf.math.squared_difference(payload, decoder_output)\
                           + tf.math.squared_difference(host, encoder_output))
        
    return loss


In [122]:
def custom_loss(input_):
    def loss(y_true, y_pred):
        return tf.math.reduce_mean(tf.math.squared_difference(y_true, y_pred))
    return loss             

In [124]:
optimizer = tf.keras.optimizers.Adam()

In [None]:
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_loss = tf.keras.metrics.Mean(name='train_loss')

@tf.function
def train_step(payload, host):
    with tf.GradientTape() as tape:
        encoded_host, decoded_payload = steganography_model([host, payload])
        loss = loss_function(payload, host, encoded_host, decoded_payload)
        train_loss(loss)
    gradients = tape.gradient(loss, steganography_model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, steganography_model.trainable_variables))



@tf.function
def test_step(payload, host):
    encoded_host, decoded_payload = steganography_model([host, payload])
    t_loss = loss_function(payload, host, encoded_host, decoded_payload)
    test_loss(t_loss)

EPOCHS = 5
SUMMARY_DIR = './summary'

TRAIN_BATCH_SIZE = 32
TEST_BATCH_SIZE = 32

import time


for epoch in range(EPOCHS):
    start = time.time()
    for payload, host in train_dataset:
        train_step(payload, host)

    for payload, host in test_dataset:
        test_step(payload, host)

    elapsed = time.time() - start
    print('elapsed: %f' % elapsed)

    template = 'Epoch {}, Train Loss: {}, Test Loss: {}'
    print(template.format(epoch+1, train_loss.result(), test_loss.result()))

    # Reset the metrics for the next epoch
    test_loss.reset_states()

print('Training Finished.')

In [None]:
steganography_model.compile(
          loss={'encoded_output': custom_loss(input_carrier), 'decoded_output': custom_loss(input_payload)},\
        loss_weights={'encoded_output': 1, 'decoded_output': 1},\
          optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))

In [32]:
steganography_model.fit(train_dataset)

      1/Unknown - 0s 131ms/step

ValueError: Error when checking model input: the list of Numpy arrays that you are passing to your model is not the size the model expected. Expected to see 2 array(s), but instead got the following list of 1 arrays: [<tf.Tensor 'IteratorGetNext:0' shape=(128, 32, 32, 3) dtype=uint8>]...

In [None]:
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()