# Running on new images
This notebook will walk you step by step through the process of using a pre-trained model to detect traffic signs in an image.

# Imports

In [1]:
import warnings
warnings.filterwarnings('ignore')
import numpy as np
import os
import tensorflow as tf
from matplotlib import pyplot as plt
from PIL import Image
import glob as glob
import random

%matplotlib inline

# Environment setup

In [2]:

import sys

sys.path.append('../models/research')  # Replace with the path to TensorFlow Object Detection API
sys.path.append('../darkflow')  # Replace with the path to Darkflow

from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util


# Tensorflow Object Detection API

## Model preparation 

In [3]:
# Name of the detector to run. It is used as the sub-directory of `models/`
# from which the frozen inference graph is loaded. Keep exactly one line active.
# MODEL_NAME = 'faster_rcnn_inception_resnet_v2_atrous'
# MODEL_NAME = 'faster_rcnn_resnet_101'
# MODEL_NAME = 'faster_rcnn_resnet50'
MODEL_NAME = 'faster_rcnn_inception_v2'
# MODEL_NAME = 'rfcn_resnet101'
# MODEL_NAME = 'ssd_inception_v2'
# MODEL_NAME = 'ssd_mobilenet_v1'

In [4]:
# Path to the frozen detection graph. This is the actual model that is used for the traffic sign detection.
# Both paths below are relative to the current working directory.
MODEL_PATH = os.path.join('models', MODEL_NAME)
PATH_TO_CKPT = os.path.join(MODEL_PATH,'inference_graph/frozen_inference_graph.pb')

# Label map file, used to attach the correct class label to each detected box.
PATH_TO_LABELS = os.path.join('gtsdb_data', 'gtsdb3_label_map.pbtxt')

# Upper bound on the number of label-map categories that will be loaded.
NUM_CLASSES = 3

## Load a (frozen) Tensorflow model into memory

In [5]:
# Create a dedicated graph and import the serialized (frozen) model into it.
# NOTE(review): tf.GraphDef and tf.gfile are TensorFlow 1.x names; under
# TensorFlow 2.x they live in tf.compat.v1 / tf.io.gfile.
detection_graph = tf.Graph()
with detection_graph.as_default():
    od_graph_def = tf.GraphDef()
    # Read the whole protobuf file into memory and parse it into the GraphDef.
    with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
        serialized_graph = fid.read()
        od_graph_def.ParseFromString(serialized_graph)
        # name='' imports the nodes without a name prefix, so tensors can be
        # looked up by their original names (e.g. 'image_tensor:0').
        tf.import_graph_def(od_graph_def, name='')

## Loading label map
Label maps map indices to category names, so that when our convolution network predicts `2`, we know that this corresponds to `mandatory`.

In [6]:
# PATH_TO_LABELS is defined once in the model-preparation cell as a path relative
# to the project directory. It is deliberately NOT overridden here with a
# machine-specific absolute path, so the notebook runs on any checkout.
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)

# Convert the label map into a list of category dicts (at most NUM_CLASSES of them)
# and then into the index structure expected by the visualization utilities.
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES, use_display_name=True)
category_index = label_map_util.create_category_index(categories)
print(label_map)


item {
  name: "prohibitory"
  id: 1
}
item {
  name: "mandatory"
  id: 2
}
item {
  name: "danger"
  id: 3
}



## Helper code

In [7]:
def load_image_into_numpy_array(image):
    """Convert a PIL-style image into an ``(height, width, 3)`` uint8 array.

    Images that are not in RGB mode (for example RGBA PNGs, greyscale or
    palette images) are converted to RGB first. Without the conversion,
    ``getdata()`` yields a different number of values per pixel and the
    reshape to three channels raises ``ValueError``.

    Args:
        image: An image object exposing ``mode``, ``size`` (width, height),
            ``getdata()`` and ``convert(mode)``, e.g. a ``PIL.Image.Image``.

    Returns:
        numpy.ndarray of shape ``(im_height, im_width, 3)`` and dtype uint8.
    """
    if image.mode != 'RGB':
        image = image.convert('RGB')
    (im_width, im_height) = image.size
    return np.array(image.getdata()).reshape((im_height, im_width, 3)).astype(np.uint8)

