In [None]:
# Treinamento de uam rede CNN para classificação de imagens de mamografia

# Importando as bibliotecas
import numpy as np
import matplotlib.pyplot as plt
import os
import random
import pickle
import tensorflow as tf
from tensorflow.keras.models import Sequential

from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, MaxPooling2D

import pandas as pd

# Carregando os dados

WORK_DIR = os.getcwd()
UP_DIR = os.path.dirname(WORK_DIR)
UP_DIR = os.path.dirname(UP_DIR)
DATADIR = os.path.join(UP_DIR, 'data', 'cbis-ddsm')
CATEGORIES = ['BENIGN', 'MALIGNANT']

# csv com os dados das imagens
df_train = pd.read_csv(os.path.join(DATADIR, 'Train', 'clean_mass_train_description2.csv'))
df_test = pd.read_csv(os.path.join(DATADIR, 'Test', 'clean_mass_test_description2.csv'))

# Verificando os dados
print(df_train.head())
print(df_test.head())

# Verificando o tamanho dos dados
print(df_train.shape)
print(df_test.shape)

# Verificando a distribuição das classes
print(df_train['pathology'].value_counts())
print(df_test['pathology'].value_counts())

# Verificando a distribuição das classes
print(df_train['breast_density'].value_counts())
print(df_test['breast_density'].value_counts())

# Verificando a distribuição das classes
print(df_train['left or right breast'].value_counts())
print(df_test['left or right breast'].value_counts())


# Exclude Begnin withouth callback
df_train = df_train[df_train['pathology'] != 'BENIGN_WITHOUT_CALLBACK']
df_test = df_test[df_test['pathology'] != 'BENIGN_WITHOUT_CALLBACK']


# Verificando a distribuição das classes
print(df_train['pathology'].value_counts())
print(df_test['pathology'].value_counts())


In [None]:
# imports para processamento dos dados 
import cv2
from sklearn.model_selection import train_test_split


# Preparando os dados

CATEGORIES = ['BENIGN', 'MALIGNANT']
IMG_SIZE = 224
def create_training_data(df, img_size, categories, datadir):
    training_data = []
    for index, row in df.iterrows():
        try:
            img_array = plt.imread(os.path.join(datadir, row['image file path']),0)
            new_array = cv2.resize(img_array, (img_size, img_size))
            # Normalizing the data
            # To Numpy array
            new_array = np.array(new_array)
            training_data.append([new_array, categories.index(row['pathology'])])
            # Benign = 0, Malignant = 1
                        

            print(index)
        except Exception as e:
            print(e)
            pass
    return training_data

train_data = create_training_data(df_train, IMG_SIZE, CATEGORIES, os.path.join(DATADIR, 'Train', 'Train'))
testing_data = create_training_data(df_test, IMG_SIZE, CATEGORIES, os.path.join(DATADIR, 'Test', 'Test'))

# Verificando o tamanho dos dados
print(len(train_data))
print(len(testing_data))

# Verificando os dados
print(train_data[0])
print(testing_data[0])

# Embaralhando os dados
random.shuffle(train_data)
random.shuffle(testing_data)


# Separando os dados
X_train = []
y_train = []
for features, label in train_data:
    X_train.append(features)
    y_train.append(label)

X_test = []
y_test = []
for features, label in testing_data:
    X_test.append(features)
    y_test.append(label)


# Convertendo para numpy array
X_train = np.array(X_train)
y_train = np.array(y_train)

X_test = np.array(X_test)
y_test = np.array(y_test)



In [None]:
# Métricas de media e desvio padrão dos dados
print(X_train.shape)
print(X_test.shape)
print(np.mean(X_train))
print(np.std(X_train))
print(np.mean(X_test))
print(np.std(X_test))




# Merge data and separate in train and test
X = np.concatenate((X_train, X_test), axis=0)
y = np.concatenate((y_train, y_test), axis=0)



# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Métricas de media e desvio padrão dos dados
print(X_train.shape)
print(X_test.shape)
print(np.mean(X_train))
print(np.std(X_train))
print(np.mean(X_test))
print(np.std(X_test))

#print dtypes
print(X_train.dtype)
print(X_test.dtype)
print(X_train[0].dtype)

