## Setup

In [None]:
# Notebook shell command: install or upgrade the google-cloud-videointelligence package.
!pip install --upgrade google-cloud-videointelligence

In [None]:
# Colab-only: upload files from the local machine into the notebook runtime.
# NOTE(review): presumably used to upload the service-account.json key that a
# later cell points GOOGLE_APPLICATION_CREDENTIALS at - confirm.
from google.colab import files
files.upload()

In [None]:
# Notebook shell command: list the working directory (e.g. to check that the upload landed).
!ls

In [None]:
import os, io
# Point GOOGLE_APPLICATION_CREDENTIALS at the key file; the path is relative,
# so it is resolved against the current working directory.
# NOTE(review): the Google client library is expected to read this variable
# when a client is created - confirm the file name matches the uploaded key.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'service-account.json'

## Detect shot changes

In [None]:
from google.cloud import videointelligence_v1 as vi

def detect_shot_changes(video_uri):
    """Request shot change detection for a video and return the result.

    Args:
        video_uri: URI of the video; sent as the request's ``input_uri``.

    Returns:
        Whatever ``result()`` returns for the operation started by
        ``annotate_video``.
    """
    client = vi.VideoIntelligenceServiceClient()
    annotate_request = vi.AnnotateVideoRequest(
        input_uri=video_uri,
        features=[vi.Feature.SHOT_CHANGE_DETECTION],
    )
    print(f"Processing video: {video_uri}...")
    # annotate_video returns an operation object; result() yields the response.
    return client.annotate_video(annotate_request).result()

In [None]:
# You can open the sample video here: https://storage.googleapis.com/cloudmleap/video/next/JaneGoodall.mp4

# gs:// URI for the sample video (same bucket and path as the HTTPS link above).
video_uri = "gs://cloudmleap/video/next/JaneGoodall.mp4"

# Keep the result in `response` so the printing cell can display it.
response = detect_shot_changes(video_uri)

In [None]:
def print_video_shots(response):
    """Print a numbered table of the shots found in ``response``.

    Only ``response.annotation_results[0]`` is read, since a single video is
    processed. Output is a 40-column header followed by one row per shot:
    1-based shot number, start offset and end offset in seconds.

    Args:
        response: Object exposing ``annotation_results[0].shot_annotations``,
            whose items have ``start_time_offset`` / ``end_time_offset``
            values supporting ``total_seconds()``.
    """
    shot_list = response.annotation_results[0].shot_annotations
    header = f" Video shots: {len(shot_list)} "
    print(header.center(40, "-"))
    for number, shot in enumerate(shot_list, start=1):
        begin = shot.start_time_offset.total_seconds()
        finish = shot.end_time_offset.total_seconds()
        print(f"{number:>3} | {begin:7.3f} | {finish:7.3f}")

In [None]:
# Display the shots from the `response` produced by detect_shot_changes.
print_video_shots(response)

## Detect labels

In [None]:
from google.cloud import videointelligence_v1 as vi


def detect_labels(video_uri, mode, segments=None):
    """Request label detection for a video and return the result.

    Args:
        video_uri: URI of the video; sent as the request's ``input_uri``.
        mode: Value used as ``label_detection_mode`` in the
            ``LabelDetectionConfig``.
        segments: Optional value forwarded as ``segments`` to the
            ``VideoContext``; defaults to ``None``.

    Returns:
        Whatever ``result()`` returns for the operation started by
        ``annotate_video``.
    """
    client = vi.VideoIntelligenceServiceClient()
    label_config = vi.LabelDetectionConfig(label_detection_mode=mode)
    video_context = vi.VideoContext(
        segments=segments,
        label_detection_config=label_config,
    )
    annotate_request = vi.AnnotateVideoRequest(
        input_uri=video_uri,
        features=[vi.Feature.LABEL_DETECTION],
        video_context=video_context,
    )
    print(f"Processing video: {video_uri}...")
    return client.annotate_video(annotate_request).result()

In [None]:
from datetime import timedelta

video_uri = "gs://cloudmleap/video/next/JaneGoodall.mp4"
# Becomes `label_detection_mode` of the LabelDetectionConfig built in detect_labels.
mode = vi.LabelDetectionMode.SHOT_MODE
# Segment from 0 s to 37 s; passed below as the only entry of `segments`.
segment = vi.VideoSegment(
    start_time_offset=timedelta(seconds=0),
    end_time_offset=timedelta(seconds=37),
)

# Rebinds `response`, replacing the result of the previous section.
response = detect_labels(video_uri, mode, [segment])