## Detection

In [8]:

# Directory that holds the images to run detection on.
# NOTE(review): absolute, machine-specific path — adjust it for your environment.
PATH_TO_TEST_IMAGES_DIR = '/home/yuxing/my_project/traffic-sign-detection/test_images'
# for png_image_path in glob.glob(os.path.join(PATH_TO_TEST_IMAGES_DIR, '*.png')):
#     with Image.open(png_image_path) as im:
#         # Get the image name without extension
#         image_name = os.path.splitext(os.path.basename(png_image_path))[0]
        
#         # Define the path for the jpg image
#         jpg_image_path = os.path.join(PATH_TO_TEST_IMAGES_DIR, f"{image_name}.jpg")
        
#         # Save the image in jpg format
#         im.convert('RGB').save(jpg_image_path)
        
#         # Optionally, remove the original png image
#         os.remove(png_image_path)

# Every .jpg file directly inside the test-image directory.
# glob.glob returns matches in arbitrary (filesystem-dependent) order; sorting makes
# the enumeration index of each image stable across runs and machines.
TEST_IMAGE_PATHS = sorted(glob.glob(os.path.join(PATH_TO_TEST_IMAGES_DIR, '*.jpg')))

# Size, in inches, of the output images.
IMAGE_SIZE = (20, 20)

In [9]:
# import shutil

# PATH_TO_TEST_IMAGES_DIR = '/home/yuxing/my_project/traffic-sign-detection/Image_sent4'

# # Create a subdirectory named "images_positive_detection" if it doesn't exist
# detected_dir = os.path.join(PATH_TO_TEST_IMAGES_DIR, "images_positive_detection")
# if not os.path.exists(detected_dir):
#     os.makedirs(detected_dir)

# image_counter = 530  # Initialize counter for saved image filenames

# with detection_graph.as_default():
#     with tf.Session(graph=detection_graph) as sess:
#         for idx, image_path in enumerate(TEST_IMAGE_PATHS):
#             image = Image.open(image_path)
#             image_np = load_image_into_numpy_array(image)
#             image_np_expanded = np.expand_dims(image_np, axis=0)
            
#             image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
#             boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
#             scores = detection_graph.get_tensor_by_name('detection_scores:0')
#             classes = detection_graph.get_tensor_by_name('detection_classes:0')
#             num_detections = detection_graph.get_tensor_by_name('num_detections:0')
            
#             (boxes, scores, classes, num_detections) = sess.run(
#                 [boxes, scores, classes, num_detections],
#                 feed_dict={image_tensor: image_np_expanded})

#             if np.max(scores) > 0.5:
#                 # If the image has a positive detection, save the original image
#                 output_filename = os.path.join(detected_dir, f"test_images_{image_counter:03}.jpg")
#                 shutil.copy2(image_path, output_filename)
                
#                 image_counter += 1  # Increment the counter for the next filename


In [10]:
# with detection_graph.as_default():
#     with tf.Session(graph=detection_graph) as sess:
#         for idx, image_path in enumerate(TEST_IMAGE_PATHS):
#             image = Image.open(image_path)
#             # the array based representation of the image will be used later in order to prepare the
#             # result image with boxes and labels on it.
#             image_np = load_image_into_numpy_array(image)
#             # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
#             image_np_expanded = np.expand_dims(image_np, axis=0)
#             image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
#             # Each box represents a part of the image where a particular object was detected.
#             boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
#             # Each score represents the level of confidence for each of the objects.
#             # Score is shown on the result image, together with the class label.
#             scores = detection_graph.get_tensor_by_name('detection_scores:0')
#             classes = detection_graph.get_tensor_by_name('detection_classes:0')
#             num_detections = detection_graph.get_tensor_by_name('num_detections:0')
#             # Actual detection.
#             (boxes, scores, classes, num_detections) = sess.run(
#                 [boxes, scores, classes, num_detections],
#                 feed_dict={image_tensor: image_np_expanded})
#             # Visualization of the results of a detection.
#             vis_util.visualize_boxes_and_labels_on_image_array(
#                 image_np,
#                 np.squeeze(boxes),
#                 np.squeeze(classes).astype(np.int32),
#                 np.squeeze(scores),
#                 category_index,
#                 use_normalized_coordinates=True,
#                 line_thickness=6)
#             plt.figure(idx, figsize=IMAGE_SIZE)
#             plt.axis('off')
#             plt.imshow(image_np)



