In [None]:
import os
import shutil
import SimpleITK as sitk
import numpy as np
from functools import reduce
import pandas as pd

# Dataset root folder; the code below reads <root>/<subset>/masks under it.
root = r'D:\x'

# Starts as an empty list; `mask_info` is rebound to a pandas DataFrame
# further down in this cell.
mask_info = []

def get_mask_info(mask_path, label_ids=(1, 2)):
    """Count voxels and compute the volume of selected labels in a mask image.

    Args:
        mask_path: Path of a mask file readable by ``sitk.ReadImage``.
        label_ids: Label values to measure. Defaults to ``(1, 2)``, which
            reproduces the original two-label behaviour.

    Returns:
        A flat list holding the voxel count of every label in ``label_ids``
        order, followed by the volume of every label in the same order.
        A volume is the voxel count times the product of the image spacing,
        divided by 1000 (i.e. mm^3 -> mL if the spacing is in millimetres;
        the unit is not checked here).
    """
    msk = sitk.ReadImage(mask_path)
    # Volume of one voxel = product of the spacing along every axis.
    voxel_volume = float(np.prod(msk.GetSpacing()))
    msk_arr = sitk.GetArrayFromImage(msk)
    voxels = [int(np.count_nonzero(msk_arr == label_id)) for label_id in label_ids]
    volume = [count * voxel_volume / 1000 for count in voxels]
    return voxels + volume

# Build one row per subject from consecutive (pre, post) mask files.
mask_rows = []
for subset in ['A', 'B', 'C']:
    sample_dir = os.path.join(root, subset, 'masks')
    # Pairing relies on the sorted listing alternating pre/post files of the
    # same subject; zip() silently drops a trailing unpaired file.
    # NOTE(review): any stray file in the folder shifts the pairing - confirm
    # the folder only contains mask files.
    samples = sorted(os.listdir(sample_dir))
    for sample_pre, sample_post in zip(samples[::2], samples[1::2]):
        pre = os.path.join(sample_dir, sample_pre)
        post = os.path.join(sample_dir, sample_post)
        # The subject prefix ends at the first digit that is not directly
        # followed by '_'. Slicing (instead of indexing idx + 1) keeps a digit
        # in last position from raising IndexError, and the default keeps the
        # whole name when no such digit exists.
        id_end = next(
            (pos for pos, ch in enumerate(sample_pre)
             if ch.isdigit() and sample_pre[pos + 1:pos + 2] != '_'),
            len(sample_pre),
        )
        name = subset + '_' + sample_pre[:id_end].replace('_', '')
        mask_rows.append([name] + get_mask_info(pre) + get_mask_info(post))
mask_info = pd.DataFrame(mask_rows, columns=['ID',
                                             'l1_voxel_pre', 'l2_voxel_pre', 'l1_volume_pre', 'l2_volume_pre',
                                             'l1_voxel_post', 'l2_voxel_post', 'l1_volume_post', 'l2_volume_post'])
mask_info

In [None]:
# for subset in ['A', 'B', 'C']:
#     sample_dir = os.path.join(root, subset, 'masks')
#     samples = sorted(os.listdir(sample_dir))
#     for sample_pre, sample_post in zip(samples[::2], samples[1::2]):
#         if sample_pre[:10] != sample_post[:10]:
#             print(subset, sample_pre[:10], sample_post[:10])

In [None]:
# Absolute / relative change of the label-1 volume and absolute change of the
# label-2 volume between the pre and post masks.
l1_pre, l1_post = mask_info['l1_volume_pre'], mask_info['l1_volume_post']
mask_info['l1_sub'] = l1_post - l1_pre
mask_info['l1_rela'] = mask_info['l1_sub'] / l1_pre
mask_info['l2_sub'] = mask_info['l2_volume_post'] - mask_info['l2_volume_pre']

# Condition shared by all three labels: label-1 volume grew by more than 6
# (absolute) or by at least 33 % (relative).
l1_grew = (mask_info['l1_sub'] > 6) | (mask_info['l1_rela'] >= 0.33)
# NOTE(review): `l2_sub >= 0` is also true when the label-2 volume did not
# change at all - confirm that `> 0` was not intended for label2.
label_rules = {
    'label1': l1_grew,
    'label2': l1_grew | (mask_info['l2_sub'] >= 0),
    'label3': l1_grew | (mask_info['l2_sub'] >= 1),
}
for label_name, is_positive in label_rules.items():
    mask_info[label_name] = is_positive.astype(int)

mask_info.to_csv(os.path.join(root, 'mask_info.csv'), index=False)
mask_info

In [None]:
# Print the class balance of every derived label column.
for label_col in ('label1', 'label2', 'label3'):
    counts = mask_info[label_col].value_counts()
    print(counts)

In [None]:
import pandas as pd
import os
root = r'D:\x'
clinic = pd.read_csv(os.path.join(root, 'Clinical_Radiological.csv'))
# Normalise IDs: strip spaces and upper-case. The vectorised .str accessor
# propagates missing IDs as NaN, whereas a per-element lambda would raise on
# them (a float NaN has no .replace).
clinic['ID'] = clinic['ID'].str.replace(' ', '', regex=False).str.upper()
clinic

