### Import libraries

In [10]:
import os, sys, time, re, random
import pandas as pd 
import numpy as np  
import matplotlib.pyplot as plt 
import csv
import ast  

### Define methods

In [11]:
def cal_F1(origin_labels, comparison_labels, threshold=0.5):
    """Compute an IoU-based F1 score between two sets of labelled boxes.

    Every label is a ``(class_id, x, y, w, h)`` tuple. A box from
    ``origin_labels`` is a true positive when at least one box of the same
    class in ``comparison_labels`` overlaps it with IoU >= ``threshold``;
    otherwise it is a false positive. False negatives are computed as
    ``len(comparison_labels) - TP``.

    Args:
        origin_labels: sequence of ``(class_id, x, y, w, h)`` tuples.
        comparison_labels: sequence of ``(class_id, x, y, w, h)`` tuples.
        threshold: minimum IoU for two same-class boxes to count as a match.

    Returns:
        The F1 score in ``[0, 1]``. Two empty inputs score 1.0; exactly one
        empty input scores 0.0.

    NOTE(review): boxes are treated as spanning ``[x, x + w] x [y, y + h]``.
    If the label files store centre coordinates (YOLO style), the overlap
    computed here is not the true IoU -- confirm against the detector output.
    NOTE(review): several origin boxes may match the same comparison box,
    which can make FN negative and recall exceed 1; the final score is
    therefore clamped to 1.0 (behaviour kept from the original).
    """
    if len(origin_labels) == 0 and len(comparison_labels) == 0:
        return 1.0

    # Exactly one side is empty: nothing can match.
    if len(origin_labels) == 0 or len(comparison_labels) == 0:
        return 0.0

    TP, FP = 0, 0

    for fc, fx, fy, fw, fh in origin_labels:
        fArea = fw * fh

        maxIOU = 0.0
        for sc, sx, sy, sw, sh in comparison_labels:
            # Only boxes of the same class can match.
            if fc != sc:
                continue

            interW = max(0, min(fx + fw, sx + sw) - max(fx, sx))
            interH = max(0, min(fy + fh, sy + sh) - max(fy, sy))
            interArea = interW * interH
            unionArea = fArea + sw * sh - interArea

            # Degenerate (zero-area) boxes would divide by zero; they
            # cannot overlap anything, so treat them as IoU 0.
            if unionArea <= 0:
                continue

            maxIOU = max(maxIOU, interArea / unionArea)

        if maxIOU >= threshold:
            TP += 1
        else:
            FP += 1

    FN = len(comparison_labels) - TP
    precision = TP / (TP + FP) if (TP + FP) != 0 else 0
    recall = TP / (TP + FN) if (TP + FN) != 0 else 0
    F1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    # Recall can exceed 1 with many-to-one matches; keep the score in [0, 1].
    return min(1.0, F1)

In [12]:
def parse_results(file_path):
    """Parse the detection file at ``file_path`` into a list of labels.

    Each non-blank line is split on whitespace; the first field is read as
    the integer class id and the next four as floats ``x, y, w, h``. Any
    further fields on the line are ignored. Blank or whitespace-only lines
    are skipped.

    Args:
        file_path (string): detection file path

    Returns:
        List: detected objects as ``(c, x, y, w, h)`` tuples, in file order
    """
    label_list = []
    with open(file_path, 'r') as file:
        for line in file:
            fields = line.split()
            # A blank line has no fields and would break the unpack below.
            if not fields:
                continue
            c = int(fields[0])
            x, y, w, h = map(float, fields[1:5])
            label_list.append((c, x, y, w, h))
    return label_list

In [13]:
def get_F1(origin_file_path, RL_file_path) :
    """Return the F1 score between two detection files.

    Both files are parsed with ``parse_results`` (origin first, then RL) and
    the resulting label lists are passed to ``cal_F1`` with its default
    threshold.
    """
    return cal_F1(
        parse_results(origin_file_path),
        parse_results(RL_file_path),
    )