In [None]:
def print_video_labels(response):
    """Print every segment-level label in ``response`` as a table row.

    Only ``response.annotation_results[0]`` is read, since a single video is
    processed. The label collection is sorted in place by
    ``sort_by_first_segment_confidence`` before printing. Each row shows the
    segment confidence (as a percentage), the segment start and end offsets in
    seconds, and the label description followed by its categories.

    Args:
        response: Object exposing
            ``annotation_results[0].segment_label_annotations``.
    """
    label_annotations = response.annotation_results[0].segment_label_annotations
    sort_by_first_segment_confidence(label_annotations)

    title = f" Video labels: {len(label_annotations)} "
    print(title.center(80, "-"))
    for label in label_annotations:
        category_suffix = category_entities_to_str(label.category_entities)
        for labelled in label.segments:
            score = labelled.confidence
            begin = labelled.segment.start_time_offset.total_seconds()
            finish = labelled.segment.end_time_offset.total_seconds()
            columns = (
                f"{score:4.0%}",
                f"{begin:7.3f}",
                f"{finish:7.3f}",
                f"{label.entity.description}{category_suffix}",
            )
            print(" | ".join(columns))


def sort_by_first_segment_confidence(labels):
    """Sort ``labels`` in place, highest first-segment confidence first.

    Args:
        labels: Mutable sequence with a ``sort`` method whose items expose
            ``segments[0].confidence``.
    """

    def first_confidence(label):
        return label.segments[0].confidence

    labels.sort(key=first_confidence, reverse=True)


def category_entities_to_str(category_entities):
    """Format category entities as a parenthesised, comma-separated suffix.

    Args:
        category_entities: Collection of objects with a ``description``
            attribute; may be empty or ``None``.

    Returns:
        ``" (desc1, desc2, ...)"`` with a leading space, or ``""`` when
        ``category_entities`` is falsy.
    """
    if category_entities:
        descriptions = ", ".join(entity.description for entity in category_entities)
        return f" ({descriptions})"
    return ""

In [None]:
# Display the labels from the `response` produced by detect_labels.
print_video_labels(response)

## Detect explicit content

In [None]:
from google.cloud import videointelligence_v1 as vi

def detect_explicit_content(video_uri, segments=None):
    """Request explicit content detection for a video and return the result.

    Args:
        video_uri: URI of the video; sent as the request's ``input_uri``.
        segments: Optional value forwarded as ``segments`` to the
            ``VideoContext``; defaults to ``None``.

    Returns:
        Whatever ``result()`` returns for the operation started by
        ``annotate_video``.
    """
    client = vi.VideoIntelligenceServiceClient()
    detection_features = [vi.Feature.EXPLICIT_CONTENT_DETECTION]
    annotate_request = vi.AnnotateVideoRequest(
        input_uri=video_uri,
        features=detection_features,
        video_context=vi.VideoContext(segments=segments),
    )
    print(f"Processing video: {video_uri}...")
    return client.annotate_video(annotate_request).result()

In [None]:
from datetime import timedelta

video_uri = "gs://cloudmleap/video/next/JaneGoodall.mp4"
# Segment from 0 s to 10 s; passed below as the only entry of `segments`.
segment = vi.VideoSegment(
    start_time_offset=timedelta(seconds=0),
    end_time_offset=timedelta(seconds=10),
)

# Rebinds `response`, replacing the result of the previous section.
response = detect_explicit_content(video_uri, [segment])

In [None]:
def print_explicit_content(response):
    """Print how many analysed frames fall under each likelihood value.

    Only ``response.annotation_results[0]`` is read, since a single video is
    processed. After a 40-column header with the total frame count, one line
    is printed per member of ``vi.Likelihood``; members with no matching
    frame are shown with a count of 0.

    Args:
        response: Object exposing
            ``annotation_results[0].explicit_annotation.frames``, whose items
            have a ``pornography_likelihood`` attribute.
    """
    from collections import Counter

    analysed_frames = response.annotation_results[0].explicit_annotation.frames
    per_likelihood = Counter(
        frame.pornography_likelihood for frame in analysed_frames
    )

    title = f" Explicit content frames: {len(analysed_frames)} "
    print(title.center(40, "-"))
    for level in vi.Likelihood:
        # Counter returns 0 for likelihood values that never occurred.
        print(f"{level.name:<22}: {per_likelihood[level]:>3}")

In [None]:
# Display the per-likelihood frame counts from the `response` produced by detect_explicit_content.
print_explicit_content(response)

