In [None]:
import torch
import os
# Absolute path of the current working directory (abspath of the empty string);
# the input and output paths used by this notebook are built from it.
CURR_DIR = os.path.abspath("")
# Restrict CUDA to the first GPU.
# NOTE(review): this is set after `import torch`; it should only take effect if
# CUDA has not been initialised yet at this point - confirm.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
# Sanity check: prints whether torch can see a CUDA device.
print(torch.cuda.is_available())

In [None]:
from PIL import Image
import numpy as np
from copy import deepcopy
from pathlib import Path
import yaml
import json
import cv2
import argparse
import pandas as pd

from cosypose.datasets.datasets_cfg import make_scene_dataset, make_object_dataset

# Pose estimator
from cosypose.lib3d.rigid_mesh_database import MeshDataBase
from cosypose.training.pose_models_cfg import create_model_refiner, create_model_coarse
from cosypose.training.pose_models_cfg import check_update_config as check_update_config_pose
from cosypose.rendering.bullet_batch_renderer import BulletBatchRenderer
from cosypose.rendering.bullet_batch_renderer import BulletSceneRenderer
from cosypose.integrated.pose_predictor import CoarseRefinePosePredictor
from cosypose.integrated.multiview_predictor import MultiviewScenePredictor
from cosypose.datasets.wrappers.multiview_wrapper import MultiViewWrapper
import cosypose.utils.tensor_collection as tc

# Detection
from cosypose.training.detector_models_cfg import create_model_detector
from cosypose.training.detector_models_cfg import check_update_config as check_update_config_detector
from cosypose.integrated.detector import Detector

from cosypose.evaluation.pred_runner.bop_predictions import BopPredictionRunner

from cosypose.utils.distributed import get_tmp_dir, get_rank
from cosypose.utils.distributed import init_distributed_mode

# Visualization
from  cosypose.visualization.plotter import Plotter
from cosypose.visualization.singleview import render_prediction_wrt_camera
from bokeh.io import export_png
from bokeh.plotting import gridplot
from bokeh.io import output_notebook, show
from bokeh.resources import INLINE
output_notebook(INLINE)

from cosypose.config import EXP_DIR, RESULTS_DIR

# Request deterministic cuDNN algorithms and turn off the cuDNN autotuner
# (benchmark mode) to make repeated runs more reproducible.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

def load_detector(run_id):
    """Build the detector stored under ``EXP_DIR / run_id``.

    Reads ``config.yaml`` and ``checkpoint.pth.tar`` from the run directory,
    creates the detector network, loads the ``state_dict`` entry of the
    checkpoint into it, moves it to the GPU in eval mode and wraps it in
    ``Detector``.

    Args:
        run_id: Name of the run directory inside ``EXP_DIR``.

    Returns:
        The ``Detector`` wrapping the loaded network.
    """
    detector_dir = EXP_DIR / run_id
    config_text = (detector_dir / 'config.yaml').read_text()
    cfg = check_update_config_detector(yaml.load(config_text, Loader=yaml.FullLoader))

    # The network is created with one output class per entry of the label mapping.
    n_categories = len(cfg.label_to_category_id)
    net = create_model_detector(cfg, n_categories)

    # Only the 'state_dict' entry of the checkpoint is used.
    weights = torch.load(detector_dir / 'checkpoint.pth.tar')['state_dict']
    net.load_state_dict(weights)
    net = net.cuda().eval()

    # The config is attached under both attribute names.
    net.cfg = cfg
    net.config = cfg
    return Detector(net)

def load_pose_models(coarse_run_id, refiner_run_id=None, n_workers=8):
    """Load the coarse (and optionally refiner) pose networks.

    The object dataset, mesh database and batch renderer are created from the
    *coarse* run's configuration and shared by both networks.

    Args:
        coarse_run_id: Run directory name (inside ``EXP_DIR``) of the coarse model.
        refiner_run_id: Run directory name of the refiner model, or ``None``
            to build the predictor without a refiner.
        n_workers: Forwarded to ``BulletBatchRenderer``.

    Returns:
        ``(predictor, mesh_db)`` where ``predictor`` is a
        ``CoarseRefinePosePredictor`` and ``mesh_db`` is the (non-batched)
        ``MeshDataBase`` built from the object dataset.
    """
    coarse_dir = EXP_DIR / coarse_run_id
    coarse_cfg = check_update_config_pose(
        yaml.load((coarse_dir / 'config.yaml').read_text(), Loader=yaml.FullLoader)
    )

    # Object dataset named in the coarse config.
    object_ds = make_object_dataset(coarse_cfg.object_ds_name)
    print("Loaded objects from ", object_ds.ds_dir)

    # Mesh database built from the object dataset.
    mesh_db = MeshDataBase.from_object_ds(object_ds)
    # Batch renderer over the URDF set named in the coarse config.
    renderer = BulletBatchRenderer(object_set=coarse_cfg.urdf_ds_name, n_workers=n_workers)
    # Batched version of the mesh database, moved to the GPU.
    gpu_meshes = mesh_db.batched().cuda()

    def build_network(run_id):
        """Create one pose network from its run directory; ``None`` passes through."""
        if run_id is None:
            return None
        model_dir = EXP_DIR / run_id
        model_cfg = check_update_config_pose(
            yaml.load((model_dir / 'config.yaml').read_text(), Loader=yaml.FullLoader)
        )
        # The config flag decides which of the two factories is used.
        factory = create_model_refiner if model_cfg.train_refiner else create_model_coarse
        net = factory(model_cfg, renderer=renderer, mesh_db=gpu_meshes)

        # Load the weights stored under the checkpoint's 'state_dict' entry.
        state = torch.load(model_dir / 'checkpoint.pth.tar')['state_dict']
        net.load_state_dict(state)
        net = net.cuda().eval()
        net.cfg = model_cfg
        net.config = model_cfg
        return net

    coarse_net = build_network(coarse_run_id)
    refiner_net = build_network(refiner_run_id)
    predictor = CoarseRefinePosePredictor(coarse_model=coarse_net,
                                          refiner_model=refiner_net)
    return predictor, mesh_db

