# Unit Testing the Machine Learning Pipeline Functions

This script provides basic testing of the functionality of the machine learning preprocessing pipeline.

In [1]:
import tensorflow as tf
import numpy as np
import keras
import matplotlib.pyplot as plt
import random
import pylidc as pl
import cv2
from scipy.ndimage import zoom

Using TensorFlow backend.


A single nodule stack is queried for the purpose of this script. It is stored as 2D and as 3D data.

In [2]:
# Query annotations with the chosen malignancy rating and crop the first one
# out of its scan volume, keeping the result both as a flat list of 2D slices
# and as a per-nodule stack of slices.
malignancy_estimate = 5

ann = pl.query(pl.Annotation).filter(pl.Annotation.malignancy == malignancy_estimate)

# Per-axis (before, after) padding handed to Annotation.bbox.
padding = [(30,10), (10,25), (0,0)]

nodule_slices_2d = []
nodules_3d = []

for nodule in ann[:1]:
    # Converting the scan to a volume loads the DICOM files, which is slow, so
    # it is done once per annotation and the crop is reused for both outputs.
    cropped = nodule.scan.to_volume()[nodule.bbox(pad=padding)]
    # One 2D array per index along the third axis of the cropped volume.
    axial_slices = [cropped[:, :, i] for i in range(cropped.shape[2])]
    nodule_slices_2d.extend(axial_slices)
    nodules_3d.append(axial_slices)

np_nodules_2d = np.array(nodule_slices_2d)
np_nodules_3d = [np.array(nodule_stack) for nodule_stack in nodules_3d]

Loading dicom files ... This may take a moment.
Loading dicom files ... This may take a moment.


# Unit Test 1

The 2D resizing and labelling function is tested here. Regardless of the initial size of the slice, the returned image must be of size 227x227. Furthermore, it must be labelled with an integer, either 0 or 1.

In [3]:
def compress2d(slices, cancer=True):
    """Resize every 2D slice to 227x227 and pair it with a binary label.

    Args:
        slices: Iterable of 2D images (anything ``np.array`` accepts).
        cancer: Slices are labelled 1 when this compares equal to ``True``,
            otherwise 0.

    Returns:
        A list with one ``[resized_image, label]`` entry per input slice.
    """
    label = 1 if cancer == True else 0
    return [[cv2.resize(np.array(image), (227, 227)), label] for image in slices]

In [4]:
print("Original Size:" ,np_nodules_2d[0].shape)
resized_nodules_2d = compress2d(np_nodules_2d)
print("New Size: ", resized_nodules_2d[0][0].shape)
print("Label: ", resized_nodules_2d[0][1])

# Every sample must be 227x227 and carry a label of 0 or 1.
# A membership test is required here: `label == (0 or 1)` only ever compares
# against 1, because `0 or 1` evaluates to 1.
if all(image.shape == (227, 227) and label in (0, 1)
       for image, label in resized_nodules_2d):
    print("Success")
else:
    print("Fail")

Original Size: (88, 74)
New Size:  (227, 227)
Label:  1
Success


# Unit Test 2

The 3D resizing and labelling function is tested here. Regardless of the initial size of the slice stack, the returned volume must contain 20 images of size 111x111. Furthermore, the data must be labelled with an integer, either 0 or 1.

In [5]:
def compress3d(slices, cancer=True, depth=20, size=(111, 111)):
    """Resize a stack of 2D slices to a fixed-size volume and attach a label.

    Each slice is resized in-plane with ``cv2.resize``; the stack is then
    interpolated along its first axis with ``scipy.ndimage.zoom`` so that it
    holds ``depth`` slices.

    Args:
        slices: Sized sequence of 2D images (anything ``np.array`` accepts).
        cancer: The volume is labelled 1 when this compares equal to ``True``,
            otherwise 0.
        depth: Number of slices in the returned volume. Defaults to 20.
        size: Target in-plane size passed to ``cv2.resize`` as ``dsize``.
            Defaults to ``(111, 111)``.

    Returns:
        A two-element list ``[volume, label]``.

    Raises:
        ValueError: If ``slices`` is empty, since the depth scaling factor is
            undefined for an empty stack.
    """
    num_slices = len(slices)
    if num_slices == 0:
        raise ValueError("compress3d requires at least one slice")

    resized = np.array([cv2.resize(np.array(image), size) for image in slices])
    # `== True` is kept deliberately so non-boolean truthy values (e.g. 2)
    # are labelled exactly as before.
    label = 1 if cancer == True else 0
    # Scale only the stacking axis; the in-plane axes are already at `size`.
    return [zoom(resized, (depth / num_slices, 1, 1)), label]

In [6]:
print("Original size:", np_nodules_3d[0].shape)
resized_nodules_3d = compress3d(np_nodules_3d[0])
print("New size: ", resized_nodules_3d[0].shape)
print("Label: ", resized_nodules_3d[1])

# The volume must be 20x111x111 and the label must be 0 or 1.
# A membership test is required here: `label == (0 or 1)` only ever compares
# against 1, because `0 or 1` evaluates to 1.
if resized_nodules_3d[0].shape == (20, 111, 111) and resized_nodules_3d[1] in (0, 1):
    print("Success")
else:
    print("Fail")

Original size: (8, 88, 74)
New size:  (20, 111, 111)
Label:  1
Success


# Unit Test 3

