In [None]:
from typing import List, Tuple
import numpy as np
import tensorflow as tf
from glob import glob, iglob
import matplotlib.pyplot as plt
import cv2
from dataclasses import dataclass

# Aquisição do Conjunto de Dados

In [None]:
!wget https://github.com/nmcardoso/poli/releases/download/v0.0.1/COVID-QU-Ex.zip
!unzip -q "COVID-QU-Ex.zip"

[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
  inflating: Lung Segmentation Data/Lung Segmentation Data/Val/Non-COVID/lung masks/non_COVID (10835).png  
  inflating: Lung Segmentation Data/Lung Segmentation Data/Val/Non-COVID/lung masks/non_COVID (10836).png  
  inflating: Lung Segmentation Data/Lung Segmentation Data/Val/Non-COVID/lung masks/non_COVID (10837).png  
  inflating: Lung Segmentation Data/Lung Segmentation Data/Val/Non-COVID/lung masks/non_COVID (10838).png  
  inflating: Lung Segmentation Data/Lung Segmentation Data/Val/Non-COVID/lung masks/non_COVID (10839).png  
  inflating: Lung Segmentation Data/Lung Segmentation Data/Val/Non-COVID/lung masks/non_COVID (10840).png  
  inflating: Lung Segmentation Data/Lung Segmentation Data/Val/Non-COVID/lung masks/non_COVID (10841).png  
  inflating: Lung Segmentation Data/Lung Segmentation Data/Val/Non-COVID/lung masks/non_COVID (10843).png  
  inflating: Lung Segmentation Data/Lung Segmentation Data/Val/

# Carregamento do Conjunto de Dados

In [None]:
def leImagens(
  wildcards: List[str],
  classes: List[int],
  nl: int,
  nc: int
) -> Tuple[np.ndarray, np.ndarray]:
  """Load every image matched by a list of glob patterns as resized grayscale.

  Args:
    wildcards: Glob patterns. Every file matched by ``wildcards[k]`` receives
      the label ``classes[k]``.
    classes: One integer label per pattern (stored as ``uint8``).
    nl: Number of rows (height) of each output image.
    nc: Number of columns (width) of each output image.

  Returns:
    A pair ``(images, labels)``: ``images`` is a ``uint8`` array of shape
    ``(total, nl, nc)`` and ``labels`` is a ``uint8`` array of shape
    ``(total,)``, where ``total`` is the number of matched files.

  Raises:
    ValueError: If ``wildcards`` and ``classes`` have different lengths.
    OSError: If a matched file cannot be decoded as an image.
  """
  if len(wildcards) != len(classes):
    raise ValueError(
      f'wildcards and classes must have the same length, '
      f'got {len(wildcards)} and {len(classes)}'
    )

  # Expand each pattern exactly once so that the buffer size and the loop
  # below can never disagree; sorting makes the sample order reproducible.
  paths_per_class = [sorted(glob(wc)) for wc in wildcards]
  total = sum(len(paths) for paths in paths_per_class)

  # cv2.resize takes dsize as (width, height) and returns an array of shape
  # (height, width), so the buffer must be (total, nl, nc).
  images = np.empty(shape=(total, nl, nc), dtype='uint8')
  labels = np.empty(shape=(total,), dtype='uint8')

  i = 0
  for paths, cl in zip(paths_per_class, classes):
    for path in paths:
      img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
      if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise OSError(f'could not read image: {path}')
      images[i, ...] = cv2.resize(img, (nc, nl), interpolation=cv2.INTER_LINEAR)
      labels[i] = cl
      i += 1
  return images, labels



def load_dataset(
  width: int = 224,
  height: int = 224,
  oversample: bool = False,
  undersample: bool = False,
  resample_ratio: float = 1.0,
  clahe: bool = False,
  clahe_clip: float = 4.0,
  clahe_grid: Tuple[int, int] = (8, 8)
) -> Tuple[np.ndarray, ...]:
  """Load the Train/Test/Val image subsets as binary-labelled grayscale arrays.

  Images are read from
  ``Lung Segmentation Data/Lung Segmentation Data/{subset}/{cls}/images/*.png``.
  The ``COVID-19`` class is labelled ``1``; ``Non-COVID`` and ``Normal`` are
  both labelled ``0``.

  Args:
    width: Width the images are resized to.
    height: Height the images are resized to.
    oversample: If true, append randomly chosen label-1 training samples.
    undersample: If true, remove randomly chosen label-0 training samples.
    resample_ratio: Fraction of the absolute class-count difference that is
      added (oversampling) and/or removed (undersampling).
    clahe: If true, apply CLAHE to every image of every subset.
    clahe_clip: CLAHE clip limit.
    clahe_grid: CLAHE tile grid size.

  Returns:
    ``(ax, ay, qx, qy, vx, vy)``: images and labels of the Train, Test and
    Val subsets, in that order. Only the Train subset is resampled.

  Raises:
    ValueError: Propagated from ``rng.choice`` when more samples are requested
      without replacement than the population holds, or when the population
      is empty.
  """
  path_pattern = 'Lung Segmentation Data/Lung Segmentation Data/{subset}/{cls}/images/*.png'
  classes = ['COVID-19', 'Non-COVID', 'Normal']
  classes_id = [1, 0, 0]

  def load_subset(subset: str) -> Tuple[np.ndarray, np.ndarray]:
    # One glob pattern per class directory of the requested subset.
    wcs = [path_pattern.format(subset=subset, cls=c) for c in classes]
    return leImagens(wcs, classes_id, height, width)

  ax, ay = load_subset('Train')
  qx, qy = load_subset('Test')
  vx, vy = load_subset('Val')

  if clahe:
    clahe_mat = cv2.createCLAHE(clipLimit=clahe_clip, tileGridSize=clahe_grid)

    def apply_clahe(batch: np.ndarray) -> np.ndarray:
      # CLAHE operates on one 2-D image at a time. np.vectorize would call it
      # once per scalar pixel, so iterate over the first axis explicitly.
      out = np.empty_like(batch)
      for k, img in enumerate(batch):
        out[k] = clahe_mat.apply(img)
      return out

    ax = apply_clahe(ax)
    qx = apply_clahe(qx)
    vx = apply_clahe(vx)

  if undersample or oversample:
    minority_idx = np.nonzero(ay == 1)[0]
    majority_idx = np.nonzero(ay == 0)[0]
    minority_count = len(minority_idx)
    majority_count = len(majority_idx)
    delta_count = int(resample_ratio * abs(majority_count - minority_count))
    # Fixed seed keeps the resampled training set reproducible.
    rng = np.random.default_rng(seed=42)
    if oversample:
      # Sample with replacement only when more copies are needed than there
      # are label-1 samples available.
      repeat_idx = rng.choice(
        minority_idx, delta_count, replace=delta_count > minority_count
      )
      ax = np.concatenate((ax, ax[repeat_idx]))
      ay = np.concatenate((ay, ay[repeat_idx]))
    if undersample:
      # majority_idx is still valid after oversampling because the repeated
      # samples were appended at the end of the arrays.
      delete_idx = rng.choice(majority_idx, delta_count, replace=False)
      ax = np.delete(ax, delete_idx, axis=0)
      ay = np.delete(ay, delete_idx, axis=0)

  return ax, ay, qx, qy, vx, vy

# Arquitetura da Rede Neural

In [None]:
def build_model(weights: str = None, input_shape: Tuple = (224, 224)):
  """Build a DenseNet121 classifier that outputs two logits per image.

  Layer order: input -> (optional) channel axis -> float RGB conversion ->
  data augmentation -> DenseNet input normalisation -> DenseNet121 encoder
  with global average pooling -> dense classification head.

  Args:
    weights: Forwarded to ``tf.keras.applications.DenseNet121`` (``None`` for
      random initialisation, ``'imagenet'``, or a path to a weights file).
    input_shape: Per-sample shape, without the batch axis. Either ``(H, W)``
      for single-channel images without a channel axis, ``(H, W, 1)`` or
      ``(H, W, 3)``.

  Returns:
    An uncompiled ``tf.keras.Sequential`` model whose output has shape
    ``(batch, 2)`` and contains raw logits (the last ``Dense`` layer has no
    activation).

  Raises:
    ValueError: If ``input_shape`` does not have 2 or 3 dimensions.
  """
  input_shape = tuple(input_shape)
  if len(input_shape) not in (2, 3):
    raise ValueError(
      f'input_shape must be (H, W) or (H, W, C), got {input_shape}'
    )

  def to_float_rgb(img):
    # Work in float32 and replicate a single gray channel into three channels,
    # because the DenseNet121 encoder expects 3-channel inputs.
    img = tf.cast(img, tf.float32)
    if img.shape[-1] == 1:
      img = tf.image.grayscale_to_rgb(img)
    return img

  to_rgb = tf.keras.layers.Lambda(to_float_rgb, name='to_rgb')

  # Augmentation runs on raw [0, 255] pixel values, i.e. BEFORE the DenseNet
  # normalisation: RandomBrightness clips its output to `value_range`, which
  # would destroy already-normalised inputs.
  data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip('horizontal'),
    # tf.keras.layers.RandomRotation(0.2),
    tf.keras.layers.RandomZoom(0.15),
    tf.keras.layers.RandomContrast(0.1),
    tf.keras.layers.RandomBrightness(0.1, value_range=(0, 255))
  ], name='data_augmentation')

  # preprocess_input is a plain function, so it must be wrapped in a Lambda
  # layer before it can be placed inside a Sequential model.
  preprocessing = tf.keras.Sequential([
    tf.keras.layers.Lambda(
      lambda img: tf.keras.applications.densenet.preprocess_input(img)
    ),
  ], name='preprocessing')

  # pooling='avg' collapses the spatial feature map into one vector per image
  # so the dense head produces a single prediction per sample.
  encoder = tf.keras.applications.DenseNet121(
    weights=weights,
    include_top=False,
    pooling='avg',
  )

  classification_head = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(2),
  ], name='classification_head')

  layers = [tf.keras.layers.Input(shape=input_shape)]
  if len(input_shape) == 2:
    # (H, W) -> (H, W, 1): grayscale_to_rgb requires an explicit channel axis.
    layers.append(
      tf.keras.layers.Reshape(input_shape + (1,), name='add_channel_axis')
    )
  layers += [
    to_rgb,
    data_augmentation,
    preprocessing,
    encoder,
    classification_head,
  ]

  model = tf.keras.Sequential(layers)

  return model



