# Mask R-CNN for Pantograph Pose Estimation
---

In [1]:
import os
import sys
import random
import math
import re
import time
import numpy as np
import pandas as pd
import cv2
import matplotlib
import matplotlib.pyplot as plt

# Root directory of the project
ROOT_DIR = os.path.abspath("../")

# Add root to path 
if ROOT_DIR not in sys.path:
    sys.path.append(ROOT_DIR)

# Import Mask RCNN
import mrcnn.model as modellib
from mrcnn import visualize
from mrcnn.model import log
from mrcnn.model import utils

# Import pantogrograph class
from dev import pantograph

# Directory to save logs and trained model
MODEL_DIR = os.path.join(ROOT_DIR, "models")

# Set path to root of images. 
DATA_DIR = os.path.join(ROOT_DIR, "datasets/pantograph")

print("Using Root dir:",ROOT_DIR)
print("Using Model dir:",MODEL_DIR)
print("Using Data dir:",DATA_DIR)

Using TensorFlow backend.


I've been imported
Using Root dir: /home/jupyter/GCP_Test
Using Model dir: /home/jupyter/GCP_Test/models
Using Data dir: /home/jupyter/GCP_Test/datasets/pantograph


In [2]:
# Compute Shortest path pixel distance between actual/pred
def ComputeEuclDistance(test_row,pred_row):
    return round(math.sqrt((test_row[0]-pred_row[0])**2+(test_row[1]-pred_row[1])**2), 2)
#     return round(math.sqrt((test_row['X']-pred_row['X'])**2+(test_row['Y']-pred_row['Y'])**2), 2)

# Compute absolute pixel distance between actual/pred
def ComputeABSDistance(test_row,pred_row):
    return abs(test_row['X']-pred_row['X'])+abs(test_row['Y']-pred_row['Y'])

In [3]:
import json
# Write json to file
def WriteJSON(obj,filename):
    try:
        with open(filename, 'w') as outfile:
#             obj_json = json.dumps(obj, sort_keys=True, indent=4,default=str)
            obj_json = json.dumps(obj, cls=NumpyArrayEncoder)
            outfile.write(obj_json)
    except Exception as e:
        print(e)
        print('File not written.')

# Read and return json object from file. If none, return empty object.
def ReadJSON(filename):
    try: 
        with open(filename) as f:
            obj = json.loads(f.read())
    except Exception as e: 
        obj = [] 
    return obj

In [20]:
'''

'''

def CreatePredTable(obj,image_ids):
    cat_names = {
            0:'FB',
            1:'MB',
            2:'RB'
        }

    pred_results = []
    tmp = [] 
    for image_id in image_ids:
        for i in obj['annotations']:
            if image_id == i['id']:
                kp = np.array(i['keypoints'])
                tmp.append(kp)
#         print(kp.shape)

        kp = np.stack(tmp) 
#     print(kp.shape)

        pred_results = CreateTable(i['image_id'],kp,'pred')

    pred = CreateResultFrame(pred_results)
    return pred

def CreateTable(image_id,keypoints,t_type):
    kp_names = ['L1','L2','L3','R3','R2','R1']

    cat_names = {
        0:'FB',
        1:'MB',
        2:'RB'
    }

    coll = []
    print(keypoints.shape[0])
    for i in range(0,keypoints.shape[0]):
    #     print('i:',i)
        for j in range(0,keypoints.shape[1]):
    #         print('\t j:',j)
            x,y = list(keypoints[i][j])[:2]

            tmp = {
                'image_id':image_id,
                'class':cat_names[i],
                'kp_name':kp_names[j],
                'X':x,
                'Y':y,
                'type':t_type
            }
            coll.append(tmp)
            
    return coll



