# Processor

In [1]:
import itertools
import time

import cv2
import numpy as np
import torch
import torchvision
from shapely import geometry

from maskrcnn import MaskrcnnResnet50FPN

# Build the Mask R-CNN model, load its trained weights, and move it to the inference device.
num_classes = 3 # Background:0, card_front:1, card_back:2
# NOTE(review): absolute, machine-specific path; this cell only runs where this exact file exists.
weight_path = '/home/vinhloiit/Documents/VTCC/id_info_extraction/models/weights/card_extraction/pytorch/2011110823/best_model_31_dice_mAP=0.9705.pt'

model = MaskrcnnResnet50FPN(num_classes=num_classes)

# Time only the checkpoint read plus state-dict load (the device transfer below is not included).
t1 = time.time()
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
model.load_state_dict(torch.load(weight_path, map_location='cpu')) # Load weights onto the CPU first
t2 = time.time()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # Use the GPU when one is available, otherwise stay on the CPU

model.to(device)
model.eval()  # switch the model to evaluation mode

print(f'Load weight: {t2 - t1}s')

Load weight: 0.11472916603088379s


## 1. preprocess 

In [4]:
import cv2
import torch
image_size = (768, 768)

def preprocess(image):
    """Turn one image into a normalized single-image batch for the model.

    The image is resized to ``image_size``, converted to a float tensor on the
    module-level ``device``, reshaped from (H, W, C) to (1, C, H, W) and
    standardized with the mean/std of the whole tensor.

    Args:
        image: image array accepted by ``cv2.resize`` (height, width, channels).

    Returns:
        (image, samples): the untouched input image and the normalized tensor.
    """
    # Resize to the fixed input size expected by the network.
    resized = cv2.resize(image, dsize=image_size)
    print('Image shape after resize', resized.shape)

    # numpy -> float torch tensor on the selected device.
    tensor = torch.from_numpy(resized).to(torch.float).to(device)

    # Add the batch axis -> (B, H, W, C), then reorder to (B, C, H, W).
    batch = tensor.unsqueeze(dim=0)
    batch = batch.permute(0, 3, 1, 2)
    print('Image shape after unsqueeze and permute', batch.shape)

    # Standardize with the statistics of the whole tensor.
    normalized = (batch - batch.mean()) / batch.std()
    return image, normalized

In [5]:
# Load the sample ID-card image and run the single-image preprocess defined above.
# NOTE(review): cv2.imread returns None when the file cannot be read, and preprocess does not check for that.
image = cv2.imread('test_images/input/cmnd.png')
image, samples = preprocess(image)

Image shape after resize (768, 768, 3)
Image shape after unsqueeze and permute torch.Size([1, 3, 768, 768])


## Example for 2 images input

In [6]:
# Input many images

import cv2
import torch
image_size = (768, 768)

def preprocess(images):
    """Turn a list of images into one normalized batch for the model.

    Every image is resized to ``image_size``, the results are stacked along a
    new leading batch axis, converted to a float tensor on the module-level
    ``device`` and reordered from (B, H, W, C) to (B, C, H, W).

    Each image is standardized with its OWN mean and standard deviation, which
    matches what the single-image ``preprocess`` does. Using batch-wide
    statistics would make the values of one image depend on the other images
    that happen to share its batch.

    Args:
        images: non-empty list of image arrays accepted by ``cv2.resize``
            (height, width, channels), all with the same channel count.

    Returns:
        (images, samples): the untouched input list and the normalized tensor.

    Raises:
        ValueError: from ``np.stack`` when ``images`` is empty.
    """
    resized = [cv2.resize(image, dsize=image_size) for image in images]
    stacked = np.stack(resized, axis=0)  # axis 0: number of images
    samples = torch.from_numpy(stacked).to(torch.float).to(device)
    samples = samples.permute(0, 3, 1, 2)

    # Per-image statistics: flatten everything but the batch axis, then
    # reshape so they broadcast over (C, H, W).
    flat = samples.reshape(samples.shape[0], -1)
    mean = flat.mean(dim=1).view(-1, 1, 1, 1)
    std = flat.std(dim=1).view(-1, 1, 1, 1)
    samples = (samples - mean) / std
    return images, samples

