Five different CNNs (one per object class)

In [None]:
import os
import tensorflow as tf

# Environment-variable request for on-demand GPU memory growth
# (presumably the counterpart of the per-device setting below - see the
# TensorFlow GPU guide).
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

# Manual configuration (safer)
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        # Turn on memory growth for every GPU TensorFlow reports.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("MEmory Growth attivata su GPU")
    except RuntimeError as e:
        # Best effort: the error is only printed and execution continues.
        # NOTE(review): presumably raised when the GPUs were already
        # initialized before this cell ran - confirm against the TF docs.
        print(e)

        
from tensorflow.keras import layers, models, Input
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.utils import class_weight
from sklearn.metrics import classification_report
from sklearn.metrics import r2_score
print(tf.__version__)
print(tf.config.list_physical_devices('GPU'))
import matplotlib.pyplot as plt
import pandas as pd
import cv2 
import itertools
import numpy as np
from sklearn.model_selection import train_test_split
import random
from sklearn.metrics import mean_absolute_error
import gc

# Request deterministic op implementations when the installed TensorFlow
# exposes the API; otherwise warn and carry on without it.
try:
    tf.config.experimental.enable_op_determinism()
    print("Op Determinism Abilitato!")
except AttributeError:
    # The function does not exist in this TensorFlow build.
    print("Attenzione: La tua versione di TF è troppo vecchia per enable_op_determinism.")

def reset_seeds(seed=42):
    """Re-seed every random source this notebook relies on.

    Seeds TensorFlow, NumPy's global generator and Python's ``random``
    module with ``seed`` and stores the same value in the
    ``PYTHONHASHSEED`` environment variable.

    Args:
        seed: Integer seed shared by all generators (default 42).
    """
    tf.random.set_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    # NOTE(review): presumably only affects interpreters started after this
    # point; the running process fixed its hash seed at startup - confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)
# Seeds for the experiment: the main cell runs one full hyperparameter
# search per entry.
SEEDS = [42,555]

load dataset