def CreateResultTable(image_id,keypoints,r):
    kp_names = ['L1','L2','L3','R3','R2','R1']

    cat_names = {
        0:'FB',
        1:'MB',
        2:'RB'
    }

    coll = []
    for i in range(0,keypoints.shape[0]):
    #     print('i:',i)
        for j in range(0,keypoints.shape[1]):
    #         print('\t j:',j)
            x,y = list(keypoints[i][j])[:2]
            px,py = list(r['keypoints'][i][j])[:2]

            pixelDistance = ComputeEuclDistance(list(keypoints[i][j])[:2],list(r['keypoints'][i][j])[:2])

            tmp = {
                'image_id':image_id,
                'class':cat_names[i],
                'kp_name':kp_names[j],
                'Test_X':x,
                'Test_Y':y,
                'Pred_X':px,
                'Pred_Y':py,
                'PD':pixelDistance
            }
            coll.append(tmp)
            
    return coll


def CreateResultFrame(resultSet):
    df = pd.DataFrame(resultSet)
    cols = ['image_id','class','kp_name', 'X', 'Y','type']
    df = df[cols]
    
    return df

In [5]:
class NumpyArrayEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        else:
            return super(NumpyArrayEncoder, self).default(obj)

## Configurations

Run one of the code blocks below to import and load the configurations to use.

In [6]:
class InferenceConfig(pantograph.PantographConfig):
    GPU_COUNT = 1
    IMAGES_PER_GPU = 1
    KEYPOINT_MASK_POOL_SIZE = 7

inference_config = InferenceConfig()

# Recreate the model in inference mode
model = modellib.MaskRCNN(mode="inference", 
                          config=inference_config,
                          model_dir=MODEL_DIR)


# Set local path to trained weights file
LOG_DIR = os.path.join(MODEL_DIR, "pantograph20200226T1541")
MODEL_PATH = os.path.join(LOG_DIR, "mask_rcnn_pantograph_0020.h5")

# MODEL_PATH = os.path.join(MODEL_DIR, "mask_rcnn_pantograph_0016.h5")

# Load trained weights
model.load_weights(MODEL_PATH, by_name=True)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.

Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
Instructions for updating:
box_ind is deprecated, use box_indices instead

Instructions for updating:
Use `tf.cast` instead.


Instructions for updating:
Use `tf.cast` instead.


## Dataset

In [7]:
# Load dataset
dataset = pantograph.PantographDataset()
dataset.load_pantograph(DATA_DIR, "test")#test

# Must call before using the dataset
dataset.prepare()

print("Image Count: {}".format(len(dataset.image_ids)))
print("Class Count: {}".format(dataset.num_classes))

for i, info in enumerate(dataset.class_info):
    print("{:3}. {:50}".format(i, info['name']))

loading annotations into memory...
Done (t=0.01s)
creating index...
index created!
Skeleton: (5, 2)
Keypoint names: (6,)
Image Count: 3
Class Count: 4
  0. BG                                                
  1. front_bar                                         
  2. middle_bar                                        
  3. rear_bar                                          


In [None]:
'''
Ground Truth Test Image
'''

# Load random image and mask.
# image_id = random.choice(dataset.image_ids)
image_id = 2

# Display image and additional stats
print("image_id ", image_id, dataset.image_reference(image_id))

image, image_meta, class_ids, bbox, masks, keypoints =\
    modellib.load_image_gt_keypoints(dataset, inference_config, 
                           image_id, augment=False,use_mini_mask=True) #inference_config.USE_MINI_MASK

# How to ensure alignment/where to get classnames, colors, etc??
class_names = [dataset.class_names[1:][i-1] for i in class_ids]
 
if masks.shape[0] < 1024:
    masks = utils.expand_mask(bbox, masks, (1024,1024))

visualize.DrawAnnotations(image,class_ids,class_names,bbox=bbox,masks=masks,keypoints=keypoints,skeleton=dataset.skeleton,figsize=[12,12])

### Multi-Image Detection

In [8]:
'''
Run detection on multiple images. Results saved to JSON.
'''

