In [None]:
from functools import partial
from pathlib import Path

import cv2
import numpy as np
import pandas as pd
import scipy
import scipy.stats
import torch
from fastai.metrics import dice
from fastai.vision import load_learner, SegmentationLabelList, SegmentationItemList, DatasetType, defaults, Image, open_mask
from matplotlib import pyplot as plt
from shapely.geometry import LineString
from shapely.ops import split
from tqdm import tqdm

from video699.video.annotated import get_videos, AnnotatedSampledVideo, AnnotatedSampledVideoScreenDetector, GEOSConvexQuadrangle, Point, ScreenABC
# defaults.device = torch.device('cpu')

In [None]:
def midpoint(pointA, pointB):
    """Return the point halfway between ``pointA`` and ``pointB``.

    Both arguments are indexed at positions 0 and 1; the result is a
    two-element tuple holding the averaged first and second coordinates.
    """
    mid_first = (pointA[0] + pointB[0]) / 2
    mid_second = (pointA[1] + pointB[1]) / 2
    return (mid_first, mid_second)

def get_coordinates(quadrangle):
    """Label the corner points of a quadrangle.

    ``quadrangle`` is an array that squeezes to rows of ``(x, y)`` points.
    The corners are picked with simple extremum heuristics:

    * top-left: smallest ``x + y``
    * bottom-right: largest ``x + y``
    * top-right: largest ``max(y) - y + x``
    * bottom-left: largest ``max(x) - x + y``

    Returns a dict with the keys ``top_left``, ``top_right``,
    ``bottom_right`` and ``bottom_left`` mapping to the selected rows.
    """
    points = quadrangle.squeeze()
    xs, ys = points[:, 0], points[:, 1]
    coordinate_sum = xs + ys
    corner_index = {
        'top_left': coordinate_sum.argmin(),
        'top_right': (max(ys) - ys + xs).argmax(),
        'bottom_right': coordinate_sum.argmax(),
        'bottom_left': (max(xs) - xs + ys).argmax(),
    }
    return {corner: points[index] for corner, index in corner_index.items()}
        
def draw_polygon(polygon, image):
    """Fill ``polygon`` with the value 100 on a copy of ``image``.

    The input image is copied first, so the caller's array is not drawn on.
    Returns whatever ``cv2.fillConvexPoly`` returns for the copy.
    """
    canvas = image.copy()
    filled = cv2.fillConvexPoly(canvas, polygon, 100)
    return filled

def draw_polygons(polygons, image, show=True):
    """Fill every polygon in ``polygons`` with the value 100 on a copy of ``image``.

    When ``show`` is true the resulting canvas is displayed with matplotlib,
    whether or not any polygon was drawn. The canvas is returned either way.
    """
    canvas = image.copy()
    if len(polygons) != 0:
        for polygon in polygons:
            canvas = cv2.fillConvexPoly(canvas, polygon, 100)
    # Visualization
    if show:
        plt.imshow(canvas)
        plt.show()
    return canvas

class SegLabelListCustom(SegmentationLabelList):
    """Segmentation label list that opens mask files through ``open_mask``.

    Masks are opened with ``div=False`` and ``convert_mode='L'``.
    NOTE(review): in fastai v1 this presumably means "do not divide pixel
    values by 255" and "read as 8-bit grayscale" - confirm against the
    installed fastai version.
    """

    def open(self, fn):
        """Open the mask stored at ``fn``."""
        # `open_mask` is imported from `fastai.vision` in the notebook's import cell.
        return open_mask(fn, div=False, convert_mode='L')

class SegItemListCustom(SegmentationItemList):
    """Segmentation item list whose labels are loaded via ``SegLabelListCustom``."""

    # Label class used for the items of this list.
    _label_cls = SegLabelListCustom

def acc(input, targs):
    "Accuracy: share of positions where the arg-max over dim 1 of `input` equals `targs`."
    predicted_classes = input.argmax(dim=1)
    # Drop the singleton dim 1 of the targets so they line up with the predictions.
    expected_classes = targs.squeeze(1)
    hits = predicted_classes == expected_classes
    return hits.float().mean()

