**Implementazione della rete per classificazione EfficientNETB0**

Sarà implementata e testata la rete EfficientNetB0-Lite, attraverso un training 80/20 e successivamente sarà quantizzata.

In [1]:
# Monto drive Google

from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [2]:
# Installazione delle librerie necessarie

!pip install tensorflow
!pip install tensorflow-hub
!pip install opencv-python
!pip install opencv-python-headless



In [3]:

# Importazioni necessarie
import os
import shutil
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers

# Funzione per estrarre i frame dai video
def extract_frames(video_path, output_folder, frame_rate=1):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    cap = cv2.VideoCapture(video_path)
    count = 0
    success = True

    while success:
        cap.set(cv2.CAP_PROP_POS_MSEC, (count * 1000)) # Frame ogni secondo
        success, image = cap.read()
        if success:
            frame_filename = os.path.join(output_folder, f"frame_{count:04d}.jpg")
            cv2.imwrite(frame_filename, image)
            count += 1

    cap.release()



In [4]:
# Funzione per preparare il dataset
def prepare_dataset(base_dir, output_dir, test_size=0.2):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    classes = ['Violence', 'NonViolence']
    all_videos = []
    all_labels = []

    for cls in classes:
        cls_dir = os.path.join(base_dir, cls)
        videos = [os.path.join(cls_dir, f) for f in os.listdir(cls_dir) if f.endswith('.mp4')]
        all_videos.extend(videos)
        all_labels.extend([cls] * len(videos))

    X_train, X_test, y_train, y_test = train_test_split(all_videos, all_labels, test_size=test_size, stratify=all_labels)

    train_dir = os.path.join(output_dir, 'train')
    test_dir = os.path.join(output_dir, 'test')

    for d in [train_dir, test_dir]:
        if not os.path.exists(d):
            os.makedirs(d)
        for label in classes:
            os.makedirs(os.path.join(d, label), exist_ok=True)

    for video, label in zip(X_train, y_train):
        extract_frames(video, os.path.join(train_dir, label))

    for video, label in zip(X_test, y_test):
        extract_frames(video, os.path.join(test_dir, label))



In [5]:
# Creazione del modello
def create_model():
    model = tf.keras.Sequential([
        hub.KerasLayer("https://tfhub.dev/tensorflow/efficientnet/lite0/feature-vector/2",
                       input_shape=(224, 224, 3), trainable=False),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.2),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model



In [6]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import roc_curve, auc, precision_score, recall_score, classification_report
from keras.callbacks import ModelCheckpoint
from keras.preprocessing.image import ImageDataGenerator

def train_and_evaluate(model, train_dir, test_dir, output_dir):
    train_datagen = ImageDataGenerator(rescale=1./255)
    test_datagen = ImageDataGenerator(rescale=1./255)

    train_generator = train_datagen.flow_from_directory(
        train_dir,
        target_size=(224, 224),
        batch_size=32,
        class_mode='binary',
        shuffle=True)

    validation_generator = test_datagen.flow_from_directory(
        test_dir,
        target_size=(224, 224),
        batch_size=32,
        class_mode='binary',
        shuffle=False)

    # Callback per salvare i pesi del modello
    checkpoint = ModelCheckpoint(
        filepath=output_dir + '/model_epoch_{epoch:02d}.h5',
        save_weights_only=True,
        save_freq='epoch',
        verbose=1
    )

    history = model.fit(
        train_generator,
        steps_per_epoch=train_generator.samples // train_generator.batch_size,
        epochs=10,
        validation_data=validation_generator,
        validation_steps=validation_generator.samples // validation_generator.batch_size,
        callbacks=[checkpoint]
    )

    # Valutazione del modello
    loss, accuracy = model.evaluate(validation_generator)
    print(f'Test Accuracy: {accuracy}')

    # Predizioni e metriche
    y_true = validation_generator.classes
    y_pred = model.predict(validation_generator)
    y_pred_classes = np.argmax(y_pred, axis=1)

    # Calcolo della precision e della recall
    precision = precision_score(y_true, y_pred_classes)
    recall = recall_score(y_true, y_pred_classes)
    print(f'Precision: {precision}')
    print(f'Recall: {recall}')
    print(classification_report(y_true, y_pred_classes))

    # Calcolo della curva ROC
    fpr, tpr, _ = roc_curve(y_true, y_pred)
    roc_auc = auc(fpr, tpr)

    # Tracciare la curva ROC
    plt.figure()
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:0.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc='lower right')
    plt.show()

# Esempio di chiamata della funzione
# train_and_evaluate(model, '/content/drive/My Drive/dataset/KaggleViolenceNonViolence/train', '/content/drive/My Drive/dataset/KaggleViolenceNonViolence/test', '/content/drive/My Drive/output')


**Conversione del modello al lite**

In [7]:
# Conversione del modello in formato TFLite
def convert_to_tflite(model, output_path='model.tflite'):
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_model = converter.convert()
    with open(output_path, 'wb') as f:
        f.write(tflite_model)



In [8]:
# Inferenza con TFLite
def predict_image(interpreter, image_path):
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    img = tf.keras.preprocessing.image.load_img(image_path, target_size=(224, 224))
    img = tf.keras.preprocessing.image.img_to_array(img)
    img = np.expand_dims(img, axis=0) / 255.0

    interpreter.set_tensor(input_details[0]['index'], img)
    interpreter.invoke()

    output_data = interpreter.get_tensor(output_details[0]['index'])
    return output_data[0][0]



**Addestramento e test modello EfficientNet-B0**

In [None]:
# Esecuzione del processo completo
base_dir = '/content/gdrive/My Drive/Dataset/KaggleViolenceNonViolence'  # La cartella che contiene le cartelle Violence e NonViolence
output_dir = '/content/gdrive/My Drive/output'
prepare_dataset(base_dir, output_dir)

model = create_model()
train_and_evaluate(model, os.path.join(output_dir, 'train'), os.path.join(output_dir, 'test'))


**Conversione nella versione lite**

In [None]:
convert_to_tflite(model, 'model.tflite')
interpreter = tf.lite.Interpreter(model_path='model.tflite')
interpreter.allocate_tensors()



"""FACCIO INFERENZA SUL MODELLO LITE """
#prediction = predict_image(interpreter, '/path/to/dataset/test/Violence/frame_0000.jpg')  # Sostituisci con un percorso reale
#print(f'Prediction: {"Violence" if prediction > 0.5 else "NonViolence"}')