In [4]:
"""
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
"""
#!pip install -r  operations/reprocess/requirements.txt

In [13]:
import json
import re
import time
import os
from tqdm import tqdm 
from dotenv import load_dotenv
from azure.cosmos import CosmosClient
from azure.storage.blob import BlobServiceClient
from azure.servicebus import ServiceBusClient, ServiceBusMessage

# Reprocess insights

This notebook can be used to re-process insights. It will take all the insights from Cosmos DB, retrieve their original JSON data file from storage, and send that file to the data processing queue.

This will re-run the `func_dataproc` pipeline, which merges the insights and then writes the results to Cosmos DB.

**Note:** this notebook requires the Cosmos DB, Blob Storage, and Service Bus SDKs. See `requirements.txt`.

In [17]:
# Secrets are not stored in the notebook: load them from a local .env file
# into the process environment, then read each setting from there.
# Any variable missing from the environment ends up as None.
file_with_secrets = "python/.env"
load_dotenv(file_with_secrets)

# Cosmos DB (portal: Keys -> Primary Connection String)
STG_COSMOS_ENDPOINT = os.environ.get("STG_COSMOS_ENDPOINT")
STG_COSMOS_KEY = os.environ.get("STG_COSMOS_KEY")
STG_COSMOS_NAME = os.environ.get("STG_COSMOS_NAME")
STG_CONTAINER = os.environ.get("STG_CONTAINER")

# Storage (portal: Access Keys -> Connection String)
BLOB_UPLOAD_STORAGE_KEY = os.environ.get("BLOB_UPLOAD_STORAGE_KEY")
BLOB_CONNECTION_STRING = os.environ.get("BLOB_CONNECTION_STRING")
BLOB_STORAGE_ACCOUNT_NAME = os.environ.get("BLOB_STORAGE_ACCOUNT_NAME")
BLOB_CONTAINER_NAME = os.environ.get("BLOB_CONTAINER_NAME")

# Service Bus (portal: Shared Access Policies -> RootManageSharedAccessKey
# -> Primary Connection String)
SERVICEBUS_KEY = os.environ.get("SERVICEBUS_KEY")
SERVICEBUS_RESOURCE_NAME = os.environ.get("SERVICEBUS_RESOURCE_NAME")
SERVICEBUS_CONNECTION_STRING = os.environ.get("SERVICEBUS_CONNECTION_STRING")
SERVICEBUS_QUEUE = os.environ.get("SERVICEBUS_QUEUE")

## Trigger Processing Including Reparsing JSON Metadata

Setting metadata on a blob changes its last-modified date, which in turn fires the trigger orchestrator.

In [18]:
# Collect every JSON blob stored under the chosen folder of the container.
blob_service_client = BlobServiceClient.from_connection_string(BLOB_CONNECTION_STRING)
container_client = blob_service_client.get_container_client(BLOB_CONTAINER_NAME)

# Blob-name prefix (folder inside the container) to take files from
folder_base = 'msr-vtt/'

# Listing and filtering in one pass: keep only blobs whose name ends in '.json'
all_json_files = [
    blob
    for blob in container_client.list_blobs(name_starts_with=folder_base)
    if blob['name'].endswith('.json')
]

In [19]:
# Write a 'manually_triggered' marker into the metadata of every JSON blob
# collected in `all_json_files`.
for blob in tqdm(all_json_files):
    blob_client = container_client.get_blob_client(blob['name'])

    # set_blob_metadata() replaces the blob's entire metadata set, so start
    # from whatever is already stored on the blob and add the marker to it;
    # passing only the marker would silently erase all other metadata.
    metadata = dict(blob_client.get_blob_properties().metadata or {})
    metadata['manually_triggered'] = '08-15-2022'

    blob_client.set_blob_metadata(metadata)

100%|██████████| 4023/4023 [06:12<00:00, 10.81it/s]


## Trigger Processing **WITHOUT** Reparsing JSON Metadata

In [21]:
# Connect to the Cosmos DB container and load every item it holds into memory.
cosmos_client = CosmosClient(url=STG_COSMOS_ENDPOINT, credential=STG_COSMOS_KEY)
database = cosmos_client.get_database_client(STG_COSMOS_NAME)
cosmos_container = database.get_container_client(STG_CONTAINER)

# read_all_items() is consumed fully here so the items can be iterated (and
# counted by a progress bar) later on.
cosmos_items = [item for item in cosmos_container.read_all_items()]

In [22]:
# Clients used further down: a blob container client to read the JSON
# metadata files from storage, and a Service Bus sender that posts messages
# to the queue named by SERVICEBUS_QUEUE.

blob_service_client = BlobServiceClient.from_connection_string(BLOB_CONNECTION_STRING)
container_client = blob_service_client.get_container_client(BLOB_CONTAINER_NAME)

servicebus_client = ServiceBusClient.from_connection_string(
    conn_str=SERVICEBUS_CONNECTION_STRING,
    logging_enable=True,
)
sender = servicebus_client.get_queue_sender(queue_name=SERVICEBUS_QUEUE)

In [23]:
def find_latest_metadata(blobs):
    """Return the parsed JSON metadata with the highest 'version' among blobs.

    Every entry of ``blobs`` whose name ends in '.json' is downloaded through
    the module-level ``container_client`` and parsed as JSON; entries with any
    other name are ignored.

    Args:
        blobs: iterable of blob entries exposing their name via ``['name']``.

    Returns:
        The parsed dict carrying the largest 'version' value. On equal
        versions the first one encountered is kept. An empty dict is returned
        when no JSON blob has a version greater than 0.

    Raises:
        KeyError: if a parsed JSON document has no 'version' key.
    """
    best_metadata = {}
    best_version = 0

    json_names = (entry['name'] for entry in blobs if entry['name'].endswith('.json'))

    for name in json_names:
        raw = container_client.get_blob_client(name).download_blob().readall()
        candidate = json.loads(raw)
        candidate_version = candidate['version']

        # Only a strictly newer version replaces the current best
        if candidate_version <= best_version:
            continue

        best_version, best_metadata = candidate_version, candidate

    return best_metadata

In [27]:
# For every Cosmos DB item: locate its folder in storage, load the latest
# JSON metadata found there, tag it with the item's id and post it to the
# Service Bus queue.
folder_base = 'msr-vtt/'

# Ids of items for which no JSON metadata was found (nothing is sent for them)
skipped_video_ids = []

for item in tqdm(cosmos_items):
    # Named `video_id` rather than `id` so the built-in id() is not shadowed
    # for the rest of the kernel session.
    video_id = item['id']
    desc = item['description']

    if '/' in desc:
        desc = desc.split('/')[1]
    # This below is necessary due to the naming conventions in the sample folder (to be modified if necessary)
    folder = desc.split('.')[0].split('_')[0]
    folder = folder_base + folder + "/"

    # List all blobs and find and load the latest JSON Metadata
    blobs = container_client.list_blobs(name_starts_with = folder)
    metadata_dict = find_latest_metadata(blobs)

    # find_latest_metadata() returns an empty dict when nothing was found.
    # Do not post a message that would contain only the video_id; record the
    # item and move on instead.
    if not metadata_dict:
        skipped_video_ids.append(video_id)
        tqdm.write(f"No JSON metadata found under '{folder}' - skipping item {video_id}")
        continue

    # Add video_id to the message
    metadata_dict['video_id'] = video_id

    # Post the message to queue
    payload = json.dumps(metadata_dict)

    message = ServiceBusMessage(payload)
    sender.send_messages(message)

100%|██████████| 3945/3945 [18:42<00:00,  3.51it/s]