# Equalizing histogram
def equalize_hist(img):
    # Convert to uint8
    img = (img*255).astype(np.uint8)
    img = cv2.equalizeHist(img)
    return img


X_test_eq = []
for img in X_test:
    X_test_eq.append(equalize_hist(img))

X_train_eq = []
for img in X_train:
    X_train_eq.append(equalize_hist(img))

X_train_eq = np.array(X_train_eq)
X_test_eq = np.array(X_test_eq)

X_train = X_train_eq
X_test = X_test_eq


# To 3 channels
X_train_o = np.stack((X_train,)*3, axis=-1)
X_test_o = np.stack((X_test,)*3, axis=-1)






In [None]:
# Cleaning GPU memory
tf.keras.backend.clear_session()





# Teste com modelo pre-treinado VGG16
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D



# Pre processing layers
from tensorflow.keras.applications.vgg16 import preprocess_input

# Pre processamento
X_train = preprocess_input(X_train_o)
X_test = preprocess_input(X_test_o)


# Carregando o modelo
vgg16 = VGG16(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))


# Congelando as camadas
for layer in vgg16.layers:
    layer.trainable = False

# Head do modelo
x = vgg16.output
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
x = Dense(512, activation='relu')(x)
preds = Dense(1, activation='sigmoid')(x)


# LR Scheduler function
# Linear warmup, then cosine decay
def lrate(epoch):
    # Warmup
    wrmup = 20
    peak = 0.00005
    lenght = 60
    if epoch < wrmup:
        return peak * (epoch+1) / wrmup
    else:
        return peak/2 * (1 + np.cos((epoch-wrmup) / (lenght -wrmup) * np.pi))
    
# PLot the learning rate for the first 100 epochs
lrs = [lrate(i) for i in range(60)]
plt.plot(lrs)
plt.show()

# Callbacks
lrate = tf.keras.callbacks.LearningRateScheduler(lrate, verbose=1)

# Early stopping
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='auto')

# Checkpoint
checkpoint = tf.keras.callbacks.ModelCheckpoint('model.h5', monitor='val_loss', save_best_only=True, verbose=1)

# Otimizador usando callback
sgd = tf.keras.optimizers.SGD(lr=0.0, momentum=0.9)

# Modelo
model = Model(inputs=vgg16.input, outputs=preds)

# Visualizando o modelo
model.summary()

# Compilando o modelo
model.compile(optimizer=sgd, loss='binary_crossentropy', metrics=['accuracy'])

# Treinando o modelo
history = model.fit(X_train, y_train, epochs=60, validation_data=(X_test, y_test), callbacks=[lrate, early_stopping, checkpoint])


# Avaliando o modelo
score = model.evaluate(X_test, y_test, verbose=0)
print('Test loss:', score[0])

# Plotando a acurácia
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

# Plotando a perda
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

# Predições
predictions = model.predict(X_test)
print(predictions)
#plot 



In [None]:
# Teste com modelo pre-treinado RESNET50
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D

# Data

# Processamento
from tensorflow.keras.applications.resnet50 import preprocess_input

