# SỬ DỤNG OBJECT DETECTION ĐỂ CROP ẢNH

## Thông tin tác giả và ngày cập nhật

<hr>

**Thành viên nhóm**:
- **Trần Đình Khánh Đăng - 22520195**
- **Tăng Nhất - 22521027**
- **Lê Minh Nhựt - 22521060**

**Ngày cập nhật**: 22/01/2025

## Import thư viện cần thiết

In [None]:
import gc
import math
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
from matplotlib import patches
from PIL import Image
from tqdm.notebook import tqdm

## Khởi tạo đường dẫn

In [None]:
# Output root: passed below as `dataset_dir`, where cropped images are written.
dataset_dir = '/kaggle/working/dataset'
# Input root: image paths from the table are joined onto this directory.
base_dir = '/kaggle/input/cs114-final-project-full-dataset'
# Image table, read relative to the current working directory.
# NOTE(review): this is not under `dataset_dir` -- confirm the CSV location.
df = pd.read_csv('CarDataset.csv')

## Code

In [None]:
# TF Hub handle of the Faster R-CNN (ResNet-50 v1, 640x640) detection model.
model_url = "https://tfhub.dev/tensorflow/faster_rcnn/resnet50_v1_640x640/1"
# Loaded once at module level and shared by every detection call below.
detector = hub.load(model_url)

def save_cropped_image(cropped_image, original_image_path, dataset_dir='./'):
    """Write an RGB image to ``<dataset_dir>/<class>/<file name>``.

    The class folder is the name of the directory that directly contains
    ``original_image_path``; the original file name is kept.

    Args:
        cropped_image: Image array in RGB channel order (it is converted to
            BGR before being handed to OpenCV for writing).
        original_image_path: Path of the source image; only its parent
            directory name and file name are used.
        dataset_dir: Root directory under which the class folder is created.

    Returns:
        The path the image was written to.
    """
    file_name = os.path.basename(original_image_path)
    class_folder = os.path.basename(os.path.dirname(original_image_path))

    target_dir = os.path.join(dataset_dir, class_folder)
    os.makedirs(target_dir, exist_ok=True)

    output_path = os.path.join(target_dir, file_name)
    bgr_image = cv2.cvtColor(cropped_image, cv2.COLOR_RGB2BGR)
    cv2.imwrite(output_path, bgr_image)
    return output_path

def load_and_preprocess_image(base_dir, image_path, max_image_size):
    """Read an image from disk and return it in both RGB and BGR order.

    Args:
        base_dir: Directory that ``image_path`` is relative to.
        image_path: Path of the image, joined onto ``base_dir``.
        max_image_size: Target size passed to ``cv2.resize``, or ``None`` to
            keep the image at its stored size.

    Returns:
        ``(rgb_image, bgr_image)`` after the optional resize, or
        ``(None, None)`` when the file cannot be read.
    """
    full_path = os.path.join(base_dir, image_path)
    bgr = cv2.imread(full_path)
    if bgr is None:
        print(f"Cannot load image from {full_path}")
        return None, None

    if max_image_size is not None:
        bgr = cv2.resize(bgr, max_image_size)

    # cvtColor allocates a new array, so the BGR data is left untouched.
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    return rgb, bgr.copy()

def detect_car(image, detector, score_threshold=0.5):
    """Run the detector on a single image and return its raw detections.

    Args:
        image: Image array. It is converted to a tensor and given a leading
            batch axis before being passed to the detector.
            NOTE(review): presumably the RGB array returned by
            ``load_and_preprocess_image`` -- confirm against the model's
            expected input.
        detector: Loaded model exposing a ``'serving_default'`` signature.
        score_threshold: Accepted for interface compatibility only; no
            filtering happens here, callers filter the returned scores.

    Returns:
        dict with the detections of the single batch element:
        ``"boxes"``, ``"class_ids"`` (cast to int64) and ``"scores"``.
    """
    input_tensor = tf.expand_dims(tf.convert_to_tensor(image), axis=0)
    detections = detector.signatures['serving_default'](input_tensor)

    # Index 0 drops the batch axis added above.
    boxes = detections['detection_boxes'].numpy()[0]
    # Requires numpy to be imported as `np` at file level.
    class_ids = detections['detection_classes'].numpy()[0].astype(np.int64)
    scores = detections['detection_scores'].numpy()[0]

    return {"boxes": boxes, "class_ids": class_ids, "scores": scores}

