# Data Pipelines

## Pipeline Code

In [1]:
import os
import sys
import time
import io
import threading
from datetime import datetime
import random
sys.path.insert(0, os.path.abspath('..'))

import cv2
import requests
from  matplotlib import pyplot as plt
from IPython.display import clear_output
from ipywidgets import widgets
from PIL import Image, ImageDraw, ImageFont
from IPython.core.display import display
import numpy as np
import scipy.spatial


# Endpoints used by the pipeline: the camera stream and the three model services.
STREAM_URL = 'http://10.10.112.29:8080/'
DETECTION_URL = 'http://boat-detect:8080/v1/detect'
IDENTIFY_URL = 'http://boat-identify:8080/v1/identify'
GROUPING_URL = 'http://boat-group:8080/v1/group'

# Outline colour per cluster index when drawing grouped boxes.
GROUP_COLORS = ['red', 'green', 'white', 'cyan', 'blue']

# 3x3 grid of zone names, indexed as ZONES[column][row] (column from x, row from y).
ZONES = [["top left", "left", "bottom left"], ["top", "center", "bottom"], ["top right", "right", "bottom right"]]
# Spelled-out counts, indexed by the number itself (0..10).
NUMBERS = ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"]
# Compass names for equal angular sectors, starting at east; `_deg2dir` indexes
# this list, so the order (east -> north -> west -> south) must be kept.
DIRECTIONS = ["east", "east north east", "north east", "north north east", "north", "north north west", "north west", "west north west", "west", "west south west", "south west", "south south west", "south", "south south east", "south east", "east south east"]
# Coarser 8-sector alternative:
#DIRECTIONS = ["east", "north east", "north", "north west", "west", "south west", "south", "south east"]
# Boats whose reported 'speed' is below this value are described as stationary.
# NOTE(review): units depend on the identification service - confirm.
STATIONARY_THRESHOLD = 4

### Image Jupyter Widget

In [2]:
#### IMAGE WIDGET
def _annotate_image(img, db, show_boxes=True, show_ids=True, show_missing=True, show_groups=True):
    canvas = ImageDraw.Draw(img)
    
    for item in db.get('objects', []):
        if show_groups and 'cluster' in item and 'box' in item:
            canvas.rectangle(item['box'], outline=GROUP_COLORS[item['cluster']], width=3)
        elif show_boxes and 'box' in item:
            canvas.rectangle(item['box'], outline='red', width=3)
        if show_ids and 'id' in item and 'centroid' in item:
            font = ImageFont.truetype('Overpass-Regular.ttf', 24)
            canvas.text(item['centroid'], str(item['id']),'red', font=font,
                        stroke_width=2, stroke_fill='white')
            
    if show_missing and 'missing' in db.get('tracking', {}):
        for item in db['tracking']['missing'].values():
            font = ImageFont.truetype('Overpass-Regular.ttf', 24)
            canvas.text(item['centroid'],item['id'],'red', font=font,
                        stroke_width=2, stroke_fill='white')
        
    return img
                

def pil2pngs(img):
    """Serialise ``img`` to PNG and return the encoded bytes.

    ``img`` only needs a ``save(file_obj, format=...)`` method, as PIL images have.
    """
    with io.BytesIO() as buffer:
        img.save(buffer, format='PNG')
        return buffer.getvalue()


class PipelineWidget(widgets.Image):
    """Image widget that keeps showing the annotated output of a pipeline.

    The widget reads frames from the last stage of ``pipeline`` (any object
    with a ``snap()`` returning ``(timestamp, image, metadata)``) and redraws
    itself from a background thread. The ``show_*`` attributes may be changed
    at any time; they are read on every redraw. Set ``stop = True`` to end the
    refresh loop.
    """

    def __init__(self, pipeline, show_boxes=True, show_ids=True, show_missing=True, show_groups=True):
        self.frame = pipeline[-1]
        self.show_boxes = show_boxes
        self.show_ids = show_ids
        self.show_missing = show_missing
        self.show_groups = show_groups
        self.stop = False

        # Fall back to an empty image when the pipeline has no frame yet.
        super().__init__(value=self._render_png() or b'', format='png')
        # Daemon thread: the endless refresh loop must not keep the process alive.
        threading.Thread(target=self._loop, daemon=True).start()

    def _render_png(self):
        """Return the current frame annotated and PNG-encoded, or None if there is no frame."""
        timestamp, img, meta = self.frame.snap()
        if img is None:
            return None
        img = _annotate_image(img, meta, self.show_boxes, self.show_ids, self.show_missing, self.show_groups)
        return pil2pngs(img)

    def draw(self):
        """Refresh the widget from the latest frame; keep the old picture if none is available."""
        png = self._render_png()
        if png is not None:
            self.value = png

    def _loop(self):
        """Redraw roughly 24 times per second until ``stop`` is set."""
        self.stop = False
        while not self.stop:
            time.sleep(1/24)
            self.draw()


