In [None]:
!pip install -q translate
!pip install -q underthesea==1.3.5a3
!pip install -q underthesea[deep]
!pip install -q pyvi
!pip install -q langdetect
!pip install -q googletrans==3.1.0a0
!pip install -q peft
!pip install bitsandbytes
!pip install transformers
!pip install flash-attn
!pip install -U sentence-transformers
!pip install xformers
!pip install einops



In [None]:
!pip install git+https://github.com/JaidedAI/EasyOCR.git

In [None]:
from transformers import DPRContextEncoder, AutoProcessor, DPRContextEncoderTokenizer, BlipModel,TrOCRProcessor, VisionEncoderDecoderModel, ViTFeatureExtractor, AutoTokenizer, BitsAndBytesConfig, BlipForConditionalGeneration
#from peft import get_peft_model, LoraConfig, TaskType, prepare_model_for_kbit_training
from sentence_transformers import SentenceTransformer


# Commented-out alternative based on the DPR context encoder:
# tokenizer = DPRContextEncoderTokenizer.from_pretrained('facebook/dpr-ctx_encoder-single-nq-base', use_fast=False)
# embedding_model = DPRContextEncoder.from_pretrained('facebook/dpr-ctx_encoder-single-nq-base').to(device)

# Sentence-embedding model; the keyframe loop further down calls
# `embedding_model.encode(text)` on the OCR text of every frame.
# trust_remote_code=True lets code shipped in the model repository run locally,
# so only use it with a repository/revision you trust.
# NOTE(review): the indexing cell sets `feature_shape = 384` and cuts wider
# vectors down to that width — confirm this matches the model's output size.
embedding_model= SentenceTransformer("nomic-ai/nomic-embed-text-v1", trust_remote_code=True)

In [None]:
# Standard library
import glob
import json
import math
import os
import re
import sys
import time
from difflib import SequenceMatcher
from pathlib import Path

# Third-party (kept in their original relative order)
import torch
import clip
from PIL import Image
import faiss
import numpy as np
import matplotlib.pyplot as plt
import googletrans
import translate
import underthesea
from tqdm import tqdm
from pyvi import ViUtils, ViTokenizer
from langdetect import LangDetectException, detect

In [None]:
# Absolute path of the directory the notebook runs from, also added to the
# module search path.
ROOT = Path.cwd().resolve()
sys.path.append(str(ROOT))

# Input and output folders are addressed relative to ROOT's parent directory.
# When ROOT has at most one ancestor, ROOT itself is used instead.
WORK_DIR = ROOT.parents[0] if len(ROOT.parents) > 1 else ROOT

# Run on the GPU when one is visible, and load CLIP ViT-B/32 onto that device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model, preprocess = clip.load("ViT-B/32", device=device)

# `paths` holds the keyframe folders to read; `des_path` receives the per-video
# OCR embedding arrays and is created here if it does not exist yet.
paths = f"{WORK_DIR}/data/keyframes"
des_path =  f"{WORK_DIR}/working//dicts/npy_ocr"
os.makedirs(des_path, exist_ok=True)

In [None]:
class Text_Preprocessing():
    """Text clean-up helpers built on ``pyvi`` and ``underthesea``.

    Calling an instance lower-cases the text, removes stop words and then
    normalises the result. The other methods are thin wrappers around single
    library calls and are not part of that pipeline.
    """

    def __init__(self, stopwords_path=f"{WORK_DIR}/data/dicts/vietnamese-stopwords-dash.txt"):
        """Read the stop-word list (one entry per line, UTF-8) from ``stopwords_path``."""
        with open(stopwords_path, 'r', encoding='utf-8') as handle:
            self.stop_words = [entry.strip() for entry in handle]

    def find_substring(self, string1, string2):
        """Return the longest run shared by both strings, without outer whitespace."""
        matcher = SequenceMatcher(None, string1, string2, autojunk=False)
        block = matcher.find_longest_match(0, len(string1), 0, len(string2))
        start, stop = block.a, block.a + block.size
        return string1[start:stop].strip()

    def remove_stopwords(self, text):
        """Tokenise ``text`` with ViTokenizer and drop every token in the stop-word list."""
        tokens = ViTokenizer.tokenize(text).split()
        return " ".join(token for token in tokens if token not in self.stop_words)

    def lowercasing(self, text):
        """Return ``text`` in lower case."""
        return text.lower()

    def uppercasing(self, text):
        """Return ``text`` in upper case."""
        return text.upper()

    def add_accents(self, text):
        """Return the result of ``ViUtils.add_accents`` for ``text``."""
        return ViUtils.add_accents(text)

    def remove_accents(self, text):
        """Return the result of ``ViUtils.remove_accents`` for ``text``."""
        return ViUtils.remove_accents(text)

    def sentence_segment(self, text):
        """Return the result of ``underthesea.sent_tokenize`` for ``text``."""
        return underthesea.sent_tokenize(text)

    def text_norm(self, text):
        """Return the result of ``underthesea.text_normalize`` for ``text``."""
        return underthesea.text_normalize(text)

    def text_classify(self, text):
        """Return the result of ``underthesea.classify`` for ``text``."""
        return underthesea.classify(text)

    def sentiment_analysis(self, text):
        """Return the result of ``underthesea.sentiment`` for ``text``."""
        return underthesea.sentiment(text)

    def __call__(self, text):
        """Run the pipeline: lower-case, remove stop words, normalise.

        The accent helpers (``remove_accents`` / ``add_accents``) are not
        applied here; call them separately if they are needed.
        """
        lowered = self.lowercasing(text)
        without_stopwords = self.remove_stopwords(lowered)
        return self.text_norm(without_stopwords)

