In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will summarise the files under the input directory

import os

# Report one line per directory (with its file count) rather than one line per
# file: an image dataset with tens of thousands of files would otherwise flood
# the cell output and bloat the saved notebook.
for dirname, _, filenames in os.walk('/kaggle/input'):
    if filenames:
        print(f"{dirname}: {len(filenames)} files")

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

### Steganalysis Project: Detecting Hidden Data in Images

This project aims to build a Convolutional Neural Network (CNN) based machine learning model to detect steganographic data hidden within images, and then deploy this model via a Streamlit web application.

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np
import os
from PIL import Image

### Dataset
**Stego-Images-Dataset**
44,000 images containing malicious JS, HTML, PS, URL, and Ethereum payloads embedded via LSB (least-significant-bit) steganography.
**Dataset Link**: https://www.kaggle.com/datasets/marcozuppelli/stegoimagesdataset/data

In [None]:
import kagglehub

# Fetch the latest published version of the dataset through kagglehub.
# NOTE(review): the returned location is only printed; the configuration cell
# hard-codes DATASET_PATH separately -- confirm the two point at the same place.
dataset_handle = "marcozuppelli/stegoimagesdataset"
path = kagglehub.dataset_download(dataset_handle)

print("Path to dataset files:", path)

In [None]:
# --- Data ---------------------------------------------------------------
# Root directory of the dataset under the Kaggle input mount.
DATASET_PATH = '/kaggle/input/stegoimagesdataset'
# Every image is resized to this target size before it reaches the CNN.
IMAGE_SIZE = (128, 128)

# --- Optimisation -------------------------------------------------------
LEARNING_RATE = 0.001
BATCH_SIZE = 32
# Number of passes over the training set; adjust to suit your training run.
EPOCHS = 100


### Data Loading and Preprocessing Functions

These functions handle loading images from the specified directories, resizing them, converting them to RGB format, and normalizing pixel values. The `create_dataset_from_structured_directory` function is designed to read from the `train`, `val`, and `test` subdirectories.


In [None]:
def load_and_preprocess_image(image_path, target_size=IMAGE_SIZE):
    """
    Load a single image file and return it as a normalised RGB array.

    The image is converted to RGB, resized to ``target_size`` and scaled
    from 8-bit integers to float32 values in [0, 1].

    Args:
        image_path: Path of the image file to read.
        target_size: Size handed to ``PIL.Image.resize``.

    Returns:
        A float32 numpy array of the resized RGB image, or ``None`` when the
        file cannot be opened or decoded (the error is printed, not raised,
        so one bad file does not abort a whole directory load).
    """
    try:
        # Open inside a context manager so the underlying file handle is
        # released as soon as the pixels are copied out, even if decoding
        # fails part-way through. convert() yields an independent image, so
        # it remains usable after the source file is closed.
        with Image.open(image_path) as source_img:
            rgb_img = source_img.convert('RGB')

        # NOTE(review): resizing resamples pixel values, which may weaken the
        # LSB-level differences this project is trying to detect -- confirm
        # that resizing (rather than cropping) is intended.
        resized_img = rgb_img.resize(target_size)

        # Normalize pixel values to [0, 1]
        return np.asarray(resized_img, dtype=np.float32) / 255.0
    except Exception as e:
        print(f"Error loading image {image_path}: {e}")
        return None

def _load_labelled_images(directory, label, image_size, split, class_name):
    """Load every supported image in ``directory`` and pair each one with ``label``."""
    supported_extensions = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')

    print(f"Loading {split} {class_name} images from: {directory}")
    # isdir (not exists) so that a stray file with this name is reported as
    # missing instead of making os.listdir raise.
    if not os.path.isdir(directory):
        print(f"Warning: {class_name.capitalize()} directory not found for {split} at {directory}")
        return [], []

    images = []
    # Sorted so the load order does not depend on the filesystem's listing order.
    for filename in sorted(os.listdir(directory)):
        if not filename.lower().endswith(supported_extensions):
            continue
        img = load_and_preprocess_image(os.path.join(directory, filename), image_size)
        # Unreadable files come back as None and are skipped.
        if img is not None:
            images.append(img)

    return images, [label] * len(images)


def create_dataset_from_structured_directory(dataset_root, image_size=IMAGE_SIZE, seed=None):
    """
    Load the 'train', 'val' and 'test' splits of a clean/stego image dataset.

    Each split is expected at ``<dataset_root>/<split>/<split>/`` (the split
    name is nested twice) and to contain 'clean' and 'stego' subdirectories.
    Clean images are labelled 0 and stego images 1. Every split is loaded
    fully into memory and shuffled.

    Args:
        dataset_root: Directory that contains the split folders.
        image_size: Target size forwarded to ``load_and_preprocess_image``.
        seed: Optional seed for a reproducible shuffle. When ``None`` (the
            default) the global ``np.random`` state is used.

    Returns:
        A dict mapping each split name to an ``(X, y)`` tuple of numpy
        arrays (images, labels), or to ``(None, None)`` when no image could
        be loaded for that split.
    """
    class_labels = (('clean', 0), ('stego', 1))  # 0 for clean (cover), 1 for stego
    # One generator shared by all splits when a seed is supplied; otherwise
    # fall back to the module-level global random state.
    rng = np.random if seed is None else np.random.default_rng(seed)
    datasets = {}

    for split in ('train', 'val', 'test'):
        # The dataset nests each split inside a folder of the same name
        # (e.g. train/train).
        split_path = os.path.join(dataset_root, split, split)

        all_images = []
        all_labels = []
        for class_name, label in class_labels:
            images, labels = _load_labelled_images(
                os.path.join(split_path, class_name), label, image_size, split, class_name
            )
            all_images.extend(images)
            all_labels.extend(labels)

        if not all_images:
            print(f"No images found for {split} split. Skipping this split.")
            datasets[split] = (None, None)
            continue

        X = np.array(all_images)
        y = np.array(all_labels)

        # Shuffle images and labels with the same permutation so they stay
        # aligned; without this, all clean samples would precede all stego ones.
        indices = rng.permutation(len(X))
        X = X[indices]
        y = y[indices]

        print(f"Loaded {len(X)} images for {split} split. {np.sum(y == 0)} clean, {np.sum(y == 1)} stego.")
        datasets[split] = (X, y)

    return datasets

