In [None]:
!pip install -q translate
!pip install -q underthesea==1.3.5a3
!pip install -q underthesea[deep]
!pip install -q pyvi
!pip install -q langdetect
!pip install -q googletrans==3.1.0a0
!pip install -q peft
!pip install bitsandbytes
!pip install transformers
!pip install flash-attn
!pip install -U sentence-transformers
!pip install xformers

In [None]:
!pip install -q faiss-cpu
!pip install -q git+https://github.com/openai/CLIP.git

In [None]:
import glob
import json
import math
import os
import re
import sys
import time
from difflib import SequenceMatcher
from pathlib import Path

import clip
import faiss
import googletrans
import matplotlib.pyplot as plt
import numpy as np
import torch
import translate
import underthesea
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException
from PIL import Image
from pyvi import ViUtils, ViTokenizer
from tqdm import tqdm

In [None]:
# Absolute path of the directory the notebook is running from; appended to
# sys.path so modules located there can be imported.
ROOT = Path.cwd().resolve()
sys.path.append(str(ROOT))

# Project directory: the parent of ROOT when ROOT has more than one ancestor,
# otherwise ROOT itself.
WORK_DIR = ROOT.parents[0] if len(ROOT.parents) > 1 else ROOT

# Run on the GPU when one is visible to torch, else on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# CLIP ViT-B/32 weights plus the matching image preprocessing callable.
model, preprocess = clip.load("ViT-B/32", device=device)

# Keyframe input root and the output directory for per-video .npy files.
paths = f"{WORK_DIR}/data/keyframes"
des_path = f"{WORK_DIR}/working//dicts/npy_blip"
os.makedirs(des_path, exist_ok=True)

In [None]:
from transformers import DPRContextEncoder, AutoProcessor, DPRContextEncoderTokenizer, BlipModel,TrOCRProcessor, VisionEncoderDecoderModel, ViTFeatureExtractor, AutoTokenizer, BitsAndBytesConfig, BlipForConditionalGeneration
#from peft import get_peft_model, LoraConfig, TaskType, prepare_model_for_kbit_training
from sentence_transformers import SentenceTransformer


# Earlier alternative, kept for reference: DPR context encoder + tokenizer.
# tokenizer = DPRContextEncoderTokenizer.from_pretrained('facebook/dpr-ctx_encoder-single-nq-base', use_fast=False)
# embedding_model = DPRContextEncoder.from_pretrained('facebook/dpr-ctx_encoder-single-nq-base').to(device)

# Sentence-embedding model; its `encode` method is what turns each caption
# into a vector later in the notebook.
# NOTE(review): trust_remote_code=True presumably lets Python code shipped in
# the model repository run locally -- confirm this is acceptable, and consider
# pinning a revision.
# NOTE(review): confirm this model's output width against `feature_shape`
# (512) used when the FAISS index is built; wider vectors are truncated there.
embedding_model= SentenceTransformer("nomic-ai/nomic-embed-text-v1", trust_remote_code=True)

In [None]:
# BLIP captioning model and its processor.
# `BlipForConditionalGeneration` is the BLIP class with the caption-generation
# head that `model.generate(pixel_values=...)` relies on; `BlipModel` is the
# feature-extraction variant and is not meant for caption generation.
# The model is moved to `device` so it sits on the same device as the inputs
# the captioning helper prepares.
# Note: this rebinds `model`, which previously held the CLIP model.
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(device)
processor = AutoProcessor.from_pretrained("Salesforce/blip-image-captioning-base")

In [None]:
def blip_image(image_path):
  """Generate a caption for one image with the module-level BLIP model.

  Args:
    image_path: Path of an image file that PIL can open.

  Returns:
    The first caption decoded from `model.generate` (at most 50 tokens,
    special tokens stripped), as a string.
  """
  # The context manager closes the file handle. The processor is called inside
  # the block because PIL only reads pixel data when it is first needed.
  with Image.open(image_path) as image:
    inputs = processor(images=image, return_tensors="pt")

  # Follow the model's own device instead of the global `device`, so the
  # tensors and the weights can never end up on different devices.
  pixel_values = inputs.pixel_values.to(model.device)

  generated_ids = model.generate(pixel_values=pixel_values, max_length=50)
  return processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

In [None]:

# Caption every keyframe with BLIP, embed the caption, and store one .npy file
# per video directory (one embedding row per keyframe, in sorted file order).

# Make sure the output directory exists once, before any work is done.
os.makedirs(des_path, exist_ok=True)

