In [1]:
# This script uses the person detection module
# of the Google Cloud Video Intelligence API

In [2]:
import io
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="Project-4484a7a8a88b.json"

import cv2
import numpy as np
import pandas as pd
from google.cloud import videointelligence_v1 as videointelligence

In [3]:
def detect_person(local_file_path="video/DIOR_test_video.mp4", timeout=300):
    """Detect and track people in a video read from a local file.

    The file is read fully into memory and submitted to the Google Cloud
    Video Intelligence API with the PERSON_DETECTION feature. Only bounding
    boxes are requested; attributes and pose landmarks are disabled.

    Args:
        local_file_path: Path of the video file to analyse.
        timeout: Maximum number of seconds to wait for the annotation
            operation to finish. Defaults to 300.

    Returns:
        pandas.DataFrame with one row per timestamped detection and columns:
        ``Begin`` / ``End`` (start and end offset of the enclosing track
        segment, in seconds), ``Time`` (offset of the detection, in seconds)
        and ``BoxLeft``, ``BoxTop``, ``BoxRight``, ``BoxBottom`` (the
        normalized bounding box reported by the API). The frame is empty,
        but keeps these columns, when no person is detected.
    """

    client = videointelligence.VideoIntelligenceServiceClient()

    with open(local_file_path, "rb") as video_file:
        input_content = video_file.read()

    # Configure the request: bounding boxes only.
    config = videointelligence.types.PersonDetectionConfig(
        include_bounding_boxes=True,
        include_attributes=False,
        include_pose_landmarks=False,
    )
    context = videointelligence.types.VideoContext(
        person_detection_config=config
    )

    # Start the asynchronous request
    operation = client.annotate_video(
        request={
            "features": [videointelligence.Feature.PERSON_DETECTION],
            "input_content": input_content,
            "video_context": context,
        }
    )

    print("\nProcessing video for person detection annotations.")
    result = operation.result(timeout=timeout)

    print("\nFinished processing.\n")

    # Retrieve the first result, because a single video was processed.
    annotation_result = result.annotation_results[0]

    # One record per (track, timestamped object) pair.
    dat_lst = []
    for annotation in annotation_result.person_detection_annotations:
        for track in annotation.tracks:
            # The segment bounds are shared by every detection of the track,
            # so compute them once. total_seconds() also accounts for the
            # days component that `.seconds` alone would drop.
            begin = track.segment.start_time_offset.total_seconds()
            end = track.segment.end_time_offset.total_seconds()

            # Get box coordinates for each person in each frame
            for timestamped_object in track.timestamped_objects:
                box = timestamped_object.normalized_bounding_box
                dat_lst.append([
                    begin,
                    end,
                    timestamped_object.time_offset.total_seconds(),
                    box.left,
                    box.top,
                    box.right,
                    box.bottom,
                ])

    columns = ['Begin', 'End', 'Time', 'BoxLeft', 'BoxTop', 'BoxRight', 'BoxBottom']
    person_dat = pd.DataFrame(dat_lst, columns=columns)

    return person_dat

In [4]:
# Run person detection on the default test video. The call blocks until the
# API operation has finished; the resulting DataFrame of timestamped boxes is
# what the following cells scale to pixel coordinates and draw.
person_dat = detect_person()


Processing video for person detection annotations.

Finished processing.



In [6]:
# We then use the OpenCV library to
# create a new video displaying the results

# Create a VideoCapture object and read from input file
cap = cv2.VideoCapture('video/DIOR_test_video.mp4')

# Stop right away if the input cannot be read: continuing would yield a
# 0x0 frame size and an unusable output file.
if not cap.isOpened():
    raise RuntimeError("Error opening video stream or file")

# Frame size of the input video, converted from float to integer.
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Write the result at the frame rate of the input so playback speed is
# preserved; fall back to 25 fps when the container does not report one.
output_fps = cap.get(cv2.CAP_PROP_FPS) or 25.

# Define the codec and create the VideoWriter object. The output directory
# is created first because VideoWriter does not create missing folders.
os.makedirs('results', exist_ok=True)
# Lowercase 'mp4v' is the tag the MP4 container accepts; the uppercase
# variant makes the FFmpeg backend warn and fall back to it.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(
    'results/DIOR_test_result_video_intelligence.mp4',
    fourcc,
    output_fps,
    (frame_width, frame_height),
)
if not out.isOpened():
    cap.release()
    raise RuntimeError("Error opening output video file for writing")

# Adapt the normalized box coordinates to the frame width and height.
# A copy is used so that person_dat keeps the values returned by the API.
box_dat = person_dat.copy()
box_dat['BoxLeft'] = (box_dat['BoxLeft'] * frame_width).round()
box_dat['BoxRight'] = (box_dat['BoxRight'] * frame_width).round()
box_dat['BoxTop'] = (box_dat['BoxTop'] * frame_height).round()
box_dat['BoxBottom'] = (box_dat['BoxBottom'] * frame_height).round()

In [7]:
# Draw boxes around people detected frame by frame

# Detection timestamps are expressed on the timeline of the source video, so
# frame indices are converted with the source frame rate; fall back to the
# previously assumed 24 fps when the container does not report one.
source_fps = cap.get(cv2.CAP_PROP_FPS) or 24

# Largest gap (in seconds) allowed between a frame and the closest detection
# for that detection to be drawn on the frame.
max_time_gap = 0.1

i_frame = 0
try:
    while True:
        ret, frame = cap.read()

        # No more frames to read: stop.
        if not ret:
            break

        timestamp = i_frame / source_fps
        i_frame = i_frame + 1

        # Distance between this frame and every detection timestamp.
        # On an empty table min() is NaN, so the comparison is False and
        # nothing is drawn.
        time_gap = box_dat['Time'].sub(timestamp).abs()

        if time_gap.min() <= max_time_gap:
            # idxmin() returns an index label, hence .loc (not .iloc).
            nearest_time = box_dat.loc[time_gap.idxmin(), 'Time']

            # All rows sharing the closest timestamp: one per person.
            box_dat_detec = box_dat.loc[box_dat['Time'] == nearest_time]
            for row in box_dat_detec.itertuples(index=False):
                cv2.rectangle(
                    frame,
                    (int(row.BoxLeft), int(row.BoxTop)),
                    (int(row.BoxRight), int(row.BoxBottom)),
                    (0, 255, 0), 5)

        cv2.putText(frame, 'TEST RESULT', (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

        # Write the annotated frame to the output video
        out.write(frame)
finally:
    # Release the video capture and video writer objects even if drawing
    # failed, so the output file is finalized and the handles are freed.
    cap.release()
    out.release()

# Closes all the frames
cv2.destroyAllWindows()