class Translation():
    """Wrapper around the two translation libraries, ``googletrans`` and ``translate``.

    Build an instance once, then call it with the text to translate::

        translator = Translation()
        english = translator("xin chào")

    Args:
        from_lang: Source language code. It is forwarded to the ``translate``
            backend only; the googletrans call passes just ``dest``.
        to_lang: Target language code.
        mode: ``'google'`` or ``'googletrans'`` for googletrans, ``'translate'``
            for the ``translate`` package.

    Raises:
        ValueError: If ``mode`` is not one of the accepted names.
    """

    # Accepted spellings per backend. The mode is compared for equality:
    # substring tests such as `mode in 'googletrans'` also accept '' or
    # 'trans', and can pick one backend in __init__ and the other in __call__.
    _GOOGLE_MODES = ('google', 'googletrans')
    _TRANSLATE_MODES = ('translate',)

    def __init__(self, from_lang='vi', to_lang='en', mode='google'):
        # Validate first, so a bad mode fails here rather than later as a
        # missing `translator` attribute.
        if mode in self._GOOGLE_MODES:
            backend = 'googletrans'
        elif mode in self._TRANSLATE_MODES:
            backend = 'translate'
        else:
            raise ValueError(
                f"Unsupported translation mode {mode!r}; "
                f"expected one of {self._GOOGLE_MODES + self._TRANSLATE_MODES}"
            )

        self.__mode = backend
        self.__from_lang = from_lang
        self.__to_lang = to_lang
        self.text_processing = Text_Preprocessing()
        if backend == 'googletrans':
            self.translator = googletrans.Translator()
        else:
            self.translator = translate.Translator(from_lang=from_lang, to_lang=to_lang)

    def preprocessing(self, text):
        """Run ``text`` through the ``Text_Preprocessing`` pipeline."""
        return self.text_processing(text)

    def __call__(self, text):
        """Preprocess and translate ``text``; blank input yields ``''``.

        Returns:
            str: The translated text.
        """
        # Nothing to translate: skip the backend call entirely.
        if not text or not text.strip():
            return ''

        text = self.preprocessing(text)
        # Preprocessing can remove every token (for example, stop words only).
        if not text or not text.strip():
            return ''

        if self.__mode == 'translate':
            return self.translator.translate(text)
        return self.translator.translate(text, dest=self.__to_lang).text



In [None]:

def ocr_image(image_path):
    """Run OCR on the image at ``image_path`` and return the decoded text.

    The image is opened as RGB, turned into pixel values by the module-level
    ``processor``, passed to ``model.generate``, and the first decoded
    sequence is returned.

    NOTE(review): ``processor`` is not defined in the visible notebook, and
    the only ``model`` assigned there comes from ``clip.load`` — confirm that
    a matching processor and a generate-capable model are defined before
    this is used.
    """
    rgb_image = Image.open(image_path).convert("RGB")
    model_inputs = processor(rgb_image, return_tensors="pt")
    token_ids = model.generate(model_inputs.pixel_values)
    decoded = processor.batch_decode(token_ids, skip_special_tokens=True)
    return decoded[0]

def easyocr_image(image_path):
    """Read the text in an image with EasyOCR and return it translated.

    Detections with a confidence of 0.3 or lower are discarded. The remaining
    strings are joined with newlines in the order they were reported and
    passed through a ``Translation`` instance.

    NOTE(review): relies on a module-level ``reader`` exposing ``readtext``
    (presumably an ``easyocr.Reader``) that is not created in the visible
    notebook — confirm it is defined before this function runs.

    Args:
        image_path: Path of the image to read.

    Returns:
        str: The translated text, or ``""`` when no detection passes the
        confidence threshold.
    """
    # Each detection is indexed as (box, text, confidence).
    texts = [item[1] for item in reader.readtext(image_path) if item[2] > 0.3]
    merged_text = "\n".join(texts)

    # Frames without readable text are common; skip the translation call.
    if not merged_text.strip():
        return ""

    # `Translation(merged_text)` only constructs a translator, passing the
    # text as its `from_lang` argument. The instance has to be called to
    # translate. It is built once and cached on the function, because
    # construction re-reads the stop-word file.
    translator = getattr(easyocr_image, "_translator", None)
    if translator is None:
        translator = easyocr_image._translator = Translation()
    return translator(merged_text)



In [None]:

# Embed the OCR text of every keyframe and save one .npy file per video.

