# Video Indexer

### Import Environment Variables
### Below Code Block contains all the necessary constants fetched from the .env file

In [53]:
import os
import os
from dotenv import load_dotenv
load_dotenv()
# Settings read from the process environment (populated from .env by load_dotenv()).
# os.getenv returns None for any variable that is not set.

# Blob storage account name and key, used to build the BlobServiceClient and SAS tokens.
BLOBACCOUNTNAME = os.getenv('BLOBACCOUNTNAME')
BLOBACCESSKEY = os.getenv('BLOBACCESSKEY')

# NOTE(review): these two are not referenced anywhere in the visible notebook cells.
AZUR_ENDPOINT = os.getenv('AZUR_ENDPOINT')
AZUR_OPENAI_KEY = os.getenv('AZUR_OPENAI_KEY')

# Search service key and endpoint, used to build the SearchClient.
SEARCH_KEY = os.getenv('SEARCH_KEY')
SEARCH_ENDPOINT = os.getenv('SEARCH_ENDPOINT')

### Get container client

In [86]:
# Function to get the container client to store the video in to

from azure.storage.blob import BlobServiceClient

# Confirm the storage settings were loaded without echoing the secret key:
# notebook outputs are saved with the file, so printing the key would leak it.
print("BLOBACCESSKEY loaded:", bool(BLOBACCESSKEY))
print(BLOBACCOUNTNAME)
def CreateContainerIfNotExists(container_name):
    """
    Function Information:
        Info : Return a client for the blob container `container_name`,
               creating the container first when it does not exist yet.

        Args : container_name - name of the blob container to open or create.

        Returns : (True, container_client) on success, or (False, None) when
                  any step raises; the exception is printed, not re-raised.
    """
    try:
        account_url = f"https://{BLOBACCOUNTNAME}.blob.core.windows.net"
        blob_service_client = BlobServiceClient(account_url=account_url, credential=BLOBACCESSKEY)

        # Azure container client
        container_client = blob_service_client.get_container_client(container_name)
        if not container_client.exists():
            container_client.create_container()
        return True, container_client
    except Exception as e:
        print(e)
        return False, None
    
# Smoke check: open (or create) the default container and show the (status, client) pair.
print(CreateContainerIfNotExists(container_name='tenant-151-safetycheck'))

### get container SAS token

In [24]:
# Function code to get the Container SAS Token
from datetime import datetime, timedelta
from azure.storage.blob import generate_container_sas, ContainerSasPermissions

def CreateContainerSASToken(container_name='tenant-151-safetycheck'):
    """
    Function information:
        Info : Create a SAS token granting read and list access to a container
               for one hour from the time of the call.

        Args : container_name - name of the blob container the token is for.

        Returns : the SAS token string, or False when token generation raises
                  (the exception is printed, not re-raised).
    """
    # Local import: the cell only imports `datetime` and `timedelta`.
    from datetime import timezone

    try:
        # SAS token information
        token_duration = timedelta(hours=1)
        permissions = ContainerSasPermissions(read=True, list=True)
        # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
        # value; use an explicit timezone-aware UTC timestamp instead.
        expiry = datetime.now(timezone.utc) + token_duration
        sas_token = generate_container_sas(
            account_name=BLOBACCOUNTNAME,
            container_name=container_name,
            account_key=BLOBACCESSKEY,
            permission=permissions,
            expiry=expiry,
        )
        return sas_token
    except Exception as e:
        print(e)
        return False

### Upload Video to Storage account

