# Implementing Semantic Search over Personal Documents with a Vector Database

## 1. Import the necessary libraries

In [None]:
import os
import fitz 
import re
from pymilvus import MilvusClient, connections, Collection, CollectionSchema, FieldSchema, DataType, utility
import numpy as np
from docx import Document
import tempfile
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from sentence_transformers import SentenceTransformer

## 2. Check the connection
Make sure Milvus is running in Docker before executing this cell.

In [None]:
# Open the pymilvus ORM connection under the "default" alias; the
# Collection(...) and utility.* calls later in this notebook use that alias.
connections.connect(alias="default", host="localhost", port="19530")

# Report whether the "default" connection exists.
print("Connected!" if connections.has_connection("default") else "Connection failed!")

# Separate MilvusClient handle for the same server; it is handed to
# store_in_milvus() further down.
client = MilvusClient(uri="http://localhost:19530")

## 3. Preprocessing Dataset
### 3.1 Function to convert a .docx file to .pdf

In [None]:
def convert_docx_to_pdf(docx_path, output_dir):
    """Render the paragraphs of a .docx file into a plain-text PDF.

    Each paragraph is wrapped to the printable width of a US-letter page and
    a new page is started whenever the text cursor reaches the bottom margin,
    so the text of long paragraphs and long documents stays on a page.

    Args:
        docx_path: Path of the .docx file to convert.
        output_dir: Directory in which the PDF is written. The PDF keeps the
            base name of the source file with a ``.pdf`` extension.

    Returns:
        The path of the generated PDF, or ``None`` if anything went wrong
        (the error is printed, not raised).
    """
    # Layout in points: left/right margin, first baseline, lowest baseline.
    margin_x, top_y, bottom_y = 50, 750, 50
    font_name, font_size = "Helvetica", 10

    try:
        # Local import: only this function needs the line-wrapping helper.
        from reportlab.lib.utils import simpleSplit

        doc = Document(docx_path)
        pdf_filename = os.path.splitext(os.path.basename(docx_path))[0] + ".pdf"
        pdf_path = os.path.join(output_dir, pdf_filename)

        page_width = letter[0]
        max_line_width = page_width - 2 * margin_x

        # Create PDF
        c = canvas.Canvas(pdf_path, pagesize=letter)
        text = c.beginText(margin_x, top_y)  # Starting position
        text.setFont(font_name, font_size)

        for paragraph in doc.paragraphs:
            # Wrap the paragraph to the printable width; an empty paragraph
            # still produces one blank line, as before.
            wrapped = simpleSplit(paragraph.text, font_name, font_size, max_line_width) or [""]
            for line in wrapped:
                if text.getY() < bottom_y:
                    # Page is full: emit it and continue on a fresh page.
                    c.drawText(text)
                    c.showPage()
                    text = c.beginText(margin_x, top_y)
                    text.setFont(font_name, font_size)
                text.textLine(line)

        c.drawText(text)
        c.save()

        print(f"Converted {docx_path} to {pdf_path}.")
        return pdf_path
    except Exception as e:
        print(f"Error converting {docx_path} to PDF: {e}")
        return None

### 3.2 Function to extract text from PDF file

