In [6]:
import os
import shutil
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
import matplotlib.pyplot as plt

In [4]:
import re
from collections import defaultdict
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

data_directory = r'D:\Projects\Datasets\Fish_2\Fish_Data\images\cropped'

# Function to extract information from file names
def extract_info_from_filename(filename):
    match = re.match(r'([A-Za-z0-9]+)[_-]([A-Za-z0-9]+)_([0-9]+)\.png', filename)
    if match:
        family_name = match.group(1)
        fish_name = match.group(2)
        photo_number = int(match.group(3))
        return family_name, fish_name, photo_number
    else:
        return None, None, None

# Function to process data directory
def process_data_directory(data_directory):
    fish_data = defaultdict(list)

    # Iterate through each file in the directory
    for filename in os.listdir(data_directory):
        if filename.endswith('.png'):
            family_name, fish_name, photo_number = extract_info_from_filename(filename)
            if family_name is not None:
                # Append the file path to the corresponding fish entry in the dictionary
                fish_data[family_name].append(os.path.join(data_directory, filename))

    return fish_data

# Process the data directory
fish_data = process_data_directory(data_directory)

# Get all unique fish families
unique_fish_families = list(fish_data.keys())

# Split the dataset into training and testing sets with shuffling
train_fish, test_fish = train_test_split(unique_fish_families, test_size=0.2, random_state=42, shuffle=True)

# Print some statistics
print(f"Number of unique fish families: {len(fish_data)}")

for fish_family_name in train_fish:
    print(f"Number of images for {fish_family_name} in the training set: {len(fish_data[fish_family_name])}")

for fish_family_name in test_fish:
    print(f"Number of images for {fish_family_name} in the testing set: {len(fish_data[fish_family_name])}")

# Print the names of the fishes
print("\nFish Names:")
for fish_family_name in fish_data.keys():
    print(fish_family_name)

# Create labels for each image based on the family name
labels = []
for family_name, images in fish_data.items():
    labels.extend([family_name] * len(images))

# Print the labels for the first few images
print("Labels for the first few images:", labels[:10])

# Convert labels to numerical format using LabelEncoder
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels)

# Shuffling the data before splitting into training and testing sets
train_images, test_images, train_labels_encoded, test_labels_encoded = [], [], [], []

for family_name in unique_fish_families:
    images = fish_data[family_name]
    labels_encoded = label_encoder.transform([family_name] * len(images))

    train_images_fam, test_images_fam, train_labels_encoded_fam, test_labels_encoded_fam = train_test_split(
        images, labels_encoded, test_size=0.2, random_state=42, shuffle=True
    )

    train_images.extend(train_images_fam)
    test_images.extend(test_images_fam)
    train_labels_encoded.extend(train_labels_encoded_fam)
    test_labels_encoded.extend(test_labels_encoded_fam)

# Specify directory paths with os.path.join for better path handling
train_images_directory = os.path.join('D:\\Projects\\Datasets\\Fish_2\\Fish_Data\\images', 'train')
test_images_directory = os.path.join('D:\\Projects\\Datasets\\Fish_2\\Fish_Data\\images', 'test')

# Create directories if they don't exist
os.makedirs(train_images_directory, exist_ok=True)
os.makedirs(test_images_directory, exist_ok=True)

# Save training images to the training directory
for image_path in train_images:
    shutil.copy(image_path, os.path.join(train_images_directory, os.path.basename(image_path)))

# Save testing images to the testing directory
for image_path in test_images:
    shutil.copy(image_path, os.path.join(test_images_directory, os.path.basename(image_path)))

# Print some statistics
print(f"Number of unique fish families in training set: {len(set(train_labels_encoded))}")
print(f"Number of unique fish families in testing set: {len(set(test_labels_encoded))}")

print(f"Number of training images after shuffling: {len(train_images)}")
print(f"Number of testing images after shuffling: {len(test_images)}")


Number of unique fish families: 193
Number of images for johnius in the training set: 6
Number of images for notorynchus in the training set: 6
Number of images for selaroides in the training set: 5
Number of images for centroberyx in the training set: 5
Number of images for protonibea in the training set: 4
Number of images for cheilodactylus in the training set: 18
Number of images for trachypoma in the training set: 6
Number of images for mugim in the training set: 9
Number of images for dactylophora in the training set: 7
Number of images for hemigymnus in the training set: 31
Number of images for pentapodus in the training set: 8
Number of images for cantherhines in the training set: 58
Number of images for stethojulis in the training set: 64
Number of images for decapterus in the training set: 8
Number of images for cymbacephalus in the training set: 3
Number of images for selar in the training set: 9
Number of images for cephalopholis in the training set: 133
Number of images fo

Number of unique fish families in training set: 193
Number of unique fish families in testing set: 193
Number of training images after shuffling: 3446
Number of testing images after shuffling: 954