In [11]:
# Minimum detection score for a box to count as a detection. The (commented-out)
# patch cells compare scores against this value; add_patch_to_boxes has its own
# default of the same value.
confidence_threshold = 0.5

## Patch image processing

In [12]:
# from PIL import Image, ImageEnhance

# # Load the image
# image1 = Image.open('batch/1.jpg')
# image2 = Image.open('batch/2.jpg')

# # Initialize the ImageEnhancer objects for brightness and sharpness
# enhancer_brightness = ImageEnhance.Brightness(image1)
# enhancer_sharpness = ImageEnhance.Sharpness(image1)

# # Decrease the brightness and sharpness by 50% and save the image
# enhanced_image1 = enhancer_sharpness.enhance(0.5)
# enhanced_image1 = enhancer_brightness.enhance(0.5)
# enhanced_image1.save('batch/1_modified.jpg')

# # Repeat the same process for the second image
# enhancer_brightness = ImageEnhance.Brightness(image2)
# enhancer_sharpness = ImageEnhance.Sharpness(image2)

# # Decrease the brightness and sharpness by 50% and save the image
# enhanced_image2 = enhancer_sharpness.enhance(0.5)
# enhanced_image2 = enhancer_brightness.enhance(0.5)
# enhanced_image2.save('batch/2_modified.jpg')


## Adding two patches on the detected traffic signs (random positions)

In [13]:


# # Perform object detection on the images
# total_boxes_processed = 0
# total_batches_added = 0
# modified_images_list = []

# with detection_graph.as_default():
#     with tf.Session(graph=detection_graph) as sess:
#         for idx, image_path in enumerate(TEST_IMAGE_PATHS):
#             image = Image.open(image_path)
#             image_np = load_image_into_numpy_array(image)
#             image_np_expanded = np.expand_dims(image_np, axis=0)
#             image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
#             boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
#             scores = detection_graph.get_tensor_by_name('detection_scores:0')
#             classes = detection_graph.get_tensor_by_name('detection_classes:0')
#             num_detections = detection_graph.get_tensor_by_name('num_detections:0')
#             (boxes, scores, classes, num_detections) = sess.run(
#                 [boxes, scores, classes, num_detections],
#                 feed_dict={image_tensor: image_np_expanded})

#             # Get the number of valid detected boxes and print their indices
#             valid_boxes = boxes[0, :int(num_detections[0])]
#             valid_scores = scores[0, :int(num_detections[0])]
#             num_valid_boxes = sum(valid_scores > confidence_threshold)  # Count boxes with confidence above threshold
#             total_boxes_processed += num_valid_boxes
#             print("Image", idx, "Detected Boxes:", num_valid_boxes)
#             print([len(box) for box in valid_boxes])

#             # Add batch image to the boxes and store the modified image
#             batch_image_paths = ['batch/1_modified.jpg', 'batch/2_modified.jpg']  # Replace with the paths to your batch images
#             image_np_with_batch = add_patch_to_boxes(image_np.copy(), valid_boxes[:num_valid_boxes], batch_image_paths)

#             # Count the number of batches added in this image
#             num_batches_added = num_valid_boxes
#             total_batches_added += num_batches_added
#             print("Batches added in this image:", num_batches_added)

#             # Store the modified image without showing bounding boxes
#             modified_images_list.append(image_np_with_batch)

# # Display the modified images without showing bounding boxes
# for idx, image_np_with_boxes in enumerate(modified_images_list):
#     plt.figure(figsize=IMAGE_SIZE)
#     plt.axis('off')
#     plt.imshow(image_np_with_boxes)
#     # plt.savefig(f'image_with_batches_{idx+1}.png')  # Save the image with a unique name
#     plt.show()

# # Print the total number of boxes processed and total batches added
# print("Total boxes processed:", total_boxes_processed)
# print("Total batches added:", total_batches_added)

