In [4]:
import json
from typing import Any, Dict, List, Optional

import requests

def clean_json_string(json_string: str) -> str:
    """
    Normalizes a raw document string before it is chunked.

    Literal backslash-n sequences and real newlines become single spaces,
    backslash-escaped single quotes are unescaped, runs of three (and then
    four) double quotes collapse to a single double quote, and leading and
    trailing whitespace is stripped.
    """
    # Applied strictly in this order: each rule sees the output of the previous one.
    substitutions = (
        ("\\n", " "),   # the two characters backslash + "n"
        ("\\'", "'"),   # backslash-escaped single quote
        ('"""', '"'),   # triple double quote
        ("\n", " "),    # real newline character
        ('""""', '"'),  # quadruple double quote still present after the rules above
    )

    cleaned = json_string
    for old, new in substitutions:
        cleaned = cleaned.replace(old, new)
    return cleaned.strip()

_SENTENCE_SPLITTER = None  # Lazily loaded spaCy pipeline, shared by all chunk_text calls.


def _get_sentence_splitter():
    """Loads the spaCy pipeline used for sentence splitting once and reuses it."""
    global _SENTENCE_SPLITTER
    if _SENTENCE_SPLITTER is None:
        # Imported lazily so the module can be imported without spaCy installed.
        import spacy

        _SENTENCE_SPLITTER = spacy.load(
            "en_core_web_sm", disable=["ner", "tagger", "lemmatizer"]
        )
    return _SENTENCE_SPLITTER


def chunk_text(
    text: str,
    sentences_per_passage: int,
    filter_sentence_len: int,
    sliding_distance: Optional[int] = None,
) -> List[str]:
    """
    Chunks text into passages using a sliding window over its sentences.

    Args:
        text: Text to chunk into passages. Only the first 500,000 characters
            are tokenized.
        sentences_per_passage: Number of sentences for each passage. Must be > 0.
        filter_sentence_len: Maximum number of chars of each sentence before being filtered.
        sliding_distance: Number of sentences the window advances between
            passages; values smaller than the window size make passages overlap.
            When it is None, 0, or greater than `sentences_per_passage`, it
            falls back to `sentences_per_passage` (no overlap).
    Returns:
        passages: Chunked passages from the text, each one the window's
            sentences joined by a single space. Windows near the end of the
            text may hold fewer than `sentences_per_passage` sentences. Empty
            when the text is empty or tokenization hits a Unicode error.
    Raises:
        AssertionError: If the window size or the effective sliding distance
            is not positive.
    """
    if not sliding_distance or sliding_distance > sentences_per_passage:
        sliding_distance = sentences_per_passage
    assert sentences_per_passage > 0 and sliding_distance > 0

    # Nothing to split: skip the (expensive) model load entirely.
    if not text:
        return []

    nlp = _get_sentence_splitter()
    try:
        doc = nlp(text[:500000])  # Take 500k chars to not break tokenization.
        sents = [
            s.text
            for s in doc.sents
            if len(s.text) <= filter_sentence_len  # Long sents are usually metadata.
        ]
    except UnicodeEncodeError:  # Sometimes run into Unicode error when tokenizing.
        print("Unicode error when using Spacy. Skipping text.")
        return []

    return [
        " ".join(sents[start : start + sentences_per_passage])
        for start in range(0, len(sents), sliding_distance)
    ]

_CROSS_ENCODER = None  # Lazily loaded relevance model, shared by all scoring calls.


def _get_cross_encoder():
    """Loads the MS MARCO cross-encoder once and reuses it on later calls."""
    global _CROSS_ENCODER
    if _CROSS_ENCODER is None:
        # Imported lazily so the module can be imported without the dependency.
        from sentence_transformers import CrossEncoder

        # No explicit device: the library then uses a GPU when one is available
        # and falls back to CPU otherwise, instead of failing on hosts without CUDA.
        _CROSS_ENCODER = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
    return _CROSS_ENCODER


def score_and_sort_passages(passages: List[str], query: str) -> List[Dict[str, Any]]:
    """
    Scores passages by relevance to the query and sorts them by score.

    Args:
        passages: A list of text passages.
        query: The query string.

    Returns:
        A list of dictionaries with keys "text" (the passage) and "score"
        (the cross-encoder score), sorted by score in descending order.
        Empty when `passages` is empty; the model is not loaded in that case.
    """
    if not passages:
        return []

    model = _get_cross_encoder()

    # The cross-encoder scores (query, passage) pairs jointly.
    scores = model.predict([(query, passage) for passage in passages]).tolist()

    scored_passages = [
        {"text": passage, "score": score}
        for passage, score in zip(passages, scores)
    ]
    scored_passages.sort(key=lambda item: item["score"], reverse=True)

    return scored_passages

