# Azure Video Analytics Demo

## Create Azure AI Search and retrieve data

In [None]:
import os

from azure.core.credentials import AzureKeyCredential
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient, SearchIndexerClient
from azure.search.documents.indexes.models import (
    AIServicesVisionParameters,
    AIServicesVisionVectorizer,
    AIStudioModelCatalogName,
    AzureMachineLearningVectorizer,
    AzureOpenAIVectorizer,
    AzureOpenAIModelName,
    AzureOpenAIParameters,
    BlobIndexerDataToExtract,
    BlobIndexerParsingMode,
    CognitiveServicesAccountKey,
    DefaultCognitiveServicesAccount,
    ExhaustiveKnnAlgorithmConfiguration,
    ExhaustiveKnnParameters,
    FieldMapping,
    HnswAlgorithmConfiguration,
    HnswParameters,
    IndexerExecutionStatus,
    IndexingParameters,
    IndexingParametersConfiguration,
    InputFieldMappingEntry,
    OutputFieldMappingEntry,
    ScalarQuantizationCompressionConfiguration,
    ScalarQuantizationParameters,
    SearchField,
    SearchFieldDataType,
    SearchIndex,
    SearchIndexer,
    SearchIndexerDataContainer,
    SearchIndexerDataIdentity,
    SearchIndexerDataSourceConnection,
    SearchIndexerSkillset,
    SemanticConfiguration,
    SemanticField,
    SemanticPrioritizedFields,
    SemanticSearch,
    SimpleField,
    VectorSearch,
    VectorSearchAlgorithmKind,
    VectorSearchAlgorithmMetric,
    VectorSearchProfile,
    VisionVectorizeSkill
)
from azure.search.documents.models import (
    HybridCountAndFacetMode,
    HybridSearch,
    SearchScoreThreshold,
    VectorizableTextQuery,
    VectorizableImageBinaryQuery,
    VectorizableImageUrlQuery,
    VectorSimilarityThreshold,
)
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv
from IPython.display import Image, display, HTML
from openai import AzureOpenAI

In [None]:
# Load environment variables
load_dotenv()

# Configuration
AZURE_AI_VISION_API_KEY = os.getenv("AZURE_AI_VISION_API_KEY")
AZURE_AI_VISION_ENDPOINT = os.getenv("AZURE_AI_VISION_ENDPOINT")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
BLOB_CONNECTION_STRING = os.getenv("BLOB_CONNECTION_STRING")
BLOB_CONTAINER_NAME = os.getenv("BLOB_CONTAINER_NAME")
BLOB_CONNECTION_MANAGEDID_STRING=os.getenv("BLOB_CONNECTION_MANAGEDID_STRING")
INDEX_NAME = "azure-ai-video-analytics-0728"
AZURE_SEARCH_ADMIN_KEY = os.getenv("AZURE_SEARCH_ADMIN_KEY")
AZURE_SEARCH_ENDPOINT = os.getenv("AZURE_SEARCH_ENDPOINT")

In [None]:
# User-specified parameter
USE_AAD_FOR_SEARCH = False  # Set this to False to use API key for authentication

def authenticate_azure_search(api_key=None, use_aad_for_search=False):
    if use_aad_for_search:
        print("Using AAD for authentication.")
        credential = DefaultAzureCredential()
    else:
        print("Using API keys for authentication.")
        if api_key is None:
            raise ValueError("API key must be provided if not using AAD for authentication.")
        credential = AzureKeyCredential(api_key)
    return credential

azure_search_credential = authenticate_azure_search(api_key=AZURE_SEARCH_ADMIN_KEY, use_aad_for_search=USE_AAD_FOR_SEARCH)

## Create a blob data source connector on Azure AI Search

