In [1]:
import os
import shutil
from pathlib import Path

# Where the paired images/masks currently live, and where the split goes
source_dir = Path('CAESAR/output_dataset')
destination_dir = Path('data/segmentation_dataset')

# Build subfolder_1 .. subfolder_10, each holding an 'images' and a 'masks' folder
for i in range(10):
    subfolder = destination_dir / f'subfolder_{i+1}'
    for kind in ('images', 'masks'):
        (subfolder / kind).mkdir(parents=True, exist_ok=True)

# Function to distribute matching images and masks
def distribute_files(src_dir=None, dst_dir=None, num_subfolders=10):
    """Move matching image/mask pairs into numbered subfolders, round-robin.

    Images are read from ``<src_dir>/images`` and each mask is expected under
    ``<src_dir>/masks`` with exactly the same file name. Pair ``k`` (counting
    only pairs that are actually moved, in sorted name order) goes to
    ``<dst_dir>/subfolder_{k % num_subfolders + 1}/images`` and ``.../masks``.

    Args:
        src_dir: Directory containing ``images`` and ``masks``. Defaults to
            the notebook-level ``source_dir``.
        dst_dir: Directory receiving the ``subfolder_N`` folders. Defaults to
            the notebook-level ``destination_dir``.
        num_subfolders: Number of subfolders to spread the pairs over.

    Returns:
        The number of image/mask pairs that were moved.

    Raises:
        ValueError: If ``num_subfolders`` is smaller than 1.
    """
    if num_subfolders < 1:
        raise ValueError("num_subfolders must be at least 1")

    src_root = Path(source_dir if src_dir is None else src_dir)
    dst_root = Path(destination_dir if dst_dir is None else dst_dir)
    images_src = src_root / 'images'
    masks_src = src_root / 'masks'

    # os.listdir returns entries in arbitrary order; sorting makes the
    # assignment of files to subfolders reproducible. Directories (for
    # example .ipynb_checkpoints) are not images and are left alone.
    image_names = sorted(
        name for name in os.listdir(images_src) if (images_src / name).is_file()
    )

    moved = 0
    for image_name in image_names:
        mask_name = image_name  # Masks share the image's file name exactly

        # Check the mask before touching anything, so a missing mask never
        # leaves an image moved without its partner.
        mask_src = masks_src / mask_name
        if not mask_src.is_file():
            print(f"Mask for {image_name} not found. Skipping.")
            continue

        # Count only moved pairs so the subfolders stay evenly filled even
        # when some images are skipped.
        subfolder = dst_root / f'subfolder_{moved % num_subfolders + 1}'
        target_image_subdir = subfolder / 'images'
        target_mask_subdir = subfolder / 'masks'
        target_image_subdir.mkdir(parents=True, exist_ok=True)
        target_mask_subdir.mkdir(parents=True, exist_ok=True)

        shutil.move(str(images_src / image_name), str(target_image_subdir / image_name))
        shutil.move(str(mask_src), str(target_mask_subdir / mask_name))
        moved += 1

    return moved

# Run the redistribution over the configured source and destination folders
distribute_files()

# Report completion in the cell output
done_message = "Files have been successfully moved and distributed, ensuring images and masks match in each subfolder."
print(done_message)

Files have been successfully moved and distributed, ensuring images and masks match in each subfolder.


In [None]:
import os
import shutil
from math import ceil

# Inputs (segmentation images and .ply scans) and the output root
segmentation_src_dir = 'CAESAR/processed_dataset/setA/segmentation'
scans_src_dir = 'CAESAR/processed_dataset/setA/scans'
base_dst_dir = 'data/3D_reconstruction'
num_subfolders = 10

# Build subfolder_1 .. subfolder_10 under the output root, each with an
# 'images' and a 'scans' folder
for i in range(1, num_subfolders + 1):
    subfolder_path = os.path.join(base_dst_dir, f'subfolder_{i}')
    for leaf in ('images', 'scans'):
        os.makedirs(os.path.join(subfolder_path, leaf), exist_ok=True)

