In [1]:
import pandas as pd
import numpy as np
import csv
def merge_intervals(intervals):
    """Merge overlapping or touching intervals into a minimal ordered list.

    Args:
        intervals: Sequence of ``(start, end)`` pairs. The sequence itself is
            left untouched; sorting happens on a copy.

    Returns:
        A new list of intervals ordered by start. Two intervals are merged
        when the later one starts at or before the end of the earlier one.
        Merged entries are tuples; entries that needed no merging are the
        original objects. An empty or ``None`` input yields ``[]``.
    """
    if not intervals:
        return []

    # sorted() returns a new list, so the caller's ordering is preserved
    # (the in-place .sort() used previously silently reordered the argument).
    ordered = sorted(intervals, key=lambda x: x[0])
    merged = [ordered[0]]

    for current in ordered[1:]:
        prev_start, prev_end = merged[-1]
        curr_start, curr_end = current

        # "<=" means intervals that merely touch are also fused
        if curr_start <= prev_end:
            merged[-1] = (prev_start, max(prev_end, curr_end))
        else:
            merged.append(current)

    return merged

# Read the detector output for one recording and keep only rows labelled 'call'.
detections = pd.read_csv('../dataset/test_dataset_by_Kirill/pred_VB003_140707_102351.csv')
detections_call = detections.loc[detections['label'] == 'call']
# merge_intervals is intentionally not applied here: for now it only makes results worse.
predicted_intervals = list(
    zip(*(detections_call[column] for column in ('start_s', 'end_s')))
)

# 2. Manual parsing of true_label.txt (disabled; the csv.DictReader code below is used instead)
# true_intervals = []
# with open('../dataset/test_dataset_by_Kirill/oleboe ozero Utrish 2010.csv', 'r') as f:
#     for line in f:
#         parts = line.strip().split('\t')
#         if len(parts) < 2:
#             continue
#         start_str = parts[0].replace(',', '.')
#         end_str = parts[1].replace(',', '.')
#         start = float(start_str)
#         end = float(end_str)
#         true_intervals.append((start, end))

# Ground-truth annotations: every row except those of class 'киты не орут'
# contributes one (start, end) pair; decimal commas are converted to dots.
true_intervals = []
with open('../dataset/test_dataset_by_Kirill/VB003_140707_102351.csv', 'r', encoding='utf-8') as f:
    true_intervals.extend(
        (
            float(row['Start'].replace(',', '.')),
            float(row['End'].replace(',', '.')),
        )
        for row in csv.DictReader(f)
        if row['Class'].strip().lower() != 'киты не орут'
    )

# 3. Global IoU threshold, used as the default by the evaluation functions below
iou_global = 0.3

# Print both interval lists as a sanity check
print("True intervals (call):", true_intervals)
print("Detected intervals (call):", predicted_intervals)

