In [None]:
pip install opencv-python
pip install azure-cognitiveservices-vision-customvision
pip install azure-cognitiveservices-vision-computervision

Let us first import the necessary libraries

In [None]:
import os
import json
import numpy as np
import time
import cv2
import io
import math
import DataPrep

from azure.cognitiveservices.vision.customvision.training import CustomVisionTrainingClient
from azure.cognitiveservices.vision.customvision.training.models import ImageFileCreateEntry,ImageFileCreateBatch,Region
from azure.cognitiveservices.vision.customvision.prediction import CustomVisionPredictionClient

from msrest.authentication import ApiKeyCredentials
from sklearn import preprocessing
from DataPrep import BuildCoordinates

In [None]:
# Custom Vision resource configuration.
# The keys are read from environment variables so that secrets never have to be saved
# inside the notebook; when a variable is not set the key falls back to the empty
# string, which is the value this cell used to hard-code.
training_endpoint = "https://csvsc1task4point2d.cognitiveservices.azure.com/"
training_key = os.environ.get("CUSTOM_VISION_TRAINING_KEY", "")
prediction_key = os.environ.get("CUSTOM_VISION_PREDICTION_KEY", "")
prediction_endpoint = "https://csvsc1task4point2d-prediction.cognitiveservices.azure.com/"
# Resource id of the prediction resource that the trained iteration is published to.
prediction_resource_id = "/subscriptions/be831466-57b1-483b-91b7-a81607321d1e/resourceGroups/RGTask4Point2SC1/providers/Microsoft.CognitiveServices/accounts/CSVSC1Task4Point2D-Prediction"
# Name under which the trained iteration is published and later queried for predictions.
publish_iteration_name = "Iteration5"

In [None]:
# Build the two Custom Vision clients used by the rest of the notebook.
# Each client gets its own ApiKeyCredentials object: the training key is sent in the
# "Training-key" header and the prediction key in the "Prediction-key" header.
credentials = ApiKeyCredentials(in_headers={"Training-key": training_key})
# `trainer` is used below to create the project and tags, upload images, train and publish.
trainer = CustomVisionTrainingClient(training_endpoint, credentials)
prediction_credentials = ApiKeyCredentials(in_headers={"Prediction-key": prediction_key})
# `predictor` is used below to run detection on individual video frames.
predictor = CustomVisionPredictionClient(prediction_endpoint, prediction_credentials)

In [None]:
# Select the first domain reported by the training service whose type is "ObjectDetection"
# and whose name is "General". next() is called without a default, so StopIteration is
# raised if no such domain is returned.
obj_detection_domain = next(domain for domain in trainer.get_domains() if domain.type == "ObjectDetection" and domain.name == "General")

In [None]:
# Find the object detection domain: the first one whose type is "ObjectDetection" and
# whose name is "General" (StopIteration is raised if none matches).
obj_detection_domain = next(domain for domain in trainer.get_domains() if domain.type == "ObjectDetection" and domain.name == "General")
# Create a new project on Custom Vision that uses that domain.
# NOTE(review): running this cell again issues another create_project call with the same name.
print("Creating project...")
project = trainer.create_project("CSVProj1Task4Point2",domain_id = obj_detection_domain.id)

In [None]:
# Fetch an existing project by its hard-coded id and rebind `project` to it.
# When this cell runs after the project-creation cell, every later cell uses this
# project instead of the freshly created one.
project = trainer.get_project('87c97806-1dee-489a-a52b-6881edc9b760',None,False,)

In [None]:
# Create one tag per vehicle category in the project.
# The tags are created in the order listed and unpacked into the names used later on.
bikes_tag, buses_tag, cars_tag, autorickshaw_tag, truck_tag = (
    trainer.create_tag(project.id, tag_name)
    for tag_name in ("bike", "bus", "car", "autorickshaw", "truck")
)

In [None]:
# Start every vehicle category with its own empty lookup table.
# Each table is later filled with the coordinates of that category's training images,
# keyed by image file name.
bikes_regions, cars_regions, buses_regions = {}, {}, {}
autorickshaw_regions, truck_regions = {}, {}

