# Coral Actions

## PoseEngineAction

In [9]:
import random
from PIL import Image
from PIL import ImageDraw

from pycoral.utils import edgetpu
from PIL import Image
from tflite_runtime.interpreter import load_delegate
from tflite_runtime.interpreter import Interpreter

import collections
import enum
import math
import numpy as np
import os
import platform
import sys
import time

from projectposenet.pose_engine import PoseEngine

colors = [(255,0,0,255) ,(0,255,0,255) ,(0,0,255,255),(255,255,0,255) ,(0,255,255,255) ,(255,255,255,255), \
         (125,0,0,255) ,(125,255,0,255) ,(125,0,255,255),(125,255,0,255) ,(125,255,255,255) ,(125,255,255,255)]

EDGES = (
    ('nose', 'left eye'),
    ('nose', 'right eye'),
    ('nose', 'left ear'),
    ('nose', 'right ear'),
    ('left ear', 'left eye'),
    ('right ear', 'right eye'),
    ('left eye', 'right eye'),
    ('left shoulder', 'right shoulder'),
    ('left shoulder', 'left elbow'),
    ('left shoulder', 'left hip'),
    ('right shoulder', 'right elbow'),
    ('right shoulder', 'right hip'),
    ('left elbow', 'left wrist'),
    ('right elbow', 'right wrist'),
    ('left hip', 'right hip'),
    ('left hip', 'left knee'),
    ('right hip', 'right knee'),
    ('left knee', 'left ankle'),
    ('right knee', 'right ankle'),
)

# 481x641
class PoseEngineAction(BaseAction):
    def __init__(self):
        self.engine = PoseEngine('projectposenet/models/mobilenet/posenet_mobilenet_v1_075_481_641_quant_decoder_edgetpu.tflite')
        self.result = None
    
    def evaluate(self, **kwargs):        
        if 'image' in kwargs.keys():
            self.result = {'image':kwargs['image']}
            image = self._get_image(kwargs['image'])
            poses, inference_time = self.engine.DetectPosesInImage(image)
            self.result = {'poses':poses, 'inference_time':inference_time,'image':kwargs['image']}
            return self.result

In [10]:
from PIL import Image, ImageDraw, ImageFont
import random
colors = [(255,0,0,255) ,(0,255,0,255) ,(0,0,255,255),(255,255,0,255) ,(0,255,255,255) ,(255,255,255,255), \
         (125,0,0,255) ,(125,255,0,255) ,(125,0,255,255),(125,255,0,255) ,(125,255,255,255) ,(125,255,255,255)]

class Pose:
    def __init__(self, pose, minScoreKeypoint=-1):
        self.pose = pose
        self.xys = self.get_xys(pose, minScoreKeypoint)
        
    def get_xys(self,pose, min_score=-1):
        xys = {}
        for label, keypoint in pose.keypoints.items():
            score =  float(keypoint.score)
            if score < min_score: continue
            kp_y = int((keypoint.point.y) )
            kp_x = int((keypoint.point.x) )
            xys[label] = (kp_x, kp_y, score)
        return xys            
        
    def addPointsText(self,img,keys,color,r,fontsize):
        for key in keys:
            if key in self.xys:
                x = self.xys[key][0]
                y = self.xys[key][1]
                img.drawPoint( x,y,r,color)
                img.drawText(str(key), x,y,fontsize, color=color)
                
    def addSkeleton(self,img,EDGES,color='red',width=1):
        for a, b in EDGES:
            if a not in self.xys or b not in self.xys: continue
            ax, ay = self.xys[a][0],self.xys[a][1]
            bx, by = self.xys[b][0],self.xys[b][1]
            img.drawLine(ax,ay,bx,by,color=color,width=width)
                
    def printPose(self,mninScorePose=-0.1, minScoreKeypoint=-0.1):
        #if self.pose.score < mninScorePose : continue
        print('\nPose Score: ', self.pose.score)
        for label, keypoint in self.pose.keypoints.items():
            if keypoint.score < minScoreKeypoint : continue
            print(' %-20s x=%-4d y=%-4d score=%.2f' %
                (label, keypoint.yx[1], keypoint.yx[0], keypoint.score))


## DrawPoseAction

