
# Detecting LSB stego on color images (Mono version)
### Alberto García - intentodemusico
### Coach - Julián Miranda

In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
import MirandaAttributes
import shutil, os, random,cv2
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from tensorflow.keras import backend as K
from tensorflow.keras import layers, models, callbacks
from sklearn.model_selection import train_test_split, KFold

In [2]:
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

# **1. Descargar y preparar conjunto de datos**

### Funciones anónimas para obtener las categorías de cada imágen

In [3]:
#Miranda images, García attributes V2
train_dataset_url = "https://raw.githubusercontent.com/intentodemusico/StegianV2/master/DatasetStegonomonoV2/train_5000.csv"
test_dataset_url = "https://raw.githubusercontent.com/intentodemusico/StegianV2/master/DatasetStegonomonoV2/test_5000.csv"

### Orden aleatorio del dataset

In [15]:
#%% Importing the dataset stegonomono V2
Labels=['Kurtosis', 'Skewness', 'Std', 'Range', 'Median', 'Garcia_Geomean', 'Epsilon_Geomean','Mobility', 'Complexity']
trainDataset = pd.read_csv(train_dataset_url,header=None)
X_train = pd.DataFrame(trainDataset.iloc[:, :-1].values,columns=Labels)
X_train["Kurtosis"]=[float(x[1:-1]) for x in X_train["Kurtosis"]]
Y_train = trainDataset.iloc[:, -1].values


testDataset = pd.read_csv(test_dataset_url,header=None)
X_test = pd.DataFrame(testDataset.iloc[:, :-1].values,columns=Labels)
X_test["Kurtosis"]=[float(x[1:-1]) for x in X_test["Kurtosis"]]
Y_test = testDataset.iloc[:, -1].values

### Obteniendo las categorías

In [5]:
import time

### Obteniendo data de x 
##### (Extensive memory usage)

In [6]:
import gc
gc.enable()
gc.garbage

[]

# **3. Base Convolucional**

##### Adding custom high-pass filter, based on previous investigators

In [8]:
# Custom filter
def high_pass(shape, dtype=None):
    f = np.zeros(shape, dtype='float32')
    f[:,:,0,0] = np.array([
            [ -1,  2 ,  -2,  2 ,  -1],
            [ 2 , -6 ,  8 , -6 ,  2 ],
            [ -2,  8 , -12,  8 ,  -2 ],
            [ 2 , -6 ,  8 , -6 ,  2 ],
            [ -1,  2 ,  -2,  2 ,  -1]
        ],dtype = 'float')
    f/=12    
    return K.variable(f, dtype='float32')

#### Initializing K-Fold

In [10]:
kf = KFold(n_splits=5, shuffle=False)
acc_per_fold = []
loss_per_fold = []

In [11]:
[[y[i],images[i]] for i in range(1,10)]

NameError: name 'y' is not defined

In [11]:
#Setting early stopping
print("Early")
earlystopping=callbacks.EarlyStopping(monitor="val_loss", mode="min",patience=20,restore_best_weights=True)

Early


In [18]:
from sklearn.preprocessing import StandardScaler
sc=StandardScaler()
X_train=np.expand_dims(sc.fit_transform(X_train),axis=1)
X_test=np.expand_dims(sc.transform(X_test),axis=1)

In [19]:
#Conv
model = models.Sequential()
model.add(layers.Conv1D(9, 1,#kernel_initializer=high_pass, #Comment if unwanted kernel
                      input_shape=((9,))))
model.add(layers.Conv1D(18, 2))
model.add(layers.Conv1D(64, 3))
#model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(512, (3, 3), activation='relu'))
#  model.add(layers.MaxPooling2D((2, 2)))
#model.add(layers.Conv2D(1024, (3, 3), activation='relu'))
#Dense
model.add(layers.Flatten())#, activation='relu'
model.add(layers.Dense(64))
model.add(layers.Dense(32))
model.add(layers.Dense(16, activation='relu'))
model.add(layers.Dense(10))
model.add(layers.Dense(1, activation='softmax'))
#Comp
#customSGD=tf.optimizers.SGD(learning_rate=0.05) #SGD consume más recursos computacionales, además puede tardar más tiempo optimizando
customAdam=tf.optimizers.Adam(learning_rate=0.0015)
model.compile(optimizer=customAdam,
              loss=tf.keras.losses.binary_crossentropy, #Debería ser binario?
              metrics=['accuracy',f1_m,precision_m, recall_m])
    # Generate a print
print('------------------------------------------------------------------------')
#Fitting
history = model.fit(X_train, Y_train,validation_data=(X_test, Y_test), epochs=200, callbacks=[earlystopping],batch_size=64)
gc.collect()
print("\n\n\n\n\nFitted")
plt.plot(history.history['accuracy'], label='accuracy')
plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim([0.5, 1])
plt.legend(loc='lower right')
test_loss, test_acc,f1_score, precision, recall = model.evaluate(x[test], y[test], verbose=2)
print("Loss:",test_loss, "\nAccuracy:",test_acc,"\nF1 score:",f1_score, "\nPrecision:",precision, "\nRecall",recall)

plt.plot(history.history['precision_m'], label='precision')
plt.plot(history.history['recall_m'], label = 'recall')
plt.plot(history.history['f1_m'], label = 'F1')
plt.xlabel('Epoch')
plt.ylabel('Metrics')
plt.legend(loc='lower right')
plt.show()
#dataMqtt={"Fold":str(fold_no),"timestamp":str(datetime.now()),"Loss":str(test_loss), "Acc":str(test_acc),"F1":str(f1_score),"Precision":str( precision),"Recall": str(recal)}
#dataMqtt_out=json.dumps(data)
#retMqtt= client1.publish("stegian/data",dataMqtt_out)

# Generate generalization metrics
scores = model.evaluate(x[test], y[test], verbose=0)
gc.collect()

ValueError: Input 0 of layer conv1d_1 is incompatible with the layer: : expected min_ndim=3, found ndim=2. Full shape received: [None, 9]

In [None]:
x.shape

##### (Extensive memory usage)

In [None]:
# == Provide average scores ==
print('------------------------------------------------------------------------')
print('Score per fold')
for i in range(0, len(acc_per_fold)):
  print('------------------------------------------------------------------------')
  print(f'> Fold {i+1} - Loss: {loss_per_fold[i]} - Accuracy: {acc_per_fold[i]}%')
print('------------------------------------------------------------------------')
print('Average scores for all folds:')
print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
print(f'> Loss: {np.mean(loss_per_fold)}')
print('------------------------------------------------------------------------')

# **4. Agregar capas densas en la parte superior**

In [None]:
model.summary()

# **5. Compilar y entrenar el modelo**



In [None]:
model.save("Mono_Attr_noHPf_WOW5-V3.h5")

# **6. Evaluar el modelo**