In [25]:
# Code to upload Video to AZURE blob storage account
import uuid
def UploadVideoToStorageAccount(file_path, container_name = 'tenant-151-safetycheck'):
    """
    Upload a local video file to blob storage under a random name.

    The blob is stored as `test-videos/<uuid4><extension>` inside
    `container_name`, which is created first when it does not exist.

    Args:
        file_path: path of the local video file, e.g. '/content/test-video-101.mp4'.
        container_name: blob container to upload into.

    Returns:
        The URL of the uploaded blob (without any SAS token).

    Raises:
        RuntimeError: when the container could not be opened or created.
        OSError: when `file_path` cannot be opened.
    """
    with open(file_path, 'rb') as video_file:
        create_container_status, container_client = CreateContainerIfNotExists(container_name)
        if not create_container_status or container_client is None:
            # Fail with a clear message instead of an AttributeError on None below.
            raise RuntimeError(
                f"something went wrong: could not open or create container '{container_name}'"
            )

        # splitext keeps the leading dot and yields '' when there is no extension,
        # so an extension-less path no longer ends up inside the blob name.
        file_extension = os.path.splitext(file_path)[1]
        upload_file_name = f"test-videos/{uuid.uuid4()}{file_extension}"

        # Upload blob: hand the open file to the SDK so the video is streamed
        # instead of being read fully into memory first.
        blob_client = container_client.get_blob_client(upload_file_name)
        blob_client.upload_blob(data=video_file, overwrite=True)

        # Generate video URL
        video_url = blob_client.url
        print(f"Video successfully uploaded: {video_url}")
        return video_url

### Get Bearer Token

In [26]:
# GET API bearer token:
import requests

def get_api_bearer_token():
    '''
    Request a bearer token from login.microsoftonline.com using the
    client-credentials grant, scoped to https://management.azure.com/.default.

    The tenant id, client id and client secret are read from the
    AZURE_VIDEO_INDEXER_TENANT_ID / _CLIENT_ID / _CLIENT_SECRET environment
    variables.

    Returns the `access_token` string from the response, or None when the
    endpoint answers with a status other than 200 (details are printed).
    '''

    # Define the API endpoint
    tenant_id = os.getenv('AZURE_VIDEO_INDEXER_TENANT_ID')
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    # Prepare the form data
    data = {
        "client_id": os.getenv('AZURE_VIDEO_INDEXER_CLIENT_ID'),
        "client_secret": os.getenv('AZURE_VIDEO_INDEXER_CLIENT_SECRET'),
        "grant_type": "client_credentials",
        "scope": "https://management.azure.com/.default",
    }

    # Send the request; without a timeout requests can block indefinitely.
    response = requests.post(url, data=data, timeout=30)

    # Handle the response
    if response.status_code == 200:
        return response.json()['access_token']

    print("Failed to get bearer token")
    print("Status code:", response.status_code)
    print("Response:", response.text)
    return None


### get access token

In [27]:
import requests

def get_access_token():
    '''
    Request a Video Indexer account access token through the
    management.azure.com `generateAccessToken` endpoint
    (permissionType "Contributor", scope "Account").

    The subscription id, resource group and account name are read from the
    AZURE_VIDEO_INDEXER_SUBSCRIPTION_ID / _RESOURCE_GROUP / _ACCOUNT_NAME
    environment variables. The call is authorised with the bearer token
    returned by get_api_bearer_token().

    Returns the `accessToken` string from the response, or None when either
    the bearer token or the access token could not be obtained (details are
    printed).
    '''
    subscription_id = os.getenv('AZURE_VIDEO_INDEXER_SUBSCRIPTION_ID')
    resource_group = os.getenv('AZURE_VIDEO_INDEXER_RESOURCE_GROUP')
    account_name = os.getenv('AZURE_VIDEO_INDEXER_ACCOUNT_NAME')
    version = '2022-08-01'

    url = f"https://management.azure.com/subscriptions/{subscription_id}/resourcegroups/{resource_group}/providers/Microsoft.VideoIndexer/accounts/{account_name}/generateAccessToken?api-version={version}"

    data = {
        "permissionType": "Contributor",
        "scope": "Account"
    }

    bearer_token = get_api_bearer_token()
    if not bearer_token:
        # Avoid sending a request authorised with the literal text "Bearer None".
        print("Failed to get access token")
        print("Response:", "no bearer token available")
        return None

    headers = {
        "Content-Type": 'application/json',
        "Authorization": f"Bearer {bearer_token}"
    }

    # Without a timeout requests can block indefinitely.
    response = requests.post(url, headers=headers, json=data, timeout=30)

    if response.status_code == 200:
        return response.json()['accessToken']

    print("Failed to get access token")
    print("Status code:", response.status_code)
    print("Response:", response.text)
    return None

