### 10.2.2 애노테이션 데이터와 후보 데이터 합치기

In [1]:
from collections import namedtuple

In [2]:
CandidateInfoTuple = namedtuple(
    'CandidateInfoTuple',
    'isNodule_bool, diameter_mm, series_uid, center_xyz',
)

In [3]:
import os 
import glob
import csv
import functools

import sys
sys.path.append(os.path.dirname(os.getcwd()))

In [4]:
@functools.lru_cache(1)  # 표준 인메모리 캐싱 라이브러리
def getCandidateInfoList(requireOnDisk_bool=True):  # requireOnDisk_bool 은 디스크에 없는 데이터는 걸러내기 위함이다.
    mhd_list = glob.glob('../data-unversioned/part2/luna/subset*/*.mhd')
    presentOnDisk_set = {os.path.split(p)[-1][:-4] for p in mhd_list}
    
    diameter_dict = {}
    with open('../data-unversioned/part2/luna/annotations.csv', 'r') as f:
        for row in list(csv.reader(f))[1:]:
            series_uid = row[0]
            annotationCenter_xyz = tuple([float(x) for x in row[1:4]])
            annotationDiameter_mm = float(row[4])

            diameter_dict.setdefault(series_uid, []).append(
                (annotationCenter_xyz, annotationDiameter_mm)
            )
            
    candidateInfo_list = []
    with open('../data-unversioned/part2/luna/candidates.csv', 'r') as f:
        for row in list(csv.reader(f))[1:]:
            series_uid = row[0]

            if series_uid not in presentOnDisk_set and requireOnDisk_bool:
                continue

            isNodule_bool = bool(int(row[4]))
            candidateCenter_xyz = tuple([float(x) for x in row[1:4]])

            candidateDiameter_mm = 0.0
            for annotation_tup in diameter_dict.get(series_uid, []):
                annotationCenter_xyz, annotationDiameter_mm = annotation_tup
                for i in range(3):
                    delta_mm = abs(candidateCenter_xyz[i] - annotationCenter_xyz[i])
                    # 반경을 얻기 위해 직경을 2로 나누고, 두 개의 결절 센터가 결절의 크기 기준으로 너무 
                    # 떨어져 있는 지를 반지름의 절반 길이를 기준으로 판정한다. (실거리가 아닌 바운딩 박스 체크)
                    if delta_mm > annotationDiameter_mm / 4:
                        break
                    else:
                        candidateDiameter_mm = annotationDiameter_mm
                        break
                        
            candidateInfo_list.append(CandidateInfoTuple(isNodule_bool, 
                                                         candidateDiameter_mm,
                                                         series_uid,
                                                         candidateCenter_xyz,
                                                        ))
    # 이제 모든 결절 샘플을 내림차순으로 정렬했고 
    # 그 뒤에는 (크기 정보가 없는) 결절이 아닌 샘플이 이어져 있는 데이터가 준비되었다.
    candidateInfo_list.sort(reverse=True) 
    return candidateInfo_list

## 10.3 개별 CT 스캔 로딩

### 10.3.1 하운스필드 단위

In [5]:
import SimpleITK as sitk

### 10.4.3 밀리터리를 복셀 주소로 변환하기

In [6]:
import numpy as np

from util.util import XyzTuple, xyz2irc

