<a href="https://colab.research.google.com/github/limseo12/Breast-Ultrasound_CNN/blob/main/Breast.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dropout, Dense, Flatten, Conv2D, MaxPooling2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# Mount Google Drive inside the Colab VM so the dataset archive stored there can be read.
from google.colab import drive

# NOTE: the mount point is spelled 'gdive'; the dataset copy cell reads from
# '/content/gdive/MyDrive/dataset', so both paths must be changed together.
drive.mount('/content/gdive/')

# ROOT_DIR, DATA_ROOT_DIR 설정

In [None]:
import os
import shutil

# Working directory of the Colab VM; every other path hangs off this one.
ROOT_DIR = '/content'

# Where the BUSI archive is extracted.
DATA_ROOT_DIR = os.path.join(ROOT_DIR, 'Dataset_BUSI_with_GT')

# Root of the per-label directory tree, and its 'train' sub-directory
# that the image generators read from.
CLASSIFICATION_DATA_ROOT_DIR = os.path.join(ROOT_DIR, 'Classification')
CLASSIFICATION_TRAIN_DATA_ROOT_DIR = os.path.join(ROOT_DIR, 'Classification', 'train')

In [None]:
import shutil

# Start from a clean slate: remove the extracted dataset and the classification
# tree if an earlier run left them behind.
for stale_dir in (DATA_ROOT_DIR, CLASSIFICATION_DATA_ROOT_DIR):
  if os.path.exists(stale_dir):
    shutil.rmtree(stale_dir)

# Google Drive 에서 dataset download

In [None]:
import shutil
import os

# Copy the BUSI archive from the mounted Google Drive into the Colab working directory.
try:
  dataset_path = '/content/gdive/MyDrive/dataset'

  shutil.copy(os.path.join(dataset_path, 'Dataset_BUSI_with_GT.zip'), '/content')

except Exception as err:
  # Best-effort: report the failure (e.g. Drive not mounted, archive missing)
  # rather than aborting the cell. The original `print*(...)` multiplied the
  # print builtin by a string and raised TypeError from inside this handler.
  print(str(err))

In [None]:
import zipfile

# Unpack the copied archive into DATA_ROOT_DIR.
archive_path = os.path.join(ROOT_DIR, 'Dataset_BUSI_with_GT.zip')

with zipfile.ZipFile(archive_path, mode='r') as archive:
  archive.extractall(path=DATA_ROOT_DIR)

In [None]:
import shutil
import os
import glob

# Every directory found directly under DATA_ROOT_DIR is treated as one class label.
total_file_list = glob.glob(os.path.join(DATA_ROOT_DIR, '*'))

label_name_list = []
for entry_path in total_file_list:
  if os.path.isdir(entry_path):
    label_name_list.append(entry_path.rsplit('/', 1)[-1].strip())

classification_root_exists = os.path.exists(CLASSIFICATION_DATA_ROOT_DIR)
if not classification_root_exists:
  os.mkdir(CLASSIFICATION_DATA_ROOT_DIR)

# Mirror each label directory into <classification root>/train/<label>.
for label_name in label_name_list:
  src_dir_path = os.path.join(DATA_ROOT_DIR, label_name)
  dst_dir_path = os.path.join(CLASSIFICATION_DATA_ROOT_DIR, 'train/' + label_name)

  try:
      shutil.copytree(src_dir_path, dst_dir_path)
  except Exception as err:
      # Best-effort copy: report the problem and move on to the next label.
      print(str(err))

# Report how many entries ended up under each training label.
train_label_name_list = os.listdir(CLASSIFICATION_TRAIN_DATA_ROOT_DIR)

for label_name in train_label_name_list:
  label_dir_path = os.path.join(CLASSIFICATION_TRAIN_DATA_ROOT_DIR, label_name)
  entry_count = len(os.listdir(label_dir_path))
  print('train label : ', label_name,' => ', entry_count)

# Classification 데이터 생성

In [None]:
import os