In [6]:
# Build a two-image batch for the example; both entries are read from the same file.
image1 = cv2.imread('test_images/input/cmnd.jpg')
image2 = cv2.imread('test_images/input/cmnd.jpg')
images = [image1, image2]

In [7]:
# Run the batch preprocess: `images` is the input list passed through, `samples` the normalized tensor.
images, samples = preprocess(images)

## Denormalization

In [8]:
def denorm(samples):
    """Convert a normalized (B, C, H, W) tensor back to displayable uint8 images.

    Each image is min-max scaled independently to [0, 255] and reordered to
    (B, H, W, C).

    Args:
        samples: torch tensor of shape (B, C, H, W). It may live on the GPU
            and may track gradients.

    Returns:
        numpy uint8 array of shape (B, H, W, C).
    """
    # .numpy() only works on CPU tensors that do not require grad, so detach
    # and move to the CPU first (both are no-ops for a plain CPU tensor).
    samples = samples.detach().cpu().numpy()

    lows = samples.min(axis=(1, 2, 3), keepdims=True)
    highs = samples.max(axis=(1, 2, 3), keepdims=True)
    span = highs - lows
    # A constant image has zero range; divide by 1 so it maps to all zeros
    # instead of NaN.
    span[span == 0] = 1

    scaled = (samples - lows) / span
    scaled = np.transpose(scaled, axes=(0, 2, 3, 1))  # (B, C, H, W) -> (B, H, W, C)
    scaled = scaled * 255
    return scaled.astype(np.uint8)

# Convert the normalized batch back to images and show them one at a time.
images_ = denorm(samples)

for sample in images_:
    cv2.imshow('sample', sample)
    cv2.waitKey()  # wait for a key press before closing the window
    cv2.destroyAllWindows()

## 2. process 

In [8]:
def process(samples, image):
    """Run the module-level ``model`` on ``samples`` with gradients disabled.

    Args:
        samples: model input batch.
        image: passed through unchanged so later stages can use it.

    Returns:
        (preds, image): the model output and the untouched ``image``.
    """
    with torch.no_grad():  # inference only, no autograd bookkeeping
        preds = model(samples)
    return preds, image

## 3. postprocess

In [9]:
binary_threshold = 0.6 # mask values above 0.6 become 1, everything else becomes 0
contour_area_threshold = 0.03 # minimum contour area, as a fraction of the mask size
vertical_threshold = 20 # maximum number of vertices allowed for a convex hull
iou_threshold = 0.8 # minimum IoU between a candidate quadrangle and its convex hull

In [10]:
def distance(point1, point2):
    """Return the Euclidean distance between two points."""
    # Work in float64, then take the norm of the difference vector.
    delta = np.float64(point1) - np.float64(point2)
    return np.linalg.norm(delta)

In [11]:
def intersection_point(line1, line2):
    """Intersect the two infinite lines through the given point pairs.

    Each line is ``[p, q]`` with ``p`` and ``q`` indexable as ``(x, y)``.
    Every line is written as ``a * x + b * y = c`` and the 2x2 system is
    solved with Cramer's rule.

    Returns:
        ``[x, y]`` with both coordinates truncated to ``int``, or ``None``
        when the lines are parallel (zero determinant).
    """
    start1, end1 = line1[0], line1[1]
    start2, end2 = line2[0], line2[1]

    # Coefficients of a*x + b*y = c for each line.
    a1 = end1[1] - start1[1]
    b1 = start1[0] - end1[0]
    a2 = end2[1] - start2[1]
    b2 = start2[0] - end2[0]

    determinant = a1 * b2 - a2 * b1
    if determinant == 0:
        return None

    # The right-hand sides are pre-divided by the determinant.
    c1 = (a1 / determinant) * start1[0] + (b1 / determinant) * start1[1]
    c2 = (a2 / determinant) * start2[0] + (b2 / determinant) * start2[1]

    x = b2 * c1 - b1 * c2
    y = a1 * c2 - a2 * c1
    return [int(x), int(y)]