# Function to distribute files among the subfolders ensuring corresponding scans and images are together
def distribute_files(scans_dir=None, images_dir=None, dst_dir=None, n_subfolders=None):
    """Copy each scan with its front/side images into contiguous subfolders.

    A scan ``<name>.ply`` is only used when both ``<name>_front_view.png`` and
    ``<name>_side_view.png`` exist in ``images_dir``; otherwise it is reported
    and skipped. The usable scans, in sorted name order, are cut into
    ``n_subfolders`` consecutive chunks of (at most) equal size, and chunk
    ``k`` is copied to ``<dst_dir>/subfolder_k/scans`` and ``.../images``.
    Source files are copied, not moved.

    Args:
        scans_dir: Directory with the ``.ply`` scans. Defaults to the
            notebook-level ``scans_src_dir``.
        images_dir: Directory with the view images. Defaults to the
            notebook-level ``segmentation_src_dir``.
        dst_dir: Output root. Defaults to the notebook-level ``base_dst_dir``.
        n_subfolders: Number of subfolders. Defaults to the notebook-level
            ``num_subfolders``.

    Returns:
        The number of scans that were copied.

    Raises:
        ValueError: If the number of subfolders is smaller than 1.
    """
    scans_dir = scans_src_dir if scans_dir is None else scans_dir
    images_dir = segmentation_src_dir if images_dir is None else images_dir
    dst_dir = base_dst_dir if dst_dir is None else dst_dir
    n_subfolders = num_subfolders if n_subfolders is None else n_subfolders
    if n_subfolders < 1:
        raise ValueError("n_subfolders must be at least 1")

    # os.listdir order is arbitrary; sort so every run builds the same folders.
    scan_files = sorted(f for f in os.listdir(scans_dir) if f.endswith('.ply'))

    # Keep only scans whose two view images are present. Filtering before the
    # chunk size is computed keeps the subfolders evenly filled.
    complete = []
    for scan_filename in scan_files:
        base_name = os.path.splitext(scan_filename)[0]  # Name without extension
        view_filenames = (f"{base_name}_front_view.png", f"{base_name}_side_view.png")
        if not all(os.path.exists(os.path.join(images_dir, v)) for v in view_filenames):
            print(f"Corresponding images for {scan_filename} not found. Skipping.")
            continue
        complete.append((scan_filename, view_filenames))

    if not complete:
        return 0

    files_per_folder = ceil(len(complete) / n_subfolders)

    for i, (scan_filename, view_filenames) in enumerate(complete):
        # Consecutive runs of files_per_folder scans share one subfolder (1-based).
        subfolder = os.path.join(dst_dir, f'subfolder_{i // files_per_folder + 1}')
        scans_out = os.path.join(subfolder, 'scans')
        images_out = os.path.join(subfolder, 'images')
        os.makedirs(scans_out, exist_ok=True)
        os.makedirs(images_out, exist_ok=True)

        shutil.copy(os.path.join(scans_dir, scan_filename),
                    os.path.join(scans_out, scan_filename))
        for view_filename in view_filenames:
            shutil.copy(os.path.join(images_dir, view_filename),
                        os.path.join(images_out, view_filename))

    return len(complete)

# Copy the scans and their view images into the subfolders
distribute_files()

# Report completion in the cell output
done_message = "Files have been copied and distributed successfully."
print(done_message)

____________________________________________________________________________________________________________________________________________

### Measurement Data ###

In [9]:
import os
import shutil
import random
import pandas as pd

# Source folders and the root of the train/val/test dataset
source_images_dir = 'CAESAR/processed_dataset/setA/segmentation'
source_masks_dir = 'CAESAR/processed_dataset/setA/masks'
dest_dir = 'measurements/data/dataset'
train_dir, val_dir, test_dir = (
    os.path.join(dest_dir, split_name) for split_name in ('train', 'val', 'test')
)

# Measurement folders A1..A12 and the measurement file expected in each
measurement_dirs = []
measurement_files = []
for idx in range(1, 13):
    measurement_dirs.append(f'CAESAR/dataset/A{idx}-NORTH AMERICA')
    measurement_files.append(f'MeasurementsMetric{idx}.txt')

# Every split gets an 'images' and a 'masks' folder (no-op if present)
for folder in (train_dir, val_dir, test_dir):
    for leaf in ('images', 'masks'):
        os.makedirs(os.path.join(folder, leaf), exist_ok=True)

# Get all front-view image file names (one per subject). os.listdir returns
# entries in arbitrary order, so sort them: the split below should depend
# only on the seed, not on the file system.
all_files = sorted(f for f in os.listdir(source_images_dir) if f.endswith('_front_view.png'))

# Shuffle with a fixed seed. The files copied by an earlier run stay on disk,
# so an unseeded shuffle would put the same subject into different splits on
# every re-run and leak data between train, val and test.
split_seed = 42
random.Random(split_seed).shuffle(all_files)

