In [1]:
import numpy as np
import os
import pathlib
from PIL import Image
import pydicom as dicom
from tqdm import tqdm
from typing import NamedTuple, List

In [2]:
input = 'input/train'
output = 'output/processed'
blacklist = {'ID00011637202177653955184', 'ID00052637202186188008618'}

patients = {dir for dir in os.listdir(input)}
patients = patients - blacklist
print(f'{len(patients)} patients')

174 patients


In [3]:
def load_scan(path):
    slices = [dicom.read_file(f'{path}/{s}') for s in os.listdir(path)]
    #slices.sort(key = lambda x: float(x.ImagePositionPatient[2]))
    return slices


def to_hu(slices, padding=-2000):
    """Convert to Hounsfield units (HU)"""
    frames = np.stack([s.pixel_array for s in slices])
    # Convert to int16 (from sometimes int16), 
    # should be possible as values should always be low enough (<32k)
    frames = frames.astype(np.int16)
    for i in range(len(slices)):
        f = frames[i]
        s = slices[i]
        if "PixelPaddingValue" in s:
            padding = np.int16(s.PixelPaddingValue)
        slope = np.float64(s.RescaleSlope)
        intercept = np.int16(s.RescaleIntercept)
        # Set outside-of-scan pixels to 0
        f[f <= padding] = 0 
        if slope != 1:
            f = slope * f.astype(np.float64)
            print(f'f.dtype={f.dtype}')
            f = f.astype(np.int16)  
        f += intercept
    return frames.astype(np.int16)


def window(frames, hu_min=-1000, hu_max=600):
    rng = hu_max - hu_min
    norm = (frames - hu_min) / rng
    norm[norm < 0] = 0
    norm[norm > 1] = 1
    norm = (norm * 255).astype(np.uint8)
    res = []
    for f in norm:
        channel = f.T
        rgb = np.array([channel, channel, channel]).T
        res.append(rgb)
    return np.array(res, dtype=np.uint8)


class Features(NamedTuple):
    lung_area: int
    tissue_area: int


def extract(frames, slices, hu_min=-1000, hu_max=600, tissue=(100, 300), lung=(-700, -600)) -> List[Features]:
    rng = hu_max - hu_min
    tissue_min = int((tissue[0] - hu_min) / rng * 255)
    tissue_max = int((tissue[1] - hu_min) / rng * 255)
    lung_min = int((lung[0] - hu_min) / rng * 255)
    lung_max = int((lung[1] - hu_min) / rng * 255)
    #print(f'tissue pixel range={(tissue_min, tissue_max)}, lung pixel range={(lung_min, lung_max)}')
    res = []
    for i in range(len(frames)):
        f = frames[i]
        im = Image.fromarray(f, mode='RGB')
        s = slices[i]
        rows, cols = float(s.PixelSpacing[0]), float(s.PixelSpacing[1]) 
        height = int(f.shape[0] * rows)
        width = int(f.shape[1] * cols)
        im = im.resize((width, height))
        a = np.asarray(im)
        lung_area = int(np.sum(np.ma.masked_inside(a, lung_min, lung_max).mask) / 3)
        tissue_area = int(np.sum(np.ma.masked_inside(a, tissue_min, tissue_max).mask) / 3)
        sa = Features(lung_area=lung_area, tissue_area=tissue_area)
        res.append(sa)
    return res


def resize(frames, target_size=(600, 600)):
    res = []
    for i in range(len(frames)):
        f = frames[i]
        im = Image.fromarray(f, mode='RGB')
        im = im.resize(target_size)
        res.append(np.asarray(im))
    return np.array(res, dtype=np.uint8)


def preprocess(dir):
    slices = load_scan(dir)
    res = to_hu(slices)
    res = window(res)
    sas = extract(res, slices)
    res = resize(res)
    return res, sas

In [None]:
rows = []
for patient in tqdm(patients):
    dir = f"{output}/{patient}"
    pathlib.Path(dir).mkdir(parents=True, exist_ok=True)
    try:
        frames, features = preprocess(f"{input}/{patient}")
    except Exception as ex:
        print(f'patient={patient}, Error={ex}')
        continue
    for i in range(len(frames)):
        im =  Image.fromarray(frames[i], mode='RGB')
        filename = f"{i + 1}.png"
        im.save(f"{dir}/{filename}")
        row = {}
        row['img'] = f"{patient}/{filename}"
        row['lung_area'] = features[i].lung_area
        row['tissue_area'] = features[i].tissue_area
        rows.append(row)
        


 44%|████▎     | 76/174 [25:38<40:27, 24.77s/it]  

In [None]:
df = pd.DataFrame.from_records(rows)
df = train.astype({
    'img': str,
    'lung_area': np.uint32,
    'tissue_area': np.uint32,
})
df.info()

In [None]:
df.head()

In [None]:
df['lung_area'].describe()

In [None]:
df['tissue_area'].describe()

In [None]:
df.to_parquet('output/imf.parquet', index=False)