<h1>
<center>DALLE-Recognition Dataset Preparation</center>
</h1>

<font size="3">
This notebook prepares an image dataset (the DALLE Recognition Dataset) for model training. Here is a high-level summary of the steps it applies:
<br><br>
<ul>
<li><strong>Random Seed Setting</strong>: Initializes random number generators with a fixed seed to ensure reproducibility across experiments.</li>
<li><strong>Data Cleaning</strong>: Removes non-image files, excessively large images, and images with transparency, creating a clean and uniform dataset.</li>
<li><strong>Dataset Balancing</strong>: Adjusts the number of images across different classes to ensure balance, crucial for unbiased model training.</li>
<li><strong>Train-Test Split</strong>: Distributes images into training and testing sets based on a predefined ratio, preparing the dataset for both training and evaluation.</li>
<li><strong>Directory Management</strong>: Performs checks on directories to ensure they are appropriately populated or cleaned up after preprocessing, maintaining an organized file structure for easy access and processing.</li>
</ul>
</font>

## General Setup

<font size="3"> 
Package imports and path configuration. 
</font>

In [None]:
import os
import torch
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset, SubsetRandomSampler
import torch.nn as nn
import torch.optim as optim

import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import shutil
import random
from random import sample
from PIL import Image

import json
from captum.attr import IntegratedGradients
from captum.attr import visualization as viz
import matplotlib.cm as cm
from scipy.ndimage import zoom
import torch.nn.functional as F

current_path = os.getcwd()

<font size="3"> 
Datasets paths. 
</font>

In [None]:
dataset_path = os.path.join(current_path, 'io', 'input', 'dataset')
train_data_path = os.path.join(dataset_path, 'train')
test_data_path = os.path.join(dataset_path, 'test')

inti_dataset_path = os.path.join(current_path, 'archive')
init_real_class_path = os.path.join(inti_dataset_path, 'real')
init_fake_class_path = os.path.join(inti_dataset_path, 'fakeV2', 'fake-v2')

<font size="3"> 
Setting Random Seeds for Reproducibility. 
</font>

In [None]:
def set_seeds(seed=42):
    # Seed every random number generator imported in this notebook
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
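
<font size="3">
As a small illustration of why the seed matters, the sketch below seeds the standard-library generator and draws the same sample twice. The helper <code>pick</code> and the file names are hypothetical and not part of this notebook. The input is sorted first because <code>os.listdir</code> returns entries in arbitrary order, so a seeded sample is only repeatable when the input order is fixed as well.
</font>

```python
import random


def pick(seed, population, k):
    """Seed the stdlib generator, then draw k items without replacement."""
    random.seed(seed)
    return random.sample(population, k)


# Hypothetical file names, sorted so the input order is deterministic
files = sorted(f"img_{i:03d}.png" for i in range(20))

first = pick(42, files, 5)
second = pick(42, files, 5)

# Same seed and same input order give the same selection
print(first == second)
```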


## Data Cleaning & Preprocessing

<font size="3"> 
The following function decides whether a file is an image by comparing its extension, case-insensitively, against .png, .jpg, and .jpeg. It does not inspect the file contents. 
</font>

In [None]:
def is_image_file(filename):
    # Check if a file is a PNG or JPG image based on its extension
    valid_extensions = ['.png', '.jpg', '.jpeg']
    _, ext = os.path.splitext(filename)
    return ext.lower() in valid_extensions
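
<font size="3">
The sketch below shows how an extension-based check of this kind behaves on a few edge cases. The helper <code>looks_like_image</code> and the sample names are hypothetical. Only the last extension counts, and a name that consists solely of a leading dot plus letters (such as <code>.png</code>) has no extension according to <code>os.path.splitext</code>.
</font>

```python
import os

VALID_EXTENSIONS = {'.png', '.jpg', '.jpeg'}


def looks_like_image(filename):
    """Return True if the final extension is one of the accepted image types."""
    _, ext = os.path.splitext(filename)
    return ext.lower() in VALID_EXTENSIONS


# Hypothetical file names covering the edge cases
samples = ['cat.PNG', 'dog.jpeg', 'notes.txt', 'photo.jpg.bak', '.png']
results = {name: looks_like_image(name) for name in samples}

for name, ok in results.items():
    print(f"{name}: {ok}")
```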



<font size="3"> 
The following function iterates through all files in a specified directory and removes any that is_image_file rejects, so that only PNG and JPG files remain. Subdirectories are left untouched.
</font>

In [None]:
def delete_non_image_files(path):
    for filename in os.listdir(path):
        file_path = os.path.join(path, filename)
        if os.path.isfile(file_path) and not is_image_file(filename):
            os.remove(file_path)
            print(f"Deleted {filename} for not being a PNG or JPG image")
            

<font size="3"> 
The following function scans a specified directory and deletes every file larger than a given limit in megabytes (1 MB is taken as 1024 * 1024 bytes). Removing oversized files keeps storage use and image loading time bounded.
</font>

In [None]:
def delete_large_files(path, max_size_mb):
    for filename in os.listdir(path):
        file_path = os.path.join(path, filename)
        if os.path.isfile(file_path):
            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)  # Convert to MB
            if file_size_mb > max_size_mb:
                os.remove(file_path)
                print(f"Deleted {filename} for being larger than {max_size_mb}MB")
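
<font size="3">
Because the deletion is irreversible, it can help to list the files that would be removed before running it. The sketch below is a dry-run variant under the same size convention. The helper <code>find_large_files</code>, the file names, and the tiny threshold are hypothetical and chosen only to keep the demonstration small.
</font>

```python
import os
import tempfile


def find_large_files(path, max_size_mb):
    """Return the names of files above the limit without deleting anything."""
    limit_bytes = max_size_mb * 1024 * 1024
    flagged = []
    for name in sorted(os.listdir(path)):
        file_path = os.path.join(path, name)
        if os.path.isfile(file_path) and os.path.getsize(file_path) > limit_bytes:
            flagged.append(name)
    return flagged


with tempfile.TemporaryDirectory() as tmp:
    # Two placeholder files: 10 bytes and 3000 bytes
    with open(os.path.join(tmp, 'small.png'), 'wb') as fh:
        fh.write(b'\0' * 10)
    with open(os.path.join(tmp, 'big.png'), 'wb') as fh:
        fh.write(b'\0' * 3000)

    # 0.001 MB is about 1049 bytes, so only the 3000-byte file exceeds it
    flagged = find_large_files(tmp, max_size_mb=0.001)

print(flagged)
```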

<font size="3"> 
The following function checks whether an image contains transparency by examining its mode and metadata. An image counts as transparent if its mode is 'RGBA' or 'LA', both of which carry an alpha channel, or if its info dictionary contains a 'transparency' key. Files that cannot be opened are reported and treated as non-transparent. 
</font>

In [None]:
def has_transparency(img_path):
    """
    Check if an image has transparency.
    """
    try:
        with Image.open(img_path) as img:
            # An alpha-channel mode or a 'transparency' metadata entry both count
            return img.mode in ('RGBA', 'LA') or 'transparency' in img.info
    except IOError:
        print(f"Error opening {img_path}.")
        return False
    

<font size="3"> 
The following function traverses all files in a specified directory and deletes those with transparency, as determined by the previously described has_transparency function. This action ensures the dataset excludes images that contain transparent elements.
</font>

In [None]:
def delete_transparent_images(directory):
    """
    Delete images with transparency in the given directory.
    """
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path) and has_transparency(file_path):
            #print(f"Deleting transparent image: {filename}")
            os.remove(file_path)


<font size="3"> 
The following function counts the files (subdirectories excluded) in a specified directory.
</font>


In [None]:
def count_images(path):
    return len([name for name in os.listdir(path) if os.path.isfile(os.path.join(path, name))])


<font size="3">
The following function downsamples a directory so that it contains at most a specified number of files. If the directory already holds that many files or fewer, nothing is deleted. It operates by:
<br>
<ol>
<li>Listing all files in the given directory.</li>
<li>If the current file count exceeds the target, randomly selecting the surplus files for deletion.</li>
<li>Removing the selected files to meet the target dataset size.</li>
</ol>

</font>

In [None]:
def create_subset(path, target_count):
    # Sort the listing: os.listdir order is arbitrary, so sorting keeps the seeded sample reproducible
    images = sorted(name for name in os.listdir(path) if os.path.isfile(os.path.join(path, name)))
    if len(images) > target_count:
        to_delete = random.sample(images, len(images) - target_count)
        for filename in to_delete:
            file_path = os.path.join(path, filename)
            os.remove(file_path)
            #print(f"Deleted {filename} to reduce the number of images")
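
<font size="3">
The three steps above can be sketched on a throwaway directory as follows. This is a minimal illustration, not the notebook's function: the helper <code>downsample_dir</code>, its local <code>random.Random</code> generator, and the placeholder files are assumptions made for the example.
</font>

```python
import os
import random
import tempfile


def downsample_dir(path, target_count, seed=42):
    """Randomly delete surplus files so that at most target_count remain."""
    rng = random.Random(seed)  # local generator, independent of global state
    files = sorted(
        name for name in os.listdir(path)
        if os.path.isfile(os.path.join(path, name))
    )
    surplus = len(files) - target_count
    if surplus <= 0:
        return []  # already at or below the target
    to_delete = rng.sample(files, surplus)
    for name in to_delete:
        os.remove(os.path.join(path, name))
    return to_delete


with tempfile.TemporaryDirectory() as tmp:
    # Ten empty placeholder files stand in for images
    for i in range(10):
        open(os.path.join(tmp, f"fake_{i}.png"), 'wb').close()

    removed = downsample_dir(tmp, target_count=6)
    remaining = len(os.listdir(tmp))

print(len(removed), remaining)
```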
            

<font size="3">
The following function splits an image dataset into training and testing sets for a specified class, according to a given ratio. Here's a brief overview of its steps:
<br>
<ol>
<li>Paths Setup: It organizes the directory structure for the training and testing sets based on the class name.</li>
<li>Directory Verification: Checks whether the test directory for the class already exists. If it does, the split is skipped; otherwise the training and testing directories are created.</li>
<li>Image Selection: Retrieves all images from the initial class directory, then calculates how many should go into the training set based on the provided ratio.</li>
<li>Distribution: Randomly selects images for the training set, with the remainder allocated to the testing set.</li>
<li>File Transfer: Moves the selected images to their respective new directories for training and testing.</li>
<li>Completion Notification: Prints a summary of how many images were allocated to each set, or indicates if the dataset was previously split.</li>
</ol>

</font>

In [None]:
def train_test_split(init_class_path, train_data_path, test_data_path, class_name, train_test_split_ratio):

    train_path = os.path.join(train_data_path, class_name)
    test_path = os.path.join(test_data_path, class_name)
    
    if not os.path.exists(test_path):

        os.makedirs(train_path, exist_ok=True)
        os.makedirs(test_path, exist_ok=True)
        
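        # Sort the listing so the seeded sample below is reproducible (os.listdir order is arbitrary)
        all_images = sorted(f for f in os.listdir(init_class_path) if os.path.isfile(os.path.join(init_class_path, f)))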
        
        num_train = int(len(all_images) * train_test_split_ratio)
        
        train_images = sample(all_images, num_train)
        
        test_images = list(set(all_images) - set(train_images))
        
        for image in train_images:
            shutil.move(os.path.join(init_class_path, image), train_path)
        
        for image in test_images:
            shutil.move(os.path.join(init_class_path, image), test_path)
        
        print(f"Dataset of class: {class_name} has been split: {num_train} to train and {len(test_images)} to test.")
    else:
        print("The dataset is already split. No action taken.")
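
<font size="3">
The selection arithmetic can be sketched without touching the file system. The helper <code>split_names</code> and the file names below are hypothetical. It uses shuffle-and-slice rather than <code>sample</code> plus a set difference, which gives the same kind of partition. Note that <code>int()</code> truncates, so any fractional image goes to the test set.
</font>

```python
import random


def split_names(names, train_ratio, seed=42):
    """Partition names into (train, test) lists according to train_ratio."""
    rng = random.Random(seed)
    shuffled = sorted(names)  # fixed starting order for reproducibility
    rng.shuffle(shuffled)
    num_train = int(len(shuffled) * train_ratio)  # truncates toward zero
    return shuffled[:num_train], shuffled[num_train:]


# Hypothetical file names
names = [f"img_{i}.png" for i in range(10)]
train, test = split_names(names, train_ratio=0.7)

print(len(train), len(test))
```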
        

<font size="3">
The following function checks for and deletes a dataset directory only if both its specified subdirectories (real and fake classes) are empty. It operates as follows:
<br>
<ol>
<li>Empty Check: It first verifies that both the 'real class' and 'fake class' directories contain no files.</li>
<li>Deletion: If both directories are confirmed to be empty, the entire dataset directory is deleted.</li>
<li>Error Handling: Attempts to delete the dataset directory and reports any errors encountered during the process.</li>
<li>Non-empty Case: If either directory contains files, it reports that the dataset is not empty and refrains from deleting anything.</li>
</ol>
<br>
This function ensures that the initial dataset directory is removed only once both class directories have been emptied, preventing accidental data loss.

</font>

In [None]:
def check_and_delete(init_real_class_path, init_fake_class_path, inti_dataset_path):
    if not os.listdir(init_real_class_path) and not os.listdir(init_fake_class_path):
        try:
            shutil.rmtree(inti_dataset_path)
            print(f"{inti_dataset_path} has been successfully deleted.")
        except Exception as e:
            print(f"Error deleting {inti_dataset_path}: {e}")
    else:
        print("Init dataset is not empty. No action taken.")
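
<font size="3">
The guard can be exercised on a temporary directory tree, as in the sketch below. The helper <code>remove_if_empty</code> and the directory names are hypothetical. It returns a boolean instead of printing, which makes the two cases easy to check.
</font>

```python
import os
import shutil
import tempfile


def remove_if_empty(root, class_dirs):
    """Delete root only when every class directory has no entries."""
    if any(os.listdir(d) for d in class_dirs):
        return False  # something is left, keep everything
    shutil.rmtree(root)
    return True


base = tempfile.mkdtemp()
root = os.path.join(base, 'archive')
real = os.path.join(root, 'real')
fake = os.path.join(root, 'fake')
os.makedirs(real)
os.makedirs(fake)

# One leftover file blocks the deletion
leftover = os.path.join(real, 'leftover.png')
open(leftover, 'wb').close()
kept = remove_if_empty(root, [real, fake])

# Once the leftover is gone, the whole tree is removed
os.remove(leftover)
removed = remove_if_empty(root, [real, fake])
root_exists = os.path.exists(root)

shutil.rmtree(base)  # clean up the temporary parent
print(kept, removed, root_exists)
```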


<font size="3">
The following function streamlines the preparation of an image dataset, combining previously described functions to ensure the data is ready for machine learning tasks. It performs the following key steps:
<br>
<ol>
<li>Preliminary Checks: Verifies the initial dataset directory's existence.</li>
<li>Data Cleaning: Utilizes delete_non_image_files, delete_large_files, and delete_transparent_images to remove unsuitable images based on format, size, and transparency.</li>
<li>Dataset Balancing: Adjusts the number of images in each class to ensure a balanced dataset, employing the create_subset function.</li>
<li>Data Splitting: Splits the dataset into training and testing sets with a specified ratio using train_test_split.</li>
<li>Cleanup: Deletes the initial dataset directory through check_and_delete once both class directories are empty.</li>
</ol>
<br>

</font>

In [None]:
def init_data_preprocessing(inti_dataset_path, init_real_class_path, init_fake_class_path, max_image_size_mb, train_data_path, test_data_path, train_test_split_ratio):

    if os.path.exists(inti_dataset_path):
        
        # Step 1: Delete non-image files
        delete_non_image_files(init_real_class_path)
        delete_non_image_files(init_fake_class_path)
        
        # Step 2: Delete images larger than max_image_size_mb
        delete_large_files(init_real_class_path, max_image_size_mb)
        delete_large_files(init_fake_class_path, max_image_size_mb)

        # Step 3: Delete transparent images
        delete_transparent_images(init_real_class_path)
        delete_transparent_images(init_fake_class_path)
        
        # Step 4: Count the images remaining in the real class
        c1 = count_images(init_real_class_path)

        # Step 5: Downsample the fake class to the same number of images as the real class
        c2 = c1
        create_subset(init_fake_class_path, c2)
        
        # Step 6: Train - Test split
        train_test_split(init_real_class_path, train_data_path, test_data_path, 'REAL', train_test_split_ratio)
        train_test_split(init_fake_class_path, train_data_path, test_data_path, 'FAKE', train_test_split_ratio)
        
        # Step 7: Delete init folders
        check_and_delete(init_real_class_path, init_fake_class_path, inti_dataset_path)
    else:
        print("Init dataset does not exist. Please download it from: https://www.kaggle.com/datasets/superpotato9/dalle-recognition-dataset/data")



## Execution 

In [None]:
train_test_split_ratio = 0.7
max_image_size_mb = 5

set_seeds()
init_data_preprocessing(inti_dataset_path, init_real_class_path, init_fake_class_path, max_image_size_mb, train_data_path, test_data_path, train_test_split_ratio)
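
<font size="3">
After the run, a quick count per split and class confirms that the result is balanced and follows the chosen ratio. The sketch below builds a toy directory tree with made-up counts to show the idea. The helper <code>summarize_dataset</code> is hypothetical, and in the notebook it would be pointed at <code>dataset_path</code> instead of the temporary directory.
</font>

```python
import os
import tempfile


def summarize_dataset(dataset_path, splits=('train', 'test')):
    """Return {(split, class_name): file_count} for each class directory found."""
    summary = {}
    for split in splits:
        split_dir = os.path.join(dataset_path, split)
        if not os.path.isdir(split_dir):
            continue
        for class_name in sorted(os.listdir(split_dir)):
            class_dir = os.path.join(split_dir, class_name)
            if os.path.isdir(class_dir):
                summary[(split, class_name)] = sum(
                    1 for name in os.listdir(class_dir)
                    if os.path.isfile(os.path.join(class_dir, name))
                )
    return summary


with tempfile.TemporaryDirectory() as tmp:
    # Toy layout with made-up counts, mimicking a balanced 70/30 split
    layout = {('train', 'REAL'): 7, ('train', 'FAKE'): 7,
              ('test', 'REAL'): 3, ('test', 'FAKE'): 3}
    for (split, class_name), count in layout.items():
        class_dir = os.path.join(tmp, split, class_name)
        os.makedirs(class_dir)
        for i in range(count):
            open(os.path.join(class_dir, f"{i}.png"), 'wb').close()

    summary = summarize_dataset(tmp)

for key, count in sorted(summary.items()):
    print(key, count)
```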
