In [None]:
# default_exp metrics

# Custom losses and metrics

In [None]:
# export
from drone_detector.imports import *
from fastai.learner import Metric
from fastai.torch_core import *
from fastai.metrics import *
from fastai.losses import BaseLoss
import sklearn.metrics as skm
import torch
import torch.nn.functional as F

## General metrics to use in training and such

In [None]:
# export
mk_class('ActivationType', **{o:o.lower() for o in ['No', 'Sigmoid', 'Softmax', 'BinarySoftmax']},
         doc="All possible activation classes for `AccumMetric")

In [None]:
#export

def adjusted_R2Score(r2_score, n, k):
    "Calculates adjusted_R2Score based on r2_score, number of observations (n) and number of predictor variables(k)"
    return 1 - (((n-1)/(n-k-1)) * (1 - r2_score))


In [None]:
#export

def _rrmse(inp, targ):
    "RMSE normalized with mean of the target"
    return torch.sqrt(F.mse_loss(inp, targ)) / targ.mean() * 100

rrmse = AccumMetric(_rrmse)
rrmse.__doc__ = "Target mean weighted rmse"

In [None]:
#export
def _bias(inp, targ):
    "Average bias of predictions"
    inp, targ = flatten_check(inp, targ)
    return (inp - targ).sum() / len(targ)

bias = AccumMetric(_bias)
bias.__doc__ = "Average bias of predictions"

In [None]:
#export
def _bias_pct(inp, targ):
    "Mean weighted bias"
    inp, targ = flatten_check(inp, targ)
    return 100 * ((inp-targ).sum()/len(targ)) / targ.mean()

bias_pct = AccumMetric(_bias_pct)
bias_pct.__doc__ = 'Mean weighted bias'

BigEarthNet metrics

In [None]:
#export

def label_ranking_average_precision_score(sigmoid=True, sample_weight=None):
    """Label ranking average precision (LRAP) is the average over each ground truth label assigned to each sample, 
    of the ratio of true vs. total labels with lower score."""
    activation = ActivationType.Sigmoid if sigmoid else ActivationType.No
    return skm_to_fastai(skm.label_ranking_average_precision_score, sample_weight=None, flatten=False, thresh=None, 
                         activation=activation)

In [None]:
# export

def label_ranking_loss(sigmoid=True, sample_weight=None):
    """Compute the average number of label pairs that are incorrectly ordered given y_score 
    weighted by the size of the label set and the number of labels not in the label set."""
    activation = ActivationType.Sigmoid if sigmoid else ActivationType.No
    return skm_to_fastai(skm.label_ranking_loss, sample_weight=None, flatten=False, thresh=None, 
                         activation=activation)

In [None]:
# export

def _one_error(inp, targ):
    max_ranks = inp.argmax(axis=1)
    faults = 0
    for i in range_of(max_ranks):
        faults += targ[i,max_ranks[i]]
    return 1 - torch.true_divide(faults, len(max_ranks))
    
one_error = AccumMetric(_one_error, flatten=False)
one_error.__doc__ = "Rate for which the top ranked label is not among ground truth"

In [None]:
# export

def coverage_error(sigmoid=True, sample_weight=None):
    """Compute how far we need to go through the ranked scores to cover all true labels. 
    The best value is equal to the average number of labels in y_true per sample."""
    
    activation = ActivationType.Sigmoid if sigmoid else ActivationType.No
    return skm_to_fastai(skm.coverage_error, sample_weight=None, flatten=False, thresh=None, activation=activation)

In [None]:
from fastai.learner import Learner
class TstLearner(Learner):
    def __init__(self,dls=None,model=None,**kwargs): self.pred,self.xb,self.yb = None,None,None

def compute_val(met, x1, x2):
    met.reset()
    vals = [0,6,15,20]
    learn = TstLearner()
    for i in range(3):
        learn.pred,learn.yb = x1[vals[i]:vals[i+1]],(x2[vals[i]:vals[i+1]],)
        met.accumulate(learn)
    return met.value