In [7]:
class Ct:
    def __init__(self, series_uid):
        mhd_path = glob.glob('../data-unversioned/part2/luna/subset*/{}.mhd'.format(series_uid))[0]
        
        # sitk.ReadImage 는 .mhd 파일뿐 아니라 .raw 파일도 읽는다.
        ct_mhd = sitk.ReadImage(mhd_path)
        # 값의 타입을 np.float32 로 변환하기 위해 np.array 를 다시 만든다.
        ct_a = np.array(sitk.GetArrayFromImage(ct_mhd), dtype=np.float32)
        
        # CTs are natively expressed in https://en.wikipedia.org/wiki/Hounsfield_scale
        # HU are scaled oddly, with 0 g/cc (air, approximately) being -1000 and 1 g/cc (water) being 0.
        # The lower bound gets rid of negative density stuff used to indicate out-of-FOV
        # The upper bound nukes any weird hotspots and clamps bone down
        ct_a.clip(-1000, 1000, ct_a)
        
        self.series_uid = series_uid
        self.hu_a = ct_a
        
        self.origin_xyz = XyzTuple(*ct_mhd.GetOrigin())
        self.vxSize_xyz = XyzTuple(*ct_mhd.GetSpacing())
        self.direction_a = np.array(ct_mhd.GetDirection()).reshape(3, 3)  # 방향 정보를 배열로 변환하고 3x3 행렬 모양의 9개 요소의 배열로 리셰이프한다.
        
    def getRawCandidate(self, center_xyz, width_irc):
        center_irc = xyz2irc(
            center_xyz,
            self.origin_xyz,
            self.vxSize_xyz,
            self.direction_a,
        )
        
        slice_list = []
        for axis, center_val in enumerate(center_irc):
            start_ndx = int(round(center_val - width_irc[axis] / 2))
            end_ndx = int(start_ndx + width_irc[axis])
            slice_list.append(slice(start_ndx, end_ndx))  # slice object : start_idx, end_idx 를 받아서 슬라이싱할때 [1:3] 대신 [slice(1, 3)] 을 사용할 수 있음
            
        ct_chunk = self.hu_a[tuple(slice_list)]
        
        return ct_chunk, center_irc

In [8]:
from util.disk import getCache

raw_cache = getCache('part2ch10_raw')

In [9]:
@functools.lru_cache(1, typed=True)
def getCt(series_uid):
    return Ct(series_uid)

@raw_cache.memoize(typed=True)
def getCtRawCandidate(series_uid, center_xyz, width_irc):
    ct = getCt(series_uid)
    ct_chunk, center_irc = ct.getRawCandidate(center_xyz, width_irc)
    return ct_chunk, center_irc

In [14]:
def getCtAugmentedCandidate(
    augmentation_dict,
    series_uid, center_xyz, width_irc,
    use_cache=True
):
    if use_cache:
        ct_chunk, center_irc = getCtRawCandidate(series_uid, center_xyz, width_irc)
    else:
        ct = getCt(series_uid)
        ct_chunk, center_irc = ct.getRawCandidate(center_xyz, width_irc)
        
    ct_t = torch.tensor(ct_chunk).unsqueeze(0).unsqueeze(0).to(torch.float32)
    
    transform_t = torch.eye(4)
    # ... <1>  # 이ㅈ 쯤에서 transform_tensor를 수정한다.
    
    for i in range(3):
        if 'flip' in augmentation_dict:
            if random.random() > 0.5:
                transform_t[i, i] *= -1
                
        if 'offset' in augmentation_dict:
            offset_float = augmentation_dict['offset']
            random_float = (random.random() * 2 - 1)
            transform_t[i, 3] = offset_float * random_float
            
        if 'scale' in augmentation_dict:
            scale_float = augmentation_dict['scale']
            random_float = (random.random() * 2 - 1)
            transform_t[i, i] *= 1.0 + scale_float * random_float
            
    if 'rotate' in augmentation_dict:
        angle_rad = random.random() * math.pi * 2
        s = math.sin(angle_rad)
        c = math.cos(angle_rad)
        
        rotation_t = torch.tensor([
            [c, -s, 0, 0],
            [s, c, 0, 0],
            [0, 0, 1, 0],
            [0, 0, 0, 1],
        ])
        
        transform_t @= rotation_t
    
    affine_t = F.affine_grid(
        transform_t[:3].unsqueeze(0).to(torch.float32),
        ct_t.size(),
        align_corners=False,
    )
    
    augmented_chunk = F.grid_sample(
        ct_t,
        affine_t,
        padding_mode='border',
        align_corners=False,
    ).to('cpu')
    
    if 'noise' in augmentation_dict:
        noise_t = torch.randn_like(augmented_chunk)
        noise_t *= augmentation_dict['noise']
        
        augmented_chunk += noise_t
    
    return augmented_chunk[0], center_irc