class PipelineWidgetWithControls(widgets.GridspecLayout):
    """A ``PipelineWidget`` with one checkbox per overlay layer above it.

    Row 0 holds four checkboxes (objects, ids, missing, groups), all initially
    unchecked; row 1 holds the image, created with every overlay disabled.
    """

    def __init__(self, pipeline):
        super().__init__(2, 4, width='1300px', height='770px')

        self[0, 0] = self._objects = widgets.Checkbox(value=False, description='Show Objects (Detection)', disabled=False, indent=False)
        self[0, 1] = self._ids = widgets.Checkbox(value=False, description='Show IDs (Identification)', disabled=False, indent=False)
        self[0, 2] = self._missing = widgets.Checkbox(value=False, description='Show Missing(Identification)', disabled=False, indent=False)
        self[0, 3] = self._groups = widgets.Checkbox(value=False, description='Show Groups(Clustering)', disabled=False, indent=False)
        self[1, :] = self._pipeline = PipelineWidget(pipeline, False, False, False, False)

        # Only react to the checkbox state; without `names` the handlers would
        # fire for every trait change of the checkbox.
        self._objects.observe(self.toggle_objects, names='value')
        self._ids.observe(self.toggle_ids, names='value')
        self._missing.observe(self.toggle_missing, names='value')
        self._groups.observe(self.toggle_groups, names='value')

    def toggle_objects(self, b):
        """Mirror the 'objects' checkbox onto the image widget (``b`` is the change event, unused)."""
        self._pipeline.show_boxes = self._objects.value

    def toggle_ids(self, b):
        """Mirror the 'ids' checkbox onto the image widget (``b`` is the change event, unused)."""
        self._pipeline.show_ids = self._ids.value

    def toggle_missing(self, b):
        """Mirror the 'missing' checkbox onto the image widget (``b`` is the change event, unused)."""
        self._pipeline.show_missing = self._missing.value

    def toggle_groups(self, b):
        """Mirror the 'groups' checkbox onto the image widget (``b`` is the change event, unused)."""
        self._pipeline.show_groups = self._groups.value

### Pipeline Objects

In [3]:
# CAMERA READER
class RemoteCameraSource(object):
    """First pipeline stage: reads frames from a video stream URL with OpenCV.

    ``start_stream()`` connects and starts a background thread that keeps the
    most recent frame; ``snap()`` returns that frame; a callback set through
    ``register()`` is started in a new thread for every frame received.
    """

    def __init__(self, url):
        self._url = url
        self._thread = None
        self._stream = None
        self._connecting = False
        self._last_frame = None
        self._last_frame_time = None
        self._next = None

    def __del__(self):
        self.stop_stream()

    def snap(self):
        """Return ``(timestamp, image, metadata)`` for the latest frame.

        ``image`` is an RGB PIL image and ``metadata`` an empty dict. Before
        the first frame has arrived the result is ``(None, None, {})``.
        """
        if self._last_frame is None:
            return (None, None, {})

        return (
            self._last_frame_time,
            Image.fromarray(
                cv2.cvtColor(self._last_frame, cv2.COLOR_BGR2RGB),
                "RGB"),
            {}
        )

    def reconnect(self):
        """Block until a capture on the stream URL is open, retrying once per minute."""
        # Wait for a connection attempt made by another caller to finish.
        while self._connecting:
            time.sleep(1)

        self._connecting = True
        try:
            while True:
                # Release the previous (or failed) capture before replacing it.
                if self._stream is not None:
                    self._stream.release()

                self._stream = cv2.VideoCapture(self._url)
                if self._stream.isOpened():
                    return

                print('Could not connect to {}'.format(self._url))
                print('Retrying in 1 minute')

                time.sleep(60)
        finally:
            # Never leave the flag set, or later callers would wait forever.
            self._connecting = False

    def start_stream(self):
        """Connect to the stream and start the frame-reading thread (no-op if already running)."""
        if self._thread and self._thread.is_alive():
            print('Thread already running')
            return

        self.reconnect()

        self._thread = threading.Thread(target=self._watch, args=())
        self._thread.start()

    def stop_stream(self):
        """Release the capture, which makes the reading loop end; safe if never started."""
        if self._stream is not None:
            self._stream.release()

    def _watch(self):
        """Read frames while the capture is open; reconnect after more than 50 consecutive failed reads."""
        fail_counter = 0
        while self._stream.isOpened():
            frame_time = datetime.now()
            ret, frame = self._stream.read()

            if not ret:
                fail_counter += 1
            else:
                fail_counter = 0
                self._last_frame_time = frame_time
                self._last_frame = frame
                if self._next:
                    self._run_next()

            time.sleep(1/60)

            if fail_counter > 50:
                self.reconnect()
                fail_counter = 0

        self._stream.release()

    def _run_next(self):
        """Run the registered callback in its own thread so reading is not blocked."""
        threading.Thread(target=self._next).start()

    def register(self, next_step):
        """Set the callable invoked (without arguments) after each new frame."""
        self._next = next_step