def getModel(detector_run_id='detector-bop-ycbv-pbr--970850',
             coarse_run_id='coarse-bop-ycbv-pbr--724183',
             refiner_run_id='refiner-bop-ycbv-pbr--604090',
             n_workers=4):
    """Load the detector and the coarse+refiner pose predictor.

    Called without arguments this loads the same three runs, with the same
    number of renderer workers, as before; the parameters only make it
    possible to select other runs without editing the function.

    Args:
        detector_run_id: Run directory name passed to ``load_detector``.
        coarse_run_id: Coarse run directory name passed to ``load_pose_models``.
        refiner_run_id: Refiner run directory name passed to ``load_pose_models``.
        n_workers: Number of renderer workers passed to ``load_pose_models``.

    Returns:
        ``(detector, pose_predictor)``.
    """
    detector = load_detector(detector_run_id)
    # load_pose_models also returns the mesh database, which is not needed here.
    pose_predictor, _ = load_pose_models(coarse_run_id=coarse_run_id,
                                         refiner_run_id=refiner_run_id,
                                         n_workers=n_workers)
    return detector, pose_predictor


def inference(detector, pose_predictor, image, camera_k, use_fake_detections=True):
    """Run 2D detection followed by coarse+refiner pose estimation on one image.

    Args:
        detector: Object exposing ``get_detections``; only called when
            ``use_fake_detections`` is False.
        pose_predictor: Object exposing ``get_predictions``.
        image: Channel-last numpy image (a batch axis is prepended and the
            result is permuted to channel-first). Values are divided by 255,
            so an 8-bit value range is assumed - TODO confirm.
        camera_k: Numpy camera matrix for ``image``; a batch axis is prepended.
        use_fake_detections: When True (the default, matching the previous
            behaviour of this function) two hard-coded boxes given in
            1280x720 pixel coordinates are used instead of the detector
            output. Previously the detector was still run and its result
            thrown away; it is now skipped in this mode. Pass False to use
            the real detector.

    Returns:
        ``(final_preds, box_detections)`` with the predictions moved to the
        CPU, or ``(None, None)`` when there are no detections.
    """
    images = torch.from_numpy(image).cuda().float().unsqueeze_(0)
    images = images.permute(0, 3, 1, 2) / 255

    cameras_k = torch.from_numpy(camera_k).cuda().float().unsqueeze_(0)

    if use_fake_detections:
        # Hard-coded boxes in 1280x720 resolution.
        # (The same boxes in 1920x1080 resolution would be
        #  [[1570,248,1697,320],[392,680,603,914]].)
        bboxes = torch.Tensor([[1046, 166, 1132, 213], [260, 453, 402, 610]]).cuda()
        det_infos = pd.DataFrame({'batch_im_id': [0, 0],
                                  'label': ['obj_000030', 'obj_000031'],
                                  'score': [0.9, 0.9]})
        box_detections = tc.PandasTensorCollection(
            infos=det_infos,
            bboxes=bboxes.float())
    else:
        # 2D detector
        print("started detecting object.")
        box_detections = detector.get_detections(images=images, one_instance_per_class=False,
                                                 detection_th=0.8, output_masks=False, mask_th=0.9)

    # Pose estimation
    if len(box_detections) == 0:
        return None, None
    print("started estimating pose.")
    # Only the final predictions are returned; the per-iteration ones are unused.
    final_preds, _ = pose_predictor.get_predictions(images, cameras_k, detections=box_detections,
                                                    n_coarse_iterations=1, n_refiner_iterations=4)

    return final_preds.cpu(), box_detections