The normalization function is tested here. It is expected to return an image whose pixels fall within the range [0,1]. Although min-max normalization would normally map the minimum pixel to 0.0 and the maximum to 1.0, here fixed bounds (MIN_BOUND = -1100 and MAX_BOUND = 600) are used instead of the image's own extremes to account for outliers, and values outside these bounds are clipped. The resulting minimum/maximum values are therefore not necessarily 0.0/1.0, although they always fall within this range.

In [7]:
# Fixed intensity window used for normalization; values outside it are clipped.
MIN_BOUND = -1100
MAX_BOUND = 600

def min_max_normalize(image):
    """Rescale pixel values to [0, 1] using the fixed MIN_BOUND/MAX_BOUND window.

    Values at or below ``MIN_BOUND`` map to 0.0, values at or above
    ``MAX_BOUND`` map to 1.0, and values in between are scaled linearly.

    Args:
        image: Numeric array of pixel values. Lists and tuples are converted
            to arrays first.

    Returns:
        A new array of the same shape with every value in [0, 1]; the input
        is not modified.
    """
    if isinstance(image, (list, tuple)):
        image = np.asarray(image)
    scaled = (image - MIN_BOUND) / (MAX_BOUND - MIN_BOUND)
    return np.clip(scaled, 0.0, 1.0)

In [8]:
print("Original minimum pixel: ", resized_nodules_2d[0][0].min(), "Original maximum pixel: ", resized_nodules_2d[0][0].max())
normalized_image = min_max_normalize(resized_nodules_2d[0][0])
print("New minimum pixel: ", normalized_image.min(), "New maximum pixel: ", normalized_image.max())

# Both extremes must lie in [0, 1]. A single condition with one `else`
# guarantees that "Fail" is printed whichever bound is violated.
if 0 <= normalized_image.min() <= 1 and 0 <= normalized_image.max() <= 1:
    print("Success")
else:
    print("Fail")

Original minimum pixel:  -899 Original maximum pixel:  431
New minimum pixel:  0.11823529411764706 New maximum pixel:  0.9005882352941177
Success


# Unit Test 4

The stacking function is tested here. It is expected to stack each image on itself three times along a new last dimension, so a 227x227 image becomes 227x227x3.

In [9]:
def stack_data(concatenated_data):
    """Replicate each image three times along a new trailing axis.

    Args:
        concatenated_data: Iterable of samples where index 0 holds the image
            array and index 1 holds its label.

    Returns:
        A list of ``[stacked_image, label]`` lists in input order, where
        ``stacked_image`` has one extra last dimension of length 3.
    """
    return [
        [np.stack([sample[0]] * 3, axis=-1), sample[1]]
        for sample in concatenated_data
    ]

In [10]:
# Report the image shape before and after stacking.
print("Original shape:", resized_nodules_2d[0][0].shape)
stacked_data = stack_data(resized_nodules_2d)
print("New shape:", stacked_data[0][0].shape)

# A 227x227 image must gain a trailing axis of length 3.
print("Success" if stacked_data[0][0].shape == (227, 227, 3) else "Fail")

Original shape: (227, 227)
New shape: (227, 227, 3)
Success


# Unit Test 5

This function is expected to split a dataset according to a given ratio, returning the two partitions of the original dataset.

In [11]:
def split_data(data, ratio_of_train):
    """Split a dataset into leading training and trailing testing partitions.

    The first ``int(ratio_of_train * len(data))`` samples form the training
    partition and the remaining samples form the testing partition; order is
    preserved and no shuffling is performed.

    Args:
        data: Sized, sliceable sequence of samples.
        ratio_of_train: Fraction of samples assigned to training; must lie
            in the closed interval [0, 1].

    Returns:
        A ``(training_data, testing_data)`` tuple of lists.

    Raises:
        ValueError: If ``ratio_of_train`` is outside [0, 1]. A negative ratio
            would otherwise index from the end of ``data`` and duplicate
            samples in the testing partition.
    """
    if not 0 <= ratio_of_train <= 1:
        raise ValueError("ratio_of_train must be between 0 and 1 inclusive")

    len_of_training_data = int(ratio_of_train * len(data))
    # list(...) keeps the return type a list even for tuple or array input.
    training_data = list(data[:len_of_training_data])
    testing_data = list(data[len_of_training_data:])
    return training_data, testing_data

In [12]:
# Fixture datasets for split_data: empty, odd-sized (33) and round-sized (100).
case_1 = []
case_2 = [1] * 33
case_3 = [1] * 100

In [13]:
# Empty dataset, with a non-zero and a zero training ratio.
results_1 = split_data(case_1, ratio_of_train=0.8)
results_2 = split_data(case_1, ratio_of_train=0)
# Odd-sized dataset: the training length is truncated towards zero.
results_3 = split_data(case_2, ratio_of_train=0.5)
results_4 = split_data(case_2, ratio_of_train=0.99)
# Round-sized dataset with a conventional 80/20 split.
results_5 = split_data(case_3, ratio_of_train=0.8)

In [14]:
# Each actual split is paired with its expected (training, testing) result;
# the status is "Success" only when every pair matches.
test_status = "Success" if all(
    actual == expected
    for actual, expected in (
        (results_1, ([], [])),
        (results_2, ([], [])),
        (results_3, ([1] * 16, [1] * 17)),
        (results_4, ([1] * 32, [1] * 1)),
        (results_5, ([1] * 80, [1] * 20)),
    )
) else "Fail"

print(test_status)

Success
