In [1]:
# Install vector database
# ! pip install vectordb

## Import modules


In [1]:
import os
import numpy as np
from tqdm.notebook import trange, tqdm
from PIL import Image, ImageFont, ImageDraw
import torch
import clip
import json as js
from docarray import DocList, BaseDoc
from docarray.typing import NdArray
import numpy as np
from vectordb import InMemoryExactNNVectorDB, HNSWVectorDB
from IPython.display import clear_output, display, HTML
from natsort import natsorted
import pandas as pd
from typing import List
import webbrowser
import shutil
import random
from utils import create_html_script, format_keyframes, clean_dbs, get_all_feats

## Constants


In [2]:
# Name of the CLIP checkpoint that TextEmbedding hands to clip.load().
MODEL = "ViT-B/32"
# Use the GPU when torch reports one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(DEVICE)

# Dataset locations, all relative to the notebook's working directory.
# METADATA_PATH: get_all_docs reads <video_name>.json from here.
METADATA_PATH = "../data/metadata/"
# KEYFRAME_PATH: image paths are built as <KEYFRAME_PATH>/<video_name>/<NNNN>.jpg.
KEYFRAME_PATH = "../data/keyframes/"
# FEATURE_PATH: not referenced in the visible cells; presumably used by
# utils.get_all_feats — TODO confirm.
FEATURE_PATH = "../data/features/"
# MAP_KEYFRAMES: get_all_docs reads <video_name>.csv (pts_time, fps, frame_idx) from here.
MAP_KEYFRAMES = "../data/map-keyframes/"
# VIDEOS_PATH: not referenced in the visible cells.
VIDEOS_PATH = "../data/videos/"
# SCRIPT_PATH: filter_by_audio reads <video_name>.txt from here.
SCRIPT_PATH = "../data/scripts/"

# Parent directory under which every VectorDB instance creates its own DB_<id> folder.
WORKSPACE = "./vectordb"

cuda


In [3]:
# Both helpers come from the local utils module and their bodies are not visible here.
# Presumably clean_dbs() removes leftover database workspaces and format_keyframes()
# normalises the keyframe files on disk — TODO confirm against utils.py.
clean_dbs()
format_keyframes()

## Text embedding


In [4]:
class TextEmbedding:
    """Encode text queries into CLIP text-feature vectors.

    The checkpoint named by ``MODEL`` is loaded once, on ``DEVICE``, when the
    object is constructed.
    """

    def __init__(self):
        self.device = DEVICE
        # clip.load returns a pair; only the model is kept, the second value is unused.
        self.model, _ = clip.load(MODEL, device=self.device)

    def __call__(self, texts) -> np.ndarray:
        """Return the CLIP text features for ``texts``.

        The class used to define ``__call__`` twice; the second definition
        silently replaced the first and, although it tokenized a whole batch,
        returned only the first row. This single method covers both uses.

        Args:
            texts: Either one string or a sequence of strings.

        Returns:
            A 1-D array (one feature vector) when ``texts`` is a string, or a
            2-D array with one row per input when ``texts`` is a sequence.
        """
        is_single = isinstance(texts, str)
        batch = [texts] if is_single else list(texts)
        text_inputs = clip.tokenize(batch).to(self.device)
        with torch.no_grad():
            text_features = self.model.encode_text(text_inputs)
        if is_single:
            # Drop the batch axis so a single query yields a plain vector.
            text_features = text_features[0]
        return text_features.detach().cpu().numpy()

## Vector Database


### Frame Document Class


In [5]:
class FrameDoc(BaseDoc):
    """One keyframe stored in the vector database.

    Every field carries an explicit type annotation. Previously only
    ``embedding`` was annotated and the rest relied on the type being inferred
    from the default value, which newer pydantic versions reject. The
    annotations below match the types of the original defaults.
    """

    # Feature vector of the keyframe (512 values).
    embedding: NdArray[512]
    # Name of the source video, e.g. the feature file name without extension.
    video_name: str = ""
    # Path of the keyframe image on disk.
    image_path: str = ""
    # 1-based position of the keyframe within its video.
    keyframe_id: int = 0
    # Value of the ``frame_idx`` column of the map-keyframes CSV for this keyframe.
    actual_idx: int = 0
    # Value of the ``pts_time`` column of the map-keyframes CSV for this keyframe.
    actual_time: float = 0.0
    # Value of the ``fps`` column; kept as int to match the previous default ``0``.
    fps: int = 0
    # Subset of the video's JSON metadata (``publish_date`` and ``watch_url``).
    metadata: dict = {}

    def __str__(self):
        """Return a multi-line, human-readable summary of the non-embedding fields."""
        return f"""
            Video name: {self.video_name}
            Image path: {self.image_path}
            Keyframe Id: {self.keyframe_id}
            Actual keyframe idx: {self.actual_idx}
            Time: {self.actual_time}
            FPS: {self.fps}
            Metadata: {self.metadata}
          """

## Database Handler