In [12]:
def compute_iou(polyA, polyB):
    """Return the intersection-over-union of two polygons.

    Both arguments are point lists and are turned into shapely polygons.
    The result is ``0.`` when the polygons do not intersect.
    """
    shape_a = geometry.Polygon(polyA)
    shape_b = geometry.Polygon(polyB)

    if not shape_a.intersects(shape_b):
        return 0.

    overlap = shape_a.intersection(shape_b).area
    return overlap / shape_a.union(shape_b).area

In [13]:
def order_points(points):
    """
    Arrange four corner points in a fixed order.

    The two points with the smallest x form the left side and the other two
    the right side; within each side the point with the smaller y comes first.

    Args:
        points (list): List 4 points
    Returns:
        [tl, tr, br, bl] (list): top left, top right, bottom right, bottom left
    """
    assert len(points) == 4, 'Length of points must be 4'

    by_x = sorted(points, key=lambda p: p[0])
    left_pair, right_pair = by_x[:2], by_x[2:]

    top_left, bottom_left = sorted(left_pair, key=lambda p: p[1])
    top_right, bottom_right = sorted(right_pair, key=lambda p: p[1])
    return [top_left, top_right, bottom_right, bottom_left]

In [14]:
def get_convex_hulls(mask, binary_threshold, contour_area_threshold, vertical_threshold):
    """Extract a simplified convex hull for every large component of a mask.

    Args:
        mask: array of mask values, thresholded with ``binary_threshold``.
        binary_threshold: values above it are foreground.
        contour_area_threshold: a component is kept only when its contour area
            exceeds this fraction of ``mask.size``.
        vertical_threshold: maximum number of hull vertices allowed.

    Returns:
        List of ``(n, 2)`` point arrays with ``4 <= n <= vertical_threshold``.
    """
    hulls = []

    # Binarize, then remove small specks with a 5x5 morphological opening.
    binary = (mask > binary_threshold).astype(np.uint8)
    kernel = np.ones(shape=(5, 5), dtype=np.uint8)
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)

    # Label the connected components; label 0 is the background.
    num_label, label = cv2.connectedComponents(binary)
    min_area = contour_area_threshold * mask.size

    for component in range(1, num_label):
        component_mask = (label == component).astype(np.uint8)
        # [-2] picks the contour list whether findContours returns 2 or 3 values.
        contours = cv2.findContours(component_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
        contour = contours[0]

        # Keep only components whose contour AREA is large enough.
        if not (cv2.contourArea(contour) > min_area):
            continue

        # arcLength is the contour perimeter; it scales the simplification tolerance.
        perimeter = cv2.arcLength(contour, closed=True)

        # Simplify the contour first so the hull has fewer vertices.
        simplified = cv2.approxPolyDP(contour, 0.004 * perimeter, closed=True)
        hull = cv2.convexHull(simplified)

        # Re-approximate with a growing tolerance (at most 5 rounds) until the
        # hull has no more than vertical_threshold vertices.
        for attempt in range(5):
            if hull.shape[0] <= vertical_threshold:
                break
            tolerance = 0.002 * (1 + attempt) * perimeter
            hull = cv2.approxPolyDP(hull, tolerance, closed=True)

        # Keep hulls whose vertex count is within the agreed range.
        # vertical_threshold should be tuned on a large number of images.
        if 4 <= hull.shape[0] <= vertical_threshold:
            # OpenCV returns n x 1 x 2; drop the middle axis -> n x 2.
            hulls.append(np.squeeze(np.array(hull), axis=1))

    return hulls

In [16]:
import numpy as np
# Load a sample mask image, convert it to grayscale divided by 255, and extract its convex hulls.
image = cv2.imread('test_images/input/mask.png')
mask = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) / 255.
convex_hulls = get_convex_hulls(mask, binary_threshold, contour_area_threshold, vertical_threshold)