# Create the classification root only when nothing exists at that path yet.
classification_root_missing = not os.path.exists(CLASSIFICATION_DATA_ROOT_DIR)

if classification_root_missing:
    os.mkdir(CLASSIFICATION_DATA_ROOT_DIR)

In [None]:
# Use copytree to copy each <label> directory of the dataset into train/<label>.
# NOTE(review): an earlier cell performs the same copy, so on a top-to-bottom run
# the destination presumably already exists and the error branch is what prints --
# confirm whether this cell is still needed.

for label_name in label_name_list:

  src_dir_path = os.path.join(DATA_ROOT_DIR, label_name)
  dst_dir_path = os.path.join(CLASSIFICATION_DATA_ROOT_DIR, 'train/' + label_name)

  try:
    shutil.copytree(src_dir_path, dst_dir_path)
  except Exception as err:
    # Best-effort: report the failure and continue with the next label.
    print(str(err))
  else:
    print(label_name+' copytree is done !!')

In [None]:
import os

# Print how many entries each label directory under the training root contains.
train_label_name_list = os.listdir(CLASSIFICATION_TRAIN_DATA_ROOT_DIR)

for label_name in train_label_name_list:

  # Count inside the label's own directory. Listing the training root itself
  # (as the original did) reports the number of labels on every line.
  label_dir_path = os.path.join(CLASSIFICATION_TRAIN_DATA_ROOT_DIR, label_name)

  print('train label : ', label_name, ' => ', len(os.listdir(label_dir_path)))

In [None]:
import os
import glob
import pandas as pd

# For every training label, separate the files whose path contains '_mask' from
# the rest, report both counts, and delete the '_mask' files so they are not
# picked up as training images by flow_from_directory.
train_label_name_list = os.listdir(CLASSIFICATION_TRAIN_DATA_ROOT_DIR)

for label_name in train_label_name_list:

  temp_dic = {}

  file_list = glob.glob(CLASSIFICATION_TRAIN_DATA_ROOT_DIR+'/'+label_name+'/*')

  temp_dic[label_name] = file_list

  # One-column frame (column name = label) holding every file path of this label.
  temp_df = pd.DataFrame(temp_dic)

  is_mask = temp_df[label_name].str.contains('_mask')

  # `~` is the boolean negation for a mask; unary minus is not meant for booleans.
  image_file_df = temp_df[~is_mask].reset_index(drop=True)

  mask_file_df = temp_df[is_mask].reset_index(drop=True)

  print('label =', label_name, ' , image = ', len(image_file_df), ' , mask = ', len(mask_file_df))

  for row in range(len(mask_file_df)):

      try:
          # Row/column lookup. The original `row.label_name` was an attribute
          # access on an int, which raised on every iteration and was swallowed,
          # so no mask file was ever removed.
          os.remove(mask_file_df.loc[row, label_name])
      except OSError as err:
          # Best-effort: report files that could not be deleted and keep going.
          print(str(err))

# Preparing the Data - 학습 데이터 증강 (Data Augmentation)

In [None]:
import os

# Tally the entries under each training label and the overall total.
train_label_name_list = os.listdir(CLASSIFICATION_TRAIN_DATA_ROOT_DIR)

per_label_counts = []

for label_name in train_label_name_list:

  label_dir_path = os.path.join(CLASSIFICATION_TRAIN_DATA_ROOT_DIR, label_name)
  image_data_nums = len(os.listdir(label_dir_path))

  print('label = ', label_name, ' , data nums =', image_data_nums)

  per_label_counts.append(image_data_nums)

total_image_data_nums = sum(per_label_counts)

print('total image data nums = ', total_image_data_nums)

In [None]:
# Total number of training files counted above; the generators use it as their
# batch size so that one batch covers the whole dataset.
TOTAL_TRAIN_IMAGE_DATA_NUMS = total_image_data_nums

In [None]:
# Un-augmented pipeline: the only transform is multiplying pixel values by 1/255.
original_gen = ImageDataGenerator(rescale=1./255)