In [None]:
def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page in a PDF.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The text of all pages joined in page order (an empty string when no
        page yields text), or ``None`` if the file could not be processed
        (the error is printed, not raised).
    """
    try:
        # The context manager closes the document even if a page fails.
        with fitz.open(pdf_path) as doc:
            return "".join(page.get_text() for page in doc)
    except Exception as e:
        print(f"Error processing {pdf_path}: {e}")
        return None

### 3.3 Function to extract metadata from each file

In [None]:
def extract_metadata(file_path):
    """Read title, author and subject from a PDF or DOCX file.

    Args:
        file_path: Path of the document. The extension is matched
            case-insensitively; other extensions only get the defaults.

    Returns:
        A dict with the keys ``"title"``, ``"author"`` and ``"subject"``.
        A value that is missing, empty or whitespace-only in the file falls
        back to the file name (title), ``"Unknown"`` (author) or
        ``"General"`` (subject). Read errors are printed and the defaults
        are returned.
    """
    metadata = {"title": os.path.basename(file_path), "author": "Unknown", "subject": "General"}
    lowered_path = file_path.lower()
    embedded = {}

    try:
        if lowered_path.endswith(".pdf"):
            # Extract metadata from PDF; close the handle once it is read.
            with fitz.open(file_path) as doc:
                embedded = doc.metadata or {}
        elif lowered_path.endswith(".docx"):
            # Extract metadata from DOCX
            core_properties = Document(file_path).core_properties
            embedded = {
                "title": core_properties.title,
                "author": core_properties.author,
                "subject": core_properties.subject,
            }
    except Exception as e:
        print(f"Error extracting metadata from {file_path}: {e}")
        embedded = {}

    # Only a non-blank string replaces a default; a key that exists with an
    # empty or None value must not wipe the fallback.
    for key in ("title", "author", "subject"):
        value = embedded.get(key)
        if isinstance(value, str) and value.strip():
            metadata[key] = value.strip()

    return metadata

### 3.4 Function to clean text

In [None]:
def clean_text(text):
    """Lowercase *text* and replace each run of non-word characters with one space.

    Leading and trailing runs are replaced too, so the result may start or
    end with a single space.
    """
    lowered = text.lower()
    return re.sub(r'\W+', ' ', lowered)

## 4. Generate Embedding

In [None]:
# Load the sentence-embedding model once at module level; the same instance is
# passed to every generate_embeddings() call in this notebook.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

def generate_embeddings(text, model):
    """Encode *text* with *model* and return the result of ``model.encode``."""
    return model.encode(text)

## 5. Function to store Embeddings and metadata in Milvus

In [None]:
def _fit_varchar(value, max_bytes=256):
    """Return *value* as a string cut to at most *max_bytes* UTF-8 bytes."""
    encoded = str(value or "").encode("utf-8")
    # errors="ignore" drops a multi-byte character that the cut split in half.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


def store_in_milvus(client, collection_name, embeddings_list, metadata_list):
    """Recreate a Milvus collection and fill it with document embeddings.

    All rows are prepared and validated first, so an invalid input no longer
    destroys an existing collection. The collection is then dropped (if it
    exists), recreated, indexed, filled and flushed.

    Title and subject embeddings are computed with the module-level ``model``
    through ``generate_embeddings``.

    Args:
        client: MilvusClient handle. Kept for backward compatibility; the
            work is done through the pymilvus ORM "default" connection.
        collection_name: Name of the collection to (re)create.
        embeddings_list: One document embedding per document (array-like of
            384 floats, matching the schema below).
        metadata_list: One dict per document with optional ``"title"``,
            ``"author"`` and ``"subject"`` keys, in the same order as
            ``embeddings_list``.

    Returns:
        None. Errors are printed, not raised.
    """
    if not embeddings_list:
        # Nothing to insert: do not drop a collection just to leave it empty.
        print("No embeddings to store; collection left untouched.")
        return

    try:
        if len(embeddings_list) != len(metadata_list):
            raise ValueError(
                f"{len(embeddings_list)} embeddings but {len(metadata_list)} metadata entries"
            )

        # ---- Prepare the rows before touching the database ----
        embeddings_data = [np.asarray(embedding).flatten().tolist() for embedding in embeddings_list]

        # Missing or None metadata values become empty strings.
        raw_titles = [str(metadata.get('title') or '') for metadata in metadata_list]
        raw_subjects = [str(metadata.get('subject') or '') for metadata in metadata_list]

        # Encode all titles / subjects in one batch each.
        title_embeddings = np.asarray(generate_embeddings(raw_titles, model)).tolist()
        subject_embeddings = np.asarray(generate_embeddings(raw_subjects, model)).tolist()

        # The VARCHAR fields below are declared with max_length=256; cutting by
        # bytes stays within the limit whether it counts bytes or characters.
        titles = [_fit_varchar(title) for title in raw_titles]
        authors = [_fit_varchar(metadata.get('author')) for metadata in metadata_list]
        subjects = [_fit_varchar(subject) for subject in raw_subjects]

        # ---- Recreate the collection ----
        if utility.has_collection(collection_name, using="default"):
            print(f"Collection '{collection_name}' already exists. Dropping and recreating...")
            utility.drop_collection(collection_name, using="default")

        # Milvus schema definition
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
            FieldSchema(name="title_embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
            FieldSchema(name="subject_embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
            FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
            FieldSchema(name="author", dtype=DataType.VARCHAR, max_length=256),
            FieldSchema(name="subject", dtype=DataType.VARCHAR, max_length=256),
        ]
        schema = CollectionSchema(fields, description="Schema for document embeddings")

        # Create the collection (exactly once).
        collection = Collection(name=collection_name, schema=schema)
        print(f"Collection '{collection_name}' created successfully.")

        # Index parameters shared by the three vector fields.
        index_params = {
            "index_type": "HNSW",
            "metric_type": "L2",
            "params": {"M": 16, "efConstruction": 200},
        }

        # Index creation is best-effort: a failure on one field is reported
        # and the remaining fields are still attempted.
        for field_name in ("embedding", "title_embedding", "subject_embedding"):
            if field_name == "embedding":
                created = f"Index created successfully for collection: '{collection_name}'."
                failed = f"Failed to create an index on collection: {collection_name}."
            else:
                created = f"Index created successfully for '{field_name}' in collection: '{collection_name}'."
                failed = f"Failed to create an index on '{field_name}' field in collection: {collection_name}."
            try:
                collection.create_index(field_name=field_name, index_params=index_params)
                print(created)
            except Exception as e:
                print(f"{failed} Error: {e}")

        # Column order follows the schema, minus the auto-generated "id".
        data = [
            embeddings_data,
            title_embeddings,
            subject_embeddings,
            titles,
            authors,
            subjects,
        ]

        # Insert data into Milvus, then flush to seal the inserted rows
        # (see pymilvus Collection.flush).
        collection.insert(data)
        collection.flush()
        print(f"Data stored successfully in collection '{collection_name}'.")

    except Exception as e:
        print(f"Error storing in Milvus: {e}")

## Main Function

In [None]:
def process_and_store_pdf(dataset_dir, collection_name):
    """Embed every PDF/DOCX found one level below *dataset_dir* and store them.

    Each sub-folder of ``dataset_dir`` is scanned (files directly inside
    ``dataset_dir`` are not). PDFs are read directly; DOCX files are first
    converted to a temporary PDF. Extensions are matched case-insensitively.
    The cleaned text of each document is embedded with the module-level
    ``model`` and, if at least one document produced text, everything is
    handed to ``store_in_milvus`` together with the module-level ``client``.

    Args:
        dataset_dir: Root directory containing one sub-folder per group of
            documents.
        collection_name: Name of the Milvus collection to fill.

    Returns:
        None. A missing ``dataset_dir`` is reported and nothing is stored.
    """
    if not os.path.isdir(dataset_dir):
        print(f"Dataset directory not found: {dataset_dir}")
        return

    embeddings_list = []
    metadata_list = []

    with tempfile.TemporaryDirectory() as temp_pdf_dir:
        # sorted() makes the processing (and insertion) order reproducible.
        for subfolder in sorted(os.listdir(dataset_dir)):
            subfolder_path = os.path.join(dataset_dir, subfolder)
            if not os.path.isdir(subfolder_path):
                continue
            print(f"Processing folder: {subfolder_path}")

            for file in sorted(os.listdir(subfolder_path)):
                file_path = os.path.join(subfolder_path, file)
                lowered_name = file.lower()

                if lowered_name.endswith(".pdf"):
                    print(f"Processing PDF file: {file_path}...")
                    text = extract_text_from_pdf(file_path)
                elif lowered_name.endswith(".docx"):
                    print(f"Converting DOCX to PDF: {file_path}...")
                    converted_pdf_path = convert_docx_to_pdf(file_path, temp_pdf_dir)
                    if not converted_pdf_path:
                        print(f"Skipping conversion for {file_path} due to an error.")
                        continue
                    text = extract_text_from_pdf(converted_pdf_path)
                else:
                    print(f"Skipping unsupported file: {file_path}")
                    continue

                if not text:
                    # Extraction failed or the document has no text layer.
                    print(f"No extractable text in {file_path}; skipping.")
                    continue

                # NOTE(review): sentence-transformer models cap their input
                # length, so presumably only the beginning of a long document
                # is represented — confirm and consider chunking.
                cleaned_text = clean_text(text)
                embeddings = generate_embeddings(cleaned_text, model)
                # Metadata comes from the original file, not the temporary PDF.
                metadata = extract_metadata(file_path)
                embeddings_list.append(embeddings)
                metadata_list.append(metadata)

        if embeddings_list:
            store_in_milvus(client, collection_name, embeddings_list, metadata_list)


# Root folder of the dataset; process_and_store_pdf() scans each sub-folder in it.
dataset_dir = "PersonalDocumentsDataset" # Adjust the directory
# Name of the Milvus collection that receives the embeddings.
collection_name = "dataset" 

# Run the ingestion: extract, embed and store every supported document.
process_and_store_pdf(dataset_dir, collection_name)

## Check Collection Content
Ensure the entire dataset is stored in Milvus

In [None]:
# Inspect the collection contents
def check_collection_contents(collection_name, limit=200):
    """Print id, title, author and subject of the rows stored in a collection.

    Args:
        collection_name: Name of the Milvus collection to inspect.
        limit: Maximum number of rows to fetch and print (default 200).

    Returns:
        None. Errors are printed, not raised.
    """
    try:
        collection = Collection(name=collection_name)
        collection.load()
        results = collection.query(
            expr="id >= 0",
            output_fields=["id", "title", "author", "subject"],
            limit=limit,
        )

        for result in results:
            print(f"Document ID: {result['id']}, Title: {result.get('title', 'N/A')}, Author: {result.get('author', 'N/A')}, Subject: {result.get('subject', 'N/A')}")

    except Exception as e:
        print(f"Error checking collection contents: {e}")

# Inspect the collection that the ingestion step filled ("dataset").
collection_name = "dataset"
check_collection_contents(collection_name)


## 6. Implementation of vector search

In [None]:
def vector_search(collection_name, query_embedding, top_k=5):
    """Search the document, title and subject embeddings and print the best hits.

    The query vector is searched against each of the three vector fields.
    All hits are merged, sorted by ascending L2 distance and de-duplicated by
    document id (the closest hit of a document wins), then the first
    ``top_k`` documents are printed. Distances from the three fields are
    compared directly with each other.

    Args:
        collection_name: Name of the Milvus collection to search.
        query_embedding: Query vector as a flat list of floats.
        top_k: Number of hits requested per field and number of unique
            documents printed (default 5).

    Returns:
        None. Errors are printed, not raised.
    """
    print("\n--- Scenario 1: Direct Vector Search ---")
    try:
        search_params = {
            "metric_type": "L2",
            # HNSW requires ef to be at least the number of requested hits.
            "params": {"ef": max(128, top_k)}
        }

        collection = Collection(name=collection_name)
        collection.load()

        # (vector field to search, label printed as the match "Type")
        search_targets = (
            ("embedding", "document"),
            ("title_embedding", "title"),
            ("subject_embedding", "subject"),
        )

        combined_results = []
        for anns_field, match_type in search_targets:
            results = collection.search(
                data=[query_embedding],
                anns_field=anns_field,
                param=search_params,
                limit=top_k,
                output_fields=["title", "author"]
            )
            for hits in results:
                for hit in hits:
                    combined_results.append({
                        "id": hit.id,
                        "distance": hit.distance,
                        "type": match_type,
                        "title": hit.entity.get("title"),
                        "author": hit.entity.get("author"),
                    })

        # Sort combined results by distance (ascending); the sort is stable,
        # so ties keep the document/title/subject order.
        combined_results.sort(key=lambda item: item['distance'])

        # Keep the closest hit per document id until top_k documents are found.
        seen_ids = set()
        final_results = []
        for result in combined_results:
            if len(final_results) >= top_k:
                break
            if result['id'] in seen_ids:
                continue
            seen_ids.add(result['id'])
            final_results.append(result)

        # Print the top results (after ensuring uniqueness)
        print(f"Top {top_k} documents based on distance:")
        for result in final_results:
            print(f"Document ID: {result['id']}, Distance: {result['distance']}, Type: {result['type']}")
            print(f"Title: {result['title']}, Author: {result['author']}\n")

    except Exception as e:
        print(f"Unexpected error during direct vector search: {e}")

## 7. Pre-processing of user query
Function that fixes typos in the user query and translates English queries into Indonesian, using Qwen2.5-72B-Instruct.

In [None]:
from gradio_client import Client

def chatbot(query):
    """Return *query* with typos fixed (and translated) by the hosted Qwen model.

    The system prompt asks the model to rewrite the query, fix typos and
    translate English input into Indonesian. A blank query is returned
    unchanged without contacting the service. If the service call fails or
    the response does not have the expected shape, the error is printed and
    the original query is returned so the search can still run.

    Args:
        query: Raw query text typed by the user.

    Returns:
        The rewritten query, or the original ``query`` on blank input or
        failure.
    """
    if not query or not query.strip():
        return query

    try:
        # Named qwen_client so it is not confused with the module-level
        # Milvus ``client``.
        qwen_client = Client("Qwen/Qwen2.5-72B-Instruct")
        result = qwen_client.predict(
            query=query,
            history=[],
            system="anda akan menerima inputan user. cukup tuliskan ulang query tersebut dan perbaiki typonya jika ada. tidak perlu penjelasan apa saja yang diperbaiki. jika input dalam bahasa inggris, translate ke bahasa indonesia.",
            api_name="/model_chat"
        )
        # presumably result[1] is the chat history and [0][1] the reply to the
        # first message — verify against the Space's API.
        return result[1][0][1]
    except Exception as e:
        print(f"Error contacting the query-correction service: {e}")
        return query

## 8. Processing user query and executing vector searches

In [None]:
def user_query_search(collection_name):
    """Prompt for a query, clean it up with ``chatbot`` and run the vector search.

    The corrected query is echoed to the user, embedded with the module-level
    ``model`` and searched in ``collection_name``.

    Args:
        collection_name: Name of the Milvus collection to search. The
            argument is now honoured instead of being replaced by a
            hard-coded name.

    Returns:
        None. Results are printed by ``vector_search``.
    """
    # Input
    query_text = input("Masukkan query pencarian Anda: ")

    processed_query = chatbot(query_text)
    print(f"Apakah yang anda maksud: {processed_query}")

    query_embedding = generate_embeddings(processed_query, model)

    # RUN
    vector_search(collection_name=collection_name, query_embedding=query_embedding.tolist())

In [None]:
# Run the interactive search: prompts for a query and prints the results.
user_query_search(collection_name="dataset")