### Functionality for uploading video using video url to Azure Video Indexer 

In [None]:
# Upload video for indexing
# please mention video name before running the code. 

########### Python 3.2 #############
import urllib.request, json, requests
from urllib.parse import quote


def upload_video_for_indexing(video_url, video_name=''):
    """
    Submit a video, referenced by URL, to Azure Video Indexer for indexing.

    Args:
        video_url: URL the service can download the video from (for a private
            blob this must already carry its SAS token).
        video_name: name to register the video under. When empty, the last
            path segment of `video_url` is used.

    Returns:
        The parsed JSON response on HTTP 200, otherwise None. All errors are
        printed and swallowed.
    """
    # Local imports: the cell only imports `quote` from urllib.parse.
    import posixpath
    from urllib.parse import quote, unquote, urlencode, urlsplit

    try:

        access_token = get_access_token()
        if not access_token:
            print("Failed to upload video")
            print("Response:", "no access token available")
            return None
        account_id = os.getenv('AZURE_VIDEO_INDEXER_ACCOUNT_ID')

        if not video_name:
            # Derive a name from the URL path only, so the SAS query string is excluded.
            video_name = unquote(posixpath.basename(urlsplit(video_url).path)) or 'video'

        # urlencode escapes every value (name, URL and token) consistently.
        query = urlencode(
            {
                'name': video_name,
                'videoUrl': video_url,
                'accessToken': access_token,
                'preventDuplicates': 'False',
            },
            quote_via=quote,
        )
        url = f"https://api.videoindexer.ai/eastus/Accounts/{account_id}/Videos?{query}"
        hdr = {
            # Request headers
            'Ocp-Apim-Subscription-Key': os.getenv('AZURE_VIDEO_INDEXER_OCM_SUBSCRIPTION_KEY')
        }

        request = urllib.request.Request(url, headers=hdr, method='POST')
        # The context manager closes the HTTP response once it has been read.
        with urllib.request.urlopen(request) as response:
            status = response.status
            response_body = response.read().decode('utf-8')

        if status == 200:
            print("video uploaded successfully")
            response_data = json.loads(response_body)
            print(response_data)
            return response_data

        print("Failed to upload video")
        print("Status code:", status)
        print("Response:", response_body)
    except urllib.error.HTTPError as e:
        print("HTTP Error:", e.code, e.reason)
    except urllib.error.URLError as e:
        print("URL Error:", e.reason)
    except Exception as e:
        print("Something went wrong")
        print(e)
    return None
####################################

### Function to run until the video goes from the processing state to the processed state

In [None]:
import time
from typing import Optional