def PredictMultiple(image_ids,RESULTS_FILE = '../datasets/pantograph/test/pred_region_data.json'):
    
    # Container for results
    for image_id in image_ids:

        # Load image 
        image, image_meta, class_ids, bbox, masks, keypoints =\
        modellib.load_image_gt_keypoints(dataset, inference_config, 
                               image_id, augment=False,use_mini_mask=True)

        # Save each result
        results = model.detect_keypoint([image], verbose=0)
        r = results[0] # for one image
        
        # open pred json
        PRED_FILE = DATA_DIR+'/test/prediction_data.json'
        obj = ReadJSON(PRED_FILE)
        
        # Find image_id in images
        ids = [i['id'] for i in obj['images']]
        if image_id in ids:
            print('Updating image information')
            # Find image_id in annotations
            obj['images'][image_id] = {
                'file_name':dataset.image_reference(image_id).split("/")[-1],
                'height':image.shape[1],
                'id':image_id,
                'num_annotations':len(r['class_ids'].tolist()),
                'path':dataset.image_reference(image_id),
                'width':image.shape[0]
            }
            WriteJSON(obj,PRED_FILE)
            
            for anno in obj['annotations']:
                PRED_FILE = DATA_DIR+'/test/prediction_data.json'
                obj = ReadJSON(PRED_FILE)
                for cat_id in r['class_ids']:
                    if image_id == anno['image_id'] and cat_id == anno['id']:
                        obj['annotations'][cat_id] = {
                            'area':5463.6864,
                            'category_id':cat_id,
                            'id':cat_id,
                            'image_id':image_id,
                            'iscrowd':0,
                            'num_keypoints':6,
                        }
                        WriteJSON(obj,PRED_FILE)
            
        else:
            print('Writing new image information')
            img_info = {
                'file_name':dataset.image_reference(image_id).split("/")[-1],
                'height':image.shape[1],
                'id':image_id,
                'num_annotations':len(r['class_ids'].tolist()),
                'path':dataset.image_reference(image_id),
                'width':image.shape[0],
            }
            obj['images'].append(img_info)
            WriteJSON(obj,PRED_FILE)
            
            for i in range(len(r['class_ids'].tolist())):
#             for cat_id in r['class_ids']:
                PRED_FILE = DATA_DIR+'/test/prediction_data.json'
                obj = ReadJSON(PRED_FILE)
                anno_info = {
                    'area':5463.6864,
                    'category_id':r['class_ids'].tolist()[i],
                    'id':len(obj['annotations']),
                    'image_id':image_id,
                    'iscrowd':0,
                    'num_keypoints':6,
                    'bbox':r['rois'][i],
                    'segmentation':r['masks'][i],
                    'keypoints':r['keypoints'][i]
                    
                }
                obj['annotations'].append(anno_info)
                WriteJSON(obj,PRED_FILE)

In [9]:
'''
Call to run detection on multiple images
'''

image_ids = dataset.image_ids.tolist() #[2:3]
image_ids
PredictMultiple(image_ids)

Starting with mask shape: (1024, 1024, 3)
(1024, 1024)
(1024, 1024)
(1024, 1024)
Molding Inputs
(1, 1024, 1024, 3)
Starting Detection

Writing new image information
Starting with mask shape: (1024, 1024, 3)
(1024, 1024)
(1024, 1024)
(1024, 1024)
Molding Inputs
(1, 1024, 1024, 3)
Starting Detection
Writing new image information
Starting with mask shape: (1024, 1024, 3)
(1024, 1024)
(1024, 1024)
(1024, 1024)
Molding Inputs
(1, 1024, 1024, 3)
Starting Detection
Writing new image information


In [None]:
image_ids

In [10]:
# Create Test DF
gt_results = []
for image_id in image_ids:
    
    # Load annotations from file
    image, image_meta, class_ids, bbox, masks, keypoints =\
    modellib.load_image_gt_keypoints(dataset, inference_config, 
                           image_id, augment=False,use_mini_mask=True)
    
    # Get single list 
    gt_results += CreateTable(image_id,keypoints,'test')
    
test = CreateResultFrame(gt_results)
test

Starting with mask shape: (1024, 1024, 3)
(1024, 1024)
(1024, 1024)
(1024, 1024)
Starting with mask shape: (1024, 1024, 3)
(1024, 1024)
(1024, 1024)
(1024, 1024)
Starting with mask shape: (1024, 1024, 3)
(1024, 1024)
(1024, 1024)
(1024, 1024)


