In [2]:
import logging
import os
import re
import time
from pathlib import Path
from typing import Dict, Any, List, Tuple

import cv2
import numpy as np
import pytesseract
import spacy
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM
# import easyocr
# from transformers import pipeline

In [None]:
class TesseractProcessor:
    """
    OCR + NLP pipeline for document images.

    For each image it runs Tesseract OCR, cleans the text, extracts spaCy named
    entities and POS tags, produces a BART summary of the cleaned text, and
    crops candidate picture regions (found by contour detection) into
    ``<output_dir>/images``.
    """

    # Tesseract location on a default Windows install; only applied if it exists.
    DEFAULT_TESSERACT_CMD = r'C:\Program Files\Tesseract-OCR\tesseract.exe'

    def __init__(self, output_dir: str = "tes_extracted_content",
                 tesseract_cmd: str = DEFAULT_TESSERACT_CMD):
        """
        Initialize the document processor for images.

        Args:
            output_dir (str): Directory to save extracted images; an ``images``
                sub-directory is created inside it.
            tesseract_cmd (str): Path to the Tesseract executable. It is applied
                only if that file exists; otherwise pytesseract keeps its default
                and looks up ``tesseract`` on PATH (so non-Windows hosts work).
        """
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        if tesseract_cmd and os.path.isfile(tesseract_cmd):
            pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
        else:
            self.logger.warning(
                "Tesseract executable not found at %r; relying on 'tesseract' from PATH",
                tesseract_cmd,
            )

        # bart-large-cnn is an encoder-decoder summarization model, so it needs the
        # seq2seq head; AutoModelForCausalLM would load only the BART decoder and
        # generate continuations of the input instead of summaries.
        self.model_name = "facebook/bart-large-cnn"
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name)

        # Load SpaCy model for NLP tasks (downloaded on first use).
        try:
            self.nlp = spacy.load('en_core_web_sm')
        except OSError:
            spacy.cli.download('en_core_web_sm')
            self.nlp = spacy.load('en_core_web_sm')

        # Create output directories
        self.output_dir = output_dir
        self.images_dir = os.path.join(output_dir, "images")
        os.makedirs(self.images_dir, exist_ok=True)

    def process_document(self, image_path: str) -> Dict[str, Any]:
        """
        Process a document image to extract text and images.

        Args:
            image_path (str): Path to the document image

        Returns:
            Dict[str, Any]: ``file_name`` (basename of ``image_path``; the database
            handlers key documents by it), every field returned by
            ``extract_text``, ``extracted_images`` and ``processing_time`` (seconds).

        Raises:
            ValueError: If OpenCV cannot read the image. Any other error is logged
                and re-raised.
        """
        try:
            start_time = time.perf_counter()
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError(f"Could not read image at {image_path}")

            text_result = self.extract_text(image)
            image_result = self.extract_images(image, Path(image_path).stem)
            processing_time = time.perf_counter() - start_time
            return {
                'file_name': Path(image_path).name,
                **text_result,
                'extracted_images': image_result,
                'processing_time': processing_time,
            }

        except Exception as e:
            self.logger.error(f"Error processing document: {str(e)}")
            raise

    def extract_text(self, image: np.ndarray) -> Dict[str, Any]:
        """
        Extract and process text from image.

        Args:
            image (np.ndarray): Input image (BGR, as returned by ``cv2.imread``)

        Returns:
            Dict[str, Any]: ``raw_text``, ``processed_text``, ``summary``,
            ``entities``, ``pos_tags`` and per-word ``confidence_scores``.
        """
        processed_image = self.preprocess_for_ocr(image)

        raw_text = pytesseract.image_to_string(processed_image)
        # Word-level OCR data, used only for the confidence scores.
        ocr_data = pytesseract.image_to_data(processed_image, output_type=pytesseract.Output.DICT)

        processed_data = self.process_text(raw_text)
        summary = self.summary_generator(processed_data['processed_text'])

        return {
            'raw_text': raw_text,
            'processed_text': processed_data['processed_text'],
            'summary': summary,
            'entities': processed_data['entities'],
            'pos_tags': processed_data['pos_tags'],
            'confidence_scores': self._get_confidence_scores(ocr_data)
        }

    def extract_images(self, image: np.ndarray, base_name: str) -> List[Dict[str, Any]]:
        """
        Extract images from document using contour detection.

        Args:
            image (np.ndarray): Input image
            base_name (str): Base name for saving extracted images

        Returns:
            List[Dict[str, Any]]: One entry per saved region with ``filename``,
            ``path``, ``position``, ``area`` and the thresholding ``method`` used.
        """
        extracted = []
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Apply different thresholding methods to detect various types of images
        binary_methods = [
            ('otsu', cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]),
            # ('adaptive', cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            #                                 cv2.THRESH_BINARY_INV, 11, 2))
        ]

        for method_name, binary in binary_methods:
            contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

            for idx, contour in enumerate(contours):
                # Filter small contours and check aspect ratio
                if not self._is_valid_image_region(contour, image.shape):
                    continue

                x, y, w, h = cv2.boundingRect(contour)
                roi = image[y:y+h, x:x+w]

                # Skip flat regions (text blocks / blank boxes) with little variation
                if not self._is_image_content(roi):
                    continue

                image_filename = f"{base_name}_region_{idx + 1}.png"
                image_path = os.path.join(self.images_dir, image_filename)
                cv2.imwrite(image_path, roi)

                extracted.append({
                    'filename': image_filename,
                    'path': image_path,
                    'position': {'x': x, 'y': y, 'width': w, 'height': h},
                    'area': cv2.contourArea(contour),
                    'method': method_name,
                })

        return extracted

    def _is_valid_image_region(self, contour: np.ndarray, image_shape: Tuple) -> bool:
        """
        Check if contour is likely to be an image region: it must cover 1%-90% of
        the page and have a bounding-box aspect ratio between 0.2 and 5.
        """
        area = cv2.contourArea(contour)
        x, y, w, h = cv2.boundingRect(contour)
        aspect_ratio = w / h if h != 0 else 0

        image_area = image_shape[0] * image_shape[1]
        relative_size = area / image_area

        MIN_RELATIVE_SIZE = 0.01  # 1% of image
        MAX_RELATIVE_SIZE = 0.9   # 90% of image
        MIN_ASPECT_RATIO = 0.2
        MAX_ASPECT_RATIO = 5.0

        return (MIN_RELATIVE_SIZE <= relative_size <= MAX_RELATIVE_SIZE and
                MIN_ASPECT_RATIO <= aspect_ratio <= MAX_ASPECT_RATIO)

    def _is_image_content(self, region: np.ndarray, std_threshold: float = 20) -> bool:
        """
        Check if region contains enough variation (grayscale std above
        ``std_threshold``) to be an image.
        """
        gray_region = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY)
        return np.std(gray_region) > std_threshold

    def preprocess_for_ocr(self, image: np.ndarray) -> np.ndarray:
        """
        Preprocess image for better OCR results: grayscale, Otsu binarization
        (dark text on light background), denoising and deskewing.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        denoised = cv2.fastNlMeansDenoising(binary)

        # Deskew only when the estimated skew is noticeable.
        angle = self._get_skew_angle(denoised)
        if abs(angle) > 0.5:
            denoised = self._rotate_image(denoised, angle)

        return denoised

    def _get_skew_angle(self, image: np.ndarray) -> float:
        """
        Estimate the skew angle (degrees) of the text in a binarized page.

        After ``preprocess_for_ocr`` the text is dark on a light background, so
        the dark pixels are fitted; fitting the light pixels would just return
        the page rectangle. Returns 0.0 for a page with no dark pixels.
        """
        # (row, col) pairs of text pixels; minAreaRect needs float32/int32 points.
        coords = np.column_stack(np.where(image < 128)).astype(np.float32)
        if coords.shape[0] == 0:
            return 0.0
        angle = cv2.minAreaRect(coords)[-1]
        # Rectangle edge angles are only defined modulo 90 and OpenCV releases
        # report them in different ranges ([-90, 0) vs (0, 90]). Fold into
        # [-45, 45] so an axis-aligned box gives ~0 rather than a 90° rotation.
        if angle < -45:
            angle += 90
        elif angle > 45:
            angle -= 90
        return -angle

    def _rotate_image(self, image: np.ndarray, angle: float) -> np.ndarray:
        """Rotate the image by given angle around its centre, keeping its size."""
        (h, w) = image.shape[:2]
        center = (w // 2, h // 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        rotated = cv2.warpAffine(image, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
        return rotated

    def process_text(self, text: str) -> Dict[str, Any]:
        """
        Clean OCR text and run spaCy on it.

        Returns:
            Dict[str, Any]: ``processed_text``, ``entities`` as (text, label)
            pairs and ``pos_tags`` as (token, POS) pairs.
        """
        cleaned_text = self._clean_text(text)
        doc = self.nlp(cleaned_text)

        entities = [(ent.text, ent.label_) for ent in doc.ents]
        pos_tags = [(token.text, token.pos_) for token in doc]

        return {
            'processed_text': cleaned_text,
            'entities': entities,
            'pos_tags': pos_tags
        }

    def _clean_text(self, text: str) -> str:
        """
        Clean the extracted text: turn whitespace control characters (newlines,
        tabs, ...) into spaces, drop the other control characters, collapse runs
        of whitespace and fix common OCR artifacts.
        """
        # Whitespace controls must become spaces *before* the rest are deleted;
        # deleting '\n' outright would glue the last word of one line to the
        # first word of the next.
        cleaned = re.sub(r'[\x00-\x1F\x7F-\x9F]',
                         lambda m: ' ' if m.group().isspace() else '',
                         text)
        cleaned = re.sub(r'\s+', ' ', cleaned)

        # Remove common OCR artifacts
        cleaned = re.sub(r'[|]', 'I', cleaned)
        cleaned = re.sub(r'[¢]', 'c', cleaned)

        return cleaned.strip()

    def summary_generator(self, text: str, max_length: int = 200) -> str:
        """
        Summarize ``text`` with the BART model loaded in ``__init__``.

        Args:
            text (str): Cleaned document text.
            max_length (int): Maximum generated length in tokens.

        Returns:
            str: The decoded summary, or ``""`` for empty/whitespace-only text.
        """
        if not text or not text.strip():
            return ""

        # truncation=True caps the input at the tokenizer's model_max_length;
        # BART has a fixed number of position embeddings, so a full OCR'd page
        # would otherwise fail in the encoder.
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True)

        with torch.no_grad():
            # temperature is omitted: it has no effect without do_sample=True.
            outputs = self.model.generate(
                **inputs,
                max_length=max_length,
                num_return_sequences=1,
            )

        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

    def _get_confidence_scores(self, ocr_data: Dict[str, Any]) -> List[float]:
        """
        Extract per-word confidence scores from pytesseract ``image_to_data``
        output, skipping the -1 entries used for non-word rows. Values may arrive
        as ints, floats or strings depending on the pytesseract version.
        """
        scores = []
        for conf in ocr_data.get('conf', []):
            try:
                value = float(conf)
            except (TypeError, ValueError):
                continue
            if value >= 0:
                scores.append(value)
        return scores


# Build the OCR pipeline once (its constructor loads the BART and spaCy models)
# and run it on a sample newspaper scan. The single result is wrapped in a list
# because store_document_data() on both database handlers takes a list of docs.
# Eprocessor = EasyOCRProcessor()  # EasyOCR comparison, currently disabled
Tprocessor = TesseractProcessor()
Tresult1 = [
    Tprocessor.process_document("dataset/News/94682942.jpg"),
]

    #     Eresult = Eprocessor.process_document("dataset/News/94682942.jpg")
    #     # Print performance metrics
    #     print("\nPerformance Metrics:")
    #     print(f"Processing Time: {Tresult['processing_time']:.2f} seconds")
    #     print(f"Processing Time: {Eresult['processing_time']:.2f} seconds")
    #     print(f"Confidence Score: {np.mean(Tresult['confidence_scores']):.2f},{np.mean(Eresult['confidence_scores']):.2f}")
        
    #     # Print text analysis
    #     print("\nExtracted Text Sample:")
    #     print(Tresult['processed_text'][:200] + "...")
    #     print(Eresult['processed_text'][:200] + "...")
        
    #     print("\nNamed Entities:")
    #     for entity, label in Tresult['entities']:
    #         print(f"{entity}: {label}")
    #     print("--------------")
    #     for entity, label, _ in Eresult['entities'][:5]:
    #         print(f"{entity}: {label}")
        
    #     print("\nPOS Tags Sample:")
    #     for token, pos in Tresult['pos_tags'][:10]:
    #         print(f"{token}: {pos}")
    #     print("--------------------")
    #     for token, pos, tag, dep in Eresult['pos_tags'][:5]:
    #         print(f"{token}: {pos} ({tag}) - {dep}")

    #     print("\nExtracted Images:")
    #     for img in Tresult['extracted_images']:
    #         print(f"Method: {img['method']}")
    #         print(f"Size: {img['size']}")
    #         print(f"Saved to: {img['filename']}")
    #     print("----------------")
    #     for img in Eresult['extracted_images']:
    #         print(f"Saved: {img['filename']}")
    #         print(f"Method: {img['method']}")
    #         print(f"Position: {img['position']}")
    #         print("---")
        
    # except Exception as e:
    #     print(f"Error: {str(e)}")
    
    # summarizer = pipeline("summarization", model="ainize/bart-base-cnn")
    # print(Tresult['processed_text'])
    # print(summarizer(Tresult['processed_text'], max_length=230, min_length=30, do_sample=False))

In [4]:
import psycopg2
from psycopg2.extras import execute_values
from pymilvus import (MilvusClient,DataType,AnnSearchRequest,RRFRanker,Collection)
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
import numpy as np
from PIL import Image
from typing import Dict, List, Any
import logging
import torch
import timm
from sklearn.preprocessing import normalize
from timm.data import resolve_data_config
from timm.data.transforms_factory import create_transform


class PostGreDatabaseHandler:
    """
    Persist OCR/NLP results (documents, named entities, POS tags) in PostgreSQL
    and answer simple lookup queries over them.
    """

    # DDL run on construction; IF NOT EXISTS makes it safe to re-run.
    _SCHEMA_COMMANDS = (
        """
        CREATE TABLE IF NOT EXISTS documents (
           id SERIAL PRIMARY KEY,
           file_name VARCHAR(255) NOT NULL,
           content TEXT NOT NULL,
           summary TEXT NOT NULL,
           confidence FLOAT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS named_entities (
           id SERIAL PRIMARY KEY,
           doc_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
           entity TEXT NOT NULL,
           label VARCHAR(50) NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS pos_tags (
           id SERIAL PRIMARY KEY,
           doc_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
           token TEXT NOT NULL,
           pos_tag VARCHAR(20) NOT NULL
        )
        """,
    )

    def __init__(self, postgres_conn_string: str):
        """
        Open the PostgreSQL connection and create the tables if they are missing.

        Args:
            postgres_conn_string: PostgreSQL connection string
        """
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        self.pg_conn = psycopg2.connect(postgres_conn_string)
        self._create_tables()

    def _create_tables(self) -> None:
        """Create the schema. Failures are logged rather than raised (best effort)."""
        try:
            with self.pg_conn.cursor() as cur:
                for command in self._SCHEMA_COMMANDS:
                    cur.execute(command)
            self.pg_conn.commit()
        except Exception as error:
            # Roll back so the connection is not left in an aborted transaction,
            # which would make every later statement on it fail.
            self.pg_conn.rollback()
            self.logger.error(f"Error creating tables: {error}")

    @staticmethod
    def _mean_confidence(scores) -> float:
        """
        Collapse OCR confidences into the single FLOAT stored per document.

        Accepts an iterable of per-word scores (averaged; empty -> 0.0), a single
        number (returned as float) or None (0.0).
        """
        if scores is None:
            return 0.0
        try:
            values = [float(s) for s in scores]
        except TypeError:  # a single scalar score
            return float(scores)
        return sum(values) / len(values) if values else 0.0

    @staticmethod
    def _escape_like(term: str) -> str:
        """Escape LIKE/ILIKE wildcards so ``term`` is matched literally."""
        return term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')

    def store_document_data(self, doc_data: List[Dict[str, Any]]):
        """
        Store document data including file name, text, entities, and POS tags into PostgreSQL tables.

        All documents are written in one transaction: it is committed at the end
        or rolled back entirely on error.

        Args:
            doc_data (List[Dict[str, Any]]): One dict per document containing:
                - file_name (str): Name of the file
                - processed_text (str): Content of the file
                - summary (str): Summary of the content of the file
                - confidence_scores (List[float] | float): OCR confidences; a list
                  is averaged into the document's single confidence value
                - entities (List[Tuple[str, str]]): (entity, label) pairs
                - pos_tags (List[Tuple[str, str]]): (token, pos_tag) pairs

        Raises:
            Exception: Wrapping any database error (after rollback).
        """
        doc_insert_query = """
            INSERT INTO documents (file_name, content, summary, confidence)
            VALUES (%s, %s, %s, %s)
            RETURNING id;
        """
        entities_insert_query = """
            INSERT INTO named_entities (doc_id, entity, label)
            VALUES (%s, %s, %s);
        """
        pos_insert_query = """
            INSERT INTO pos_tags (doc_id, token, pos_tag)
            VALUES (%s, %s, %s);
        """
        try:
            with self.pg_conn.cursor() as cur:
                for data in doc_data:
                    cur.execute(doc_insert_query, (
                        data['file_name'],
                        data['processed_text'],
                        data['summary'],
                        self._mean_confidence(data.get('confidence_scores')),
                    ))
                    doc_id = cur.fetchone()[0]

                    # Insert child rows only when this document actually has them;
                    # otherwise a previous document's rows would be re-inserted.
                    entities = data.get('entities')
                    if entities:
                        cur.executemany(
                            entities_insert_query,
                            [(doc_id, entity, label) for entity, label in entities],
                        )

                    pos_tags = data.get('pos_tags')
                    if pos_tags:
                        cur.executemany(
                            pos_insert_query,
                            [(doc_id, token, pos_tag) for token, pos_tag in pos_tags],
                        )

            self.pg_conn.commit()

        except Exception as e:
            self.pg_conn.rollback()
            raise Exception(f"Error storing document data: {str(e)}") from e

    def _run_select(self, query: str, params: tuple, fetch_one: bool = False,
                    error_msg: str = "Error in file search"):
        """
        Run a read-only query with a dict-row cursor and close the cursor.

        Returns:
            A RealDict row (``fetch_one``) or a list of rows; None on error, after
            logging ``error_msg`` and rolling back the transaction.
        """
        try:
            with self.pg_conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(query, params)
                result = cur.fetchone() if fetch_one else cur.fetchall()
            # End the read transaction so the connection is not left idle in it.
            self.pg_conn.commit()
            return result
        except Exception as e:
            self.logger.error(f"{error_msg}: {str(e)}")
            if self.pg_conn:
                self.pg_conn.rollback()  # Rollback the transaction on error
            return None

    def postgres_file_entities(self, file_name: str) -> dict:
        """
        Search for a specific file name and return its named entities.

        Args:
            file_name (str): Exact file name to search for

        Returns:
            List of rows with ``entity`` and ``label`` keys; an empty dict if the
            file has no entities, is not found, or the query fails.
        """
        query = """
          SELECT entity, label
          FROM named_entities
          JOIN documents ON named_entities.doc_id = documents.id
          WHERE documents.file_name = %s;
        """
        result = self._run_select(query, (file_name,))
        return result if result else {}

    def postgres_entities_file(self, entity: str) -> dict:
        """
        Return the names of files that contain at least one entity with the given label.

        The label comparison is case-insensitive, so ``"person"`` matches spaCy's
        ``PERSON``.

        Args:
            entity (str): Entity label to search for (e.g. PERSON, ORG, GPE)

        Returns:
            List of rows with a ``file_name`` key; an empty dict if nothing
            matches or the query fails.
        """
        query = """
          SELECT DISTINCT file_name
          FROM documents
          JOIN named_entities ON documents.id = named_entities.doc_id
          WHERE UPPER(named_entities.label) = UPPER(%s);
        """
        result = self._run_select(query, (entity,))
        return result if result else {}

    def postgres_file_summary(self, file_name: str) -> dict:
        """
        Search for a specific file name and return its stored summary.

        Args:
            file_name (str): Exact file name to search for

        Returns:
            dict: Row with a ``summary`` key; empty dict if the file is not found
            or the query fails.
        """
        query = """
          SELECT summary
          FROM documents
          WHERE file_name = %s
          LIMIT 1;
        """
        result = self._run_select(query, (file_name,), fetch_one=True)
        return result if result else {}

    def postgrekeyword_search(self, keyword: str) -> List[dict]:
        """
        Search for documents containing the given keyword and return their details.

        The keyword is matched literally (``%``/``_`` are escaped) and
        case-insensitively as a substring of the content or file name.

        Args:
            keyword (str): Keyword to search for in documents content and file names

        Returns:
            List[dict]: Matching documents (``file_name``, ``content``) ordered by
            file name; an empty list on error.
        """
        query = """
          SELECT DISTINCT file_name, content
          FROM documents
          WHERE content ILIKE %s OR file_name ILIKE %s
          ORDER BY file_name;
        """
        search_pattern = f'%{self._escape_like(keyword)}%'
        results = self._run_select(query, (search_pattern, search_pattern),
                                   error_msg="Error in keyword search")
        return results if results is not None else []

    def close(self):
        """Close database connections."""
        if hasattr(self, 'pg_conn') and not self.pg_conn.closed:
            self.pg_conn.close()

In [81]:
# Read the connection string from the environment so the password is not
# committed with the notebook. The literal is only a local-dev fallback.
# TODO: remove the fallback once POSTGRES_CONN_STRING is configured everywhere.
pgdb = PostGreDatabaseHandler(
    os.environ.get("POSTGRES_CONN_STRING",
                   "postgresql://postgres:12072024@localhost:5432/postgres")
)
try:
    pgdb.store_document_data(Tresult1)
finally:
    # Release the connection even if the insert raises.
    pgdb.close()

In [78]:
class FeatureExtractor:
    """
    Image embedder built on a pretrained timm backbone.

    Calling an instance with an image path returns the L2-normalised,
    globally average-pooled feature vector as a 1-D numpy array.
    """

    def __init__(self, modelname):
        """
        Args:
            modelname: timm model name (e.g. ``"resnet34"``).
        """
        # num_classes=0 removes the classifier head so the model outputs pooled
        # features instead of class logits.
        self.model = timm.create_model(
            modelname, pretrained=True, num_classes=0, global_pool="avg"
        )
        self.model.eval()

        # Input size recorded in the model's default config.
        self.input_size = self.model.default_cfg["input_size"]

        # Resolve preprocessing from the instantiated model so its pretrained
        # config (input size, interpolation, crop, mean/std) is used. Passing the
        # model *name* string gives timm no config, so it falls back to defaults.
        config = resolve_data_config({}, model=self.model)
        self.preprocess = create_transform(**config)

    def __call__(self, imagepath):
        """
        Embed the image at ``imagepath``.

        Returns:
            numpy.ndarray: 1-D L2-normalised feature vector.
        """
        # Close the file handle promptly; convert() returns an independent copy.
        with Image.open(imagepath) as img:
            rgb_image = img.convert("RGB")

        # Preprocess and add a batch dimension.
        input_tensor = self.preprocess(rgb_image).unsqueeze(0)

        with torch.no_grad():
            output = self.model(input_tensor)

        feature_vector = output.squeeze().numpy()
        return normalize(feature_vector.reshape(1, -1), norm="l2").flatten()

In [None]:
class MilvusDatabaseHandler:
    """
    Store OCR/NLP results in Milvus collections and run vector searches on them.

    Collections created by ``__init__``:
      * ``Text_collection``  - one row per document: text, summary, confidence
        and sparse + dense BGE-M3 embeddings (primary key ``file_name``).
      * ``Named_collection`` - one row per (document, entity, label).
      * ``Pos_collection``   - one row per (document, token, POS tag).
      * ``Img_collection``   - one row per extracted image region with its
        ResNet-34 embedding.
    """

    TEXT_COLLECTION = "Text_collection"
    NAMED_COLLECTION = "Named_collection"
    POS_COLLECTION = "Pos_collection"
    IMG_COLLECTION = "Img_collection"

    # VARCHAR limits, used both in the schemas and to truncate values on insert.
    # Truncation is by UTF-8 byte length, which stays within the limit whether
    # Milvus counts bytes or characters.
    FILE_NAME_MAX_LEN = 1000
    TEXT_MAX_LEN = 65535      # largest VARCHAR max_length Milvus accepts
    SUMMARY_MAX_LEN = 4096
    ENTITY_MAX_LEN = 200
    LABEL_MAX_LEN = 50
    TOKEN_MAX_LEN = 100
    POS_MAX_LEN = 20
    IMG_NAME_MAX_LEN = 1000
    IMG_PATH_MAX_LEN = 1000

    # Dimension of the image embedding stored in Img_collection (resnet34 features).
    IMG_EMBEDDING_DIM = 512

    def __init__(self, milvusdb: str, recreate: bool = True):
        """
        Connect to Milvus, load the embedding models and create the collections.

        Args:
            milvusdb: URI passed to ``MilvusClient`` (e.g. a local ``.db`` file
                for Milvus Lite).
            recreate: If True (default), existing collections are dropped and
                recreated empty. If False, existing collections and their data
                are kept (their schemas must match the one defined here).
        """
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        self.client = MilvusClient(milvusdb)

        # Text embedding model (provides both dense and sparse vectors).
        self.bge = BGEM3EmbeddingFunction(
            model_name='BAAI/bge-m3',
            device='cpu',
            use_fp16=False
        )

        # Image embedding model.
        self.extractor = FeatureExtractor("resnet34")

        self._create_collection(self.TEXT_COLLECTION, self._text_schema(),
                                self._text_index_params(), recreate)
        # NOTE(review): Named/Pos collections have no vector field and no index.
        # Some Milvus versions reject vector-less schemas, and un-indexed
        # collections are not loaded for query. Confirm against the deployed version.
        self._create_collection(self.NAMED_COLLECTION, self._named_schema(), None, recreate)
        self._create_collection(self.POS_COLLECTION, self._pos_schema(), None, recreate)
        self._create_collection(self.IMG_COLLECTION, self._img_schema(),
                                self._img_index_params(), recreate)

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _truncate_utf8(value: str, max_bytes: int) -> str:
        """Trim ``value`` so its UTF-8 encoding fits in ``max_bytes`` without splitting a character."""
        encoded = value.encode("utf-8")
        if len(encoded) <= max_bytes:
            return value
        return encoded[:max_bytes].decode("utf-8", errors="ignore")

    @staticmethod
    def _mean_confidence(scores) -> float:
        """
        Collapse OCR confidences into the single FLOAT stored per document:
        an iterable is averaged (empty -> 0.0), a scalar is returned as float,
        None gives 0.0.
        """
        if scores is None:
            return 0.0
        try:
            values = [float(s) for s in scores]
        except TypeError:  # a single scalar score
            return float(scores)
        return sum(values) / len(values) if values else 0.0

    def _create_collection(self, name: str, schema, index_params, recreate: bool) -> None:
        """Create collection ``name``; an existing one is dropped first if ``recreate``, else kept."""
        if self.client.has_collection(collection_name=name):
            if not recreate:
                return
            self.client.drop_collection(collection_name=name)
        if index_params is None:
            self.client.create_collection(collection_name=name, schema=schema)
        else:
            self.client.create_collection(
                collection_name=name,
                schema=schema,
                index_params=index_params
            )

    def _text_schema(self):
        """Schema of Text_collection."""
        # The schema option is ``enable_dynamic_field``; unknown keyword options
        # are not an error, so a misspelling would silently leave it disabled.
        schema = MilvusClient.create_schema(enable_dynamic_field=True)
        schema.add_field(field_name="file_name", datatype=DataType.VARCHAR, is_primary=True,
                         max_length=self.FILE_NAME_MAX_LEN)
        schema.add_field(field_name="Text", datatype=DataType.VARCHAR, max_length=self.TEXT_MAX_LEN)
        schema.add_field(field_name="Summary", datatype=DataType.VARCHAR, max_length=self.SUMMARY_MAX_LEN)
        schema.add_field(field_name="sparse", datatype=DataType.SPARSE_FLOAT_VECTOR)
        schema.add_field(field_name="dense", datatype=DataType.FLOAT_VECTOR, dim=self.bge.dim["dense"])
        schema.add_field(field_name="confidence", datatype=DataType.FLOAT)
        return schema

    def _text_index_params(self):
        """Dense (IP) and sparse inverted (IP) indexes for Text_collection."""
        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="dense",
            index_name="dense_index",
            index_type="AUTOINDEX",
            metric_type="IP",
            params={"nlist": 128}
        )
        index_params.add_index(
            field_name="sparse",
            index_name="sparse_index",
            index_type="SPARSE_INVERTED_INDEX",
            metric_type="IP",
            params={"drop_ratio_build": 0.2},
        )
        return index_params

    def _named_schema(self):
        """Schema of Named_collection (auto-generated INT64 ids)."""
        schema = MilvusClient.create_schema(auto_id=True, enable_dynamic_field=True)
        schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
        schema.add_field(field_name="file_name", datatype=DataType.VARCHAR, max_length=self.FILE_NAME_MAX_LEN)
        schema.add_field(field_name="entity", datatype=DataType.VARCHAR, max_length=self.ENTITY_MAX_LEN)
        schema.add_field(field_name="label", datatype=DataType.VARCHAR, max_length=self.LABEL_MAX_LEN)
        return schema

    def _pos_schema(self):
        """Schema of Pos_collection (auto-generated INT64 ids)."""
        schema = MilvusClient.create_schema(auto_id=True, enable_dynamic_field=True)
        schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
        schema.add_field(field_name="file_name", datatype=DataType.VARCHAR, max_length=self.FILE_NAME_MAX_LEN)
        schema.add_field(field_name="token", datatype=DataType.VARCHAR, max_length=self.TOKEN_MAX_LEN)
        schema.add_field(field_name="pos", datatype=DataType.VARCHAR, max_length=self.POS_MAX_LEN)
        return schema

    def _img_schema(self):
        """Schema of Img_collection (auto-generated INT64 ids)."""
        schema = MilvusClient.create_schema(auto_id=True, enable_dynamic_field=True)
        schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
        schema.add_field(field_name="file_name", datatype=DataType.VARCHAR, max_length=self.FILE_NAME_MAX_LEN)
        schema.add_field(field_name="img_file_name", datatype=DataType.VARCHAR, max_length=self.IMG_NAME_MAX_LEN)
        schema.add_field(field_name="img_file_path", datatype=DataType.VARCHAR, max_length=self.IMG_PATH_MAX_LEN)
        schema.add_field(field_name="dense", datatype=DataType.FLOAT_VECTOR, dim=self.IMG_EMBEDDING_DIM)
        return schema

    def _img_index_params(self):
        """Cosine dense index for Img_collection."""
        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="dense",
            index_name="dense_index",
            index_type="AUTOINDEX",
            metric_type="COSINE",
            params={"nlist": 128}
        )
        return index_params

    # ------------------------------------------------------------------ writes

    def store_document_data(self, doc_data: List[Dict[str, Any]]):
        """
        Store document data including file name, text, entities, and POS tags into Milvus Collections.

        Args:
            doc_data (List[Dict[str, Any]]): One dict per document containing:
                - file_name (str): Name of the file (Text_collection primary key)
                - processed_text (str): Content of the file
                - summary (str): Summary of the content of the file
                - confidence_scores (List[float] | float): OCR confidences; a
                  list is averaged into the stored FLOAT
                - entities (List[Tuple[str, str]]): (entity, label) pairs
                - pos_tags (List[Tuple[str, str]]): (token, pos_tag) pairs
                - extracted_images (List[Dict[str, Any]]): dicts with ``filename``
                  and ``path`` of each extracted image region

        Raises:
            Exception: Wrapping any insertion error.
        """
        try:
            for data in doc_data:
                file_name = data['file_name']

                # Text embeddings (dense + sparse) for hybrid search.
                embeddings = self.bge.encode_documents([data['processed_text']])
                text_row = {
                    'file_name': file_name,
                    'Text': self._truncate_utf8(data['processed_text'], self.TEXT_MAX_LEN),
                    # Must match the schema field name "Summary" exactly.
                    'Summary': self._truncate_utf8(data['summary'], self.SUMMARY_MAX_LEN),
                    'sparse': embeddings['sparse'],
                    'dense': embeddings['dense'][0],
                    'confidence': self._mean_confidence(data.get('confidence_scores')),
                }
                self.client.insert(collection_name=self.TEXT_COLLECTION, data=[text_row])

                entities = data.get('entities')
                if entities:
                    named_rows = [
                        {
                            'file_name': file_name,
                            'entity': self._truncate_utf8(entity, self.ENTITY_MAX_LEN),
                            'label': self._truncate_utf8(label, self.LABEL_MAX_LEN),
                        }
                        for entity, label in entities
                    ]
                    self.client.insert(collection_name=self.NAMED_COLLECTION, data=named_rows)

                pos_tags = data.get('pos_tags')
                if pos_tags:
                    pos_rows = [
                        {
                            'file_name': file_name,
                            'token': self._truncate_utf8(token, self.TOKEN_MAX_LEN),
                            'pos': self._truncate_utf8(pos, self.POS_MAX_LEN),
                        }
                        for token, pos in pos_tags
                    ]
                    self.client.insert(collection_name=self.POS_COLLECTION, data=pos_rows)

                images = data.get('extracted_images')
                if images:
                    img_rows = [
                        {
                            'file_name': file_name,
                            'img_file_name': self._truncate_utf8(img_info['filename'], self.IMG_NAME_MAX_LEN),
                            'img_file_path': self._truncate_utf8(img_info['path'], self.IMG_PATH_MAX_LEN),
                            'dense': self.extractor(img_info['path']).tolist(),
                        }
                        for img_info in images
                    ]
                    self.client.insert(collection_name=self.IMG_COLLECTION, data=img_rows)

            # Flush once after all documents rather than once per document.
            if doc_data:
                for name in (self.TEXT_COLLECTION, self.NAMED_COLLECTION,
                             self.POS_COLLECTION, self.IMG_COLLECTION):
                    self.client.flush(collection_name=name)

        except Exception as e:
            raise Exception(f"Error inserting data into Milvus: {str(e)}") from e

    # ------------------------------------------------------------------ reads

    def milvuskeyword_search(self, keyword: str) -> List[Dict[str, Any]]:
        """
        Perform hybrid search using both sparse and dense vectors.

        Dense and sparse BGE-M3 searches (top 2 each) are fused with RRF and up
        to 4 hits are returned.

        Args:
            keyword (str): Search keyword/query

        Returns:
            List[Dict]: One dict per hit with ``file_name`` (primary key),
            ``score`` (fused RRF score) and ``text`` (result of
            ``MilvusClient.get`` for the ``Text`` field).

        Raises:
            Exception: Wrapping any search error.
        """
        try:
            embeddings = self.bge.encode_documents([keyword])

            dense_request = AnnSearchRequest(
                data=[embeddings['dense'][0]],
                anns_field="dense",
                param={"metric_type": "IP", "params": {"nprobe": 10}},
                limit=2,
            )
            sparse_request = AnnSearchRequest(
                data=[embeddings['sparse']],
                anns_field="sparse",
                # NOTE(review): drop_ratio_build is an index-build option; the
                # search-time counterpart is drop_ratio_search. Confirm intent.
                param={"metric_type": "IP", "params": {"drop_ratio_build": 0.2}},
                limit=2,
            )

            res = self.client.hybrid_search(
                collection_name=self.TEXT_COLLECTION,
                reqs=[dense_request, sparse_request],
                ranker=RRFRanker(100),
                limit=4
            )

            result = []
            for hits in res:
                for hit in hits:
                    result.append({
                        'file_name': hit['id'],
                        'score': hit['distance'],
                        'text': self.client.get(
                            collection_name=self.TEXT_COLLECTION,
                            ids=[hit['id']],
                            output_fields=["Text"]
                        )
                    })
            return result

        except Exception as e:
            raise Exception(f"Error performing hybrid search: {str(e)}") from e

    def milvusimage_search(self, path: str) -> List[Dict]:
        """
        Search for similar images in the database.

        Args:
            path (str): Path to the query image

        Returns:
            List[Dict]: Up to 2 hits with ``distance`` (cosine), image file name
            and path, owning ``document_name`` and ``document_text`` (result of
            ``MilvusClient.get`` on Text_collection).

        Raises:
            Exception: Wrapping any search error.
        """
        try:
            embeddings = self.extractor(path)

            results = self.client.search(
                self.IMG_COLLECTION,
                data=[embeddings],
                anns_field="dense",
                search_params={"metric_type": "COSINE"},
                limit=2,
                output_fields=["file_name", "img_file_name", "img_file_path"]
            )

            processed_results = []
            for hits in results:
                for hit in hits:
                    entity = hit['entity']
                    processed_results.append({
                        "distance": hit['distance'],
                        "image_file_name": entity['img_file_name'],
                        "image_file_path": entity['img_file_path'],
                        "document_name": entity['file_name'],
                        "document_text": self.client.get(
                            collection_name=self.TEXT_COLLECTION,
                            ids=[entity['file_name']],
                            output_fields=["Text"]
                        )
                    })
            return processed_results

        except Exception as e:
            raise Exception(f"Error performing image search: {str(e)}") from e

    def close(self):
        """Release the collections from memory and close the Milvus client."""
        for name in (self.TEXT_COLLECTION, self.NAMED_COLLECTION,
                     self.POS_COLLECTION, self.IMG_COLLECTION):
            try:
                if self.client.has_collection(collection_name=name):
                    self.client.release_collection(collection_name=name)
            except Exception as e:
                # Best effort: a collection that failed to release should not
                # prevent the client from being closed.
                self.logger.warning(f"Could not release {name}: {e}")
        self.client.close()

In [None]:
# Milvus Lite keeps the whole database in one local file. Put it next to the
# notebook rather than at the filesystem root, which usually requires elevated
# permissions to write.
mvdb = MilvusDatabaseHandler("./milvus_demo.db")
mvdb.store_document_data(Tresult1)

In [None]:
import re
from typing import Dict, Any, Optional, Tuple
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import asyncio

# NOTE(review): unpinned install; pin a version (nest_asyncio==X.Y.Z) for reproducible runs.
%pip install nest_asyncio
import nest_asyncio
# The Jupyter kernel already runs an asyncio event loop; nest_asyncio patches it so later
# cells can call loop.run_until_complete(...) from inside a cell.
nest_asyncio.apply()

In [49]:
class QueryProcessor:
    def __init__(self):
        # Initialize LLM model and tokenizer
        self.model_name = "facebook/bart-large-cnn"
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(self.model_name)
        
        # Define intent patterns
        self.intent_patterns = {
            "DOC_SUMMARY": r"(?:give|show|get|what is the)?\s*summary\s+(?:of|for)?\s*document\s*(\d+\.jpg)",
            "DOC_ENTITIES": r"(?:give|show|get|what are the)?\s*(?:entities|pos|named entities|parts of speech)\s+(?:in|of|for|present in)?\s*document\s*(\d+\.jpg)",
            "ENTITY_DOCS": r"(?:give|show|get|what are the)?\s*documents\s+containing\s+entities\s+(?:like|such as)?\s*(\w+)",
            "TOPIC_DOCS": r"(?:give|show|get|what are the)?\s*documents\s+containing\s+information\s+about\s+(.*)",
            "SIMILAR_IMAGE_DOCS": r"(?:given this image|given image|with this image|using this image)?\s*(?:return|find|get|show)?\s*(?:documents containing similar images|similar images|documents with similar images|images like this)"
        }
        self.sqlconnection=PostGreDatabaseHandler("postgresql://postgres:12072024@localhost:5432/postgres")

    def classify_intent(self, query: str) -> Tuple[str, Optional[Dict[str, str]]]:
        """Classify the intent of the user query and extract parameters."""
        query = query.lower().strip()
        
        for intent, pattern in self.intent_patterns.items():
            match = re.search(pattern, query)
            if match:
                params = self._extract_parameters(intent, match)
                return intent, params
                
        return "UNKNOWN", None

    def _extract_parameters(self, intent: str, match) -> Dict[str, str]:
        """Extract parameters based on the intent and regex match."""
        params = {}
        
        if intent == "DOC_SUMMARY":
            # Extract the complete filename (numbers.jpg)
            params["doc_id"] = match.group(1)
            
        elif intent == "DOC_ENTITIES":
            # Extract the complete filename (numbers.jpg)
            params["doc_id"] = match.group(1)
            params["entity_type"] = "entities" if "entities" in match.group(0) else "pos"
            
        elif intent == "ENTITY_DOCS":
            params["entity_type"] = match.group(1).upper()
            
        elif intent == "TOPIC_DOCS":
            params["topic"] = match.group(1).strip()
            
        elif intent == "SIMILAR_IMAGE_DOCS":
            params["path"] = ""
            
        return params

    async def execute_db_query(self, intent: str, params: Dict[str, str]) -> Dict[str, Any]:
        """Placeholder for database query execution."""
        if intent == "DOC_SUMMARY":
            result=self.sqlconnection.postgres_file_summary(params['doc_id'])
            if not result:
                return {'content':[]}
            return {"content":[result['content']]}
            
        elif intent == "DOC_ENTITIES":
            result=self.sqlconnection.postgres_file_entities(params['doc_id'])
            if not result:
               return {'entities': [], 'label': []}
            x = {'entities':[result[i]['entity']for i in range(5)],
               'label':[result[i]['label']for i in range(5)]}
            return x
              
        elif intent == "ENTITY_DOCS":
            result=self.sqlconnection.postgres_entities_file(params['entity_type'])
            if not result:
                return {'file name':[]}
            return {'file name':[result[i]['file_name']for i in range(len(result))]}

        elif intent == "TOPIC_DOCS":
            result=mvdb.milvuskeyword_search(params['topic'])
            if not result:
              return {'file name':[]}
            return {'file name':[x[i]['file_name']for i in range(len(x))]}

        elif intent == "SIMILAR_IMAGE_DOCS":
            result=mvdb.milvusimage_search(params['path'])
            if not result:
                return {'similar_docs':[]}
            return {'document name':[result[i]['document_name'] for i in range(min(2,len(result)))]}

            
        return {"error": "Unknown intent"}
    
    def generate_llm_prompt(self, intent: str, db_results: Dict[str, Any], original_query: str) -> str:
        """Generate appropriate prompt for LLM based on intent and results."""
        match intent:

            case "DOC_SUMMARY":
                return f"""
                You are a skilled document analyst tasked with creating a comprehensive yet concise summary. Please analyze the following document content:
                {db_results['content']}
                Provide a summary that:
                1. Captures the main ideas and key points
                2. Maintains the original meaning and intent
                3. Preserves important details and supporting evidence
                4. Uses clear, professional language
                5. Follows a logical flow

                Format the summary in well-organized paragraphs. Include:
                - A brief overview of the document's main topic/purpose
                - The key arguments or findings
                - Important supporting details or examples
                - Any significant conclusions or implications

                If the document contains technical terms, numbers, or data, incorporate them accurately.
                Aim for a length that balances completeness with conciseness - typically 15-25% of the original length.
                If you're unsure about any part of the content, maintain factual accuracy by focusing on the clearly stated information.
                """
            
            case "DOC_ENTITIES":

                entity_label_pairs = [
                    f"- Entity: {entity}\n  Label: {label}"
                    for entity, label in zip(db_results['entities'], db_results['label'])
                    ]
                formatted_entities = "\n".join(entity_label_pairs)
                return f"""
                You are a skilled entity analyzer tasked with explaining the named entities found in the text. Here are the entities and their labels:
                {formatted_entities}
                
                Please provide:
                1. A clear explanation of each entity and its classification
                2. Any patterns or relationships between the entities
                3. Context about why these entities might be important
                4. Group similar entities together (e.g., all organizations, all works of art)

                Format your response as follows:
                - Start with a brief overview of the types of entities found
                - Group entities by their labels
                - For each entity, explain its significance and why it was classified as its given label
                - Note any interesting patterns or relationships between entities

                If there are multiple mentions of the same entity with slight variations (e.g., "ABC" and "ABC Family"), explain the relationship between these variations.

                Keep your response clear and professional, focusing on accuracy and meaningful insights about the entities and their roles.
                """
            
            case "ENTITY_DOCS":
                # Format the list of filenames
                files = "\n".join(f"- {filename}" for filename in db_results['file name'])
                return f"""
                You are a document retrieval specialist tasked with explaining the search results for entity-related documents. The following files contain the requested entity:
                {files}

                Please provide:
                1. A clear overview of the search results
                2. The number of documents found

                Format your response as follows:
                - Start with a summary of the search results (e.g., "Found X documents containing the requested entity")
                - List the documents in a clear, organized manner

                Keep your response:
                - Clear and professional
                - Focused on helping the user understand what documents are available
                - Organized in a way that makes the search results easy to understand
                """
            case "TOPIC_DOCS":
                # Format the list of document names
                topic_docs = "\n".join(f"- {doc}" for doc in db_results['documents'])
                return f"""
                You are a topic search specialist tasked with explaining document search results. The following analysis is based on:
                Original Query: {original_query}

                Documents found containing relevant information:
                {topic_docs}

                Please provide:
                1. A clear explanation of how these documents relate to the query topic
                2. The total number of relevant documents found
                3. Any patterns in the type of information available
                4. Recommendations for which documents might be most relevant to the query

                Format your response as follows:
                - Start with a restatement of the search query and what was found
                - List the relevant documents in order of likely importance
                - Explain why each document might contain relevant information
                - Suggest a reading order that would best address the original query

                Important considerations:
                - Focus on the connection between the documents and the query topic
                - Note if there appear to be different aspects of the topic covered
                - Highlight documents that seem most directly relevant to the query
                - Consider how the documents might complement each other in answering the query

                Keep your response:
                - Focused on helping the user understand why these documents were found
                - Clear about the relationship between the documents and the query
                - Organized to help the user efficiently find the information they need
                If you notice any patterns in how the topic is covered across documents, include these insights to help guide the user's reading.
                """
                
            case "SIMILAR_IMAGE_DOCS":
                # Format the list of filenames
                similar_files = "\n".join(f"- {filename}" for filename in db_results['file name'])
                return f"""
                You are an image search specialist tasked with explaining the results of a visual similarity search. The following images were found to be visually similar to the query image:
                {similar_files}

                Please provide:
                1. A clear overview of the search results
                2. The total number of similar images found
                3. Any patterns in the types of images discovered
                4. Recommendations for which images might be most relevant

                Format your response as follows:
                - Begin with a summary of the visual search results (e.g., "Found X visually similar images")
                - List the similar images in a clear, organized manner
                - If there are patterns in the image names or types, explain their significance
                - Provide context about why these images might be similar to the query image

                Important considerations:
                - Focus on explaining the potential visual similarities
                - Note if there appear to be groups or clusters of similar images
                - Highlight any particularly strong matches based on the file organization
                - Suggest which images might be most worth reviewing first

                Keep your response clear and professional, helping the user understand the visual similarity search results and their potential relevance.
                """
            
    async def generate_llm_response(self, prompt: str) -> str:
        """Generate response using the LLM."""
        inputs = self.tokenizer(prompt, return_tensors="pt")
        
        with torch.no_grad():
            outputs = self.model.generate(
                inputs["input_ids"],
                max_length=200,
                num_return_sequences=1,
                temperature=0.7
            )
        
        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return response

    async def process_query(self, query: str, image_data: Optional[bytes] = None) -> str:
        """Main method to process user query and generate response."""
        # 1. Classify intent and extract parameters
        intent, params = self.classify_intent(query)
        
        if intent == "UNKNOWN":
            return "I'm sorry, I couldn't understand your query. Please rephrase it."
            
        # 2. Check if image is required but not provided
        if intent == "SIMILAR_IMAGE_DOCS" and image_data is None:
            return "Please provide an image for image similarity search."
        elif intent=="DOC_SUMMARY":
            return await self.execute_db_query(intent, params)
            
        # 3. Execute database query
        db_results = await self.execute_db_query(intent, params)
        print("db result",db_results)
        # 4. Generate LLM prompt
        prompt = self.generate_llm_prompt(intent, db_results, query)
        print("prompt",prompt)
        # 5. Generate final response
        response = await self.generate_llm_response(prompt)
        
        return response

In [50]:
# Build the processor once; every test query below reuses its loaded model.
processor = QueryProcessor()


async def test_query(query: str, image_data=None):
    """Run one query end to end, printing the intent, parameters and final answer.

    Args:
        query: Natural-language request to classify and answer.
        image_data: Optional image forwarded unchanged to ``process_query``.
    """
    print(f"Query: {query}")

    # Classification is shown separately so the routing decision is visible.
    detected_intent, detected_params = processor.classify_intent(query)
    print(f"Detected Intent: {detected_intent}")
    print(f"Extracted Parameters: {detected_params}")

    answer = await processor.process_query(query, image_data)
    print(f"Response: {answer}")
    print("-" * 50)


# Event loop that the next cell uses to drive the test_query coroutines.
loop = asyncio.get_event_loop()

In [None]:
# Test one query per text intent. The QueryProcessor patterns require document ids of
# the form "<digits>.jpg" and an entity word after "documents containing entities";
# ids like "X123" or a bare "containing entities" fall through to UNKNOWN.
# NOTE(review): replace the placeholder ids/entity with values present in your database.
test_queries = [
    "give summary of document 123.jpg",
    "show entities present in document 456.jpg",
    "give documents containing entities like person",
    "give documents containing information about climate change",
]

for query in test_queries:
    loop.run_until_complete(test_query(query))