Convert XML annotations to YOLO format (without resizing the images)

In [None]:
import os
import xml.etree.ElementTree as ET
from PIL import Image
from concurrent.futures import ProcessPoolExecutor, as_completed

def get_image_size(image_path):
    """Return the ``(width, height)`` of the image stored at ``image_path``."""
    # The context manager closes the image file once its size has been read.
    with Image.open(image_path) as opened:
        dimensions = opened.size
    return dimensions

def _clamp(value, upper):
    """Limit ``value`` to the closed interval ``[0, upper]``."""
    return max(0.0, min(float(upper), value))


def convert_xml_to_yolo(xml_file, classes, output_dir, original_image_size):
    """Convert one XML annotation file into a YOLO label file.

    The XML is expected to contain ``<object>`` elements, each with a ``<name>``
    and a ``<bndbox>`` holding ``xmin``/``ymin``/``xmax``/``ymax`` (the Pascal VOC
    layout). One line ``"<class_id> <x_center> <y_center> <width> <height>"`` is
    written per kept object, with all four values normalized by the image size.

    Args:
        xml_file: Path of the XML annotation file.
        classes: Sequence of class names; a name's first position in the
            sequence is its class id. Objects with any other name (or no name)
            are skipped.
        output_dir: Existing directory that receives ``<xml stem>.txt``.
        original_image_size: ``(width, height)`` of the annotated image in pixels.

    Returns:
        None. Any failure is reported with ``print`` and no label file is
        written for that XML file, so one bad file does not stop a batch.
    """
    try:
        image_width = original_image_size[0]
        image_height = original_image_size[1]
        if image_width <= 0 or image_height <= 0:
            raise ValueError(f"invalid image size {original_image_size}")

        # Map name -> id once; setdefault keeps the first occurrence, which is
        # what list.index() would return.
        class_ids = {}
        for index, name in enumerate(classes):
            class_ids.setdefault(name, index)

        root = ET.parse(xml_file).getroot()
        yolo_annotations = []

        for obj in root.findall('object'):
            # Strip so that pretty-printed XML ("<name> gun </name>") still matches.
            class_name = (obj.findtext('name') or '').strip()
            class_id = class_ids.get(class_name)
            if class_id is None:
                continue

            bndbox = obj.find('bndbox')
            if bndbox is None:
                raise ValueError(f"<bndbox> missing for object '{class_name}'")

            coords = {}
            for tag in ('xmin', 'ymin', 'xmax', 'ymax'):
                text = bndbox.findtext(tag)
                if text is None:
                    raise ValueError(f"<{tag}> missing for object '{class_name}'")
                coords[tag] = float(text)

            # Clamp to the image and order the corners so that the normalized
            # values always stay inside [0, 1] and sizes are never negative.
            xmin, xmax = sorted((_clamp(coords['xmin'], image_width),
                                 _clamp(coords['xmax'], image_width)))
            ymin, ymax = sorted((_clamp(coords['ymin'], image_height),
                                 _clamp(coords['ymax'], image_height)))

            # A box with no area after clamping carries no usable annotation.
            if xmax <= xmin or ymax <= ymin:
                continue

            # Normalize coordinates
            x_center = (xmin + xmax) / 2 / image_width
            y_center = (ymin + ymax) / 2 / image_height
            width = (xmax - xmin) / image_width
            height = (ymax - ymin) / image_height

            yolo_annotations.append(f"{class_id} {x_center} {y_center} {width} {height}")

        # Write to YOLO annotation file with the same name as the XML file but with a .txt extension
        stem = os.path.splitext(os.path.basename(xml_file))[0]
        output_file = os.path.join(output_dir, stem + '.txt')
        with open(output_file, 'w') as f:
            f.write("\n".join(yolo_annotations))
    except Exception as e:
        # Worker boundary: report and carry on with the remaining files.
        print(f"Error processing {xml_file}: {e}")

def process_file(args):
    """Convert a single XML annotation, given one packed argument tuple.

    ``args`` is ``(xml_file, image_dir, classes, output_dir)``. The image that
    belongs to the XML file is looked up in ``image_dir`` under the same base
    name with a ``.jpg`` extension; its size is needed to normalize the boxes.
    A missing image or any error is reported with ``print`` and nothing is raised.
    """
    xml_file, image_dir, classes, output_dir = args
    try:
        stem, _ = os.path.splitext(os.path.basename(xml_file))
        image_path = os.path.join(image_dir, stem + '.jpg')

        if os.path.exists(image_path):
            # Read the real image size, then convert XML annotations to YOLO format.
            size = get_image_size(image_path)
            convert_xml_to_yolo(xml_file, classes, output_dir, size)
        else:
            print(f"Image file missing: {image_path}")
    except Exception as e:
        print(f"Error processing {xml_file}: {e}")

def convert_dataset(xml_dir, image_dir, classes, output_dir, max_workers=60):
    """Convert every ``.xml`` annotation in ``xml_dir`` to a YOLO label file.

    Args:
        xml_dir: Directory containing the XML annotation files.
        image_dir: Directory containing the matching images.
        classes: Class names forwarded to the per-file conversion.
        output_dir: Directory for the generated ``.txt`` files; created if missing.
        max_workers: Number of worker processes (defaults to the previously
            hard-coded value of 60).

    Returns:
        None.
    """
    os.makedirs(output_dir, exist_ok=True)
    xml_files = [os.path.join(xml_dir, f) for f in os.listdir(xml_dir) if f.endswith('.xml')]

    # Nothing to convert: avoid starting a process pool for no work.
    if not xml_files:
        return

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_file, (xml_file, image_dir, classes, output_dir)): xml_file
            for xml_file in xml_files
        }
        for future in as_completed(futures):
            # result() surfaces failures that never reach process_file's own
            # handler (e.g. a worker process that died); report and continue.
            try:
                future.result()
            except Exception as e:
                print(f"Error processing {futures[future]}: {e}")

# Define the classes
# Only objects whose XML <name> appears in this list are converted; the position
# of a name in the list is the class id written to the label file.
classes = ["gun"]

# Directories
xml_dir = '/mnt/storage/kilsar_jainil/Train/Annotations'  # XML annotation files to convert
image_dir = '/mnt/storage/kilsar_jainil/Train/images'  # images, looked up as <xml stem>.jpg
output_dir = '/mnt/storage/kilsar_jainil/Train/labels_new'  # destination for the YOLO .txt files

# Run the conversion for the whole annotation directory.
convert_dataset(xml_dir, image_dir, classes, output_dir)

Filter out blurry images (copy the sharp images and their labels to a new location)

In [None]:
import cv2
import os
import shutil
from concurrent.futures import ProcessPoolExecutor, as_completed
from glob import glob
import multiprocessing

def is_blurry(image, threshold=100):
    """Tell whether ``image`` is blurry.

    The image is converted from BGR to grayscale and the variance of its
    Laplacian is compared with ``threshold``; a variance below the threshold
    counts as blurry.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    sharpness = cv2.Laplacian(grayscale, cv2.CV_64F).var()
    return sharpness < threshold

# Lazily created Haar cascade, shared by all calls made in the same process.
_FACE_CASCADE = None


def _get_face_cascade():
    """Return the frontal-face Haar cascade, loading it on first use only."""
    global _FACE_CASCADE
    if _FACE_CASCADE is None:
        _FACE_CASCADE = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    return _FACE_CASCADE


def detect_faces(image):
    """Detect frontal faces in a BGR image.

    Args:
        image: Image array in BGR channel order (it is converted to grayscale
            with ``cv2.COLOR_BGR2GRAY``).

    Returns:
        The result of ``detectMultiScale``: face rectangles that callers unpack
        as ``(x, y, w, h)``.
    """
    # The classifier used to be rebuilt from its XML file for every image;
    # reusing one instance per process avoids that repeated disk read/parse.
    face_cascade = _get_face_cascade()
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    return faces

def check_face_blurriness(image, face_threshold=50):
    """Return True when at least one detected face region is blurry.

    Each face rectangle found in ``image`` is cropped and tested with
    ``is_blurry`` using ``face_threshold``. Returns False when no face is
    detected or every face is sharp enough.
    """
    # any() stops at the first blurry face, like an early return in a loop.
    return any(
        is_blurry(image[top:top + height, left:left + width], threshold=face_threshold)
        for (left, top, width, height) in detect_faces(image)
    )

def save_image_and_txt(image_path, output_folder_images, output_folder_labels, labels_folder):
    """Copy an image and, if present, its label file to the output folders.

    Args:
        image_path: Path of the source image.
        output_folder_images: Existing directory that receives the image.
        output_folder_labels: Existing directory that receives the label file.
        labels_folder: Directory searched for ``<image stem>.txt``.

    Returns:
        None. An image without a label file is still copied.
    """
    image_name = os.path.basename(image_path)
    save_path_image = os.path.join(output_folder_images, image_name)
    # Byte-for-byte copy. The previous imread + imwrite round trip decoded the
    # file a second time and re-encoded it, which is slow and lossy for JPEG.
    # NOTE(review): the old re-encode also stored whatever pixel layout imread
    # produced (e.g. after EXIF handling); confirm no consumer relied on that.
    shutil.copyfile(image_path, save_path_image)

    txt_name = os.path.splitext(image_name)[0] + ".txt"
    txt_path = os.path.join(labels_folder, txt_name)
    if os.path.exists(txt_path):
        save_path_txt = os.path.join(output_folder_labels, txt_name)
        shutil.copyfile(txt_path, save_path_txt)

def process_image(image_path, output_folder_images, output_folder_labels, labels_folder, global_threshold=100, face_threshold=50):
    """Classify one image as kept or rejected and copy it when kept.

    Returns a 2-tuple ``(kept_path, rejected_path)``:
      * ``(None, None)`` when the image cannot be read,
      * ``(None, image_path)`` when the whole image is blurry and at least one
        detected face is blurry too,
      * ``(image_path, None)`` otherwise, after the image and its label have
        been saved to the output folders.
    """
    image = cv2.imread(image_path)
    if image is None:
        return None, None

    # The face check only runs for images that already failed the global test.
    rejected = is_blurry(image, threshold=global_threshold) and check_face_blurriness(
        image, face_threshold=face_threshold
    )
    if rejected:
        return None, image_path

    save_image_and_txt(image_path, output_folder_images, output_folder_labels, labels_folder)
    return image_path, None

def filter_and_save_images(image_paths, output_folder_images, output_folder_labels, labels_folder, global_threshold=100, face_threshold=50, num_cores=None):
    """Run the blur filter over ``image_paths`` in parallel.

    Args:
        image_paths: Paths of the images to examine.
        output_folder_images: Destination directory for kept images (created if missing).
        output_folder_labels: Destination directory for their label files (created if missing).
        labels_folder: Directory holding the source label files.
        global_threshold: Blur threshold for the whole image.
        face_threshold: Blur threshold for detected face regions.
        num_cores: Number of worker processes; defaults to the CPU count.

    Returns:
        ``(good_images, blurry_images)``: two lists of image paths, in
        completion order.

    Raises:
        Whatever a worker raised, re-raised by ``future.result()``.
    """
    # exist_ok avoids the check-then-create race when several runs start together.
    os.makedirs(output_folder_images, exist_ok=True)
    os.makedirs(output_folder_labels, exist_ok=True)

    good_images = []
    blurry_images = []

    # No work: skip starting a process pool.
    if not image_paths:
        return good_images, blurry_images

    if num_cores is None:
        num_cores = multiprocessing.cpu_count()

    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        futures = [
            executor.submit(process_image, image_path, output_folder_images, output_folder_labels,
                            labels_folder, global_threshold, face_threshold)
            for image_path in image_paths
        ]
        for future in as_completed(futures):
            good_image, blurry_image = future.result()
            if good_image:
                good_images.append(good_image)
            if blurry_image:
                blurry_images.append(blurry_image)

    return good_images, blurry_images

def gather_image_paths(images_folder):
    """List the image files directly inside ``images_folder``.

    A file counts as an image when its name ends with ``.jpg``, ``.jpeg``,
    ``.png`` or ``.bmp``, compared case-insensitively so that names such as
    ``IMG_001.JPG`` are included. Names starting with a dot are skipped.

    Args:
        images_folder: Directory to scan (not recursive). It is used literally,
            so characters like ``[`` or ``*`` in the path are not treated as
            wildcards.

    Returns:
        Sorted list of ``os.path.join(images_folder, name)`` paths; an empty
        list when the folder cannot be listed.
    """
    image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
    try:
        names = os.listdir(images_folder or os.curdir)
    except OSError:
        # A missing or unreadable folder yields no images instead of an error.
        return []
    return [
        os.path.join(images_folder, name)
        for name in sorted(names)
        if not name.startswith('.') and name.lower().endswith(image_extensions)
    ]

# Example usage
# The four lists below are index-aligned: entry i of each list belongs to the same
# dataset split, because the processing loop reads all of them with one shared index.

# Source image folders, one per dataset split.
images_folders = ["/home/research/kilasar_sentinal_wepon_detection/data/0/train/images","/home/research/kilasar_sentinal_wepon_detection/data/1/train/images","/home/research/kilasar_sentinal_wepon_detection/data/2/train/images" , "/home/research/kilasar_sentinal_wepon_detection/data/n/train/images",
                  "/home/research/kilasar_sentinal_wepon_detection/data/0/val/images","/home/research/kilasar_sentinal_wepon_detection/data/1/val/images","/home/research/kilasar_sentinal_wepon_detection/data/2/val/images" , "/home/research/kilasar_sentinal_wepon_detection/data/n/val/images"]

# Source label folders holding the .txt file for each image.
labels_folders = ["/home/research/kilasar_sentinal_wepon_detection/data/0/train/labels" , "/home/research/kilasar_sentinal_wepon_detection/data/1/train/labels" , "/home/research/kilasar_sentinal_wepon_detection/data/2/train/labels" , "/home/research/kilasar_sentinal_wepon_detection/data/n/train/labels",
                  "/home/research/kilasar_sentinal_wepon_detection/data/0/val/labels" , "/home/research/kilasar_sentinal_wepon_detection/data/1/val/labels" , "/home/research/kilasar_sentinal_wepon_detection/data/2/val/labels" , "/home/research/kilasar_sentinal_wepon_detection/data/n/val/labels"]

# Destination folders for the images that pass the blur filter.
output_folder_images = ["/mnt/storage/kilsar-sentinal-data/0/train/images" , "/mnt/storage/kilsar-sentinal-data/1/train/images" , "/mnt/storage/kilsar-sentinal-data/2/train/images" , "/mnt/storage/kilsar-sentinal-data/n/train/images",
                        "/mnt/storage/kilsar-sentinal-data/0/val/images" , "/mnt/storage/kilsar-sentinal-data/1/val/images" , "/mnt/storage/kilsar-sentinal-data/2/val/images" , "/mnt/storage/kilsar-sentinal-data/n/val/images"]

# Destination folders for the label files of the kept images.
output_folder_labels = ["/mnt/storage/kilsar-sentinal-data/0/train/labels" , "/mnt/storage/kilsar-sentinal-data/1/train/labels" , "/mnt/storage/kilsar-sentinal-data/2/train/labels" , "/mnt/storage/kilsar-sentinal-data/n/train/labels",
                        "/mnt/storage/kilsar-sentinal-data/0/val/labels" , "/mnt/storage/kilsar-sentinal-data/1/val/labels" , "/mnt/storage/kilsar-sentinal-data/2/val/labels" , "/mnt/storage/kilsar-sentinal-data/n/val/labels"]


# Filter each dataset split in turn and report how many images were kept and rejected.
for i, source_folder in enumerate(images_folders):
    image_paths = gather_image_paths(source_folder)
    good_images, blurry_images = filter_and_save_images(
        image_paths,
        output_folder_images[i],
        output_folder_labels[i],
        labels_folders[i],
        num_cores=60,
    )

    print(f"Good Images for {images_folders[i]} {len(good_images)}")
    print(f"Blurry Images for {images_folders[i]} {len(blurry_images)}")


Change the class IDs in label text files from an old value to a new one

In [None]:
def change_class_label_in_file(filepath, old_class=0, new_class=1):
    """Rewrite a label file, replacing class id ``old_class`` with ``new_class``.

    Each non-blank line is split on whitespace; when its first field equals
    ``str(old_class)`` that field becomes ``str(new_class)``. Lines are written
    back with single spaces between fields and a trailing newline. Blank lines
    are dropped.

    Args:
        filepath: Path of the label file, modified in place.
        old_class: Class id to replace.
        new_class: Replacement class id.

    Returns:
        None.
    """
    old_token = str(old_class)
    new_token = str(new_class)

    with open(filepath, 'r') as file:
        rows = [line.split() for line in file]

    # Build the complete new content BEFORE reopening the file for writing:
    # opening with 'w' truncates it, so an error raised halfway through (the
    # old code failed with IndexError on a blank line) destroyed the labels.
    updated_lines = []
    for parts in rows:
        if not parts:
            continue
        if parts[0] == old_token:
            parts[0] = new_token
        updated_lines.append(" ".join(parts) + "\n")

    with open(filepath, 'w') as file:
        file.writelines(updated_lines)

def change_class_labels_in_directory(directory, old_class=2, new_class=1, num_workers=None):
    """Apply ``change_class_label_in_file`` to every ``.txt`` file in ``directory``.

    One task per file is submitted to a process pool with ``num_workers``
    workers. Results are collected in submission order, so the first failing
    file's exception is re-raised to the caller.
    """
    txt_files = []
    for filename in os.listdir(directory):
        if filename.endswith(".txt"):
            txt_files.append(os.path.join(directory, filename))

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        pending = []
        for filepath in txt_files:
            pending.append(executor.submit(change_class_label_in_file, filepath, old_class, new_class))
        # Ensure any exceptions are raised
        for task in pending:
            task.result()

# Usage
# Label folders whose .txt files are rewritten in place.
directories = ['/home/research/kilasar_sentinal_wepon_detection/data/0/test/labels','/home/research/kilasar_sentinal_wepon_detection/data/0/train/labels','/home/research/kilasar_sentinal_wepon_detection/data/0/val/labels']
for directory in directories:
    # Class ids are passed explicitly (0 -> 1); the function's own default for old_class is 2.
    change_class_labels_in_directory(directory, old_class=0, new_class=1, num_workers=36)
    print(directory , 'folder compelted ')


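WRONG: split images and labels into train/valid/test (0.8 / 0.1 / 0.1) — images and labels are listed and sliced independently, so an image and its label can end up in different splits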

In [None]:
import os
import random
import shutil
from concurrent.futures import ProcessPoolExecutor

# NOTE: this cell is kept as the WRONG version of the split. Images and labels
# are listed and sliced independently, so nothing ties an image to its label.

# Directories
images_dir = '/mnt/storage/kilsar_jainil/train/Train/JPEGImages/'
labels_dir = '/mnt/storage/kilsar_jainil/train/Train/labels'
# Output directories
output_base_dir = '/home/research/kilasar_sentinal_wepon_detection/suryansh/outsiside_data/dataset-2'
train_images_dir = os.path.join(output_base_dir, 'train/images')
train_labels_dir = os.path.join(output_base_dir, 'train/labels')
valid_images_dir = os.path.join(output_base_dir, 'valid/images')
valid_labels_dir = os.path.join(output_base_dir, 'valid/labels')
test_images_dir = os.path.join(output_base_dir, 'test/images')
test_labels_dir = os.path.join(output_base_dir, 'test/labels')

# Ensure output directories exist
os.makedirs(train_images_dir, exist_ok=True)
os.makedirs(train_labels_dir, exist_ok=True)
os.makedirs(valid_images_dir, exist_ok=True)
os.makedirs(valid_labels_dir, exist_ok=True)
os.makedirs(test_images_dir, exist_ok=True)
os.makedirs(test_labels_dir, exist_ok=True)

# List all files in images_dir and labels_dir
# os.listdir returns names in arbitrary order, and the two directories may hold a
# different number of files, so position k in one list need not match position k
# in the other.
images_files = os.listdir(images_dir)
labels_files = os.listdir(labels_dir)



# # Shuffle the lists
# Shuffling each list separately (disabled below) would break the pairing further.
# random.shuffle(images_files)
# random.shuffle(labels_files)


# Calculate split sizes
# Sizes are derived from the image count only; the same indices are then applied to the labels.
total_files = len(images_files)
train_split = int(total_files * 0.8)
valid_split = int(total_files * 0.1)

# Split the data
train_images = images_files[:train_split]
valid_images = images_files[train_split:train_split + valid_split]
test_images = images_files[train_split + valid_split:]

# Same index ranges applied to the independently listed labels (the source of the mismatch).
train_labels = labels_files[:train_split]
valid_labels = labels_files[train_split:train_split + valid_split]
test_labels = labels_files[train_split + valid_split:]

# Function to copy files
def copy_files(files, source_dir, dest_dir):
    """Copy every file named in ``files`` from ``source_dir`` into ``dest_dir``."""
    for name in files:
        source_path = os.path.join(source_dir, name)
        shutil.copy(source_path, dest_dir)

# Run the six copy jobs in a process pool.
# NOTE: the pool is created with 55 workers although the message printed below says
# 32 cores; only six tasks are submitted, so at most six workers have anything to do.
n_workers = 55
with ProcessPoolExecutor(max_workers=n_workers) as executor:
    # The returned futures are discarded, so an exception raised inside
    # copy_files is never reported.
    # Copy train images and labels
    executor.submit(copy_files, train_images, images_dir, train_images_dir)
    executor.submit(copy_files, train_labels, labels_dir, train_labels_dir)
    
    # Copy validation images and labels
    executor.submit(copy_files, valid_images, images_dir, valid_images_dir)
    executor.submit(copy_files, valid_labels, labels_dir, valid_labels_dir)
    
    # Copy test images and labels
    executor.submit(copy_files, test_images, images_dir, test_images_dir)
    executor.submit(copy_files, test_labels, labels_dir, test_labels_dir)

# Printed once the pool has shut down, whether or not the copy jobs succeeded.
print("Data copied successfully into train, valid, and test sets with separate folders for images and labels using ProcessPoolExecutor with 32 cores.")


Data copied successfully into train, valid, and test sets with separate folders for images and labels using ProcessPoolExecutor with 32 cores.


CORRECT: split matching image/label pairs into train/valid/test (0.8 / 0.1 / 0.1)

In [None]:
import os
import random
import shutil
from concurrent.futures import ProcessPoolExecutor

# Directories
images_dir = '/mnt/storage/kilsar_jainil/train/Train/JPEGImages/'  # source images
labels_dir = '/mnt/storage/kilsar_jainil/train/Train/labels'  # source label .txt files
# Output directories
# Each split gets its own images/ and labels/ sub-folder under output_base_dir.
output_base_dir = '/home/research/kilasar_sentinal_wepon_detection/suryansh/outsiside_data/dataset-3'
train_images_dir = os.path.join(output_base_dir, 'train/images')
train_labels_dir = os.path.join(output_base_dir, 'train/labels')
valid_images_dir = os.path.join(output_base_dir, 'valid/images')
valid_labels_dir = os.path.join(output_base_dir, 'valid/labels')
test_images_dir = os.path.join(output_base_dir, 'test/images')
test_labels_dir = os.path.join(output_base_dir, 'test/labels')

# Ensure output directories exist
# exist_ok=True makes the cell safe to re-run.
os.makedirs(train_images_dir, exist_ok=True)
os.makedirs(train_labels_dir, exist_ok=True)
os.makedirs(valid_images_dir, exist_ok=True)
os.makedirs(valid_labels_dir, exist_ok=True)
os.makedirs(test_images_dir, exist_ok=True)
os.makedirs(test_labels_dir, exist_ok=True)

# List all image files
image_files = os.listdir(images_dir)

# Create image-label pairs: the label shares the image's base name and ends in .txt.
# splitext swaps only the final extension; str.replace('.jpg', '.txt') rewrote every
# '.jpg' occurrence in the name and left names without '.jpg' unchanged.
image_label_pairs = [(img, os.path.splitext(img)[0] + '.txt') for img in image_files]

# Keep only the pairs whose label file exists in labels_dir
image_label_pairs = [(img, lbl) for img, lbl in image_label_pairs if os.path.exists(os.path.join(labels_dir, lbl))]

# Shuffle the pairs (not the images and labels separately) so each image stays with its label
random.shuffle(image_label_pairs)

# Calculate split sizes: 80% train, 10% valid, remainder test
total_files = len(image_label_pairs)
train_split = int(total_files * 0.8)
valid_split = int(total_files * 0.1)

# Split the data
train_pairs = image_label_pairs[:train_split]
valid_pairs = image_label_pairs[train_split:train_split + valid_split]
test_pairs = image_label_pairs[train_split + valid_split:]

# Function to copy files
def copy_files(pairs, images_dir, labels_dir, dest_images_dir, dest_labels_dir):
    """Copy each ``(image_name, label_name)`` pair into the destination folders.

    Images are read from ``images_dir`` and written to ``dest_images_dir``;
    labels are read from ``labels_dir`` and written to ``dest_labels_dir``.
    """
    for pair in pairs:
        image_name, label_name = pair
        image_source = os.path.join(images_dir, image_name)
        label_source = os.path.join(labels_dir, label_name)
        shutil.copy(image_source, dest_images_dir)
        shutil.copy(label_source, dest_labels_dir)

# Use ProcessPoolExecutor with 32 cores
# Only three tasks are submitted (one per split), so at most three workers are busy.
n_workers = 32
with ProcessPoolExecutor(max_workers=n_workers) as executor:
    copy_jobs = [
        # Copy train images and labels
        executor.submit(copy_files, train_pairs, images_dir, labels_dir, train_images_dir, train_labels_dir),
        # Copy validation images and labels
        executor.submit(copy_files, valid_pairs, images_dir, labels_dir, valid_images_dir, valid_labels_dir),
        # Copy test images and labels
        executor.submit(copy_files, test_pairs, images_dir, labels_dir, test_images_dir, test_labels_dir),
    ]

    # result() re-raises any exception from a copy job. Without this the futures
    # were discarded and the success message was printed even after a failed copy.
    for job in copy_jobs:
        job.result()

print("Data copied successfully into train, valid, and test sets with separate folders for images and labels using ProcessPoolExecutor with 32 cores.")


Creating a dataset from the Drive download (one full-image bounding box per image)

In [2]:
import os
from concurrent.futures import ProcessPoolExecutor

def create_yolo_annotation():
    """Return the YOLO label line for a box that covers the whole image.

    Fields are: class ``0``, box centre ``(0.5, 0.5)`` and box size
    ``(1.0, 1.0)``, all in normalized image coordinates, followed by a newline.
    """
    full_image_box = "0 0.5 0.5 1.0 1.0\n"
    return full_image_box

def process_image(filename, image_folder, output_folder, yolo_annotation):
    """Write the label file for one image.

    For a ``filename`` ending in ``.jpg`` or ``.png`` a file with the same base
    name and a ``.txt`` extension is created in ``output_folder`` containing
    ``yolo_annotation``. Other file names are ignored. ``image_folder`` is
    accepted but not used here.
    """
    # Guard clause: only the supported image extensions get a label file.
    if not filename.endswith((".jpg", ".png")):
        return

    stem = os.path.splitext(filename)[0]
    txt_path = os.path.join(output_folder, stem + ".txt")

    # Write the YOLO annotation to the .txt file
    with open(txt_path, "w") as txt_file:
        txt_file.write(yolo_annotation)

def create_yolo_files(image_folder, output_folder, num_workers=32):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    
    yolo_annotation = create_yolo_annotation()
    
    # Get list of files to process
    filenames = [f for f in os.listdir(image_folder) if f.endswith(".jpg") or f.endswith(".png")]
    
    # Use ProcessPoolExecutor to process images in parallel
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(process_image, filename, image_folder, output_folder, yolo_annotation) for filename in filenames]
        
        # Ensure all futures are completed
        for future in futures:
            future.result()

# Path to the folder containing the images
image_folder = '/mnt/storage/kilsar_jainil/drive-download-20240626T221301Z-001/classifier/gun/train/'

# Path to the folder where YOLO files will be saved
output_folder = '/mnt/storage/kilsar_jainil/drive-download-20240626T221301Z-001/classifier/gun/labels/train/'

# Create YOLO formatted text files using 60 worker processes
# (this overrides the function's default of 32).
create_yolo_files(image_folder, output_folder, num_workers=60)
