In [2]:
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import Sequence

import numpy as np
import os
from keras.utils import Sequence
from keras.preprocessing.image import ImageDataGenerator
from keras import Model
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, Activation
from keras import datasets, layers, models

import numpy as np
import os
import matplotlib.pyplot as plt
import warnings 
warnings.filterwarnings('ignore')

In [15]:
class NoduleDatasetGenerator(Sequence):
    def __init__(self, data_path, image_path, batch_size, image_size, shuffle = True, augment = True):
        self.batch_size = batch_size
        self.image_size = image_size
        self.shuffle = shuffle
        self.augment = augment
        self.data_path = data_path
        self.images_dir = os.path.join(data_path, 'images', image_path)
        
        if image_path == 'train':
            self.labels_file = os.path.join(data_path, 'labels', 'trainlabels.txt')
        elif image_path == 'val':
            self.labels_file = os.path.join(data_path, 'labels', 'vallabels.txt')
        elif image_path == 'test':
            self.labels_file = os.path.join(data_path, 'labels', 'testlabels.txt')
            
        self.dataset = self._load_data(self.images_dir, self.labels_file)
        self.indexes = np.arange(len(self.dataset))
        self.on_epoch_end()
        if self.augment:
            self.datagen = ImageDataGenerator(
                rotation_range = 30,
                width_shift_range = 0.1,
                height_shift_range = 0.1,
                shear_range = 0.1,
                zoom_range = 0.1,
                horizontal_flip = True,
                fill_mode = 'nearest')
        else:
            self.datagen = ImageDataGenerator(
                shear_range = 0.1,
                zoom_range = 0.1,
                horizontal_flip = True,
                fill_mode = 'nearest')
        
    def __getitem__(self, index):
        batch_indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        batch_data = [self.dataset[i] for i in batch_indexes]
        X, y = self._data_generation(batch_data)
        return X, y
        
    def __len__(self):
        return int(np.ceil(len(self.dataset) / self.batch_size))
        
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexes)
            
    def _load_data(self, images_dir, labels_file):
        with open(labels_file, 'r') as f:
            lines = f.readlines()

        data = []
        for line in lines[1:]:
            filename, label = line.strip().split()
            filename = os.path.basename(filename)
            label = int(label)
            data.append((filename, label))
        return data
        
    def _data_generation(self, batch_data):
        X = np.empty((self.batch_size, *self.image_size, 3))
        y = np.empty((self.batch_size), dtype=int)
        
        for i, (filename, label) in enumerate(batch_data):
            img_path = os.path.join(self.images_dir, filename)
            img = load_img(img_path, target_size=self.image_size)
            img = img_to_array(img)
            
            if self.augment:
                img = self.datagen.random_transform(img)
                
            X[i] = img
            y[i] = label
            
        return X, y


data_path = 'D:/nodule/data'
train_gen = NoduleDatasetGenerator(data_path, 'train', batch_size = 32, image_size = (224, 224), shuffle = True, augment = True)
valid_gen = NoduleDatasetGenerator(data_path, 'val',   batch_size = 32, image_size = (224, 224), shuffle = True, augment = True)
tests_gen = NoduleDatasetGenerator(data_path, 'test',  batch_size = 32, image_size = (224, 224))

In [17]:
class LeNet(Model):
    def __init__(self):
        super().__init__()
        self.conv1 = Conv2D(32, 3, padding='same', activation='relu')
        self.conv2 = Conv2D(64, 3, padding='same', activation='relu')
        self.maxpool1 = MaxPooling2D(pool_size=(2, 2))
        self.maxpool2 = MaxPooling2D(pool_size=(2, 2))
        self.flatten = Flatten()
        self.fc1 = Dense(256, activation = 'relu')
        self.dropout1 = Dropout(0.5)
        self.fc2 = Dense(32, activation = 'relu')
        self.dropout2 = Dropout(0.5)
        self.fc3 = Dense(2)

        self.batchnorm1 = BatchNormalization()
        self.batchnorm2 = BatchNormalization()

    def call(self, x):
        x = self.conv1(x)
        x = self.batchnorm1(x)
        x = Activation('relu')(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.batchnorm2(x)
        x = Activation('relu')(x)
        x = self.maxpool2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.dropout2(x)
        x = self.fc3(x)
        return x
    
model = LeNet()
model.compile(optimizer = 'adam',
              loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True),
              metrics = ['accuracy'])

history = model.fit(train_gen,
                    epochs = 50,
                    validation_data = valid_gen)