3 variables set to your path：dataroot, saveroot, pair_dir

generate npy files，pair files，

training dataset demo

In [1]:
import os
import torch
import numpy as np
from torch.utils.data import Dataset
from loadFun import loadmat, kdata2img, multicoilkdata2img
from PIL import Image
from torchvision import transforms

## generate .npy

In [2]:
def paddingZero_np(np_data: np.array, target_shape: tuple):
    H,W = np_data.shape[-2], np_data.shape[-1]
    padding_H = target_shape[0] - H
    padding_W = target_shape[1] - W
    if len(np_data.shape) == 4:
        padding_size = ((0,0),(0,0),(padding_H//2, padding_H - padding_H//2), (padding_W//2, padding_W-padding_W//2))
    else:
        padding_size = ((0,0),(0,0),(0,0),(padding_H//2, padding_H - padding_H//2), (padding_W//2, padding_W-padding_W//2))
    padded_np_data = np.pad(np_data, padding_size, mode='constant')
    return padded_np_data

In [3]:
class Task2PngConverter(Dataset):
    def __init__(self, root_dir, coil_type, task_type, acc_factor, need_gt=True,saveroot=""):
        """
        root_dir: absolute path of "ChallengData"
        coil_type: MultiCoil or SingleCoil
        task_type: Cine or Mapping
        acc_factor: AccFactor04 or AccFactor08 or AccFactor10
        """
        self.name_dict = {"MultiCoil":{"AccFactor04":"kspace_sub04",
                                       "AccFactor08":"kspace_sub08",
                                       "AccFactor10":"kspace_sub10",
                                       "FullSample":"kspace_full"}, 
                          "SingleCoil":{"AccFactor04":"kspace_single_sub04",
                                       "AccFactor08":"kspace_single_sub08",
                                       "AccFactor10":"kspace_single_sub10",
                                       "FullSample":"kspace_single_full"}}
        self.root_dir = root_dir
        self.coil_type = coil_type
        self.task_type = task_type
        self.acc_factor = acc_factor
        self.need_gt = need_gt
        self.saveroot = saveroot
        os.makedirs(self.saveroot, exist_ok=True)

        self.train_dir = os.path.join(root_dir, coil_type, task_type, "TrainingSet", acc_factor)
        self.train_GT_dir = os.path.join(root_dir, coil_type, task_type, "TrainingSet", "FullSample")
        if not self.acc_factor == "FullSample":
            self.val_dir = os.path.join(root_dir, coil_type, task_type, "ValidationSet", acc_factor)
            self.val_PNum_list = os.listdir(self.val_dir)

        self.train_PNum_list = os.listdir(self.train_dir)

    def __len__(self):
        return len(self.train_PNum_list)
    
    def __getitem__(self, index):
        train_item_dir = os.path.join(self.train_dir, self.train_PNum_list[index])
        save_root = self.saveroot

        train_item_t1_path = os.path.join(train_item_dir, "T1map.mat")
        train_item_t2_path = os.path.join(train_item_dir, "T2map.mat")

        # Actually, we won't use the mask data, but we remain the following two lines for some special reasons.
        train_item_t1_mask_path = os.path.join(train_item_dir, "T1map_mask.mat")
        train_item_t2_mask_path = os.path.join(train_item_dir, "T2map_mask.mat")
        
        # GT data
        train_itemGT_dir = os.path.join(self.train_GT_dir, self.train_PNum_list[index])
        train_itemGT_t1_path = os.path.join(train_itemGT_dir, "T1map.mat")
        train_itemGT_t2_path = os.path.join(train_itemGT_dir, "T2map.mat")

        # some patient only has one axis data
        if (os.path.exists(train_item_t1_path) & os.path.exists(train_itemGT_t1_path)):
            has_t1 = True
            self._extract_to_png(train_item_t1_path, train_itemGT_t1_path, save_root, "t1")
        else:
            has_t1 = False
        
        if (os.path.exists(train_item_t2_path) & os.path.exists(train_itemGT_t2_path)):
            has_t2 = True
            self._extract_to_png(train_item_t2_path, train_itemGT_t2_path, save_root, "t2")
        else:
            has_t2 = False

        return {'t1': train_item_t1_path,
                't1_mask': train_item_t1_mask_path,
                't1_GT': train_itemGT_t1_path,
                't2': train_item_t2_path,
                't2_mask': train_item_t2_mask_path,
                't2_GT': train_itemGT_t2_path,
                'has_t1': has_t1,
                'has_t2': has_t2}
    
    def _kspace2img(self, kspace_raw):
        kspace_real = kspace_raw['real']
        kspace_imag = kspace_raw['imag']
        # kspace_modulus = np.sqrt(kspace_real**2 + kspace_img**2)

        kspace_complex = kspace_real + 1j * kspace_imag
        iimg = np.fft.ifft2(kspace_complex)
        img = np.fft.ifftshift(iimg)
        img = np.abs(img)
        return img

    def _extract_to_png(self, path1: str, path2: str, saveroot: str, mapping_type: str) -> None:
        """
        Extract the .png files from raw .mat file
        path1: sampled data path .mat
        path2: full data path .mat
        dir: to store those extracted .png files.
        mapping_type: mark the filename with "t1" or "t2"
        """
        
        patient_id = path1.split("/")[-2]
        # 1. process sampled data
        mat_data = loadmat(path1)
        key1 = self.name_dict[self.coil_type][self.acc_factor]
        raw_data = mat_data[key1]
        raw_data = paddingZero_np(raw_data, (512,512))
        if self.coil_type == "SingleCoil":
            iimg = kdata2img(raw_data)
            frame_num, slice_num, _, _ = iimg.shape
            for i in range(frame_num):
                for j in range(slice_num):
                        img = iimg[i, j, :, :]
                        img = img / np.max(img)
                        save_path = os.path.join(saveroot, self.coil_type, "Mapping", self.acc_factor, patient_id, f"{self.acc_factor}_{patient_id}_{mapping_type}_{i}_{j}.npy")
                        os.makedirs(os.path.join(saveroot, self.coil_type, "Mapping", self.acc_factor, patient_id), exist_ok=True)
                        np.save(save_path, img)
        elif self.coil_type == "MultiCoil":
            iimg = multicoilkdata2img(raw_data)
            frame_num, slice_num, height, width = iimg.shape
            for i in range(frame_num):
                for j in range(slice_num):
                        img = iimg[i, j, :, :]
                        img = img / np.max(img)
                        save_path = os.path.join(saveroot, self.coil_type, "Mapping", self.acc_factor, patient_id, f"{self.acc_factor}_{patient_id}_{mapping_type}_{i}_{j}.npy")
                        os.makedirs(os.path.join(saveroot, self.coil_type, "Mapping", self.acc_factor, patient_id), exist_ok=True)
                        np.save(save_path, img)
            pass
    
    def _preprocess(self):
        # print(self.__len__())
        for i in range(self.__len__()):
            self.__getitem__(i)


change the dataroot, saveroot in the next cell to your path

In [None]:
dataroot = "/home/txiang/CMRxRecon/CMRxRecon/MICCAIChallenge2023/ChallengeData"
saveroot = "/home/txiang/CMRxRecon/CMRxRecon/MICCAIChallenge2023/Preprocessed"
for coil in ["MultiCoil", "SingleCoil"]:
    for acc in ["AccFactor04", "AccFactor08", "AccFactor10", "FullSample"]:
        Task2PngConverter(dataroot,coil,"Mapping",acc,saveroot=saveroot)._preprocess()

## generate train pair file

In [2]:
val_list = ["P111", "P112", "P113", "P114", "P115", "P116", "P117", "P118", "P119", "P120"]
class Task2PairFileGenerator():
    def __init__(self, root_dir, saveroot, acc):
        """
        root_dir: absolute path of "ChallengData"
        coil_type: MultiCoil or SingleCoil
        task_type: Cine or Mapping
        # acc_factor: AccFactor04 or AccFactor08 or AccFactor10
        acc: "04", "08", "10"
        """
        self.name_dict = {"MultiCoil":{"AccFactor04":"kspace_sub04",
                                       "AccFactor08":"kspace_sub08",
                                       "AccFactor10":"kspace_sub10",
                                       "FullSample":"kspace_full"}, 
                          "SingleCoil":{"AccFactor04":"kspace_single_sub04",
                                       "AccFactor08":"kspace_single_sub08",
                                       "AccFactor10":"kspace_single_sub10",
                                       "FullSample":"kspace_single_full"}}
        self.root_dir = root_dir
        self.coil_type = ["MultiCoil", "SingleCoil"]
        accf = acc
        self.acc_factor = [f"AccFactor{accf}"]
        self.is_train = True
        os.makedirs(saveroot, exist_ok=True)
        train_pair_file = f"{saveroot}/Task2_acc_{accf}_train_pair_file_npy.txt"
        val_pair_file = f"{saveroot}/Task2_acc_{accf}_val_pair_file_npy.txt"
        file_obj_train = open(train_pair_file, "w")
        file_obj_val = open(val_pair_file, "w")

        if self.is_train:
            for coil_type in self.coil_type:
                for acc_factor in self.acc_factor:
                    self.train_dir = os.path.join(self.root_dir, coil_type, "Mapping", acc_factor)
                    self.train_GT_dir = os.path.join(self.root_dir, coil_type, "Mapping", "FullSample")

                    Pnum_list = os.listdir(self.train_dir)
                    for Pnum in Pnum_list:
                        if Pnum in val_list:
                            file_obj = file_obj_val
                        else:
                            file_obj = file_obj_train
                        self.sub_dir = os.path.join(self.train_dir, Pnum)
                        self.sub_Gt_dir = os.path.join(self.train_GT_dir, Pnum)
                        train_data_list = os.listdir(self.sub_dir)
                        for train_data in train_data_list:
                            # print(train_data)
                            if os.path.exists(os.path.join(self.sub_dir, train_data)) and os.path.exists(os.path.join(self.sub_Gt_dir, train_data.replace(acc_factor, "FullSample"))):
                                file_obj.writelines([os.path.join(self.sub_dir, train_data), " ", os.path.join(self.sub_Gt_dir, train_data.replace(acc_factor, "FullSample")), "\n"])
        file_obj_train.close()
        file_obj_val.close()
    
        file_obj = open(train_pair_file, "r")
        newfile_name = f"{saveroot}/Task2_acc_{accf}_train_pair_file_npy_clean.txt"
        newfile_obj = open(newfile_name, "w")
        lines = file_obj.readlines()
        for line in lines:
            elem1, elem2 = line.split(" ")
            if not "FullSample" in elem1:
                newfile_obj.writelines(line)
        file_obj.close()
        newfile_obj.close()

        file_obj = open(val_pair_file, "r")
        newfile_name = f"{saveroot}/Task2_acc_{accf}_val_pair_file_npy_clean.txt"
        newfile_obj = open(newfile_name, "w")
        lines = file_obj.readlines()
        for line in lines:
            elem1, elem2 = line.split(" ")
            if not "FullSample" in elem1:
                newfile_obj.writelines(line)
        file_obj.close()
        newfile_obj.close()

change the root_dir, pair_dir in the next cell to your path

In [3]:
# root_dir = "/home/txiang/CMRxRecon/CMRxRecon/MICCAIChallenge2023/Preprocessed"
root_dir = saveroot
pair_dir = "/home/txiang/CMRxRecon/CMRxRecon_Repo/dataset/train_pair_file"
for acc_ in ["04", "08", "10"]:
    Task2PairFileGenerator(root_dir=root_dir, saveroot=pair_dir, acc=acc_)
    print(acc_, "ok")

04 ok
08 ok
10 ok


## Dataset

In [2]:
class CMRxReconDataset(Dataset):
    
    def __init__(self, file_path, transform=None, length=-1):
        """
        root_dir: absolute path of "ChallengData"
        file_path: the train_pair_file.txt
        """
        self.name_dict = {"MultiCoil":{"AccFactor04":"kspace_sub04",
                                       "AccFactor08":"kspace_sub08",
                                       "AccFactor10":"kspace_sub10",
                                       "FullSample":"kspace_full"}, 
                          "SingleCoil":{"AccFactor04":"kspace_single_sub04",
                                       "AccFactor08":"kspace_single_sub08",
                                       "AccFactor10":"kspace_single_sub10",
                                       "FullSample":"kspace_single_full"}}
        self.file_path = file_path
        file_obj = open(self.file_path, "r")
        self.train_pairs = file_obj.readlines()
        if length>0:
            self.train_pairs = self.train_pairs[:length]
        self.transform = transform
        file_obj.close()
        
    def __len__(self):
        return len(self.train_pairs)
    
    def __getitem__(self, index):
        path, GT_path = self.train_pairs[index].replace("\n","").split(" ")
        item = np.float32(np.load(path))
        GT_item = np.float32(np.load(GT_path))
        output = {"input": item, "GT": GT_item}
        if self.transform:
            data = np.stack((item, GT_item), axis=-1)
            transformed_data = self.transform(data)
            output = {"input": transformed_data[0,:,:].unsqueeze(0), 
                      "GT": transformed_data[1,:,:].unsqueeze(0), 
                      "ipath":path, 
                      "gtpath":GT_path}
        return output

In [3]:
pair_file = "/home/txiang/CMRxRecon/CMRxRecon_Repo/dataset/train_pair_file/Task2_acc_04_train_pair_file_npy_clean.txt"
tsfm = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
])
dataset = CMRxReconDataset(pair_file, transform=tsfm, length=-1)
print(len(dataset))
print(dataset[0]["ipath"], dataset[0]["input"].shape, dataset[0]["input"].min(), dataset[0]["input"].max)

15576
/home/txiang/CMRxRecon/CMRxRecon/MICCAIChallenge2023/Preprocessed/MultiCoil/Mapping/AccFactor04/P100/AccFactor04_P100_t1_0_3.npy torch.Size([1, 512, 512]) tensor(0.0013) <built-in method max of Tensor object at 0x7fa8a45ca750>