## 10.5 간단한 데이터셋 구현

In [10]:
import copy
import torch

from torch.utils.data import Dataset

  from .autonotebook import tqdm as notebook_tqdm


In [13]:
class LunaDataset(Dataset):
    def __init__(self,
                 val_stride=0,
                 isValSet_bool=None,
                 series_uid=None,
                 sortby_str='random',
                 ratio_int=0,
                 augmentation_dict=None,
                 candidateInfo_list=None,
                ):
        self.ratio_int = ratio_int
        self.augmentation_dict = augmentation_dict
        
        if candidateInfo_list:
            self.candidateInfo_list = copy.copy(candidateInfo_list)
            self.use_cache = False
        else:
            self.candidateInfo_list = copy.copy(getCandidateInfoList())
            self.use_cache = True
            
        if series_uid:
            self.candidateInfo_list = [
                x for x in self.candidateInfo_list if x.series_uid == series_uid
            ]
            
        if isValSet_bool:
            assert val_stride > 0, val_stride
            self.candidateInfo_list = self.candidateInfo_list[::val_stride]
            assert self.candidateInfo_list
        elif val_stride > 0:
            del self.candidateInfo_list[::val_stride]
            assert self.candidateInfo_list
            
        if sortby_str == 'random':
            random.shuffle(self.candidateInfo_list)
        elif sortby_str == 'series_uid':
            self.candidateInfo_list.sort(key=lambda x: (x.series_uid, x.center_xyz))
        elif sortby_str == 'label_and_size':
            pass
        else:
            raise Exception("Unknown sort: " + repr(sortby_str))
            
        self.negative_list = [
            nt for nt in self.candidateInfo_list if not nt.isNodule_bool
        ]
        self.pos_list = [
            nt for nt in self.candidateInfo_list if nt.isNodule_bool
        ]
        
        log.info("{!r}: {} {} samples, {} neg, {} pos, {} ratio".format(
            self,
            len(self.candidateInfo_list),
            "validation" if isValSet_bool else "training",
            len(self.negative_list),
            len(self.pos_list),
            '{}:1'.format(self.ratio_int) if self.ratio_int else 'unbalanced',
        ))
        
    def shuffleSamples(self):
        if self.ratio_int:
            random.shuffle(self.negative_list)
            random.shuffle(self.pos_list)
            
    def __len__(self):
        if self.ratio_int:
            return 200000
        else:
            return len(self.candidateInfo_list)
    
    def __getitem__(self, ndx):
        if self.ratio_int:  # ratio_int 가 0이면 값이 고르게 분포되는 것
            pos_ndx = ndx // (self.ratio_int + 1)
            
            # 이 부분은 오버플로되면 인덱스 앞쪽으로 돌아간다.
            if ndx % (self.ratio_int + 1):  # 나머지가 0이 아니면 음성 샘플
                neg_ndx = ndx - 1 - pos_ndx
                neg_ndx %= len(self.negative_list)
                candidateInfo_tup = self.negative_list[neg_ndx]
            else:
                pos_ndx %= len(self.pos_list)
                candidateInfo_tup = self.pos_list[pos_ndx]
        else:
            candidateInfo_tup = self.candidateInfo_list[ndx]  # 클래스 밸런싱이 아니면 N번째 샘플을 반환한다.
            
        width_irc = (32, 48, 48)
        
        if self.augmentation_dict:
            candidate_t, center_irc = getCtAugmentedCandidate(
                self.augmentation_dict,
                candidateInfo_tup.series_uid,
                candidateInfo_tup.center_xyz,
                width_irc,
                self.use_cache,
            )
        elif self.use_cache:
            candidate_a, center_irc = getCtRawCandidate(  # candidate_a 의 차원 정보는 (32, 48, 48)이고 각 축은 깊이, 높이, 너비다.
                candidateInfo_tup.series_uid,
                candidateInfo_tup.center_xyz,
                width_irc,
            )
        else:
            ct = getCt(candidateInfo_tup.series_uid)
            candidate_a, center_irc = ct.getRawCandidate(
                candidateInfo_tup.center_xyz,
                width_irc,
            )
            candidate_t = torch.from_numpy(candidate_a).to(torch.float32)
            candidate_t = candidate_t.unsqueeze(0)  # .unsqueeze(0) 으로 "채널" 차원을 추가한다.
        
        pos_t = torch.tensor([
            not candidateInfo_tup.isNodule_bool,
            candidateInfo_tup.isNodule_bool
        ],
            dtype=torch.long,
        )
        
        return candidate_t, pos_t, candidateInfo_tup.series_uid, torch.tensor(center_irc)