In [42]:
def get_f1_score(origin_idx_list, RL_idx_list, dataset):
    """Average the per-frame F1 score between original and RL-selected frames.

    Detection files are looked up in ``data/detect/{dataset}/labels/`` under
    the name ``{dataset}_{idx}.txt``. For each position ``i`` the pair
    ``(origin_idx_list[i], RL_idx_list[i])`` is scored as follows:

    * same index, detection file exists: 1.0
    * same index, no detection file: the frame is not counted
    * different indices, both files exist: ``get_F1`` of the two files
    * different indices, exactly one file exists: 0.0
    * different indices, neither file exists: the frame is not counted

    Args:
        origin_idx_list: frame indices of the original sequence.
        RL_idx_list: frame indices chosen by RL, aligned with
            ``origin_idx_list`` (must be at least as long).
        dataset: dataset name, used for both the directory and the file
            name prefix.

    Returns:
        The mean score over the counted frames, or 0.0 when no frame was
        counted.
    """
    detect_path = f"data/detect/{dataset}/labels/"

    # A set gives O(1) membership tests inside the loop below.
    exist_idx = set()
    for name in os.listdir(detect_path):
        try:
            exist_idx.add(int(name.split("_")[-1].split(".")[0]))
        except ValueError:
            # Entries without an integer index (e.g. notebook checkpoint
            # directories) are not detection files.
            continue

    total_f1_score = 0
    num_frame = 0

    for i, origin_idx in enumerate(origin_idx_list):
        RL_idx = RL_idx_list[i]
        origin_exists = origin_idx in exist_idx

        if origin_idx == RL_idx:
            if origin_exists:
                total_f1_score += 1.0
                num_frame += 1
            continue

        RL_exists = RL_idx in exist_idx
        if origin_exists and RL_exists:
            origin_file = os.path.join(detect_path, f"{dataset}_{origin_idx}.txt")
            RL_file = os.path.join(detect_path, f"{dataset}_{RL_idx}.txt")
            total_f1_score += get_F1(origin_file, RL_file)
            num_frame += 1
        elif origin_exists or RL_exists:
            # Only one side has detections: counts as a frame scoring 0.
            num_frame += 1

    # Nothing counted (empty input or no detection files): avoid dividing by 0.
    if num_frame == 0:
        return 0.0

    return total_f1_score / num_frame

In [46]:
experiment_path = 'data/experiment'

dataset_list = ['JN', 'JK', 'SD']
exp_type_list = ['frame_dropping', 'computation_offloading'] #, 'multi_agent']
frame_drop_method_list = ['LRLO', 'FrameHopper']
computation_offloading_drop_method_list = ['LRLO','JDPCRA', 'TLDOC']


def _read_index_lists(config_path):
    """Return every list-literal cell found in the CSV file at config_path.

    A cell qualifies when its text starts with '[' and ends with ']'; it is
    converted with ast.literal_eval. Cells are returned in reading order.
    """
    found = []
    with open(config_path, mode='r', newline='') as handle:
        for cells in csv.reader(handle):
            for cell in cells:
                if cell.startswith('[') and cell.endswith(']'):
                    found.append(ast.literal_eval(cell))
    return found


# For every dataset / experiment type / method, print one averaged F1 score
# per index list stored in that run's test_config.csv.
for dataset in dataset_list:
    for exp_type in exp_type_list:
        is_frame_dropping = exp_type == 'frame_dropping'
        if is_frame_dropping:
            method_list = frame_drop_method_list
        else:
            method_list = computation_offloading_drop_method_list

        for method in method_list:
            config_file = f'{experiment_path}/{exp_type}/{method}_{dataset}/test_config.csv'

            for selected_idx_list in _read_index_lists(config_file):
                # NOTE(review): only frame_dropping indices are shifted by one;
                # presumably those configs store 0-based indices -- confirm.
                if is_frame_dropping:
                    RL_idx_list = [idx + 1 for idx in selected_idx_list]
                else:
                    RL_idx_list = selected_idx_list

                # The reference sequence is simply frames 1..N.
                origin_idx_list = list(range(1, len(RL_idx_list) + 1))
                f1_score = get_f1_score(origin_idx_list, RL_idx_list, dataset)
                print(f'{dataset}\t {exp_type}\t {method} : {f1_score}')

        print()

JN	 frame_dropping	 LRLO : 0.32470349454244013
JN	 frame_dropping	 LRLO : 0.32000339575046066
JN	 frame_dropping	 LRLO : 0.3057213182894893
JN	 frame_dropping	 LRLO : 0.3028041007449667
JN	 frame_dropping	 LRLO : 0.30035356226086135
JN	 frame_dropping	 LRLO : 0.47654870739977007
JN	 frame_dropping	 LRLO : 0.6235436412089876
JN	 frame_dropping	 LRLO : 0.7474236777715043
JN	 frame_dropping	 LRLO : 0.7552359621055276
JN	 frame_dropping	 FrameHopper : 0.46250388247167457
JN	 frame_dropping	 FrameHopper : 0.7975932554554471

JN	 computation_offloading	 LRLO : 0.26135298468167595
JN	 computation_offloading	 LRLO : 0.2597950219507427
JN	 computation_offloading	 LRLO : 0.2560617473370816
JN	 computation_offloading	 LRLO : 0.26777526854165407
JN	 computation_offloading	 LRLO : 0.24784502422587745
JN	 computation_offloading	 LRLO : 0.368003192428919
JN	 computation_offloading	 LRLO : 0.4489417126526282
JN	 computation_offloading	 LRLO : 0.5571810935373
JN	 computation_offloading	 LRLO : 0.562968