def wait_for_index_sync(video_id:str, poll_interval:float = 10, timeout:Optional[float] = None) -> None:
    '''
    Calls getVideoIndex API at regular intervals until the indexing state is
    'Processed' or 'Failed'
    (https://api-portal.videoindexer.ai/api-details#api=Operations&operation=Get-Video-Index).
    Prints the full video index when indexing completes, or a failure message
    when the state is 'Failed'.

    :param video_id: The video ID to wait for
    :param poll_interval: Seconds to sleep between two state checks (default 10)
    :param timeout: Maximum number of seconds to keep polling; None waits indefinitely
    :raises requests.HTTPError: when a state check returns an HTTP error status
    :raises TimeoutError: when `timeout` elapses before a final state is reached
    '''
    accountId = os.getenv('AZURE_VIDEO_INDEXER_ACCOUNT_ID')
    # NOTE(review): the token is fetched once; presumably it can expire during a
    # very long indexing run - confirm the token lifetime against the service docs.
    access_token = get_access_token()
    url = f'https://api.videoindexer.ai/eastus/Accounts/{accountId}/Videos/{video_id}/Index'

    params = {
        'accessToken': access_token
    }

    print(f'Checking if video {video_id} has finished indexing...')
    start_time = time.monotonic()
    while True:
        response = requests.get(url, params=params, timeout=30)

        response.raise_for_status()

        video_result = response.json()
        video_state = video_result.get('state')

        if video_state == 'Processed':
            print(f'The video index has completed. Here is the full JSON of the index for video ID {video_id}: \n{video_result}')
            return
        if video_state == 'Failed':
            print(f"The video index failed for video ID {video_id}.")
            return

        print(f'The video index state is {video_state}')

        if timeout is not None and time.monotonic() - start_time >= timeout:
            raise TimeoutError(
                f'Video {video_id} was still in state {video_state} after {timeout} seconds'
            )

        time.sleep(poll_interval)


### Get Video Indexing using the video id received in the response of the Upload video indexing API

In [None]:
# get video index using video id got from uploading video for indexing
from pprint import pprint
import urllib.request
import json

########### Python 3.2 #############
def get_video_indexing(video_id):
    """
    Fetch the index of a video, including summarized insights, from Azure
    Video Indexer.

    Args:
        video_id: id of the video, as returned by the upload call.

    Returns:
        The parsed JSON response, or None when anything raises (the exception
        is printed, not re-raised).
    """
    try:
        access_token = get_access_token()
        accountId = os.getenv('AZURE_VIDEO_INDEXER_ACCOUNT_ID')
        url = f"https://api.videoindexer.ai/eastus/Accounts/{accountId}/Videos/{video_id}/Index?includeSummarizedInsights=true&accessToken={access_token}"

        hdr = {
            # Request headers
            'Cache-Control': 'no-cache',
            'Ocp-Apim-Subscription-Key': os.getenv('AZURE_VIDEO_INDEXER_OCM_SUBSCRIPTION_KEY'),
        }

        # Pass the HTTP method explicitly instead of patching get_method on the request.
        req = urllib.request.Request(url, headers=hdr, method='GET')

        # The context manager closes the HTTP response once it has been read.
        with urllib.request.urlopen(req) as response:
            response_data = response.read().decode("utf-8")
        return json.loads(response_data)

    except Exception as e:
        print(e)
        return None
####################################

### Main Python Code:

In [47]:
# either use input function to enter video path at runtime or update using static code for testing.
# file_path = input("Enter video file path:")

file_path = '../What is MCP_ Integrate AI Agents with Databases & APIs.mp4'


print('Uploading Video to Azure Storage account.')
# default container to store test videos is set as 'tenant-151-safetycheck', can be changed by adding container name in the UploadVideoToStorageAccount as 2nd argument.
video_url = UploadVideoToStorageAccount(file_path)

print('Generating video url with embedded SAS Token.')
# in the below step also in Create ContainerSASToken function default container is set as tenant-151-safetycheck, can be updated by passing the container name as an argument to the CreateContainerSASToken
sas_token = CreateContainerSASToken()
if not sas_token:
    # CreateContainerSASToken returns False on failure; do not embed "False" in the URL.
    raise RuntimeError('Could not generate a SAS token for the uploaded video.')
access_video_url = f'{video_url}?{sas_token}'

print('Uploading Video to Azure Video Indexer.')
response_data = upload_video_for_indexing(access_video_url)
if not response_data or 'id' not in response_data:
    # upload_video_for_indexing returns None on any failure; stop with a clear message.
    raise RuntimeError('Video Indexer upload did not return a video id.')

azure_video_id = response_data['id']
print('Checking Video Processing State.')