Unnamed: 0,image_id,class,kp_name,X,Y,type
0,0,FB,L1,205,759,test
1,0,FB,L2,291,708,test
2,0,FB,L3,325,701,test
3,0,FB,R3,747,685,test
4,0,FB,R2,789,686,test
5,0,FB,R1,866,731,test
6,0,MB,L1,0,224,test
7,0,MB,L2,254,738,test
8,0,MB,L3,314,735,test
9,0,MB,R3,763,724,test


In [22]:
# Create pred df
PRED_FILE = DATA_DIR+'/test/prediction_data.json'
obj = ReadJSON(PRED_FILE)

len(obj)

pred = CreatePredTable(obj,image_ids)
pred

1
2
3


Unnamed: 0,image_id,class,kp_name,X,Y,type
0,2,FB,L1,162,795,pred
1,2,FB,L2,216,745,pred
2,2,FB,L3,324,740,pred
3,2,FB,R3,757,729,pred
4,2,FB,R2,811,729,pred
5,2,FB,R1,865,795,pred
6,2,MB,L1,188,755,pred
7,2,MB,L2,283,715,pred
8,2,MB,L3,330,710,pred
9,2,MB,R3,756,687,pred


In [23]:
'''
Merge test & pred. Compute PD for each KP.
'''

data = test.merge(pred,on=['image_id','class','kp_name'])

cols = ['image_id', 'class', 'kp_name', 'X_x', 'Y_x', 'type_x', 'X_y', 'Y_y',
       'type_y']

newCols = ['image_id', 'class', 'kp_name', 'test_X', 'test_Y', 'type_x', 'pred_X', 'pred_Y',
       'type_y']

data = data.rename(columns=dict(zip(cols,newCols)))

cols = ['image_id', 'class', 'kp_name', 'test_X', 'test_Y',  'pred_X', 'pred_Y']
data = data[cols]

data.reset_index(drop=True)

data['PD'] = -1.0
rows = data.index.tolist()
for i in range(len(rows)):
    row = data.iloc[i]
    pd = ComputeEuclDistance([row['test_X'],row['test_Y']],[row['pred_X'],row['pred_Y']])
    data.at[i,'PD'] = pd
    
data.head()

Unnamed: 0,image_id,class,kp_name,test_X,test_Y,pred_X,pred_Y,PD
0,2,FB,L1,238,580,162,795,228.04
1,2,FB,L2,317,535,216,745,233.03
2,2,FB,L3,349,530,324,740,211.48
3,2,FB,R3,715,514,757,729,219.06
4,2,FB,R2,753,516,811,729,220.76


In [None]:
image_id = random.choice(dataset.image_ids)
info = dataset.image_info[image_id]
dataset.image_reference(0)

### Evaluation

In [None]:
data.head()

In [None]:
data.PD.describe()

In [None]:
len(data[data['PD'] < 20])

In [None]:
import seaborn as sns
# Distribution of pixel distance for all classes
plt.figure(figsize=(8, 6))
sns.distplot(data["PD"],kde=False)

In [None]:
# PCT match by class

data['class'].value_counts()/test['class'].value_counts()

In [None]:
# Distribution of pixel distance by class

plt.figure(figsize=(16, 6))
ax = sns.boxplot(x="class", y="PD", data=data)
ax = sns.swarmplot(x="class", y="PD", data=data, color=".25")

In [None]:
'''
PCT classifications under X pixel away from labeled position
'''

print("PD<= \t Count\t PCGT")
d = []
c = []
p = []
for i in [20,10,5,1]:
    d.append(i)
    c.append(data['PD'].map(lambda x: 1 if x<=i else 0).sum())
    p.append(data['PD'].map(lambda x: 1 if x<=i else 0).sum()/len(data)*100)
    print(i,'\t',data['PD'].map(lambda x: 1 if x<=i else 0).sum(),'\t',":",data['PD'].map(lambda x: 1 if x<=i else 0).sum()/len(data)*100)

In [None]:
data.head()

In [None]:
'''
Draw labeled and predicted corner points
'''
from PIL import Image
def DrawCorners(data,imgPath,imgID):
#     print(path)

    colors = {
        'left_top_head':'red',
        'right_top_head':'blue',
        'left_mid_head':'green',
        'right_mid_head':'yellow',
        'left_bottom_head':'black',
        'right_bottom_head':'purple',   
    }
    
    figsize = [25,25]
    fig = plt.figure(figsize=figsize)
    ax = fig.add_subplot(111)
    img = Image.open(imgPath)
    
    rows = data.index.tolist()
    for i in rows[:]:
        row = data.iloc[i]
        if row['image_id'] == imgID:
