# Project Week 1: ActivityNet Video Data Preparation and Indexing

In this example we will use the ActivityNet dataset https://github.com/activitynet/ActivityNet. 

 - Select the 10 videos with more moments.
 - Download these videos onto your computer.
 - Extract the frames for every video.
 - Read the textual descriptions of each video.
 - Index the video data in OpenSearch.

 In this week, you will index the video data and make it searchable with OpenSearch. You should refer to the OpenSearch tutorial laboratory.

## Select videos
Download the `activity_net.v1-3.min.json` file containing the list of videos. The file is in the github repository of ActivityNet.
Parse this file and select the 10 videos with more moments.

In [1]:
import json
from pprint import pprint
import av
import os
import pprint as pp
from opensearchpy import OpenSearch
from opensearchpy import helpers
import requests
import pickle

In [2]:
# Load both JSON files
with open('captions/val_1.json', 'r') as file1, open('captions/val_2.json', 'r') as file2:
    data1 = json.load(file1)
    data2 = json.load(file2)

In [3]:
def merge_timestamp_dictionaries(ident, dict1, dict2, res = {}):
    # Extract all timestamp-string pairs from both dictionaries
    pairs = []
    dic_ident = "v_" + ident
    # Add pairs from dict1
    for i in range(len(dict1[dic_ident]['timestamps'])):
        pairs.append((dict1[dic_ident]['timestamps'][i], dict1[dic_ident]['sentences'][i]))
    
    # Add pairs from dict2
    for i in range(len(dict2[dic_ident]['timestamps'])):
        pairs.append((dict2[dic_ident]['timestamps'][i], dict2[dic_ident]['sentences'][i]))
    
    # Sort pairs by timestamp
    pairs.sort(key=lambda x: x[0])
    
    # Create new merged dictionary
    res.update({ident: {
        'duration': dict2[dic_ident]["duration"],
        'timestamps': [pair[0] for pair in pairs],
        'sentences': [pair[1] for pair in pairs]
        }
               }
    )
    
    return res

In [4]:
selected_videos_ids = ["QKEFacWrn_8", "_15t4WTR19s", "eXMF6Skt2To", "TNFoUBRsngY", "od1jHUzgrAU", "gXk9TiqGUHs", "IEqnfSiCIXc", "Ez7s36AwgLk", "mHVmDOxtVt0", "i2X7z9ywHV8"]

In [5]:
db = {}

In [6]:
for entry in selected_videos_ids:
    merge_timestamp_dictionaries(entry, data1, data2, db)

## Video frame extraction

PyAV is a wrapper library providing you access to `ffmpeg`, a command-line video processing tool. In the example below, you will be able to extract frames from the a video shot.

In [7]:
video_dir = "videos"
videos = [os.path.join(video_dir, vid) for vid in os.listdir(video_dir) if vid.endswith(".mp4")]

In [8]:
videos

['videos/i2X7z9ywHV8.mp4',
 'videos/mHVmDOxtVt0.mp4',
 'videos/QKEFacWrn_8.mp4',
 'videos/gXk9TiqGUHs.mp4',
 'videos/IEqnfSiCIXc.mp4',
 'videos/od1jHUzgrAU.mp4',
 'videos/TNFoUBRsngY.mp4',
 'videos/Ez7s36AwgLk.mp4',
 'videos/dSdZz_Royyc.mp4',
 'videos/eXMF6Skt2To.mp4',
 'videos/_15t4WTR19s.mp4']

In [9]:
frames = {}
video_dir = "videos"
frames_path = "frames_dict.pkl"

if os.path.exists(frames_path):
    with open(frames_path, 'rb') as f:
        try:
            frames = pickle.load(f)
        except Exception as e:
            print("Error loading pickle file:", e)


