In [1]:
import os
import zipfile
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [2]:
def preprocess_image(image, size=(128, 128)):
    """
    Turn a single BGR image into a binarized array.

    The stages run in this order:
    1. Resize to `size`
    2. Convert BGR to grayscale
    3. Denoise (non-local means, h=10)
    4. Equalize the histogram
    5. Scale intensities to [0, 1]
    6. Threshold at 0.5 (pixels above become 1.0, the rest 0)

    Args:
        image: BGR image array, e.g. as returned by cv2.imread.
        size: Target size handed to cv2.resize.

    Returns:
        The thresholded array produced by cv2.threshold.
    """
    resized = cv2.resize(image, size)
    gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
    smoothed = cv2.fastNlMeansDenoising(gray, h=10)
    scaled = cv2.equalizeHist(smoothed) / 255.0  # bring intensities into [0, 1]
    # cv2.threshold returns (threshold_used, image); only the image is needed.
    return cv2.threshold(scaled, 0.5, 1.0, cv2.THRESH_BINARY)[1]

def save_preprocessed_images(input_dir, output_dir, size=(128, 128)):
    """
    Preprocess every image found one folder level below `input_dir` and save
    the results under `output_dir`, mirroring the sub-folder names.

    Args:
        input_dir (str): Directory whose sub-folders contain the source images.
        output_dir (str): Directory that receives one sub-folder per input
            sub-folder; created if missing.
        size (tuple): Target size forwarded to preprocess_image.

    Notes:
        - Only files ending in .png, .jpg or .jpeg (case-insensitive) are
          processed; entries of `input_dir` that are not directories are skipped.
        - Unreadable images and failed writes are reported with a warning and
          skipped rather than aborting the whole run.
        - Each output keeps the source file name, so the output format follows
          the source extension.
    """
    # Leading dots matter: without them a name such as "scanpng" would pass
    # the filter and then fail in cv2.imwrite for lack of a known extension.
    image_extensions = ('.png', '.jpg', '.jpeg')

    os.makedirs(output_dir, exist_ok=True)
    # Sorted listings give a deterministic processing and logging order.
    for folder in sorted(os.listdir(input_dir)):
        folder_path = os.path.join(input_dir, folder)
        if not os.path.isdir(folder_path):
            continue

        output_folder = os.path.join(output_dir, folder)
        os.makedirs(output_folder, exist_ok=True)

        for file_name in sorted(os.listdir(folder_path)):
            if not file_name.lower().endswith(image_extensions):
                continue

            file_path = os.path.join(folder_path, file_name)
            image = cv2.imread(file_path)
            if image is None:
                print(f"Warning: Failed to read image: {file_path}")
                continue

            preprocessed_image = preprocess_image(image, size=size)
            output_file = os.path.join(output_folder, file_name)
            # preprocess_image yields values in {0, 1}; rescale to 8-bit for saving.
            written = cv2.imwrite(output_file, (preprocessed_image * 255).astype(np.uint8))
            if not written:
                # cv2.imwrite signals failure through its return value.
                print(f"Warning: Failed to write image: {output_file}")
        print(f"Processed folder: {folder}")

In [3]:
# ================== Data Augmentation ================== #
def augment_images(input_dir, output_dir, augmentations=5, size=(128, 128)):
    """
    Generate randomly augmented copies of every image found one folder level
    below `input_dir` and save them under `output_dir`, mirroring the
    sub-folder names.

    Args:
        input_dir (str): Directory whose sub-folders contain the source images.
        output_dir (str): Directory that receives the augmented PNG files;
            created if missing.
        augmentations (int): Number of augmented images to generate per source
            image. Zero or a negative value generates none.
        size (tuple): Target size handed to cv2.resize before augmenting.

    Notes:
        - Only files ending in .png, .jpg or .jpeg (case-insensitive) are used.
        - Output files are written by Keras as PNG with the prefix
          "aug_<source file stem>", so augmentations of different source
          images do not share a file-name prefix.
        - The transformations are random and unseeded, so results differ
          between runs.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')

    datagen = ImageDataGenerator(
        rotation_range=30,
        width_shift_range=0.1,
        height_shift_range=0.1,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        fill_mode='nearest'
    )

    os.makedirs(output_dir, exist_ok=True)
    for folder in sorted(os.listdir(input_dir)):
        folder_path = os.path.join(input_dir, folder)
        if not os.path.isdir(folder_path):
            continue

        save_folder = os.path.join(output_dir, folder)
        os.makedirs(save_folder, exist_ok=True)

        for file_name in sorted(os.listdir(folder_path)):
            if not file_name.lower().endswith(image_extensions):
                continue

            file_path = os.path.join(folder_path, file_name)
            image = cv2.imread(file_path)
            if image is None:
                print(f"Warning: Failed to read image: {file_path}")
                continue

            image = cv2.resize(image, size)  # Resize for consistency
            # cv2.imread returns BGR, while Keras saves arrays as RGB; convert
            # so colour images are not written with red and blue swapped.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            batch = np.expand_dims(image, axis=0)  # add the batch dimension

            # A per-image prefix keeps one image's augmentations from
            # overwriting another's: with a shared prefix only Keras' short
            # random suffix would distinguish the files.
            stem = os.path.splitext(file_name)[0]
            flow = datagen.flow(
                batch,
                batch_size=1,
                save_to_dir=save_folder,
                save_prefix=f"aug_{stem}",
                save_format='png',
            )
            # Each batch drawn from the iterator writes one file; draw exactly
            # `augmentations` of them (none at all when the count is <= 0).
            for _ in range(augmentations):
                next(flow)
        print(f"Augmented folder: {folder}")

In [5]:
# ================== Dataset Extraction ================== #
def extract_dataset(zip_path, extract_to):
    """
    Extracts a ZIP file to a specified directory.

    The archive is only extracted when `extract_to` does not exist yet; an
    existing path is treated as an already-extracted dataset and left alone.

    Args:
        zip_path (str): Path to the ZIP file.
        extract_to (str): Directory where the contents should be extracted.

    Raises:
        FileNotFoundError: If extraction is needed and `zip_path` does not exist.
        zipfile.BadZipFile: If `zip_path` is not a valid ZIP archive.
    """
    if os.path.exists(extract_to):
        print(f"Dataset already exists at {extract_to}.")
        return

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    print(f"Dataset extracted successfully to {extract_to}.")


# ================== Main Workflow ================== #
if __name__ == "__main__":
    # Paths
    dataset_zip_path = 'Deepdive.zip'
    extracted_folder = 'Deepdive'
    train_dir = os.path.join(extracted_folder, 'train')
    validation_dir = os.path.join(extracted_folder, 'validation')
    test_dir = os.path.join(extracted_folder, 'test')

    # Step 1: Extract dataset
    extract_dataset(dataset_zip_path, extracted_folder)

    # Step 2: Preprocess images
    preprocess_output_dir = os.path.join(extracted_folder, 'train_preprocessed')
    save_preprocessed_images(train_dir, preprocess_output_dir)

    # Step 3: Augment images
    augment_output_dir = os.path.join(extracted_folder, 'train_augmented')
    augment_images(preprocess_output_dir, augment_output_dir, augmentations=5)

    print("Data Preprocessing and Augmentation Complete")

In [6]:
# Unconditional status message: this cell does not check whether the
# preprocessing or augmentation steps above were actually executed.
print("Data preprocessing complete")

Data preprocessing complete