In [None]:
def create_or_update_data_source(indexer_client, container_name, connection_string, index_name):
    """
    Create or update a data source connection for Azure AI Search.
    """
    container = SearchIndexerDataContainer(name=container_name, query="metadata")
    data_source_connection = SearchIndexerDataSourceConnection(
        name=f"{index_name}-blob",
        type="azureblob",
        connection_string=connection_string,
        container=container
    )
    try:
        indexer_client.create_or_update_data_source_connection(data_source_connection)
        print(f"Data source '{index_name}-blob' created or updated successfully.")
    except Exception as e:
        raise Exception(f"Failed to create or update data source due to error: {e}")

# Create a SearchIndexerClient instance
indexer_client = SearchIndexerClient(AZURE_SEARCH_ENDPOINT, azure_search_credential)

# Call the function to create or update the data source
create_or_update_data_source(indexer_client, f"{BLOB_CONTAINER_NAME}", BLOB_CONNECTION_MANAGEDID_STRING, INDEX_NAME)

## Create a search index

In [None]:
def create_fields():
    """Creates the fields for the search index based on the specified schema."""
    return [
        SimpleField(
            name="id", type=SearchFieldDataType.String, key=True, filterable=True
        ),
        SearchField(name="caption", type=SearchFieldDataType.String, searchable=True),
        SearchField(name="video_filename", type=SearchFieldDataType.String, searchable=True),
        SearchField(name="frame_filename", type=SearchFieldDataType.String, searchable=True),
        SearchField(name="frame_url", type=SearchFieldDataType.String, searchable=True),
        SearchField(name="frame_number", type=SearchFieldDataType.String, searchable=True),
        SearchField(name="timestamp", type=SearchFieldDataType.String, searchable=True),
        SearchField(
            name="captionVector",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            vector_search_dimensions=1024,
            vector_search_profile_name="myHnswProfile",
            stored=False,
            searchable=True
        ),
        SearchField(
            name="imageVector",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            vector_search_dimensions=1024,
            vector_search_profile_name="myHnswProfile",
            stored=False,
            searchable=True
        ),
    ]

def create_vector_search_configuration():
    """Creates the vector search configuration."""
    return VectorSearch(
        algorithms=[
            HnswAlgorithmConfiguration(
                name="myHnsw",
                parameters=HnswParameters(
                    m=4,
                    ef_construction=400,
                    ef_search=500,
                    metric=VectorSearchAlgorithmMetric.COSINE,
                ),
            )
        ],
        compressions=[
            ScalarQuantizationCompressionConfiguration(
                name="myScalarQuantization",
                rerank_with_original_vectors=True,
                default_oversampling=10,
                parameters=ScalarQuantizationParameters(quantized_data_type="int8"),
            )
        ],
        vectorizers=[
            AIServicesVisionVectorizer(
                name="myAIServicesVectorizer",
                kind="aiServicesVision",
                ai_services_vision_parameters=AIServicesVisionParameters(
                    model_version="2023-04-15",
                    resource_uri=AZURE_AI_VISION_ENDPOINT,
                    api_key=AZURE_AI_VISION_API_KEY,
                ),
            )
        ],
        profiles=[
            VectorSearchProfile(
                name="myHnswProfile",
                algorithm_configuration_name="myHnsw",
                compression_configuration_name="myScalarQuantization",
                vectorizer="myAIServicesVectorizer",
            )
        ],
    )


def create_search_index(index_client, index_name, fields, vector_search):
    """Creates or updates a search index."""
    index = SearchIndex(
        name=index_name,
        fields=fields,
        vector_search=vector_search,
    )
    index_client.create_or_update_index(index=index)


index_client = SearchIndexClient(
    endpoint=AZURE_SEARCH_ENDPOINT, credential=azure_search_credential
)
fields = create_fields()
vector_search = create_vector_search_configuration()

# Create the search index with the adjusted schema
create_search_index(index_client, INDEX_NAME, fields, vector_search)
print(f"Created index: {INDEX_NAME}")

## Create a Skillset    

In [None]:
def create_text_embedding_skill():
    return VisionVectorizeSkill(
        name="text-embedding-skill",
        description="Skill to generate embeddings for text via Azure AI Vision",
        context="/document",
        model_version="2023-04-15",
        inputs=[InputFieldMappingEntry(name="text", source="/document/caption")],
        outputs=[OutputFieldMappingEntry(name="vector", target_name="captionVector")],
    )