In [11]:

                  
class DrawPoseAction(BaseAction):
    def __init__(self):
        self.result = None
    
    def evaluate(self, **kwargs):        
        if 'image' in kwargs.keys() and 'poses' in kwargs.keys() :
            image = kwargs['image']
            poses = kwargs['poses']
            self.drawPosePoints(image,poses,EDGES) 
            self.addSkeletons(image,poses,EDGES)
            self.result = {'image':image}
            return self.result
        else:
            return kwargs                
        
    def drawPoint(self,image,x,y,r,rgba):
        draw = ImageDraw.Draw(image)
        leftUpPoint = (x-r, y-r)
        rightDownPoint = (x+r, y+r)
        twoPointList = [leftUpPoint, rightDownPoint]
        draw.ellipse(twoPointList, fill=rgba)

    def drawPosePoints(self,image,poses,EDGES):    
        for pose in poses:
            #if pose.score < 0.4: continue
            rgba =  random.choice(colors)     
            #print('\nPose Score: ', pose.score)
            xys = {}
            minScoreKeypoint = 0.0
            for label, keypoint in pose.keypoints.items():
                if keypoint.score < minScoreKeypoint : continue
                #print(keypoint.point)
                #drawPoint(image,keypoint.yx[1], keypoint.yx[0],2,rgba)
                self.drawPoint(image,keypoint.point.x,keypoint.point.y,2,rgba)
                #kp_y = int((keypoint.yx[0] ) )
                #kp_x = int((keypoint.yx[1] ) )
                kp_y = int( keypoint.point.y)
                kp_x = int( keypoint.point.x)

                xys[label] = (kp_x, kp_y)
                #print(' %-20s x=%-4d y=%-4d score=%.1f' %
                #      (label, keypoint.yx[1], keypoint.yx[0], keypoint.score))
            #print(xys)

            for a, b in EDGES:
                if a not in xys or b not in xys: continue
                ax, ay = xys[a]
                bx, by = xys[b]
                #dwg.add(dwg.line(start=(ax, ay), end=(bx, by), stroke=color, stroke_width=2))
                draw = ImageDraw.Draw(image)
                draw.line((ax,ay,bx,by), fill=128, width=3)
                #print (ax,ay,bx,b)    
                
    def addSkeletons(self,image,poses,EDGES):
        for pose in poses:   
            _pose = Pose(pose)
            _pose.addSkeleton(image,EDGES,color='red',width=1)

## ObjectsInterpreterAction

In [12]:
import time

from PIL import Image
from PIL import ImageDraw

from pycoral.adapters import common
from pycoral.adapters import detect
from pycoral.utils.dataset import read_label_file
from pycoral.utils.edgetpu import make_interpreter

class ObjectsInterpreterAction(BaseAction):
    def __init__(self, score_threshold=0.2):
        self.score_threshold = score_threshold 
        self.filter = []
        self.result = None
        self.labels = read_label_file('/home/pi/jupyter/coral/pycoral/test_data/coco_labels.txt')
        self.interpreter = make_interpreter('/home/pi/jupyter/coral/pycoral/test_data/ssd_mobilenet_v2_coco_quant_postprocess_edgetpu.tflite')
        self.interpreter.allocate_tensors()

    def evaluate(self, **kwargs):        
        if 'image' in kwargs.keys():
            self.result = {'image':kwargs['image']}
            image = self._get_image(kwargs['image'])
            
            start = time.perf_counter()
            self.interpreter.invoke()
            inference_time = time.perf_counter() - start
            #_, scale = common.set_resized_input(
            #    self.interpreter, image.size, lambda size: image.resize(size, Image.ANTIALIAS))
            _, scale = common.set_resized_input(self.interpreter, image.size, lambda size: image)
            
            objs = detect.get_objects(self.interpreter,self.score_threshold, scale)
    
            if len(self.filter) > 0: 
                objs = self.filter_objects(objs)
    
            self.result = {'objects':objs, 'inference_time':inference_time,'image':kwargs['image']}
            return self.result
        
    def filter_objects(self,objs):
        objs2 = []
        for obj in objs:
            _label = self.labels.get(obj.id, obj.id) 
            if _label in self.filter:
                objs2.append(obj)
        return objs2        
        
    def input_size(self):
        return common.input_size(self.interpreter)

