In [2]:
from langchain_ollama.embeddings import OllamaEmbeddings
from langchain_community.llms import Ollama
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
import hashlib
from datetime import datetime

class VectorIngestionPipeline:
    """Load a PDF, split it into chunks, embed each chunk and upsert it.

    Args:
        vector_db: Object exposing ``upsert(vectors=[(id, embedding, metadata)])``.
        embedding_model: Object exposing ``embed_query(text)``.
    """

    def __init__(self, vector_db, embedding_model):
        self.vector_db = vector_db
        self.embeddings = embedding_model
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    def process_document(self, file_path, metadata=None):
        """Extract, chunk and ingest the PDF at ``file_path``.

        Args:
            file_path: Path handed to ``PyPDFLoader``.
            metadata: Optional mapping copied onto every chunk. When it has
                no ``"source"`` entry, ``str(file_path)`` is used so that
                vector IDs can still be built.
        """
        # Copy so the caller's dict is never mutated, and so that the
        # default ``metadata=None`` no longer crashes in ``ingest_chunk``.
        base_metadata = dict(metadata) if metadata else {}
        base_metadata.setdefault("source", str(file_path))

        # 1. Extract
        documents = PyPDFLoader(file_path).load()

        # 2. Chunk & Preprocess
        chunks = self.text_splitter.split_documents(documents)

        # 3. Generate embeddings and insert
        for i, chunk in enumerate(chunks):
            self.ingest_chunk(chunk, i, base_metadata)

    def ingest_chunk(self, chunk, chunk_id, base_metadata):
        """Embed one chunk and upsert it with enriched metadata.

        Args:
            chunk: Object with a ``page_content`` string (and optionally a
                ``metadata`` dict).
            chunk_id: Index of the chunk; becomes part of the vector ID.
            base_metadata: Mapping merged into the stored metadata; may be
                ``None``.

        The vector ID is ``"<source>_<chunk_id>"``. ``source`` is taken from
        ``base_metadata``, then from ``chunk.metadata``, and finally falls
        back to the content hash so a missing source never raises.
        """
        text = chunk.page_content
        content_hash = hashlib.sha256(text.encode()).hexdigest()

        merged = dict(base_metadata) if base_metadata else {}
        chunk_metadata = getattr(chunk, "metadata", None) or {}
        source = merged.get("source") or chunk_metadata.get("source") or content_hash
        merged.setdefault("source", source)

        # Generate embedding
        embedding = self.embeddings.embed_query(text)

        # Enrich metadata (pipeline-generated keys win over caller-supplied ones)
        enriched_metadata = {
            **merged,
            "chunk_id": chunk_id,
            "timestamp": datetime.now().isoformat(),
            "content_hash": content_hash,
        }

        # Insert to vector DB
        self.vector_db.upsert(
            vectors=[(f"{source}_{chunk_id}", embedding, enriched_metadata)]
        )

In [3]:
import spacy
class SmartChunker:
    """Split text into chunks, by sentence or with LangChain's recursive splitter."""

    def __init__(self):
        # Loads the small English spaCy pipeline, used for sentence segmentation.
        self.nlp = spacy.load("en_core_web_sm")

    def semantic_chunking(self, text, max_chunk_size=1000):
        """Chunk by sentences while respecting max size.

        Sentences are packed greedily, joined by single spaces, until adding
        the next one would push the chunk past ``max_chunk_size`` characters.
        Sentences that are empty after stripping are skipped so they cannot
        produce empty chunks or doubled spaces.

        Note: a single sentence longer than ``max_chunk_size`` is emitted as
        its own, oversized chunk; sentences are never split.

        Args:
            text: The text to segment.
            max_chunk_size: Maximum chunk length in characters.

        Returns:
            A list of chunk strings (empty when ``text`` has no sentences).
        """
        doc = self.nlp(text)

        chunks = []
        current_parts = []
        current_len = 0  # length of " ".join(current_parts)

        for sent in doc.sents:
            sentence = sent.text.strip()
            if not sentence:
                continue

            # Length the chunk would have after appending this sentence.
            joined_len = current_len + len(sentence) + (1 if current_parts else 0)

            if current_parts and joined_len > max_chunk_size:
                chunks.append(" ".join(current_parts))
                current_parts = [sentence]
                current_len = len(sentence)
            else:
                current_parts.append(sentence)
                current_len = joined_len

        if current_parts:
            chunks.append(" ".join(current_parts))

        return chunks

    def recursive_chunking(self, text, chunk_size=1000, chunk_overlap=200):
        """Use LangChain's recursive splitter for complex documents.

        Args:
            text: The text to split.
            chunk_size: Passed to ``RecursiveCharacterTextSplitter``.
            chunk_overlap: Passed to ``RecursiveCharacterTextSplitter``.

        Returns:
            The result of ``splitter.split_text(text)``.
        """
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""]
        )
        return splitter.split_text(text)