def create_image_embedding_skill():
    return VisionVectorizeSkill(
        name="image-embedding-skill",
        description="Skill to generate embeddings for image via Azure AI Vision",
        context="/document",
        model_version="2023-04-15",
        inputs=[
			InputFieldMappingEntry(name="url", source="/document/frame_url_with_sas")
        ],
        outputs=[OutputFieldMappingEntry(name="vector", target_name="imageVector")],
    )

def create_skillset(client, skillset_name, text_embedding_skill, image_embedding_skill):
    skillset = SearchIndexerSkillset(
        name=skillset_name,
        description="Skillset for generating embeddings",
        skills=[text_embedding_skill, image_embedding_skill],
        cognitive_services_account=CognitiveServicesAccountKey(
            key=AZURE_AI_VISION_API_KEY,
            description="AI Vision Multi Service Account in West US",
        ),
    )
    client.create_or_update_skillset(skillset)

client = SearchIndexerClient(
    endpoint=AZURE_SEARCH_ENDPOINT, credential=azure_search_credential
)
skillset_name = f"{INDEX_NAME}-skillset"
text_embedding_skill = create_text_embedding_skill()
image_embedding_skill = create_image_embedding_skill()

create_skillset(client, skillset_name, text_embedding_skill, image_embedding_skill)
print(f"Created skillset: {skillset_name}")

## Run Indexer

In [None]:
def create_and_run_indexer(indexer_client, indexer_name, skillset_name, index_name, data_source_name):
    indexer = SearchIndexer(
        name=indexer_name,
        description="Indexer to index documents and generate embeddings",
        skillset_name=skillset_name,
        target_index_name=index_name,
        data_source_name=data_source_name,
        parameters=IndexingParameters(
            configuration=IndexingParametersConfiguration(
                parsing_mode=BlobIndexerParsingMode.JSON_ARRAY,
                query_timeout=None,
            ),
        ),
        field_mappings=[
            # FieldMapping(source_field_name="id", target_field_name="id"),
            FieldMapping(source_field_name="video_filename", target_field_name="video_filename"),
            FieldMapping(source_field_name="frame_filename", target_field_name="frame_filename"),
            FieldMapping(source_field_name="frame_url", target_field_name="frame_url"),
            FieldMapping(source_field_name="frame_number", target_field_name="frame_number"),
            FieldMapping(source_field_name="timestamp", target_field_name="timestamp"),
        ],
        output_field_mappings=[
            FieldMapping(source_field_name="/document/captionVector", target_field_name="captionVector"),
            FieldMapping(source_field_name="/document/imageVector", target_field_name="imageVector"),
        ],
    )

    indexer_client.create_or_update_indexer(indexer)
    print(f"{indexer_name} created or updated.")

    indexer_client.run_indexer(indexer_name)
    print(f"{indexer_name} is running. If queries return no results, please wait a bit and try again.")

indexer_client = SearchIndexerClient(
    endpoint=AZURE_SEARCH_ENDPOINT, credential=azure_search_credential
)
data_source_name = f"{INDEX_NAME}-blob"
indexer_name = f"{INDEX_NAME}-indexer"

create_and_run_indexer(indexer_client, indexer_name, skillset_name, INDEX_NAME, data_source_name)

## Search Query

In [None]:
from urllib.parse import urlparse

# Exract filename from URL
def extract_filename_from_url(url):
    parsed_url = urlparse(url)
    return parsed_url.path.split('/')[-1]

In [None]:
from azure.storage.blob import BlobServiceClient, BlobClient, generate_blob_sas, BlobSasPermissions
import datetime