# One translator for the whole run. `Translation(text)` would only construct a
# new object with the text as its `from_lang`; the instance has to be called.
translator = Translation()

# The output directory does not change inside the loops, so create it once.
os.makedirs(des_path, exist_ok=True)

for keyframe in tqdm(os.listdir(paths)):
    path_keyframe = os.path.join(paths, keyframe)
    # The glob pattern ends in '/', so every hit carries a trailing slash; drop it.
    video_paths = [p.rstrip('/') for p in sorted(glob.glob(f"{path_keyframe}/*/"))]

    for vd_path in video_paths:
        # Timed per video, so the message below reports this video only.
        start_time = time.time()

        re_feats = []
        keyframe_paths = sorted(
            glob.glob(f'{vd_path}/*.jpg'),
            key=lambda x: x.split('/')[-1].replace('.jpg', ''),
        )

        for keyframe_path in tqdm(keyframe_paths):
            text = easyocr_image(keyframe_path)

            # langdetect raises on text without usable features (for example
            # an empty string, which is what a frame with no readable text
            # yields). Treat those frames as "not Vietnamese".
            try:
                is_vietnamese = bool(text.strip()) and detect(text) == 'vi'
            except LangDetectException:
                is_vietnamese = False

            if is_vietnamese:
                text = translator(text)

            # Every frame gets a vector, including frames with no text, so row
            # i of the saved array always belongs to the i-th keyframe.
            # NOTE(review): the nomic-embed model card describes task prefixes
            # (e.g. "search_document: ") — confirm what the query side expects
            # before adding one here.
            re_feats.append(embedding_model.encode(text))

        name_npy = vd_path.split('/')[-1]
        outfile = os.path.join(des_path, f'{name_npy}.npy')
        np.save(outfile, re_feats)

        print(f"Processed {vd_path} in {time.time() - start_time} seconds")


In [None]:
# Width of the vectors held by the FAISS index created in write_bin_file_ocr below.
feature_shape = 384


def write_bin_file_ocr(bin_path: str, npy_path: str, method='cosine'):
    """Build a flat FAISS index from every ``*.npy`` file in ``npy_path`` and save it.

    Files are added in sorted filename order, so a vector's position in the
    index follows that order and then its row inside the file. Vectors wider
    than the module-level ``feature_shape`` are cut down to their leading
    ``feature_shape`` components.

    Args:
        bin_path: Directory that receives ``faiss_OCR_{method}.bin``; created
            if it does not exist.
        npy_path: Directory containing the embedding arrays.
        method: ``'L2'`` for ``faiss.IndexFlatL2``, or ``'cosine'`` for
            ``faiss.IndexFlatIP`` over L2-normalised vectors.

    Raises:
        ValueError: If ``method`` is not supported, or a file holds vectors
            narrower than ``feature_shape``.
    """
    # Compare for equality: `method in 'L2'` is a substring test and also
    # accepts '' or 'L'.
    if method == 'L2':
        index = faiss.IndexFlatL2(feature_shape)
    elif method == 'cosine':
        index = faiss.IndexFlatIP(feature_shape)
    else:
        # `assert "<non-empty string>"` never fails, so raise explicitly.
        raise ValueError(f"{method} not supported")

    for npy_file in sorted(glob.glob(os.path.join(npy_path, "*.npy"))):
        feats = np.load(npy_file)
        print(f"Loaded {npy_file}, shape: {feats.shape}")

        # An empty array (e.g. saved from an empty list) has nothing to index
        # and cannot be reshaped to (-1, 0).
        if feats.size == 0:
            continue

        # Flatten any leading axes so each row is one vector.
        feats = feats.astype(np.float32).reshape(-1, feats.shape[-1])

        if feats.shape[1] < feature_shape:
            raise ValueError(
                f"Query features dimension {feats.shape[1]} do not match "
                f"index dimension {feature_shape}"
            )

        # Keep the leading components only; the slice is a strided view, so
        # make it contiguous before handing it to FAISS.
        feats = np.ascontiguousarray(feats[:, :feature_shape])

        if method == 'cosine':
            # Inner product equals cosine similarity only for unit-length
            # vectors. The floor keeps all-zero rows from dividing by zero.
            norms = np.linalg.norm(feats, axis=1, keepdims=True)
            feats = (feats / np.maximum(norms, 1e-12)).astype(np.float32)

        index.add(feats)

    # Write the Faiss index to disk, creating the target directory if needed.
    os.makedirs(bin_path, exist_ok=True)
    out_file = os.path.join(bin_path, f"faiss_OCR_{method}.bin")
    faiss.write_index(index, out_file)
    print(f'Saved {out_file}')


# Build the OCR index from the directory the embedding cell writes to.
# The previous npy_path, f"{WORK_DIR}/data/dicts/npy_ocr", is a different folder
# from des_path ("working//dicts/npy_ocr"), so a fresh Run All would not have
# indexed the arrays produced by this notebook.
write_bin_file_ocr(bin_path=f"{WORK_DIR}/data/dicts/bin_ocr", npy_path=des_path)