if not frames:
    for vid in selected_videos_ids:
        curr_dir = vid + "_keyframes"
        frames_saved = os.path.isdir(curr_dir)
        
        if not frames_saved:
            os.mkdir(curr_dir)
    
        """
        With this implementation we go through every frame and save them if they:
         - are a key frame, or
         - no other frame was saved within that second
    
        On top of that, we create the dictionary frames that contains every frame where:
        - the key is: video_id + "_" + number_of_saved_frame
        - the value is: another dictionary with "timestamp" and "type"
        The type is either sec - for frames saved for the second they are in - or
        key - for being a keyframe.
        """
        with av.open(os.path.join(video_dir, vid + ".mp4")) as container:
            stream = container.streams.video[0]
            last_saved_second = -1
    
            i = 0
            for j, frame in enumerate(container.decode(stream)):
                if frame.pts is None:
                    print("Something wrong here")
                    continue
                
                # Calculate the timestamp in seconds
                timestamp = float(frame.pts * stream.time_base)
                current_second = int(timestamp)
                
                # Check if this is a keyframe
                is_keyframe = frame.key_frame
                
                # Determine if we should save this frame
                save_frame = False
                frame_type = None
                
                if is_keyframe:
                    # Always save keyframes
                    save_frame = True
                    frame_type = "key"
                elif current_second > last_saved_second:
                    # Save non-keyframes only if we don't have a frame for this second yet
                    save_frame = True
                    frame_type = "sec"
                
                if save_frame:
                    i+=1
                    # Update the last second we saved a frame for
                    last_saved_second = current_second
                    
                    # Create a descriptive frame name
                    frame_name = f"{vid}_{i}"
                    name = os.path.join(curr_dir, frame_name + ".jpg")
                    
                    # Update the frames dictionary
                    frames.update({
                        frame_name: {"timestamp": timestamp,
                                     "type": frame_type
                                    }
                    })
                    
                    # Save the frame image if required
                    if not frames_saved:
                        frame.to_image().save(name, quality=80)

    with open(frames_path, 'wb') as f:
        pickle.dump(frames, f)

## Video metadata

Process the video metadata provided in the `json` file and index the video data in OpenSearch.

### Check the current OpenSearch Index

locally

In [10]:
host = 'localhost'
port = 9200

# Define the connection to the local OpenSearch server
client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_auth = ('admin', 'JIMMY\neutron509'),  
    http_compress = True  # Enables gzip compression for request bodies
    #use_ssl = True,
    #verify_certs = False,
    #ssl_assert_hostname = False
)

index_name = "wiirijo"  # Replace with your actual index name

# Check if index exists
if client.indices.exists(index=index_name):
    resp = client.indices.open(index=index_name)
    print(resp)

    print('\n----------------------------------------------------------------------------------- INDEX SETTINGS')
    settings = client.indices.get_settings(index=index_name)
    pp.pprint(settings)

    print('\n----------------------------------------------------------------------------------- INDEX MAPPINGS')
    mappings = client.indices.get_mapping(index=index_name)
    pp.pprint(mappings)

    print('\n----------------------------------------------------------------------------------- INDEX #DOCs')
    print(client.count(index=index_name))
else:
    print("Index does not exist.")


{'acknowledged': True, 'shards_acknowledged': True}

----------------------------------------------------------------------------------- INDEX SETTINGS
{'wiirijo': {'settings': {'index': {'creation_date': '1744713255323',
                                    'knn': 'true',
                                    'number_of_replicas': '0',
                                    'number_of_shards': '4',
                                    'provided_name': 'wiirijo',
                                    'refresh_interval': '-1',
                                    'replication': {'type': 'DOCUMENT'},
                                    'uuid': 'AeBZuNqaRsq82Gvj_YKS_g',
                                    'version': {'created': '136407927'}}}}}

