In [None]:
import os

# The notebook addresses its data with relative paths, so the working directory has to
# be the toolkit checkout. The location can be overridden with the
# MEDICAL_DETECTION3D_TOOLKIT_ROOT environment variable; when it is unset (or empty)
# the original checkout path is used, so existing setups behave exactly as before.
PROJECT_ROOT = (
    os.environ.get('MEDICAL_DETECTION3D_TOOLKIT_ROOT')
    or '/home/sankar/VSCODE_PROJECTS/Medical-Detection3d-Toolkit'
)
# Keep the chdir *before* the local import at the bottom of this cell.
# NOTE(review): gen_landmark_mask is presumably resolved relative to this directory — confirm.
os.chdir(PROJECT_ROOT)
import pandas as pd
import SimpleITK as sitk
import shutil
import json
from gen_landmark_mask import gen_single_landmark_mask

In [None]:
# Display the current working directory to confirm the os.chdir call took effect.
os.getcwd()

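Set up the root path (the folder that is walked for raw input cases) and the target path (the folder the prepared dataset is written to).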

In [None]:
# Both paths are relative, so they resolve against the current working directory.
# root_path is the folder walked for raw inputs (volume + landmark JSON);
# target_path is the folder the per-case output folders are created in.
root_path = './assets/trial/case2'
target_path = './assets/data_set/'

# Dataset Structure

The dataset is organized in the following folder structure:


In [None]:
# data_set
# ├── patient_id1
# │   ├── patient_id1_landmark_mask.nii.gz
# │   ├── patient_id1.csv
# │   └── patient_id1.nii.gz
# ├── patient_id2
# │   ├── patient_id2_landmark_mask.nii.gz
# │   ├── patient_id2.csv
# │   └── patient_id2.nii.gz
# ├── patient_id3
# ├── patient_id..
# ├── patient_idn

In [None]:
# Convert every raw case folder under root_path into the dataset layout under target_path:
#   <target_path>/<case>/<case>.nii.gz                 copy of the image volume
#   <target_path>/<case>/<case>.csv                    landmark coordinates (name, x, y, z)
#   <target_path>/<case>/<case>_landmark_mask.nii.gz   output of gen_single_landmark_mask

# Loop-invariant settings, defined once instead of once per visited file.
volume_filename = 'volume.nii.gz'  # the image the landmark mask is generated against
# NOTE(review): presumably distance thresholds consumed by gen_single_landmark_mask —
# confirm their meaning and units against that function.
pos_upper_bound = 3
neg_lower_bound = 6
# Landmark name -> integer label handed to gen_single_landmark_mask.
landmark_label = {
    'Go': 1,
    'Me': 2,
    'H': 3,
}

for dirpath, dirnames, filenames in os.walk(root_path):
    # normpath drops a trailing separator, which would otherwise make basename() return ''.
    case_name = os.path.basename(os.path.normpath(dirpath))
    image_dir = os.path.join(target_path, case_name)
    source_volume_path = os.path.join(dirpath, volume_filename)
    has_volume = volume_filename in filenames

    if has_volume:
        # Copy exactly the volume the mask is built from. Copying *every* .nii.gz onto the
        # same destination (as before) let an unrelated image in the folder overwrite it,
        # depending on the order in which os.walk happened to list the files.
        os.makedirs(image_dir, exist_ok=True)
        shutil.copy(source_volume_path, os.path.join(image_dir, case_name + '.nii.gz'))
    elif any(name.endswith('.nii.gz') for name in filenames):
        print(f'Skipping images in {dirpath}: no {volume_filename} found')

    for filename in filenames:
        if not filename.endswith('.json'):
            continue

        landmark_json_path = os.path.join(dirpath, filename)
        if not has_volume:
            # Without the reference volume no mask can be generated for these landmarks.
            print(f'Skipping {landmark_json_path}: no {volume_filename} found')
            continue

        image_volume = sitk.ReadImage(source_volume_path)

        with open(landmark_json_path, 'r') as file:
            landmark_json = json.load(file)

        # Only the first markup entry of the file is used.
        # NOTE(review): positions are stored exactly as found in the JSON. If this is a
        # 3D Slicer markups file, its `coordinateSystem` field may be RAS or LPS — confirm
        # it matches what gen_single_landmark_mask expects.
        landmark_cos = [
            {
                "name": control_point['label'],
                "x": control_point['position'][0],
                "y": control_point['position'][1],
                "z": control_point['position'][2],
            }
            for control_point in landmark_json['markups'][0]['controlPoints']
        ]

        landmark_cos_df = pd.DataFrame(landmark_cos)
        landmark_cos_df.to_csv(os.path.join(image_dir, case_name + '.csv'), index=False)
        print(landmark_cos_df)

        spacing = image_volume.GetSpacing()
        landmark_mask = gen_single_landmark_mask(
            image_volume, landmark_cos_df, landmark_label, spacing, pos_upper_bound, neg_lower_bound
        )

        landmark_path = os.path.join(image_dir, case_name + '_landmark_mask.nii.gz')
        sitk.WriteImage(landmark_mask, landmark_path, useCompression=True)

Generating the train and test .csv files

In [None]:
# Collect, for each case folder under target_path, the paths of its image, its landmark
# .csv and its landmark mask. A row is recorded only for a *complete* folder (exactly one
# file of each kind), so the four lists always have the same length and stay aligned.
# This also keeps target_path itself out of the table once train.csv / test.csv have been
# written into it, which previously made a re-run fail with lists of unequal length.
path_to_data = {'image_name': [], 'image_path': [], 'landmark_file_path': [], 'landmark_mask_path': []}

for dirpath, dirnames, filenames in os.walk(target_path):
    # Sorting in place makes os.walk descend in a stable order, so the seeded shuffle
    # applied to the resulting table gives the same split on every machine.
    dirnames.sort()

    if not filenames:
        continue

    mask_files = [name for name in filenames if name.endswith('mask.nii.gz')]
    image_files = [
        name for name in filenames
        if name.endswith('.nii.gz') and not name.endswith('mask.nii.gz')
    ]
    csv_files = [name for name in filenames if name.endswith('.csv')]

    if not (len(image_files) == len(mask_files) == len(csv_files) == 1):
        print(
            f'Skipping {dirpath}: expected exactly one image, one mask and one .csv, '
            f'found {len(image_files)}/{len(mask_files)}/{len(csv_files)}'
        )
        continue

    path_to_data['image_name'].append(os.path.basename(dirpath))
    path_to_data['image_path'].append(os.path.join(dirpath, image_files[0]))
    path_to_data['landmark_file_path'].append(os.path.join(dirpath, csv_files[0]))
    path_to_data['landmark_mask_path'].append(os.path.join(dirpath, mask_files[0]))

In [None]:
# Tabulate the collected paths: each key of path_to_data becomes a column.
data_df = pd.DataFrame(path_to_data)

In [None]:
# Display the table for a visual check of the collected paths.
data_df

Split the dataset into train and test

In [None]:
# Fraction of the cases held out for testing; the count is rounded down.
test_size = 0.2
test_count = int(test_size * len(data_df))
train_count = len(data_df) - test_count

# Shuffle the DataFrame (fixed seed so the split is repeatable)
df_shuffled = data_df.sample(frac=1, random_state=42).reset_index(drop=True)

# Split the DataFrame at an explicit position. Slicing with -test_count breaks when
# test_count is 0 (fewer than 5 cases): iloc[:-0] is empty and iloc[-0:] is everything,
# which would put every case in the test set and none in the training set.
train_df = df_shuffled.iloc[:train_count]
test_df = df_shuffled.iloc[train_count:]

assert len(train_df) + len(test_df) == len(df_shuffled)

In [None]:
# Persist both splits as CSV files (without the index column) directly under target_path.
for split_filename, split_df in (('train.csv', train_df), ('test.csv', test_df)):
    split_csv_path = os.path.join(target_path, split_filename)
    split_df.to_csv(split_csv_path, index=False)

In [1]:
# Standalone utility cell: report the value returned by os.cpu_count().
import os

num_cores = os.cpu_count()
print(f"Number of CPU cores: {num_cores}")


Number of CPU cores: 24