In [None]:
def load_dataset():
    """Load the images and their bounding-box annotations from disk.

    Every entry of ``dataset/images`` is read with OpenCV in sorted filename
    order. Files whose name contains "upper" (case-insensitive) are
    downscaled to half width and half height, and the bounding boxes of the
    matching annotation rows are halved so they stay aligned with the
    resized image.

    Entries that OpenCV cannot decode (sub-directories, hidden files, ...)
    are skipped with a message instead of being stored as ``None`` and
    crashing later during resizing or array stacking.

    Returns:
        tuple: ``(data, label)`` where ``data`` is a list of
        ``{'image': ndarray (BGR), 'filename': str}`` dicts and ``label`` is
        the annotation DataFrame read from
        ``dataset/raw/bbx_annotations.csv`` with float coordinates.
    """
    folder = 'dataset/images'
    data = []
    for filename in sorted(os.listdir(folder)):
        img = cv2.imread(os.path.join(folder, filename))  # OpenCV decodes to BGR
        if img is None:
            # cv2.imread signals failure by returning None, not by raising.
            print(f"Skipping unreadable image file: {filename}")
            continue
        if "upper" in filename.lower():
            # Halve the resolution; INTER_AREA is the interpolation the
            # original code used for this downscale.
            img = cv2.resize(
                img,
                (img.shape[1] // 2, img.shape[0] // 2),
                interpolation=cv2.INTER_AREA,
            )
        data.append({'image': img, 'filename': filename})

    label = pd.read_csv('dataset/raw/bbx_annotations.csv')

    coords = ['xmin', 'xmax', 'ymin', 'ymax']
    label[coords] = label[coords].astype(float)

    # na=False keeps the mask strictly boolean even if a filename is missing.
    upper_mask = label['filename'].str.lower().str.contains('upper', na=False)
    # Keep the boxes consistent with the half-size "upper" images.
    label.loc[upper_mask, coords] /= 2

    return data, label

data modelling

In [None]:
def dataset_modelling(data, annotation, img_width=320.0, img_height=240.0):
    """Build per-class training arrays and one shuffled, mixed-class test set.

    Steps:
      1. Join images and annotations on ``filename``.
      2. For every (filename, class) pair keep only the largest-area box.
      3. Convert each box to its centre, normalised by the image size.
      4. Split every class 80/20 (``random_state=42``) into train and test.
      5. Concatenate and shuffle the five test splits.

    Args:
        data: List of ``{'image': ndarray, 'filename': str}`` dicts.
        annotation: DataFrame with ``filename``, ``class``, ``xmin``,
            ``xmax``, ``ymin`` and ``ymax`` columns.
        img_width: Width in pixels used to normalise x centres
            (default 320, the value previously hard-coded).
        img_height: Height in pixels used to normalise y centres
            (default 240, the value previously hard-coded).

    Returns:
        tuple: ``(x_train_gp, y_train_gp, x_train_b, y_train_b, x_train_r,
        y_train_r, x_train_gs, y_train_gs, x_train_cs, y_train_cs,
        x_test_class, x_test_img, y_test_target)``. Image arrays are float32
        scaled by 1/255, targets are float32 ``(cx, cy)`` pairs and
        ``x_test_class`` holds the numeric class id of each test sample.
    """
    merged = pd.merge(pd.DataFrame(data), annotation, on='filename')

    # Sorting by descending area makes keep='first' retain the biggest box
    # of each (filename, class) pair.
    merged['area'] = (merged['xmax'] - merged['xmin']) * (merged['ymax'] - merged['ymin'])
    merged = merged.sort_values(by=['filename', 'class', 'area'], ascending=[True, True, False])
    merged = merged.drop_duplicates(subset=['filename', 'class'], keep='first')

    label_map = {'goalpost': 1,
                 'ball': 2,
                 'robot': 3,
                 'goalspot': 4,
                 'centerspot': 5}

    # Column arithmetic replaces the former row-by-row iterrows() loop.
    center_x = (merged['xmin'] + merged['xmax']) / 2.0 / img_width
    center_y = (merged['ymin'] + merged['ymax']) / 2.0 / img_height
    center_list = list(zip(center_x, center_y))

    # Unknown class names map to 0 and therefore end up in no split.
    merged['class'] = [label_map.get(name, 0) for name in merged['class']]
    merged = merged.drop(columns=['xmin', 'ymin', 'xmax', 'ymax', 'area'])
    merged['target'] = center_list

    def _images(frame):
        # Stack the images and scale pixel values by 1/255.
        return np.array(frame['image'].tolist()).astype('float32') / 255.0

    def _targets(frame):
        return np.array(frame['target'].tolist()).astype('float32')

    train_arrays = []
    test_frames = []
    # Class ids in label_map order: goalpost, ball, robot, goalspot, centerspot.
    for class_id in label_map.values():
        class_df = merged[merged['class'] == class_id].copy()
        train_df, test_df = train_test_split(class_df, test_size=0.2, random_state=42)
        train_arrays.append(_images(train_df))
        train_arrays.append(_targets(train_df))
        test_frames.append(test_df)

    combined_test_df = pd.concat(test_frames, ignore_index=True)
    combined_test_df = combined_test_df.sample(frac=1, random_state=42).reset_index(drop=True)

    x_test_class = np.array(combined_test_df['class'].tolist()).astype('float32')
    x_test_img = _images(combined_test_df)
    y_test_target = _targets(combined_test_df)

    return (*train_arrays, x_test_class, x_test_img, y_test_target)

Hyperparameter search

In [None]:
def create_hyperparam_combination():
    """Expand the hyperparameter grid into a list of configurations.

    Returns:
        list[dict]: One dict per element of the Cartesian product of the
        candidate values, keyed by hyperparameter name. The last listed
        hyperparameter varies fastest.
    """
    search_space = {
        'batch_size': [16],
        'layer_number': [3, 5],
        'kernel_dim': [7],
        'pool_dim': [3, 2],
        'lr': [0.0001],
        'fc1': [256],
        'fc2': [0, 256],
        'activation': ['relu'],
        'dropout': [0.2],
        'loss': ['mae', 'mse'],
    }

    names = list(search_space)
    grid = []
    # One dict per point of the full Cartesian product.
    for choice in itertools.product(*search_space.values()):
        grid.append(dict(zip(names, choice)))
    return grid

building the model

In [None]:
def build_model(layer_num,kernel_dim,pool_dim,fc1,fc2,activation,dropout):
    """Assemble an (uncompiled) CNN that regresses one normalised centre point.

    Args:
        layer_num: Number of Conv2D -> AveragePooling2D -> Dropout blocks.
        kernel_dim: Side length of the square convolution kernels.
        pool_dim: Side length of the square pooling windows (stride is 2).
        fc1: Units of the first dense layer; skipped when not positive.
        fc2: Units of the second dense layer; skipped when not positive.
        activation: Activation used by the convolution layers.
        dropout: Dropout rate applied after every pooling layer.

    Returns:
        A Keras ``Sequential`` model taking 240x320x3 inputs and producing
        two sigmoid outputs.
    """
    net = models.Sequential()
    net.add(tf.keras.Input(shape=(240, 320, 3)))

    # The filter count doubles with every block: 16, 32, 64, ...
    for block_idx in range(layer_num):
        net.add(layers.Conv2D(
            16 * (2 ** block_idx),
            (kernel_dim, kernel_dim),
            activation=activation,
            padding='same',
        ))
        net.add(layers.AveragePooling2D((pool_dim, pool_dim), strides=2, padding='same'))
        net.add(layers.Dropout(dropout))

    net.add(layers.GlobalAveragePooling2D())

    # Optional fully connected layers; these always use ReLU.
    for units in (fc1, fc2):
        if units > 0:
            net.add(layers.Dense(units, activation='relu'))

    # Two sigmoid outputs: the predicted values lie in [0, 1].
    net.add(layers.Dense(2, activation='sigmoid'))

    return net


saving csv

In [None]:
def save_csv(mae,param,seed,all_results,r2,r2_model,mae_model):
    """Record one search iteration and rewrite the per-seed results CSV.

    A copy of ``param`` is extended with the global and per-class scores,
    appended to ``all_results`` (mutated in place) and the whole list is
    written to ``regression/csv/search/multi_cnn2_data_aug_{seed}.csv``.
    The output directory is created when missing, so a long training run
    does not lose its results to a missing folder.

    Args:
        mae: Overall mean absolute error of the iteration.
        param: Hyperparameter dict of the iteration (left unmodified).
        seed: Seed of the current run; part of the output filename.
        all_results: List accumulating the result dicts of this seed.
        r2: Overall R2 score of the iteration.
        r2_model: Five per-class R2 scores ordered gp, b, r, gs, cs.
        mae_model: Five per-class MAE scores in the same order.
    """
    class_suffixes = ('gp', 'b', 'r', 'gs', 'cs')

    current_result = param.copy()
    current_result['mae_Score'] = mae
    current_result['R2_Score'] = r2
    # Explicit indexing keeps the IndexError if fewer than five scores
    # are supplied (zip would silently truncate).
    for pos, suffix in enumerate(class_suffixes):
        current_result[f'R2_{suffix}'] = r2_model[pos]
    for pos, suffix in enumerate(class_suffixes):
        current_result[f'mae_{suffix}'] = mae_model[pos]

    all_results.append(current_result)

    out_path = f'regression/csv/search/multi_cnn2_data_aug_{seed}.csv'
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    pd.DataFrame(all_results).to_csv(out_path, index=False)

visualizing data

In [None]:
def visualize_only_centers(model_gp,model_b,model_r,model_gs,model_cs, x_test_img,x_test_class, y_test,seed):
    """Plot true versus predicted centres for a fixed set of test images.

    For each hard-coded test index the model matching the sample's class id
    predicts the centre; the true centre (green) and the predicted one (red)
    are drawn on the image, which is saved as a PNG and shown.

    Args:
        model_gp, model_b, model_r, model_gs, model_cs: Per-class models for
            class ids 1, 2, 3, 4 and "anything else" respectively.
        x_test_img: Test images, indexed by sample.
        x_test_class: Numeric class id of every test sample.
        y_test: Normalised target coordinates of every test sample.
        seed: Seed of the current run; part of the output filename.
    """
    # Class ids 1-4 pick a dedicated model; every other value uses model_cs.
    model_by_class = {1: model_gp, 2: model_b, 3: model_r, 4: model_gs}

    def mark_centers(canvas, coords, color, label):
        # coords is a flat sequence (x0, y0, x1, y1, ...) of normalised
        # values; a trailing unpaired value is ignored.
        height, width = canvas.shape[:2]
        for pair_no, (xc, yc) in enumerate(zip(coords[0::2], coords[1::2]), start=1):
            # Points that are (almost) at the origin are not drawn.
            if xc < 0.01 and yc < 0.01:
                continue

            xc_px = int(xc * width)
            yc_px = int(yc * height)

            # Filled dot in the given colour plus a thin white ring.
            cv2.circle(canvas, (xc_px, yc_px), 3, color, -1)
            cv2.circle(canvas, (xc_px, yc_px), 5, (255, 255, 255), 1)

            print(f"{label} {pair_no}: ({xc_px}, {yc_px}) px")

    for index in [10,34,80,120,25,48,72,60,50]:
        frame = x_test_img[index].copy()
        real_targets = y_test[index]

        chosen_model = model_by_class.get(float(x_test_class[index]), model_cs)
        pred_targets = chosen_model.predict(x_test_img[index:index+1], verbose=0)[0]

        # Images scaled to [0, 1] are brought back to 0-255 before display.
        scaled = frame * 255 if frame.max() <= 1.0 else frame
        img_display = cv2.cvtColor(scaled.astype(np.uint8), cv2.COLOR_BGR2RGB)

        print(f"\n--- Analisi Centri Immagine {index} ---")

        mark_centers(img_display, real_targets, (0, 255, 0), "Reale")     # green
        mark_centers(img_display, pred_targets, (255, 0, 0), "Predetto")  # red

        plt.figure(figsize=(6,6))
        plt.imshow(img_display)
        plt.title(f"Img {index}: green=REAL  red=PREDICTION")
        plt.axis('off')

        plt.savefig(
            f'regression/images/predvstrue/data_aug/multi_cnn2_img_{index}_seed{seed}.png',
            bbox_inches='tight',
        )

        plt.show()
        plt.close()

In [None]:
import numpy as np

def data_aug(x_train_gp, y_train_gp, x_train_b, y_train_b, x_train_r, y_train_r, x_train_gs, y_train_gs, x_train_cs, y_train_cs):
    """Augment each class's training set to five times its size.

    Every class is processed independently with a generator seeded with 42.
    The output holds the original images plus four variants: horizontal
    flip (x target mirrored as ``1 - x``), additive Gaussian noise
    (sigma 0.02), a per-image brightness offset in [0.1, 0.3] and a
    per-image contrast factor in [0.8, 1.5]. All variants are clipped to
    [0, 1] and the result is shuffled.

    Floating-point image arrays keep their dtype: the float64 noise and
    factors used to upcast the whole augmented set to float64, doubling its
    memory footprint. Non-float inputs still yield float64 as before.

    Args:
        x_train_*: Image batches; axis 0 is the sample axis and axis 2 is
            the one that gets flipped (treated as width).
        y_train_*: Matching targets; column 0 is the x coordinate.

    Returns:
        tuple: ``(x_gp, y_gp, x_b, y_b, x_r, y_r, x_gs, y_gs, x_cs, y_cs)``
        with the augmented arrays in the same order as the arguments.
    """
    x_list = [x_train_gp, x_train_b, x_train_r, x_train_gs, x_train_cs]
    y_list = [y_train_gp, y_train_b, y_train_r, y_train_gs, y_train_cs]
    class_names = ["GoalPost", "Ball", "Robot", "GoalSpot", "CenterSpot"]
    x_aug_list = []
    y_aug_list = []

    def augment_single(x, y):
        # The order of the random draws below is part of the reproducible
        # output; do not reorder them.
        rng = np.random.RandomState(42)
        # Cast the variants back to the input dtype when it is a float type.
        out_dtype = x.dtype if x.dtype.kind == 'f' else np.float64

        x_flipped = np.flip(x, axis=2)
        y_flipped = y.copy()
        y_flipped[:, 0] = 1.0 - y_flipped[:, 0]

        noise = rng.normal(loc=0.0, scale=0.02, size=x.shape)
        x_noisy = np.clip(x + noise, 0., 1.).astype(out_dtype, copy=False)

        factors = rng.uniform(0.1, 0.3, size=(x.shape[0], 1, 1, 1))
        x_bright = np.clip(x + factors, 0.0, 1.0).astype(out_dtype, copy=False)

        contrast_fact = rng.uniform(0.8, 1.5, size=(x.shape[0], 1, 1, 1))
        mean = np.mean(x, axis=(1, 2), keepdims=True)
        x_contrast = np.clip((x - mean) * contrast_fact + mean, 0.0, 1.0)
        x_contrast = x_contrast.astype(out_dtype, copy=False)

        x_final = np.concatenate([x, x_flipped, x_noisy, x_bright, x_contrast], axis=0)
        # Only the flipped copy changes its target.
        y_final = np.concatenate([y, y_flipped, y, y, y], axis=0)

        indices = np.arange(x_final.shape[0])
        rng.shuffle(indices)

        return x_final[indices], y_final[indices]

    for name, x_cls, y_cls in zip(class_names, x_list, y_list):
        original_count = x_cls.shape[0]

        x_a, y_a = augment_single(x_cls, y_cls)

        aug_count = x_a.shape[0]
        print(f"[{name}] Originali: {original_count} -> Aumentati: {aug_count} (x{aug_count/original_count:.0f})")

        x_aug_list.append(x_a)
        y_aug_list.append(y_a)

    # Interleave as (x_gp, y_gp, x_b, y_b, ...).
    return tuple(arr for pair in zip(x_aug_list, y_aug_list) for arr in pair)

main

In [None]:
# Load images + annotations and build the per-class training pools together
# with the shared, mixed-class test set.
data,annotation=load_dataset()
x_gp, y_gp, x_b, y_b, x_r, y_r, x_gs, y_gs, x_cs, y_cs, x_test_class, x_test_img, y_test = dataset_modelling(data, annotation)
# Second 80/20 split (fixed random_state) of each class's training pool into
# train (x_t_*, y_t_*) and validation (x_v_*, y_v_*) sets.
x_t_gp, x_v_gp, y_t_gp, y_v_gp = train_test_split(x_gp, y_gp, test_size=0.2, random_state=42)
x_t_b,  x_v_b,  y_t_b,  y_v_b  = train_test_split(x_b,  y_b,  test_size=0.2, random_state=42)
x_t_r,  x_v_r,  y_t_r,  y_v_r  = train_test_split(x_r,  y_r,  test_size=0.2, random_state=42)
x_t_gs, x_v_gs, y_t_gs, y_v_gs = train_test_split(x_gs, y_gs, test_size=0.2, random_state=42)
x_t_cs, x_v_cs, y_t_cs, y_v_cs = train_test_split(x_cs, y_cs, test_size=0.2, random_state=42)
# Only the training portions are augmented; validation and test data stay
# untouched.
x_t_gp_aug, y_t_gp_aug, x_t_b_aug,  y_t_b_aug, x_t_r_aug,  y_t_r_aug, x_t_gs_aug, y_t_gs_aug, x_t_cs_aug, y_t_cs_aug = data_aug(x_t_gp, y_t_gp, x_t_b, y_t_b, x_t_r, y_t_r, x_t_gs, y_t_gs, x_t_cs, y_t_cs)
# Every hyperparameter configuration to evaluate.
combination=create_hyperparam_combination()
total_combinations = len(combination)
# Collects the best configuration (plus scores) found for each seed.
final_results = []

# One complete hyperparameter search per seed. For every configuration five
# models (one per object class) are built, trained and evaluated on the
# shared test set; the weights of the configuration with the lowest overall
# test MAE are kept on disk and reloaded at the end for visualisation.
for seed in SEEDS:
    print(f"\n ================== INIZIO CICLO CON SEED: {seed} ================== ")
    all_results = []
    # Sentinels: any real score replaces them on the first iteration.
    bestmae=100000
    best_r2=-1000000
    current_seed_best_params = {}
    for i,param in enumerate(combination):

        print(f"ITERAZIONE {i+1} of {total_combinations}")
        print(f"PARAMETRI ATTUALI: {param}")  

        # Drop the previous iteration's Keras state and re-seed so every
        # configuration starts from the same random state.
        tf.keras.backend.clear_session()
        gc.collect()
        reset_seeds(seed)

        # Five models with identical architecture, one per class.
        model_gp=build_model( param['layer_number'],param['kernel_dim'],param['pool_dim'],param['fc1'],param['fc2'],param['activation'],param['dropout'])
        model_b=build_model( param['layer_number'],param['kernel_dim'],param['pool_dim'],param['fc1'],param['fc2'],param['activation'],param['dropout'])
        model_r=build_model( param['layer_number'],param['kernel_dim'],param['pool_dim'],param['fc1'],param['fc2'],param['activation'],param['dropout'])
        model_gs=build_model( param['layer_number'],param['kernel_dim'],param['pool_dim'],param['fc1'],param['fc2'],param['activation'],param['dropout'])
        model_cs=build_model( param['layer_number'],param['kernel_dim'],param['pool_dim'],param['fc1'],param['fc2'],param['activation'],param['dropout'])

        list_model=[model_gp, model_b, model_r, model_gs, model_cs]
        
        for model in list_model:
            model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=param['lr']),
                        loss=param['loss'],
                        )
        # Each model trains on its class's augmented data and is validated on
        # that class's non-augmented validation split, with early stopping.
        # NOTE(review): every fit() call passes both validation_data and
        # validation_split; per the Keras Model.fit documentation
        # validation_data overrides validation_split - confirm and drop the
        # redundant argument.
        early_stop_gp = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)            
        history_gp=model_gp.fit(
            x_t_gp_aug, y_t_gp_aug,      
            validation_data=(x_v_gp, y_v_gp),
            epochs=120,
            batch_size=param['batch_size'], 
            validation_split=0.2, 
            callbacks=[early_stop_gp]
                )
        early_stop_b = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
        history_b=model_b.fit(
            x_t_b_aug, y_t_b_aug,
            validation_data=(x_v_b, y_v_b),
            epochs=120,
            batch_size=param['batch_size'], 
            validation_split=0.2, 
            callbacks=[early_stop_b]
                )
        early_stop_r = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
        history_r=model_r.fit(
            x_t_r_aug, y_t_r_aug,
            validation_data=(x_v_r, y_v_r),
            epochs=120,
            batch_size=param['batch_size'], 
            validation_split=0.2, 
            callbacks=[early_stop_r]
                )
        early_stop_gs = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
        history_gs=model_gs.fit(
            x_t_gs_aug, y_t_gs_aug,
            validation_data=(x_v_gs, y_v_gs),
            epochs=120,
            batch_size=param['batch_size'], 
            validation_split=0.2, 
            callbacks=[early_stop_gs]
                )
        early_stop_cs = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
        history_cs=model_cs.fit(
            x_t_cs_aug, y_t_cs_aug,
            validation_data=(x_v_cs, y_v_cs),
            epochs=120,
            batch_size=param['batch_size'], 
            validation_split=0.2, 
            callbacks=[early_stop_cs]
                )
        
        # Overall score: every test image is routed to the model of its class
        # (ids 1-4; anything else goes to model_cs), one predict() call per
        # image.
        # NOTE(review): this loop reuses the name `i` of the outer enumerate;
        # harmless because nothing below reads the outer value and the outer
        # loop rebinds it on its next iteration.
        prediction=[]
        for i,x_sample in enumerate(x_test_img):
            x_sample = np.expand_dims(x_sample, axis=0)
            label=x_test_class[i]
            if label==1:
                pred=model_gp.predict(x_sample,verbose=0)
            elif label==2:
                pred=model_b.predict(x_sample,verbose=0)
            elif label==3:
                pred=model_r.predict(x_sample,verbose=0)
            elif label==4:
                pred=model_gs.predict(x_sample,verbose=0)
            else:
                pred=model_cs.predict(x_sample,verbose=0)
            prediction.append(pred[0])

        prediction=np.array(prediction)
        mae = mean_absolute_error(y_test, prediction)
        r2=r2_score(y_test,prediction)

        classes_map = {1: 'GoalPost', 2: 'Ball', 3: 'Robot', 4: 'GoalSpot', 5: 'CenterSpot'}
        models_list = {1: model_gp, 2: model_b, 3: model_r, 4: model_gs, 5: model_cs}

        # Per-class scores, appended in class-id order 1..5 (the order
        # save_csv reads them back as gp, b, r, gs, cs).
        print("\n--- Performance per Class ---")
        mae_model=[]
        r2_model=[]
        for class_id, class_name in classes_map.items():
            mask = (x_test_class == class_id)
            
            x_test_specific = x_test_img[mask]
            y_test_specific = y_test[mask]
            
            model_specific = models_list[class_id]
            pred_specific = model_specific.predict(x_test_specific, verbose=0)
            
            r2_specific = r2_score(y_test_specific, pred_specific)
            r2_model.append(r2_specific)
            mae_specific = mean_absolute_error(y_test_specific, pred_specific)
            mae_model.append(mae_specific)
        # NOTE(review): the best configuration is chosen by MAE on the test
        # set, so the reported best scores are optimistically biased; a
        # validation-based selection would avoid this.
        # best_r2 stores the R2 of the best-MAE configuration, not the
        # highest R2 seen.
        if mae<bestmae:
            bestmae=mae
            current_seed_best_params = param.copy()
            best_r2=r2
            model_gp.save_weights(f'regression/weights/best_gp_seed{seed}.weights.h5')
            model_b.save_weights(f'regression/weights/best_b_seed{seed}.weights.h5')
            model_r.save_weights(f'regression/weights/best_r_seed{seed}.weights.h5')
            model_gs.save_weights(f'regression/weights/best_gs_seed{seed}.weights.h5')
            model_cs.save_weights(f'regression/weights/best_cs_seed{seed}.weights.h5')
        save_csv(mae,param,seed,all_results,r2,r2_model,mae_model)

    # Rebuild the winning architecture and reload its saved weights.
    # NOTE(review): if no iteration ever improved on the sentinel MAE the
    # dict is still empty and the lookups below raise KeyError.
    p = current_seed_best_params

    model_gp=build_model( p['layer_number'],p['kernel_dim'],p['pool_dim'],p['fc1'],p['fc2'],p['activation'],p['dropout'])
    model_b=build_model( p['layer_number'],p['kernel_dim'],p['pool_dim'],p['fc1'],p['fc2'],p['activation'],p['dropout'])
    model_r=build_model( p['layer_number'],p['kernel_dim'],p['pool_dim'],p['fc1'],p['fc2'],p['activation'],p['dropout'])
    model_gs=build_model( p['layer_number'],p['kernel_dim'],p['pool_dim'],p['fc1'],p['fc2'],p['activation'],p['dropout'])
    model_cs=build_model( p['layer_number'],p['kernel_dim'],p['pool_dim'],p['fc1'],p['fc2'],p['activation'],p['dropout'])

    # Annotate the summary row of this seed (p is the same dict object).
    current_seed_best_params['seed'] = seed
    current_seed_best_params['best_mae_score'] = bestmae
    current_seed_best_params['best_r2_score'] = best_r2

    model_gp.load_weights(f'regression/weights/best_gp_seed{seed}.weights.h5')
    model_b.load_weights(f'regression/weights/best_b_seed{seed}.weights.h5')
    model_r.load_weights(f'regression/weights/best_r_seed{seed}.weights.h5')
    model_gs.load_weights(f'regression/weights/best_gs_seed{seed}.weights.h5')
    model_cs.load_weights(f'regression/weights/best_cs_seed{seed}.weights.h5')

    print(current_seed_best_params)
    visualize_only_centers(model_gp,model_b,model_r,model_gs,model_cs, x_test_img,x_test_class ,y_test,seed)
   
    final_results.append(current_seed_best_params)


# One row per seed: the best hyperparameters together with their seed,
# best MAE and the matching R2 score.
df_best_results = pd.DataFrame(final_results)
df_best_results.to_csv('regression/csv/best_model/multicnn2_dataaug.csv', index=False)

