<a href="https://colab.research.google.com/github/rafaelolimpioamaro/TreinamentoCS-I/blob/main/Rafael_WildFire.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
os.environ["TF_USE_LEGACY_KERAS"] = "1"

In [3]:
import zipfile
with zipfile.ZipFile('/content/Dataset_completo.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/')

In [4]:
import math, requests
from pathlib import Path
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
import numpy as np
import random
from sklearn.utils.class_weight import compute_class_weight

# Diretórios do dataset
train_dir = "/content/Dataset_completo/train"
val_dir = "/content/Dataset_completo/valid"
test_dir = "/content/Dataset_completo/test"

# Caminho dos pesos pré-treinados
WEIGHTS_PATH = '/content/MobileNetV2.0_05.96x96.color.bsize_64.lr_0_05.epoch_574.val_loss_4.22.hdf5'

# Baixar os pesos se não estiverem disponíveis
root_url = 'https://cdn.edgeimpulse.com/'
p = Path(WEIGHTS_PATH)
if not p.exists():
    print(f"Pretrained weights {WEIGHTS_PATH} unavailable; downloading...")
    if not p.parent.exists():
        p.parent.mkdir(parents=True)
    weights_data = requests.get(root_url + WEIGHTS_PATH[2:]).content
    with open(WEIGHTS_PATH, 'wb') as f:
        f.write(weights_data)
    print(f"Pretrained weights {WEIGHTS_PATH} downloaded successfully")


# Parâmetros
img_size = (96, 96)  # Alterado para 96x96 conforme seu primeiro código
batch_size = 64
epochs = 10
learning_rate = 0.001
fine_tune_epochs = 20  # Número de épocas para o fine-tuning
fine_tune_percentage = 65  # Porcentagem das camadas a serem descongeladas no fine-tuning

# Data augmentation e geradores de dados
datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    validation_split=0.2,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode="nearest",
)

#val_datagen = ImageDataGenerator(
#    preprocessing_function=preprocess_input,
#    validation_split=0.2,
#)

train_generator = datagen.flow_from_directory(
    train_dir,
    target_size=img_size,
    batch_size=batch_size,
    class_mode='binary',
    subset=None
)

val_generator = datagen.flow_from_directory(
    val_dir,
    target_size=img_size,
    batch_size=batch_size,
    class_mode='binary',
    subset=None
)

test_generator = datagen.flow_from_directory(
    test_dir,
    target_size=img_size,
    batch_size=batch_size,
    class_mode='binary',
    subset=None,
    shuffle=False,
)
# Obter os rótulos das amostras
labels = train_generator.classes

class_counts = train_generator.class_indices
print(f"Número de amostras por classe no conjunto de treinamento: {class_counts}")

# Calcular os pesos das classes
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(labels),
    y=labels
)

# Converter o array de pesos para um dicionário
class_weights_dict = dict(enumerate(class_weights))
print(f"Pesos calculados para as classes: {class_weights_dict}")

Found 7095 images belonging to 2 classes.
Found 406 images belonging to 2 classes.
Found 203 images belonging to 2 classes.
Número de amostras por classe no conjunto de treinamento: {'fire': 0, 'nofire': 1}
Pesos calculados para as classes: {0: 1.0106837606837606, 1: 0.9895397489539749}


In [5]:
# Carregando o modelo MobileNetV2 com alpha=0.05
base_model = tf.keras.applications.MobileNetV2(
    input_shape=(96, 96, 3),
    alpha=0.05,
    include_top=True,
    weights=WEIGHTS_PATH
)
print(base_model.summary())
last_layer_index = -6
tf.keras.backend.clear_session()
base_model = Model(inputs=base_model.input, outputs=base_model.layers[last_layer_index].output)
print(base_model.summary())
# Congelando as camadas base do modelo
base_model.trainable = False

# Adicionando camadas de classificação
x = base_model.output
x = GlobalAveragePooling2D()(x)  # Usando GlobalAveragePooling2D
x = Dense(8, activation='relu')(x)  # Adicionando a camada densa
x = Dense(1, activation='sigmoid')(x)  # Usando sigmoid para binary classification

# Criando o modelo final
tf.keras.backend.clear_session()
model = Model(inputs=base_model.input, outputs=x)
# model.summary()