----------------------------------------------------------------------------------- INDEX MAPPINGS
{'wiirijo': {'mappings': {'dynamic': 'strict',
                          'properties': {'caption': {'type': 'text'},
                                         

### Delete Existing Index (if needed)

In [28]:
client.indices.delete(index=index_name, ignore=[400, 404])
print(f"Index '{index_name}' deleted.")

Index 'wiirijo' deleted.


### Creating new index with mappings
Play around here ig

In [29]:
index_body = {
   "settings": {
      "index": {
         "number_of_replicas": 0,
         "number_of_shards": 4,
         "refresh_interval": "-1", # Keep it off for now, change it to "1s" later (for searching)
         "knn": "true"
      }
   },
   "mappings": {
      "dynamic": "strict",
      "properties": {
         "video_id": {"type": "keyword"},
         "start_timestamp": {"type": "float"},
         "end_timestamp": {"type": "float"},
         "caption": {"type": "text"},
         "caption_bow": {"type": "text"},
         "caption_vec": {
            "type": "knn_vector",
            "dimension": 768,
            "method": {
               "name": "hnsw",
               "space_type": "innerproduct",
               "engine": "nmslib",
               "parameters": {
                  "m": 16,
                  "ef_construction": 200,
               }
            }
         },
         "duration": {"type": "float"},
         "resolution": {"type": "keyword"},
         "keyframe_path": {"type": "keyword"},
         "keyframe_vec": {
            "type": "knn_vector",
            "dimension": 512,
            "method": {
               "name": "hnsw",
               "space_type": "innerproduct",
               "engine": "nmslib",
               "parameters": {
                  "m": 16,
                  "ef_construction": 200,
               }
            }
         }
      }
   }
}



# Create the index
response = client.indices.create(index=index_name, body=index_body)
print(f"Index '{index_name}' created.")
# Check the index settings
settings = client.indices.get_settings(index=index_name)
print("Index settings:")
pp.pprint(settings)
# Check the index mappings
mappings = client.indices.get_mapping(index=index_name)
print("Index mappings:")
pp.pprint(mappings)



Index 'wiirijo' created.
Index settings:
{'wiirijo': {'settings': {'index': {'creation_date': '1744713255323',
                                    'knn': 'true',
                                    'number_of_replicas': '0',
                                    'number_of_shards': '4',
                                    'provided_name': 'wiirijo',
                                    'refresh_interval': '-1',
                                    'replication': {'type': 'DOCUMENT'},
                                    'uuid': 'AeBZuNqaRsq82Gvj_YKS_g',
                                    'version': {'created': '136407927'}}}}}
Index mappings:
{'wiirijo': {'mappings': {'dynamic': 'strict',
                          'properties': {'caption': {'type': 'text'},
                                         'caption_bow': {'type': 'text'},
                                         'caption_vec': {'dimension': 768,
                                                         'method': {'engine': 'nmslib',

## Video captions

The ActivityNetCaptions dataset https://cs.stanford.edu/people/ranjaykrishna/densevid/ dataset provides a textual description of each videos. Index the video captions on a text field of your OpenSearch index.

### Generating embeddings for Captions

In [11]:
from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np

# Load the pre-trained model and tokenizer
model_name = 'sentence-transformers/all-mpnet-base-v2'
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
model.eval()  # Set the model to evaluation mode

print(f"Loaded model type: {type(model)}") 

def generate_caption_embedding(caption):
    inputs = tokenizer(caption, return_tensors='pt', padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**inputs)  # Pass the entire dictionary
        # Mean pooling
        embeddings = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
    return embeddings
    

Loaded model type: <class 'transformers.models.mpnet.modeling_mpnet.MPNetModel'>


### Generate embeddings for Keyframes

dependency:

pip install openai-clip

In [12]:
from PIL import Image
import torch
import open_clip

# Load CLIP model
device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model, _, preprocess = open_clip.create_model_and_transforms('ViT-B-32', pretrained='openai')
clip_model.to(device)
clip_model.eval()

def generate_keyframe_embedding(image_path):
    try:
        image = preprocess(Image.open(image_path)).unsqueeze(0).to(device)
        with torch.no_grad():
            image_features = clip_model.encode_image(image)
            image_features /= image_features.norm(dim=-1, keepdim=True)  # Normalize
        embedding_vector = image_features.cpu().numpy().flatten()
        if len(embedding_vector) == 0:
            print(f"Failed to generate embedding for {image_path}")
            return None
        return embedding_vector
    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return None




### Indexing Data

In [3]:
from opensearchpy import OpenSearch
from opensearchpy import helpers
import os
import json
import numpy as np
import requests

host = 'api.novasearch.org'
port = 443

user = 'user08' # Add your user name here.
password = '55LL.TTSS' # Add your user password here. For testing only. Don't store credentials in code. 
index_name = user

# Create the client with SSL/TLS enabled, but hostname verification disabled.
client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_compress = True, # enables gzip compression for request bodies
    http_auth = (user, password),
    use_ssl = True,
    url_prefix = 'opensearch_v2',
    verify_certs = False,
    ssl_assert_hostname = False,
    ssl_show_warn = False
)

# Load combined JSON data
with open('captions/val_1.json', 'r') as file1, open('captions/val_2.json', 'r') as file2:
    val1_data = json.load(file1)
    val2_data = json.load(file2)
combined_data = {**val1_data, **val2_data}

with open('activity_net.v1-3.min.json', 'r') as json_data:
    activity_data = json.load(json_data)

# Output directory for frames
video_dir = 'videos'
output_dir = 'keyframes'

# List all video files in the directory (already downloaded)
video_files = [f for f in os.listdir(video_dir) if f.endswith('.mp4')]

# extract video ID from the filename
selected_videos = []
for video_file in video_files:
    video_id = video_file.split('[')[-1].split(']')[0]
    selected_videos[video_id] = 'v_' + video_id


for clean_id, caption_ds_id in selected_videos.items():
    if caption_ds_id not in combined_data:
        print(f"Video ID {video_id} not found in either val_1 or val_2 datasets.")
        continue
    
    video_data = combined_data[caption_ds_id]
    duration = video_data['duration']
    timestamps = video_data['timestamps']
    captions = video_data['sentences']
    
    # fetch resolution from activity_net.v1-3.min.json
    if clean_id in activity_data['database']:
        video_metadata = activity_data['database'][clean_id]
        resolution = video_metadata.get('resolution', 'unknown')
    else:
        resolution = 'unknown'
        
        
    for idx, (timestamp, caption) in enumerate(zip(timestamps, captions)):
        start_time, end_time = timestamp
        caption_embedding = generate_caption_embedding(caption)
        caption_bow = ' '.join(caption.split())
        caption_vec = caption_embedding.tolist()
        
        # keyframe image extraction
        keyframe_path_val1 = os.path.join(output_dir, f"{video_id}_val_1_frame_{idx}.jpg")
        keyframe_path_val2 = os.path.join(output_dir, f"{video_id}_val_2_frame_{idx}.jpg")
        keyframe_path = keyframe_path_val1 if os.path.exists(keyframe_path_val1) else keyframe_path_val2
        if not os.path.exists(keyframe_path):
            print(f"Keyframe image not found for {video_id} at index {idx}.")
            continue

        keyframe_embedding = generate_keyframe_embedding(keyframe_path)
        keyframe_vec = keyframe_embedding.tolist()
        
        # Prepare the document to be indexed
        doc = {
            'video_id': video_id,
            'start_timestamp': start_time,
            'end_timestamp': end_time,
            'caption': caption,
            'caption_bow': caption_bow,
            'caption_vec': caption_vec,
            'duration': duration,
            'resolution': resolution,
            'keyframe_path': keyframe_path,
            'keyframe_vec': keyframe_vec
        }
        
        # Index the document
        try:
            response = client.index(index=index_name, body=doc)
            print(f"Indexed document for video {video_id} with caption '{caption}'")
        except Exception as e:
            print(f"Error indexing document for video {video_id}: {e}")
            continue
# Refresh the index to make the documents searchable
client.indices.refresh(index=index_name)
print("Index refreshed.")

TypeError: list indices must be integers or slices, not str

locally

In [19]:
# Get the activity dataset
with open('activity_net.v1-3.min.json', 'r') as json_data:
    activity_data = json.load(json_data)
# Specify the index name you want to check
index_name = "wiirijo"

# Get document count for the index
doc_count = client.count(index=index_name)

if doc_count['count'] > 0:
    print("Are you sure documents arent already indexed?")
else:
    # Cycle through the videos
    for video_id in selected_videos_ids:
        video_info = db[video_id]
        duration = video_info["duration"]
        timestamps = video_info["timestamps"]
        captions = video_info["sentences"]
        resolution = activity_data["database"][video_id]["resolution"]
        keyframe_dir = video_id + "_keyframes"
        keyframes = [os.path.splitext(f)[0] for f in os.listdir(keyframe_dir)]
    
        # Cycle through the video keyframes
        for frame_name in keyframes:
    
            kf_timestamp = frames[frame_name]["timestamp"]
    
            for i in range(len(timestamps)):
    
                start_time, end_time = timestamps[i]
    
                if start_time <= kf_timestamp <= end_time:
                    
                    caption = captions[i]
                    caption_embedding = generate_caption_embedding(caption)
                    caption_bow = ' '.join(caption.split())
                    caption_vec = caption_embedding.tolist()
                    keyframe_path = os.path.join(keyframe_dir, frame_name + ".jpg")
                    keyframe_embedding = generate_keyframe_embedding(keyframe_path)
                    keyframe_vec = keyframe_embedding.tolist()
                    
                    doc = {
                            'video_id': video_id,
                            'start_timestamp': start_time,
                            'end_timestamp': end_time,
                            'caption': caption,
                            'caption_bow': caption_bow,
                            'caption_vec': caption_vec,
                            'duration': duration,
                            'resolution': resolution,
                            'keyframe_path': keyframe_path,
                            'keyframe_vec': keyframe_vec
                        }
                    
                    try:
                        response = client.index(index=index_name, body=doc)
                        #print(f"Indexed document for keyframe {frame_name} with caption '{caption}'")
                    except Exception as e:
                        print(f"Error indexing document for video {video_id}: {e}")
                        continue

    # Refresh the index to make the documents searchable
    client.indices.refresh(index=index_name)
    print("Index refreshed.")

Are you sure documents arent already indexed?


In [14]:
caption = "A person is skateboarding in a park."
embedding = generate_caption_embedding(caption)
print(embedding.shape) 

(768,)


In [15]:
mapping = client.indices.get_mapping(index=index_name)
print(mapping)

{'wiirijo': {'mappings': {'dynamic': 'strict', 'properties': {'caption': {'type': 'text'}, 'caption_bow': {'type': 'text'}, 'caption_vec': {'type': 'knn_vector', 'dimension': 768, 'method': {'engine': 'nmslib', 'space_type': 'innerproduct', 'name': 'hnsw', 'parameters': {'ef_construction': 200, 'm': 16}}}, 'duration': {'type': 'float'}, 'end_timestamp': {'type': 'float'}, 'keyframe_path': {'type': 'keyword'}, 'keyframe_vec': {'type': 'knn_vector', 'dimension': 512, 'method': {'engine': 'nmslib', 'space_type': 'innerproduct', 'name': 'hnsw', 'parameters': {'ef_construction': 200, 'm': 16}}}, 'resolution': {'type': 'keyword'}, 'start_timestamp': {'type': 'float'}, 'video_id': {'type': 'keyword'}}}}}


# Testing

## Check Index Health and Mapping

In [16]:
# Check the health of the OpenSearch cluster
cluster_health = client.cluster.health()
print("Cluster Health:")
pp.pprint(cluster_health)

# Retrieve and print the mapping of the index
index_mapping = client.indices.get_mapping(index=index_name)
print("\nIndex Mapping:")
pp.pprint(index_mapping)

Cluster Health:
{'active_primary_shards': 11,
 'active_shards': 11,
 'active_shards_percent_as_number': 78.57142857142857,
 'cluster_name': 'docker-cluster',
 'delayed_unassigned_shards': 0,
 'discovered_cluster_manager': True,
 'discovered_master': True,
 'initializing_shards': 0,
 'number_of_data_nodes': 1,
 'number_of_in_flight_fetch': 0,
 'number_of_nodes': 1,
 'number_of_pending_tasks': 0,
 'relocating_shards': 0,
 'status': 'yellow',
 'task_max_waiting_in_queue_millis': 0,
 'timed_out': False,
 'unassigned_shards': 3}

Index Mapping:
{'wiirijo': {'mappings': {'dynamic': 'strict',
                          'properties': {'caption': {'type': 'text'},
                                         'caption_bow': {'type': 'text'},
                                         'caption_vec': {'dimension': 768,
                                                         'method': {'engine': 'nmslib',
                                                                    'name': 'hnsw',
                

## Search the Index

In [17]:
# Specify the index name you want to check
index_name = "wiirijo"

# Get document count for the index
doc_count = client.count(index=index_name)

# Print the document count
print(f"Number of documents in {index_name}: {doc_count['count']}")

Number of documents in wiirijo: 6030


In [26]:
query = {
    "query": {
        "match": {
            "caption": "skateboarding in a park"
        }
    }
}

response = client.search(index=index_name, body=query)
print("Search Results:")
for hit in response['hits']['hits']:
    print(f"Caption: {hit['_source']['caption']}, Score: {hit['_score']}")

Search Results:
Caption: A man is skating in a skate park., Score: 7.855377
Caption: A man is skating in a skate park., Score: 7.855377
Caption: A man is skating in a skate park., Score: 7.855377
Caption: A man is skating in a skate park., Score: 7.855377
Caption: A man is skating in a skate park., Score: 7.855377
Caption: A man is skating in a skate park., Score: 7.855377
Caption: A man is skating in a skate park., Score: 7.855377
Caption: A man is skating in a skate park., Score: 7.855377
Caption: A man is skating in a skate park., Score: 7.855377
Caption: A man is skating in a skate park., Score: 7.855377


## Semantic Search with KNN Vectors

In [27]:
import torch
from transformers import AutoTokenizer, AutoModel

# Generate embedding for your query
query_text = "A man skateboarding on a park"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
model.eval()

inputs = tokenizer(query_text, return_tensors='pt', padding=True, truncation=True)
with torch.no_grad():
    outputs = model(**inputs)
    query_embedding = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()

# Construct the query for OpenSearch
query = {
    "query": {
        "knn": {
            "caption_vec": {
                "vector": query_embedding.tolist(),
                "k": 5  # Number of nearest neighbors to retrieve
            }
        }
    }
}

response = client.search(index=index_name, body=query)
print("KNN Search Results:")
for hit in response['hits']['hits']:
    print(f"Caption: {hit['_source']['caption']}, Score: {hit['_score']}")


KNN Search Results:
Caption: A man is skating in a skate park., Score: 7.1102877
Caption: A man is skating in a skate park., Score: 7.1102877
Caption: A man is skating in a skate park., Score: 7.1102877
Caption: A man is skating in a skate park., Score: 7.1102877
Caption: A man is skating in a skate park., Score: 7.1102877
Caption: A man is skating in a skate park., Score: 7.1102877
Caption: A man is skating in a skate park., Score: 7.1102877
Caption: A man is skating in a skate park., Score: 7.1102877
Caption: A man is skating in a skate park., Score: 7.1102877
Caption: A man is skating in a skate park., Score: 7.1102877