# One batch as large as the whole dataset, read in directory order (no shuffling),
# resized to 224x224, with integer ('sparse') class labels.
original_flow_options = dict(
    batch_size=TOTAL_TRAIN_IMAGE_DATA_NUMS,
    shuffle=False,
    target_size=(224,224),
    class_mode='sparse',
)

original_generator = original_gen.flow_from_directory(CLASSIFICATION_TRAIN_DATA_ROOT_DIR,
                                                      **original_flow_options)

In [None]:
# Pull the first batch from the un-augmented generator; its batch size was set
# to the full image count, so this batch is the starting training set.
first_batch = original_generator.next()
x_train, y_train = first_batch

print(x_train.shape, y_train.shape)

# Augmentation Data 생성

In [None]:
# Number of augmented batches appended to the original data in the augmentation loop.
AUGMENTATION_COUNT = 4  # adds 4x the original data as augmented samples

In [None]:
# Augmenting pipeline: same 1/255 rescale as the original generator plus small
# random rotations, shears, zooms, shifts and horizontal flips.
augmentation_options = dict(
    rescale=1./255,
    rotation_range=10,
    shear_range=0.1,
    zoom_range=0.1,
    horizontal_flip=True,
    width_shift_range=0.1,
    height_shift_range=0.1,
)
augmentation_gen = ImageDataGenerator(**augmentation_options)

# Same directory, batch size, ordering, image size and label mode as the
# un-augmented generator, so each batch lines up with the original data.
augmentation_flow_options = dict(
    batch_size=TOTAL_TRAIN_IMAGE_DATA_NUMS,
    shuffle=False,
    target_size=(224, 224),
    class_mode='sparse',
)

augmentation_generator = augmentation_gen.flow_from_directory(CLASSIFICATION_TRAIN_DATA_ROOT_DIR,
                                                              **augmentation_flow_options)

In [None]:
import numpy as np

separator = '===================================================='

print(separator)
print('[bofore] ', x_train.shape, y_train.shape)
print(separator)

# Draw AUGMENTATION_COUNT full-size augmented batches and append them, in order,
# after the data already held in x_train / y_train.
x_parts = [x_train]
y_parts = [y_train]

for _ in range(AUGMENTATION_COUNT):

    x_augmented, y_augmented = augmentation_generator.next()

    x_parts.append(x_augmented)
    y_parts.append(y_augmented)

# Single concatenation at the end instead of re-copying the growing array each pass.
x_train = np.concatenate(x_parts)
y_train = np.concatenate(y_parts)

print('[after] ', x_train.shape, y_train.shape)
print(separator)

# train data random shuffle

In [None]:
# Shuffle images and labels with one shared permutation so each image keeps its label.
# A locally seeded generator makes the order (and therefore the train/test split
# taken from it) reproducible across runs without touching NumPy's global RNG state.
SHUFFLE_SEED = 42

s = np.random.default_rng(SHUFFLE_SEED).permutation(x_train.shape[0])

x_train = x_train[s]
y_train = y_train[s]

In [None]:
# Fraction of the shuffled data that is held out as the test split.
SPLIT_RATIO = 0.2    # train : test = 8 : 2

In [None]:
# The first SPLIT_RATIO share of the shuffled samples becomes the test split.
# NOTE(review): x_train at this point holds the original images plus their
# augmented variants, so the test slice can contain variants of images that stay
# in the training part -- confirm this is intended.
split_num = int(SPLIT_RATIO * x_train.shape[0])

x_test, y_test = x_train[:split_num], y_train[:split_num]

print(x_test.shape, y_test.shape)

In [None]:
# Keep everything after the test slice as the training split.
# This cell rebinds x_train / y_train from themselves, so running it a second
# time would drop another split_num samples.
x_train, y_train = x_train[split_num:], y_train[split_num:]

print(x_train.shape, y_train.shape)

# 16 개의 데이터와 정답 출력

In [None]:
import matplotlib.pyplot as plt