def train_model_resnet50(model
    # Pre processamento
    X_train = preprocess_input(X_train_o)
    X_test = preprocess_input(X_test_o)

    # Carregando o modelo
    resnet50 = ResNet50(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))

    # Congelando as camadas
    for layer in resnet50.layers:
        layer.trainable = False

    # Head do modelo
    x = resnet50.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dense(512, activation='relu')(x)
    preds = Dense(1, activation='sigmoid')(x)

    # LR Scheduler function
    # Linear warmup, then cosine decay
    def lrate(epoch):
        # Warmup
        wrmup = 10
        peak = 0.00005
        stay = 10
        lenght_decay = 60
        if epoch < wrmup:
            return peak * (epoch+1) / wrmup
        elif epoch < wrmup + stay:
            return peak
        else:
            return peak/2 * (1 + np.cos((epoch-wrmup) / (  lenght_decay - stay - wrmup) * np.pi))
        
    # PLot the learning rate for the first 100 epochs
    lrs = [lrate(i) for i in range(60)]
    plt.plot(lrs)
    plt.show()

    # Callbacks
    lrate = tf.keras.callbacks.LearningRateScheduler(lrate, verbose=1)

    # Early stopping
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='auto')

    # Checkpoint
    checkpoint = tf.keras.callbacks.ModelCheckpoint('model.h5', monitor='val_loss', save_best_only=True, verbose=1)

    # Otimizador usando callback
    sgd = tf.keras.optimizers.SGD(lr=0.0, momentum=0.9)

    # Modelo
    model = Model(inputs=resnet50.input, outputs=preds)

    # Visualizando o modelo
    model.summary()

    # Compilando o modelo
    model.compile(optimizer=sgd, loss='binary_crossentropy', metrics=['accuracy'])

    # Treinando o modelo
    history = model.fit(X_train, y_train, epochs=60, validation_data=(X_test, y_test), callbacks=[lrate, early_stopping, checkpoint])

    # Avaliando o modelo
    score = model.evaluate(X_test, y_test, verbose=0)
    print('Test loss:', score[0])

    # Plotando a acurácia
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')

    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

    # Plotando a perda
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')

    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

    # Predições
    predictions = model.predict(X_test)
    print(predictions)

    # Save history in csv'
    da
    training_info = 'resnet50+
    filename = 'history-'+training_info+'.csv'
    history_df = pd.DataFrame(history.history)
    history_df.to_csv('history.csv', index=False)


In [None]:

# Avaliando o modelo
score = model.evaluate(X_test, y_test, verbose=0)
print('Test loss:', score[0])

# Plotando a acurácia
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')

plt.legend(['train', 'test'], loc='upper left')
plt.show()

# Plotando a perda
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')

plt.legend(['train', 'test'], loc='upper left')
plt.show()

# Predições
predictions = model.predict(X_test)
print(predictions)

In [None]:
# Teste com modelo pre-treinado MOBILENETV2
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D

# Processamento
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input

# Pre processamento
X_train = preprocess_input(X_train_o)
X_test = preprocess_input(X_test_o)


# Carregando o modelo
mobilenetv2 = MobileNetV2(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))

# Congelando as camadas
for layer in mobilenetv2.layers:
    layer.trainable = False

# Head do modelo
x = mobilenetv2.output
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
x = Dense(512, activation='relu')(x)
preds = Dense(1, activation='sigmoid')(x)

# LR Scheduler function
# Linear warmup, then cosine decay
def lrate(epoch):
    # Warmup
    wrmup = 10
    peak = 0.00005
    stay = 10
    lenght_decay = 60
    if epoch < wrmup:
        return peak * (epoch+1) / wrmup
    elif epoch < wrmup + stay:
        return peak
    else:
        return peak/2 * (1 + np.cos((epoch-wrmup) / (  lenght_decay - stay - wrmup) * np.pi))
    
# PLot the learning rate for the first 100 epochs
lrs = [lrate(i) for i in range(60)]
plt.plot(lrs)
plt.show()

# Callbacks
lrate = tf.keras.callbacks.LearningRateScheduler(lrate, verbose=1)

# Early stopping
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='auto')

# Checkpoint
checkpoint = tf.keras.callbacks.ModelCheckpoint('model.h5', monitor='val_loss', save_best_only=True, verbose=1)

# Otimizador usando callback

sgd = tf.keras.optimizers.SGD(lr=0.0, momentum=0.9)

# Modelo
model = Model(inputs=mobilenetv2.input, outputs=preds)

# Visualizando o modelo
model.summary()

# Compilando o modelo
model.compile(optimizer=sgd, loss='binary_crossentropy', metrics=['accuracy'])

# Treinando o modelo
history = model.fit(X_train, y_train, epochs=60, validation_data=(X_test, y_test), callbacks=[lrate, early_stopping, checkpoint])

# Avaliando o modelo
score = model.evaluate(X_test, y_test, verbose=0)
print('Test loss:', score[0])


In [1]:


# Import models 
from tensorflow.keras.applications import VGG16, ResNet50, MobileNetV2
# Vit models
from vit_keras import vit
from vit_keras import utils as vit_utils

# Pre processing layers
from tensorflow.keras.applications.vgg16 import preprocess_input as preprocess_input_vgg16
from tensorflow.keras.applications.resnet50 import preprocess_input as preprocess_input_resnet50
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input as preprocess_input_mobilenetv2 