def process_chunk(data_chunk, base_dir, dataset_dir, detector, class_index, score_threshold, save_images, none_list_file, max_image_size):
    """Detect, crop and optionally save the largest target object per image.

    For every row the image is loaded, the detector is run, and among the
    detections of class ``class_index`` scoring at least ``score_threshold``
    the one with the largest box area is kept. When nothing qualifies, the
    whole image is saved instead (if ``save_images``) and the image path is
    appended to the none-list file.

    Args:
        data_chunk: DataFrame whose ``'ImageFullPath'`` column holds image
            paths relative to ``base_dir``.
        base_dir: Root directory of the input images.
        dataset_dir: Root directory for saved crops and the none-list file.
        detector: Detection model forwarded to ``detect_car``.
        class_index: Class id a detection must have to be considered.
        score_threshold: Minimum detection score to be considered.
        save_images: Whether to write the crop (or full image) to disk.
        none_list_file: File name, inside ``dataset_dir``, of the log listing
            images without a qualifying detection.
        max_image_size: Resize target forwarded to the image loader.

    Returns:
        List of ``(image_path, cropped_image_path, box, score)`` tuples where
        ``box`` is ``(left, right, top, bottom)`` in pixels of the (resized)
        image, or ``None`` when nothing was detected. Images that cannot be
        loaded yield ``(image_path, None, None, None)``.
    """
    batch_results = []
    none_list_path = os.path.join(dataset_dir, none_list_file)

    for _, row in tqdm(data_chunk.iterrows(), total=data_chunk.shape[0]):
        image_path = row['ImageFullPath']
        image_rgb, _ = load_and_preprocess_image(base_dir, image_path, max_image_size)

        if image_rgb is None:
            batch_results.append((image_path, None, None, None))
            continue

        detection_results = detect_car(image_rgb, detector, score_threshold)
        img_h, img_w = image_rgb.shape[:2]

        max_area = 0
        prioritized_box = None
        best_score = 0

        for box, score, label in zip(detection_results["boxes"],
                                     detection_results["scores"],
                                     detection_results["class_ids"]):
            if score < score_threshold or label != class_index:
                continue
            ymin, xmin, ymax, xmax = box
            area = (xmax - xmin) * (ymax - ymin)

            if area > max_area:
                max_area = area
                prioritized_box = (xmin, ymin, xmax, ymax)
                best_score = score

        cropped_image = None
        prioritized_box_pixels = None

        if prioritized_box is not None:
            xmin, ymin, xmax, ymax = prioritized_box
            # Box coordinates are fractions of the image size; clamp the
            # pixel values so the slice never leaves the image.
            left = max(0, int(xmin * img_w))
            right = min(img_w, int(xmax * img_w))
            top = max(0, int(ymin * img_h))
            bottom = min(img_h, int(ymax * img_h))

            if right > left and bottom > top:
                # Crop the RGB array: save_cropped_image expects RGB input and
                # converts to BGR itself. Cropping the BGR copy would swap the
                # red and blue channels in the saved file.
                cropped_image = image_rgb[top:bottom, left:right]
                prioritized_box_pixels = (left, right, top, bottom)
            else:
                # The box collapsed to zero pixels after truncation; an empty
                # crop cannot be colour-converted, so treat it as no detection.
                best_score = 0

        cropped_image_path = None

        if cropped_image is not None:
            if save_images:
                cropped_image_path = save_cropped_image(cropped_image, image_path, dataset_dir)
        else:
            if save_images:
                # Fall back to the full image so every input has an output.
                cropped_image_path = save_cropped_image(image_rgb, image_path, dataset_dir)
            # The directory may not exist yet when nothing has been saved.
            os.makedirs(dataset_dir, exist_ok=True)
            with open(none_list_path, 'a') as f:
                f.write(f"{image_path}, No_Car_Detected\n")

        batch_results.append((image_path, cropped_image_path, prioritized_box_pixels, best_score))

    return batch_results

def process_car_detection(base_dir='./',
                          dataset_dir='./',
                          data=None,
                          file_name='CarDataset.csv',
                          detector=None,
                          partition=False,
                          partition_size=1000,
                          batch_size=10,
                          chunk_size=100, 
                          class_index=3,
                          score_threshold=0.5,
                          save_images=True,
                          none_list_file='NoneList.csv',
                          max_image_size=(640, 640)
                          ):
    """Run detection and cropping over the whole image table, chunk by chunk.

    The table is taken from ``data`` when it is supplied; otherwise it is
    streamed from ``<dataset_dir>/<file_name>``. Each chunk is split into
    batches that are handed to ``process_chunk``.

    Args:
        base_dir: Root directory of the input images.
        dataset_dir: Root directory for outputs (and for the CSV when
            ``data`` is None).
        data: Optional DataFrame with an ``'ImageFullPath'`` column.
        file_name: CSV file name used only when ``data`` is None.
        detector: Detection model forwarded to ``process_chunk``.
        partition: When true, each chunk is replaced by a random sample of
            at most ``partition_size`` rows (fixed seed 42).
        partition_size: Sample size per chunk when ``partition`` is true.
        batch_size: Number of rows handed to ``process_chunk`` at a time.
        chunk_size: Number of rows per chunk.
        class_index: Class id to look for.
        score_threshold: Minimum detection score.
        save_images: Whether crops are written to disk.
        none_list_file: Name of the log file for images without detections.
        max_image_size: Resize target for loaded images.

    Returns:
        The concatenated result tuples from every ``process_chunk`` call.
    """
    results_list = []

    if data is not None:
        # Use the caller's table instead of silently re-reading a CSV.
        total_rows = len(data)
        chunks = (data.iloc[start:start + chunk_size]
                  for start in range(0, total_rows, chunk_size))
    else:
        data_path = os.path.join(dataset_dir, file_name)
        # Count data rows (minus the header) and close the file afterwards.
        with open(data_path) as f:
            total_rows = max(sum(1 for _ in f) - 1, 0)
        chunks = pd.read_csv(data_path, chunksize=chunk_size)

    num_chunks = math.ceil(total_rows / chunk_size)

    print(f"Total number of chunks: {num_chunks}")

    for chunk_num, chunk in enumerate(chunks, start=1):
        print(f"Processing chunk {chunk_num} out of {num_chunks} (Size: {len(chunk)} rows)")

        if partition:
            chunk = chunk.sample(n=min(partition_size, len(chunk)), random_state=42).reset_index(drop=True)
            print(f"  - Processing {len(chunk)} images after partitioning...")

        total_batches = math.ceil(len(chunk) / batch_size)
        print(f"  - Total batches in this chunk: {total_batches}")

        for i in range(0, len(chunk), batch_size):
            batch_data = chunk[i:i + batch_size]
            batch_results = process_chunk(batch_data, base_dir, dataset_dir, detector, class_index, score_threshold, save_images, none_list_file, max_image_size)
            results_list.extend(batch_results)

            # Drop per-batch references before the next batch is loaded.
            del batch_data
            del batch_results
            gc.collect()

    return results_list