# optional loop below that keeps checking if a video is in processing state or processed state. Instead check_video_indexing_status can also be used to get video indexing status at anytime.

wait_for_index_sync(azure_video_id)


In [49]:
def check_video_indexing_status(video_id):
    """
    Query Azure Video Indexer once for the indexing state of a video.

    Args:
        video_id: id of the video to check.

    Returns:
        The value of the `state` field in the index response (None when the
        field is absent). The state is also printed.

    Raises:
        requests.HTTPError: when the service answers with an HTTP error status.
    """
    accountId = os.getenv('AZURE_VIDEO_INDEXER_ACCOUNT_ID')
    access_token = get_access_token()
    url = f'https://api.videoindexer.ai/eastus/Accounts/{accountId}/Videos/{video_id}/Index'

    params = {
        'accessToken': access_token
    }

    print(f'Checking if video {video_id} has finished indexing...')
    # Without a timeout requests can block indefinitely.
    response = requests.get(url, params=params, timeout=30)

    response.raise_for_status()

    video_result = response.json()
    video_state = video_result.get('state')

    print(f'Video Processing has been: {video_state}')
    # Return the state so callers can act on it instead of parsing printed output.
    return video_state

# One-off status check for a previously uploaded video id.
check_video_indexing_status(video_id='2wur8zc1t2')

In [85]:
# Get the insights of a video from Video Indexer using its video id.
def get_video_insights(video_id):
    """
    Fetch the index of a video from Azure Video Indexer and return the first
    entry of its `videos` list.

    The account id is read from AZURE_VIDEO_INDEXER_ACCOUNT_ID and the region
    from AZURE_VIDEO_INDEXER_LOCATION (default 'eastus').

    Args:
        video_id: id of the video whose insights are requested.

    Returns:
        The first element of the `videos` list in the response.

    Raises:
        requests.HTTPError: when the service answers with an HTTP error status.
        IndexError: when the response contains no videos.
    """
    account_id = os.getenv('AZURE_VIDEO_INDEXER_ACCOUNT_ID')
    location = os.getenv('AZURE_VIDEO_INDEXER_LOCATION', 'eastus')
    access_token = get_access_token()

    # The token is sent once, through `params`, so requests handles the escaping.
    url = f"https://api.videoindexer.ai/{location}/Accounts/{account_id}/Videos/{video_id}/Index"
    params = {
        'accessToken': access_token
    }

    print(f'Fetching insights for video {video_id}...')
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()

    videos = response.json().get('videos') or []
    if not videos:
        # Same exception type as the bare [0] would raise, with a useful message.
        raise IndexError(f'No videos found in the index response for video {video_id}')
    return videos[0]
    
# Example usage: fetch the insights of a previously indexed video.
get_video_insights(video_id='2wur8zc1t2')



In [106]:
'''
No need to run this cell again.
This code block was only used to create the Azure Cognitive Search index named "video-insights-index".
It has been commented out to prevent accidental re-execution.
'''

# from azure.search.documents.indexes import SearchIndexClient
# from azure.core.credentials import AzureKeyCredential
# from azure.search.documents.indexes.models import *

# admin_key = SEARCH_KEY
# index_name = "video-insights-index"

# client = SearchIndexClient(
#     endpoint='https://pri42291devaisearch.search.windows.net',
#     credential=AzureKeyCredential(admin_key)
# )

# fields = [
#     SimpleField(name="videoId", type="Edm.String", key=True),
#     SearchableField(name="transcript", type="Edm.String"),
#     SearchableField(name="faces", type="Edm.String", collection=True),
#     SearchableField(name="keywords", type="Edm.String", collection=True),
#     SearchableField(name="topics", type="Edm.String", collection=True),
#     SearchableField(name="emotions", type="Edm.String", collection=True)
# ]

# index = SearchIndex(name=index_name, fields=fields)
# client.create_index(index)


In [108]:
'''
Code Block to parse insights into a flatten document.
'''
import json