In [14]:
def central_coordinates(ymin, xmin, ymax, xmax, box_height, box_width, image_np):
    """Return the central 70% of a box, clipped to the image bounds.

    The box corners (ymin, xmin, ymax, xmax) are scaled by the image height and
    width to find the box centre, whereas box_height and box_width are used
    as-is, i.e. they are expected to already be in the same units as the image
    dimensions (the caller passes pixel sizes).

    Returns:
        Tuple (new_ymin, new_xmin, new_ymax, new_xmax) of the shrunken region,
        clamped to [0, image height] and [0, image width].
    """
    img_rows = image_np.shape[0]
    img_cols = image_np.shape[1]

    # Centre of the original box, expressed in image units.
    mid_row = (ymin + ymax) * 0.5 * img_rows
    mid_col = (xmin + xmax) * 0.5 * img_cols

    # Half extents of the region after shrinking each side to 70%.
    half_rows = box_height * 0.7 * 0.5
    half_cols = box_width * 0.7 * 0.5

    top = max(0, mid_row - half_rows)
    left = max(0, mid_col - half_cols)
    bottom = min(img_rows, mid_row + half_rows)
    right = min(img_cols, mid_col + half_cols)
    return top, left, bottom, right

def generate_non_overlapping_positions(ymin, xmin, ymax, xmax, batch_height, batch_width, aspect_ratio):
    """Pick random top-left corners for two patches that do not overlap.

    The region is split in half along its longer axis (vertically when
    ``aspect_ratio > 1``, horizontally otherwise). One patch is placed in each
    half and both share the same coordinate on the other axis.

    Args:
        ymin, xmin, ymax, xmax: Integer pixel bounds of the region to fill.
        batch_height, batch_width: Integer pixel size of one patch.
        aspect_ratio: Height / width of the box; selects the split axis.

    Returns:
        ``((y1, x1), (y2, x2))`` top-left corners of the two patches, or
        ``(None, None)`` when the region cannot hold both patches. That covers
        the case where the region is narrower than a patch on the shared axis
        (e.g. after clipping at the image border), which previously made
        ``random.randint`` raise ``ValueError`` on an empty range.
    """
    split_vertically = aspect_ratio > 1
    if split_vertically:  # Box is taller than it is wide
        lo, hi, size = ymin, ymax, batch_height
        cross_lo, cross_hi, cross_size = xmin, xmax, batch_width
    else:  # Box is wider than it is tall (or square)
        lo, hi, size = xmin, xmax, batch_width
        cross_lo, cross_hi, cross_size = ymin, ymax, batch_height

    # Along the split axis: the first patch must end before the midpoint and
    # the second one must start at or after it, so they can never overlap.
    mid = (lo + hi) // 2
    first_end = max(lo, mid - size)
    second_end = max(mid, hi - size)
    if first_end <= lo or second_end <= mid:  # not enough space
        return None, None

    # Along the shared axis the patch must fit entirely inside the region.
    cross_end = cross_hi - cross_size
    if cross_end < cross_lo:  # not enough space
        return None, None

    # Keep the draw order (first, second, shared) so seeded runs are reproducible.
    first = random.randint(lo, first_end)
    second = random.randint(mid, second_end)
    cross = random.randint(cross_lo, cross_end)

    if split_vertically:
        return (first, cross), (second, cross)
    return (cross, first), (cross, second)