### CNN Model Definition

This section defines the Convolutional Neural Network (CNN) architecture. The model includes multiple convolutional layers, batch normalization, max pooling, and dropout layers to effectively learn features for steganalysis. The final layer uses a sigmoid activation for binary classification (stego vs. non-stego).



In [None]:
def create_steganalysis_cnn_model(input_shape=IMAGE_SIZE + (3,), learning_rate=LEARNING_RATE):
    """
    Build and compile the CNN used for binary stego / clean classification.

    The network is a plain sequential stack (no residual connections):
    three stages of two Conv2D + BatchNormalization pairs followed by max
    pooling and dropout, then a 256-unit dense layer and a single sigmoid
    output. Any high-pass-filter-like behaviour has to be learned by the
    first convolutional layers; no fixed filters are applied.

    Args:
        input_shape: Shape of one input image, e.g. (128, 128, 3).
        learning_rate: Learning rate for the Adam optimizer.

    Returns:
        A compiled ``Sequential`` model using binary cross-entropy loss and
        reporting accuracy.
    """
    def conv_stage(filters, kernel_size):
        # Two conv + batch-norm pairs, then downsample and regularise.
        return [
            layers.Conv2D(filters, kernel_size, activation='relu', padding='same'),
            layers.BatchNormalization(),
            layers.Conv2D(filters, kernel_size, activation='relu', padding='same'),
            layers.BatchNormalization(),
            layers.MaxPooling2D((2, 2)),
            layers.Dropout(0.25),
        ]

    # Declare the input with an explicit Input layer instead of passing
    # input_shape to the first Conv2D.
    stack = [layers.Input(shape=input_shape)]

    # The first stage uses wider 5x5 kernels; later stages use 3x3 kernels
    # with a growing number of filters.
    for filters, kernel_size in ((32, (5, 5)), (64, (3, 3)), (128, (3, 3))):
        stack.extend(conv_stage(filters, kernel_size))

    stack.extend([
        layers.Flatten(),
        layers.Dense(256, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(1, activation='sigmoid'),  # Binary classification: stego or not
    ])

    model = models.Sequential(stack)

    # Compile the model
    model.compile(optimizer=optimizers.Adam(learning_rate=learning_rate),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])

    return model

### Load Data, Create Model, and Train

This section demonstrates how to load the dataset using the defined functions, create an instance of the CNN model, and then train it using the training and validation data. Finally, the model is evaluated on the test set and saved.

In [None]:
# Read every split from disk into in-memory arrays.
datasets = create_dataset_from_structured_directory(DATASET_PATH)

# A split that is absent from the result is treated like one that failed to load.
missing_split = (None, None)
X_train, y_train = datasets.get('train', missing_split)
X_val, y_val = datasets.get('val', missing_split)
X_test, y_test = datasets.get('test', missing_split)

# A split is usable only when both its images and its labels are present.
train_ready = not (X_train is None or y_train is None)
val_ready = not (X_val is None or y_val is None)
test_ready = not (X_test is None or y_test is None)

if train_ready:
    print(f"X_train shape: {X_train.shape}")
    print(f"y_train shape: {y_train.shape}")
if val_ready:
    print(f"X_val shape: {X_val.shape}")
    print(f"y_val shape: {y_val.shape}")
if test_ready:
    print(f"X_test shape: {X_test.shape}")
    print(f"y_test shape: {y_test.shape}")

if not (train_ready and val_ready):
    print("Required training or validation data not loaded. Cannot proceed with model training.")
else:
    # Build the network and show its layer summary.
    model = create_steganalysis_cnn_model()
    model.summary()

    # Fit on the training split, monitoring the validation split each epoch.
    print("\n--- Starting Model Training ---")
    history = model.fit(
        X_train,
        y_train,
        validation_data=(X_val, y_val),
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
    )

    print("\n--- Model Training Complete ---")

    # The held-out test split is optional; report on it only when it was loaded.
    if test_ready:
        loss, accuracy = model.evaluate(X_test, y_test)
        print(f"Test Loss: {loss:.4f}")
        print(f"Test Accuracy: {accuracy:.4f}")
    else:
        print("Test set not available for evaluation.")

    # Persist the trained model to the working directory.
    MODEL_SAVE_PATH = 'steganalysis_cnn_model.h5'
    model.save(MODEL_SAVE_PATH)
    print(f"Model saved to {MODEL_SAVE_PATH}")