In [7]:
import json
from typing import List
class EmbeddingManager:
    """Thin wrapper around ``OllamaEmbeddings`` that also reports model metadata.

    Args:
        model_name: Ollama model tag passed to ``OllamaEmbeddings``.
    """

    def __init__(self, model_name="embeddinggemma:300m"):
        self.model_name = model_name
        self.model = OllamaEmbeddings(model=model_name)
        # Kept as an attribute as well, since get_embedding_metadata()
        # historically read ``self.model_version``.
        self.model_version = self._get_model_version()
        self._embedding_dimension = None

    def _get_model_version(self):
        """Return the version label derived from the model name."""
        return f"{self.model_name}_v1.0"

    def _get_embedding_dimension(self):
        """Return the vector length, measured once from a probe embedding.

        The wrapped model is only assumed to offer ``embed_query``, so the
        dimension is taken from the length of a real embedding and cached.
        """
        cached = getattr(self, "_embedding_dimension", None)
        if cached is None:
            cached = len(self.model.embed_query("dimension probe"))
            self._embedding_dimension = cached
        return cached

    def embed_texts(self, texts: List[str]) -> List[List[float]]:
        """Embed ``texts`` and return one vector per input, in order.

        An empty input returns ``[]`` without calling the model. If the
        batch call fails, each text is embedded individually as a
        best-effort fallback; an error in the fallback propagates.
        """
        if not texts:
            return []
        try:
            return self.model.embed_documents(texts)
        except Exception:
            return [self.model.embed_query(t) for t in texts]

    def get_embedding_metadata(self):
        """Return a dict describing the embedding model.

        Keys: ``model_name``, ``model_version``, ``embedding_dimension``
        and ``normalization``. Computing the dimension may trigger one
        embedding call the first time this method runs.
        """
        return {
            "model_name": self.model_name,
            "model_version": self._get_model_version(),
            "embedding_dimension": self._get_embedding_dimension(),
            # NOTE(review): reported as a constant; this class does not
            # normalize vectors itself — confirm the model emits L2-normalized output.
            "normalization": "l2"
        }
# Usage example: embed two sample strings with an explicitly named model.
texts = ["Sample text 1", "Sample text 2"]
embedding_manager = EmbeddingManager(model_name="embeddinggemma:300m")
embeddings = embedding_manager.embed_texts(texts)

In [9]:
import langdetect
from datetime import datetime
import re
class MetadataEnricher:
    """Derive descriptive metadata (counts, language, hash, flags) from text."""

    # Compiled once at class creation instead of on every call.
    _DIGIT_RE = re.compile(r'\d')
    _URL_RE = re.compile(r'http[s]?://')

    def enrich_metadata(self, text, base_metadata=None):
        """Return ``base_metadata`` extended with fields computed from ``text``.

        Args:
            text: The text to describe; ``None`` is treated as ``""``.
            base_metadata: Optional mapping to extend. It is not mutated;
                computed keys override same-named entries.

        Returns:
            A new dict with ``timestamp``, ``word_count``, ``char_count``,
            ``language``, ``content_hash``, ``has_numbers`` and ``has_urls``
            added.
        """
        text = "" if text is None else text
        base_metadata = base_metadata or {}

        enriched = {
            **base_metadata,
            "timestamp": datetime.now().isoformat(),
            "word_count": len(text.split()),
            "char_count": len(text),
            "language": self._detect_language(text),
            "content_hash": hashlib.sha256(text.encode()).hexdigest(),
            "has_numbers": bool(self._DIGIT_RE.search(text)),
            "has_urls": bool(self._URL_RE.search(text)),
        }

        return enriched

    def _detect_language(self, text):
        """Return the detected language code, or ``"unknown"`` on failure.

        Detection is best-effort, so any ordinary error maps to
        ``"unknown"``. ``KeyboardInterrupt`` and ``SystemExit`` are
        deliberately not caught, so the process can still be stopped.
        """
        try:
            return langdetect.detect(text)
        except Exception:
            return "unknown"