In [1]:
import os
import pathlib
import pandas as pd
import random
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass
import nibabel as nib
from nilearn import plotting
import cv2
from tqdm import tqdm

In [None]:
@dataclass(frozen=True)
class DatasetConfig:
    NUM_CLASSES: int = 43
    IMG_HEIGHT:  int = 224
    IMG_WIDTH:   int = 224
    CHANNELS:    int = 3
    SEED_VALUE:  int = 41
    CUT_PLANE:   str = "sagital"
         
    DATA_ROOT_SOURCE_PATH: str = 'D:\\Autopet_v1.1\\2024-05-10_Autopet_v1.1'
    MAIN_DATA_CSV_PATH: str = DATA_ROOT_SOURCE_PATH + '\\fdg_metadata.csv'
    DATA_ROOT_TARGET:  str = f'D:\\images\\{CUT_PLANE}' 
    DATA_ROOT_TRAIN:  str = DATA_ROOT_TARGET + '/Train' 
    DATA_ROOT_VALID:  str = DATA_ROOT_TARGET + '/Valid'
    DATA_ROOT_TEST:   str = DATA_ROOT_TARGET + '/Test'
    DATA_TEST_GT:     str = DATA_ROOT_TARGET + '/Test.csv'

In [None]:
random.seed(DatasetConfig.SEED_VALUE)
np.random.seed(DatasetConfig.SEED_VALUE)

In [None]:
fdg_data = pd.read_csv(DatasetConfig.MAIN_DATA_CSV_PATH)
fdg_data

In [None]:
fdg_data["Number of Images"].describe()

In [None]:
def get_image_path(record):
    prefix = record["Subject ID"].split("_")[-1]
    sufix = record["File Location"].split("/")[-2]
    folder = "labelsTr" if record["Modality"] == "SEG" else "imagesTr"
    
    final_number = ""
    if record["Modality"] == "CT":
        final_number = "_0000"
    if record["Modality"] == "PT":
        final_number = "_0001"
        
    return DatasetConfig.DATA_ROOT_SOURCE_PATH + f"/{folder}/fdg_{prefix}_{sufix}{final_number}.nii.gz"

Target Folder Tree:

0 subject_id-study_uid-image_index.png                                     
1 subject_id-study_uid-image_index.png

In [None]:
fdg_data

In [None]:
pre_processed_df = fdg_data.copy()

In [None]:
pre_processed_df.diagnosis = pre_processed_df.diagnosis.apply(lambda x: "POSITIVE" if x != "NEGATIVE" else x)
pre_processed_df = pre_processed_df[["Subject ID","Study UID","Modality","Number of Images","File Location","diagnosis"]]
pre_processed_df

In [None]:
pre_processed_df["Study UID"].iloc[0]

In [None]:
single_reg_df = pre_processed_df[["Subject ID", "Study UID", "diagnosis"]].copy().drop_duplicates()
# adding columns to dataframe 
single_reg_df["sliceNum"] = None
single_reg_df["totalSlices"] = None
single_reg_df["filePath"] = None
single_reg_df["height"] = None
single_reg_df["width"] = None
single_reg_df

In [None]:
single_reg_df.diagnosis.value_counts()

In [None]:
target_data_df = single_reg_df.iloc[0:0].copy()
target_data_df

In [None]:
def get_modalities_from_study_uid(study_uid):
    row_CT = pre_processed_df[(pre_processed_df["Study UID"] == study_uid) & (pre_processed_df["Modality"] == "CT")].iloc[0]
    row_PET = pre_processed_df[(pre_processed_df["Study UID"] == study_uid) & (pre_processed_df["Modality"] == "PT")].iloc[0]
    row_SEG = pre_processed_df[(pre_processed_df["Study UID"] == study_uid) & (pre_processed_df["Modality"] == "SEG")].iloc[0]
    return row_CT, row_PET, row_SEG

In [None]:
def get_highest_activation_seg(seg_img, slices_num, cut_plane="coronal"):
    selected_slice = 0
    curr_max_pixels = 0
    pixel_count_list = []
    for i in range(slices_num):
        curr_image = None
        if cut_plane == "coronal":
            curr_image = seg_img[:, i, :]
        elif cut_plane == "sagital":
            curr_image = seg_img[i, :, :]
        else:
            curr_image = seg_img[:, :, i]
        num_of_non_null_pixels = curr_image[curr_image != 0].size
        pixel_count_list.append(num_of_non_null_pixels)
        if num_of_non_null_pixels > curr_max_pixels:
            curr_max_pixels = num_of_non_null_pixels
            selected_slice = i

    # print(len(pixel_count_list))
    return selected_slice

In [None]:
def show_all_modalities(seg_img, pet_img, ct_img):
    plt.imshow(seg_img, cmap="hot")
    plt.show()

    plt.imshow(pet_img.T, cmap="gist_yarg", origin='lower')
    plt.show()

    plt.imshow(ct_img.T, cmap="gray", origin='lower')
    plt.show()

In [None]:
def get_plane(seg, pet, ct, slice_index, plane):
    if plane == 'coronal':
        return seg[:, slice_index, :], pet[:, slice_index, :], ct[:, slice_index, :]
    elif plane == 'sagital':
        return seg[slice_index, :, :], pet[slice_index, :, :], ct[slice_index, :, :]
    else:
        return seg[:, :, slice_index], pet[:, :, slice_index], ct[:, :, slice_index]