@dataclass
class ModelSummary:
  """Container for the artifacts collected about a trained model.

  Every field defaults to ``None``. ``model_summary`` fills all fields except
  ``model_path``.
  """
  # Not assigned anywhere in the visible code; presumably the location of the
  # saved model -- TODO confirm.
  model_path: str = None
  # Value of ``model.history.history`` at the time the summary was built.
  history: dict = None
  # Result of ``model.predict`` on the validation inputs.
  # NOTE(review): annotated as dict, but ``predict`` results are stored as-is.
  val_preds: dict = None
  # Result of ``model.predict`` on the test inputs.
  test_preds: dict = None
  # Result of ``model.evaluate`` on the validation inputs and labels.
  # NOTE(review): annotated as dict, but ``evaluate`` results are stored as-is.
  val_score: dict = None
  # Result of ``model.evaluate`` on the test inputs and labels.
  test_score: dict = None


def model_summary(model: tf.keras.Model, X_val, y_val, X_test, y_test):
  """Gather history, predictions and scores of a model into a ModelSummary.

  Args:
    model: Model whose ``history`` attribute is already populated.
    X_val: Validation inputs.
    y_val: Validation targets.
    X_test: Test inputs.
    y_test: Test targets.

  Returns:
    A ``ModelSummary`` with ``history``, ``val_preds``, ``test_preds``,
    ``val_score`` and ``test_score`` filled in; ``model_path`` stays ``None``.
  """
  summary = ModelSummary(history=model.history.history)

  # Predictions for both subsets come first, followed by the evaluations.
  summary.val_preds = model.predict(X_val)
  summary.test_preds = model.predict(X_test)
  summary.val_score = model.evaluate(X_val, y_val)
  summary.test_score = model.evaluate(X_test, y_test)

  return summary