In [19]:
# Print each convex hull, draw it on the image as a closed polyline, and show the result.
for convex_hull in convex_hulls:
    print(convex_hull, type(convex_hull))
    cv2.polylines(image, [convex_hull], True, (0, 255, 0), 3)  # closed outline, thickness 3
    cv2.imshow('convex', image)
    cv2.waitKey()  # wait for a key press before closing the window
    cv2.destroyAllWindows()

[[308  74]
 [562  79]
 [681 606]
 [678 675]
 [338 704]
 [173 698]
 [146 636]
 [ 85 195]
 [ 94 167]] <class 'numpy.ndarray'>


In [24]:
from shapely import geometry
import itertools

def get_enclosed_quadrangles(mask, binary_threshold, contour_area_threshold, vertical_threshold, iou_threshold):
    """Find, for each convex hull of the mask, the best-fitting quadrangle.

    For every choice of 4 hull edges (in vertex order) the consecutive edges
    are extended and intersected to give 4 corners. Among the valid
    candidates, the one with the highest IoU against the hull is kept,
    provided that IoU exceeds ``iou_threshold``.

    Returns:
        List of quadrangles, each ``[tl, tr, br, bl]`` as ordered by
        ``order_points``. Hulls without an acceptable candidate are skipped.
    """
    # Corners may fall outside the mask, but only within one mask size on each side.
    height, width = mask.shape[0], mask.shape[1]
    boundary = geometry.box(-width, -height, 2 * width, 2 * height)

    convex_hulls = get_convex_hulls(mask, binary_threshold, contour_area_threshold, vertical_threshold)

    quadrangles = []
    for polygon in convex_hulls:
        num_verticals = len(polygon)  # hulls always have at least 4 vertices
        best_quadrangle = None
        best_iou = 0

        # Every combination of 4 hull edges, identified by their start vertex.
        for edge_starts in itertools.combinations(range(num_verticals), 4):
            lines = [[polygon[start], polygon[(start + 1) % num_verticals]] for start in edge_starts]

            points = []
            for i in range(4):
                point = intersection_point(lines[i], lines[(i + 1) % 4])
                # Stop when the edges are parallel, the corner repeats, or it
                # lies outside the allowed boundary.
                if (not point) or (point in points) or (not boundary.contains(geometry.Point(point))):
                    break
                points.append(point)

            if len(points) != 4:
                continue

            # Order the 4 corners, then make sure they form a valid polygon.
            candidate = order_points(points)
            if not geometry.Polygon(candidate).is_valid:
                continue

            iou = compute_iou(candidate, polygon)
            if iou > best_iou and iou > iou_threshold:
                best_quadrangle = candidate
                best_iou = iou

        if best_quadrangle:
            quadrangles.append(best_quadrangle)

    return quadrangles

In [25]:
# Reload the sample mask, find the enclosing quadrangles, then print, draw and show each one.
image = cv2.imread('test_images/input/mask.png')
mask = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) / 255
quadrangles = get_enclosed_quadrangles(mask, binary_threshold, contour_area_threshold, vertical_threshold, iou_threshold)
for quad in quadrangles:
    print(quad, type(quad))
    cv2.polylines(image, [np.array(quad)], True, (0, 255, 0), 2)  # closed outline, thickness 2
    cv2.imshow('quad', image)
    cv2.waitKey()  # wait for a key press before closing the window
    cv2.destroyAllWindows()    

[[67, 69], [561, 79], [696, 673], [157, 719]] <class 'list'>


