# Identification of encrypted images using CNN
## Stanisław Durka

https://knowledgepit.ml/sus-2023/

In [23]:
import sys
import pandas as pd
from PIL import Image, ImageOps

import numpy as np                   # advanced math library
import matplotlib.pyplot as plt      # MATLAB like plotting routines
import random                        # for generating random numbers

from keras.models import Sequential  # Model type to be used

from keras.layers.core import Dense, Dropout, Activation # Types of layers to be used in our model
from keras.utils import np_utils                         # NumPy related tools

In [24]:
import tensorflow as tf
tf.config.threading.set_intra_op_parallelism_threads(1)
tf.config.threading.set_inter_op_parallelism_threads(112)

In [25]:
IMAGE_WIDTH = 28   # used to compress input images

In [26]:
def get_images(paths: np.ndarray):
    ret = []
    for path in paths:
        with Image.open(path) as img:
#             ret.append(np.array(img))
            ret.append(np.array(resize_img(img)))
    return ret

def resize_img(img):
    basewidth = IMAGE_WIDTH
    wpercent = (basewidth/float(img.size[0]))
    hsize = int((float(img.size[1])*float(wpercent)))
    return img.resize((basewidth,hsize), Image.Resampling.LANCZOS)


In [27]:
df = pd.read_csv('./data/train/train.csv')
df.loc[:5]

Unnamed: 0,id,input_path,encoded_path
0,0,BigDataCup2022/S1/train/input/0.jpg,BigDataCup2022/S1/train/enc/0.jpg
1,1,BigDataCup2022/S1/train/input/1.jpg,BigDataCup2022/S1/train/enc/1.jpg
2,2,BigDataCup2022/S1/train/input/2.jpg,BigDataCup2022/S1/train/enc/2.jpg
3,3,BigDataCup2022/S1/train/input/3.jpg,BigDataCup2022/S1/train/enc/3.jpg
4,4,BigDataCup2022/S1/train/input/4.jpg,BigDataCup2022/S1/train/enc/4.jpg
5,5,BigDataCup2022/S1/train/input/5.jpg,BigDataCup2022/S1/train/enc/5.jpg


In [28]:
df["input_path"] = ["./data" + x.removeprefix("BigDataCup2022/S1") for x in df["input_path"]]
df["encoded_path"] = ["./data" + x.removeprefix("BigDataCup2022/S1") for x in df["encoded_path"]]
df.loc[:5]
df["input_path"][:5]

0    ./data/train/input/0.jpg
1    ./data/train/input/1.jpg
2    ./data/train/input/2.jpg
3    ./data/train/input/3.jpg
4    ./data/train/input/4.jpg
Name: input_path, dtype: object

The training set is created from the list of pairs (X_input, X_encoded) as follows: \
X_train = X_in and X_enc concatenated along the second axis, \
and Y[i] = 1 for pairs of ".../input/i", ".../enc/i".

Then, additional N elements are added where X_train = concat(X_in[i], X_enc[j]), Y[i] == 0, \
where i =/= j

In [29]:
'''
create a training set of:
X_train = | X_in  |
          | X_enc |,
Y_train = np.array of (0/1)
'''
def create_training_set(N = 10000):
    X1 = np.array(get_images(df["input_path"][:N]))
    X2 = np.array(get_images(df["encoded_path"][:N]))

    X_train = np.concatenate((X1, X2), axis = 1)
    Y_train = np.ones(X_train.shape[0])  # for all of the images in the training set, encoded_i == encode(input_i) 

    XY = list(zip(X1, X2, Y_train))

    XY_2 = []
    for i in range(len(XY)):
        XY_2.append(XY[i])

    # now, append the tuple (X_in[i], X_enc[i], 1) set with additional N elements of:
    #                        (X_in[i], X_in[j], 0), where i != j
    for i in range (len(XY)):
        j = np.random.randint(len(XY))
        if (j == i):
            j = i+1
        XY.append((XY[i][0], XY[j][1], 0))
    
    random.shuffle(XY) # shuffle the tuple so that the y's aren't equal to [1,1,1,...,1,0,...,0]
    
    # finally, convert the array into X_train == concat(X_in, X_enc), and Y_train:
    X_train = []
    Y_train = []
    for i in range (len(XY)):
        X_train.append(np.concatenate((XY[i][0],XY[i][1]), axis=0))
        Y_train.append(XY[i][2])
    X_train = np.array(X_train)
    Y_train = np.array(Y_train)
    return (X_train, Y_train)

X_train, Y_train = create_training_set()
X_train.shape, Y_train.shape

((20000, 56, 28, 3), (20000,))

In [30]:
X_train.shape

(20000, 56, 28, 3)

In [31]:
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Conv2D, MaxPooling2D, ZeroPadding2D, GlobalAveragePooling2D, Flatten
from keras.layers import BatchNormalization