Model: "mobilenetv2_0.05_96"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 96, 96, 3)]          0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 48, 48, 8)            216       ['input_1[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 48, 48, 8)            32        ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 48, 48, 8)            0         ['bn_Conv1[0

In [6]:

# Compilando o modelo
tf.keras.backend.clear_session()
model.compile(optimizer=Adam(learning_rate=learning_rate), loss='binary_crossentropy', metrics=['accuracy'])

# Callbacks para salvar o melhor modelo e para early stopping
callbacks = [
    ModelCheckpoint('best_model.keras', monitor='val_accuracy', save_best_only=True, mode='max'),
    EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True ,mode='max')
]

# Treinamento do modelo
model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=epochs,
    callbacks=callbacks,
    class_weight=class_weights_dict
)

# Fine-tuning
print("Iniciando o fine-tuning...")
# Carregando o melhor modelo obtido durante o treinamento inicial
model.load_weights('best_model.keras')

# Descongelando uma porcentagem das camadas da base para fine-tuning
total_layers = len(base_model.layers)
fine_tune_from = int(total_layers * (1 - fine_tune_percentage / 100))

for layer in base_model.layers[:fine_tune_from]:
    layer.trainable = False
for layer in base_model.layers[fine_tune_from:]:
    layer.trainable = True

# Recompilando o modelo com uma taxa de aprendizado menor
tf.keras.backend.clear_session()
model.compile(optimizer=Adam(learning_rate=learning_rate * 0.1), loss='binary_crossentropy', metrics=['accuracy'])

# Fine-tuning do modelo
model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=fine_tune_epochs,
    callbacks=callbacks,
    class_weight=class_weights_dict
)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Iniciando o fine-tuning...
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<tf_keras.src.callbacks.History at 0x78446814f2e0>

In [7]:
# Salvando o modelo
model.save('/content/Rafael_fire_detection_model_MobileNetV2.keras')

In [31]:
# Número total de amostras no conjunto de validação
num_samples = test_generator.samples
steps = math.ceil(num_samples / batch_size)

print(f"Número de amostras no gerador de validação: {num_samples}")
images, labels_batch = next(test_generator)
print(f"Dimensões do lote de imagens: {images.shape}")
print(f"Rótulos do lote: {labels_batch}")
predictions_batch = model.predict(images)
print(f"Predições do lote: {predictions_batch}")

labels = test_generator.classes  # Obtém os rótulos diretamente

# Realiza as predições de uma só vez no conjunto de validação
predictions = model.predict(test_generator, steps=steps)

# Verifica o tamanho de predictions e ajusta se necessário
predictions = predictions[:num_samples]

pred_class = predictions > 0.5

Número de amostras no gerador de validação: 203
Dimensões do lote de imagens: (11, 96, 96, 3)
Rótulos do lote: [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
Predições do lote: [[0.8968417 ]
 [0.99684817]
 [0.9828856 ]
 [0.99970394]
 [0.999933  ]
 [0.04478434]
 [0.9644956 ]
 [0.8997434 ]
 [0.99560636]
 [0.03429947]
 [0.28131768]]


In [9]:
from sklearn.metrics import confusion_matrix, classification_report

# Certifica-se de que labels e pred_class têm o mesmo número de amostras
if len(labels) == len(pred_class):
    print(confusion_matrix(labels, pred_class))
    print(classification_report(labels, pred_class))
else:
    print(f"Inconsistência detectada: {len(labels)} rótulos, {len(pred_class)} predições")

[[ 85   3]
 [  4 111]]
              precision    recall  f1-score   support

           0       0.96      0.97      0.96        88
           1       0.97      0.97      0.97       115

    accuracy                           0.97       203
   macro avg       0.96      0.97      0.96       203
weighted avg       0.97      0.97      0.97       203



In [10]:
# Função para dataset representativo para quantização
def representative_dataset():
    limite = 100
    generator = train_generator
    generator.batch_size = limite
    for i, data in enumerate(generator):
        if i >= limite:
            break
        images = data[0]
    for i in range(limite):
        yield({"input_1": np.expand_dims(images[i], axis=[0])})

In [11]:
tf.keras.backend.clear_session()
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 96, 96, 3)]          0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 48, 48, 8)            216       ['input_1[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 48, 48, 8)            32        ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 48, 48, 8)            0         ['bn_Conv1[0][0]']        

In [12]:
# Escolhendo a quantização
quantization_type = 'int8'

if quantization_type == 'int8':
    # Converter o modelo para TensorFlow Lite
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]  # Quantização do peso
    converter.representative_dataset = representative_dataset
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8

    tflite_model = converter.convert()

    # Salvando o modelo convertido
    with open('Rafael_fire_detection_model_MobileNetV2(int8).tflite', 'wb') as f:
        f.write(tflite_model)