def main(tests):
    """Load the models once and run every test description in ``tests``.

    Args:
        tests: Iterable of test descriptions, each passed unchanged to
            ``run_test``.
    """
    # detector       --> the loaded 2D detector
    # pose_predictor --> the (coarse, refiner) pose predictor
    detector, pose_predictor = getModel()
    print("start...........................................")

    for test_case in tests:
        run_test(test_case, detector, pose_predictor)
        
def run_test(test, detector, pose_predictor):
    """Run detection and pose estimation for one test and display the result.

    Args:
        test: Dict with the keys ``'path'`` (image file), ``'camera'``
            (numpy camera matrix) and ``'name'`` (label printed with the plot).
        detector: Detector passed on to ``inference``.
        pose_predictor: Pose predictor passed on to ``inference``.

    Returns:
        ``0`` when ``inference`` produced no prediction, otherwise ``None``
        after the figure grid has been shown.
    """
    # Load the pixels inside a context manager so the file handle is released,
    # and force 3 channels: RGBA / palette / grayscale files would otherwise
    # break the channel-last -> channel-first conversion done in `inference`.
    with Image.open(test['path']) as pil_img:
        img = np.array(pil_img.convert('RGB'))
    if img.shape == (1080, 1920, 3):
        img = cv2.resize(img, dsize=(1280, 720), interpolation=cv2.INTER_CUBIC)
    # NOTE(review): the camera matrix is used as-is even when the image was
    # just resized from 1920x1080 to 1280x720. If test['camera'] was calibrated
    # at the original resolution it would need to be rescaled - confirm.
    camera_k = test['camera']

    # Predict
    pred, detections = inference(detector, pose_predictor, img, camera_k)
    if pred is None:
        return 0
    print("num of pred:", len(pred))
    for i in range(len(pred)):
        info = pred.infos.iloc[i]
        print("object ", i, ":", info.label, "\n  detection score:", info.score)

    # Plot
    renderer = BulletSceneRenderer()
    plotter = Plotter()
    height, width = img.shape[:2]
    cam = dict(
        resolution=(width, height),
        K=camera_k,
        TWC=np.eye(4)
    )
    save_dir = CURR_DIR+'/notebooks/visuals/'
    os.makedirs(save_dir, exist_ok=True)
    pred_rendered = render_prediction_wrt_camera(renderer, pred, cam)

    # Draw every detection box on the image. The previous code indexed boxes
    # 0 and 1 explicitly, which fails with fewer than two detections and
    # ignores any beyond the second.
    for bbox in detections.bboxes:
        x1, y1, x2, y2 = bbox.int().tolist()
        cv2.rectangle(img, (x1, y1), (x2, y2), color=(255, 0, 0), thickness=2)

    # Note: the rectangles are drawn in place, so they are visible in every
    # panel that shows `img`.
    input_fig = plotter.plot_image(img)
    detections_fig = plotter.plot_maskrcnn_bboxes(plotter.plot_image(img), detections)
    rendered_fig = plotter.plot_image(pred_rendered)
    overlay_fig = plotter.plot_overlay(img, pred_rendered, mask_opacity=0.4)
    res = gridplot([input_fig, detections_fig, rendered_fig, overlay_fig], ncols=2)
    print("Test name: "+test['name'])
    show(res)
    #export_png(res, filename = os.path.join(save_dir, test['name']+'.png'))




In [None]:
# Training images settings: camera matrix used with the bundled test image.
camera_0 = np.array([
    [585.75607, 0, 320.5],
    [0, 585.75607, 240.5],
    [0, 0, 1],
])

# Our images: camera matrix read from the 'camera_matrix' entry of the
# calibration file.
with open(f"{CURR_DIR}/inputs/camera_calibration/rs455_1920.json") as file:
    d = json.load(file)
camera_1 = np.array(d.get('camera_matrix'))

# Each test is described by a display name, the camera matrix to use and the
# path of the image file.
camozzi = dict(
    name='camozzi',
    camera=camera_1,
    path=f"{CURR_DIR}/inputs/camozzi.png",
)

test0 = dict(
    name='test',
    camera=camera_0,
    path=f"{CURR_DIR}/inputs/image_test.png",
)

test1 = dict(
    name='ours1',
    camera=camera_1,
    path=f"{CURR_DIR}/inputs/image_rgb.png",
)

test2 = dict(
    name='ours2',
    camera=camera_1,
    path=f"{CURR_DIR}/inputs/image_rgb2.png",
)

test3 = dict(
    name='ours3',
    camera=camera_1,
    path=f"{CURR_DIR}/inputs/image_rgb3.png",
)

test4 = dict(
    name='ours4',
    camera=camera_1,
    path=f"{CURR_DIR}/inputs/image_rgb4.png",
)

test5 = dict(
    name='ours5',
    camera=camera_1,
    path=f"{CURR_DIR}/inputs/image_rgb5.png",
)

# Only the 'camozzi' test is run; the others (test1, test2, test3, test4) can
# be appended to the list.
main([camozzi])