## DrawObjectsAction

In [13]:
class DrawObjectsAction(BaseAction):
    def __init__(self):
        self.result = None
        self.labels = read_label_file('/home/pi/jupyter/coral/pycoral/test_data/coco_labels.txt')
    
    def evaluate(self, **kwargs):        
        if 'image' in kwargs.keys() and 'objects' in kwargs.keys() :
            image = kwargs['image'].copy()
            objs = kwargs['objects']
            self.draw_objects(ImageDraw.Draw(image), objs, self.labels)        
            #self.result = {'image':image}
            self.result = dict(kwargs)
            self.result['image'] = image
            return self.result
        else:
            return kwargs                
        
    def draw_objects(self,draw, objs, labels):
        for obj in objs:
            bbox = obj.bbox
            draw.rectangle([(bbox.xmin, bbox.ymin), (bbox.xmax, bbox.ymax)],
                           outline='red')
            draw.text((bbox.xmin + 10, bbox.ymin + 10),
                      '%s\n%.2f' % (labels.get(obj.id, obj.id), obj.score),
                      fill='red')

    def get_labels(self,objs): 
        _labels = []
        for obj in objs:
            _label = self.labels.get(obj.id, obj.id)   
            if _label not in _labels:
                _labels.append(_label)
        return _labels
            
    def printData(self,objs):        
        for obj in objs:
            print(labels.get(obj.id, obj.id))
            print('  id:    ', obj.id)
            print('  score: ', obj.score)
            print('  bbox:  ', obj.bbox)

## SemanticSegmentationAction

In [14]:
#semantic_segmentation.py

import numpy as np
from PIL import Image

from pycoral.adapters import common
from pycoral.adapters import segment
from pycoral.utils.edgetpu import make_interpreter


def create_pascal_label_colormap():
  """Creates a label colormap used in PASCAL VOC segmentation benchmark.

  Returns:
    A Colormap for visualizing segmentation results.
  """
  colormap = np.zeros((256, 3), dtype=int)
  indices = np.arange(256, dtype=int)

  for shift in reversed(range(8)):
    for channel in range(3):
      colormap[:, channel] |= ((indices >> channel) & 1) << shift
    indices >>= 3

  return colormap

def label_to_color_image(label):
  """Adds color defined by the dataset colormap to the label.

  Args:
    label: A 2D array with integer type, storing the segmentation label.

  Returns:
    result: A 2D array with floating type. The element of the array
      is the color indexed by the corresponding element in the input label
      to the PASCAL color map.

  Raises:
    ValueError: If label is not of rank 2 or its value is larger than color
      map maximum entry.
  """
  if label.ndim != 2:
    raise ValueError('Expect 2-D input label')

  colormap = create_pascal_label_colormap()

  if np.max(label) >= len(colormap):
    raise ValueError('label value too large.')

  return colormap[label]


class SemanticSegmentationAction(BaseAction):
    def __init__(self):
        self.result = None
        self.interpreter = make_interpreter('/home/pi/jupyter/coral/pycoral/test_data/deeplabv3_mnv2_pascal_quant_edgetpu.tflite', device=':0')
        self.interpreter.allocate_tensors()
        self.width, self.height = common.input_size(self.interpreter)

    def evaluate(self, **kwargs):        
        if 'image' in kwargs.keys():
            self.result = {'image':kwargs['image']}
            image = self._get_image(kwargs['image'])
            
            keep_aspect_ratio = True
            if keep_aspect_ratio:
                resized_img, _ = common.set_resized_input(
                self.interpreter, image.size, lambda size: image.resize(size, Image.ANTIALIAS))
            else:
                resized_img = image.resize((self.width, self.height), Image.ANTIALIAS)
                common.set_input(self.interpreter, resized_img)

            self.interpreter.invoke()
            result = segment.get_output(self.interpreter)
            if len(result.shape) == 3:
                result = np.argmax(result, axis=-1)

            # If keep_aspect_ratio, we need to remove the padding area.
            new_width, new_height = resized_img.size
            result = result[:new_height, :new_width]
            mask_img = Image.fromarray(label_to_color_image(result).astype(np.uint8))

            # Concat resized input image and processed segmentation results.
            output_img = Image.new('RGB', (2 * new_width, new_height))
            output_img.paste(resized_img, (0, 0))
            output_img.paste(mask_img, (self.width, 0))
  
            self.result = {'image':output_img}
            return self.result

