In [1]:
from typing import cast

from google.cloud import videointelligence_v1 as vi


def detect_shot_changes(video_uri: str) -> vi.VideoAnnotationResults:
    video_client = vi.VideoIntelligenceServiceClient()
    features = [vi.Feature.SHOT_CHANGE_DETECTION]
    request = vi.AnnotateVideoRequest(input_uri=video_uri, features=features)

    print(f'Processing video: "{video_uri}"...')
    operation = video_client.annotate_video(request)

    # Wait for operation to complete
    response = cast(vi.AnnotateVideoResponse, operation.result())
    # A single video is processed
    results = response.annotation_results[0]

    return results


In [5]:
import os
bucket_name = 'cloud-samples-data'
video_blob_name = 'video/JaneGoodall.mp4'
video_uri = os.path.join("gs://", bucket_name,video_blob_name)
print(video_uri)

gs://cloud-samples-data/video/JaneGoodall.mp4


In [2]:

results = detect_shot_changes(video_uri)

Processing video: "gs://cloud-samples-data/video/JaneGoodall.mp4"...


In [12]:
def print_video_shots(results: vi.VideoAnnotationResults):
    shots = results.shot_annotations
    print(f" Video shots: {len(shots)} ".center(40, "-"))
    for i, shot in enumerate(shots):
        t1 = shot.start_time_offset.total_seconds()
        t2 = shot.end_time_offset.total_seconds()
        print(f"{i+1:>3} | {t1:7.3f} | {t2:7.3f}")
    return shots

In [13]:
shots = print_video_shots(results)

----------- Video shots: 34 ------------
  1 |   0.000 |  12.880
  2 |  12.920 |  21.680
  3 |  21.720 |  27.880
  4 |  27.920 |  31.800
  5 |  31.840 |  34.720
  6 |  34.760 |  37.000
  7 |  37.040 |  39.520
  8 |  39.560 |  45.200
  9 |  45.240 |  49.960
 10 |  50.000 |  54.960
 11 |  55.000 |  58.000
 12 |  58.040 |  64.880
 13 |  64.920 |  71.840
 14 |  71.880 |  79.000
 15 |  79.040 |  84.560
 16 |  84.600 |  91.280
 17 |  91.320 |  95.800
 18 |  95.840 |  98.720
 19 |  98.760 | 101.720
 20 | 101.760 | 105.080
 21 | 105.120 | 108.320
 22 | 108.360 | 111.080
 23 | 111.120 | 113.560
 24 | 113.600 | 117.480
 25 | 117.520 | 120.000
 26 | 120.040 | 123.560
 27 | 123.600 | 126.080
 28 | 126.120 | 128.000
 29 | 128.040 | 131.600
 30 | 131.640 | 132.960
 31 | 133.000 | 135.120
 32 | 135.160 | 138.320
 33 | 138.360 | 146.200
 34 | 146.240 | 162.520


In [21]:
from google.cloud import storage
import tempfile
import cv2

# Set up GCS client
client = storage.Client()

# Specify your bucket and video file

# Get the bucket and blob
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(video_blob_name)

# Download to a temporary file
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
    blob.download_to_file(temp_file)
    temp_file_path = temp_file.name

# Play the video using OpenCV
cap = cv2.VideoCapture(temp_file_path)
fps = cap.get(cv2.CAP_PROP_FPS)
# Get the total number of frames
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Calculate duration in seconds
duration = total_frames / fps
print(f"{fps=}, {duration=}")

fps=25.0, duration=162.56


In [22]:

shot_times = [shot.end_time_offset.total_seconds() for shot in shots]

In [23]:
shot_idx = 0
elapsed_time = 0
caption = 'Shot: 0'

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    if elapsed_time > shot_times[shot_idx]:
        shot_idx += 1
        caption = 'Shot: {idx}'
    cv2.putText(frame, caption, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
    cv2.imshow('Video', frame)
    
    # if q is pressed during the 40ms wait then break the loop
    if cv2.waitKey(40) & 0xFF == ord('q'):
        break
    elapsed_time += 0.0040
    
cap.release()
cv2.destroyAllWindows()