def get_url_with_sas(blob_url):
	# Setting for Azure Blob Storage
	blob_service_client = BlobServiceClient.from_connection_string(BLOB_CONNECTION_STRING)

	blob_name = extract_filename_from_url(blob_url)
	blob_client = blob_service_client.get_blob_client(container=BLOB_CONTAINER_NAME, blob=f"images/{blob_name}")
 
	start_time = datetime.datetime.now(datetime.timezone.utc)
	expiry_time = start_time + datetime.timedelta(days=1)

	sas_token = generate_blob_sas(
		account_name=blob_client.account_name,
		container_name=blob_client.container_name,
		blob_name=f"images/{blob_name}",
		account_key=blob_service_client.credential.account_key,
		permission=BlobSasPermissions(read=True),
		expiry=expiry_time,
		start=start_time
	)
	return f"{blob_url}?{sas_token}"

In [None]:
search_history = []

In [None]:
# Initialize the SearchClient
search_client = SearchClient(
    AZURE_SEARCH_ENDPOINT,
    index_name=INDEX_NAME,
    credential=azure_search_credential,
)

# Define the query
query = "Azure AI のアップデート"

vector_query_caption = VectorizableTextQuery(
    text=query,
    k_nearest_neighbors=10,
    fields="captionVector",
)

vector_query_image = VectorizableTextQuery(
    text=query,
    k_nearest_neighbors=10,
    fields="imageVector",
)

# Perform the search
results = search_client.search(
    search_text=query,
    vector_queries=[vector_query_caption, vector_query_image],
    top=10
)

frame_timestamps = []
results_history = []

# Print the results
for result in results:
    results_history.append(result)
    print(f"Caption: {result['caption']}")
    print(f"Score: {result['@search.score']}")
    print(f"timestamp: {result['timestamp']}")
    frame_timestamps.append(round(float(result["timestamp"])))
    frame_url_with_sas = get_url_with_sas(result["frame_url"])
    display(HTML(f'<img src="{frame_url_with_sas}" style="width:200px;"/>'))
    print("-" * 50)

search_history.append({
	"query": query,
	"results": results_history
})

## Create Highlight Video

In [None]:
import cv2
import os

def extract_highlight_frames(video_path, frame_timestamps, pre_post_seconds, fps):
    cap = cv2.VideoCapture(video_path)
    frame_indices = [int(ts * fps) for ts in frame_timestamps]
    highlight_frames = []

    for idx in frame_indices:
        start_frame = max(0, idx - int(pre_post_seconds * fps))
        end_frame = idx + int(pre_post_seconds * fps)
        
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        for frame_num in range(start_frame, end_frame + 1):
            ret, frame = cap.read()
            if not ret:
                break
            highlight_frames.append(frame)

    cap.release()
    return highlight_frames

def create_video_from_frames(frames, output_video_path, fps=30, frame_size=None):
    if frame_size is None:
        frame_size = (frames[0].shape[1], frames[0].shape[0])
        
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # 'mp4v' is the MP4 codec
    out = cv2.VideoWriter(output_video_path, fourcc, fps, frame_size)

    for frame in frames:
        resized_frame = cv2.resize(frame, frame_size)
        out.write(resized_frame)

    out.release()
    print(f"Video saved to {output_video_path}")

# Path to the video file and highlight timestamps (in seconds)
video_path = 'data/keynote_ms_build_2024.mp4'
pre_post_seconds = 5  # Include frames 5 seconds before and after
fps = 30

# Extract highlight frames
highlight_frames = extract_highlight_frames(video_path, frame_timestamps, pre_post_seconds, fps)

# Path to the output video
output_video_path = 'output_video/highlight_video.mp4'

# Create the video
create_video_from_frames(highlight_frames, output_video_path, fps=fps)


In [None]:
#Note: The openai-python library support for Azure OpenAI is in preview.
#Note: This code sample requires OpenAI Python library version 1.0.0 or higher.
import os
from openai import AzureOpenAI
import json


def chatcompletion(message_text):
	client = AzureOpenAI(
	azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT"), 
	api_key=os.getenv("AZURE_OPENAI_API_KEY"),  
	api_version="2024-02-01"
	)


	completion = client.chat.completions.create(
		model="gpt-4o", # model = "deployment_name"
		messages = message_text,
		temperature=0,
		max_tokens=800,
		top_p=0.95,
		frequency_penalty=0,
		presence_penalty=0,
		stop=None
		)
	print(completion.choices[0].message.content)
	return completion.choices[0].message.content