def add_patch_to_boxes(image_np, boxes, scores, batch_image_paths, confidence_threshold=0.5):
    """Paste patch ("batch") images onto every confidently detected box.

    For each box whose score reaches ``confidence_threshold``, two random,
    non-overlapping positions are chosen inside the central part of the box.
    The first patch image is pasted at the first position; every further patch
    image is pasted at the second position. Each patch is resized to a quarter
    of the box height and width.

    Args:
        image_np: Image array that is modified IN PLACE. Patches are converted
            to RGB, so this is assumed to be (height, width, 3) — TODO confirm
            for callers that pass other layouts.
        boxes: Iterable of ``(ymin, xmin, ymax, xmax)`` boxes; they are scaled
            by the image size, i.e. treated as normalized coordinates.
        scores: Sequence parallel to ``boxes`` with one score per box.
        batch_image_paths: Paths of the patch image files.
        confidence_threshold: Boxes with a lower score are skipped.

    Returns:
        The same ``image_np`` object, with the patches pasted in.
    """
    # Patch images are loaded lazily (only once a box actually needs them) and
    # only once in total, instead of being reopened for every box.
    patch_images = None

    for idx, box in enumerate(boxes):
        if scores[idx] < confidence_threshold:
            continue  # Skip boxes with low confidence
        ymin, xmin, ymax, xmax = box
        box_height = int((ymax - ymin) * image_np.shape[0])
        box_width = int((xmax - xmin) * image_np.shape[1])

        batch_image_height = box_height // 4
        batch_image_width = box_width // 4
        if batch_image_height < 1 or batch_image_width < 1:
            # Box smaller than 4 px on one side: the patch would be empty, and a
            # zero-width box would divide by zero in the aspect ratio below.
            continue

        new_ymin, new_xmin, new_ymax, new_xmax = central_coordinates(ymin, xmin, ymax, xmax, box_height, box_width, image_np)

        aspect_ratio = box_height / box_width

        positions = generate_non_overlapping_positions(
            int(new_ymin), int(new_xmin), int(new_ymax), int(new_xmax),
            batch_image_height, batch_image_width, aspect_ratio
        )

        if not positions[0] or not positions[1]:  # Not enough space for the batches
            continue  # Skip adding batches for this box

        (y1, x1), (y2, x2) = positions

        if patch_images is None:
            patch_images = []
            for batch_image_path in batch_image_paths:
                # The context manager closes the file handle; convert() returns
                # an independent in-memory copy and normalizes RGBA/greyscale
                # patches to three channels.
                with Image.open(batch_image_path) as batch_image:
                    patch_images.append(batch_image.convert('RGB'))

        for batch_idx, patch_image in enumerate(patch_images):
            batch_np = np.array(patch_image.resize((batch_image_width, batch_image_height)))
            y, x = (y1, x1) if batch_idx == 0 else (y2, x2)
            image_np[y:y + batch_np.shape[0], x:x + batch_np.shape[1]] = batch_np

    return image_np



In [15]:

# START_IDX = 1  # Starting index (inclusive)
# END_IDX = 600  # Ending index (inclusive)

# PATH_TO_TEST_IMAGES_DIR = '/home/yuxing/my_project/traffic-sign-detection/test_images'
# TEST_IMAGE_PATHS = [os.path.join(PATH_TO_TEST_IMAGES_DIR, f'test_image_{i:03}.jpg') for i in range(START_IDX, END_IDX+1)]
# confidence_threshold = 0.5
# IMAGE_SIZE = (20, 20)
# results = {}  # A dictionary to save detection results for all images

# def run_detection_and_save(image_np, image_path, idx, save_dir, prefix, save_bbox=True):
#     with detection_graph.as_default():
#         with tf.Session(graph=detection_graph) as sess:
#             image_np_expanded = np.expand_dims(image_np, axis=0)
#             image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
#             boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
#             scores = detection_graph.get_tensor_by_name('detection_scores:0')
#             classes = detection_graph.get_tensor_by_name('detection_classes:0')
            
#             (output_boxes, output_scores, output_classes) = sess.run(
#                 [boxes, scores, classes],
#                 feed_dict={image_tensor: image_np_expanded})

#             # Diagnostic Print Statements
#             # print("Output boxes shape:", output_boxes.shape)
#             # print("Output classes shape:", output_classes.shape)
#             # print("Output scores shape:", output_scores.shape)

#             squeezed_boxes = np.squeeze(output_boxes)
#             squeezed_classes = np.squeeze(output_classes)
#             squeezed_scores = np.squeeze(output_scores)

#             # Further Diagnostic Print Statements
#             # print("Squeezed boxes shape:", squeezed_boxes.shape)
#             # print("Squeezed classes shape:", squeezed_classes.shape)
#             # print("Squeezed scores shape:", squeezed_scores.shape)

#             # print("Unique classes before conversion:", np.unique(squeezed_classes))
#             squeezed_classes = squeezed_classes.astype(np.int32)

#             # Save the results
#             results[image_path] = {"boxes": output_boxes, "scores": output_scores, "classes": output_classes}