for keyframe in tqdm(os.listdir(paths)):
  path_keyframe = os.path.join(paths, keyframe)

  # Every immediate sub-directory is handled as one video. glob returns each
  # match with a trailing separator; dirname() strips it portably.
  video_paths = [os.path.dirname(p) for p in sorted(glob.glob(f"{path_keyframe}/*/"))]

  for vd_path in video_paths:
    # Timer is reset per video so the message below reports this video only,
    # not the cumulative time since the first video of the group.
    start_time = time.time()

    re_feats = []
    # Lexicographic order of the file name without ".jpg".
    # NOTE(review): presumably frame names are zero-padded; otherwise
    # "10.jpg" sorts before "2.jpg" -- verify against the data.
    keyframe_paths = sorted(
        glob.glob(f'{vd_path}/*.jpg'),
        key=lambda x: os.path.basename(x).replace('.jpg', ''),
    )

    for keyframe_path in tqdm(keyframe_paths):
      text = blip_image(keyframe_path)

      # langdetect raises LangDetectException when the text has no usable
      # features (for example an empty caption). Treat that as "not
      # Vietnamese" instead of aborting the whole run.
      try:
        is_vietnamese = detect(text) == 'vi'
      except LangDetectException:
        is_vietnamese = False

      if is_vietnamese:
        # NOTE(review): `Translation` is not defined in the visible cells;
        # confirm it is provided elsewhere before relying on this branch.
        text = Translation(text)

      # One embedding per keyframe, appended even for empty captions so that
      # row i always corresponds to keyframe i.
      embeddings = embedding_model.encode(text)
      re_feats.append(embeddings)

    # Output file is named after the video directory.
    name_npy = os.path.basename(vd_path)
    outfile = os.path.join(des_path, f'{name_npy}.npy')
    np.save(outfile, re_feats)

    print(f"Processed {vd_path} in {time.time() - start_time} seconds")


In [None]:
# Dimensionality of the vectors stored in the FAISS index. Wider input
# features are truncated to this many leading components.
# NOTE(review): truncating embeddings discards information; confirm 512 is the
# intended width for the embedding model in use.
feature_shape = 512


def write_bin_file_ocr(bin_path: str, npy_path: str, method='cosine'):
    """Build a flat FAISS index from every ``*.npy`` file in ``npy_path``.

    Files are read in sorted name order, so index ids follow that order.

    Args:
        bin_path: Directory the index file is written to (created if missing).
        npy_path: Directory containing the per-video ``.npy`` feature files.
        method: ``'L2'`` for an ``IndexFlatL2`` or ``'cosine'`` for an
            ``IndexFlatIP`` over L2-normalised vectors.

    Raises:
        ValueError: If ``method`` is not supported, or a feature file holds
            vectors narrower than ``feature_shape``.

    Side effects:
        Writes ``faiss_BLIP_{method}.bin`` into ``bin_path`` and prints
        progress messages.
    """
    # Validate first, using exact comparison. The previous substring test
    # (`method in 'L2'`) also accepted values such as '' or 'L', and the
    # fallback `assert "<string>"` could never fail.
    if method not in ('L2', 'cosine'):
        raise ValueError(f"{method} not supported")

    if method == 'L2':
        index = faiss.IndexFlatL2(feature_shape)
    else:
        index = faiss.IndexFlatIP(feature_shape)

    for npy_file in sorted(glob.glob(os.path.join(npy_path, "*.npy"))):
        feats = np.load(npy_file)
        print(f"Loaded {npy_file}, shape: {feats.shape}")

        # A video without keyframes yields an empty array that cannot be
        # reshaped to 2-D; it contributes no vectors, so skip it.
        if feats.size == 0:
            print(f"Skipping {npy_file}: no feature vectors")
            continue

        # FAISS works on float32 matrices of shape (n_vectors, dim).
        feats = feats.astype(np.float32)
        feats = feats.reshape(-1, feats.shape[-1])

        if feats.shape[1] < feature_shape:
            raise ValueError(
                f"Query features dimension {feats.shape[1]} do not match index dimension {feature_shape}"
            )

        # Keep the leading `feature_shape` components. Column slicing returns
        # a non-contiguous view, so copy it into a contiguous buffer.
        feats = np.ascontiguousarray(feats[:, :feature_shape])

        if method == 'cosine':
            # Inner product equals cosine similarity only for unit-length
            # vectors, so normalise in place after truncation.
            faiss.normalize_L2(feats)

        index.add(feats)

    # Write the index to disk, creating the target directory when needed.
    os.makedirs(bin_path, exist_ok=True)
    out_file = os.path.join(bin_path, f"faiss_BLIP_{method}.bin")
    faiss.write_index(index, out_file)
    print(f'Saved {out_file}')


# Build the FAISS index over the BLIP caption embeddings.
# The embeddings are read from `des_path`, the directory the per-video .npy
# files are saved to earlier in this notebook; the previous hard-coded
# "data/dicts/npy_blip" location is never written by this notebook.
# The output directory is created up front because the index file cannot be
# written into a directory that does not exist.
os.makedirs(f"{WORK_DIR}/data/dicts/bin_blip", exist_ok=True)
write_bin_file_ocr(bin_path=f"{WORK_DIR}/data/dicts/bin_blip", npy_path=des_path)