In [None]:
# from onekey_algo.custom.utils import print_join_info
# print_join_info(clinic, mask_info)
# all_clinic = pd.merge(clinic, mask_info, on='ID', how='inner')
# all_clinic.to_csv(os.path.join(root, 'ALL.csv'), index=False)
# all_clinic

# Masks Transformation

In [None]:
import os
import numpy as np
import shutil
import SimpleITK as sitk
import numpy as np
import pandas as pd

root = r'D:\x'

# Write a binary copy (labels 1 and 2 merged into 1) of every other mask file
# into <root>/<subset>/masks_12.
for subset in ['A', 'B', 'C']:
    sample_dir = os.path.join(root, subset, 'masks')
    new_sample_dir = os.path.join(root, subset, 'masks_12')
    os.makedirs(new_sample_dir, exist_ok=True)
    samples = sorted(os.listdir(sample_dir))
    # Only the even-indexed entries of the sorted listing are converted
    # (presumably the "pre" masks - confirm against the file naming).
    for sample_pre in samples[::2]:
        msk = sitk.ReadImage(os.path.join(sample_dir, sample_pre))
        msk_arr = sitk.GetArrayFromImage(msk)
        msk_arr_new = np.zeros_like(msk_arr)
        msk_arr_new[(msk_arr == 1) | (msk_arr == 2)] = 1
        msk_new = sitk.GetImageFromArray(msk_arr_new)
        # GetImageFromArray creates an image with default origin, spacing and
        # direction; copy the geometry from the source mask so the merged mask
        # stays aligned with its image.
        msk_new.CopyInformation(msk)
        sitk.WriteImage(msk_new, os.path.join(new_sample_dir, sample_pre))

# Generate 2.5D Data

In [None]:
# Distinct values of the clinical table's ID column.
ids = set(clinic['ID'])

In [None]:
from collections import defaultdict
import glob
# Subject ID -> list of cropped file paths belonging to that subject.
data = defaultdict(list)

# Map the first 30 characters of every image file name to the name of the
# subset folder (<root>/<subset>/images/<file>) that contains it.
images = {os.path.basename(f)[:30]: os.path.basename(os.path.dirname(os.path.dirname(f)))
          for f in glob.glob(os.path.join(root, '*', 'images', '*.nii.gz'))}
crop_dir = os.path.join(root, 'crop')
for f in sorted(os.listdir(crop_dir)):
    # Raises KeyError when a cropped file has no matching source image.
    subset = images[f[:30]]
    # The subject prefix ends at the first digit that is not directly followed
    # by '_'. Slicing (instead of indexing idx + 1) keeps a digit in last
    # position from raising IndexError, and the default keeps the whole name
    # when no such digit exists.
    id_end = next(
        (pos for pos, ch in enumerate(f)
         if ch.isdigit() and f[pos + 1:pos + 2] != '_'),
        len(f),
    )
    name = subset + '_' + f[:id_end].replace('_', '')
    data[name].append(os.path.join(crop_dir, f))

In [None]:
# One row per subject: the ID followed by its first three crop paths.
rows = []
for sample_id, paths in data.items():
    # The path list is repeated once before slicing so that a subject with
    # fewer than three files reuses its own paths. The doubled list is a
    # temporary: `data` is left untouched, so re-running this cell does not
    # keep growing the stored lists.
    rows.append([sample_id] + (paths * 2)[:3])

record = pd.DataFrame(rows, columns=['ID', 'ori', '+2', '-2'])
record

In [None]:
from onekey_algo.custom.utils import print_join_info
print_join_info(record, clinic)
# Keep `label3` first, followed by every column except the last three.
# NOTE(review): assumes the last three columns of `clinic` are the label
# columns - confirm against the CSV.
# The selection is stored under a new name: rebinding `clinic` itself would
# drop three more columns (and duplicate `label3`) each time this cell is
# re-run.
clinic_l3 = clinic[['label3'] + list(clinic.columns[:-3])]
cohort = pd.merge(record, clinic_l3, on='ID', how='inner')
# Subjects whose ID starts with 'A_' form the training group; all others are test.
cohort['group'] = cohort['ID'].map(lambda x: 'train' if x.startswith('A_') else 'test')
cohort

In [None]:
# to_csv does not create missing parent folders, so make sure it exists.
os.makedirs('features', exist_ok=True)
cohort.to_csv('features/all.csv', index=False, header=True)

# Every column except the bookkeeping ones goes into the tab-separated,
# header-less list files.
feature_cols = [c for c in cohort.columns if c not in ['group', 'ID']]
for group_name, out_path in [('train', 'features/train_l3.txt'),
                             ('test', 'features/val_l3.txt')]:
    cohort.loc[cohort['group'] == group_name, feature_cols].to_csv(
        out_path, sep='\t', index=False, header=False)

In [None]:
# Tab-separated, header-less (path, label) lists using only the 'ori' path,
# one file per group.
for group_name, out_path in (('train', 'features/train2d_l3.txt'),
                             ('test', 'features/val2d_l3.txt')):
    group_rows = cohort.loc[cohort['group'] == group_name, ['ori', 'label3']]
    group_rows.to_csv(out_path, sep='\t', index=False, header=False)