In [None]:
def get_full_image_from_slices(pet, ct):
    blank_channel = np.zeros((len(ct), len(ct[0]))) # gera terceiro canal da imagem vazio, preenchido de zeros
    np_img = np.dstack((ct, pet, blank_channel))
    return np.rot90(np_img, k=1, axes = (0,1))

In [None]:
def get_target_file_name(row, cut_index):
    return f"{DatasetConfig.DATA_ROOT_TARGET}\\{row['Subject ID']}-{row['Study UID']}-{cut_index}.png"

In [None]:
def get_num_slices_from_cut_plane(shape, cut_plane="coronal"):
    if cut_plane == "coronal":
        return shape[1]
    elif cut_plane == "sagital":
        return shape[0]
    else:
        return shape[2]

In [None]:
def get_equivalent_slice(origin_selected, origin_total, target_total):
    return (target_total * origin_selected) // origin_total

In [2]:
def process_positive_element(study_uid, cut_plane="sagital"):
  
    positive_CT, positive_PET, positive_SEG = get_modalities_from_study_uid(study_uid)
    
    seg_nib = nib.load(get_image_path(positive_SEG))
    seg_img = seg_nib.get_fdata()

    slices_num = get_num_slices_from_cut_plane(seg_nib.shape, cut_plane)
    
    selected_slice = get_highest_activation_seg(seg_img, slices_num, cut_plane)
    
    # Load PET image
    pet_nib = nib.load(get_image_path(positive_PET))
    pet_img = pet_nib.get_fdata()
    
    # Extract the PET image cut for the selected slice and plane
    _, pos_pet_cut, _ = get_plane(seg_img, pet_img, pet_img, selected_slice, cut_plane)
    
    # Save the PET image slice
    pos_img_path = get_target_file_name(positive_PET, selected_slice)
    cv2.imwrite(pos_img_path, pos_pet_cut)

    #Process to negative class
    random_neg_sample = single_reg_df[(single_reg_df.diagnosis == "NEGATIVE") & (~single_reg_df["Study UID"].isin(target_data_df["Study UID"]))].sample(random_state=DatasetConfig.SEED_VALUE).iloc[0]

    neg_CT, neg_PET, neg_SEG = get_modalities_from_study_uid(random_neg_sample["Study UID"])

    neg_pet_nib = nib.load(get_image_path(neg_PET))
    neg_pet_img = neg_pet_nib.get_fdata()

    slices_num_neg = get_num_slices_from_cut_plane(neg_pet_nib.shape, cut_plane)

    neg_selected_slice = get_equivalent_slice(selected_slice, slices_num, slices_num_neg)

    _, neg_pet_cut, _ = get_plane(neg_pet_img, neg_pet_img, neg_pet_img, neg_selected_slice, cut_plane)

    # Save the PET image slice
    neg_img_path = get_target_file_name(neg_PET, neg_selected_slice)
    cv2.imwrite(neg_img_path, neg_pet_cut)

    return (study_uid, pet_nib.shape, selected_slice, pos_img_path, 
            random_neg_sample["Study UID"], neg_pet_nib.shape, neg_selected_slice, neg_img_path)

In [None]:
single_reg_df[single_reg_df.diagnosis == "POSITIVE"]

In [None]:
def add_record_to_target_df(study_uid, shape, selected_slice, image_path, cut_plane="coronal"):
    row = single_reg_df[single_reg_df["Study UID"] == study_uid].to_dict(orient='records')[0]
    # print(row)
    row["totalSlices"] = shape[1]
    row["sliceNum"] = selected_slice
    row["filePath"] = image_path.replace(DatasetConfig.DATA_ROOT_TARGET, '') # removing root absolute path, leaving only relative path

    #print(image_path)
    img = cv2.imread(image_path)
    height, width, channels = img.shape
    
    row["height"] = height
    row["width"] = width
    
    return pd.concat([target_data_df, pd.DataFrame([row])], ignore_index=True)

In [None]:
target_data_df

In [None]:
positive_records_df = single_reg_df[single_reg_df.diagnosis == "POSITIVE"]
rows_to_iterate = positive_records_df.sample(random_state=DatasetConfig.SEED_VALUE +123, frac=1)

for index, row in tqdm(rows_to_iterate.iterrows(), total=len(rows_to_iterate)):
    pos_study_uid, pos_shape, pos_selected_slice, pos_img_path, neg_study_uid, neg_shape, neg_selected_slice, neg_img_path = process_positive_element(row["Study UID"], DatasetConfig.CUT_PLANE)
    target_data_df = add_record_to_target_df(pos_study_uid, pos_shape, pos_selected_slice, pos_img_path)
    target_data_df = add_record_to_target_df(neg_study_uid, neg_shape, neg_selected_slice, neg_img_path)
    #break

In [None]:
target_data_df

In [None]:
target_data_df.to_csv(f"{DatasetConfig.DATA_ROOT_TARGET}/data_description.csv")

In [None]:
!ls /home/jovyan/work/data/pre-processed-autopet-min-coronal/

In [None]:
import nibabel as nib
import matplotlib.pyplot as plt

# Carregar a imagem NIfTI
img = nib.load('D:/Autopet_v1.1/2024-05-10_Autopet_v1.1/imagesTr/fdg_0af7ffe12a_08-12-2005-NA-PET-CT Ganzkoerper  primaer mit KM-96698_0000.nii.gz')

# Verificar a forma da imagem
print(f"Dimensões da imagem: {img.shape}")

# Pegar a matriz de dados da imagem
data = img.get_fdata()

# Visualizar a primeira fatia da imagem (por exemplo, fatia axial)
plt.imshow(data[:, :, data.shape[2] // 2], cmap='gray')
plt.show()