## Generate description of hightlight scene

In [None]:
system_message = f"""
Generate description of highlight scene from search results.

Search results: 

"""

message_text = [
	{"role":"system","content":system_message},
	{"role":"user","content": [
	]}
]
for search_results in search_history:
    
	for entry in search_results["results"]:
		message_text[1]["content"].append(
			{
				"type": "text",
				"content": entry["caption"]
			}
		)
		message_text[1]["content"].append(
			{
				"type": "image_url",
				"image_url": {
					"url": get_url_with_sas(entry["frame_url"])
				}
			}
		)


completion = chatcompletion(message_text)
print(f"Description of highlight scene: {completion}")

## TBD: Edit pick-up frames with interactions

we need to revise system prompt for editting pick-up frames.

**Note that cells after this are incomplete.**

In [None]:
user_prompt = "講演者をアップ"

In [None]:
system_message = f"""
Generate standalone search queries for image searches from user prompts.
You need to integreate previous queries and the latest query.

# Previous query
{search_history[0]["query"]}

# Latest query
{user_prompt}

"""

user_message = f"""
New search query:

"""

message_text = [
	{"role":"system","content":system_message},
	{"role":"user","content": [
		{
			"type": "text",
			"content": user_message
		},
	]}
]

refine_query = chatcompletion(message_text)

In [None]:
refine_query = user_prompt

In [None]:
vector_query_caption = VectorizableTextQuery(
    text=refine_query,
    k_nearest_neighbors=10,
    fields="captionVector",
)

vector_query_image = VectorizableTextQuery(
    text=refine_query,
    k_nearest_neighbors=10,
    fields="imageVector",
)

# Perform the search
results = search_client.search(
    search_text=refine_query,
    vector_queries=[vector_query_caption, vector_query_image],
    top=10
)

frame_timestamps = []
results_history = []

# Print the results
for result in results:
    results_history.append(result)
    print(f"Caption: {result['caption']}")
    print(f"Score: {result['@search.score']}")
    print(f"timestamp: {result['timestamp']}")
    frame_timestamps.append(round(float(result["timestamp"])))
    frame_url_with_sas = get_url_with_sas(result["frame_url"])
    display(HTML(f'<img src="{frame_url_with_sas}" style="width:200px;"/>'))
    print("-" * 50)

search_history.append({
	"query": refine_query,
	"results": results_history
})

## New highlight Video

In [None]:
system_message = """
# Your tasks
- You need to pick up 10 frames from the search results.
- You must output the original JSON data format and key-value pair.

"""

user_message = f"""
# Search results
{search_history}

"""

message_text = [
	{"role":"system","content":system_message},
	{"role":"user","content": [
		{
			"type": "text",
			"content": user_message
		},
	]}
]

aoai_client = AzureOpenAI(
	azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT"), 
	api_key=os.getenv("AZURE_OPENAI_API_KEY"),  
	api_version="2024-02-01"
)

refine_results = aoai_client.chat.completions.create(
	model="gpt-4o", # model = "deployment_name"
	messages = message_text,
	response_format={"type": "json_object"},
	temperature=0,
	max_tokens=800,
	top_p=0.95,
	frequency_penalty=0,
	presence_penalty=0,
	stop=None
	)
print(refine_results.choices[0].message.content)

In [None]:
# 動画ファイルのパスとハイライトのタイムスタンプ（秒単位）
video_path = 'data/keynote_ms_build_2024.mp4'
pre_post_seconds = 5  # 前後5秒のフレームを含める
fps = 30

# ハイライトフレームを抽出
highlight_frames = extract_highlight_frames(video_path, frame_timestamps, pre_post_seconds, fps)

# 出力動画のパス
output_video_path = 'highlight_video.mp4'

# 動画を作成
create_video_from_frames(highlight_frames, output_video_path, fps=fps)