def run_search_and_process(
    queries: List[str],
    search_url: str,
    sentences_per_passage: int,
    filter_sentence_len: int,
    sliding_distance: int,
    timeout: Optional[float] = None,
) -> List[List[Dict[str, Any]]]:
    """
    Sends queries to the search server, retrieves results, chunks the texts into passages,
    scores the passages, and sorts them by relevance.

    Every document returned for a query is cleaned and chunked, and all of that
    query's passages are scored together against that query only.

    Args:
        queries: A list of query strings.
        search_url: URL of the search server.
        sentences_per_passage: Number of sentences per passage.
        filter_sentence_len: Maximum number of characters per sentence.
        sliding_distance: Sliding distance for overlapping passages.
        timeout: Seconds to wait for the search server before giving up.
            None (the default) waits indefinitely.

    Returns:
        A list with exactly one entry per query, in query order. Each entry is
        the list of scored passages for that query sorted by descending score,
        or an empty list when the request failed, the response could not be
        parsed, or no passages were produced for that query.
    """
    headers = {"User-Agent": "Test Client"}
    payload = {"query": queries}

    try:
        response = requests.post(
            search_url, headers=headers, json=payload, timeout=timeout
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error in search server request: {e}")
        return [[] for _ in queries]

    try:
        data = json.loads(response.content)
        print(f"data ::: {data}\n\n")
        # AttributeError covers a JSON body that is not an object (no .get);
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        outputs = data.get("document", []) or []
    except (AttributeError, KeyError, ValueError) as e:
        print(f"Error parsing search server response: {e}")
        return [[] for _ in queries]

    # NOTE(review): "document" is assumed to hold one list of document strings
    # per query, in query order -- confirm against the search server.
    final_results = []
    for idx, query in enumerate(queries):
        # Tolerate a response with fewer entries than queries so the result
        # always lines up with `queries`.
        docs = outputs[idx] if idx < len(outputs) else []
        if isinstance(docs, str):
            docs = [docs]  # A bare string would otherwise be iterated per character.

        passages = []
        for doc in docs:
            passages.extend(
                chunk_text(
                    text=clean_json_string(doc),
                    sentences_per_passage=sentences_per_passage,
                    filter_sentence_len=filter_sentence_len,
                    sliding_distance=sliding_distance,
                )
            )

        # Skip scoring (and the model load behind it) when there is nothing to score.
        final_results.append(
            score_and_sort_passages(passages, query) if passages else []
        )

    return final_results


def test_run_search_and_process():
    """
    Manual smoke test: sends a few example queries to a search server, one
    request per query, and prints the scored passages for each.
    """
    # Search server URL (replace with actual server URL)
    endpoint = "http://127.0.0.1:8000/"

    # Passage chunking settings shared by every request.
    chunking = {
        "sentences_per_passage": 5,
        "filter_sentence_len": 250,  # Maximum sentence length
        "sliding_distance": 1,
    }

    sample_queries = (
        "What is the capital of France?",
        "Who discovered gravity?",
        "Explain the theory of relativity.",
    )

    for question in sample_queries:
        print(f"Processing query: {question}")

        # Each call carries a single query, so its passages are at index 0.
        results = run_search_and_process(
            queries=[question],
            search_url=endpoint,
            **chunking,
        )
        print(f"final_result ::: {results}")

        print(f"Results for query: {question}")
        for scored in results[0]:
            print(f"  Passage: {scored['text']}")
            print(f"  Score: {scored['score']}\n")

# Script entry point: run the smoke test only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    test_run_search_and_process()

Processing query: What is the capital of France?
data ::: {'Hello': 'POST', 'query': ['What is the capital of France?'], 'status': 'search completed', 'document': [['"Council of Paris"\nCouncil of Paris The Council of Paris (""Conseil de Paris"") is the deliberative body responsible for the governing of Paris, the capital of France. It possesses simultaneously the powers of a Paris Municipal Council (""Conseil municipal"") and those of a General Council (Departmental Council) for the ""Département de Paris"", as defined by the so-called PLM Law ("""") of 1982 that redefined the governance of Paris, Lyon, and Marseilles. Paris is, in effect, the only territorial collectivity in France to be, at one time, a ""commune"" (commune or municipality) and a ""département"" (county or shire), and this arrangement has been a', '"Municipal arrondissements of France"\n466,000 inhabitants in Lyon) and the law was meant to have the local administrations become more accessible and tied to their respec