# iou = partial(dice, iou=True)

In [None]:
# Directory handed to `load_learner` further down.
# NOTE(review): machine-specific absolute path - adjust before running elsewhere.
path = Path("/mnt/c/Users/mikul/Desktop/BP/implementation-system")

# Videos keyed as returned by `get_videos()`; `videos_list` keeps them in key order.
videos = get_videos()
keys = list(videos.keys())
videos_list = [videos[video_key] for video_key in keys]

In [None]:
class FastAIVideoScreen(ScreenABC):
    """A screen detected in a frame by ``FastAIScreenDetector``.

    Stores the frame the screen was found in, the screen's index within that
    frame's detections, and the screen's coordinates.
    """

    def __init__(self, frame, screen_index, coordinates):
        self._frame, self._screen_index, self._coordinates = frame, screen_index, coordinates

    @property
    def coordinates(self):
        """Coordinates object passed to the constructor."""
        return self._coordinates

    @property
    def frame(self):
        """Frame passed to the constructor."""
        return self._frame
    
class FastAIScreenDetector:
    """Detects screens in a frame with a fastai learner followed by contour post-processing.

    ``path`` is forwarded to ``load_learner``; ``methods`` is the post-processing
    configuration forwarded to ``approximate``.
    """

    def __init__(self, path, methods):
        self.methods = methods
        self.learner = load_learner(path=path, bs=4)

    def detect(self, frame):
        """Return a list of ``FastAIVideoScreen`` objects found in ``frame``."""
        # Semantic segmentation
        # The frame image is converted with COLOR_RGBA2RGB, moved to channels-first
        # layout and scaled by 1/255 before being wrapped in a fastai Image.
        rgb = cv2.cvtColor(frame.image, cv2.COLOR_RGBA2RGB)
        channels_first = torch.from_numpy(np.transpose(rgb, (2, 0, 1)))
        model_input = Image(channels_first.to(torch.float32) / 255)
        prediction = self.learner.predict(model_input)
        # prediction[1] is moved to channels-last, squeezed and cast to uint8.
        # NOTE(review): presumably the per-pixel class mask - confirm.
        mask = np.squeeze(np.transpose(prediction[1].numpy(), (1, 2, 0))).astype('uint8')
        height, width = mask.shape[0], mask.shape[1]
        # Double the mask in both dimensions before the contour search.
        upscaled_mask = cv2.resize(mask, dsize=(width * 2, height * 2))

        # Screen retrieval (Post processing)
        geos_quadrangles = approximate(upscaled_mask, methods=self.methods)

        # Create screens (System Data Types)
        screens = []
        for screen_index, quadrangle in enumerate(geos_quadrangles):
            screens.append(FastAIVideoScreen(frame, screen_index, quadrangle))
        return screens

In [None]:
def iou(screenA, screenB):
    """Intersection over union of the coordinates of two screens.

    Uses ``intersection_area`` and ``union_area`` of ``screenA.coordinates``
    with ``screenB.coordinates`` as the argument.
    """
    quad_a, quad_b = screenA.coordinates, screenB.coordinates
    overlap = quad_a.intersection_area(quad_b)
    combined = quad_a.union_area(quad_b)
    return overlap / combined

### Evaluation of single video predictions with total number of screens + IoU
* Dice and accuracy metrics can be added later

In [None]:
def all_video_statistics(videos, actual_detector):
    """Collect screen areas and height/width ratios over all ``videos``.

    Concatenates the per-video lists produced by ``single_video_statistics``
    and returns them as ``(sizes, ratios)``.
    """
    sizes, ratios = [], []
    for video in videos:
        video_sizes, video_ratios = single_video_statistics(video, actual_detector)
        sizes += video_sizes
        ratios += video_ratios
    return sizes, ratios

