In [1]:
# !pip install nibabel

In [2]:
import os
import re
import shutil
import random
import tarfile
import nibabel as nib
import numpy as np
from PIL import Image
from tqdm import tqdm

In [3]:
src = '../../datasets/NFBS/downloads/NFBS_Dataset.tar.gz'
des = '../../datasets/NFBS/extracted'

# Create the extraction directory if it doesn't exist
os.makedirs(des, exist_ok=True)

with tarfile.open(src, 'r:gz') as tar:
    tar.extractall(path=des)

print("Extraction completed successfully.")

Extraction completed successfully.


In [4]:
def extract_and_save_views(src_dir, output_dir, slides, l_axial, r_axial, l_coronal, r_coronal, l_sagittal, r_sagittal):
    # Ensure the output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Calculate total number of files to process (for the progress bar)
    total_files = 0
    for root, dirs, files in os.walk(src_dir):
        if 'RAW' in root:
            total_files += len([f for f in files if f.endswith(('.nii', '.gz'))])

    print(f"Total files to process: {total_files}")

    with tqdm(total=total_files, desc="Processing files", unit="file", ncols=100) as pbar:
        for root, dirs, files in os.walk(src_dir):
            parent_folder = os.path.basename(os.path.dirname(root))
            relative_path = os.path.relpath(root, src_dir)
            output_subfolder = os.path.join(output_dir, relative_path)

            if not os.path.exists(output_subfolder):
                os.makedirs(output_subfolder)

            for file_name in files:
                if file_name.endswith(('.nii', '.gz')):
                    nifti_file_path = os.path.join(root, file_name)

                    try:
                        img = nib.load(nifti_file_path)
                    except Exception as e:
                        print(f"Error loading {nifti_file_path}: {e}")
                        continue

                    img_data = img.get_fdata()

                    # Determine how many slices are available
                    if len(img_data.shape) == 3:
                        num_slices = img_data.shape[2]
                    elif len(img_data.shape) == 4:
                        img_data_avg = np.mean(img_data, axis=3)
                        num_slices = img_data_avg.shape[2]
                        img_data = img_data_avg
                    else:
                        print(f"Skipping unsupported image with shape: {img_data.shape}")
                        continue

                    # Sample slices
                    for view_name in ['axial', 'coronal', 'sagittal']:
                        if view_name == 'axial':
                            slices = [img_data[:, i, :] for i in range(0, img_data.shape[0], max(1, img_data.shape[0] // slides))]
                            l_skip = l_axial
                            r_skip = r_axial
                        elif view_name == 'coronal':
                            slices = [img_data[i, :, :] for i in range(0, img_data.shape[1], max(1, img_data.shape[1] // slides))]
                            l_skip = l_coronal
                            r_skip = r_coronal
                        elif view_name == 'sagittal':
                            slices = [img_data[:, :, i] for i in range(0, num_slices, max(1, num_slices // slides))]
                            l_skip = l_sagittal
                            r_skip = r_sagittal
                            
                        # Save each slice as an image
                        for i, img_slice in enumerate(slices[l_skip:-r_skip]):
                            if img_slice.size == 0:
                                continue

                            try:
                                max_value = np.max(img_slice)
                                img_2d_normalized = np.uint8(255 * (img_slice / max_value))
                                img_pil = Image.fromarray(img_2d_normalized)

                                # Apply different rotations based on the view_name
                                if view_name == 'sagittal':
                                    img_pil = img_pil.rotate(-90, expand=True)

                                # Modify the output file name
                                extracted_id = re.search(r'A\d{8}', file_name).group(0)
                                type_label = '_brainmask' if 'brainmask' in file_name else '_brain' if 'brain' in file_name else ''
                                output_file_name = f"{view_name}_{extracted_id}{type_label}_slice_{i}.jpg"

                                output_file_path = os.path.join(output_subfolder, view_name)
                                if not os.path.exists(output_file_path):
                                    os.makedirs(output_file_path)

                                # Save the image as JPG
                                img_pil.save(os.path.join(output_file_path, output_file_name))
                            except Exception as e:
                                print(f"Error processing view {view_name} for {nifti_file_path}: {e}")

                    pbar.update(1)

    print("All files processed successfully.")

src = '../../datasets/NFBS/extracted/NFBS_Dataset'
des = '../../datasets/NFBS/extracted/NFBS_Dataset_JPG'

extract_and_save_views(
    src_dir = src, 
    output_dir = des, 
    slides = 40,
    l_axial = 10,
    r_axial = 18,
    l_coronal = 18,
    r_coronal = 10,
    l_sagittal = 13,
    r_sagittal = 15
)

Total files to process: 0


Processing files: 375file [01:35,  3.92file/s]

All files processed successfully.





In [5]:
# Source and destination directories
src = os.path.abspath('../../datasets/NFBS/extracted/NFBS_Dataset_JPG')
train_des = os.path.abspath('../../datasets/NFBS/preprocessed/train')
test_des = os.path.abspath('../../datasets/NFBS/preprocessed/test')

# Destination subdirectories (images, masks, segmentations)
image_dir = os.path.join(train_des, 'images')
mask_dir = os.path.join(train_des, 'masks')
segmented_dir = os.path.join(train_des, 'segmentations')

test_image_dir = os.path.join(test_des, 'images')
test_mask_dir = os.path.join(test_des, 'masks')
test_segmented_dir = os.path.join(test_des, 'segmentations')

# Create destination directories if they don't exist
for des in [image_dir, mask_dir, segmented_dir, test_image_dir, test_mask_dir, test_segmented_dir]:
    if not os.path.exists(des):
        os.makedirs(des)

# Set a seed for reproducibility
random.seed(42)  

# 1. List all the subfolders inside the `src` directory
all_subfolders = [os.path.join(src, folder) for folder in os.listdir(src) if os.path.isdir(os.path.join(src, folder))]

# 2. Shuffle the subfolders for reproducibility
random.shuffle(all_subfolders)

# 3. Split the subfolders into train and test sets (85% train, 15% test)
num_subfolders = len(all_subfolders)
split_idx = int(0.95 * num_subfolders)

train_subfolders = all_subfolders[:split_idx]
test_subfolders = all_subfolders[split_idx:]

# 4. Move files from each subfolder to train and test directories
def move_files(src_folder, dest_folder):
    for root, dirs, files in os.walk(src_folder):
        for file in files:
            source_file = os.path.join(root, file)

            # Determine where to move the file based on its name
            if 'brainmask' in file.lower():
                dest_subfolder = mask_dir if dest_folder == train_des else test_mask_dir
            elif 'brain' in file.lower():
                dest_subfolder = segmented_dir if dest_folder == train_des else test_segmented_dir
            else:
                dest_subfolder = image_dir if dest_folder == train_des else test_image_dir

            # Create destination path
            destination_file = os.path.join(dest_subfolder, file)  # Move directly into the target folder

            # Move the file
            try:
                shutil.move(source_file, destination_file)
            except Exception as e:
                print(f"Error moving file {source_file}: {e}")

# Move files from subfolders to the respective train and test directories
for folder in train_subfolders:
    move_files(folder, train_des)

for folder in test_subfolders:
    move_files(folder, test_des)

In [6]:
temp = os.path.abspath('../../datasets/NFBS/extracted')
if os.path.exists(temp):
    shutil.rmtree(temp)