<a href="https://colab.research.google.com/github/xodud5654/redwoods_final/blob/main/Untitled30.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!wget http://chaos.inje.ac.kr:3030/data/chest_xray_data.zip

--2022-12-14 03:53:13--  http://chaos.inje.ac.kr:3030/data/chest_xray_data.zip
Resolving chaos.inje.ac.kr (chaos.inje.ac.kr)... 203.241.251.51
Connecting to chaos.inje.ac.kr (chaos.inje.ac.kr)|203.241.251.51|:3030... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1225713864 (1.1G) [application/zip]
Saving to: ‘chest_xray_data.zip’


In [None]:
!unzip chest_xray_data.zip   # -d chest_xray_data


In [None]:
# import
import matplotlib.pyplot as plt 
from matplotlib.image import imread
import tensorflow
from tensorflow.keras import models, layers


In [None]:
## Setting up the data

## Set the image size 
IMG_SIZE = (224, 224)
batchSize = 32

print("Training Images:")
train_data = tensorflow.keras.preprocessing.image_dataset_from_directory(directory = train_dir,
                                                                 image_size = IMG_SIZE,
                                                                 label_mode = "binary",
                                                                 color_mode = "rgb",
                                                                 batch_size = batchSize)

print("Testing Images:")
test_data = tensorflow.keras.preprocessing.image_dataset_from_directory(directory = test_dir,
                                                                 image_size = IMG_SIZE,
                                                                 label_mode = "binary",
                                                                 color_mode = "rgb",
                                                                 batch_size = batchSize)

print("Validation Images:")
val_data = tensorflow.keras.preprocessing.image_dataset_from_directory(directory = validation_dir,
                                                                 image_size = IMG_SIZE,
                                                                 label_mode = "binary",
                                                                 color_mode = "rgb",
                                                                 batch_size = batchSize)

In [None]:
#
# Data augmentation
#
# Create a data augmentation stage with horizontal flipping, rotations, zooms
#
tensorflow.get_logger().setLevel('ERROR')  # Clear warnings in data augmentation

from tensorflow import keras
data_augmentation = keras.Sequential([
  layers.RandomFlip("horizontal"),
  layers.RandomRotation(0.2),
  layers.RandomZoom(0.2),
  layers.RandomHeight(0.2),  # Not compatible with model
  layers.RandomWidth(0.2),
  # layers.Rescaling(1./255) # keep for ResNet50V2, remove for EfficientNetB0
], name ="data_augmentation")


In [None]:
import matplotlib.pyplot as plt 
from matplotlib.image import imread
import tensorflow
from tensorflow import keras
from keras import models, layers

base_model = keras.applications.resnet50.ResNet50(include_top = False)
base_model.trainable = True # False

# Let's take a look to see how many layers are in the base model
print("Number of layers in the base model: ", len(base_model.layers))

# Fine-tune from this layer onwards
fine_tune_at = 85  # half of the whole layers

#  Fine-tuning after layer_number larger than 170
# Freeze all the layers before the `fine_tune_at` layer
for layer in base_model.layers[:fine_tune_at]:
    layer.trainable =  False

In [None]:
model = keras.Sequential([
  layers.Input(shape=(224,224,3),name='input_layer'),
  # layers.Rescaling(1./255),
  # data_augmentation,
  base_model,
  layers.GlobalMaxPooling2D(name = "global_max"),
  layers.Dense(128,activation='relu'),
  layers.Dense(1,activation='sigmoid')
])

model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0001), 
              loss = 'binary_crossentropy', 
              metrics = ['accuracy'])

model.summary()

In [None]:
keras.utils.plot_model(model, show_shapes=True)

In [None]:
early_stopping = keras.callbacks.EarlyStopping(monitor="val_loss", # watch the val loss metric
                                                  patience=10) # if val loss decreases for 10 epochs in a row, stop training
# Creating learning rate reduction callback
reduce_lr = keras.callbacks.ReduceLROnPlateau(monitor="val_loss",  
                                                 factor=0.25, # multiply the learning rate by 0.2 (reduce by 4x)
                                                 patience=4,  # 3,4,5
                                                 verbose=1, # print out when learning rate goes down 
                                                 min_lr=1e-7)
# Check the best model and save the best model
cp_callback = keras.callbacks.ModelCheckpoint(filepath="./model/chest_xray.hdf5", 
                              monitor='val_accuracy', verbose=0, save_best_only=True)
## Check the summary
for no, layer in enumerate(model.layers):
  print(no, layer.trainable)

history = model.fit(train_data, 
                    epochs=1, 
                    steps_per_epoch = len(train_data), 
                    validation_data = test_data,
                    validation_steps = len(test_data), # batchSize,
                    callbacks = [cp_callback, early_stopping, reduce_lr])

In [None]:
from keras.models import load_model
import numpy as np
model_best = load_model('./model/chest_xray.hdf5')
# model_best.evaluate(X_test, y_test)  #, batch_size=BATCH_SIZE)

# Correct-handling with tf_dataset - test_data,unbatch()
import tensorflow as tf

y_test=[]
y_pred=[]
# X_test = tf.zeros(shape=(128,128,3), name=None)  #
X_test = np.empty((224,224,3), int)

In [None]:
# Best model로 전체 테스트 batch에 대하여 다시 예측을 실시함.
for image, label in test_data.unbatch():   # 개개의 이미지에 대한 예측과 label 수집하면서 test 데이터를 X_test 로 저장
    # print(images.shape,labels.shape)
    y_pred0 = model_best.predict(np.expand_dims(image, axis=0), verbose=0) #, batch_size=1)  
    y_pred.append(np.round(y_pred0[0]))  #, axis=0)
    y_test.append(label.numpy())
    X_test = np.vstack([X_test,image])  #, axis=0)
#     tf.stack([X_test, image], axis=0)
    # break

# X_test = X_test.reshape(-1,128,128,3)
X_test=X_test[1:]
y_test = [int(i) for i in y_test]
y_pred = [int(i) for i in y_pred]
print(len(y_test),len(y_pred), X_test.shape)
# print(y_test.shape,y_pred.shape)

print("정답=", y_test[0])
print("예측값=", y_pred[0]) #, np.round(y_pred0[0]))

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

cm = confusion_matrix(y_test, y_pred)
acc = accuracy_score(y_test, y_pred)

import seaborn as sns
plt.figure(figsize=(5,5))
sns.heatmap(cm, cbar=False, xticklabels=['NORMAL', 'PNEUMONIA'], yticklabels=['NORMAL', 'PNEUMONIA'], fmt='d', annot=True, cmap=plt.cm.Greens) #coolwarm)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title(f'Confusion matrix, Best ACC={round(acc,3)}', fontsize=16)
plt.show()