In [37]:
LunaDataset()[0]

{'1.3.6.1.4.1.14519.5.2.1.6279.6001.287966244644280690737019247886',
 tensor([ 87, 369, 351]),
 tensor([0, 1]),
 tensor([[[[-880., -846., -825.,  ..., -812., -772., -723.],
           [-862., -805., -813.,  ..., -833., -829., -801.],
           [-795., -770., -819.,  ..., -830., -808., -804.],
           ...,
           [-859., -858., -882.,  ...,  -35.,   -4.,   11.],
           [-883., -878., -876.,  ...,    0.,  -13.,  -35.],
           [-878., -845., -816.,  ...,    2.,  -15.,  -38.]],
 
          [[-924., -914., -879.,  ..., -794., -732., -638.],
           [-897., -881., -858.,  ..., -821., -787., -741.],
           [-884., -877., -875.,  ..., -824., -808., -811.],
           ...,
           [-891., -868., -784.,  ...,  -80.,  -99., -109.],
           [-887., -856., -759.,  ...,  -69.,  -83.,  -84.],
           [-884., -835., -766.,  ..., -103., -102., -100.]],
 
          [[-843., -843., -857.,  ..., -846., -837., -857.],
           [-862., -854., -864.,  ..., -844., -821., -831

### 10.5.1 getCtRawCandidate 함수로 후보 배열 캐싱하기

In [22]:
@functools.lru_cache(1, typed=True)
def getCt(series_uid):
    return Ct(series_uid)

@raw_cache.memoize(typed=True)
def getCtRawCandidate(series_uid, center_xyz, width_irc):
    ct = getCt(series_uid)
    ct_chunk, center_irc = ct.getRawCandidate(center_xyz, width_irc)
    return ct_chunk, center_irc

### 10.5.2 LunaDataset.__init__ 으로 데이터셋 만들기

In [39]:
class LunaDataset(Dataset):
    def __init__(self,
                 val_stride=0,
                 isValSet_bool=None,
                 series_uid=None,
                ):
        # 반환되는 값을 복사해서 self.candidateInfo_list 변경에 영향받지 않도록 캐시한다.
        self.candidateInfo_list = copy.copy(getCandidateInfoList())
        
        if series_uid:
            self.candidateInfo_list = [
                x for x in self.candidateInfo_list if x.series_uid == series_uid
            ]
            
    def __len__(self):
        return len(self.candidateInfo_list)
    
    def __getitem__(self, ndx):
        candidateInfo_tup = self.candidateInfo_list[ndx]
        width_irc = (32, 48, 48)
        
        candidate_a, center_irc = getCtRawCandidate(  # candidate_a 의 차원 정보는 (32, 48, 48)이고 각 축은 깊이, 높이, 너비다.
            candidateInfo_tup.series_uid,
            candidateInfo_tup.center_xyz,
            width_irc,
        )
        
        candidate_t = torch.from_numpy(candidate_a)
        candidate_t = candidate_t.to(torch.float32)
        candidate_t = candidate_t.unsqueeze(0)  # .unsqueeze(0) 으로 "채널" 차원을 추가한다.
        
        pos_t = torch.tensor([
            not candidateInfo_tup.isNodule_bool,
            candidateInfo_tup.isNodule_bool
        ],
            dtype=torch.long,
        )
        
        return {
            candidate_t,
            pos_t,
            candidateInfo_tup.series_uid,  # 이 부분이 훈련 샘플
            torch.tensor(center_irc),      # 이 부분이 훈련 샘플
        }    

### 10.5.3 훈련/검증 분리

In [41]:
class LunaDataset(Dataset):
    def __init__(self,
                 val_stride=0,
                 isValSet_bool=None,
                 series_uid=None,
                ):
        # 반환되는 값을 복사해서 self.candidateInfo_list 변경에 영향받지 않도록 캐시한다.
        self.candidateInfo_list = copy.copy(getCandidateInfoList())
        
        if series_uid:
            self.candidateInfo_list = [
                x for x in self.candidateInfo_list if x.series_uid == series_uid
            ]
            
        if isValSet_bool:
            assert val_stride > 0, val_stride
            self.candidateInfo_list = self.candidateInfo_list[::val_stride]
            assert self.candidateInfo_list
        elif val_stride > 0:
            del self.candidateInfo_list[::val_stride]  # self.candidateInfo_list 에서 검증용 이미지(매 val_stride 번째 아이템)를 삭제한다. 미리 복사한 값이므로 원래 리스트는 수정되지 않는다.
            assert self.candidateInfo_list
            
    def __len__(self):
        return len(self.candidateInfo_list)
    
    def __getitem__(self, ndx):
        candidateInfo_tup = self.candidateInfo_list[ndx]
        width_irc = (32, 48, 48)
        
        candidate_a, center_irc = getCtRawCandidate(  # candidate_a 의 차원 정보는 (32, 48, 48)이고 각 축은 깊이, 높이, 너비다.
            candidateInfo_tup.series_uid,
            candidateInfo_tup.center_xyz,
            width_irc,
        )
        
        candidate_t = torch.from_numpy(candidate_a)
        candidate_t = candidate_t.to(torch.float32)
        candidate_t = candidate_t.unsqueeze(0)  # .unsqueeze(0) 으로 "채널" 차원을 추가한다.
        
        pos_t = torch.tensor([
            not candidateInfo_tup.isNodule_bool,
            candidateInfo_tup.isNodule_bool
        ],
            dtype=torch.long,
        )
        
        return {
            candidate_t,
            pos_t,
            candidateInfo_tup.series_uid,  # 이 부분이 훈련 샘플
            torch.tensor(center_irc),      # 이 부분이 훈련 샘플
        }    

## 10.7 연습문제

### 1. LunaDataset 인스턴스를 순회하는 프로그램을 작성하고 시간을 재어보자. 처음에는 N=1000 샘플로 제한하여 실행해야 할 것이다.

#### **a.** 처음 실행할 때 걸린 시간은 얼마인가?

In [42]:
luna_dataset = LunaDataset()

In [43]:
import time

In [44]:
start = time.time()
for i in range(1000):
    luna_dataset[i]
end = time.time()
print(end - start)

669.8724207878113


#### **b.** 두 번째 실행할 때 걸린 시간은 얼마인가?

In [45]:
start = time.time()
for i in range(1000):
    luna_dataset[i]
end = time.time()
print(end - start)

3.2563316822052


#### **c.** 캐시를 클리어하면 실행 시간에 어떤 영향을 주는가?

영향을 주지 당연히

#### **d.** lastN=1000 샘플을 사용하면 첫 번째/두 번째 시간에 어떤 영향을 주는가?

In [52]:
start = time.time()
for i in range(len(luna_dataset)-1, len(luna_dataset) - 1001, -1):
    luna_dataset[-i]
end = time.time()
print(end - start)

0.29554176330566406


In [50]:
len(luna_dataset)

551065

In [53]:
start = time.time()
for i in range(len(luna_dataset)-1, len(luna_dataset) - 1001, -1):
    luna_dataset[-i]
end = time.time()
print(end - start)

0.20257782936096191


처음만 좀 시간 걸리고 그 다음부터는 엄청 빠르네