# Split 80% / 10% / 10% into train, val and test
num_files = len(all_files)
train_end = int(0.8 * num_files)
val_end = int(0.9 * num_files)
train_files = all_files[:train_end]
val_files = all_files[train_end:val_end]
test_files = all_files[val_end:]

In [10]:
# Function to copy and rename files
def copy_and_rename_files(files, split, images_dir=None, masks_dir=None, out_dir=None):
    """Copy one split's images and masks under their dataset names.

    For every front-view file name ``<base>_front_view.png`` in ``files`` the
    following copies are made into ``<out_dir>/<split>``:

    * ``<base>_front_view.png`` -> ``images/<base>_frontal.png``
    * ``<base>_side_view.png``  -> ``images/<base>_lateral.png``
    * ``<base>_front_mask.png`` -> ``masks/<base>_frontal_mask.png``
    * ``<base>_side_mask.png``  -> ``masks/<base>_lateral_mask.png``

    A subject is copied only when all four source files exist; otherwise it
    is reported and skipped, so no partially copied subject is left behind.

    Args:
        files: Iterable of front-view image file names.
        split: Name of the split folder ('train', 'val' or 'test').
        images_dir: Folder holding the view images. Defaults to the
            notebook-level ``source_images_dir``.
        masks_dir: Folder holding the masks. Defaults to the notebook-level
            ``source_masks_dir``.
        out_dir: Dataset root. Defaults to the notebook-level ``dest_dir``.

    Returns:
        A list with one dict per copied subject, with the keys ``split``,
        ``base_name``, ``frontal_image``, ``lateral_image``, ``frontal_mask``
        and ``lateral_mask``.
    """
    images_dir = source_images_dir if images_dir is None else images_dir
    masks_dir = source_masks_dir if masks_dir is None else masks_dir
    out_dir = dest_dir if out_dir is None else out_dir

    front_suffix = '_front_view.png'
    images_out = os.path.join(out_dir, split, 'images')
    masks_out = os.path.join(out_dir, split, 'masks')
    os.makedirs(images_out, exist_ok=True)
    os.makedirs(masks_out, exist_ok=True)

    records = []
    for file in files:
        # Strip the suffix only at the end of the name; str.replace would
        # also remove it from the middle of a name.
        if not file.endswith(front_suffix):
            print(f"{file} is not a front-view image. Skipping.")
            continue
        base_name = file[:-len(front_suffix)]

        # Define new names
        new_names = {
            'frontal_image': f'{base_name}_frontal.png',
            'lateral_image': f'{base_name}_lateral.png',
            'frontal_mask': f'{base_name}_frontal_mask.png',
            'lateral_mask': f'{base_name}_lateral_mask.png',
        }

        # (source, destination) for each of the four files of this subject
        copy_plan = [
            (os.path.join(images_dir, f'{base_name}_front_view.png'),
             os.path.join(images_out, new_names['frontal_image'])),
            (os.path.join(images_dir, f'{base_name}_side_view.png'),
             os.path.join(images_out, new_names['lateral_image'])),
            (os.path.join(masks_dir, f'{base_name}_front_mask.png'),
             os.path.join(masks_out, new_names['frontal_mask'])),
            (os.path.join(masks_dir, f'{base_name}_side_mask.png'),
             os.path.join(masks_out, new_names['lateral_mask'])),
        ]

        # Validate before copying so a missing file cannot leave a subject
        # half-copied in the dataset.
        missing = [src for src, _ in copy_plan if not os.path.isfile(src)]
        if missing:
            print(f"Missing source files for {base_name}: {missing}. Skipping.")
            continue

        for src, dst in copy_plan:
            shutil.copy(src, dst)

        records.append({'split': split, 'base_name': base_name, **new_names})
    return records

# Copy each split in turn (train, then val, then test) and keep the records
# describing what was written
train_records, val_records, test_records = [
    copy_and_rename_files(split_files, split_name)
    for split_files, split_name in (
        (train_files, 'train'),
        (val_files, 'val'),
        (test_files, 'test'),
    )
]

# One table covering all three splits
records = [*train_records, *val_records, *test_records]
df = pd.DataFrame(records)