## Transcribe speech

In [None]:
from google.cloud import videointelligence_v1 as vi


def transcribe_speech(video_uri, language_code, segments=None):
    """Request speech transcription for a video and return the result.

    The transcription config always sets
    ``enable_automatic_punctuation=True``.

    Args:
        video_uri: URI of the video; sent as the request's ``input_uri``.
        language_code: Value used as ``language_code`` in the
            ``SpeechTranscriptionConfig``.
        segments: Optional value forwarded as ``segments`` to the
            ``VideoContext``; defaults to ``None``.

    Returns:
        Whatever ``result()`` returns for the operation started by
        ``annotate_video``.
    """
    client = vi.VideoIntelligenceServiceClient()
    speech_config = vi.SpeechTranscriptionConfig(
        language_code=language_code,
        enable_automatic_punctuation=True,
    )
    video_context = vi.VideoContext(
        segments=segments,
        speech_transcription_config=speech_config,
    )
    annotate_request = vi.AnnotateVideoRequest(
        input_uri=video_uri,
        features=[vi.Feature.SPEECH_TRANSCRIPTION],
        video_context=video_context,
    )
    print(f"Processing video: {video_uri}...")
    return client.annotate_video(annotate_request).result()

In [None]:
from datetime import timedelta

video_uri = "gs://cloudmleap/video/next/JaneGoodall.mp4"
# Becomes `language_code` of the SpeechTranscriptionConfig built in transcribe_speech.
language_code = "en-GB"
# Segment from 55 s to 80 s; passed below as the only entry of `segments`.
segment = vi.VideoSegment(
    start_time_offset=timedelta(seconds=55),
    end_time_offset=timedelta(seconds=80),
)

# Rebinds `response`, replacing the result of the previous section.
response = transcribe_speech(video_uri, language_code, [segment])

In [None]:
def print_video_speech(response, min_confidence=0.8):
    """Print the first alternative of each sufficiently confident transcription.

    Only ``response.annotation_results[0]`` is read, since a single video is
    processed. A transcription is kept when the confidence of its first
    alternative is at least ``min_confidence``. The 80-column header shows how
    many were kept; each following line shows the confidence as a percentage
    and the transcript with surrounding whitespace stripped.

    Args:
        response: Object exposing
            ``annotation_results[0].speech_transcriptions``.
        min_confidence: Inclusive lower bound on the first alternative's
            confidence. Defaults to 0.8.
    """
    all_transcriptions = response.annotation_results[0].speech_transcriptions
    # Only alternatives[0] of each transcription is ever consulted.
    kept_alternatives = [
        candidate.alternatives[0]
        for candidate in all_transcriptions
        if candidate.alternatives[0].confidence >= min_confidence
    ]

    title = f" Speech Transcriptions: {len(kept_alternatives)} "
    print(title.center(80, "-"))
    for best in kept_alternatives:
        print(f" {best.confidence:4.0%} | {best.transcript.strip()}")

In [None]:
# Display the transcripts from the `response` produced by transcribe_speech (default min_confidence).
print_video_speech(response)

In [None]:
def print_word_timestamps(response, min_confidence=0.8):
    """Print each word of the confident transcriptions with its timestamps.

    Only ``response.annotation_results[0]`` is read, since a single video is
    processed. A transcription is kept when the confidence of its first
    alternative is at least ``min_confidence``. After an 80-column header, one
    row is printed per word of that first alternative: the alternative's
    confidence, the word's start and end times in seconds, and the word.

    Args:
        response: Object exposing
            ``annotation_results[0].speech_transcriptions``.
        min_confidence: Inclusive lower bound on the first alternative's
            confidence. Defaults to 0.8.
    """
    all_transcriptions = response.annotation_results[0].speech_transcriptions
    kept_alternatives = [
        candidate.alternatives[0]
        for candidate in all_transcriptions
        if candidate.alternatives[0].confidence >= min_confidence
    ]

    print(" Word Timestamps ".center(80, "-"))
    for best in kept_alternatives:
        # The confidence shown on every row is the alternative's, not per word.
        score = best.confidence
        for word_info in best.words:
            begin = word_info.start_time.total_seconds()
            finish = word_info.end_time.total_seconds()
            print(f"{score:4.0%} | {begin:7.3f} | {finish:7.3f} | {word_info.word}")

In [None]:
# Display per-word timestamps from the same transcription `response` (default min_confidence).
print_word_timestamps(response)

## Detect and track text