In [6]:
class VectorDB:
    """Text-searchable index of ``FrameDoc`` objects backed by the vectordb package.

    Each instance owns a fresh ``DB_<random id>`` folder below ``WORKSPACE``.
    """

    # Created when the class body runs and shared by all instances, so the
    # CLIP model is loaded only once however many databases are built.
    text_embedding = TextEmbedding()
    # Working directory captured when the class is defined; __init__ replaces it
    # on the instance with that instance's own DB folder.
    workspace = os.getcwd()
    method = "ANN"

    def __init__(self, method="ANN"):
        """Create an empty database in a new, unused workspace folder.

        Args:
            method: ``"ANN"`` selects the HNSW approximate index; any other
                value selects the in-memory exhaustive index.
        """
        # Create the parent folder with the default directory mode. The previous
        # os.mkdir(..., 0o666) omitted the execute bit, which leaves the folder
        # unusable for non-root users on POSIX systems. The parent is resolved
        # from the same base as the DB path below so both always agree.
        parent = os.path.join(self.workspace, WORKSPACE)
        os.makedirs(parent, exist_ok=True)

        # Pick a random id whose folder does not exist yet. Testing the path
        # directly avoids parsing directory entries, which used to crash on any
        # name that was not of the form DB_<int>.
        while True:
            db_id = random.getrandbits(128)
            candidate = os.path.join(parent, "DB_" + str(db_id))
            if not os.path.exists(candidate):
                break
        self.workspace = candidate

        self.method = method
        if method == "ANN":
            # Approximate nearest neighbour search based on the HNSW algorithm.
            self.DB = HNSWVectorDB[FrameDoc](workspace=self.workspace)
        else:
            # Exhaustive search over all embeddings.
            self.DB = InMemoryExactNNVectorDB[FrameDoc](workspace=self.workspace)

    def index(self, doc_list: List[FrameDoc]):
        """Add ``doc_list`` to the database."""
        self.DB.index(inputs=DocList[FrameDoc](doc_list))

    def search(self, query_text: str, topk=100):
        """Return the matches for ``query_text``.

        Args:
            query_text: Query string; it is embedded with ``text_embedding``.
            topk: Passed to the database as the result limit.

        Returns:
            The ``matches`` of the single query document that was submitted.
        """
        query_doc = FrameDoc(embedding=self.text_embedding(query_text))
        responses = self.DB.search(inputs=DocList[FrameDoc]([query_doc]), limit=topk)
        return responses[0].matches

    def delete(self, del_doc_list: List[FrameDoc]):
        """Remove the documents in ``del_doc_list`` from the database."""
        self.DB.delete(docs=DocList[FrameDoc](del_doc_list))

### Needed functions


### Get all features files


In [7]:
# get_all_feats is imported from the local utils module; per the output printed
# below it yields the paths of the per-video .npy feature files.
all_feat_files = get_all_feats()
print(all_feat_files)
print(len(all_feat_files))