def parse_insights(video_insights):
    """
    Flatten one video entry from the index response into a search document.

    Args:
        video_insights: dict with an `id` and an `insights` dict, as returned
            by get_video_insights().

    Returns:
        A dict with keys `videoId`, `transcript` (transcript texts joined by
        single spaces), `faces`, `keywords`, `topics` and `emotions` (lists of
        strings). Sections that are missing or null produce an empty
        string or an empty list.
    """
    # `insights` may be absent or null; treat both as "no insights".
    insights = video_insights.get('insights') or {}

    transcript = " ".join(t["text"] for t in insights.get("transcript") or [])
    # Skip faces without a name as well as the "Unknown" placeholder.
    # NOTE(review): unidentified faces may be labelled differently (for example
    # with a numeric suffix) - confirm against a real index response.
    faces = [
        f["name"]
        for f in insights.get("faces") or []
        if f.get("name") and f["name"] != "Unknown"
    ]
    keywords = [k["text"] for k in insights.get("keywords") or []]
    topics = [t["name"] for t in insights.get("topics") or []]
    emotions = [e["type"] for e in insights.get("emotions") or []]

    return {
        "videoId": video_insights.get("id"),
        "transcript": transcript,
        "faces": faces,
        "keywords": keywords,
        "topics": topics,
        "emotions": emotions
    }

# Fetch the insights of a known video, show them, and keep a raw copy on disk.
raw = get_video_insights(video_id='2wur8zc1t2')
print(raw)
with open('output.json', mode='w', encoding='utf-8') as f:
    json.dump(raw, f, indent=4)

# Flatten the raw insights into the search document shape and show the result.
doc = parse_insights(video_insights=raw)
print(doc)

# Load insights.json

# '''
# Example Document:
# {
#   "videoId": "vid123",
#   "transcript": "In this keynote, Elon Musk talks about AI risks...",
#   "faces": ["Elon Musk"],
#   "keywords": ["AI", "Ethics", "Technology"],
#   "topics": ["Artificial Intelligence", "Moral Responsibility"],
#   "emotions": ["neutral", "positive"]
# }

# '''

### Below code is used for uploading derived video insights to azure cognitive search


In [None]:
'''
Push Documents to Azure Cognitive Search
'''
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential

index_name = "video-insights-index"
admin_key = SEARCH_KEY

search_client = SearchClient(
    endpoint=SEARCH_ENDPOINT,
    index_name=index_name,
    credential=AzureKeyCredential(admin_key)
)

raw = get_video_insights('2wur8zc1t2')
doc = parse_insights(raw)
with open('output.json', 'w', encoding='utf-8') as f:
    json.dump(doc, f, indent=4)
print(doc)
# Push to index. upload_documents reports one result per document, so check
# them instead of announcing success unconditionally.
upload_results = search_client.upload_documents(documents=[doc])
failed_keys = [result.key for result in upload_results if not result.succeeded]
if failed_keys:
    print("Failed to upload documents with keys:", failed_keys)
else:
    print("Uploaded to search index.")


In [117]:
## Just a simple index based search

from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential

endpoint = SEARCH_ENDPOINT
index_name = "video-insights-index"
key = SEARCH_KEY


search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=AzureKeyCredential(key))
# search() hands back a lazy paged iterator, so `not results` on it does not
# tell whether anything matched; materialise the hits first.
results = list(search_client.search(search_text="elon"))
print(f"{len(results)} result(s) found")
if not results:
    print('Nothing simmilar found')
for result in results:
    print(result)
    # A document may carry no transcript; fall back to an empty string before slicing.
    print("Transcript (snippet):", (result.get("transcript") or "")[:150])
    print("Faces:", result.get("faces"))
    print("Keywords:", result.get("keywords"))
    print("Topics:", result.get("topics"))
    print("Emotions:", result.get("emotions"))
    print("=" * 60)