def train_model(
  X_train: np.ndarray,
  X_val: np.ndarray,
  y_train: np.ndarray,
  y_val: np.ndarray,
  epochs: int,
  weights: str = None,
  lr: float = 1e-5,
  X_test: np.ndarray = None,
  y_test: np.ndarray = None,
):
  """Build, compile and fit a model, then summarise its results.

  Args:
    X_train: Training inputs; ``X_train.shape[1:]`` is used as the model's
      input shape.
    X_val: Validation inputs, also used as ``validation_data`` during fitting.
    y_train: Integer class labels of the training inputs.
    y_val: Integer class labels of the validation inputs.
    epochs: Number of training epochs.
    weights: Forwarded to ``build_model``.
    lr: Learning rate of the Adam optimizer.
    X_test: Optional test inputs.
    y_test: Optional test labels.

  Returns:
    A ``ModelSummary``. When both ``X_test`` and ``y_test`` are given it is
    produced by ``model_summary``; otherwise only the history and the
    validation predictions/score are filled in and the test fields stay
    ``None``.
  """
  model = build_model(weights=weights, input_shape=X_train.shape[1:])
  optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
  # The classification head ends in Dense(2) without activation, so the model
  # emits logits and the labels are integer class ids.
  model.compile(
    optimizer=optimizer,
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
  )
  model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=epochs,
  )

  if X_test is not None and y_test is not None:
    return model_summary(model, X_val, y_val, X_test, y_test)

  # No test split supplied: summarise the validation results only.
  return ModelSummary(
    history=model.history.history,
    val_preds=model.predict(X_val),
    val_score=model.evaluate(X_val, y_val),
  )