def single_video_statistics(video, actual_detector):
    """Collect the area and height/width ratio of every screen detected in ``video``.

    Each frame of ``video`` is passed to ``actual_detector.detect``; progress is
    reported through ``tqdm``. Returns ``(sizes, ratios)`` as two parallel lists.
    """
    sizes, ratios = [], []
    for frame in tqdm(video):
        for screen in actual_detector.detect(frame):
            quadrangle = screen.coordinates
            sizes.append(quadrangle.area)
            ratios.append(quadrangle.height / quadrangle.width)
    return sizes, ratios

In [None]:
def all_videos_eval(videos, actual_detector, pred_detector):
    """Run ``single_video_eval`` on every video and concatenate the results.

    Returns ``(wrong_screen_count_frames, ious, really_bad_ious)`` accumulated
    over all videos, in iteration order.
    """
    all_wrong_screen_count_frames = []
    all_ious = []
    all_really_bad_ious = []
    for video in videos:
        per_video = single_video_eval(video, actual_detector, pred_detector)
        wrong_screen_count_frames, ious, really_bad_ious = per_video
        all_wrong_screen_count_frames += wrong_screen_count_frames
        all_ious += ious
        all_really_bad_ious += really_bad_ious
    return all_wrong_screen_count_frames, all_ious, all_really_bad_ious

def single_video_eval(video, actual_detector, pred_detector, bad_iou_threshold=0.92):
    """Compare predicted screens against reference screens frame by frame.

    For every frame both detectors are run and their screens are sorted by the
    x coordinate of the top-left corner, so the i-th reference screen is
    compared with the i-th predicted screen.

    Parameters
    ----------
    video : iterable of frames
    actual_detector, pred_detector : objects with a ``detect(frame)`` method
    bad_iou_threshold : float, default 0.92
        Frames whose mean IoU is strictly below this value are reported as
        "really bad". The default is the value that used to be hard-coded.

    Returns
    -------
    wrong_screen_count_frames : list
        Frames where the two detectors returned a different number of screens.
    ious : list
        One mean IoU per frame; ``nan`` when the screen counts differ or when
        no screens were compared.
    really_bad_ious : list
        Frames whose mean IoU is below ``bad_iou_threshold``. Frames scored
        ``nan`` never land here because ``nan < x`` is false.
    """
    def leftmost_first(screen):
        return screen.coordinates.top_left[0]

    wrong_screen_count_frames = []
    ious = []
    really_bad_ious = []
    for frame in tqdm(video):
        actual_screens = sorted(actual_detector.detect(frame), key=leftmost_first)
        pred_screens = sorted(pred_detector.detect(frame), key=leftmost_first)

        if len(actual_screens) != len(pred_screens):
            # TODO: decide how mismatched screen counts should be scored.
            wrong_screen_count_frames.append(frame)
            frame_ious = []
        else:
            frame_ious = [iou(actual, predicted)
                          for actual, predicted in zip(actual_screens, pred_screens)]

        score = np.mean(frame_ious) if frame_ious else np.nan
        ious.append(score)
        if score < bad_iou_threshold:
            really_bad_ious.append(frame)
    return wrong_screen_count_frames, ious, really_bad_ious

In [None]:
def legend_without_duplicate_labels(ax):
    """Draw a legend on ``ax`` that lists every label only once.

    For repeated labels the handle of the first occurrence is kept.
    """
    handles, labels = ax.get_legend_handles_labels()
    first_handle_by_label = {}
    for handle, label in zip(handles, labels):
        first_handle_by_label.setdefault(label, handle)
    if first_handle_by_label:
        ax.legend(tuple(first_handle_by_label.values()), tuple(first_handle_by_label.keys()))
    else:
        # Nothing labelled: fall back to the argument-less call.
        ax.legend()
    