In [7]:
# Specify the directory for augmented images
augmented_directory = 'D:\\Projects\\Datasets\\Fish_2\\Fish_Data\\images\\augmented'
os.makedirs(augmented_directory, exist_ok=True)

# Function to load and preprocess an image
def load_and_preprocess_image(image_path):
    img = load_img(image_path, target_size=(224, 224))
    img_array = img_to_array(img)
    img_array /= 255.0  # Normalize pixel values
    return img_array

# Create an ImageDataGenerator for data augmentation
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)


In [8]:
# Apply data augmentation to each image in your dataset and save the augmented images
# Assuming fish_data is the dictionary containing your processed dataset
for family_name, images in fish_data.items():
    for image_path in images:
        # Check if the image belongs to the training set
        if image_path in train_images:
            img = load_and_preprocess_image(image_path)
            img = img.reshape((1,) + img.shape)  # Reshape to (1, height, width, channels) for the flow method

            # Generate augmented images and save them to the augmented directory
            augment_count = 2  # Adjust as needed
            for i, batch in enumerate(datagen.flow(img, batch_size=1, save_to_dir=augmented_directory, save_prefix=family_name, save_format='png')):
                if i >= augment_count - 1:
                    break

In [9]:

import os
import numpy as np
from keras.preprocessing.image import load_img, img_to_array
from keras.utils import to_categorical

# Specify the path for augmented images
custom_augmented_directory = 'D:\\Projects\\Datasets\\Fish_2\\Fish_Data\\images\\augmented'
os.makedirs(custom_augmented_directory, exist_ok=True)

# Load and preprocess augmented training data
def custom_augmented_data_generator(directory, target_size=(224, 224), batch_size=32):
    images = []
    labels = []

    for filename in os.listdir(directory):
        if filename.endswith('.png'):
            image_path = os.path.join(directory, filename)
            img = load_img(image_path, target_size=target_size)
            img_array = img_to_array(img)
            img_array /= 255.0  # Normalize pixel values
            images.append(img_array)

            # Extract label information from the filename
            parts = filename.split('_')
            family_name = parts[0]
            label = int(parts[1])  # Assuming the label is the third part of the filename
            labels.append(label)

    images = np.array(images)
    labels = to_categorical(labels, num_classes=len(set(labels)))  # One-hot encode labels

    while True:
        for i in range(0, len(images), batch_size):
            batch_images = images[i:i+batch_size]
            batch_labels = labels[i:i+batch_size]
            yield batch_images, batch_labels

# Create a custom augmented data generator
augmented_train_generator = custom_augmented_data_generator(
    custom_augmented_directory,
    target_size=(224, 224),
    batch_size=32
)

In [10]:
# Check the first batch of data
for batch_images, batch_labels in augmented_train_generator:
    print("Received Batch")
    break  # Stop after the first batch to avoid an infinite loop

Received Batch


In [11]:
import math
# Assuming num_classes is the number of unique labels in your dataset
num_classes = len(set(labels))

# Create a simple CNN model (modify based on your requirements)
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D((2, 2)))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))  # Adjust based on the number of classes

model.compile(optimizer=Adam(lr=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

# Define batch size
batch_size = 32  # Adjust as needed

# Train the model using the custom augmented data generator
history = model.fit(
    augmented_train_generator,
    steps_per_epoch=math.ceil(len(images) / batch_size),  # Use ceiling division
    epochs=10,  # Adjust as needed
    verbose=1
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [12]:
import os
import pandas as pd
from keras.preprocessing.image import ImageDataGenerator

test_data_directory = 'D:\\Projects\\Datasets\\Fish_2\\Fish_Data\\images\\test'

# Create a list of all image filenames in the test data directory
test_filenames = [os.path.join(test_data_directory, file) for file in os.listdir(test_data_directory)]

# Extract labels from filenames
test_labels = [filename.split('_')[0] for filename in os.listdir(test_data_directory)]

# Combine filenames and labels into a DataFrame-like structure
test_df = pd.DataFrame({'filename': test_filenames, 'label': test_labels})

# Create a test data generator
test_datagen = ImageDataGenerator(rescale=1./255)  # Adjust based on your preprocessing steps
batch_size = 32

# Create a flow from directory generator
test_generator = test_datagen.flow_from_dataframe(
    dataframe=test_df,
    directory=None,  # Since we are providing absolute paths in 'filename'
    x_col="filename",
    y_col="label",
    target_size=(224, 224),
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False  # Keep the order of files
)

# Now you can use the test_generator for evaluation or predictions


Found 954 validated image filenames belonging to 193 classes.


In [13]:
# Evaluate the model on the test data
evaluation_result = model.evaluate(test_generator)
print("Test Loss:", evaluation_result[0])
print("Test Accuracy:", evaluation_result[1])

Test Loss: 2455.310546875
Test Accuracy: 0.007337526418268681
