In [8]:
import os
import json
import glob
import shutil
from math import dist
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.font_manager import FontProperties
import cv2
try:
    from mmdet.apis import inference_detector, init_detector
    has_mmdet = True
except (ImportError, ModuleNotFoundError):
    has_mmdet = False

In [11]:
class occlusion():
    """Occlusion experiment: mask frames, run the key-point model, score errors.

    The three public steps are meant to be run in order:

    1. ``generate_occluded_img`` writes copies of selected frames in which a
       black rectangle is slid over the detected bounding box.
    2. ``pred_and_save`` runs the pose estimator on every occluded copy and
       stores the predicted key points as ``pred.csv``.
    3. ``convert_to_error_csv`` converts predictions into scaled distances to
       the ground truth and stores them as ``error.csv``.

    All folders are plain attributes so they can be overridden after
    construction.
    """

    def __init__(self):
        # Root folders, relative to the notebook's working directory.
        self.occluded_pth = "../occluded_imgs"
        self.occluded_result = "../occluded_result"
        self.kpt_folder = "../keypoint"
        self.cfg_folder = "../mmpose/configs/body_2d_keypoint/rtmpose/coco"
        self.gt_folder = "../ground_truth"
        # Sub-folder names iterated by every step.
        self.ear_types = ["free","attached"]
        self.degrees = ['15cm_0mm_0deg', '15cm_25mm_5deg', '15cm_50mm_10deg',
                        '20cm_0mm_0deg', '20cm_25mm_5deg', '20cm_50mm_10deg']
        # Display names of the 21 key points (not used by the methods below).
        self.acupoints_name = ["HX6(肛門)", "HX3(尿道)", "HX1(耳中)", "TF4(神門)", "AH9(腰柢椎)", "AH11(胸椎)", "CO5(十二指腸)", 
                               "CO4(胃)", "CO3(賁門)", "CO14(肺)", "CO15(心)", "CO18(內分泌)", "HX2(輪2)", "AT2(緣中)", 
                               "LO8(扁桃體)", "LO1(牙)" , "LO2(舌)", "LO3(頷)", "LO4(垂前)", "LO5(眼)", "LO6(內耳)"]
        # Per-subject reference length between key points 0 and 11; it is
        # divided by the pixel distance of those points to get a scale.
        # NOTE(review): the attribute name suggests centimetres - confirm.
        self.cm = {"cather":4.72, "daniel":4.74, "dominic":3.9, "jack":3.91, "jakaria":4.20, "jimmy":4.60, "vicky":4.69, "wayne":4.75,
                   "gaby":4.05, "matt":5.0, "toby":4.26, "wendy":4.14, "willy":4.51}

    def read_csv(self, fpath1):
        """Load ``fpath1`` as a DataFrame, using its first column as index."""
        df = pd.read_csv(fpath1, index_col = 0)
        return df

    @staticmethod
    def _parse_point(cell):
        """Return ``(x, y)`` as floats from a ``"x,y[,...]"`` cell string."""
        fields = cell.split(",")
        return float(fields[0]), float(fields[1])

    def _scale_for_frame(self, gt_df, frame, name):
        """Return ``self.cm[name]`` per pixel for row ``frame`` of ``gt_df``.

        The pixel distance is measured between the points stored in
        columns 0 and 11 of the ground-truth row.
        """
        p0 = self._parse_point(gt_df.iloc[frame, 0])
        p11 = self._parse_point(gt_df.iloc[frame, 11])
        return self.cm[name] / dist(p0, p11)

    def generate_occluded_img(self, mask_cm=(2, 3)):
        """Write occluded copies of four sampled frames per recording.

        For every ear type / subject / degree the first frame, the frames at
        one and two thirds of the sequence, and the last frame are used.
        A black rectangle is moved over the frame's bounding box with a
        stride of half the rectangle size, and one image is written per
        position to ``<occluded_pth>/<ear_type>/<name>/<deg>/<frame>/NNN.png``.

        Args:
            mask_cm: ``(width, height)`` of the rectangle expressed in the
                unit of ``self.cm``; converted to pixels with the per-frame
                scale. Defaults to the previously hard-coded ``(2, 3)``.

        Raises:
            FileNotFoundError: if a sampled frame image cannot be read.
        """
        for ear_type in self.ear_types:
            bbox_root = os.path.join(self.kpt_folder, ear_type, "5_mmdet_bboxes_json")
            for name in os.listdir(bbox_root):
                for deg in self.degrees:
                    bboxes_json = os.path.join(bbox_root, name, deg, "bbox.json")
                    with open(bboxes_json,"r") as bboxes_file:
                        bboxes = json.load(bboxes_file)
                    if not bboxes:
                        # Nothing was detected for this recording, so there
                        # is no frame to sample.
                        continue

                    gt_df = self.read_csv(os.path.join(self.gt_folder, ear_type, name, deg, "gt.csv"))

                    interval = len(bboxes) / 3
                    # dict.fromkeys drops repeated frame numbers (short
                    # sequences) while keeping the sampling order.
                    indexes = list(dict.fromkeys(
                        [0, int(interval), int(interval*2), len(bboxes)-1]))

                    for index in indexes:
                        frame_path = os.path.join(self.kpt_folder, ear_type, "4_return_to_ori_size", name, deg,
                                                  "{frame}.png".format(frame = index))
                        img = cv2.imread(frame_path)
                        if img is None:
                            # cv2.imread signals failure by returning None.
                            raise FileNotFoundError("cannot read frame image: " + frame_path)

                        # JSON may store the box as floats; range() and
                        # slicing below need integers.
                        x, y, w, h = (int(round(v)) for v in bboxes[str(index)])

                        scale = self._scale_for_frame(gt_df, int(index), name)
                        mask_w = round(mask_cm[0] / scale)
                        mask_h = round(mask_cm[1] / scale)
                        # Half-mask stride; never 0, which range() rejects.
                        step_x = max(1, mask_w // 2)
                        step_y = max(1, mask_h // 2)

                        occluded_img_path = os.path.join(self.occluded_pth, ear_type, name, deg, str(index))
                        os.makedirs(occluded_img_path, exist_ok=True)

                        count = 0
                        for i in range(x, x+w, step_x):
                            for j in range(y, y+h, step_y):
                                img_copy = img.copy()
                                img_copy[j:j+mask_h, i:i+mask_w] = (0, 0, 0)
                                cv2.imwrite(os.path.join(occluded_img_path, "%03d.png"%count), img_copy)
                                count += 1

    def pred_and_save(self, kpt_det_model_pth, device="cuda:0"):
        """Predict key points for every occluded image and save them as CSV.

        One pose estimator is built per ear type and subject from the
        ``best*.pth`` checkpoint found under
        ``<kpt_det_model_pth>/<ear_type>/model_save/<name>/``. For each
        sampled frame folder a ``pred.csv`` is written to
        ``<occluded_result>/<ear_type>/<name>/pred/<deg>/<frame>/`` with one
        row per occluded image (in file-name order) and one ``"x,y"`` string
        per key point.

        NOTE(review): ``init_pose_estimator``, ``process_one_image`` and
        ``detector`` are not defined in the visible cells; they must exist
        in the notebook namespace before this method is called.

        Args:
            kpt_det_model_pth: root folder holding the trained checkpoints.
            device: device string forwarded to ``init_pose_estimator``.

        Raises:
            FileNotFoundError: if no checkpoint matches ``best*.pth``.
        """
        for ear_type in self.ear_types:
            rtmpose_cfg = os.path.join(self.cfg_folder,"rtmpose-s_8xb256-420e_coco-256x192_custom_{et}.py".format(et = ear_type))

            for name in os.listdir(os.path.join(self.occluded_pth, ear_type)):
                # The wildcard has to be expanded here; passing the pattern
                # itself on would hand over a path that does not exist.
                ckp_pattern = os.path.join(kpt_det_model_pth, ear_type, "model_save", name,"best*.pth")
                ckp_matches = sorted(glob.glob(ckp_pattern))
                if not ckp_matches:
                    raise FileNotFoundError("no checkpoint matches: " + ckp_pattern)
                # NOTE(review): if several files match, the last one in
                # sorted order is used - confirm this is the intended pick.
                rtmpose_ckp = ckp_matches[-1]

                pose_estimator = init_pose_estimator(
                    rtmpose_cfg,
                    rtmpose_ckp,
                    device=device,
                    cfg_options=dict(
                        model=dict(test_cfg=dict(output_heatmaps=False))))

                for deg in self.degrees:
                    deg_dir = os.path.join(self.occluded_pth, ear_type, name, deg)

                    for index in os.listdir(deg_dir):
                        frame_dir = os.path.join(deg_dir, index)
                        # Length-then-name ordering equals numeric order for
                        # the "%03d.png" names, so row k belongs to image k.
                        imgs = sorted(os.listdir(frame_dir), key=lambda f: (len(f), f))

                        rows = []
                        for img in imgs:
                            image = cv2.imread(os.path.join(frame_dir, img))
                            # NOTE(review): assumed to return a list of
                            # per-instance mappings with a "keypoints" entry,
                            # as required by the indexing below - confirm.
                            pred_instances_list = process_one_image(image, detector, pose_estimator)
                            kpts = pred_instances_list[0]["keypoints"]
                            rows.append({i: str(x) + "," + str(y)
                                         for i, (x, y) in enumerate(kpts)})
                        # Build the frame once instead of concatenating
                        # inside the loop.
                        df = pd.DataFrame(rows)

                        occlusion_pred = os.path.join(self.occluded_result, ear_type, name, "pred", deg, index)
                        os.makedirs(occlusion_pred, exist_ok=True)
                        df.to_csv(os.path.join(occlusion_pred, "pred.csv"))

    def convert_to_error_csv(self):
        """Turn every ``pred.csv`` into an ``error.csv`` of scaled distances.

        For each sampled frame the ground-truth row with the same number is
        compared with every prediction row. Only key points whose third
        ground-truth field equals ``"2"`` are scored (presumably a
        visibility flag - confirm); all other cells stay empty. Each
        distance in pixels is multiplied by the per-frame scale, and the
        result is written to
        ``<occluded_result>/<ear_type>/<name>/error/<deg>/<frame>/error.csv``.
        """
        for ear_type in self.ear_types:
            # The folder attributes already contain their "../" prefix, so
            # they are used as-is, matching the other methods.
            for name in os.listdir(os.path.join(self.occluded_result, ear_type)):
                for deg in self.degrees:
                    gt_df = self.read_csv(os.path.join(self.gt_folder, ear_type, name, deg, "gt.csv"))

                    pred_root = os.path.join(self.occluded_result, ear_type, name, "pred", deg)
                    for index in os.listdir(pred_root):
                        pred_df = self.read_csv(os.path.join(pred_root, index, "pred.csv"))

                        frame = int(index)
                        scale = self._scale_for_frame(gt_df, frame, name)

                        # The ground truth is the same for every prediction
                        # row, so parse it once per frame.
                        gt_points = {}
                        for j in range(0, len(gt_df.columns)):
                            fields = gt_df.iloc[frame, j].split(",")
                            if fields[2] == "2":
                                gt_points[j] = (float(fields[0]), float(fields[1]))

                        # One row per prediction (occluded image), not per
                        # ground-truth frame.
                        errors = np.full(pred_df.shape, np.nan)
                        for i in range(0, len(pred_df)):
                            for j, gt_point in gt_points.items():
                                pred_point = self._parse_point(pred_df.iloc[i, j])
                                errors[i, j] = scale * dist(gt_point, pred_point)
                        error_df = pd.DataFrame(errors, index=pred_df.index, columns=pred_df.columns)

                        error_path = os.path.join(self.occluded_result, ear_type, name, "error", deg, index)
                        os.makedirs(error_path, exist_ok=True)
                        error_df.to_csv(os.path.join(error_path, "error.csv"))

In [3]:
class CDF_Draw():
    """Plot cumulative error distributions for one or two result sets.

    Both constructor arguments are expected to expose an ``ear_types``
    sequence and a ``cal_CDF()`` method returning, per ear type, a mapping
    with ``"sorted_error"`` and ``"cumulative_prob"`` entries (this is what
    the drawing methods read from them).
    """

    def __init__(self, CDF_0, CDF_1):
        self.CDF_0 = CDF_0
        self.CDF_1 = CDF_1

    @staticmethod
    def _decorate(title):
        """Apply the shared axis limits, labels, title and legend."""
        plt.xlim(0, 10)
        plt.ylim(0, 1.1)
        plt.xlabel("mm")
        plt.ylabel("Probability")
        plt.title(title)
        plt.legend()

    def Draw_one_CDF(self, title, label):
        """Show one figure per ear type with the curve of ``CDF_0``.

        Args:
            title: figure title, used for every ear type.
            label: name inserted into the legend entry.
        """
        # cal_CDF() is evaluated once per call instead of twice per curve;
        # this assumes it returns the same data on every call.
        cdf_0 = self.CDF_0.cal_CDF()
        for ear_type in self.CDF_0.ear_types:
            curve = cdf_0[ear_type]
            plt.plot(curve["sorted_error"], curve["cumulative_prob"],
                     label="The result of {label_}".format(label_ = label))
            self._decorate(title)
            plt.show()

    def Draw_two_CDF(self, title, label_0, label_1):
        """Show and save one figure per ear type comparing both result sets.

        Each figure is saved as
        ``../figure/diff_inpainting_way/CDF/CDF_<ear_type>.png`` before it
        is shown.

        Args:
            title: figure title, used for every ear type.
            label_0: legend name for the ``CDF_0`` curve.
            label_1: legend name for the ``CDF_1`` curve.
        """
        # Same single evaluation as in Draw_one_CDF, for both result sets.
        cdf_0 = self.CDF_0.cal_CDF()
        cdf_1 = self.CDF_1.cal_CDF()
        out_dir = os.path.join("..", "figure", "diff_inpainting_way", "CDF")
        for ear_type in self.CDF_0.ear_types:
            curve_0 = cdf_0[ear_type]
            curve_1 = cdf_1[ear_type]
            plt.plot(curve_0["sorted_error"], curve_0["cumulative_prob"],
                     label="The result of {label_}".format(label_ = label_0))
            plt.plot(curve_1["sorted_error"], curve_1["cumulative_prob"],
                     label="The result of {label_}".format(label_ = label_1))
            self._decorate(title)
            os.makedirs(out_dir, exist_ok=True)
            # Save before show(): showing may release the current figure.
            plt.savefig(os.path.join(out_dir, 'CDF_{ear_type}.png'.format(ear_type = ear_type)))
            plt.show()
            

In [12]:
# Build the occlusion experiment with its default folders and write the
# occluded copies of the sampled frames to disk (first pipeline step).
A = occlusion()
A.generate_occluded_img()