# Function for bulk training 
# Function for resnet50
def train_model(X_train, X_test, y_train, y_test, X_val, y_val , training_info, model, callbacks , optimizer, loss, metrics, epochs):
   
    # Compile model
    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
    # Train model
    history = model.fit(X_train, y_train, epochs=epochs, validation_data=(X_test, y_test), callbacks=callbacks)
    # Save history in csv
    filename = 'history-'+training_info+'.csv'
    history_df = pd.DataFrame(history.history)
    history_df.to_csv(filename, index=False)
    
def train_model_resnet50(X_train, X_test, y_train, y_test, X_val, y_val , lrfunc, training_info , epochs):
   # Model
    resnet50 = ResNet50(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))
    # Congelando as camadas
    for layer in resnet50.layers:
        layer.trainable = False
    # Head do modelo
    x = resnet50.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dense(512, activation='relu')(x)
    preds = Dense(1, activation='sigmoid')(x)
    # Modelo
    model = Model(inputs=resnet50.input, outputs=preds)
    
    # Pre processamento
    X_train = preprocess_input_resnet50(X_train)
    X_test = preprocess_input_resnet50(X_test)
    X_val = preprocess_input_resnet50(X_val)
    # Otimizador usando callback
    sgd = tf.keras.optimizers.SGD(lr=0.0, momentum=0.9)


    # LR Scheduler
    lrate = tf.keras.callbacks.LearningRateScheduler(lrfunc, verbose=1)
    # Early stopping
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='auto')
    # Checkpoint
    checkpoint = tf.keras.callbacks.ModelCheckpoint('model.h5', monitor='val_loss', save_best_only=True, verbose=1)
    # Train model
    train_model(X_train, X_test, y_train, y_test, X_val, y_val, training_info, model, [lrate, early_stopping, checkpoint], sgd, 'binary_crossentropy', ['accuracy'], epochs)


def train_model_vgg16(X_train, X_test, y_train, y_test, X_val, y_val , lrfunc, training_info , epochs):
   # Model
    vgg16 = VGG16(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))
    # Congelando as camadas
    for layer in vgg16.layers:
        layer.trainable = False
    # Head do modelo
    x = vgg16.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dense(512, activation='relu')(x)
    preds = Dense(1, activation='sigmoid')(x)
    # Modelo
    model = Model(inputs=vgg16.input, outputs=preds)
    
    # Pre processamento
    X_train = preprocess_input_vgg16(X_train)
    X_test = preprocess_input_vgg16(X_test)
    X_val = preprocess_input_vgg16(X_val)

    # Otimizador usando callback
    sgd = tf.keras.optimizers.SGD(lr=0.0, momentum=0.9)


    # LR Scheduler
    lrate = tf.keras.callbacks.LearningRateScheduler(lrfunc, verbose=1)
    # Early stopping
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='auto')
    # Checkpoint
    checkpoint = tf.keras.callbacks.ModelCheckpoint('model.h5', monitor='val_loss', save_best_only=True, verbose=1)
    # Train model
    train_model(X_train, X_test, y_train, y_test, X_val, y_val, training_info, model, [lrate, early_stopping, checkpoint], sgd, 'binary_crossentropy', ['accuracy'], epochs)

def train_model_mobilenetv2(X_train, X_test, y_train, y_test, X_val, y_val , lrfunc, training_info , epochs):
   # Model
    mobilenetv2 = MobileNetV2(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))
    # Congelando as camadas
    for layer in mobilenetv2.layers:
        layer.trainable = False
    # Head do modelo
    x = mobilenetv2.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dense(512, activation='relu')(x)
    preds = Dense(1, activation='sigmoid')(x)
    # Modelo
    model = Model(inputs=mobilenetv2.input, outputs=preds)
    
    # Pre processamento
    X_train = preprocess_input_mobilenetv2(X_train)
    X_test = preprocess_input_mobilenetv2(X_test)
    X_val = preprocess_input_mobilenetv2(X_val)

    # Otimizador usando callback
    sgd = tf.keras.optimizers.SGD(lr=0.0, momentum=0.9)


    # LR Scheduler
    lrate = tf.keras.callbacks.LearningRateScheduler(lrfunc, verbose=1)
    # Early stopping
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='auto')
    # Checkpoint
    checkpoint = tf.keras.callbacks.ModelCheckpoint('model.h5', monitor='val_loss', save_best_only=True, verbose=1)
    # Train model
    train_model(X_train, X_test, y_train, y_test, X_val, y_val, training_info, model, [lrate, early_stopping, checkpoint], sgd, 'binary_crossentropy', ['accuracy'], epochs)