In [None]:
lrap = label_ranking_average_precision_score()
lrl = label_ranking_loss()
cov = coverage_error()

In [None]:
x_1 = torch.randn(10,10)
x_2 = torch.randint(2,(10,10))
x_1, torch.sigmoid(x_1), x_2

(tensor([[ 1.0280, -1.0610, -0.1173, -0.7883, -0.8184,  0.5261, -0.6622,  1.1560,
           0.8449, -0.4004],
         [ 0.4313,  0.8292,  0.4954,  0.8532,  1.3986, -0.3105,  0.1570, -0.6618,
           1.2155, -0.9401],
         [-1.0522, -1.2088,  0.2641,  1.4395,  0.3363,  0.3853,  0.5344,  0.3369,
           0.0724, -2.2468],
         [-1.2122,  0.2162, -1.7888, -0.2223,  0.9674, -1.8560,  0.2529,  0.3523,
           0.8188, -0.7216],
         [-0.1320,  0.2613,  0.5986, -0.3746, -0.8856,  0.4634,  1.7448,  0.4936,
          -0.5136,  0.1216],
         [ 0.6040, -1.0074,  0.0704, -1.6517,  0.4883,  0.4710,  0.7405, -1.4311,
           0.4396, -0.0408],
         [-0.8344,  0.6181,  0.1578,  0.9650, -1.4521, -2.2841,  0.6757, -0.0121,
          -0.7634, -0.1703],
         [-1.0496,  0.3923, -0.4744,  1.2498, -1.4219, -2.0286,  0.9921, -1.0297,
          -1.3029,  0.3966],
         [ 0.5969,  0.7362, -0.4522,  0.2777,  1.2189,  0.0838,  0.3053,  0.7826,
           0.3143, -0.1105],
 

In [None]:
compute_val(lrl, x_1, x_2)

0.4276071428571429

In [None]:
compute_val(lrap, x_1, x_2)

0.6481230158730158

In [None]:
compute_val(cov, x_1, x_2)

8.4

In [None]:
one_error(x_1, x_2)

tensor(0.3000)

## Segmentation metrics

In [None]:
# export

class JaccardCoeffMulti(DiceMulti):
    "Averaged Jaccard coefficient for multiclass target in segmentation. Includes background as one class"
    @property
    def value(self):
        binary_jaccard_scores = np.array([])
        for c in self.inter:
            binary_jaccard_scores = np.append(binary_jaccard_scores, self.inter[c]/(self.union[c] - self.inter[c]) if self.union[c] > 0 else np.nan)
        return np.nanmean(binary_jaccard_scores)

In [None]:
x1 = torch.randn(20,6,3,3)
x2 = torch.randint(0, 6, (20, 3, 3))
pred = x1.argmax(1)

In [None]:
compute_val(JaccardCoeffMulti(), x1, x2)

0.11229420131493545

## Object detection metrics and evaluation for shapefiles

To evaluate our collection of predicted masks, we'll compare each of our predicted masks with each of the available target masks for a given input. 

* A **true positive**, when a prediction-target mask pair has an IoU score which exceeds some predefined threshold
* A **false positive**, when a predicted object had no associated ground truth object mask
* A **false negative** indicates a ground truth object mask had no associated predicted object mask

In the case of multiple detections, the one with the highest confidence is considered to be "correct" and others are FP.

From these, we can get **Precision** and **Recall**

$Precision = \frac{TP}{TP + FP} = \frac{TP}{all \: detections}, Recall = \frac{TP}{TP+FN} = \frac{TP}{all \: ground \: truths}$

And use these to derive other metrics.

Typical metrics include **Average Precision** (AP) and **mean Average Precision** (mAP). From these several metrics can be derived:

* AP50, AP75, AP[.50:.05:.95] are the most common, with AP[.50:.05:.95] being the primary challenge metric in COCO
* AP Across scales: AP<sub>small</sub>, AP<sub>medium</sub>, AP<sub>large</sub>, where small, medium and large have specified areas
    * Scales for COCO are less than 32² for small, between 32² and 96² for medium and more than 96² for large, sizes in pixels
    * Our data has variable resolution sizes, but on average the resolution is around 0.05m, so small is less than 2.56m², medium is between 2.56m² and 23.04m², and large is more than 23.04m²
* **Average Recall** (AR) is also sometimes used similarly, but with restrictions for the number of detections per image   
    * It is computed as the area under Recall-IoU -curve for IoU thresholds from [0.5, 1]
* All of these can be applied to bounding boxes and masks

All the following functions assume that you have two `GeoDataFrame`s that have same CRS and matching column `label`. Usage example is the following: 

```python

ground_truth = gpd.read_file(<path_to_ground_truth>)
results = gpd.read_file(<path_to_results>)

# clip the geodataframes to have same extent
results = gpd.clip(results, box(*ground_truth.total_bounds), keep_geom_type=True)
ground_truth = gpd.clip(ground_truth, box(*results.total_bounds), keep_geom_type=True)

# create spatial index for faster queries                         
res_sindex = results.sindex
gt_sindex = ground_truth.sindex

# TP/FN check with different thresholds, applied to ground truth
tp_cols = [f'TP_{np.round(i, 2)}' for i in np.arange(0.5, 1.04, 0.05)]
ground_truth[tp_cols] = ground_truth.apply(lambda row: is_true_positive(row, results, res_sindex), 
                                           axis=1, result_type='expand')

# TP/FP check with different thresholds, applied to predictions
fp_cols = [f'FP_{np.round(i, 2)}' for i in np.arange(0.5, 1.01, 0.05)]
results[fp_cols] = results.apply(lambda row: is_false_positive(row, ground_truth, gt_sindex, results, res_sindex), 
                                 axis=1, result_type='expand')
```

In [None]:
# export

def poly_IoU(poly_1:Polygon, poly_2:Polygon) -> float:
    "IoU for polygons"
    area_intersection = poly_1.intersection(poly_2).area
    area_union = poly_1.union(poly_2).area
    iou = area_intersection / area_union
    return iou

In [None]:
# export
def is_true_positive(row, results:gpd.GeoDataFrame, res_sindex:gpd.sindex):
    "Check if a single ground truth mask is TP or FN with 11 different IoU thresholds"
    iou_threshs = np.arange(0.5, 1.04, 0.05)
    
    # Matching predictions using spatial index
    c = list(res_sindex.intersection(row.geometry.bounds))
    possible_matches = results.iloc[c].copy()
    
    # No masks -> False negative
    if len(possible_matches) == 0: return ['FN'] * len(iou_threshs)

    possible_matches['iou'] = possible_matches.apply(lambda pred: poly_IoU(pred.geometry, row.geometry), axis=1)
    
    retvals = []
    
    for i, iou_thresh in enumerate(iou_threshs):
        iou_thresh = np.round(iou_thresh, 2)
        possible_matches = possible_matches[possible_matches.iou >= iou_thresh]
        if len(possible_matches) == 0: return retvals + ['FN'] * (len(iou_threshs)-len(retvals))
        
        possible_matches.reset_index(inplace=True, drop=True)
        max_iou_ix = possible_matches['iou'].idxmax()
        max_score_id = possible_matches['score'].idxmax()
        
        if possible_matches.iloc[max_iou_ix].iou < iou_thresh: return ['FN'] * (len(iou_threshs) - len(retvals))
        
        if possible_matches.iloc[max_iou_ix].label != row.label: return ['FN'] * (len(iou_threshs) - len(retvals))
        
        retvals.append('TP')
    return retvals

In [None]:
# export

def is_false_positive(row, ground_truth:gpd.GeoDataFrame, gt_sindex:gpd.sindex, 
                      results:gpd.GeoDataFrame, res_sindex:gpd.sindex):
    "Check if prediction is FP or TP for 11 different IoU thresholds"
    
    iou_threshs = np.arange(0.5, 1.04, 0.05)
    
    # First find out the matching ground truth masks
    c = list(gt_sindex.intersection(row.geometry.bounds))
    possible_gt_matches = ground_truth.iloc[c].copy()
    #possible_gt_matches = possible_matches[possible_matches.label == row.label].copy()
    possible_gt_matches = possible_gt_matches.loc[possible_gt_matches.intersects(row.geometry)]
    possible_gt_matches.reset_index(inplace=True)
    
    # No ground truth masks -> false positive
    if len(possible_gt_matches) == 0: return ['FP'] * len(iou_threshs)
    
    retvals = []
    
    # Count IoU for all possible_gt_matches
    possible_gt_matches['iou'] = possible_gt_matches.apply(lambda gt: poly_IoU(gt.geometry, row.geometry), axis=1)
    
    # Assume that largest IoU is the corresponding label
    gt_ix = possible_gt_matches['iou'].idxmax()
    
    for i, iou_thresh in enumerate(iou_threshs):
        iou_thresh = np.round(iou_thresh, 2)
        # If IoU-threshold is too low -> false positive
        if possible_gt_matches.iloc[gt_ix].iou < iou_thresh: 
            return retvals + ['FP'] * (len(iou_threshs)-len(retvals))


        # If labels don't match -> false positive:
        if possible_gt_matches.iloc[gt_ix].label != row.label: 
            return retvals + ['FP'] * (len(iou_threshs)-len(retvals))

        # Then check whether there are other predictions
        c = list(res_sindex.intersection(row.geometry.bounds))
        possible_pred_matches = results.iloc[c].copy()

        # Remove examined row from possible_matches. Assume that scores are always different (spoiler: they are not)
        possible_pred_matches = possible_pred_matches[possible_pred_matches.score != row.score]

        # No other possibilities -> not FP
        if len(possible_pred_matches) == 0: 
            retvals.append('TP')
            continue

        possible_pred_matches['iou'] = possible_pred_matches.apply(lambda pred: poly_IoU(pred.geometry, 
                                                                                         possible_gt_matches.iloc[gt_ix].geometry), 
                                                                   axis=1)

        possible_pred_matches = possible_pred_matches[possible_pred_matches.iou > iou_thresh]
        possible_pred_matches.reset_index(inplace=True)

        if len(possible_pred_matches) == 0: 
            retvals.append('TP')
            continue

        pred_max_iou_ix = possible_pred_matches['iou'].idxmax()
        pred_max_score_ix = possible_pred_matches['score'].idxmax()

        # Do any other possible predictions have larger score? If yes -> FP
        if possible_pred_matches.iloc[pred_max_score_ix].score > row.score: 
            return retvals + ['FP'] * (len(iou_threshs)-len(retvals))
        
        retvals.append('TP')
    
    return retvals

`average_precision` and `average_recall` both return `dict` of the results, with each label and each IoU threshold separately. Each item is 11 item list where each item correspond to a different recall threshold in the range of [0:.1:1] in the case of `average_precision`, or IoU threshold in the range of [.50:.05:1] in for `average_recall`. 

In [None]:
# export

def average_precision(ground_truth:gpd.GeoDataFrame, preds:gpd.GeoDataFrame) -> dict:
    "Get 11-point AP score for each label separately and with all iou_thresholds"
    
    # Clip geodataframes so that they cover the same area
    preds = gpd.clip(preds, box(*ground_truth.total_bounds), keep_geom_type=True)
    ground_truth = gpd.clip(ground_truth, box(*preds.total_bounds), keep_geom_type=True)
    
    gt_sindex = ground_truth.sindex
    pred_sindex = preds.sindex
    fp_cols = [f'FP_{np.round(i, 2)}' for i in np.arange(0.5, 1.04, 0.05)]
    preds[fp_cols] = preds.apply(lambda row: is_false_positive(row, ground_truth, gt_sindex, preds, pred_sindex), 
                                 axis=1, result_type='expand')
    iou_threshs = np.arange(0.5, 1.04, 0.05)
    
    res_dict = {}
    for l in preds.label.unique():
        for iou_thresh in iou_threshs:
            iou_thresh = np.round(iou_thresh, 2)
            res_dict[f'{l}_pre_{iou_thresh}'] = []
            temp_preds = preds[preds.label == l].copy()
            num_correct = len(ground_truth[ground_truth.label == l])
            temp_preds.sort_values(by='score', ascending=False, inplace=True)
            temp_preds.reset_index(inplace=True)
            temp_preds['cumul_TP'] = 0.
            temp_preds['precision'] = 0. 
            temp_preds['recall'] = 0.
            temp_preds.loc[0, 'cumul_TP'] = 0 if temp_preds.loc[0, f'FP_{iou_thresh}'] == 'FP' else 1
            temp_preds.loc[0, 'precision'] = temp_preds.loc[0,'cumul_TP'] / 1
            temp_preds.loc[0, 'recall'] = temp_preds.loc[0,'cumul_TP'] / num_correct
            for i in range(1, len(temp_preds)):
                row_tp = 0 if temp_preds.loc[i, f'FP_{iou_thresh}'] == 'FP' else 1
                temp_preds.loc[i, 'cumul_TP'] = temp_preds.loc[i-1, 'cumul_TP'] + row_tp
                temp_preds.loc[i, 'precision'] = temp_preds.loc[i,'cumul_TP'] / (i+1)
                temp_preds.loc[i, 'recall'] = temp_preds.loc[i,'cumul_TP'] / num_correct
            recall_threshs = np.arange(0,1.04, 0.1)
            for rec_thresh in recall_threshs:
                pre = temp_preds[temp_preds.recall >= rec_thresh].precision.max()
                res_dict[f'{l}_pre_{iou_thresh}'].append(0 if not np.isfinite(pre) else pre)  
    return res_dict

In [None]:
# export 

def average_recall(ground_truth:gpd.GeoDataFrame, preds:gpd.GeoDataFrame, max_detections:int=None) -> dict:
    """Get 11-point AR score for each label separately and with all iou_thresholds. 
    If `max_detections` is not `None` evaluate with only that most confident predictions
    Seems to be still bugged, needs fixing
    """
    
    # Clip geodataframes so that they cover the same area
    preds = gpd.clip(preds, box(*ground_truth.total_bounds), keep_geom_type=True)
    ground_truth = gpd.clip(ground_truth, box(*preds.total_bounds), keep_geom_type=True)
    
    tp_cols = [f'TP_{np.round(i, 2)}' for i in np.arange(0.5, 1.03, 0.05)]
    if max_detections is not None:
        preds.sort_values(by='score', ascending=False, inplace=True)
        preds = preds[:max_detections]
        preds.reset_index(inplace=True)
    pred_sindex = preds.sindex
    ground_truth[tp_cols] = ground_truth.apply(lambda row: is_true_positive(row, preds, pred_sindex), 
                                               axis=1, result_type='expand')
    iou_threshs = np.arange(0.5, 1.04, 0.05)
    res_dict = {}
    for l in ground_truth.label.unique():
        res_dict[f'{l}_rec'] = []
        for iou_thresh in iou_threshs:
            iou_thresh = np.round(iou_thresh, 2)
            temp_gt = ground_truth[ground_truth.label == l].copy()
            res_dict[f'{l}_rec'].append(len(temp_gt[temp_gt[f'TP_{iou_thresh}'] == 'TP']) / len(temp_gt))
    return res_dict