## ObjectsInterpreterWidget

In [15]:
class ObjectsInterpreterWidget(BaseWidget):
    def __init__(self, action=None, imageWidget=None, parent=None):
        self.action = action
        self.parent = parent
        self.result = None
        self.imageWidget = imageWidget
        self.threshold   = widgets.Text(description = 'threshold' , value=str(self.parent.action.score_threshold), style=style, layout=layout2)
        self.out  = widgets.Output()
        self.vbox   = widgets.VBox([self.threshold,self.out])
        self.labels = []    
        self.threshold.on_submit(self.on_value_submit_threshold)    
            
    def evaluate(self, **kwargs):     
        self.result = self.action.evaluate(**kwargs)
        if isinstance(self.result, dict):
             with self.out:
                clear_output()
                _labels = self.action.get_labels(kwargs['objects'])
                self.labels = []  
                for _label in _labels:
                    _checkbox = widgets.Checkbox(description=_label,value=True)
                    _checkbox.observe(self.label_on_change,'value')
                    display(_checkbox)     
                    self.labels.append(_checkbox)
        return self.result
    
    def on_value_submit_threshold(self,change):
        self.parent.action.score_threshold = float(change.value)
        self.evaluate_subchain()
        
    def evaluate_subchain(self):    
        parent_result = self.parent.evaluate(**self.parent.parent.result)
        self.evaluate(**parent_result)
        self.imageWidget.evaluate(self.result['image'])
        
    def label_on_change(self,change):  
        if change['type'] == 'change' and change['name'] == 'value':
            #print(change['name'])
            _labels = []
            for _label in self.labels:
                if _label.value == True:
                    _labels.append(_label.description)
            print(_labels)
            self.parent.action.filter = _labels
            self.evaluate_subchain()
            #K

# Coral Actions 2

## CountPoseAction

In [16]:
class CountPoseAction(BaseAction):
    def __init__(self):
        self.result = None
    
    def evaluate(self, **kwargs):        
        if 'image' in kwargs.keys() and 'poses' in kwargs.keys() :
            image = kwargs['image']
            poses = kwargs['poses']
            self.drawPosePoints(image,poses,EDGES) 
            self.addSkeletons(image,poses,EDGES)
            self.result = {'image':image}
            self.result['count']= len(poses)
            self.result['poses']=poses
            Bboxes=[]
            ScoreRanges=[]
            for pose in poses:
                Bboxes.append(self.calculateBbox(pose))
                ScoreRanges.append(self.calculateScoreRange(pose))
            self.result['bboxes']=Bboxes
            self.result['score_range']=ScoreRanges
            return self.result
        else:
            return kwargs                
        
    def drawPoint(self,image,x,y,r,rgba):
        draw = ImageDraw.Draw(image)
        leftUpPoint = (x-r, y-r)
        rightDownPoint = (x+r, y+r)
        twoPointList = [leftUpPoint, rightDownPoint]
        draw.ellipse(twoPointList, fill=rgba)
        
    def addSkeletons(self,image,poses,EDGES):
        for pose in poses:   
            _pose = Pose(pose)
            _pose.addSkeleton(image,EDGES,color='red',width=1)

    def drawPosePoints(self,image,poses,EDGES):    
        for pose in poses:
            #if pose.score < 0.4: continue
            rgba =  random.choice(colors)     
            #print('\nPose Score: ', pose.score)
            xys = {}
            minScoreKeypoint = 0.0
            for label, keypoint in pose.keypoints.items():
                if keypoint.score < minScoreKeypoint : continue
                #print(keypoint.point)
                #drawPoint(image,keypoint.yx[1], keypoint.yx[0],2,rgba)
                self.drawPoint(image,keypoint.point.x,keypoint.point.y,2,rgba)
                #kp_y = int((keypoint.yx[0] ) )
                #kp_x = int((keypoint.yx[1] ) )
                kp_y = int( keypoint.point.y)
                kp_x = int( keypoint.point.x)

                xys[label] = (kp_x, kp_y)
                #print(' %-20s x=%-4d y=%-4d score=%.1f' %
                #      (label, keypoint.yx[1], keypoint.yx[0], keypoint.score))
            #print(xys)

            for a, b in EDGES:
                if a not in xys or b not in xys: continue
                ax, ay = xys[a]
                bx, by = xys[b]
                #dwg.add(dwg.line(start=(ax, ay), end=(bx, by), stroke=color, stroke_width=2))
                draw = ImageDraw.Draw(image)
                draw.line((ax,ay,bx,by), fill=128, width=3)
                #print (ax,ay,bx,b)     
                
    def calculateBbox(self,pose):

        minimum=[9999999,9999899]
        maximum=[0,0]
        for label, keypoint in pose.keypoints.items():

                print(keypoint.point)
                #drawPoint(image,keypoint.yx[1], keypoint.yx[0],2,rgba)

                #kp_y = int((keypoint.yx[0] ) )
                #kp_x = int((keypoint.yx[1] ) )
                kp_y = int( keypoint.point.y)
                kp_x = int( keypoint.point.x)
                if kp_x<minimum[0]:
                    minimum[0]=kp_x
                if kp_x>maximum[0]:
                    maximum[0]=kp_x
                if kp_y<minimum[1]:
                    minimum[1]=kp_y
                if kp_y>maximum[1]:
                    maximum[1]=kp_y

        return minimum,maximum 
    
    def calculateScoreRange(self,pose):

        minimum=9999999
        maximum=0
        for label, keypoint in pose.keypoints.items():

            
                score = keypoint.score
                
                if score<minimum:
                    minimum=score
                if score>maximum:
                    maximum=score
                

        return minimum,maximum                