In [4]:
# API CALLERS
class ModelFilter(object):
    """Pipeline stage that sends the upstream result to an HTTP service.

    On construction the filter registers ``execute`` with ``source`` so it is
    triggered whenever the source has new data. With ``send_raw_image=True``
    the source image is posted as a PNG file; otherwise the source metadata is
    posted as JSON together with this filter's previous result. The JSON reply
    becomes the metadata returned by ``snap()``.
    """

    def __init__(self, source, url, send_raw_image=True):
        self._source = source
        self._data = {}
        self._url = url
        self._last_snap = self._source.snap()
        self._running = False
        self._ready = False
        self._next = None
        self._last_response = None
        self._sendimage = send_raw_image

        self._source.register(self.execute)

    @property
    def ready(self):
        """True once at least one request has completed and its result is stored."""
        return self._ready

    def snap(self):
        """Return the latest ``(timestamp, image, metadata)`` produced by this stage."""
        return self._last_snap

    @property
    def response(self):
        """The most recent HTTP response object, or None before the first request."""
        return self._last_response

    def execute(self):
        """Process the source's current snapshot unless a run is already in progress.

        Calls that arrive while another run is active return immediately, so
        intermediate frames are dropped rather than queued. Exceptions from
        the request propagate to the caller, but the in-progress flag is
        always cleared so later frames are still processed.
        """
        if self._running:
            return

        self._running = True
        try:
            timestamp, img, metadata = self._source.snap()
            now_time = timestamp.timestamp() if timestamp else None

            if self._sendimage:
                response = requests.post(
                    self._url,
                    files={"image": pil2pngs(img)})
            else:
                last_timestamp = self._last_snap[0]
                last_time = last_timestamp.timestamp() if last_timestamp else None
                # Without a previous result, the current metadata doubles as 'last'.
                last_meta = self._last_snap[2] or metadata
                response = requests.post(
                    self._url,
                    json={'last': last_meta,
                          'last_time': last_time,
                          'now': metadata,
                          'now_time': now_time})

            self._last_response = response
            try:
                meta = response.json()
            except ValueError:
                # Every JSON decode error raised by response.json() is a
                # ValueError subclass; treat a non-JSON body as "no metadata".
                meta = {}

            self._last_snap = (timestamp, img, meta)
            # Only a completed run makes the stage ready, not a skipped call.
            self._ready = True
        finally:
            self._running = False

        if self._next:
            self._run_next()

    def register(self, next_step):
        """Set the callable invoked (without arguments) after each completed run."""
        self._next = next_step

    def _run_next(self):
        """Run the registered callback in its own thread."""
        threading.Thread(target=self._next).start()

In [5]:
# Text Generator
def _deg2dir(deg):
    ncats = len(DIRECTIONS)
    idx = int((deg + (360 / ncats / 2)) / (360 / ncats))
    return DIRECTIONS[idx % ncats]
    

def _count_word(count):
    """Spell out ``count`` using NUMBERS, falling back to digits when it is out of range."""
    if 0 <= count < len(NUMBERS):
        return NUMBERS[count]
    return str(count)


def _zone_index(coordinate, zone_size, nzones):
    """Map a pixel coordinate to a zone index, clamped to ``0 .. nzones - 1``."""
    return min(max(int(coordinate / zone_size), 0), nzones - 1)