#             # If need to save images with bounding boxes
#             if save_bbox:
#                 vis_util.visualize_boxes_and_labels_on_image_array(
#                     image_np,
#                     squeezed_boxes,
#                     squeezed_classes,
#                     squeezed_scores,
#                     category_index,
#                     use_normalized_coordinates=True,
#                     line_thickness=6)

#             output_filename = os.path.join(save_dir, f"{prefix}_{idx+1:03}.jpg")
#             Image.fromarray(image_np).save(output_filename)

# # Step 2: Detect on original images and save the results with bounding boxes
# for idx, image_path in enumerate(TEST_IMAGE_PATHS):
#     image = Image.open(image_path)
#     image_np = np.array(image)
#     run_detection_and_save(image_np, image_path, idx, "test_images_box", "test_image_box")

# # Step 1 and 3: Add patches and save without bounding boxes
# patch_image_save_paths = []  # List to store paths of saved patched images
# patch_image_paths = ['/home/yuxing/my_project/traffic-sign-detection/batch/1_modified.jpg',
#                      '/home/yuxing/my_project/traffic-sign-detection/batch/2_modified.jpg']

# for idx, image_path in enumerate(TEST_IMAGE_PATHS):
#     image = Image.open(image_path)
#     image_np = np.array(image)
    
#     squeezed_boxes = results[image_path]["boxes"].squeeze()  
#     squeezed_scores = results[image_path]["scores"].squeeze()  # Get the squeezed scores
    
#     image_np_with_patch = add_patch_to_boxes(image_np, squeezed_boxes, squeezed_scores, patch_image_paths)
    
    
#     output_filename = os.path.join("patch_images", f"patch_image_{idx+1:03}.jpg")
#     Image.fromarray(image_np_with_patch).save(output_filename)
    
#     patch_image_save_paths.append(output_filename)  # Append saved path to list

# # Step 4: Detect on patched images and save results with bounding boxes
# for idx, image_path in enumerate(patch_image_save_paths):  # Using the saved paths list
#     image = Image.open(image_path)
#     image_np = np.array(image)
#     run_detection_and_save(image_np, image_path, idx, "patch_images_box", "patch_image_box")


In [16]:

# import pickle

# with open('results.pkl', 'wb') as f:
#     pickle.dump(results, f)


In [20]:
import pickle

# Load the detection results that were saved to results.pkl.
# Presumably a dict mapping image path -> {"boxes", "scores", "classes"} arrays,
# as written by the (commented-out) detection cell — verify against the file.
# NOTE(security): pickle.load can execute arbitrary code; only load a file you created yourself.
with open('results.pkl', 'rb') as file:
    results = pickle.load(file)

def iou(box1, box2):
    '''Computes the Intersection over Union (IoU) between two boxes.

    Each box is defined by its top-left and bottom-right corners: [y1, x1, y2, x2].

    Returns:
        The intersection area divided by the union area. When the union area is
        not positive (both boxes are degenerate, i.e. have zero area), 0.0 is
        returned instead of dividing by zero.
    '''
    # Corners of the intersection rectangle.
    y1_max = max(box1[0], box2[0])
    x1_max = max(box1[1], box2[1])
    y2_min = min(box1[2], box2[2])
    x2_min = min(box1[3], box2[3])

    # Clamp at 0 so disjoint boxes yield an empty intersection, not a negative one.
    inter_area = max(0, y2_min - y1_max) * max(0, x2_min - x1_max)

    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])

    union_area = box1_area + box2_area - inter_area
    if union_area <= 0:
        return 0.0

    return inter_area / union_area

# Path prefixes used to rebuild the keys of `results`.
# NOTE(review): these must match the paths under which the detections were stored
# in results.pkl, so they are kept exactly as they were when the file was written.
test_image_dir = '/home/yuxing/my_project/traffic-sign-detection/test_images/'
patch_image_dir = 'patch_images/'

# Image indices (each listed at most once) whose detections changed after patching.
changed_classes_indices = []
changed_confidence_indices = []

score_threshold = 0.5  # Detections with a lower score are discarded
threshold_iou = 0.7  # Threshold to consider boxes as overlapping
max_score_diff = 0.2  # Larger score differences count as a changed confidence