#             if row['type'] == 'test':
            circle1 = plt.Circle((row['test_X'], row['test_Y']), 5.0, color='red')#colors[row['class']]
            ax.add_artist(circle1)
#             else:
            circle2 = plt.Rectangle((row['pred_X'], row['pred_Y']),8.0, 8.0, color='blue')
            ax.add_artist(circle2)
        

    ax.imshow(img)

In [None]:
DrawCorners(data,obj['images'][0]['path'],obj['images'][0]['id'])

### Single Image Detection

In [None]:
'''
Ground Truth Test Image
'''

# Load random image and mask.
# image_id = random.choice(dataset.image_ids)

# Display image and additional stats
print("image_id ", image_id, dataset.image_reference(image_id))

image, image_meta, class_ids, bbox, masks, keypoints =\
    modellib.load_image_gt_keypoints(dataset, inference_config, 
                           image_id, augment=False,use_mini_mask=True) #inference_config.USE_MINI_MASK

# How to ensure alignment/where to get classnames, colors, etc??
class_names = [dataset.class_names[1:][i-1] for i in class_ids]
 
if masks.shape[0] < 1024:
    masks = utils.expand_mask(bbox, masks, (1024,1024))

visualize.DrawAnnotations(image,class_ids,class_names,bbox=bbox,masks=masks,keypoints=keypoints,skeleton=dataset.skeleton,figsize=[12,12])

In [None]:
'''
Run Detection
'''

# Using image_id from above

# Reload to be sure no changes
info = dataset.image_info[image_id]
print("image ID: {}.{} ({}) {}".format(info["source"], info["id"], image_id, 
                                       dataset.image_reference(image_id)))

# Run detection and get results
results = model.detect_keypoint([image], verbose=1)
r = results[0] # for one image

# Get class names
class_names = [dataset.class_names[1:][i-1] for i in class_ids]

# Expand masks if needed
if masks.shape[0] < 1024:
    masks = utils.expand_mask(r['rois'], r['masks'], (1024,1024))
else:
    masks = r['masks']
    
# Draw vis
visualize.DrawAnnotations(image, 
                          r['class_ids'],
                          class_names=class_names,
                          bbox=r['rois'],
                          masks=masks,
                          keypoints=r['keypoints'],
                          skeleton=dataset.skeleton,
                          scores=r['scores'],
                          figsize=[12,12])

In [None]:
masks[:, :,2]

In [None]:
# Draw precision-recall curve
AP, precisions, recalls, overlaps = utils.compute_ap(bbox, class_ids, masks,
                                          r['rois'], r['class_ids'], r['scores'],r['masks'])
visualize.plot_precision_recall(AP, precisions, recalls)

In [None]:
# Generate RPN trainig targets
# target_rpn_match is 1 for positive anchors, -1 for negative anchors
# and 0 for neutral anchors.
target_rpn_match, target_rpn_bbox = modellib.build_rpn_targets(image.shape, model.anchors, class_ids, bbox, model.config)
log("target_rpn_match", target_rpn_match)
log("target_rpn_bbox", target_rpn_bbox)

positive_anchor_ix = np.where(target_rpn_match[:] == 1)[0]
negative_anchor_ix = np.where(target_rpn_match[:] == -1)[0]
neutral_anchor_ix = np.where(target_rpn_match[:] == 0)[0]
positive_anchors = model.anchors[positive_anchor_ix]
negative_anchors = model.anchors[negative_anchor_ix]
neutral_anchors = model.anchors[neutral_anchor_ix]
log("positive_anchors", positive_anchors)
log("negative_anchors", negative_anchors)
log("neutral anchors", neutral_anchors)

# Apply refinement deltas to positive anchors
refined_anchors = utils.apply_box_deltas(
    positive_anchors,
    target_rpn_bbox[:positive_anchors.shape[0]] * model.config.RPN_BBOX_STD_DEV)