In [None]:
# Run BuildCoordinates() from the project-local DataPrep module.
# Per the author's notes, this writes one JSON file per vehicle category, each holding
# the coordinates of all images of that category.
# NOTE(review): DataPrep is not visible from this notebook — confirm the output folder
# and the coordinate format (pixel x, y, width, height is assumed further down).
DataPrep.BuildCoordinates()

In [None]:
# Read the per-category JSON files and load each one into the matching dictionary.
# A file is matched by its name without extension ("bus", "car", "bike", "autorickshaw",
# "truck"). Any other directory entry is skipped without being opened, so unrelated
# files or sub-folders in json_dir no longer make the cell fail.
json_dir = "C:/Applications/Machine Learning/MDS-Data Science/mds-deakin/Object Detection using CustomVision/json/"
known_categories = ("bus", "car", "bike", "autorickshaw", "truck")

for file in os.listdir(json_dir):
    # os.listdir already returns bare names, so the stem can be taken directly.
    category = os.path.splitext(file)[0]
    if category not in known_categories:
        continue

    with open(os.path.join(json_dir, file), mode='rb') as f:
        loaded_regions = json.load(f)

    # Bind the loaded table to the variable the later cells read.
    if category == "bus":
        buses_regions = loaded_regions
    elif category == "car":
        cars_regions = loaded_regions
    elif category == "bike":
        bikes_regions = loaded_regions
    elif category == "autorickshaw":
        autorickshaw_regions = loaded_regions
    else:
        truck_regions = loaded_regions

Now I will tag each image and upload it to the Azure Custom Vision project

In [None]:
# Location of the training images on the local disk.
# NOTE(review): this is an absolute, machine-specific path; it must be adjusted before
# the notebook can run anywhere else.
training_folder = "C:/Applications/Machine Learning/MDS-Data Science/mds-deakin/Object Detection using CustomVision/Dataset/mini_vehicle_dataset/train"
# Alias read by the cell that builds the tagged image entries.
base_image_location = training_folder

In [None]:
# Build one tagged upload entry per annotated training image.
print("Adding images...")
tagged_images_with_regions = []