['../data/features/L01_V001.npy', '../data/features/L01_V002.npy', '../data/features/L01_V003.npy', '../data/features/L01_V004.npy', '../data/features/L01_V005.npy', '../data/features/L01_V006.npy', '../data/features/L01_V007.npy', '../data/features/L01_V008.npy', '../data/features/L01_V009.npy', '../data/features/L01_V010.npy', '../data/features/L01_V011.npy', '../data/features/L01_V012.npy', '../data/features/L01_V013.npy', '../data/features/L01_V014.npy', '../data/features/L01_V015.npy', '../data/features/L01_V016.npy', '../data/features/L01_V017.npy', '../data/features/L01_V018.npy', '../data/features/L01_V019.npy', '../data/features/L01_V020.npy', '../data/features/L01_V021.npy', '../data/features/L01_V022.npy', '../data/features/L01_V023.npy', '../data/features/L01_V024.npy', '../data/features/L01_V025.npy', '../data/features/L01_V026.npy', '../data/features/L01_V027.npy', '../data/features/L01_V028.npy', '../data/features/L01_V029.npy', '../data/features/L01_V030.npy', '../data/

### Create all the Docs


In [8]:
def get_all_docs(npy_files):
    """Build one ``FrameDoc`` per keyframe for every feature file given.

    For each ``.npy`` file the video name is the file name without its
    extension. That name selects the video's JSON metadata (under
    ``METADATA_PATH``) and its keyframe map CSV (under ``MAP_KEYFRAMES``).

    Args:
        npy_files: Iterable of paths to feature files; each file holds one
            feature row per keyframe.

    Returns:
        A list of ``FrameDoc`` objects, in file order and then keyframe order.
    """
    doc_list = []
    for feat_npy in npy_files:
        # Use the file name only. Searching the whole path for the first "L"
        # broke as soon as any parent directory contained a capital L.
        video_name = os.path.splitext(os.path.basename(feat_npy))[0]
        feats_arr = np.load(feat_npy)

        # Read the metadata and close the file straight away instead of keeping
        # it open for the whole per-frame loop.
        with open(os.path.join(METADATA_PATH, video_name + ".json")) as meta_f:
            raw_metadata = js.load(meta_f)
        metadata = {key: raw_metadata[key] for key in ["publish_date", "watch_url"]}

        map_kf = pd.read_csv(
            os.path.join(MAP_KEYFRAMES, video_name + ".csv"),
            usecols=["pts_time", "fps", "frame_idx"],
        )

        for frame_idx, feat in enumerate(feats_arr):
            # Keyframe images are numbered from 1 with four digits.
            image_path = os.path.join(
                KEYFRAME_PATH, video_name, f"{frame_idx + 1:04d}.jpg"
            )
            doc_list.append(
                FrameDoc(
                    embedding=feat,
                    video_name=video_name,
                    image_path=image_path,
                    keyframe_id=frame_idx + 1,
                    actual_idx=map_kf["frame_idx"][frame_idx],
                    actual_time=map_kf["pts_time"][frame_idx],
                    fps=map_kf["fps"][frame_idx],
                    metadata=metadata,
                )
            )

    return doc_list

In [67]:
# Build the FrameDoc objects for every feature file listed above.
doc_list = get_all_docs(all_feat_files)

## Utils


### Result gallery UI


In [66]:
def FrameDocToImage(docs):
    """Turn result documents into the plain dicts that ``visualize`` renders.

    Each dict has the keys ``link`` (the text after the last ``v=`` in the
    document's ``watch_url``), ``path``, ``video``, ``frame`` and ``s``. The
    ``s`` value is the timestamp written as ``<minutes>'<seconds>``, with the
    seconds rounded to one decimal place.
    """
    images = []
    for doc in docs:
        link = doc.metadata["watch_url"].split("v=")[-1]
        whole_minutes = int(doc.actual_time) // 60
        leftover_seconds = round(doc.actual_time - 60 * whole_minutes, 1)
        images.append(
            {
                "link": link,
                "path": doc.image_path,
                "video": doc.video_name,
                "frame": doc.actual_idx,
                "s": "'".join((str(whole_minutes), str(leftover_seconds))),
            }
        )
    return images

In [11]:
def visualize(docs):
    """Display ``docs`` inline using the HTML produced by ``create_html_script``."""
    gallery_items = FrameDocToImage(docs)
    markup = create_html_script(gallery_items)
    display(HTML(markup))

### Open video at specific times


## Filter by audio

In [12]:
# Re-assigned with the same value it already has in the Constants cell.
SCRIPT_PATH = "../data/scripts/"
def check_script(file_content, keywords):
    """Tell whether every keyword occurs in ``file_content``.

    The match is a plain, case-sensitive substring test. An empty ``keywords``
    collection always yields True.
    """
    return all(keyword in file_content for keyword in keywords)
def filter_by_audio(result, keywords):
    """Drop results whose video transcript lacks any of ``keywords``.

    ``result`` is modified in place and also returned. A result is kept when
    its transcript (``SCRIPT_PATH + video_name + ".txt"``) contains every
    keyword, or when that transcript file does not exist.

    Args:
        result: Mutable sequence of objects exposing ``video_name``.
        keywords: Strings that must all appear in the transcript.

    Returns:
        The same ``result`` object after filtering.
    """
    # With no keywords every transcript passes, so nothing can be removed;
    # skip the file reads entirely.
    if not keywords:
        return result

    # Many results share a video, so decide once per video, not once per result.
    keep_by_video = {}
    # Walk backwards so that popping an item does not shift the indices still to visit.
    for i in range(len(result) - 1, -1, -1):
        video_name = result[i].video_name
        if video_name not in keep_by_video:
            transcript_path = SCRIPT_PATH + video_name + ".txt"
            try:
                with open(transcript_path, 'r') as file:
                    keep_by_video[video_name] = check_script(file.read(), keywords)
            except FileNotFoundError:
                # Best effort: a video without a transcript is left in the results.
                keep_by_video[video_name] = True
        if not keep_by_video[video_name]:
            result.pop(i)
    return result

## DEMO


### Create DB


In [13]:
# Build the main database (default "ANN" method) and index every keyframe document.
DB = VectorDB()
DB.index(doc_list)

### Query


#### Query text


### Filter 1


In [134]:
results1 = DB.search(
    "the cosplay festival", 10000
)  # Better to fetch many results

In [136]:
# filter_by_audio edits results1 in place and returns it; an empty keyword list
# removes nothing. Only the results at positions 100-199 are displayed.
visualize(filter_by_audio(results1, [])[100:200])

### Filter 2


In [92]:
# Second database that holds only the matches of the first query, so the next
# query searches within them.
DB2 = VectorDB()
DB2.index(results1)

In [94]:
# Second-stage query, run against the database built from the first query's matches.
results2 = DB2.search("three men are jogging on the road", 500)
# Display the second-stage matches. This cell used to display results1 again,
# which left results2 unused.
visualize(results2[:50])