True intervals (call): [(1.1538548583984376, 2.1971201171875), (2.92789111328125, 3.06732421875), (3.144240234375, 3.278865966796875), (3.6923125, 4.16346875), (4.16346875, 4.30290185546875), (4.76444384765625, 4.91349169921875), (5.20194970703125, 5.83657568359375), (6.89906982421875, 7.00482958984375), (7.61058935546875, 8.269251953125), (8.6538779296875, 9.3125400390625), (10.1538779296875, 10.9182998046875), (13.7740595703125, 14.033673828125), (14.2307939453125, 14.6202275390625), (16.31734765625, 17.11061328125), (18.0433125, 18.774083984375), (19.552927734375, 20.307734375), (26.586556640625, 28.153857421875), (29.350978515625, 30.201931640625), (35.346171875, 35.509640625), (36.1153984375, 36.34616796875), (38.1923125, 39.0576875), (40.56730078125, 41.65383203125), (41.65383203125, 43.14421875), (43.66825390625, 44.4422890625), (44.9999765625, 46.15383203125), (47.58172265625, 49.43748046875), (49.84614453125, 51.5769140625), (52.03845703125, 52.4278671875), (52.4278671875, 52.

In [2]:
import numpy as np
from tqdm import tqdm

def calculate_iou(interval_a, interval_b):
    """Return the intersection-over-union of two 1-D intervals.

    Args:
        interval_a: ``(start, end)`` pair.
        interval_b: ``(start, end)`` pair.

    Returns:
        Overlap length divided by the length of the union, or ``0.0`` when
        the union has no positive length.
    """
    (a_lo, a_hi), (b_lo, b_hi) = interval_a, interval_b

    # Disjoint intervals give a negative difference, which is floored at 0
    overlap = max(0, min(a_hi, b_hi) - max(a_lo, b_lo))
    combined = (a_hi - a_lo) + (b_hi - b_lo) - overlap

    if combined > 0:
        return overlap / combined
    return 0.0

def calculate_ap(true_segments, pred_segments, iou_thresholds):
    """Compute 101-point interpolated average precision per IoU threshold.

    Predictions are greedily matched, in the order given, to the not yet
    matched ground-truth interval with the highest IoU; a prediction is a
    true positive when that IoU reaches the threshold.

    NOTE(review): the segments carry no confidence score, so the
    precision-recall curve (and therefore AP) depends on the order of
    ``pred_segments``. Sort predictions by confidence before calling if a
    score is available.

    Args:
        true_segments: Sequence of ground-truth ``(start, end)`` pairs.
        pred_segments: Sequence of predicted ``(start, end)`` pairs.
        iou_thresholds: Iterable of IoU thresholds.

    Returns:
        List with one AP value per threshold, in the order of
        ``iou_thresholds``. With no ground truth every AP is ``0.0``.
    """
    n_true = len(true_segments)
    if n_true == 0:
        # Recall would be tp / 0 here; report zero AP explicitly instead of
        # letting NumPy emit a division warning and produce NaN recalls.
        return [0.0 for _ in iou_thresholds]

    n_pred = len(pred_segments)
    aps = []

    for thresh in iou_thresholds:
        # Match predictions against ground-truth intervals
        matched_true = set()
        tp = np.zeros(n_pred)

        for i, pred in enumerate(pred_segments):
            best_iou = 0.0
            best_idx = -1
            for j, true in enumerate(true_segments):
                if j in matched_true:
                    continue
                iou = calculate_iou(pred, true)
                if iou > best_iou:
                    best_iou = iou
                    best_idx = j

            if best_iou >= thresh:
                matched_true.add(best_idx)
                tp[i] = 1

        # Every prediction is either a true or a false positive
        fp = 1 - tp

        # Build the precision-recall curve
        tp_cumsum = np.cumsum(tp)
        fp_cumsum = np.cumsum(fp)

        recalls = tp_cumsum / n_true
        # The small epsilon keeps the division defined
        precisions = tp_cumsum / (tp_cumsum + fp_cumsum + 1e-12)

        # Interpolated precision at recall levels 0.00, 0.01, ..., 1.00:
        # the best precision reached at any recall >= the level
        interp_precision = np.zeros(101)
        for r in range(101):
            precision_vals = precisions[recalls >= r/100]
            interp_precision[r] = precision_vals.max() if precision_vals.size > 0 else 0

        # AP is the mean of the interpolated precisions
        aps.append(np.mean(interp_precision))

    return aps

def evaluate_metrics(true_segments, pred_segments, iou_threshold=iou_global):  # key coefficient, worth tuning
    """Score predicted intervals against ground truth using greedy IoU matching.

    Each prediction, in the order given, is paired with the still-unmatched
    ground-truth interval of highest IoU and counted as a true positive when
    that IoU is at least ``iou_threshold``.

    Args:
        true_segments: Sequence of ground-truth ``(start, end)`` pairs.
        pred_segments: Sequence of predicted ``(start, end)`` pairs.
        iou_threshold: Minimum IoU for a match; defaults to ``iou_global``.

    Returns:
        Dict with keys ``'Precision'``, ``'Recall'``, ``'F1-score'``,
        ``'AP50'``, ``'AP75'`` and ``'AP50-95'`` (mean AP over thresholds
        ``np.arange(0.5, 1.0, 0.05)``). The AP entries come from
        ``calculate_ap`` and do not depend on ``iou_threshold``.
    """
    claimed = set()
    hits = 0

    for candidate in pred_segments:
        # The (0.0, -1) sentinel is listed first, so only a strictly positive
        # IoU displaces it and ties resolve to the earliest ground-truth index.
        scored = [(0.0, -1)]
        scored.extend(
            (calculate_iou(candidate, truth), idx)
            for idx, truth in enumerate(true_segments)
            if idx not in claimed
        )
        top_iou, top_idx = max(scored, key=lambda pair: pair[0])

        if top_iou >= iou_threshold:
            hits += 1
            claimed.add(top_idx)

    # TP + FP equals the number of predictions, TP + FN the number of truths
    n_pred = len(pred_segments)
    n_true = len(true_segments)

    precision = hits / n_pred if n_pred > 0 else 0.0
    recall = hits / n_true if n_true > 0 else 0.0

    if (precision + recall) > 0:
        f1_score = 2 * (precision * recall) / (precision + recall)
    else:
        f1_score = 0.0

    ap50 = calculate_ap(true_segments, pred_segments, [0.5])[0]
    ap75 = calculate_ap(true_segments, pred_segments, [0.75])[0]
    ap50_95 = np.mean(calculate_ap(true_segments, pred_segments, np.arange(0.5, 1.0, 0.05)))

    return {
        'Precision': precision,
        'Recall': recall,
        'F1-score': f1_score,
        'AP50': ap50,
        'AP75': ap75,
        'AP50-95': ap50_95
    }

# Usage example (the sample intervals below are disabled by wrapping them in a string literal)
"""
true_intervals = [
(0.592229  ,1.262479),
(7.773479  ,8.491604),
(29.272560  ,29.863020),
(33.896339  ,34.614460),
(36.090611  ,36.593288),
(37.319401  ,37.806129),
(49.592899  ,50.366879),
(51.084999  ,51.866959),
(52.304119  ,52.727020),
(64.454803  ,65.069199),
(66.313957  ,66.872513),
(67.590607  ,68.133209),
(104.301201  ,105.083199),
(111.745796  ,112.288399),
(113.694801  ,114.321503)
]
predicted_intervals = [
 (0.496, 1.328),
 (7.834666666666666, 8.802666666666667),
 (29.24266666666667, 30.466666666666665),
 (33.91466666666667, 34.909333333333336),
 (35.89066666666667, 36.792),
 (37.42133333333334, 37.824),
 (43.58133333333333, 43.59733333333333),
 (49.584, 50.42133333333334),
 (50.99466666666667, 52.925333333333334),
 (64.39466666666667, 65.02666666666667),
 (66.344, 68.11733333333333),
 (104.37333333333333, 105.34933333333333),
 (111.75733333333334, 112.25333333333333),
 (113.58666666666667, 114.30133333333333),
 (115.55733333333333, 115.576)  
]
"""
# Evaluate the intervals loaded earlier and print every metric with four decimals.
metrics = evaluate_metrics(true_intervals, predicted_intervals)
for metric in metrics:
    value = metrics[metric]
    print(f"{metric}: {value:.4f}")

Precision: 0.1010
Recall: 0.8015
F1-score: 0.1794
AP50: 0.0331
AP75: 0.0043
AP50-95: 0.0105


In [3]:
def bootstrap_confidence_intervals(true_segments, pred_segments, iou_threshold=iou_global,
                                  n_bootstrap=100, confidence_level=0.95, seed=None):
    """Estimate percentile bootstrap confidence intervals for the metrics.

    On every iteration the ground-truth and predicted segments are resampled
    with replacement, independently of each other, and passed to
    ``evaluate_metrics``.

    Args:
        true_segments: Sequence of ground-truth ``(start, end)`` pairs.
        pred_segments: Sequence of predicted ``(start, end)`` pairs.
        iou_threshold: IoU threshold forwarded to ``evaluate_metrics``.
        n_bootstrap: Number of bootstrap iterations.
        confidence_level: Coverage of the interval, e.g. ``0.95``.
        seed: Optional seed for ``np.random.default_rng``. Pass an integer
            to make the intervals reproducible; ``None`` (the default) draws
            fresh entropy on every call.

    Returns:
        Dict mapping ``'<metric>_CI'`` to a two-element array holding the
        lower and upper percentile bounds.
    """
    # A local generator keeps the resampling reproducible when a seed is
    # given, without touching NumPy's global random state.
    rng = np.random.default_rng(seed)

    n_true = len(true_segments)
    n_pred = len(pred_segments)

    metrics_samples = {
        'Precision': [],
        'Recall': [],
        'F1-score': [],
        'AP50': [],
        'AP75': [],
        'AP50-95': []
    }

    for _ in tqdm(range(n_bootstrap), desc="Bootstrapping"):
        true_bootstrap = [true_segments[i] for i in rng.choice(n_true, size=n_true, replace=True)]
        pred_bootstrap = [pred_segments[i] for i in rng.choice(n_pred, size=n_pred, replace=True)]

        metrics = evaluate_metrics(true_bootstrap, pred_bootstrap, iou_threshold)
        for k in metrics_samples:
            metrics_samples[k].append(metrics[k])

    # Percentile bounds: alpha in each tail
    alpha = (1 - confidence_level) / 2
    ci = {}

    for metric, samples in metrics_samples.items():
        ci[metric + "_CI"] = np.percentile(samples, [alpha*100, (1-alpha)*100])

    return ci

# Bootstrap intervals: print the raw dict first, then one metric per line.
bootstrap = bootstrap_confidence_intervals(true_intervals, predicted_intervals)
print(bootstrap)
for metric, bounds in bootstrap.items():
    print(f"{metric}: {bounds}")
    
import math
from scipy.stats import norm

def wilson_interval(k, n, alpha=0.05):
    """Wilson score interval for ``k`` successes in ``n`` trials.

    Args:
        k: Number of successes.
        n: Number of trials.
        alpha: Significance level; the interval has nominal coverage
            ``1 - alpha``.

    Returns:
        Tuple ``(lo, hi)`` with both bounds inside ``[0, 1]``. With no trials
        (``n <= 0``) nothing is known about the proportion, so the
        uninformative interval ``(0.0, 1.0)`` is returned.
    """
    if n <= 0:
        # Avoids the division by zero below when there is nothing to count
        return 0.0, 1.0

    z = norm.ppf(1 - alpha/2)
    p = k / n
    denom = 1 + z*z/n
    centre = p + z*z/(2*n)
    half = z * math.sqrt(p*(1-p)/n + z*z/(4*n*n))
    lo = (centre - half) / denom
    hi = (centre + half) / denom
    # Rounding can push the bounds marginally outside [0, 1] when k is 0 or n
    return max(0.0, lo), min(1.0, hi)

def evaluate_wilson_ci(true_segments, pred_segments, iou_threshold = iou_global, alpha= 0.05):
    """Confidence intervals for precision, recall and F1 of interval detection.

    Predictions are matched greedily, in the order given, to the unmatched
    ground-truth interval with the highest IoU; a match needs an IoU of at
    least ``iou_threshold``.

    Args:
        true_segments: Sequence of ground-truth ``(start, end)`` pairs.
        pred_segments: Sequence of predicted ``(start, end)`` pairs.
        iou_threshold: Minimum IoU for a match; defaults to ``iou_global``.
        alpha: Significance level of the intervals.

    Returns:
        Dict with keys ``'Precision_Ci_Wilson'``, ``'Recall_Ci_Wilson'`` and
        ``'F1_Ci_Wilson'``, each a ``(low, high)`` tuple. Precision and
        recall use Wilson score intervals. Despite its key name, the F1
        interval is a normal approximation obtained with the delta method,
        using no covariance term between precision and recall. When there
        are no predictions (or no ground truth) the corresponding interval
        is the uninformative ``(0.0, 1.0)``.
    """
    true_positives = 0
    matched_true_indices = set()

    for pred in pred_segments:
        best_iou = 0.0
        best_true_idx = -1

        for i, true in enumerate(true_segments):
            if i in matched_true_indices:
                continue

            iou = calculate_iou(pred, true)
            if iou > best_iou:
                best_iou = iou
                best_true_idx = i

        if best_iou >= iou_threshold:
            true_positives += 1
            matched_true_indices.add(best_true_idx)

    # 1) Point estimates; n_p = TP + FP and n_r = TP + FN
    n_p = len(pred_segments)
    n_r = len(true_segments)
    p_hat = true_positives / n_p if n_p > 0 else 0.0
    r_hat = true_positives / n_r if n_r > 0 else 0.0

    # 2) Wilson intervals for precision and recall. A zero denominator used to
    #    reach wilson_interval and raise ZeroDivisionError; with no trials the
    #    proportion is unconstrained, hence (0.0, 1.0).
    if n_p > 0:
        p_lo, p_hi = wilson_interval(true_positives, n_p, alpha=alpha)
    else:
        p_lo, p_hi = 0.0, 1.0
    if n_r > 0:
        r_lo, r_hi = wilson_interval(true_positives, n_r, alpha=alpha)
    else:
        r_lo, r_hi = 0.0, 1.0

    # 3) F1 estimate and its standard error
    if p_hat + r_hat == 0:
        f1_hat = 0.0
        se_f1 = 0.0
    else:
        # p_hat + r_hat > 0 implies at least one true positive, so both
        # n_p and n_r are positive in this branch.
        f1_hat = 2 * p_hat * r_hat / (p_hat + r_hat)
        # Delta method: Var[F1] ~ (dg/dp)^2 Var[p] + (dg/dr)^2 Var[r],
        # where g(p, r) = 2pr / (p + r)
        dg_dp = 2 * r_hat**2 / (p_hat + r_hat)**2
        dg_dr = 2 * p_hat**2 / (p_hat + r_hat)**2
        var_p = p_hat * (1 - p_hat) / n_p
        var_r = r_hat * (1 - r_hat) / n_r
        var_f1 = dg_dp**2 * var_p + dg_dr**2 * var_r
        se_f1 = math.sqrt(var_f1)

    # 4) Normal-approximation interval for F1, clipped to [0, 1]
    z = norm.ppf(1 - alpha/2)
    f1_lo = max(0.0, f1_hat - z * se_f1)
    f1_hi = min(1.0, f1_hat + z * se_f1)

    return {
        'Precision_Ci_Wilson': ( p_lo, p_hi),
        'Recall_Ci_Wilson': (r_lo, r_hi),
        'F1_Ci_Wilson': (f1_lo, f1_hi)
    }

# Analytic intervals for the same recording, printed one per line.
wilson = evaluate_wilson_ci(true_intervals, predicted_intervals)
for metric, interval in wilson.items():
    print(f"{metric}: {interval}")


Bootstrapping: 100%|██████████| 100/100 [00:36<00:00,  2.71it/s]


{'Precision_CI': array([0.08477757, 0.10379981]), 'Recall_CI': array([0.67261029, 0.82352941]), 'F1-score_CI': array([0.15057613, 0.18436214]), 'AP50_CI': array([0.02103887, 0.06454697]), 'AP75_CI': array([0.00045268, 0.00542346]), 'AP50-95_CI': array([0.00517834, 0.01721552])}
Precision_CI: [0.08477757 0.10379981]
Recall_CI: [0.67261029 0.82352941]
F1-score_CI: [0.15057613 0.18436214]
AP50_CI: [0.02103887 0.06454697]
AP75_CI: [0.00045268 0.00542346]
AP50-95_CI: [0.00517834 0.01721552]
Precision_Ci_Wilson: (np.float64(0.08443002714064556), np.float64(0.12043972273233976))
Recall_Ci_Wilson: (np.float64(0.726559586132398), np.float64(0.8598187360870597))
F1_Ci_Wilson: (np.float64(0.1510122562553934), np.float64(0.20783548037012098))


In [None]:
import os
import csv
import pandas as pd

def _call_interval_evaluator(evaluate_intervals, true_intervals, predicted_intervals, filename):
    """Call the metrics callback, passing ``filename`` only if the callback takes it."""
    import inspect

    try:
        params = inspect.signature(evaluate_intervals).parameters
    except (TypeError, ValueError):
        params = None  # signature not introspectable; use the two-argument call

    if params is not None:
        if 'filename' in params:
            return evaluate_intervals(true_intervals, predicted_intervals, filename=filename)
        required_positional = [
            p for p in params.values()
            if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) and p.default is p.empty
        ]
        if len(required_positional) >= 3:
            return evaluate_intervals(true_intervals, predicted_intervals, filename)

    # Passing filename as a third positional argument here would bind it to an
    # optional parameter such as evaluate_wilson_ci's iou_threshold.
    return evaluate_intervals(true_intervals, predicted_intervals)


def process_interval_files(directory, negative_classes, evaluate_intervals, output_file='evaluation_results.csv'):
    """Evaluate every annotation/prediction CSV pair in a directory.

    For each ``<name>.csv`` that has a sibling ``pred_<name>.csv`` the
    ground-truth and predicted intervals are loaded, handed to
    ``evaluate_intervals`` and the resulting confidence bounds are collected
    into one table written to ``directory/output_file``.

    Args:
        directory: Folder containing the CSV files; the result is written
            there as well.
        negative_classes: Class names (compared case-insensitively) whose
            annotation rows are ignored.
        evaluate_intervals: Callback receiving ``(true_intervals,
            predicted_intervals)``. The file name is additionally passed when
            the callback declares a ``filename`` parameter or has at least
            three required positional parameters. It must return ``None``
            (file skipped) or a dict with ``'Precision_Ci_Wilson'``,
            ``'Recall_Ci_Wilson'`` and ``'F1_Ci_Wilson'`` ``(low, high)``
            entries.
        output_file: Name of the CSV written into ``directory``.

    Returns:
        None. Progress and skip messages are printed.
    """
    # Lower-case the negative classes once instead of once per CSV row
    negative = {cls.lower() for cls in negative_classes}
    results = []

    # sorted() gives a reproducible row order; os.listdir order is arbitrary
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.csv') or filename.startswith('pred_'):
            continue
        if filename == output_file:
            # A previous run's result table lives in the same folder
            continue

        true_filepath = os.path.join(directory, filename)
        pred_filename = 'pred_' + filename
        pred_filepath = os.path.join(directory, pred_filename)

        if not os.path.exists(pred_filepath):
            print(f"[!] Пропущено: Нет файла {pred_filename}")
            continue

        # --- Ground-truth intervals ---
        true_intervals = []
        with open(true_filepath, 'r', encoding='utf-8', newline='') as f:
            for row in csv.DictReader(f):
                # DictReader fills missing trailing fields with None
                if (row['Class'] or '').strip().lower() in negative:
                    continue
                # Annotations use a decimal comma
                start = float(row['Start'].replace(',', '.'))
                end = float(row['End'].replace(',', '.'))
                true_intervals.append((start, end))

        # --- Predicted intervals ---
        detections = pd.read_csv(pred_filepath)
        if not {'label', 'start_s', 'end_s'}.issubset(detections.columns):
            print(f"[!] Пропущено: Неверный формат в {pred_filename}")
            continue

        detections_call = detections[detections['label'] == 'call']
        predicted_intervals = list(zip(detections_call['start_s'], detections_call['end_s']))

        # --- Delegate to the metrics callback ---
        metrics = _call_interval_evaluator(evaluate_intervals, true_intervals, predicted_intervals, filename)
        if metrics is None:
            continue  # callback produced no metrics for this file

        results.append({
            'Filename': filename,
            'Precision_CI_Low': metrics['Precision_Ci_Wilson'][0],
            'Precision_CI_High': metrics['Precision_Ci_Wilson'][1],
            'Recall_CI_Low': metrics['Recall_Ci_Wilson'][0],
            'Recall_CI_High': metrics['Recall_Ci_Wilson'][1],
            'F1_CI_Low': metrics['F1_Ci_Wilson'][0],
            'F1_CI_High': metrics['F1_Ci_Wilson'][1]
        })

    # --- Save to CSV ---
    if results:
        out_path = os.path.join(directory, output_file)
        df = pd.DataFrame(results)
        df.to_csv(out_path, index=False, encoding='utf-8-sig')
        print(f"\n Результаты сохранены в: {out_path}")
    else:
        print("\nНет подходящих файлов или результатов.")

# Batch evaluation of every annotation/prediction pair in the test folder,
# ignoring the listed non-call classes and reporting Wilson-based intervals.
process_interval_files(
    directory='../dataset/test_dataset_by_Kirill',
    negative_classes=[
        'киты не орут',
        'шум',
        'не киты',
        'некиты',
        "обрезки, шумы не брать",
        "",
    ],
    evaluate_intervals=evaluate_wilson_ci,
    output_file='evaluation_results.csv',
)