# Class index -> class name.
# NOTE(review): assumes flow_from_directory assigned indices in this order --
# confirm against original_generator.class_indices.
class_dict = {0:'benign', 1:'malignant', 2:'normal'}

plt.figure(figsize=(8,8))

# Show the first 16 training samples in a 4x4 grid, each titled with its class name.
for sample_idx in range(16):

    plt.subplot(4, 4, sample_idx + 1)

    label_text = str(class_dict[int(y_train[sample_idx])])
    plt.title(label_text)

    # Hide axis ticks; they carry no information for image thumbnails.
    plt.xticks([])
    plt.yticks([])

    plt.imshow(x_train[sample_idx])

plt.tight_layout()
plt.show()

# IMAGE SHAPE 설정

In [None]:
# Image size in pixels. The generators above hard-code target_size=(224, 224)
# rather than reading these constants, so keep the two in sync.
IMG_WIDTH = 224
IMG_HEIGHT = 224

# Sequential 모델 구축

In [None]:
# Small CNN classifier: two 3x3 convolutions, one max-pool, then a dense head.
model = Sequential()

# The input shape must match the images produced by the generators
# (target_size=(224, 224), RGB); the original (32, 32, 3) did not.
# NOTE(review): at this resolution Flatten feeds a very wide vector into
# Dense(256); consider more pooling if memory or training time is a problem.
model.add(Conv2D(input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), kernel_size=(3,3), filters=32, activation='relu'))
model.add(Conv2D(kernel_size=(3,3), filters=64, activation='relu'))
model.add(MaxPooling2D(pool_size=(2,2)))
model.add(Dropout(0.25))

model.add(Flatten())

model.add(Dense(256, activation='relu'))
model.add(Dropout(0.5))

# One output unit per class. `units` must be an integer, so pass the number of
# classes rather than the class_dict mapping itself.
model.add(Dense(len(class_dict), activation='softmax'))

model.summary()

In [None]:
# Integer class labels (class_mode='sparse') pair with the sparse categorical loss.
optimizer = tf.keras.optimizers.Adam()

model.compile(optimizer=optimizer,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
from tensorflow.keras.callbacks import EarlyStopping
from datetime import datetime

# Stop training early based on validation loss, with a patience of 5 epochs.
earlystopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1)

# The held-out split doubles as validation data during training.
fit_options = dict(
    batch_size=32,
    epochs=30,
    validation_data=(x_test, y_test),
    callbacks=[earlystopping],
)

start_time = datetime.now()
hist = model.fit(x_train, y_train, **fit_options)
end_time = datetime.now()

elapsed = end_time - start_time
print('elapsed time => ', elapsed)

In [None]:
# Evaluate on the held-out split. This is the same data that was passed as
# validation_data to fit(), so it also drove early stopping.
model.evaluate(x_test, y_test)

In [None]:
# Run the model on the held-out split; the final layer is a softmax, so each row
# is presumably one score per class -- the printed shape confirms it.
y_pred = model.predict(x_test)

print(y_pred.shape)

In [None]:
import matplotlib.pyplot as plt

# Accuracy per epoch for the training and validation data, on one set of axes.
train_accuracy = hist.history['accuracy']
validation_accuracy = hist.history['val_accuracy']

fig, ax = plt.subplots()
ax.plot(train_accuracy, label='train')
ax.plot(validation_accuracy, label='validation')
ax.set_title('Accuracy Trend')
ax.set_ylabel('accuracy')
ax.set_xlabel('epoch')
ax.legend(loc='best')
ax.grid()
plt.show()

In [None]:
# Loss per epoch for the training and validation data, on one set of axes.
train_loss = hist.history['loss']
validation_loss = hist.history['val_loss']

fig, ax = plt.subplots()
ax.plot(train_loss, label='train')
ax.plot(validation_loss, label='validation')
ax.set_title('Loss Trend')
ax.set_ylabel('loss')
ax.set_xlabel('epoch')
ax.legend(loc='best')
ax.grid()
plt.show()