log("refined_anchors", refined_anchors, )


In [None]:
# Display positive anchors before refinement (dotted) and
# after refinement (solid).
visualize.draw_boxes(image, boxes=positive_anchors, refined_boxes=refined_anchors, ax=get_ax())

In [None]:
# Run RPN sub-graph
pillar = model.keras_model.get_layer("ROI").output  # node to start searching from

# TF 1.4 introduces a new version of NMS. Search for both names to support TF 1.3 and 1.4
nms_node = model.ancestor(pillar, "ROI/rpn_non_max_suppression:0")
if nms_node is None:
    nms_node = model.ancestor(pillar, "ROI/rpn_non_max_suppression/NonMaxSuppressionV2:0")

rpn = model.run_graph([image], [
    ("rpn_class", model.keras_model.get_layer("rpn_class").output),
    ("pre_nms_anchors", model.ancestor(pillar, "ROI/pre_nms_anchors:0")),
    ("refined_anchors", model.ancestor(pillar, "ROI/refined_anchors:0")),
    ("refined_anchors_clipped", model.ancestor(pillar, "ROI/refined_anchors_clipped:0")),
    ("post_nms_anchor_ix", nms_node),
    ("proposals", model.keras_model.get_layer("ROI").output),
])

In [None]:
# Show top anchors by score (before refinement)
limit = 100
sorted_anchor_ids = np.argsort(rpn['rpn_class'][:,:,1].flatten())[::-1]
visualize.draw_boxes(image, boxes=model.anchors[sorted_anchor_ids[:limit]], ax=get_ax())

In [None]:
# Get input and output to classifier and mask heads.
mrcnn = model.run_graph([image], [
    ("proposals", model.keras_model.get_layer("ROI").output),
    ("probs", model.keras_model.get_layer("mrcnn_class").output),
    ("deltas", model.keras_model.get_layer("mrcnn_bbox").output),
    ("masks", model.keras_model.get_layer("mrcnn_mask").output),
    ("detections", model.keras_model.get_layer("mrcnn_detection").output),
])

In [None]:
# Get detection class IDs. Trim zero padding.
det_class_ids = mrcnn['detections'][0, :, 4].astype(np.int32)
det_count = np.where(det_class_ids == 0)[0][0]
det_class_ids = det_class_ids[:det_count]
detections = mrcnn['detections'][0, :det_count]

print("{} detections: {}".format(
    det_count, np.array(dataset.class_names)[det_class_ids]))

captions = ["{} {:.3f}".format(dataset.class_names[int(c)], s) if c > 0 else ""
            for c, s in zip(detections[:, 4], detections[:, 5])]
visualize.draw_boxes(
    image, 
    refined_boxes=detections[:, :4],
    visibilities=[2] * len(detections),
    captions=captions, title="Detections",
    ax=get_ax())

In [None]:
display_images(np.transpose(gt_mask, [2, 0, 1]), cmap="Blues")

In [None]:
# Get predictions of mask head
mrcnn = model.run_graph([image], [
    ("detections", model.keras_model.get_layer("mrcnn_detection").output),
    ("masks", model.keras_model.get_layer("mrcnn_mask").output),
])

# Get detection class IDs. Trim zero padding.
det_class_ids = mrcnn['detections'][0, :, 4].astype(np.int32)
det_count = np.where(det_class_ids == 0)[0][0]
det_class_ids = det_class_ids[:det_count]

print("{} detections: {}".format(
    det_count, np.array(dataset.class_names)[det_class_ids]))

In [None]:
display_images(det_masks[:4] * 255, cmap="Blues", interpolation="none")

In [None]:
# Get activations of a few sample layers
activations = model.run_graph([image], [
    ("input_image",        model.keras_model.get_layer("input_image").output),
    ("res4w_out",          model.keras_model.get_layer("res4w_out").output),  # for resnet100
    ("rpn_bbox",           model.keras_model.get_layer("rpn_bbox").output),
    ("roi",                model.keras_model.get_layer("ROI").output),
])

In [None]:
# Input image (normalized)
_ = plt.imshow(modellib.unmold_image(activations["input_image"][0],config))