In [61]:
def create_model(img_width = IMAGE_WIDTH):
    model = Sequential()                                 # Linear stacking of layers

    # Convolution Layer 1
    model.add(Conv2D(32, (3, 3), input_shape=(img_width*2, img_width, 3))) # 32 different 3x3 kernels -- so 32 feature maps
    model.add(BatchNormalization(axis=-1))               # normalize each feature map before activation
    convLayer01 = Activation('relu')                     # activation
    model.add(convLayer01)

    # Convolution Layer 2
    model.add(Conv2D(32, (3, 3)))                        # 32 different 3x3 kernels -- so 32 feature maps
    model.add(BatchNormalization(axis=-1))               # normalize each feature map before activation
    model.add(Activation('relu'))                        # activation
    convLayer02 = MaxPooling2D(pool_size=(2,2))          # Pool the max values over a 2x2 kernel
    model.add(convLayer02)

    # Convolution Layer 3
    model.add(Conv2D(64,(3, 3)))                         # 64 different 3x3 kernels -- so 64 feature maps
    model.add(BatchNormalization(axis=-1))               # normalize each feature map before activation
    convLayer03 = Activation('relu')                     # activation
    model.add(convLayer03)

    # Convolution Layer 4
    model.add(Conv2D(64, (3, 3)))                        # 64 different 3x3 kernels -- so 64 feature maps
    model.add(BatchNormalization(axis=-1))               # normalize each feature map before activation
    model.add(Activation('relu'))                        # activation
    convLayer04 = MaxPooling2D(pool_size=(2,2))          # Pool the max values over a 2x2 kernel
    model.add(convLayer04)
    model.add(Flatten())                                 # Flatten final 4x4x64 output matrix into a 1024-length vector

    # Fully Connected Layer 5
    model.add(Dense(512))                                # 512 FCN nodes
    # model.add(BatchNormalization())                      # normalization
    model.add(Activation('relu'))                        # activation

    # Fully Connected Layer 6                       
    model.add(Dropout(0.2))                              # 20% dropout of randomly selected nodes
    model.add(Dense(1))                                 # final 10 FCN nodes
    model.add(Activation('sigmoid'))                     # sigmoid activation
    return model

In [62]:
model = create_model()
model.summary()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_16 (Conv2D)          (None, 54, 26, 32)        896       
                                                                 
 batch_normalization_16 (Bat  (None, 54, 26, 32)       128       
 chNormalization)                                                
                                                                 
 activation_24 (Activation)  (None, 54, 26, 32)        0         
                                                                 
 conv2d_17 (Conv2D)          (None, 52, 24, 32)        9248      
                                                                 
 batch_normalization_17 (Bat  (None, 52, 24, 32)       128       
 chNormalization)                                                
                                                                 
 activation_25 (Activation)  (None, 52, 24, 32)       

In [63]:
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In [64]:
model.fit(X_train, Y_train, batch_size=64, epochs=6, verbose=1)

Epoch 1/6
Epoch 2/6
Epoch 3/6
Epoch 4/6
Epoch 5/6
Epoch 6/6


<keras.callbacks.History at 0x7fcde2f79f00>

In [69]:
# model.fit(X_train, Y_train, batch_size=64, epochs=1, verbose=1)



<keras.callbacks.History at 0x7fcde1f05d80>

In [49]:
def create_X_test(N = 10000):
    df = pd.read_csv('./data/test/test.csv')
    df["input_path"] = ["./data" + x.removeprefix("BigDataCup2022/S1") for x in df["input_path"]]
    df["encoded_path"] = ["./data" + x.removeprefix("BigDataCup2022/S1") for x in df["encoded_path"]]
    X1 = np.array(get_images(df["input_path"][:N]))
    X2 = np.array(get_images(df["encoded_path"][:N])) 
    print(X1.shape, X2.shape)
    X_test = np.concatenate((X1, X2), axis = 1)
    return X_test

X_test = create_X_test()

print("x test ",X_test.shape)

(10000, 28, 28, 3) (10000, 28, 28, 3)
x test  (10000, 56, 28, 3)


In [17]:
# predicted_vec = model.predict(X_test)



In [14]:
# score = model.evaluate(X_train, Y_train)



# Create predictions for the test set with the trained model:

In [70]:
def predict(model, X):
    predicted_vec = model.predict(X)
    predictions01 = np.array([ 0 if x <= 0.5 else 1 for x in predicted_vec ])
    return predictions01

y = predict(model, X_test)



In [71]:
y.shape

(10000,)

In [72]:
def create_output(predictions):
    f = open("predictions.txt", "x")  
    for p in predictions:
        f.write(str(p))
        f.write("\n")
    f.close()

create_output(y)

In [79]:
'''
train the model on 2/3 of the training data set, 
and test it on the remaining 1/3
'''
def test_model(X_train, Y_train):
    model = create_model()
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    X_train2 = X_train[:6600]
    Y_train2 = Y_train[:6600]

    X_test2 = X_train[6600:]
    Y_test2 = Y_train[6600:]
    model.fit(X_train2, Y_train2, batch_size=32, epochs=5, verbose=1)
    score = model.evaluate(X_test2, Y_test2)
    print('Test score:', score[0])
    print('Test accuracy:', score[1])

test_model(X_train, Y_train)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Test score: 0.2505260407924652
Test accuracy: 0.9070149064064026