print("Modelo quantizado (INT8) e salvo como TensorFlow Lite!")



Modelo quantizado (INT8) e salvo como TensorFlow Lite!


In [13]:
i8model = "Rafael_fire_detection_model_MobileNetV2(int8).tflite"

In [46]:
# Obter todas as imagens e rótulos do test_generator
test_images, test_labels = zip(*(test_generator[i] for i in range(len(test_generator))))
test_images = np.vstack(test_images)  # Combina todas as imagens em um array grande
test_labels = np.hstack(test_labels)  # Combina todos os rótulos em um array grande


In [47]:
import numpy as np
import tensorflow as tf

# Função de quantização da entrada
def quantize_input(input_data, input_details):
    scale, zero_point = input_details[0]['quantization']
    quantized_input = np.round(input_data / scale + zero_point).astype(np.int8)
    return quantized_input

# Função de desquantização da saída
def dequantize_output(quantized_output, output_details):
    scale, zero_point = output_details[0]['quantization']
    real_output = scale * (quantized_output.astype(np.float32) - zero_point)
    return real_output

# Função para realizar inferência no modelo
def evaluate(filepath, X, from_logits=True):
    # Carregar o modelo TensorFlow Lite
    interpreter = tf.lite.Interpreter(model_path=filepath)
    interpreter.allocate_tensors()

    # Obter os detalhes das entradas e saídas
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    # Verificar formato da entrada
    input_shape = input_details[0]['shape']
    assert (X.shape[1:] == input_shape[1:]).all(), "As dimensões da entrada não correspondem."

    # Verificar se o modelo é quantizado
    input_type = input_details[0]['dtype']
    is_quantized = input_type == np.int8

    output_data = []

    # Realizar inferências em cada entrada
    for input_data in X:
        # Quantizar se necessário
        if is_quantized:
            input_data = quantize_input(input_data, input_details)

        # Expandir dimensão para incluir o batch size
        input_data = np.expand_dims(input_data, axis=0).astype(input_type)

        # Definir a entrada para o modelo
        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()

        # Obter a saída do modelo
        logits = interpreter.get_tensor(output_details[0]['index'])

        # Desquantizar se necessário
        if is_quantized:
            logits = dequantize_output(logits, output_details)

        output_data.append(logits)

    # Verificar se temos dados de saída
    if len(output_data) == 0:
        return None

    # Remover dimensões desnecessárias
    output_data = np.squeeze(output_data)

    # Retornar dados de saída
    if from_logits:
        return output_data
    else:
        # Verificar a dimensão antes de usar np.argmax
        if len(output_data.shape) > 1 and output_data.shape[1] > 1:
            return np.argmax(output_data, axis=1)
        else:
            # Caso a saída seja unidimensional (e.g., classificação binária)
            return (output_data > 0.5).astype(int)  # Retorna 0 ou 1 para binário


In [48]:
labels.shape


(203,)

In [49]:
test_images.shape

(203, 96, 96, 3)

In [50]:
y_pred_i8 = evaluate(i8model, test_images, False)
print(y_pred_i8)
y_pred_i8_logits = evaluate(i8model, test_images, True)
print(classification_report(test_labels, y_pred_i8, digits=3))

[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0
 0 0 0 0 0 0 1 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 0 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0]
              precision    recall  f1-score   support

         0.0      0.945     0.977     0.961        88
         1.0      0.982     0.957     0.969       115

    accuracy                          0.966       203
   macro avg      0.964     0.967     0.965       203
weighted avg      0.966     0.966     0.966       203