# 1. Filtering based on confidence score
# Note: the entries of `results` are overwritten in place with the filtered arrays.
for image_path, data in results.items():
    valid_indices = data['scores'] > score_threshold
    results[image_path]['boxes'] = data['boxes'][valid_indices]
    results[image_path]['scores'] = data['scores'][valid_indices]
    results[image_path]['classes'] = data['classes'][valid_indices]

# 2. Compare the detections of every original image with those of its patched version
for i in range(1, 530):
    test_image_path = os.path.join(test_image_dir, f'test_image_{i:03}.jpg')
    patch_image_path = os.path.join(patch_image_dir, f'patch_image_{i:03}.jpg')

    # Images without stored results are treated as having no detections.
    test_results = results.get(test_image_path, {})
    patch_results = results.get(patch_image_path, {})

    test_boxes = test_results.get('boxes', [])
    patch_boxes = patch_results.get('boxes', [])

    # Per-image flags, so that an image with several changed boxes is recorded
    # once instead of once per box (which inflated the image counts).
    classes_changed = False
    confidence_changed = False

    # If the number of detections is different, add to changed classes
    if len(test_boxes) != len(patch_boxes):
        classes_changed = True
    else:
        # Iterate over each box in test_results and find its best match in patch_results
        for idx, test_box in enumerate(test_boxes):
            best_iou = -1
            best_idx = -1
            for jdx, patch_box in enumerate(patch_boxes):
                current_iou = iou(test_box, patch_box)
                if current_iou > best_iou:
                    best_iou = current_iou
                    best_idx = jdx

            # If we have found a matching box in patch_results with sufficient overlap
            if best_iou > threshold_iou:
                if test_results['classes'][idx] != patch_results['classes'][best_idx]:
                    classes_changed = True
                score_diff = np.abs(test_results['scores'][idx] - patch_results['scores'][best_idx])
                if score_diff > max_score_diff:
                    confidence_changed = True

    if classes_changed:
        changed_classes_indices.append(i)
    if confidence_changed:
        changed_confidence_indices.append(i)

    # # Print samples for the first 20 indices
    # if i <= 20:
    #     print(f"Results for index {i}")
    #     print(f"Test Image Results: {test_results}")
    #     print(f"Patch Image Results: {patch_results}")
    #     print("-------------------------")



print(f"indices with changed classes: {changed_classes_indices}")
print(f"indices with changed confidence scores: {changed_confidence_indices}")


indices with changed classes: [2, 4, 7, 15, 18, 19, 22, 24, 27, 28, 30, 31, 33, 36, 40, 51, 56, 59, 60, 62, 63, 67, 74, 78, 83, 91, 92, 93, 97, 99, 102, 103, 104, 107, 111, 113, 114, 115, 116, 117, 120, 121, 123, 124, 125, 126, 129, 131, 134, 135, 136, 137, 143, 145, 147, 152, 155, 162, 167, 168, 169, 176, 181, 182, 184, 187, 191, 196, 198, 202, 203, 206, 218, 219, 220, 227, 229, 230, 234, 235, 236, 237, 238, 241, 242, 246, 247, 249, 252, 253, 256, 268, 271, 272, 275, 276, 277, 278, 280, 282, 285, 286, 288, 289, 290, 293, 295, 299, 300, 305, 307, 312, 315, 318, 320, 321, 324, 326, 331, 337, 338, 342, 346, 347, 350, 352, 355, 358, 359, 365, 366, 367, 369, 371, 376, 384, 393, 395, 397, 398, 401, 403, 404, 405, 408, 409, 410, 411, 412, 414, 415, 416, 419, 425, 427, 428, 433, 434, 439, 441, 449, 451, 455, 457, 459, 460, 463, 468, 473, 474, 476, 477, 478, 479, 483, 486, 487, 488, 489, 490, 493, 499, 502, 504, 509, 512, 513, 515, 520, 521, 524, 525]
indices with changed confidence scores: [9

In [27]:
# The index lists can contain the same image more than once (one entry per changed
# box), so count distinct indices to report the number of images.
print(f"number of images with changed classes: {len(set(changed_classes_indices))}")
print(f"number of images with changed confidence: {len(set(changed_confidence_indices))}")

number of images with changed classes: 192
number of images with changed confidence: 29