In [26]:
def get_warped_images(image, mask_size, quadrangles):
    """Cut each quadrangle out of ``image`` and straighten it into a rectangle.

    Args:
        image: source image; its first two shape entries are height and width.
        mask_size: (height, width) of the mask the quadrangles were found in.
        quadrangles: list of ``[tl, tr, br, bl]`` corner lists in mask coordinates.

    Returns:
        (warped_images, warped_locations): the rectified crops and a float32
        array with the quadrangle corners rescaled to ``image`` coordinates.
    """
    height_ratio = image.shape[0] / mask_size[0]
    width_ratio = image.shape[1] / mask_size[1]

    # Rescale the corners from mask coordinates to original-image coordinates.
    warped_locations = np.float32([
        [[corner[0] * width_ratio, corner[1] * height_ratio] for corner in quad]
        for quad in quadrangles
    ])

    warped_images = []
    for corners in warped_locations:
        tl, tr, br, bl = corners

        # Output size: mean length of the two opposite sides in each direction.
        out_width = round((distance(br, bl) + distance(tr, tl)) / 2)
        out_height = round((distance(tr, br) + distance(tl, bl)) / 2)

        target = np.float32([
            [0, 0],
            [out_width - 1, 0],
            [out_width - 1, out_height - 1],
            [0, out_height - 1],
        ])

        # Map the 4 corners onto the corners of an axis-aligned rectangle.
        matrix = cv2.getPerspectiveTransform(corners, target)
        warped_images.append(cv2.warpPerspective(image, matrix, (int(out_width), int(out_height))))

    return warped_images, warped_locations

In [27]:
# Warp the quadrangles found earlier out of the sample mask image itself.
image = cv2.imread('test_images/input/mask.png')
mask = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)  # only mask.shape is used on the next line
warped_images, warped_locations = get_warped_images(image, mask.shape, quadrangles)

In [28]:
# Show each rectified crop in turn.
for warped_image in warped_images:
    cv2.imshow('warped image', warped_image)
    cv2.waitKey()  # wait for a key press before closing the window
    cv2.destroyAllWindows()

In [23]:
def get_warped_scores(mask, quadrangles):
    """Score every quadrangle against the predicted mask.

    The score is the product of two terms:

    * prediction confidence: the sum of mask values that round to 1, divided
      by the sum of all non-zero mask values. It depends only on the mask, so
      it is identical for every quadrangle;
    * postprocessing confidence: IoU between the filled quadrangle and the
      rounded (binary) mask.

    The soft ``mask`` is never overwritten. Rebinding it to its rounded
    version inside the loop would make the prediction confidence exactly 1.0
    for every quadrangle after the first.

    Args:
        mask: numpy array of mask values.
        quadrangles: sequence of 4-corner point lists in mask coordinates.

    Returns:
        List of float scores, one per quadrangle, in input order.
    """
    scores = []
    if len(quadrangles) == 0:
        return scores

    binary_mask = mask.round()

    # Prediction confidence, computed once from the original soft mask.
    prediction_score = mask[binary_mask.nonzero()].sum() / mask[mask.nonzero()].sum()
    prediction_score = prediction_score.item()

    for quadrangle in quadrangles:
        # Rasterize the quadrangle as a 0/1 image with the same shape as the mask.
        card = np.zeros_like(binary_mask, dtype=np.uint8)
        card = cv2.fillPoly(card, np.int32([quadrangle]), (255, 255, 255)) / 255

        # Postprocessing confidence: IoU of the quadrangle with the binary mask.
        inter = card * binary_mask
        union = (card + binary_mask) != 0
        postprocess_score = inter.sum(dtype=np.float32) / union.sum(dtype=np.float32)
        postprocess_score = postprocess_score.item()

        scores.append(prediction_score * postprocess_score)

    return scores

In [24]:
def card_warper(image, mask):
    """Locate, rectify and score the cards described by one mask.

    Uses the module-level thresholds to find quadrangles in ``mask``, warps
    them out of ``image`` and scores them.

    Returns:
        (warped_images, warped_locations, warped_scores), where the locations
        are converted to nested Python lists.
    """
    quads = get_enclosed_quadrangles(
        mask,
        binary_threshold,
        contour_area_threshold,
        vertical_threshold,
        iou_threshold,
    )
    cards, locations = get_warped_images(image, mask.shape[:2], quads)
    card_scores = get_warped_scores(mask, quads)
    return cards, locations.tolist(), card_scores

In [25]:
nms_iou_threshold = 0.4 # IoU threshold passed to NMS to drop overlapping boxes
card_area_threshold = 0.1 # keep cards whose area exceeds this fraction of the largest card's area
pred_score_threshold = 0.7 # keep detections whose score is above this value