## CountObjectsAction

In [17]:
class CountObjectsAction(BaseAction):
    def __init__(self):
        self.result = None
        self.labels = read_label_file('/home/pi/jupyter/coral/pycoral/test_data/coco_labels.txt')
    
    def evaluate1(self, **kwargs):  
        if  'objects' in kwargs.keys() :
            objs = kwargs['objects'] 
            _count=self.count(objs)
            self.result = dict(kwargs)
            self.result['count']=_count
            return self.result
        else:
            return kwargs  
        
        
    def evaluate(self, **kwargs):        
        if 'image' in kwargs.keys() and 'objects' in kwargs.keys() :
            image = kwargs['image'].copy()
            objs = kwargs['objects']
            count=self.count(objs)
            self.draw_objects(ImageDraw.Draw(image), objs, self.labels)        
            #self.result = {'image':image}
            self.result = dict(kwargs)
            self.result['image'] = image
            self.result['count']= count
            return self.result
        else:
            return kwargs     
        
    def count(self,objects):    
        counter=0
        for object in objects:
            if object.id==0:
                counter=counter+1
        return counter


        
    def draw_objects(self,draw, objs, labels):
        for obj in objs:
            bbox = obj.bbox
            draw.rectangle([(bbox.xmin, bbox.ymin), (bbox.xmax, bbox.ymax)],
                           outline='red')
            draw.text((bbox.xmin + 10, bbox.ymin + 10),
                      '%s\n%.2f' % (labels.get(obj.id, obj.id), obj.score),
                      fill='red')

    def get_labels(self,objs): 
        _labels = []
        for obj in objs:
            _label = self.labels.get(obj.id, obj.id)   
            if _label not in _labels:
                _labels.append(_label)
        return _labels
            
    def printData(self,objs):        
        for obj in objs:
            print(labels.get(obj.id, obj.id))
            print('  id:    ', obj.id)
            print('  score: ', obj.score)
            print('  bbox:  ', obj.bbox)

## CountObjectsWidget

In [18]:
layout2={'width': '350px'}

class CountObjectsWidget(BaseWidget):
    def __init__(self, action=None, imageWidget=None, parent=None):
        self.action = action
        self.parent = parent
        self.result = None
        self.count = widgets.Text(description = 'c', style=style, layout=layout2)
        self.imageWidget = imageWidget
        self.vbox   = widgets.VBox([self.count])
            
    def evaluate(self, **kwargs):     
        self.result = self.action.evaluate(**kwargs)
        if  'count' in self.result.keys() :
            self.count.value=str (self.result['count'])
        return self.result