def _build_tagged_entry(image_path, file_name, box, tag_id):
    """Create the upload entry for one annotated image.

    The bounding box is converted to image-relative coordinates by dividing the
    horizontal values by the image width and the vertical values by the image height.
    The previous code passed the box through sklearn's preprocessing.normalize, which
    rescales the four numbers to unit L2 norm and therefore does not produce fractions
    of the image size.

    NOTE(review): assumes `box` is (x, y, width, height) in pixels of the image, as the
    original variable names suggest — confirm against DataPrep.BuildCoordinates.

    Args:
        image_path: Path of the .jpg file to upload.
        file_name: Name given to the uploaded image.
        box: Sequence of four numbers unpacked as x, y, w, h.
        tag_id: Id of the tag assigned to the region.

    Returns:
        An ImageFileCreateEntry holding the image bytes and a single Region, or None
        when OpenCV cannot decode the file (a message is printed in that case).

    Raises:
        OSError: If the image file cannot be opened.
    """
    x, y, w, h = box
    with open(image_path, mode="rb") as image_contents:
        image_bytes = image_contents.read()

    # Decode the bytes that were already read to learn the image size.
    decoded = cv2.imdecode(np.frombuffer(image_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
    if decoded is None:
        print("Skipping image that could not be decoded: " + image_path)
        return None
    image_height, image_width = decoded.shape[:2]

    region = Region(
        tag_id=tag_id,
        left=x / image_width,
        top=y / image_height,
        width=w / image_width,
        height=h / image_height,
    )
    return ImageFileCreateEntry(name=file_name, contents=image_bytes, regions=[region])


# (sub-folder of the training set, coordinates keyed by file name, tag), in upload order.
_category_sources = [
    ("buses", buses_regions, buses_tag),
    ("cars", cars_regions, cars_tag),
    ("bikes", bikes_regions, bikes_tag),
    ("autorickshaw", autorickshaw_regions, autorickshaw_tag),
    ("trucks", truck_regions, truck_tag),
]

for _folder, _regions, _tag in _category_sources:
    for file_name, _box in _regions.items():
        _image_path = base_image_location + "/" + _folder + "/" + file_name + ".jpg"
        _entry = _build_tagged_entry(_image_path, file_name, _box, _tag.id)
        if _entry is not None:
            tagged_images_with_regions.append(_entry)

print("Prepared", len(tagged_images_with_regions), "tagged images")

In [None]:
# Upload the tagged images in consecutive batches of at most `upper_limit` entries.
# The number of batches is printed first, then the size of each batch as it is sent.
upper_limit = 64
total_images = len(tagged_images_with_regions)
batch_count = math.ceil(total_images / upper_limit)
print(batch_count)

# Step through the list one batch at a time; slicing clamps the final, shorter batch.
for start in range(0, total_images, upper_limit):
    tagged_images = tagged_images_with_regions[start:start + upper_limit]
    print(len(tagged_images))

    # Send the current batch to the Azure Custom Vision project.
    upload_result = trainer.create_images_from_files(project.id, ImageFileCreateBatch(images=tagged_images))
    if upload_result.is_batch_successful:
        continue

    # Report the status of every image in a batch that did not fully succeed.
    print("Image batch upload failed.")
    for image in upload_result.images:
        print("Image status: ", image.status)

In [None]:
# This cell used to hold a disabled, single-request upload of the whole image list,
# kept as a bare string literal. The batched upload cell supersedes it, so the dead
# code has been removed.

In [None]:
# Train a new iteration of the project, wait until training finishes, then publish it.
poll_interval_seconds = 5

print("Training...")
iteration = trainer.train_project(project.id)
while iteration.status != "Completed":
    # Stop waiting if the service reports a failed run; otherwise this loop would never end.
    # NOTE(review): "Failed" is assumed to be the status string for a failed iteration — confirm.
    if iteration.status == "Failed":
        raise RuntimeError("Training failed for iteration " + str(iteration.id))
    # Wait between polls instead of calling the training API in a tight loop.
    time.sleep(poll_interval_seconds)
    iteration = trainer.get_iteration(project.id, iteration.id)
    print("Training status: " + iteration.status)

# The iteration is now trained. Publish it to the prediction resource under the
# configured name so the prediction client can query it.
trainer.publish_iteration(project.id, iteration.id, publish_iteration_name, prediction_resource_id)
print("Done!")

We will now test the trained model on an actual traffic video and check whether vehicles are detected with the tags we have created.

In [None]:
# Open the unlabelled traffic video with OpenCV's VideoCapture class.
video_path = "./Dataset/traffic.mp4/input videos/unlabelled_traffic2.mp4"
cap = cv2.VideoCapture(video_path)
# VideoCapture does not raise when the file cannot be opened, so check explicitly;
# otherwise the later cells would run on an empty capture.
if not cap.isOpened():
    raise IOError("Could not open video: " + video_path)

In [None]:
# Report basic properties of the opened video.
print("Frame count of video: ", int(cap.get(cv2.CAP_PROP_FRAME_COUNT)))
print("Number of frames per second of video: ", int(cap.get(cv2.CAP_PROP_FPS)))
# Keep the frame size; the cell that writes the output video reads these two values.
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# f-string so the placeholders are actually filled in (the original printed them literally).
print(f"Width of video is {frame_width} and height of video is {frame_height}")

In [None]:
# Sampling stride for the detection loop: a frame is sent for detection when
# frame_count % frame_rate == 0. With the value 1 that condition is true for every
# frame, so every frame is processed (this is a stride in frames, not frames per second).
frame_rate = 1
# Index of the frame currently being read; also used to number the saved images.
frame_count = 0


In the code below, I iterate over the frames of the video and pass every `frame_rate`-th frame to the `detect_image` API; with `frame_rate = 1`, as set above, this means every frame is sent rather than one frame per second. For each frame that is sent, I draw a bounding box around every detected vehicle, display its tag together with the confidence percentage, and save the frame as an image on my local disk.

In [None]:
# Run detection on the video frames, draw the detections and save each labelled frame.

# Colour used for the box and caption of each known tag. The tuples are passed straight
# to OpenCV's drawing calls (OpenCV interprets them as BGR). A prediction whose tag is
# not listed here is not drawn, which matches the original if/elif chain.
tag_colours = {
    "bus": (255, 0, 0),
    "autorickshaw": (0, 255, 0),
    "bike": (150, 0, 255),
    "car": (0, 0, 255),
    "truck": (255, 0, 255),
}
# Folder that receives one labelled image per processed frame.
output_path = "C:/Applications/Machine Learning/MDS-Data Science/mds-deakin/Object Detection using CustomVision/Dataset/video_images/labelled_images/"

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Only frames whose index is a multiple of frame_rate are sent to the service.
        if frame_count % frame_rate == 0:
            # Encode the frame as JPEG bytes and send it to the Custom Vision service.
            _, img_encoded = cv2.imencode('.jpg', frame)
            results = predictor.detect_image(project.id, publish_iteration_name, img_encoded.tobytes())

            for prediction in results.predictions:
                colour = tag_colours.get(prediction.tag_name)
                # Skip low-confidence predictions and tags without a colour.
                if colour is None or prediction.probability <= 0.40:
                    continue

                # The bounding box is scaled by the frame width/height to get pixels.
                x = prediction.bounding_box.left * frame.shape[1]
                y = prediction.bounding_box.top * frame.shape[0]
                width = prediction.bounding_box.width * frame.shape[1]
                height = prediction.bounding_box.height * frame.shape[0]

                # Caption: tag name plus probability as a percentage with one decimal.
                text = f"{prediction.tag_name} ({prediction.probability*100:.1f}%)"
                cv2.rectangle(frame, (int(x), int(y)), (int(x+width), int(y+height)), colour, 2)
                cv2.putText(frame, text, (int(x), int(y) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, colour, 2)

            # Show the labelled frame and save it under a zero-padded frame number.
            cv2.imshow('frame', frame)
            output_filename = output_path+"image_{:04d}.jpg".format(frame_count)
            cv2.imwrite(output_filename, frame)

        # Pressing "q" in the preview window stops the loop early.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        frame_count += 1
finally:
    # Release the capture and close the preview window even if a prediction call fails.
    cap.release()
    cv2.destroyAllWindows()


In [None]:
# Combine the labelled frame images into a single video file.
# The output is written at 11 frames per second.
# NOTE: this rebinds `frame_rate`, which the detection loop uses as its sampling stride;
# re-run the cell that sets frame_rate = 1 before running the detection loop again.
frame_rate = 11.0
# Lower-case tag, consistent with the re-encoding cell further down.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output = cv2.VideoWriter('./DataSet/traffic.mp4/output videos/output_video1.mp4', fourcc, frame_rate, (frame_width,frame_height))

# Folder holding the labelled images (presumably the one the detection loop wrote to,
# addressed here by a relative path — verify both point at the same place).
image_directory = './Dataset/video_images/labelled_images/'
image_files = os.listdir(image_directory)
try:
    # Sorted by name so the zero-padded frame numbers give the playback order.
    for image_file in sorted(image_files):
        image_path = os.path.join(image_directory, image_file)
        image = cv2.imread(image_path)
        # cv2.imread returns None for anything it cannot decode; skip such entries.
        if image is None:
            continue

        # image.shape[:2] is (height, width), so compare in that order; cv2.resize
        # takes its target size as (width, height).
        if image.shape[:2] != (frame_height, frame_width):
            image = cv2.resize(image, (frame_width,frame_height))

        output.write(image)
finally:
    # Always release the writer so the video file is closed even after an error.
    output.release()

In [None]:
import cv2

# Copy a labelled video frame by frame into a new file written at `new_fps`.
video = cv2.VideoCapture('./DataSet/traffic.mp4/output videos/labelled_traffic7.mp4')

# Frame rate reported by the source; it is read here but not used below.
fps = int(video.get(cv2.CAP_PROP_FPS))

# Frame rate of the output file; the same value is also requested on the capture.
new_fps = 11
video.set(cv2.CAP_PROP_FPS, new_fps)

# Output location and codec.
output_file = './DataSet/traffic.mp4/output videos/output_video1.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

# The writer uses the same frame size as the source video.
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
writer = cv2.VideoWriter(output_file, fourcc, new_fps, (width, height))

# Write frames until the source has no more to give.
ret, frame = video.read()
while ret:
    writer.write(frame)
    ret, frame = video.read()

# Release the reader and the writer.
video.release()
writer.release()