In [None]:
# Run detection and cropping over the full table with the shared detector,
# using 1024-row chunks split into 128-row batches.
results_list = process_car_detection(base_dir=base_dir,
                                     dataset_dir=dataset_dir,
                                     data=df,
                                     detector=detector,
                                     batch_size = 128,
                                     chunk_size = 1024,
                                     )

## Visualization

In [None]:
def visualize_results(results_list,
                      data=None,
                      base_dir='./',
                      dataset_dir='./',
                      file_name='CarDataset.csv',
                      partition=False,
                      partition_size=10,
                      ):
    """Plot each source image with its detection box next to the saved crop.

    Args:
        results_list: Tuples of ``(image_path, cropped_image_path, bbox,
            score)`` where ``bbox`` is ``(left, right, top, bottom)`` in
            pixels or ``None``.
        data: Unused; kept so existing callers keep working.
        base_dir: Root directory that ``image_path`` is relative to.
        dataset_dir: Unused; kept so existing callers keep working.
        file_name: Unused; kept so existing callers keep working.
        partition: Falsy to show everything. Otherwise treated as a 1-based
            page number (``True`` selects the first page).
        partition_size: Number of results per page.
    """
    if partition:
        start_index = (partition - 1) * partition_size
        end_index = min(start_index + partition_size, len(results_list))
        results_list = results_list[start_index:end_index]

    for image_path, cropped_image_path, bbox, score in tqdm(results_list, total=len(results_list), desc="Processing images: "):
        image_path = os.path.join(base_dir, image_path)

        original_image = cv2.imread(image_path)
        # imread signals failure by returning None, so check before any
        # further OpenCV call touches the array.
        if original_image is None:
            print(f"Warning: Could not load image at {image_path}. Skipping.")
            continue
        try:
            original_image = cv2.resize(original_image, (640, 640))
            original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)
        except cv2.error as e:
            print(f"Error loading image {image_path}: {e}")
            continue

        cropped_image = None
        if cropped_image_path is not None:
            cropped_image = cv2.imread(cropped_image_path)
            if cropped_image is None:
                print(f"Warning: Could not load cropped image at {cropped_image_path}.")

        fig, ax = plt.subplots(1, 2, figsize=(15, 5))

        ax[0].grid(False)
        ax[1].grid(False)

        ax[0].imshow(original_image)
        ax[0].set_title(f"Original Image: {os.path.basename(image_path)}")

        if bbox is not None:
            left, right, top, bottom = bbox
            rect = patches.Rectangle((left, top), right - left, bottom - top,
                                     linewidth=2, edgecolor='r', facecolor='none')
            ax[0].add_patch(rect)
            ax[0].text(left, top - 5, f"Car (Score: {score:.2f})", color='red', fontsize=10)

        if cropped_image is not None:
            # imread returns BGR while imshow expects RGB.
            ax[1].imshow(cv2.cvtColor(cropped_image, cv2.COLOR_BGR2RGB))

        # A saved file without a box is the full-image fallback, not a crop.
        if cropped_image is not None and bbox is not None:
            ax[1].set_title("Cropped Car")
        else:
            ax[1].set_title("No Car Detected Above Threshold")

        plt.tight_layout()
        plt.show()
        plt.close(fig)

In [None]:
# Show the first page (10 entries) of the detection results.
visualize_results(
    results_list=results_list,
    data=df,
    base_dir=base_dir,
    dataset_dir=dataset_dir,
    partition=True,
    partition_size=10,
)