def single_frame_visualization(frame, actual_detector, pred_detector, methods=['erose_dilate', 'ratio_split']):
    """Plot reference and predicted screen outlines for one frame.

    Four panels sharing both axes are drawn: reference outlines ("Actual"),
    predicted outlines ("Prediction"), both overlaid ("Combined") and the
    frame image ("Original").

    Panel titles are set up front so they are present even when a detector
    returns no screens for the frame, and the layout is tightened after
    everything has been drawn.

    ``methods`` is accepted for backward compatibility only; it is not used.
    """
    def leftmost_first(screen):
        return screen.coordinates.top_left[0]

    actual_screens = sorted(actual_detector.detect(frame), key=leftmost_first)
    pred_screens = sorted(pred_detector.detect(frame), key=leftmost_first)

    fig, axes = plt.subplots(nrows=1, ncols=4, figsize=(20, 4), sharex='row', sharey='row')
    for ax, title in zip(axes, ("Actual", "Prediction", "Combined", "Original")):
        ax.set_title(title)

    for screen in actual_screens:
        outline = screen.coordinates._polygon.exterior.xy
        axes[0].plot(*outline, c='tab:orange')
        axes[2].plot(*outline, c='tab:orange', label="Actual")

    for screen in pred_screens:
        outline = screen.coordinates._polygon.exterior.xy
        axes[1].plot(*outline, c='tab:blue')
        axes[2].plot(*outline, c='tab:blue', label="Predicted")

    # Every outline carries a label, so collapse the repeats in the legend.
    legend_without_duplicate_labels(axes[2])

    axes[3].imshow(frame.image)
    fig.tight_layout()
    plt.show()

In [None]:
def all_frames_visualization(frames, actual_detector, pred_detector):
    """Call ``single_frame_visualization`` once for every frame in ``frames``."""
    for current_frame in frames:
        single_frame_visualization(
            frame=current_frame,
            actual_detector=actual_detector,
            pred_detector=pred_detector,
        )

In [None]:
def contour_approx(contours, lower_bound, upper_bound, factors):
    """Approximate contours by four-point polygons.

    Only contours whose area lies strictly between ``lower_bound`` and
    ``upper_bound`` are considered. For each of them the ``factors`` are tried
    in order as a fraction of the contour's arc length (the ``approxPolyDP``
    epsilon); the first approximation with exactly four points is kept.
    Contours for which no factor yields four points are dropped.
    """
    quadrangles = []
    for contour in contours:
        if not upper_bound > cv2.contourArea(contour) > lower_bound:
            continue
        # Lazy generator: approximations are computed one factor at a time and
        # the search stops at the first four-point result.
        candidates = (
            cv2.approxPolyDP(contour, factor * cv2.arcLength(contour, True), True)
            for factor in factors
        )
        quadrangle = next((polygon for polygon in candidates if polygon.shape[0] == 4), None)
        if quadrangle is not None:
            quadrangles.append(quadrangle)
    return quadrangles
    
def approximate(pred, methods):
    """Turn a predicted mask into a list of ``GEOSConvexQuadrangle`` objects.

    ``methods`` maps a method name to the keyword arguments of its function:

    * ``'base'``: ``approximate_baseline``
    * ``'erose_dilate'``: ``approximate_erose_dilate``; its result replaces
      the current one only when it finds strictly more quadrangles
    * ``'ratio_split'``: ``approximate_ratio_split``, applied to the
      quadrangles found so far

    Any subset of the methods may be configured. With no contour method
    configured the candidate list starts out empty, so the function returns
    an empty list instead of failing on an unbound variable.
    """
    quadrangles = []

    if 'base' in methods:
        quadrangles = approximate_baseline(pred, **methods['base'])

    if 'erose_dilate' in methods:
        erose_dilate_quadrangles = approximate_erose_dilate(pred, **methods['erose_dilate'])
        if len(erose_dilate_quadrangles) > len(quadrangles):
            quadrangles = erose_dilate_quadrangles

    if 'ratio_split' in methods:
        # approximate_ratio_split already returns GEOSConvexQuadrangle objects.
        return approximate_ratio_split(quadrangles, **methods['ratio_split'])

    return [GEOSConvexQuadrangle(**get_coordinates(quadrangle)) for quadrangle in quadrangles]
    