In [26]:
def postprocess(preds, image):
    """Turn raw model predictions into rectified card crops.

    Only ``preds[0]`` is used. Detections are filtered by score, de-duplicated
    with NMS, and each surviving mask is handed to ``card_warper``. Cards that
    are small compared with the largest one found are then discarded.

    Args:
        preds: model output; ``preds[0]`` must provide 'boxes', 'scores' and 'masks'.
        image: the original image the cards are cut from.

    Returns:
        (image, warped_cards, warped_scores, warped_locations)
    """
    pred = preds[0]
    boxes, scores, masks = pred['boxes'], pred['scores'], pred['masks']

    # Drop detections whose score is not above the threshold.
    confident = scores > pred_score_threshold
    boxes, scores, masks = boxes[confident], scores[confident], masks[confident]

    # Drop overlapping boxes, then move the surviving masks to numpy.
    kept = torchvision.ops.nms(boxes, scores, nms_iou_threshold)
    masks = masks[kept].squeeze(1).detach().cpu().numpy()  # detach: no gradient tracking

    all_cards, all_scores, all_locations = [], [], []
    for mask in masks:
        cards, locations, card_scores = card_warper(image, mask)
        all_cards.extend(cards)
        all_scores.extend(card_scores)
        all_locations.extend(locations)

    # Area of every located card, computed once and reused for the filter.
    areas = [geometry.Polygon(location).area for location in all_locations]
    max_card_area = max(areas) if len(all_locations) else 0
    min_area = card_area_threshold * max_card_area

    # Keep only the cards that are large enough.
    warped_cards, warped_scores, warped_locations = [], [], []
    for card, score, location, area in zip(all_cards, all_scores, all_locations, areas):
        if area > min_area:
            warped_cards.append(card)
            warped_scores.append(score)
            warped_locations.append(location)

    return image, warped_cards, warped_scores, warped_locations

# Stage

## 2.1 preprocess

In [27]:
def preprocess(image):
    """Validate the input image and pass it through as a 1-tuple.

    The checks are plain ``assert`` statements, so they are skipped when
    Python runs with optimizations enabled.

    Args:
        image: expected to be a 3D ndarray whose last axis has 3 entries.

    Returns:
        ``(image,)``
    """
    assert type(image).__name__ == 'ndarray', 'image must be ndarray.'
    assert len(image.shape) == 3, 'image must be a 3D ndarray.'
    assert image.shape[-1] == 3, 'image must have 3 channels.'
    return (image,)

## 2.2 process (Processor)

In [30]:
import torchvision
from shapely import geometry
import numpy as np
import cv2
import itertools

# End-to-end run on the sample ID-card image: preprocess -> process -> postprocess.
# NOTE(review): this unpacking needs the two-value `preprocess(image)` from the Processor
# section (the recorded output below shows its prints). The Stage `preprocess` defined just
# above returns a 1-tuple, so on a top-to-bottom run this line would fail to unpack.
# Confirm which definition is intended and rename one of them.
image = cv2.imread('test_images/input/cmnd.png')
image, samples = preprocess(image)
preds, image = process(samples, image)
image, warped_cards, warped_scores, warped_locations = postprocess(preds, image)

Image shape after resize (768, 768, 3)
Image shape after unsqueeze and permute torch.Size([1, 3, 768, 768])


	nonzero(Tensor input, *, Tensor out)
Consider using one of the following signatures instead:
	nonzero(Tensor input, *, bool as_tuple)


In [34]:
# Show the first warped card, then save it to disk.
# NOTE(review): warped_cards is empty when postprocess finds no card, in which case
# indexing [0] raises IndexError.
warped_cards[0].shape  # not the last expression of the cell, so this value is not displayed
cv2.imshow('warped_card', warped_cards[0])
cv2.waitKey()  # wait for a key press before closing the window
cv2.destroyAllWindows()
cv2.imwrite('warped_card.jpg', warped_cards[0])

True

## 2.3 postprocess