def results2text(image, meta):
    """Describe the pipeline result as English text.

    Args:
        image: object with a ``size`` of ``(width, height)``, used to place
            cluster centroids in a 3x3 grid of named zones.
        meta: dict with optional ``'objects'`` (list of dicts carrying
            ``'cluster'``, ``'speed'`` and ``'direction'``) and ``'clusters'``
            (list of centroids, ``(x, y)``-indexable).

    Returns:
        One paragraph with the totals followed by one paragraph per cluster.
        Sentences are joined by a space, paragraphs are separated by a newline
        and the text ends with two extra newlines.
    """
    passage = []
    zone_width = image.size[0] / 3.
    zone_height = image.size[1] / 3.

    objects = meta.get('objects', [])
    clusters = meta.get('clusters', [])

    # total count
    nboats = len(objects)
    nclusters = len(clusters)
    boats = 'is one boat' if nboats == 1 else 'are {} boats'.format(_count_word(nboats))
    if nclusters == nboats:
        passage.append('There {}.'.format(boats))
    else:
        groups = 'one group' if nclusters == 1 else '{} groups'.format(_count_word(nclusters))
        passage.append('There {} in {}.'.format(boats, groups))
    passage.append('')

    for cluster, cluster_cent in enumerate(clusters):
        # location; a centroid on the right/bottom edge stays in the last zone
        column = _zone_index(cluster_cent[0], zone_width, len(ZONES))
        row = _zone_index(cluster_cent[1], zone_height, len(ZONES[column]))
        cluster_zone = ZONES[column][row]

        cluster_ships = [item for item in objects if item.get('cluster') == cluster]
        ncluster_ships = len(cluster_ships)
        verb = ' is' if ncluster_ships == 1 else 's are'
        passage.append('{} boat{} at the {}.'.format(
            _count_word(ncluster_ships).capitalize(), verb, cluster_zone))

        # stationary
        stationary_ships = [item for item in cluster_ships if item['speed'] < STATIONARY_THRESHOLD]
        if stationary_ships:
            nstationary_ships = len(stationary_ships)
            if ncluster_ships == 1:
                passage.append("It is stationary.")
            elif ncluster_ships == nstationary_ships:
                passage.append("They are all stationary.")
            elif nstationary_ships == 1:
                passage.append("One of them is stationary.")
            else:
                passage.append("{} of them are stationary.".format(
                    _count_word(nstationary_ships).capitalize()))

        # motion - direction groups
        moving_ships = [item for item in cluster_ships if item['speed'] >= STATIONARY_THRESHOLD]
        if moving_ships:
            if ncluster_ships == 1:
                direction = _deg2dir(moving_ships[0]['direction'])
                passage.append("It is travelling {}.".format(direction))
            else:
                ships_by_dir = {}
                for ship in moving_ships:
                    ships_by_dir.setdefault(_deg2dir(ship['direction']), []).append(ship)
                for direction, ships in ships_by_dir.items():
                    verb = 'is' if len(ships) == 1 else 'are'
                    passage.append("{} {} travelling {}.".format(
                        _count_word(len(ships)).capitalize(), verb, direction))

        passage.append('')

    # An empty entry marks a paragraph break; every sentence gets a trailing space.
    return ''.join(line + ' ' if line else '\n' for line in passage) + '\n\n'

## GUI Code

In [6]:
# PIPELINE
# PIPELINE
# Each stage is constructed with the previous stage as its source; a
# ModelFilter registers itself with that source when it is created.
pipeline = []
# READ FROM CAMERA
pipeline.append(RemoteCameraSource(STREAM_URL))  
# RUN OBJECT DETECTION MODEL (receives the raw image)
pipeline.append(ModelFilter(pipeline[-1], DETECTION_URL))  
# RUN OBJECT IDENTIFICATION ALGORITHM (receives metadata only)
pipeline.append(ModelFilter(pipeline[-1], IDENTIFY_URL, False))  
# RUN OBJECT GROUPING ALGORITHM (receives metadata only)
pipeline.append(ModelFilter(pipeline[-1], GROUPING_URL, False))  
# Connect to the camera and start reading frames; this drives all later stages.
pipeline[0].start_stream()

# wait for the first frame to propagate through the pipeline
while not pipeline[-1].ready:
    time.sleep(1)

In [7]:
# Build the annotated video widget with its overlay checkboxes; the bare name
# on the last line makes the notebook display it.
a = PipelineWidgetWithControls(pipeline)
a

PipelineWidgetWithControls(children=(Checkbox(value=False, description='Show Objects (Detection)', indent=Fals…

In [9]:
# Read-only text box that the updater thread fills with the scene description.
# NOTE(review): `areavalue` does not look like a Textarea trait name - possibly
# `value` was intended; confirm before changing.
text = widgets.Textarea(areavalue='ASCII', disabled=True, layout=widgets.Layout(width="auto"), rows=10)

def updater():
    """Refresh the text box from the last pipeline stage every 10 seconds, forever.

    The widget is only written when the generated description differs from
    the one written last; iterations without an image are skipped.
    """
    output_last = ''
    while True:
        timestamp, image, meta = pipeline[-1].snap()
        if image is not None:
            output = results2text(image, meta)
            if output != output_last:
                text.value = output
                # Remember what was shown, otherwise the comparison never matches.
                output_last = output
        time.sleep(10)

# Run the updater in the background so the cell returns immediately; the bare
# `text` on the last line makes the notebook display the text box.
thread = threading.Thread(target=updater)
thread.start()
text


Textarea(value='There are four boats. \nOne boat is at the right. Is is travelling east. \nOne boat is at the …