This part is the detector stage of the control group.

The basic approach combines HoG features with an SVM to build a binary "insect or background" classifier: it finds every region that could contain an insect and outputs the locations of these regions for further classification.

第一部分是对照组的检测器部分

基础思路是结合HoG和SVM做一个是否是昆虫还是背景的二分类器，先将所有可能是昆虫的区域检出，将区域输出以进行进一步的昆虫类别分类。

In [1]:
import cv2
import numpy as np
from skimage.feature import hog
from sklearn.svm import LinearSVC
import os
import yaml
import pandas as pd
from sklearn.metrics import precision_recall_fscore_support
import json
import random

In [2]:
class HoG_Detector:
    def __init__(self):
        self.hog_parameters ={
            'pixels_per_cell':(8,8),
            'cells_per_block':(2,2),
            'orientations': 11
        }

    def filter_low_confidence_part(self,detections,min_confidence=1.0):
        filtered_detections=[]
        for detection in detections:
            if detection['confidence'] >=min_confidence:
                filtered_detections.append(detection)
        return filtered_detections

    def load_dataset_config(self,data_yaml_path):
        """Read the dataset YAML file and remember its content.

        Args:
            data_yaml_path: Path of the dataset configuration file.

        Returns:
            The parsed configuration, also stored as ``self.dataset_config``.
        """
        with open(data_yaml_path, 'r') as config_file:
            # safe_load builds plain Python objects only.
            self.dataset_config = yaml.safe_load(config_file)
        parsed_config = self.dataset_config
        return parsed_config
    
    def get_positive_sample(self,image_dir,label_dir):
        positive_sample=[]

        for img_name in os.listdir(image_dir):
            if not img_name.lower().endswith('.jpg'):
                continue

            img_path = os.path.join(image_dir,img_name)
            label_path = os.path.join(label_dir,img_name.replace('.jpg','.txt'))

            if not os.path.exists(label_path):
                continue

            image=cv2.imread(img_path)
            if image is None:
                continue
            h,w = image.shape[:2]

            with open(label_path,'r') as l:
                for line in l:
                    details=line.strip().split()
                    if len(details)!=5:
                        continue

                    class_id,x_center,y_center,width,height=map(float,details)
                    
                    x_center_abs= x_center*w # Transform label detail into pixel location 把label文件里的归一化坐标转换为像素坐标
                    y_center_abs= y_center*h
                    width_abs = width*w
                    height_abs = height*h

                    x1=int(x_center_abs-width_abs/2)
                    x2=int(x_center_abs+width_abs/2)
                    y1=int(y_center_abs-height_abs/2)
                    y2=int(y_center_abs+height_abs/2)
                    x1, y1 = max(0, x1), max(0, y1)
                    x2, y2 = min(w, x2), min(h, y2)

                    if x2 > x1 and y2 > y1:
                        insect_patch = image[y1:y2, x1:x2]
                        positive_sample.append(insect_patch)
                        # Cut out positive sample area and add it to the list 剪下昆虫区域并存储
    
        return positive_sample

    def get_negative_sample(self, image_dir,label_dir, samples_per_image=10):
        negative_sample=[]

        for img_name in os.listdir(image_dir):
            if not img_name.lower().endswith('.jpg'):
                continue

            img_path = os.path.join(image_dir,img_name)
            label_path = os.path.join(label_dir,img_name.replace('.jpg','.txt'))

            if not os.path.exists(label_path):
                continue

            image=cv2.imread(img_path)
            if image is None:
                continue
            h,w = image.shape[:2]

            avoid_areas=[]
            if os.path.exists(label_path):
                with open(label_path, 'r') as l:
                    for line in l:
                        parts = line.strip().split()
                        if len(parts) == 5:
                            class_id, x_center, y_center, width, height = map(float, parts)
                            x_center_abs= x_center*w 
                            y_center_abs= y_center*h
                            width_abs = width*w
                            height_abs = height*h

                            x1=int(x_center_abs-width_abs/2)
                            x2=int(x_center_abs+width_abs/2)
                            y1=int(y_center_abs-height_abs/2)
                            y2=int(y_center_abs+height_abs/2)
                            avoid_areas.append((x1, y1, x2, y2)) # Get positive area like former part 想先前一样画出昆虫区域待用
            
            for _ in range(samples_per_image):
                size = np.random.randint(30,min(w,h)//2)
                x1=np.random.randint(0,w-size)
                x2=x1+size
                y1=np.random.randint(0,h-size)
                y2=y1+size

            
                overlap = False
                for gt_x1, gt_y1, gt_x2, gt_y2 in avoid_areas:
                    iou = self.calculate_iou((x1, y1, x2, y2), (gt_x1, gt_y1, gt_x2, gt_y2))
                    if iou > 0.3:  # If overlapping is larger than 0.3, then we drop it 如果重叠度大于30%,认为是正样本区域,丢弃
                        overlap = True
                        break
                
                if not overlap:
                    background_patch = image[y1:y2, x1:x2]
                    negative_sample.append(background_patch)
        
        return negative_sample
    
    def calculate_iou(self,area1,area2):
        x1_1, y1_1, x2_1, y2_1 = area1
        x1_2, y1_2, x2_2, y2_2 = area2

        # AND
        a1 = max(x1_1,x1_2)
        b1 = max(y1_1,y1_2)
        a2 = max(x2_1,x2_2)
        b2 = max(y2_1,y2_2)
        inter_area=max(0,a2-a1)*max(0,b2-b1)

        # OR
        SofArea1= (x2_1 - x1_1) * (y2_1 - y1_1)
        SofArea2 = (x2_2 - x1_2) * (y2_2 - y1_2)
        union_area=SofArea1+SofArea2-inter_area

        if union_area>0:
            return inter_area/union_area
        else:
            return 0
    
    def extract_hog(self,image_patch):
        """Compute the HoG descriptor of one image patch.

        The patch is resized to 64x64, converted to grayscale when it has a
        channel axis, and described with the settings stored in
        ``self.hog_parameters``.

        Args:
            image_patch: Image array; a 3-D array is converted with
                ``cv2.COLOR_BGR2GRAY``, anything else is used as-is.

        Returns:
            The HoG features as a flat vector (``feature_vector=True``).
        """
        patch_64 = cv2.resize(image_patch, (64, 64))

        # Only patches with a channel axis need the BGR -> gray conversion.
        has_channel_axis = len(patch_64.shape) == 3
        gray = cv2.cvtColor(patch_64, cv2.COLOR_BGR2GRAY) if has_channel_axis else patch_64

        settings = self.hog_parameters
        return hog(
            gray,
            orientations=settings['orientations'],
            pixels_per_cell=settings['pixels_per_cell'],
            cells_per_block=settings['cells_per_block'],
            feature_vector=True,
        )
    
    def prepare_train_data(self,data_yaml_path):
        """Build the HoG feature matrix and labels for the binary classifier.

        The ``train`` entry of the dataset YAML is used to locate the
        ``images`` and ``labels`` directories. Insect crops become class 1 and
        random background crops become class 0.

        Args:
            data_yaml_path: Path of the dataset configuration file.

        Returns:
            tuple: ``(X, y)`` as NumPy arrays, positives first, then negatives.
        """
        self.load_dataset_config(data_yaml_path)

        # Strip a trailing '/images' (if any) to get the split's root folder.
        train_root = self.dataset_config['train'].replace('/images', '')
        train_image_dir = train_root + '/images'
        train_label_dir = train_root + '/labels'

        print("Preparing positive part 开始准备昆虫部分")
        positive_patch = self.get_positive_sample(train_image_dir, train_label_dir)
        print(f"Already found {len(positive_patch)} positive samples 找到了{len(positive_patch)}个正向样本")

        print("Preparing negative part 开始准备背景部分")
        negative_patch = self.get_negative_sample(train_image_dir, train_label_dir)
        print(f"Already found {len(negative_patch)} negative samples 找到了{len(negative_patch)}个反向样本")

        print("Extrating HoG features 提取Hog特征")
        positive_features = []
        for patch in positive_patch:
            positive_features.append(self.extract_hog(patch))
        negative_features = []
        for patch in negative_patch:
            negative_features.append(self.extract_hog(patch))

        # 1 = insect, 0 = background.
        labels = [1] * len(positive_features) + [0] * len(negative_features)
        features = positive_features + negative_features

        return np.array(features), np.array(labels)
    
    def binary_classifier_train(self,data_yaml_path):
        """Fit the insect/background linear SVM and report its training accuracy.

        Args:
            data_yaml_path: Path of the dataset configuration file passed on
                to ``prepare_train_data``.

        Returns:
            The classifier's accuracy on the same data it was trained on.
        """
        X, y = self.prepare_train_data(data_yaml_path)

        print(f"Trainning data:{X.shape},label:{y.shape}")
        print(f"Positive samples:{np.sum(y==1)},negative samples={np.sum(y==0)}")

        # The fitted model is kept on the instance as ``self.svm``.
        self.svm = LinearSVC(
            C=1.0,
            class_weight='balanced',
            max_iter=10000,
        )
        self.svm.fit(X, y)

        # Scored on the training set itself, not on held-out data.
        train_accuray = self.svm.score(X, y)
        print(f"Accuracy={train_accuray:.4f}")

        return train_accuray
    
    def sliding_window(self,image,window_size=(64,64),step_size=32,scale=1.0):
        h,w = image.shape[:2]
        window_w,window_h=window_size

        if scale != 1.0:
            new_w, new_h = int(w * scale), int(h * scale)
            resized_image = cv2.resize(image, (new_w, new_h))
        else:
            resized_image = image
            new_w, new_h = w, h

        for y in range(0, new_h - window_h, step_size):
            for x in range(0, new_w - window_w, step_size):
                yield (x, y, resized_image[y:y+window_h, x:x+window_w], scale)
    
    def detect(self, image, confidence_threshold=0.4):
        detections= []
        h,w=image.shape[:2]

        scales=[0.5,0.75,1.0,1.25,1.5,1.75,2.0]

        for s in scales:
            for x,y,window,current_scale in self.sliding_window(image,step_size=16,scale=s):
                if window.size ==0:
                    continue
                try:
                    features = self.extract_hog(window)
                    confidence=self.svm.decision_function([features])[0]

                    if confidence>confidence_threshold:
                        orig_x=int(x/current_scale)
                        orig_y=int(y/current_scale)
                        orig_w=int(64/current_scale)
                        orig_h=int(64/current_scale)

                        # 计算原边界框的中心和尺寸
                        center_x = orig_x + orig_w / 2
                        center_y = orig_y + orig_h / 2
                        new_w = orig_w * 3
                        new_h = orig_h * 3
                        # 计算放大后的边界框
                        new_x1 = int(center_x - new_w / 2)
                        new_y1 = int(center_y - new_h / 2)
                        new_x2 = int(center_x + new_w / 2)
                        new_y2 = int(center_y + new_h / 2)
                        # 确保不超出图像边界
                        new_x1 = max(0, new_x1)
                        new_y1 = max(0, new_y1)
                        new_x2 = min(w, new_x2)  # w是原图的宽度
                        new_y2 = min(h, new_y2)  # h是原图的高度

                        detections.append({   # RESULT 本部分检测结果
                            'bbox': [new_x1, new_y1, new_x2, new_y2],
                            'confidence': float(confidence)
                        })
                        
                except Exception as e:
                    continue
        nms_d=self.non_max_suppression(detections)

        filtered_d=self.filter_low_confidence_part(nms_d,min_confidence=1.3)

        return filtered_d

    
    def non_max_suppression(self, detections, iou_threshold=0.4):
        """非极大值抑制去除重叠框"""
        if len(detections) == 0:
            return []
        detections.sort(key=lambda x: x['confidence'], reverse=True)
        
        keep = []
        while detections:
            best = detections.pop(0)
            keep.append(best)
            detections = [det for det in detections 
                         if self.calculate_iou(best['bbox'], det['bbox']) < iou_threshold]
        
        return keep



Next, we run the detector on the test dataset to see how it performs, and save the detailed results as CSV files.

接下来对Test集进行测试查看模型表现，并把结果保存为csv文件以供后续使用

In [3]:
class HogTesting:
    def __init__(self,detector):
        self.detector = detector
        self.results =[]
    
    def detect_and_export(self,test_image_dir,test_label_dir,output_dir,confidence_threshold=0.5,Quick_detect=False,max_image=75,random_seed=47):
        
        os.makedirs(output_dir,exist_ok=True)
        os.makedirs(os.path.join(output_dir, 'candidate_patches'), exist_ok=True)

        RESULT=[]
        patch_info=[]

        all_image_files = [f for f in os.listdir(test_image_dir) if f.lower().endswith('.jpg')]

        if Quick_detect==True:
            random.seed(random_seed)
            image_files=random.sample(all_image_files,max_image)
        else:
            image_files=all_image_files

        print(f"Start processing {len(image_files)} images 开始处理测试图像")

        for image_index,image_name in enumerate(image_files):
            image_path=os.path.join(test_image_dir,image_name)
            image=cv2.imread(image_path)
            if image is None:
                continue

            Detection=self.detector.detect(image,confidence_threshold)
            for detect_index,Detection in enumerate(Detection):
                x1,y1,x2,y2=Detection['bbox']
                confidence=Detection['confidence']

                patch=image[y1:y2,x1:x2]
                if patch.size==0:
                    continue        
                patch_filename=f"{os.path.splitext(image_name)[0]}_patch_{detect_index}.jpg"
                patch_path=os.path.join(output_dir,'candidate_patches',patch_filename)
                cv2.imwrite(patch_path,patch)

                detection_info = {
                    'original_image': image_name,
                    'patch_filename': patch_filename,
                    'bbox_x1': x1,
                    'bbox_y1': y1,
                    'bbox_x2': x2,
                    'bbox_y2': y2,
                    'confidence': confidence,
                    'patch_path': patch_path
                }

                RESULT.append(detection_info)
                patch_info.append(detection_info)
        
        df = pd.DataFrame(RESULT)
        csv_path = os.path.join(output_dir, 'detection_results.csv')         # Save result as csv  保存检测结果为CSV
        df.to_csv(csv_path, index=False)
        
        json_path = os.path.join(output_dir, 'detection_results.json')        # Save result as json 保存为JSON格式
        with open(json_path, 'w') as f:
            json.dump(RESULT, f, indent=2)
        
        print(f"Detection Finished,total {len(RESULT)} candidate areas 检测完成")
        print(f"Result saved in {csv_path} CSV结果已保存")
        print(f"Images saved in {os.path.join(output_dir, 'candidate_patches')} 图像已保存")
        
        
        return (RESULT,image_files) if Quick_detect==True else (RESULT,None)
    
    def load_ground_truth(self,label_dir,image_dir,image_list=None):
        ground_truth = {}
        label_files=[]
        
        if image_list==None:
            label_files = [f for f in os.listdir(label_dir) if f.endswith('.txt')]
        else:

            for img_name in image_list:
                label_f = img_name.replace('.jpg','.txt')
                if os.path.exists(os.path.join(label_dir, label_f)):
                    label_files.append(label_f)
        
        for label_file in label_files:
            image_file = label_file.replace('.txt', '.jpg')
            image_path = os.path.join(image_dir, image_file)
            
            if not os.path.exists(image_path):
                continue
            image = cv2.imread(image_path)
            if image is None:
                continue
                
            h, w = image.shape[:2]
            
            label_path = os.path.join(label_dir, label_file)
            gt_boxes = []
            
            with open(label_path, 'r') as f:
                for line in f:
                    parts = line.strip().split()
                    if len(parts) != 5:
                        continue
                    
                    class_id, x_center, y_center, width, height = map(float, parts)
                    
                    x_center_abs = x_center * w
                    y_center_abs = y_center * h
                    width_abs = width * w
                    height_abs = height * h
                    
                    x1 = int(x_center_abs - width_abs/2)
                    y1 = int(y_center_abs - height_abs/2)
                    x2 = int(x_center_abs + width_abs/2)
                    y2 = int(y_center_abs + height_abs/2)
                    
                    gt_boxes.append({
                        'class_id': int(class_id),
                        'bbox': [x1, y1, x2, y2]
                    })
            
            ground_truth[image_file] = {
                'image_size': (w, h),
                'boxes': gt_boxes
            }
        
        return ground_truth
    
    def check_preformance(self,detection_results,ground_truth,iou_threshold=0.5):
        ''' This part is aim to evaluate the Hog+SVM preformance. We use the iou=0.5 as a threshold, only bigger than this threshold is a TP
            这部分是detector的性能表现测试，阈值是iou=0.5，和test数据集label中提供的数值做对比，将iou大于这个数值的box视为区域检测正确
        '''

        detections_by_image = {}
        for det in detection_results:
            img_name = det['original_image']
            if img_name not in detections_by_image:
                detections_by_image[img_name] = []
            detections_by_image[img_name].append(det)
        
        # Preformance 统计指标
        total_gt_boxes = 0
        true_positives = 0
        false_positives = 0
        false_negatives = 0
        
        detailed_results = []
        
        for img_name, gt_info in ground_truth.items():
            if img_name not in detections_by_image:
                false_negatives += len(gt_info['boxes'])
                total_gt_boxes += len(gt_info['boxes'])
                continue
            
            gt_boxes = gt_info['boxes']
            detections = detections_by_image[img_name]
            total_gt_boxes += len(gt_boxes)
            gt_matched = [False] * len(gt_boxes)
            det_matched = [False] * len(detections)
            
            for det_idx, detection in enumerate(detections):
                det_bbox = [detection['bbox_x1'], detection['bbox_y1'], 
                           detection['bbox_x2'], detection['bbox_y2']]
                best_iou = 0
                best_gt_idx = -1
                
                for gt_idx, gt_box in enumerate(gt_boxes):
                    if gt_matched[gt_idx]:
                        continue
                    
                    iou = self.detector.calculate_iou(det_bbox, gt_box['bbox'])
                    if iou > best_iou:
                        best_iou = iou
                        best_gt_idx = gt_idx
                
                if best_iou >= iou_threshold and best_gt_idx != -1:
                    # TP part 正确检测
                    true_positives += 1
                    gt_matched[best_gt_idx] = True
                    det_matched[det_idx] = True
                    
                    detailed_results.append({
                        'image': img_name,
                        'detection_bbox': det_bbox,
                        'gt_bbox': gt_boxes[best_gt_idx]['bbox'],
                        'iou': best_iou,
                        'confidence': detection['confidence'],
                        'status': 'TP'
                    })
                else:
                    # FP part 错误检测
                    false_positives += 1
                    detailed_results.append({
                        'image': img_name,
                        'detection_bbox': det_bbox,
                        'gt_bbox': None,
                        'iou': 0,
                        'confidence': detection['confidence'],
                        'status': 'FP'
                    })
            
            # Miss 漏检
            for gt_idx, matched in enumerate(gt_matched):
                if not matched:
                    false_negatives += 1
                    detailed_results.append({
                        'image': img_name,
                        'detection_bbox': None,
                        'gt_bbox': gt_boxes[gt_idx]['bbox'],
                        'iou': 0,
                        'confidence': 0,
                        'status': 'FN'
                    })
        
        precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
        recall = true_positives / total_gt_boxes if total_gt_boxes > 0 else 0
        f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        
        metrics = {
            'true_positives': true_positives,
            'false_positives': false_positives,
            'false_negatives': false_negatives,
            'total_ground_truth': total_gt_boxes,
            'precision': precision,
            'recall': recall,
            'f1_score': f1_score
        }
        
        return metrics, detailed_results
    
    def generate_classification_dataset(self, detection_results, output_dir):
        """ This part create the data structure used by CNN
        生成CNN分类器训练/测试所需的数据集结构
        """
        classification_dir = os.path.join(output_dir, 'classification_dataset')
        os.makedirs(classification_dir, exist_ok=True)
        classification_info = []
        
        for det in detection_results:
            classification_info.append({
                'patch_path': det['patch_path'],
                'original_image': det['original_image'],
                'bbox': f"{det['bbox_x1']},{det['bbox_y1']},{det['bbox_x2']},{det['bbox_y2']}",
                'confidence': det['confidence']
            })
        
        # Save csv file 保存csv文件分类数据集信息
        classification_csv = os.path.join(classification_dir, 'classification_patches.csv')
        pd.DataFrame(classification_info).to_csv(classification_csv, index=False)
        
        print(f"CSV file created: {classification_csv}")
        print(f"Including {len(classification_info)} candidate areas")
        
        return classification_dir
    


In [4]:
# Build the detector and fit the insect/background classifier on the
# training split described by data.yaml.
detector = HoG_Detector()
detector.binary_classifier_train('data.yaml')

# Export/evaluation helper that uses the trained detector.
tester=HogTesting(detector)


Preparing positive part 开始准备昆虫部分
Already found 15282 positive samples 找到了15282个正向样本
Preparing negative part 开始准备背景部分
Already found 21702 negative samples 找到了21702个反向样本
Extrating HoG features 提取Hog特征
Trainning data:(36984, 2156),label:(36984,)
Positive samples:15282,negative samples=21702
Accuracy=0.8706


The detect-and-export function has a quick-detect mode. If Quick_detect is set to True, only a random subset of the test images is processed; the maximum number of images and the random seed can be specified.

如果Quick Detect为True，测试时会从测试集中随机抽选一定量的样本，最大样本量和随机种子可指定。

In [5]:
# Run the detector over the test set and export the candidate patches
# together with the CSV/JSON result files.
print("=== Detect and Export ===")

test_image_dir = '../test/images'
test_label_dir = '../test/labels' 
output_dir = 'detection_output'

# processed_P is the sampled image list in quick mode and None otherwise.
detection_results,processed_P = tester.detect_and_export(
        test_image_dir, test_label_dir, output_dir,
        confidence_threshold=0.3,
        Quick_detect=False
    )

=== Detect and Export ===
Start processing 546 images 开始处理测试图像
Detection Finished,total 676 candidate areas 检测完成
Result saved in detection_output\detection_results.csv CSV结果已保存
Images saved in detection_output\candidate_patches 图像已保存


In [6]:
# Load the labelled boxes; processed_P restricts loading to the sampled
# images in quick mode (None means every label file is used).
print("=== Load Ground Truth ===")
ground_truth = tester.load_ground_truth(test_label_dir, test_image_dir,processed_P)

=== Load Ground Truth ===


In [7]:
# Compare the detections with the ground truth (a detection counts as
# correct from IoU 0.5 upwards) and print the resulting metrics.
print("\n=== 检测性能评估 ===")
metrics, detailed_results = tester.check_preformance(
        detection_results, ground_truth, iou_threshold=0.5
    )
    
print(f"Performance  检测性能指标:")
print(f"精确率 (Precision): {metrics['precision']:.4f}")
print(f"召回率 (Recall): {metrics['recall']:.4f}") 
print(f"F1 F1分数: {metrics['f1_score']:.4f}")
print(f"TP 正确检测: {metrics['true_positives']}")
print(f"FP 误检: {metrics['false_positives']}")
print(f"Miss 漏检: {metrics['false_negatives']}")
print(f"Total real target 总真实目标: {metrics['total_ground_truth']}")
    
# Summary metrics as a one-row CSV.
evaluation_csv = os.path.join(output_dir, 'evaluation_metrics.csv')
pd.DataFrame([metrics]).to_csv(evaluation_csv, index=False)
    
# Per-box evaluation rows (status TP / FP / FN).
detailed_csv = os.path.join(output_dir, 'detailed_evaluation.csv')
pd.DataFrame(detailed_results).to_csv(detailed_csv, index=False)


=== 检测性能评估 ===
Performance  检测性能指标:
精确率 (Precision): 0.6612
召回率 (Recall): 0.6488
F1 F1分数: 0.6549
TP 正确检测: 447
FP 误检: 229
Miss 漏检: 242
Total real target 总真实目标: 689
