In [1]:
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname("/home/agabriel/repos/video-stream-indexing/scripts/query/"), '../..')))

from pymilvus import (
    connections,
    MilvusClient
)

import numpy as np
import cv2
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import argparse

from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial

from policies.constants import (MILVUS_HOST, MILVUS_PORT, MILVUS_NAMESPACE,
                                LOG_PATH, RESULT_PATH)
from policies.components import get_model, inference

from index_utils import search, search_global

  _torch_pytree._register_pytree_node(


In [2]:
class VectorDataset(Dataset):
    def __init__(self, milvus, embedding, global_k, global_accuracy, global_f, local_k, fragment_offset, accuracy, result_path, frame_path, parallelism_candidates, parallelism_exports):
        # Search Global Index for candidates
        candidates, _ = search_global("global", embedding.detach().numpy(), ["collection"], int(global_k), float(global_accuracy), global_f)
        
        # Generate video fragments
        fragments = []
        with ThreadPoolExecutor(max_workers=int(parallelism_candidates)) as executor:
            search_partial = partial(
                search, 
                milvus_client=milvus, 
                embedding=embedding.detach().numpy(), 
                fields=["offset", "pk"], 
                local_k=int(local_k),
                fragment_offset=int(fragment_offset), 
                accuracy=float(accuracy), 
                result_path=result_path,
                parallelism=int(parallelism_exports)
            )
            futures = {executor.submit(search_partial, collection_name=collection_name): collection_name for collection_name in candidates}

            for future in as_completed(futures):
                collection_name = futures[future]
                try:
                    fragment, _, _ = future.result()
                    fragments.extend(fragment)
                except Exception as e:
                    print(f"Error processing collection {collection_name}: {e}")

        # Save individual images on disk
        self.frame_path = frame_path # Path to store the frame .jpgs
        frame_count = 0
        print("Generating dataset...")
        for file in fragments:
            cap = cv2.VideoCapture(f"{result_path}/{file}")
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                cv2.imwrite(f"{frame_path}/frame_{frame_count}.jpg", frame)
                frame_count += 1
            cap.release()
        self.frame_count = frame_count # Dataset length
            
    def __len__(self):
        return self.frame_count

    def __getitem__(self, idx):
        image = Image.open(f"{self.frame_path}/frame_{idx}.jpg")
        image_array = np.array(image)
        return image_array

In [3]:
def main():
    result_path = "/home/agabriel/repos/video-stream-indexing/results"
    os.makedirs(result_path, exist_ok=True)
    frame_path = f"{result_path}/frames"
    os.makedirs(frame_path, exist_ok=True)
    
    ## Connect to Milvus
    print("Connecting to Milvus")
    connections.connect("default", host="localhost", port=19530)
    client = MilvusClient(uri=f"http://localhost:19530")
    
    ## Read image and perform inference
    model, device = get_model()
    img = Image.open("/home/agabriel/repos/video-stream-indexing/benchmarks/experiment3/cholec_frame_ref.png")
    if img.mode == 'RGBA':
        img = img.convert('RGB')
    img.resize((940, 560))
    embedding = inference(model, img, device)

    ## Create the dataset and dataloader
    dataset = VectorDataset(
        milvus=client,
        embedding=embedding,
        global_k=3,
        global_accuracy=0.0,
        global_f=20,
        local_k=20,
        fragment_offset=5,
        accuracy=0.8,
        result_path=RESULT_PATH,
        parallelism_candidates=3,
        parallelism_exports=3,
        frame_path=frame_path
    )
    dataloader = DataLoader(dataset, batch_size=5, shuffle=True)

    ## Example use
    for idx, batch in enumerate(dataloader):
        print(f"Batch {idx} with shape: {batch.shape}")
        last = batch
        break
        
    # Display last batch in separate window
    for idx, img in enumerate(last):
        # Save to disk
        img = Image.fromarray(np.array(img, dtype=np.uint8))
        img.save(f"{result_path}/last_frame_{idx}.jpg")
    
        
main()

Connecting to Milvus
Total hits: 60
Collection: 0.00458836555480957
Load: 0.0035758018493652344
Search: 0.002328157424926758
Filter1: 9.131431579589844e-05
Filter2: 1.7881393432617188e-05
Searching in col02
Exporting fragments from col02
An error occurred: Command '['bash', '/project/scripts/query/export.sh', 'col02', '/project/video_fragments/col02_0_173647945_179926018.h264', '173647945', '179926018']' returned non-zero exit status 127.
An error occurred: Command '['bash', '/project/scripts/query/export.sh', 'col02', '/project/video_fragments/col02_1_197199240_202011079.h264', '197199240', '202011079']' returned non-zero exit status 127.
An error occurred: Command '['bash', '/project/scripts/query/export.sh', 'col02', '/project/video_fragments/col02_2_204066934_206771498.h264', '204066934', '206771498']' returned non-zero exit status 127.
An error occurred: Command '['bash', '/project/scripts/query/export.sh', 'col02', '/project/video_fragments/col02_3_210965401_213760072.h264', '210

ValueError: num_samples should be a positive integer value, but got num_samples=0