def approximate_baseline(pred, lower_bound, upper_bound, factors):
    """Find contours in ``pred`` and approximate them with ``contour_approx``.

    The bounds and factors are forwarded to ``contour_approx`` unchanged.
    """
    contours, _hierarchy = cv2.findContours(pred, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    return contour_approx(contours, lower_bound, upper_bound, factors)

def approximate_erose_dilate(pred, lower_bound, upper_bound, iterations, factors):
    """Approximate quadrangles after eroding the mask, then dilate each one back.

    The mask is eroded ``iterations`` times and searched for quadrangles. Each
    quadrangle found is drawn alone on an empty canvas, dilated by the same
    number of iterations and approximated again; the results of that second
    pass are returned.
    """
    # TODO (translated from Czech): split the contours one at a time with erosion and dilation.
    eroded = cv2.erode(pred, None, iterations=iterations)
    eroded_contours, _ = cv2.findContours(eroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    restored_quadrangles = []
    for eroded_quadrangle in contour_approx(eroded_contours, lower_bound, upper_bound, factors):
        canvas = np.zeros(pred.shape, dtype='uint8')
        filled = draw_polygon(eroded_quadrangle, canvas)
        dilated = cv2.dilate(filled, None, iterations=iterations)
        dilated_contours, _ = cv2.findContours(dilated, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        restored_quadrangles += contour_approx(dilated_contours, lower_bound, upper_bound, factors)
    return restored_quadrangles

def approximate_ratio_split(quadrangles, lower_ratio_bound, upper_ratio_bound, min_split_area=80000):
    """Split quadrangles whose height/width ratio is out of bounds into two halves.

    Parameters
    ----------
    quadrangles : iterable of point arrays accepted by ``get_coordinates``
    lower_ratio_bound, upper_ratio_bound : float
        A quadrangle is kept as is when its height/width ratio lies strictly
        between these bounds.
    min_split_area : number, default 80000
        Quadrangles with a smaller area are never split. The default is the
        value that used to be hard-coded.

    Returns
    -------
    list of GEOSConvexQuadrangle
        Kept quadrangles plus the pieces of the split ones.

    A ratio at or below the lower bound is cut along the line joining the
    midpoints of the top and bottom edges; otherwise the quadrangle is cut
    along the line joining the midpoints of the left and right edges.
    """
    ratio_split_quadrangles = []
    for quadrangle in quadrangles:
        geos_quadrangle = GEOSConvexQuadrangle(**get_coordinates(quadrangle))
        ratio = geos_quadrangle.height / geos_quadrangle.width

        if lower_ratio_bound < ratio < upper_ratio_bound or geos_quadrangle.area < min_split_area:
            ratio_split_quadrangles.append(geos_quadrangle)
            continue

        if not lower_ratio_bound < ratio:
            cut_start = midpoint(geos_quadrangle.top_left, geos_quadrangle.top_right)
            cut_end = midpoint(geos_quadrangle.bottom_left, geos_quadrangle.bottom_right)
        else:
            cut_start = midpoint(geos_quadrangle.top_left, geos_quadrangle.bottom_left)
            cut_end = midpoint(geos_quadrangle.top_right, geos_quadrangle.bottom_right)
        pieces = split(geos_quadrangle._polygon, LineString([cut_start, cut_end]))

        # Go through `.geoms`: Shapely 2 collections are not directly iterable,
        # and `.geoms` is available in Shapely 1.x as well.
        for piece in pieces.geoms:
            x, y = piece.exterior.coords.xy
            corners = np.array([list(pair) for pair in zip(x, y)])
            ratio_split_quadrangles.append(GEOSConvexQuadrangle(**get_coordinates(corners)))
    return ratio_split_quadrangles

In [None]:
# Keyword arguments for approximate_baseline.
baseline_parameters = {
    'lower_bound': 30000,
    'upper_bound': 200000,
    'factors': [0.1, 0.01],
}
# Keyword arguments for approximate_erose_dilate.
erose_dilate_parameters = {
    'lower_bound': 30000,
    'upper_bound': 200000,
    'factors': [0.1, 0.01],
    'iterations': 40,
}
# Keyword arguments for approximate_ratio_split.
ratio_split_baseline = {
    "lower_ratio_bound": 0.7,
    'upper_ratio_bound': 1.5,
}

actual_detector = AnnotatedSampledVideoScreenDetector()
# pred_detector = FastAIScreenDetector(path, methods={'base': baseline_parameters})
# pred_detector_erose = FastAIScreenDetector(path, methods={'erose_dilate': baseline_parameters})
# pred_detector_ratio = FastAIScreenDetector(path, methods={'ratio_split': ratio_split_baseline})
all_methods = {
    'base': baseline_parameters,
    'erose_dilate': erose_dilate_parameters,
    'ratio_split': ratio_split_baseline,
}
pred_detector_all = FastAIScreenDetector(path, methods=all_methods)

### Next ideas
* Find videos with worst IoU automatically
* Find videos with most screen misses automatically
* Find individual frames with worst IoU
* Investigate effect of different post-processing techniques

In [None]:
# Evaluate the full post-processing pipeline on every video.
wrong_count, ious, really_bad_ious = all_videos_eval(
    videos=videos_list,
    actual_detector=actual_detector,
    pred_detector=pred_detector_all,
)

In [None]:
# Manually optimized
# Summary of the evaluation above; the mean ignores frames scored as nan.
print(
    f"Size of test data: {len(ious)}",
    f"Number of examples with wrong number of screens: {len(wrong_count)}",
    f"Ratio of examples with wrong number of screens to all examples: {len(wrong_count) / len(ious)}",
    f"Mean iou: {np.nanmean(ious)}",
    sep="\n",
)

In [None]:
# Frames where the number of predicted screens differs from the reference.
all_frames_visualization(
    frames=wrong_count,
    actual_detector=actual_detector,
    pred_detector=pred_detector_all,
)

In [None]:
# Frames whose mean IoU fell below the "really bad" threshold.
all_frames_visualization(
    frames=really_bad_ious,
    actual_detector=actual_detector,
    pred_detector=pred_detector_all,
)

### Debugging and checking the results.

In [None]:
# Re-run segmentation and post-processing by hand on one badly scored frame so
# that the intermediate arrays can be inspected in the cells below.
learner = load_learner(path)
# `approximate` looks methods up by key, so this has to be a dict (a list fails
# on `.keys()`). The baseline settings match the manual contour loop below.
methods = {'base': baseline_parameters}

frame = really_bad_ious[0]
image = cv2.cvtColor(frame.image, cv2.COLOR_RGBA2RGB)
tensor = torch.from_numpy(np.transpose(image, (2, 0, 1)))
tensor = Image(tensor.to(torch.float32) / 255)
pred = learner.predict(tensor)
predicted_numpy = np.squeeze(np.transpose(pred[1].numpy(), (1, 2, 0))).astype('uint8')
shape = predicted_numpy.shape
predicted_resized = cv2.resize(predicted_numpy, dsize=(shape[1]*2, shape[0]*2))
# Screen retrieval (Post processing)
geos_quadrangles = approximate(predicted_resized, methods=methods)

In [None]:
# Show the upscaled predicted mask of the frame being debugged.
plt.imshow(predicted_resized)
plt.show()

In [None]:
# Step through the contour approximation on the debugged mask and show every
# accepted four-point polygon on an empty canvas.
lower_bound = 30000
upper_bound = 200000
factors = [0.1, 0.01]
quadrangles = []
contours, _ = cv2.findContours(predicted_resized, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours:
    # Skip contours whose area is outside the open interval (lower_bound, upper_bound).
    if not upper_bound > cv2.contourArea(cnt) > lower_bound:
        continue
    for factor in factors:
        epsilon = factor * cv2.arcLength(cnt, True)
        polygon = cv2.approxPolyDP(cnt, epsilon, True)
        if polygon.shape[0] != 4:
            continue
        zeros = np.zeros(predicted_resized.shape, dtype='uint8')
        draw_polygons([polygon], zeros, show=True)
        quadrangles.append(polygon)
        break

## Get statistics about dataset

In [None]:
# Areas and height/width ratios of all reference screens in the dataset.
sizes, ratios = all_video_statistics(videos=videos_list, actual_detector=actual_detector)

In [None]:
def get_confidence_interval(data, ndigits=0):
    """Return the rounded central 95% interval of a normal fit to ``data``.

    The normal distribution uses the sample mean and the population standard
    deviation (``numpy``'s default ``std``) of ``data``. Despite the function
    name, this is the range expected to hold about 95% of the values, not a
    confidence interval for the mean.

    Parameters
    ----------
    data : sequence of numbers
    ndigits : int, default 0
        Number of decimals both bounds are rounded to.

    Returns
    -------
    tuple
        ``(lower, upper)``. A tuple is returned rather than a lazy ``map``
        object, so the result can be unpacked, indexed and read repeatedly.
    """
    values = np.asarray(data, dtype=float)
    mean, std = values.mean(), values.std()
    lower, upper = scipy.stats.norm.interval(0.95, loc=mean, scale=std)
    return round(lower, ndigits), round(upper, ndigits)

In [None]:
# Interval covering the bulk of the reference screen areas.
lower_area, upper_area = get_confidence_interval(data=sizes)
lower_area, upper_area

In [None]:
# Interval covering the bulk of the reference height/width ratios.
lower_ratio, upper_ratio = get_confidence_interval(data=ratios, ndigits=3)
lower_ratio, upper_ratio

## Grid Search

In [None]:
# TODO: use random search with hyper-opt instead of an exhaustive grid.
lower_bounds = [30000, 36050, 40000]
upper_bounds = [99229, 125000, 175000, 200000]
factor_list = [[0.1, 0.01], [0.1], [0.01], [0.01, 0.02]]

all_settings = [(lower, upper, factors) for lower in lower_bounds for upper in upper_bounds for factors in factor_list]

# Collect one record per setting and build the frame once at the end, rather
# than growing the DataFrame row by row.
grid_records = []
for setting in all_settings:
    parameters = {'lower_bound': setting[0], 'upper_bound': setting[1], 'factors': setting[2]}
    pred_detector_grid_search = FastAIScreenDetector(path, methods={'base': parameters})
    # Grid-specific names keep the results of the main evaluation
    # (`wrong_count`, `ious`, `really_bad_ious`) intact for the cells above.
    grid_wrong_count, grid_ious, grid_really_bad_ious = all_videos_eval(
        videos_list, actual_detector, pred_detector_grid_search)
    grid_records.append({
        'lower_bound': setting[0],
        'upper_bound': setting[1],
        'factors': setting[2],
        'mean_iou': np.nanmean(grid_ious),
        'number_of_wrong_screens': len(grid_wrong_count),
    })

results = pd.DataFrame(
    grid_records,
    columns=['lower_bound', 'upper_bound', 'factors', 'mean_iou', 'number_of_wrong_screens'],
)

In [None]:
# Display the grid-search table (one row per parameter setting).
results

In [None]:
# erose_dilate_parameters = {'lower_bound': 30000, 'upper_bound': 200000, 'factors': [0.1, 0.01], 'iterations': 40}
# ratio_split_baseline = {"lower_ratio_bound": 0.7, 'upper_ratio_bound': 1.5}
# iterations = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
# lower_ratio_bound = [0.648, 0.7, 0.8, 0.9]
# upper_ratio_bound = [0.883, 1.0, 1.25, 1.5]

## Multiple images in one batch to better use GPU parallelization

In [None]:
# test = SegItemListCustom.from_folder(path/'video699/video/annotated', recurse=True)

# learner.data.add_test(test, tfms=None, tfm_y=False)

# preds = learner.get_preds(DatasetType.Test)[0].squeeze().numpy()
# preds

In [None]:
# np.bitwise_and(zeros.astype(bool), prediction.astype(bool)).sum() / np.bitwise_or(zeros.astype(bool), prediction.astype(bool)).sum()