In [None]:
!pip install langchain-text-splitters langchain pymupdf langchain_ollama psycopg2-binary pgvector pulsar-client 

In [30]:
from queue import Queue
import multiprocessing
import threading
from threading import Thread
import psycopg2
import time
import hashlib
from io import BytesIO
import io
import requests
import pymupdf  # PyMuPDF
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_ollama import OllamaEmbeddings
import pulsar
from pdf_schema import PDFInfo
from pulsar.schema import JsonSchema
import logging
logging.getLogger("pulsar").setLevel(logging.ERROR)

In [32]:
# Clean shutdown handler
import signal
from queue import Queue
# Event that is set once an interrupt has been received.
shutdown = threading.Event()


def signal_handler(sig, frame):
    """Announce the interrupt and set the ``shutdown`` event.

    Both arguments are supplied by the ``signal`` machinery and are not
    used here.
    """
    print('Interrupt, shutting down gracefully...')
    shutdown.set()


# Route SIGINT (Ctrl-C / kernel interrupt) to the handler above.
signal.signal(signal.SIGINT, signal_handler)

# Connection settings for the Postgres database used by the workers.
db = "search"
host = "pdf-service"
user = "postgres"
password = "admin"

# Parameterized INSERT statements, one per target table.
# NOTE: an earlier variant of the ``sources`` statement also wrote
# ``title_embedding`` and ``summary_embedding`` columns; the current one
# stores only the plain metadata fields.
sources_insert = (
    "INSERT INTO sources (id, uri, title, author, summary) "
    "VALUES (%s, %s, %s, %s, %s)"
)
semantic_insert = (
    "INSERT INTO semantic_search (id, page, chunk, embedding) "
    "VALUES (%s, %s, %s, %s)"
)
keyword_insert = (
    "INSERT INTO keyword_search (id, page, ts) "
    "VALUES (%s, %s, to_tsvector('english', %s))"
)

def pdf_retrieve(url, timeout=30):
    """Download the PDF at *url* and open it with PyMuPDF.

    Args:
        url: Address of the PDF to fetch.
        timeout: Seconds ``requests`` waits for the server before giving
            up. Without a timeout a stalled server would block the calling
            worker indefinitely.

    Returns:
        The document object returned by ``pymupdf.open``.

    Raises:
        Exception: If the server answers with a status other than 200.
        requests.RequestException: On connection errors or timeouts.
    """
    # Using the response as a context manager releases the streamed
    # connection even when the status check or the download fails.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            raise Exception(f"Failed to download PDF; received HTTP {response.status_code} from underlying server")
        pdf_bytes = b"".join(
            response.iter_content(chunk_size=io.DEFAULT_BUFFER_SIZE)
        )
    # Hand PyMuPDF plain bytes so the document does not depend on a buffer
    # object that is closed by the time this function returns.
    return pymupdf.open(stream=pdf_bytes, filetype="pdf")

def _index_page(cur, key, page, text_splitter, embed):
    """Write one PDF page into the keyword and semantic search tables."""
    text = page.get_text()
    # NUL characters are stripped before the text is handed to Postgres.
    cur.execute(keyword_insert, (key, page.number, text.replace('\x00', '')))
    chunks = text_splitter.split_text(text)
    if not chunks:
        # Blank page: nothing to embed, so skip the embedding call.
        return
    embeddings = embed.embed_documents(chunks)
    # NOTE(review): the ``chunk`` column receives the chunk's index within
    # the page, not the chunk text -- confirm this matches the table schema.
    for chunk_id, embedding in enumerate(embeddings):
        cur.execute(semantic_insert, (key, page.number, chunk_id, embedding))


def _ingest_pdf(con, pdf_json, text_splitter, embed):
    """Download the PDF described by *pdf_json* and index every page.

    Failures are reported with ``print`` and skipped (a single page, or the
    whole document when the download or the ``sources`` insert fails), so
    one bad PDF does not stop the worker.
    """
    with con.cursor() as cur:
        try:
            # The MD5 of the URL is used as the document id in all tables.
            key = hashlib.md5(pdf_json.url.encode()).hexdigest()
            pdf_doc = pdf_retrieve(pdf_json.url)
            try:
                cur.execute(sources_insert, (key, pdf_json.url, pdf_json.title, pdf_json.authors, pdf_json.summary))
                for page in pdf_doc:
                    try:
                        _index_page(cur, key, page, text_splitter, embed)
                    except Exception as e:
                        print(f"Error processing page {page.number} of {pdf_json.url}: {e}")
                        continue  # Skip to the next page
            finally:
                pdf_doc.close()
        except Exception as e:
            # Give up on this document; the caller moves on to the next message.
            print(f"Error downloading PDF from url: {pdf_json.url}: {e}")