In [11]:
# Read every measurement file and combine them into one table.
# Files that are missing or unreadable are reported and skipped.
measurements = []
for directory, measurement_file in zip(measurement_dirs, measurement_files):
    file_path = os.path.join(directory, measurement_file)
    if not os.path.exists(file_path):
        # Say so explicitly: a silently missing file means its subjects
        # would later be dropped from the metadata without explanation.
        print(f"Measurement file not found, skipping: {file_path}")
        continue
    try:
        df_measurements = pd.read_csv(file_path, delimiter=',', encoding='utf-8')
    except (OSError, ValueError) as e:
        # pandas parser/empty-data errors and decoding errors are ValueErrors.
        print(f"Error reading {file_path}: {e}")
        continue
    # Strip header whitespace per file, before concatenating: otherwise
    # ' Stature' and 'Stature' would become two separate columns.
    measurements.append(df_measurements.rename(columns=lambda x: x.strip()))

if not measurements:
    raise ValueError(
        "No measurement files could be read; check measurement_dirs and measurement_files."
    )

measurements_df = pd.concat(measurements, ignore_index=True)

In [12]:
# Function to create metadata.csv files
def create_metadata_csv(records, split, measurements_table=None, out_dir=None):
    """Write ``<out_dir>/<split>/metadata.csv`` joining files to measurements.

    For each record the subject number is parsed from characters 3..6 of
    ``base_name`` (``base_name[3:7]``) and the first row of the measurement
    table with that 'Subject Number' is used. The 'Recorder' and 'Measurer'
    columns are dropped and the four file-name columns are appended. Rows are
    sorted by 'Subject Number'. Records without a parsable subject number or
    without a matching measurement row are left out and counted in a printed
    summary.

    Args:
        records: List of dicts with the keys ``base_name``, ``frontal_image``,
            ``lateral_image``, ``frontal_mask`` and ``lateral_mask``.
        split: Name of the split folder the CSV is written into.
        measurements_table: DataFrame with a 'Subject Number' column.
            Defaults to the notebook-level ``measurements_df``.
        out_dir: Dataset root. Defaults to the notebook-level ``dest_dir``.

    Returns:
        The DataFrame that was written (empty, with a header, when no record
        matched).
    """
    measurements_table = measurements_df if measurements_table is None else measurements_table
    out_dir = dest_dir if out_dir is None else out_dir

    file_columns = ('frontal_image', 'lateral_image', 'frontal_mask', 'lateral_mask')
    dropped_columns = ('Recorder', 'Measurer')

    metadata = []
    skipped = 0
    for record in records:
        base_name = record['base_name']
        try:
            subject_number = int(base_name[3:7])
        except ValueError:
            skipped += 1
            continue
        measurement = measurements_table[measurements_table['Subject Number'] == subject_number]
        if measurement.empty:
            skipped += 1
            continue
        measurement_dict = measurement.iloc[0].to_dict()
        for column in dropped_columns:
            measurement_dict.pop(column, None)
        for column in file_columns:
            measurement_dict[column] = record[column]
        metadata.append(measurement_dict)

    if skipped:
        print(f"{split}: {skipped} record(s) without usable measurements were left out of metadata.csv.")

    if metadata:
        metadata_df = pd.DataFrame(metadata).sort_values(by=['Subject Number'])
    else:
        # Sorting a column-less frame by 'Subject Number' raises KeyError, so
        # build an empty table that still carries the expected header.
        columns = [c for c in measurements_table.columns if c not in dropped_columns]
        columns += [c for c in file_columns if c not in columns]
        metadata_df = pd.DataFrame(columns=columns)

    split_dir = os.path.join(out_dir, split)
    os.makedirs(split_dir, exist_ok=True)
    metadata_df.to_csv(os.path.join(split_dir, 'metadata.csv'), index=False)
    return metadata_df

# (folder name, records) for each split, in the order they are processed
splits = (
    ('train', train_records),
    ('val', val_records),
    ('test', test_records),
)

# Write one metadata.csv per split
for folder_name, split_records in splits:
    create_metadata_csv(split_records, folder_name)

# Build a single table recording which folder every file ended up in
location_fields = ('base_name', 'frontal_image', 'lateral_image', 'frontal_mask', 'lateral_mask')
file_locations = []
for folder_name, split_records in splits:
    for record in split_records:
        entry = {field: record[field] for field in location_fields}
        entry['folder'] = folder_name
        file_locations.append(entry)

file_locations_df = pd.DataFrame(file_locations).sort_values(by=['base_name'])
file_locations_df.to_csv(os.path.join(dest_dir, 'file_locations.csv'), index=False)

print("Files copied, renamed, and metadata CSV files created and sorted successfully.")

Files copied, renamed, and metadata CSV files created and sorted successfully.
