
Commit

predict: fix text skews
koush committed Apr 23, 2024
1 parent ebe1953 commit ff2d1d5
Showing 7 changed files with 205 additions and 24 deletions.
6 changes: 6 additions & 0 deletions plugins/coreml/src/coreml/text_recognition.py
@@ -37,3 +37,9 @@ def predictDetectModel(self, input):
out_dict = model.predict({inputName: input})
results = list(out_dict.values())[0]
return results

def predictTextModel(self, input):
model, inputName = self.textModel
out_dict = model.predict({inputName: input})
preds = out_dict["linear_2"]
return preds
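
For context, a minimal sketch of how the (model, inputName) pair used above might be produced with coremltools; the actual loading happens in downloadModel(), which is not part of this diff, and the file name below is an assumption.

import coremltools as ct

# Hypothetical loader sketch; the real downloadModel() lives elsewhere in the plugin.
def load_text_model(path: str = "vgg_english_g2.mlmodel"):
    model = ct.models.MLModel(path)
    # First input name from the model spec, matching the (model, inputName) tuple above.
    input_name = model.get_spec().description.input[0].name
    return model, input_name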
41 changes: 33 additions & 8 deletions plugins/openvino/src/common/text.py
@@ -8,7 +8,17 @@
from common.colors import ensureRGBData
import math

async def crop_text(d: ObjectDetectionResult, image: scrypted_sdk.Image, width: int, height: int):
def skew_image(image: Image, skew_angle_rad: float):
skew_matrix = [1, 0, 0, skew_angle_rad, 1, 0]

# Apply the transformation
skewed_image = image.transform(
image.size, Image.AFFINE, skew_matrix, resample=Image.BICUBIC
)

return skewed_image

async def crop_text(d: ObjectDetectionResult, image: scrypted_sdk.Image):
l, t, w, h = d["boundingBox"]
l = math.floor(l)
t = math.floor(t)
@@ -27,14 +37,30 @@ async def crop_text(d: ObjectDetectionResult, image: scrypted_sdk.Image, width:
}
)
pilImage = await ensureRGBData(cropped, (w, h), format)
resized = pilImage.resize((width, height), resample=Image.LANCZOS).convert("L")
pilImage.close()
return resized
return pilImage

def calculate_y_change(original_height, skew_angle_radians):
# Calculate the change in y-position
y_change = original_height * math.tan(skew_angle_radians)

return y_change

async def prepare_text_result(d: ObjectDetectionResult, image: scrypted_sdk.Image, skew_angle: float):
textImage = await crop_text(d, image)

skew_height_change = calculate_y_change(d["boundingBox"][3], skew_angle)
skew_height_change = math.floor(skew_height_change)
textImage = skew_image(textImage, skew_angle)
# trim skew_height_change from the bottom (positive skew) or the top (negative skew)
if skew_height_change > 0:
textImage = textImage.crop((0, 0, textImage.width, textImage.height - skew_height_change))
elif skew_height_change < 0:
textImage = textImage.crop((0, -skew_height_change, textImage.width, textImage.height))

async def prepare_text_result(d: ObjectDetectionResult, image: scrypted_sdk.Image):
new_height = 64
new_width = int(d["boundingBox"][2] * new_height / d["boundingBox"][3])
textImage = await crop_text(d, image, new_width, new_height)
new_width = int(textImage.width * new_height / textImage.height)
textImage = textImage.resize((new_width, new_height), resample=Image.LANCZOS).convert("L")

new_width = 256
# calculate padding dimensions
padding = (0, 0, new_width - textImage.width, 0)
@@ -50,7 +76,6 @@ async def prepare_text_result(d: ObjectDetectionResult, image: scrypted_sdk.Imag
# test normalize contrast
# image_tensor = (image_tensor - np.min(image_tensor)) / (np.max(image_tensor) - np.min(image_tensor))


image_tensor = (image_tensor - 0.5) / 0.5

image_tensor = np.expand_dims(image_tensor, axis=0)
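For intuition, a small worked example (illustrative values only, not taken from the plugin) of how the new skew helpers combine: a shear by the skew angle levels the text baseline, and the vertical drift computed by calculate_y_change is trimmed off the crop.

import math
from PIL import Image

# A 256x64 text crop skewed by about 0.05 rad (~3 degrees).
crop = Image.new("L", (256, 64), color=255)
skew_angle = 0.05
y_change = math.floor(64 * math.tan(skew_angle))  # calculate_y_change: ~3 px of drift
skewed = crop.transform(crop.size, Image.AFFINE, [1, 0, 0, skew_angle, 1, 0], resample=Image.BICUBIC)
# Trim the drifted rows, as prepare_text_result does for a positive skew angle.
trimmed = skewed.crop((0, 0, skewed.width, skewed.height - y_change))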
10 changes: 10 additions & 0 deletions plugins/openvino/src/ov/text_recognition.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import openvino.runtime as ov
import numpy as np

from predict.text_recognize import TextRecognition

@@ -34,3 +35,12 @@ def predictDetectModel(self, input):
infer_request.start_async()
infer_request.wait()
return infer_request.output_tensors[0].data

def predictTextModel(self, input):
input = input.astype(np.float32)
im = ov.Tensor(array=input)
infer_request = self.textModel.create_infer_request()
infer_request.set_input_tensor(im)
infer_request.start_async()
infer_request.wait()
return infer_request.output_tensors[0].data
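
For context, a minimal sketch of how self.textModel might be compiled with the OpenVINO runtime; the real downloadModel() is outside this diff, and the model path and device name below are assumptions.

import openvino.runtime as ov

# Hypothetical loader sketch; the actual download/compile code lives elsewhere.
core = ov.Core()
text_model = core.compile_model("vgg_english_g2.xml", "CPU")
# predictTextModel() above then creates one infer request per prediction.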
4 changes: 2 additions & 2 deletions plugins/tensorflow-lite/src/predict/recognize.py
@@ -202,8 +202,8 @@ async def run_detection_image(
for d in ret["detections"]:
if d["className"] == "face":
futures.append(asyncio.ensure_future(self.setEmbedding(d, image)))
elif d["className"] == "plate":
futures.append(asyncio.ensure_future(self.setLabel(d, image)))
# elif d["className"] == "plate":
# futures.append(asyncio.ensure_future(self.setLabel(d, image)))
# elif d['className'] == 'text':
# futures.append(asyncio.ensure_future(self.setLabel(d, image)))

86 changes: 74 additions & 12 deletions plugins/tensorflow-lite/src/predict/text_recognize.py
@@ -2,20 +2,26 @@

import asyncio
import concurrent.futures
import traceback
from asyncio import Future
from typing import Any, List, Tuple

import numpy as np
import scrypted_sdk
from PIL import Image
from scrypted_sdk import ObjectDetectionResult, ObjectDetectionSession, ObjectsDetected

from common.text import prepare_text_result, process_text_result
from predict import Prediction, PredictPlugin
from predict.craft_utils import normalizeMeanVariance
from predict.rectangle import Rectangle

from .craft_utils import adjustResultCoordinates, getDetBoxes
from predict.text_skew import find_adjacent_groups

predictExecutor = concurrent.futures.ThreadPoolExecutor(1, "TextDetect")


class TextRecognition(PredictPlugin):
def __init__(self, nativeId: str | None = None):
super().__init__(nativeId=nativeId)
@@ -30,15 +36,20 @@ def __init__(self, nativeId: str | None = None):
self.minThreshold = 0.1

self.detectModel = self.downloadModel("craft")

self.textModel = self.downloadModel("vgg_english_g2")

def downloadModel(self, model: str):
pass

def predictDetectModel(self, input):
pass

async def detect_once(self, input: Image.Image, settings: Any, src_size, cvss) -> scrypted_sdk.ObjectsDetected:
def predictTextModel(self, input):
pass

async def detect_once(
self, input: Image.Image, settings: Any, src_size, cvss
) -> scrypted_sdk.ObjectsDetected:
image_tensor = normalizeMeanVariance(np.array(input))
# reshape to c w h
image_tensor = image_tensor.transpose([2, 0, 1])
@@ -51,9 +62,9 @@ async def detect_once(self, input: Image.Image, settings: Any, src_size, cvss) -

estimate_num_chars = False
ratio_h = ratio_w = 1
text_threshold = .7
link_threshold = .7
low_text = .4
text_threshold = 0.4
link_threshold = 0.7
low_text = 0.4
poly = False

boxes_list, polys_list = [], []
@@ -64,7 +75,14 @@ async def detect_once(self, input: Image.Image, settings: Any, src_size, cvss) -

# Post-processing
boxes, polys, mapper = getDetBoxes(
score_text, score_link, text_threshold, link_threshold, low_text, poly, estimate_num_chars)
score_text,
score_link,
text_threshold,
link_threshold,
low_text,
poly,
estimate_num_chars,
)
if not len(boxes):
continue

@@ -86,16 +104,60 @@ async def detect_once(self, input: Image.Image, settings: Any, src_size, cvss) -
for boxes in boxes_list:
for box in boxes:
tl, tr, br, bl = box
l = tl[0]
t = tl[1]
r = br[0]
b = br[1]
l = min(tl[0], bl[0])
t = min(tl[1], tr[1])
r = max(tr[0], br[0])
b = max(bl[1], br[1])

pred = Prediction(0, 1, Rectangle(l, t, r, b))
preds.append(pred)

return self.create_detection_result(preds, src_size, cvss)

async def run_detection_image(
self, image: scrypted_sdk.Image, detection_session: ObjectDetectionSession
) -> ObjectsDetected:
ret = await super().run_detection_image(image, detection_session)

detections = ret["detections"]

futures: List[Future] = []

boundingBoxes = [d["boundingBox"] for d in detections]
text_groups = find_adjacent_groups(boundingBoxes)

detections = []
for group in text_groups:
boundingBox = group["union"]
d: ObjectDetectionResult = {
"boundingBox": boundingBox,
"score": 1,
"className": "text",
}
futures.append(asyncio.ensure_future(self.setLabel(d, image, group["skew_angle"])))
detections.append(d)

ret["detections"] = detections

if len(futures):
await asyncio.wait(futures)

return ret

async def setLabel(self, d: ObjectDetectionResult, image: scrypted_sdk.Image, skew_angle: float):
try:

image_tensor = await prepare_text_result(d, image, skew_angle)
preds = await asyncio.get_event_loop().run_in_executor(
predictExecutor,
lambda: self.predictTextModel(image_tensor),
)
d["label"] = process_text_result(preds)

except Exception:
traceback.print_exc()

# width, height, channels
def get_input_details(self) -> Tuple[int, int, int]:
return (self.inputwidth, self.inputheight, 3)
@@ -104,4 +166,4 @@ def get_input_size(self) -> Tuple[float, float]:
return (self.inputwidth, self.inputheight)

def get_input_format(self) -> str:
return "rgb"
return "rgb"
78 changes: 78 additions & 0 deletions plugins/tensorflow-lite/src/predict/text_skew.py
@@ -0,0 +1,78 @@
from typing import List, Tuple
import math

BoundingBox = Tuple[int, int, int, int]


def union_boxes(boxes: List[BoundingBox]) -> BoundingBox:
left = min([box[0] for box in boxes])
top = min([box[1] for box in boxes])
right = max([box[0] + box[2] for box in boxes])
bottom = max([box[1] + box[3] for box in boxes])
return left, top, right - left, bottom - top


def are_boxes_adjacent(box1: BoundingBox, box2: BoundingBox):
l1, t1, w1, h1 = box1
l2, t2, w2, h2 = box2

# Boxes must sit on roughly the same text line: reject pairs whose vertical
# offset exceeds two-thirds of a line height.
line_slop = 2 / 3
if t1 > t2 + h2 * line_slop or t2 > t1 + h1 * line_slop:
return False

# Calculate the left and right edges of each box
left_edge_box1 = l1
right_edge_box1 = l1 + w1
left_edge_box2 = l2
right_edge_box2 = l2 + w2

# Determine the larger height between the two boxes
larger_height = max(h1, h2)

threshold = larger_height * 2

# Calculate the horizontal gap between the boxes' facing edges
distance = min(
abs(left_edge_box1 - right_edge_box2), abs(left_edge_box2 - right_edge_box1)
)

# Check if the boxes are adjacent along their left or right sides
if distance <= threshold:
return True
else:
return False


def find_adjacent_groups(boxes: List[BoundingBox]) -> List[dict]:
groups = []

# sort boxes left to right
boxes = sorted(boxes, key=lambda box: box[0])

for box in boxes:
added_to_group = False
for group in groups:
for other_box in group["boxes"]:
if are_boxes_adjacent(box, other_box):
group["boxes"].append(box)
added_to_group = True
break
if added_to_group:
break
if not added_to_group:
groups.append({"boxes": [box], "skew_angle": 0})

# Calculate the skew angle of each group
for group in groups:
boxes = group["boxes"]
sum_angle = 0
for i in range(len(boxes) - 1):
x1, y1, w1, h1 = boxes[i]
x2, y2, w2, h2 = boxes[i + 1]
dx = x2 - x1
dy = y2 - y1
sum_angle += math.atan2(dy, dx)
group["skew_angle"] = 0 if not len(boxes) - 1 else sum_angle / (len(boxes) - 1)
group["union"] = union_boxes(boxes)

return groups
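
A small usage sketch of the new grouping helper; the box values are illustrative.

# Two boxes on the same, slightly tilted line, plus one far away.
boxes = [(10, 20, 50, 16), (70, 24, 60, 16), (400, 300, 40, 16)]
groups = find_adjacent_groups(boxes)
for group in groups:
    print(group["union"], group["skew_angle"])
# The first two boxes merge into one group with a small positive skew angle;
# the far box ends up in its own group with a skew angle of 0.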
4 changes: 2 additions & 2 deletions server/package-lock.json

Some generated files are not rendered by default.