def train_model_vit(X_train, X_test, y_train, y_test, X_val, y_val , lrfunc, training_info , epochs):
    # Model
    vit_model = vit.vit_b16(
    image_size = IMG_SIZE,
    activation = 'sigmoid',
    pretrained = True,
    include_top = True,
    pretrained_top = False,
    classes = 2
    )

    # Pre processamento
    X_train = vit.preprocess_inputs(X_train)
    X_test = vit.preprocess_inputs(X_test)
    X_val = vit.preprocess_inputs(X_val)

    
    #Only the head is trained
    vit_model.trainable = True
    for layer in vit_model.layers[:-1]:
        layer.trainable = False


    # Otimizador usando callback
    sgd = tf.keras.optimizers.SGD(lr=0.0, momentum=0.9)

    # LR Scheduler
    lrate = tf.keras.callbacks.LearningRateScheduler(lrfunc, verbose=1)
    # Early stopping
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='auto')
    # Checkpoint
    checkpoint = tf.keras.callbacks.ModelCheckpoint('model.h5', monitor='val_loss', save_best_only=True, verbose=1)
    # Train model
    train_model(X_train, X_test, y_train, y_test, X_val, y_val, training_info, vit_model, [lrate, early_stopping, checkpoint], sgd, 'binary_crossentropy', ['accuracy'], epochs)




# Train 10 models with different lr peaks

# Resnet50
for i in range(10):

    def lrate(epoch):
        # Warmup
        wrmup = 20
        peak = 0.0001 + 0.000005*(i-5)
        stay = 0
        lenght_decay = 60
        if epoch < wrmup:
            return peak * (epoch+1) / wrmup
        elif epoch < wrmup + stay:
            return peak
        else:
            return peak/2 * (1 + np.cos((epoch-wrmup) / (  lenght_decay - stay - wrmup) * np.pi))

    train_model_resnet50(X_train, X_test, y_train, y_test, X_test, y_test, lrate, 'resnet50-'+str(i), 60)

# VGG16
for i in range(10):

    def lrate(epoch):
        # Warmup
        wrmup = 20
        peak = 0.00005*(i+1)
        stay = 10
        lenght_decay = 60
        if epoch < wrmup:
            return peak * (epoch+1) / wrmup
        elif epoch < wrmup + stay:
            return peak
        else:
            return peak/2 * (1 + np.cos((epoch-wrmup) / (  lenght_decay - stay - wrmup) * np.pi))

    train_model_vgg16(X_train, X_test, y_train, y_test, X_test, y_test, lrate, 'vgg16-'+str(i), 60)

# MobileNetV2
for i in range(10):

    def lrate(epoch):
        # Warmup
        wrmup = 20
        peak = 0.00005*(i+1)
        stay = 10
        lenght_decay = 60
        if epoch < wrmup:
            return peak * (epoch+1) / wrmup
        elif epoch < wrmup + stay:
            return peak
        else:
            return peak/2 * (1 + np.cos((epoch-wrmup) / (  lenght_decay - stay - wrmup) * np.pi))

    train_model_mobilenetv2(X_train, X_test, y_train, y_test, X_test, y_test, lrate, 'mobilenetv2-'+str(i), 60)

# ViT
for i in range(10):

    def lrate(epoch):
        # Warmup
        wrmup = 20
        peak = 0.00005*(i+1)
        stay = 10
        lenght_decay = 60
        if epoch < wrmup:
            return peak * (epoch+1) / wrmup
        elif epoch < wrmup + stay:
            return peak
        else:
            return peak/2 * (1 + np.cos((epoch-wrmup) / (  lenght_decay - stay - wrmup) * np.pi))

    train_model_vit(X_train, X_test, y_train, y_test, X_test, y_test, lrate, 'vit-'+str(i), 60)

    
    




TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



NameError: name 'X_train' is not defined