def thread_task(shutdown,consumer,con,chunk_size=500, chunk_overlap=50):
    """Worker loop: consume PDF messages and index them until shutdown.

    Args:
        shutdown: ``threading.Event``; the loop exits once it is set.
        consumer: Pulsar consumer that messages are received from. It is
            closed when the loop ends.
        con: Postgres connection used for all inserts. It is closed when
            the loop ends.
        chunk_size: ``chunk_size`` passed to the text splitter.
        chunk_overlap: ``chunk_overlap`` passed to the text splitter.
    """
    # Embeddings connection for each thread
    embed = OllamaEmbeddings(
        model="all-minilm",
        base_url ="http://host.docker.internal:11434"
    )
    # LangChain's Recursive Text Splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""],  # Prioritize logical breaks
    )
    try:
        # Run until shutdown is requested (the condition was previously
        # inverted, so the loop body never executed).
        while not shutdown.is_set():
            try:
                msg = consumer.receive(timeout_millis=5000)
            except Exception:
                # The receive timeout exists so that the shutdown flag is
                # re-evaluated regularly instead of blocking forever.
                continue
            try:
                print(f"Processing message: {msg.message_id()}")
                pdf_json = msg.value()
                print(f"PDF url: {pdf_json}")
                # Probe the embedding service before doing any work; if the
                # call raises, the message is negatively acknowledged below.
                if embed.embed_query('hello') is None:
                    print("Ollama not available")
                _ingest_pdf(con, pdf_json, text_splitter, embed)
                # Acknowledge only after the message has been handled, so an
                # acknowledged message is never negatively acknowledged.
                consumer.acknowledge(msg)
            except Exception as e:
                print(f"Failed to process message: {e}")
                # Message failed to be processed; ask for redelivery.
                try:
                    consumer.negative_acknowledge(msg)
                except Exception as nack_error:
                    print(f"Failed to negatively acknowledge message: {nack_error}")
    finally:
        # Release both resources even if the loop exits with an error.
        try:
            consumer.close()
        finally:
            con.close()
        

# One worker thread per CPU core.
cores = multiprocessing.cpu_count()

pulcli = pulsar.Client('pulsar://pdf-service:6650')
join_list = []
try:
    for _ in range(cores):
        con = psycopg2.connect(dbname=db, user=user, password=password, host=host)  # Dedicated connection per thread
        con.autocommit = True  # Ensure auto-commit mode to avoid locks
        try:
            # All workers share one subscription name with a Shared consumer type.
            consumer = pulcli.subscribe(
                topic='scraper-output-test',
                subscription_name="pdf-process-consumer",
                consumer_type=pulsar.ConsumerType.Shared,
                schema=JsonSchema(PDFInfo),
            )
        except Exception:
            # No worker will take ownership of this connection, so close it here.
            con.close()
            raise
        worker = Thread(target=thread_task, args=(shutdown, consumer, con))
        worker.start()
        join_list.append(worker)

    print(f"Running {cores} workers")

    # Block until every worker has left its loop (after `shutdown` is set).
    for thread in join_list:
        thread.join()
finally:
    # If start-up failed part-way, stop the workers that are already running
    # before closing the client they depend on.
    shutdown.set()
    for thread in join_list:
        thread.join()
    pulcli.close()
time.sleep(1)
print("Completed")

2025-03-26 20:17:27.335 INFO  [140535807162176] Client:86 | Subscribing on Topic :scraper-output-test
2025-03-26 20:17:27.336 INFO  [140535807162176] ClientConnection:193 | [<none> -> pulsar://pdf-service:6650] Create ClientConnection, timeout=10000
2025-03-26 20:17:27.336 INFO  [140535807162176] ConnectionPool:124 | Created connection for pulsar://pdf-service:6650-pulsar://pdf-service:6650-0
2025-03-26 20:17:27.339 INFO  [140532974073408] ClientConnection:410 | [10.1.1.31:36480 -> 10.102.154.227:6650] Connected to broker
2025-03-26 20:17:27.342 INFO  [140532974073408] HandlerBase:115 | [persistent://public/default/scraper-output-test, pdf-process-consumer, 0] Getting connection from pool
2025-03-26 20:17:27.344 INFO  [140532974073408] BinaryProtoLookupService:85 | Lookup response for persistent://public/default/scraper-output-test, lookup-broker-url pulsar://localhost:6650, from [10.1.1.31:36480 -> 10.102.154.227:6650] 
2025-03-26 20:17:27.344 INFO  [140532974073408] ClientConnection: