In [1]:
import os
import random
from PIL import Image
import numpy as np
import shutil
import tensorflow as tf

# Define paths
image_folder = '/kaggle/input/fvsaugmented/images'
label_folder = '/kaggle/input/fvsaugmented/labels'
output_image_folder = '/kaggle/working/images'
output_label_folder = '/kaggle/working/labels'

# Create output folders if they don't exist
os.makedirs(output_image_folder, exist_ok=True)
os.makedirs(output_label_folder, exist_ok=True)

def read_label(label_path):
    with open(label_path, 'r') as f:
        lines = f.readlines()
    return [line.strip().split() for line in lines]  # Return list of labels

def write_label(label_path, label_data):
    with open(label_path, 'w') as f:
        f.write('\n'.join([' '.join(map(str, line)) for line in label_data]))

# Count instances of each class and copy files to the working directory
class_count = {}
image_dict = {}  # Store images by class

for label_file in os.listdir(label_folder):
    label_path = os.path.join(label_folder, label_file)
    label_data = read_label(label_path)
    
    for label in label_data:
        try:
            class_id = int(label[0])  # Get the class_id from the first element
            class_count[class_id] = class_count.get(class_id, 0) + 1
            
            if class_id not in image_dict:
                image_dict[class_id] = []
            image_dict[class_id].append(label_file.replace('.txt', '.jpg'))  # Store image filename for downsampling
        except (IndexError, ValueError) as e:
            print(f"Error processing label in file {label_file}: {e}")
        
        # Copy files to the output (writable) directory
        image_file = label_file.replace('.txt', '.jpg')
        shutil.copy(os.path.join(image_folder, image_file), output_image_folder)
        shutil.copy(os.path.join(label_folder, label_file), output_label_folder)

# Print initial class count summary
print("Initial class counts:", class_count)
# Function to perform image augmentations
def random_shadow(image):
    shadow = tf.random.uniform(tf.shape(image)[:2], 0.7, 1.0)
    shadow = tf.image.resize(shadow[..., tf.newaxis], [tf.shape(image)[0], tf.shape(image)[1]])
    return image * shadow

def random_zoom(image):
    zoom_factor = tf.random.uniform([], 0.8, 1.0)
    h, w = tf.shape(image)[0], tf.shape(image)[1]
    crop_size = tf.cast([h, w], tf.float32) * zoom_factor
    crop_size = tf.cast(crop_size, tf.int32)  # Cast to int32 after multiplication
    image = tf.image.random_crop(image, [crop_size[0], crop_size[1], 3])
    return tf.image.resize(image, [h, w])

def random_shift(image):
    return tf.image.random_crop(tf.pad(image, [[25, 25], [25, 25], [0, 0]]), tf.shape(image))

def random_shear(image):
    shear_factor = tf.random.uniform([], -0.2, 0.2)
    shear_matrix = [1, shear_factor, 0, 0, 1, 0, 0, 0]
    shear_matrix = tf.reshape(shear_matrix, (8,))
    
    # Applying the shear transformation with a fill value of 0 (black)
    return tf.raw_ops.ImageProjectiveTransformV3(
        images=tf.expand_dims(image, 0),
        transforms=[shear_matrix],
        output_shape=tf.shape(image)[:2],
        interpolation="BILINEAR",
        fill_mode="REFLECT",
        fill_value=0.0  # Fill value for out-of-bound pixels
    )[0]

def augment_image(image):
    image = tf.cast(image, tf.float32) / 255.0

    # Ensure the image size is sufficient for cropping
    height, width = tf.shape(image)[0], tf.shape(image)[1]
    crop_height, crop_width = min(224, height), min(224, width)
    
    # Random crop and resize
    image = tf.image.random_crop(image, [crop_height, crop_width, 3])
    image = tf.image.resize(image, [224, 224])

    # Random flip
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)

    # Random rotation
    image = tf.image.rot90(image, k=tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32))

    # Random zoom (80% chance)
    if tf.random.uniform([]) > 0.2:
        image = random_zoom(image)

    # Random shift (80% chance)
    if tf.random.uniform([]) > 0.2:
        image = random_shift(image)

    # Random shear (50% chance)
    if tf.random.uniform([]) > 0.5:
        image = random_shear(image)

    # Add random shadows (50% chance)
    if tf.random.uniform([]) > 0.5:
        image = random_shadow(image)

    # Gaussian noise (very subtle, to simulate different camera sensors)
    noise = tf.random.normal(shape=tf.shape(image), mean=0.0, stddev=0.01, dtype=tf.float32)
    image = tf.clip_by_value(image + noise, 0.0, 1.0)

    return image
# Downsample classes with more than 3000 instances
for class_id, count in class_count.items():
    if count > 3000:
        print(f"Downsampling class {class_id} from {count} to 3000")
        images_to_remove = random.sample(image_dict[class_id], count - 3000)
        for img_file in images_to_remove:
            image_path = os.path.join(output_image_folder, img_file)
            label_path = os.path.join(output_label_folder, img_file.replace('.jpg', '.txt'))
            
            if os.path.exists(image_path):
                os.remove(image_path)  # Remove the image
            if os.path.exists(label_path):
                os.remove(label_path)  # Remove the corresponding label

# Perform augmentation for classes with fewer than 3000 images
all_classes_reached_limit = False

for image_file in os.listdir(output_image_folder):
    if all_classes_reached_limit:
        break
    
    image_path = os.path.join(output_image_folder, image_file)
    label_path = os.path.join(output_label_folder, image_file.replace('.jpg', '.txt'))
    
    image = Image.open(image_path)
    label_data = read_label(label_path)
    
    # Check if label_data is empty or improperly formatted
    if not label_data or not label_data[0]:
        print(f"Skipping {image_file} due to empty or invalid label.")
        continue  # Skip this file if label data is empty or invalid
    
    class_id = int(label_data[0][0])
    
    # Augment if needed
    if class_count[class_id] < 3000:
        for i in range(5):
            aug_image = augment_image(tf.convert_to_tensor(np.array(image)))
            aug_image = Image.fromarray((aug_image.numpy() * 255).astype(np.uint8))

            aug_image_file = f"{image_file.split('.')[0]}_{class_count[class_id]}.jpg"
            aug_label_file = f"{image_file.split('.')[0]}_{class_count[class_id]}.txt"

            aug_image.save(os.path.join(output_image_folder, aug_image_file))
            write_label(os.path.join(output_label_folder, aug_label_file), label_data)

            class_count[class_id] += 1
    
    # Check if all classes have reached the limit
    if all(count >= 3000 for count in class_count.values()):
        all_classes_reached_limit = True

# Print final class counts
print("Downsampling and augmentation complete. Final class counts:")
for class_id, count in class_count.items():
    print(f"Class {class_id}: {count}")


Initial class counts: {62: 3000, 40: 3000, 10: 1800, 34: 3068, 19: 3136, 47: 2999, 24: 2999, 6: 3751, 22: 2384, 78: 2202, 71: 2999, 67: 3000, 4: 3000, 13: 2156, 35: 3855, 7: 3409, 3: 2100, 65: 3872, 36: 2995, 70: 3474, 52: 1896, 41: 1772, 68: 10051, 37: 2457, 31: 2993, 50: 2378, 81: 2030, 25: 2996, 39: 2414, 64: 1892, 54: 2526, 43: 2014, 49: 4604, 61: 2381, 30: 3039, 18: 4235, 73: 3970, 27: 2637, 38: 2992, 72: 4582, 44: 3000, 74: 1438, 77: 3178, 33: 3575, 69: 2996, 1: 11494, 17: 2260, 48: 2993, 9: 3592, 29: 2999, 45: 5840, 26: 1758, 75: 4249, 23: 3000, 76: 2166, 66: 2496, 32: 2826, 20: 514, 55: 5778, 28: 3482, 14: 2199, 59: 5160, 58: 3000, 79: 1880, 57: 3245, 63: 3195, 15: 1224, 21: 10536, 80: 3645, 0: 4386, 16: 2447, 56: 3096, 53: 2770, 11: 2108, 5: 1026, 51: 2295, 60: 3275, 2: 602, 12: 3559, 8: 3629, 42: 3340, 46: 1848}
Downsampling class 34 from 3068 to 3000
Downsampling class 19 from 3136 to 3000
Downsampling class 6 from 3751 to 3000
Downsampling class 35 from 3855 to 3000
Downsam