In [None]:
from google.cloud import videointelligence_v1 as vi


def detect_text(video_uri, language_hints=None, segments=None):
    """Request text detection for a video and return the result.

    Args:
        video_uri: URI of the video; sent as the request's ``input_uri``.
        language_hints: Optional value forwarded as ``language_hints`` to the
            ``TextDetectionConfig``; defaults to ``None``.
        segments: Optional value forwarded as ``segments`` to the
            ``VideoContext``; defaults to ``None``.

    Returns:
        Whatever ``result()`` returns for the operation started by
        ``annotate_video``.
    """
    client = vi.VideoIntelligenceServiceClient()
    text_config = vi.TextDetectionConfig(language_hints=language_hints)
    video_context = vi.VideoContext(
        segments=segments,
        text_detection_config=text_config,
    )
    annotate_request = vi.AnnotateVideoRequest(
        input_uri=video_uri,
        features=[vi.Feature.TEXT_DETECTION],
        video_context=video_context,
    )
    print(f"Processing video: {video_uri}...")
    return client.annotate_video(annotate_request).result()

In [None]:
from datetime import timedelta

video_uri = "gs://cloudmleap/video/next/JaneGoodall.mp4"
# Segment from 13 s to 27 s; passed below as the only entry of `segments`.
segment = vi.VideoSegment(
    start_time_offset=timedelta(seconds=13),
    end_time_offset=timedelta(seconds=27),
)

# No language_hints are given, so detect_text uses its default of None.
# Rebinds `response`, replacing the result of the previous section.
response = detect_text(video_uri, segments=[segment])
      



In [None]:
def print_video_text(response, min_frames=15):
    """Print detected text whose segment spans at least ``min_frames`` frames.

    Only ``response.annotation_results[0]`` is read, since a single video is
    processed. The annotation collection is sorted in place by
    ``sort_by_first_segment_start`` before printing. For each qualifying
    segment, two lines are printed: the text, then the segment confidence,
    its start offset plus duration in seconds, and its frame count.

    Args:
        response: Object exposing
            ``annotation_results[0].text_annotations``.
        min_frames: Segments with fewer frames than this are skipped.
            Defaults to 15.
    """
    text_annotations = response.annotation_results[0].text_annotations
    sort_by_first_segment_start(text_annotations)

    print(" Detected Text ".center(80, "-"))
    for annotation in text_annotations:
        for tracked in annotation.segments:
            frame_count = len(tracked.frames)
            if frame_count >= min_frames:
                detected = annotation.text
                score = tracked.confidence
                begin = tracked.segment.start_time_offset
                duration = segment_seconds(tracked.segment)
                print(detected)
                print(
                    f"  {score:4.0%} | {begin} + {duration:.1f}s | {frame_count} fr."
                )


def sort_by_first_segment_start(annotations):
    """Sort ``annotations`` in place by the start of their first segment.

    The sort key is the first segment's ``start_time_offset`` in whole
    milliseconds. Two offset representations are accepted:

    * ``datetime.timedelta`` - the type the other functions in this file
      assume when they call ``total_seconds()`` on offsets;
    * any object with a ``ToMilliseconds()`` method - the only form the
      original implementation handled.

    NOTE(review): which of the two reaches the key function appears to depend
    on how the container's ``sort`` hands items to ``key`` - confirm against
    the client library version in use.

    Args:
        annotations: Mutable sequence with a ``sort`` method whose items
            expose ``segments[0].segment.start_time_offset``.
    """
    # Local import keeps this cell runnable on its own.
    from datetime import timedelta

    one_millisecond = timedelta(milliseconds=1)

    def first_segment_start_ms(annotation):
        offset = annotation.segments[0].segment.start_time_offset
        if isinstance(offset, timedelta):
            # Floor division yields exact integer milliseconds, matching the
            # integer result of ToMilliseconds() on the other path.
            return offset // one_millisecond
        return offset.ToMilliseconds()

    annotations.sort(key=first_segment_start_ms)


def segment_seconds(segment):
    """Return the length of ``segment`` in seconds.

    Args:
        segment: Object whose ``start_time_offset`` and ``end_time_offset``
            support ``total_seconds()``.

    Returns:
        End offset minus start offset, as a float number of seconds.
    """
    begin, finish = (
        offset.total_seconds()
        for offset in (segment.start_time_offset, segment.end_time_offset)
    )
    return finish - begin


In [None]:
# Display the detected text from the `response` produced by detect_text (default min_frames).
print_video_text(response)