# LLM Query Synthesis

## Setup

Before we can generate queries, we need to do some setup.
Make sure to set the following variables.

In [None]:
# Name of the dataset directory to load from "../datasets"
dataset_name = "fiqa" # fiqa, nq, nfcorpus
# True connects to Vespa Cloud using the settings below; False connects to http://localhost:8080
is_cloud_deployment = False # False if using Docker

# NB: Set if using Vespa Cloud
# tenant, app and instance select the certificate directory under ~/.vespa; url is the endpoint to connect to.
tenant = "my-tenant"
app = "my-app"
instance = "default"
url = "my-endpoint-url"

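Make sure a ".env" file exists containing your OpenAI API key.
The file is read through `openai.api_key_path`, so it should contain only the key itself (not a `KEY=value` line).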

In [None]:
import openai
from pathlib import Path

# File from which the OpenAI client is told to read the API key
openai.api_key_path = "./.env"
root = Path("..")
# Directory holding the files of the selected dataset
dataset = root / "datasets" / dataset_name

# Many datasets are divided into train, val/dev and test splits, while some are not.
# This information is needed when generating qrels, among other things.
# A dataset counts as split when its directory contains at least one entry whose name starts with "train-".
has_splits = any(dataset.glob('train-*'))
queries_file_name = "train-queries" if has_splits else "queries"
qrels_file_name = "train-qrels" if has_splits else "qrels"

While we don't need Vespa to generate the queries, we will be using Vespa to generate [qrels](https://trec.nist.gov/data/qrels_eng/).
Run this cell to connect to your Vespa app (assuming it's running).

In [None]:
from vespa.application import  Vespa

# Create the Vespa client used later for qrel generation.
if not is_cloud_deployment:
    # Docker deployment: the application is reachable on localhost
    vespa_app = Vespa(url = "http://localhost:8080")
else:
    # Vespa Cloud: the data-plane certificate and key live in a per-application
    # directory under ~/.vespa
    vespa_dir = Path.home() / ".vespa" / f"{tenant}.{app}.{instance}"
    vespa_app = Vespa(
        url=url,
        cert=vespa_dir / "data-plane-public-cert.pem",
        key=vespa_dir / "data-plane-private-key.pem",
    )

Loading the documents into a dictionary makes them easier to work with.

In [None]:
from typing import Dict
import json

def load_docs_as_dict(docs_path: Path) -> Dict[str, str]:
    """
    Load a JSON Lines document file into a dictionary.

    Every non-blank line must be a JSON object with the keys "doc_id", "text" and "title".

    :param docs_path: Path to the JSON Lines file containing the documents
    :return: A dictionary mapping document ids to document text, where the text is the
        string form of the dictionary {"title": <title>, "passage": <text>}
    :raises KeyError: If a document lacks one of the required keys
    :raises json.JSONDecodeError: If a non-blank line is not valid JSON
    """
    docs = {}
    # JSON files are UTF-8; don't depend on the platform's default encoding.
    with open(docs_path, encoding="utf-8") as d:
        for line in d:
            if not line.strip():
                continue  # Tolerate blank lines, e.g. an extra newline at the end of the file
            json_line = json.loads(line)
            did = json_line["doc_id"]
            passage = json_line["text"]
            title = json_line["title"]
            docs[did] = str({"title": title, "passage": passage})

    return docs

# Module-level document store (doc id -> document text) that the helper functions in this notebook read
documents = load_docs_as_dict(dataset / "docs.jsonl")

In many cases, it's useful to be able to have a mapping between queries and documents, based on a line in a qrels file.
This class can be used to load information about the query and document connected by a given qrel.

In [None]:
from dataclasses import dataclass

@dataclass
class QueryDocumentMapping:
    """
    A query and a document that are connected by one line of a qrels file.

    Usage: mapping = QueryDocumentMapping.from_qrel(query_file, line)
    """

    qid: str = None  # Query id (first field of the qrel line)
    query_text: str = None  # Query text looked up in the query file, trailing whitespace removed
    did: str = None  # Document id (third field of the qrel line)
    document_text: str = None  # Text stored for ``did`` in the module-level ``documents`` dict

    @staticmethod
    def from_qrel(query_file: Path, qrel: str) -> "QueryDocumentMapping":
        """
        Expand a qrel line into the query and document it connects.

        :param query_file: Tab-separated file with one "<query id>\\t<query text>" entry per line
        :param qrel: Qrel line of four whitespace-separated fields: query id, an ignored field,
            document id and an ignored field
        :return: The populated mapping
        :raises ValueError: If the qrel line is malformed or the query id is not in ``query_file``
        :raises KeyError: If the document id is not in ``documents``
        """
        # Split on any whitespace so that tab-separated qrels and trailing newlines are accepted too.
        fields = qrel.split()
        if len(fields) != 4:
            raise ValueError(f"Malformed qrel line: {qrel!r}")
        qid, _, did, _ = fields

        query_text = None

        # Expand query text
        with open(query_file) as qf:
            for line in qf:
                # Only the first tab separates id from text; the query itself may contain tabs.
                query_id, separator, text = line.partition("\t")
                if separator and query_id == qid:
                    query_text = text
                    break
        if query_text is None:
            raise ValueError(f"No query found for id {qid}")

        # Expand document text
        document_text = documents[did]

        return QueryDocumentMapping(qid=qid, query_text=query_text.rstrip(), did=did, document_text=document_text)

In [None]:
import random
from typing import List

def get_random_doc_ids(num: int) -> List[int] | None:
    """
    :param num: number of doc ids to fetch
    :return: Returns a random list of document ids.
    """

    list_of_docs = list(documents)
    if len(list_of_docs) >= num:
        return random.sample(list_of_docs, num)
    else:
        print(f"Documents dictionary has less than {num} documents. Can't sample documents.")

In [None]:
def get_random_lines(file: Path, num_lines: int) -> List:
    """
    Read a file and return a random selection of its lines (line endings included).

    :param file: The file to get lines from
    :param num_lines: How many lines to fetch
    :return: The list of randomly selected lines
    """
    with open(file, "r") as handle:
        all_lines = handle.readlines()

    # Sample positions rather than lines, then look each position up.
    chosen_positions = random.sample(range(len(all_lines)), num_lines)
    return [all_lines[position] for position in chosen_positions]

In [None]:
def get_random_qrels(qrel_file: Path, query_file: Path, num_qrels: int) -> List[QueryDocumentMapping]:
    """
    Sample qrel lines at random and expand each one into a query-document mapping.

    :param qrel_file: File to sample qrel lines from
    :param query_file: File in which the query text of each sampled qrel is looked up
    :param num_qrels: How many qrels to return
    :return: List of qrels
    """
    sampled_mappings = []
    for qrel_line in get_random_lines(qrel_file, num_qrels):
        sampled_mappings.append(QueryDocumentMapping.from_qrel(query_file, qrel_line))
    return sampled_mappings

## ChatGPT stuff

### Generating random examples

Instead of creating a detailed prompt outlining exactly how queries should be generated,
we can show ChatGPT some examples of associations between queries and documents.
This is called few-shot learning and appears to improve the quality of the generated queries.
Randomly sampling examples from the existing training data should in theory
prevent the generated queries from becoming too similar to each other.

In [None]:
from typing import List, Dict

def generate_random_examples(num_examples: int=1, queries_per_ex_doc: int=1) -> List[Dict[str, str]]:
    """
    Build few-shot chat messages from randomly sampled qrels of the current dataset.

    :param num_examples: How many document-query pairs to generate
    :param queries_per_ex_doc: How many queries should be shown per example document
    :return: List of dictionary mapping document texts to query text(s)
    """
    sampled_mappings = get_random_qrels(dataset / qrels_file_name, dataset / queries_file_name, num_examples)
    # Showing several queries per document needs extra qrel lookups; a single query does not.
    if queries_per_ex_doc != 1:
        return _generate_multiple_examples(sampled_mappings, queries_per_ex_doc)
    return _generate_single_examples(sampled_mappings)

def _generate_single_examples(mappings: List[QueryDocumentMapping]) -> List[Dict[str, str]]:
    examples = []
    for m in mappings:
        examples.append({"role": "user", "content": m.document_text})
        examples.append({"role": "assistant", "content": f"['{m.query_text}']"})
    return examples

def _generate_multiple_examples(mappings: List[QueryDocumentMapping], examples_per_doc: int)-> List[Dict[str, str]]:
    """
    Build few-shot chat messages in which every example document is followed by several queries.

    For each document in ``mappings``, the first ``examples_per_doc`` qrel lines that judge that
    document are looked up in the dataset's qrels file. A document with too few qrels is replaced
    by a randomly drawn document; the whole call gives up after 10 such failures.

    :param mappings: Mappings whose documents are used as example documents
    :param examples_per_doc: Number of queries to show per example document
    :return: Chat messages: for each accepted document a "user" message with the document text
        followed by an "assistant" message with the list of query texts
    :raises ValueError: If too many documents lack ``examples_per_doc`` qrels
    """
    # Index qrel lines by their document id field (third field). Comparing the exact field
    # avoids matching doc id "12" against "123" or against a query id containing it.
    qrels_by_doc = {}
    with open(dataset / qrels_file_name, "r") as qrf:
        for line in qrf:
            fields = line.split()
            if len(fields) >= 3:
                qrels_by_doc.setdefault(fields[2], []).append(line)

    failure_count = 0
    failure_max = 10
    pending_doc_ids = [m.did for m in mappings]
    examples = []
    position = 0
    # An index-based loop is used because replacement documents are appended while iterating.
    while position < len(pending_doc_ids):
        did = pending_doc_ids[position]
        position += 1

        qrels_for_current_doc = qrels_by_doc.get(did, [])
        if len(qrels_for_current_doc) < examples_per_doc:
            # Means it could not find enough queries for that doc. Will attempt with a different doc.
            failure_count += 1
            if failure_count == failure_max:
                raise ValueError(f"Couldn't find {examples_per_doc} queries for all documents. Dataset probably has too few queries per document in qrels.")
            replacement = get_random_doc_ids(1)
            if replacement:
                pending_doc_ids.extend(replacement)
            continue

        # The document and its queries are emitted together, so a failed document can never
        # end up paired with the queries of another one.
        queries = [
            QueryDocumentMapping.from_qrel(dataset / queries_file_name, qrel).query_text
            for qrel in qrels_for_current_doc[:examples_per_doc]
        ]
        examples.append({"role": "user", "content": f"{documents[did]}"})
        examples.append({"role": "assistant", "content": f"{str(queries)}"})

    return examples

### Some useful helper functions

Calls to the OpenAI ChatCompletion API may fail for various reasons.
**tenacity** lets us retry API calls using exponential backoff.

In [None]:
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)

@retry(stop=stop_after_attempt(6), wait=wait_random_exponential(min=1, max=60))
def completion_with_backoff(**kwargs):
    """
    Call ``openai.ChatCompletion.create`` with the given keyword arguments.

    The call is wrapped in tenacity's ``retry``: waits use ``wait_random_exponential(min=1, max=60)``
    and retrying stops after 6 attempts.
    """
    response = openai.ChatCompletion.create(**kwargs)
    return response

In [None]:
from openai import ChatCompletion

def get_chat_completion_content(completion: ChatCompletion) -> str:
    """
    Extract the message content of the first choice of a ChatGPT completion.

    :param completion: Completion object exposing ``to_dict_recursive()``
    :return: The value stored under choices[0] -> message -> content
    """
    first_choice = completion.to_dict_recursive()["choices"][0]
    return first_choice["message"]["content"]

### Qrel generation

Query relevance judgements (qrels) are used to determine if a given document is relevant for a given query.
These are normally made manually by humans following certain guidelines.
It would be beneficial to be able to automatically generate these qrels using ChatGPT.

For each generated query, we'll use Vespa to return a list of documents.
Then, we'll ask ChatGPT to determine whether each returned document is relevant.

In [None]:
def vespa_query(app, query, num_hits=3):
    """
    Send a nearest-neighbor query to a Vespa application.

    :param app: Object exposing ``query(body=...)``, e.g. the connected Vespa application
    :param query: Query text; it is wrapped in ``embed(...)`` and passed as the query input ``e``
    :param num_hits: Value of the "hits" request parameter
    :return: Whatever ``app.query`` returns
    """
    request_body = {}
    request_body["yql"] = "select * from doc where ({targetHits:100}nearestNeighbor(embedding,e))"
    request_body["input.query(e)"] = f"embed({query})"
    request_body["hits"] = num_hits
    request_body["ranking"] = "ann"
    return app.query(body=request_body)

In [None]:
def process_returned_vespa_docs(query_results):
    """
    Turn the documents returned by Vespa into a less messy format.

    For every hit, the part of the hit id after the last "::" becomes "id", the "abstract"
    field becomes "text", and the "title" field is copied when present. Every occurrence of
    "passage: " is removed from text and title.

    :param query_results: Object whose ``hits`` attribute is the list of returned hits
    :return: One JSON string per hit, in hit order
    """
    marker = "passage: "
    processed_docs = []
    for hit in query_results.hits:
        doc = {"id": hit["id"].split("::")[-1]}
        fields = hit["fields"]
        body_text = fields["abstract"].replace(marker, "")
        # The title, when there is one, goes between id and text in the serialized document.
        if "title" in fields:
            doc["title"] = fields["title"].replace(marker, "")
        doc["text"] = body_text
        processed_docs.append(json.dumps(doc))

    return processed_docs

In [None]:
def split_into_chunks(lst, chunk_size):
    """
    Splits a list into chunks of the specified size.

    The last chunk is shorter when the length of ``lst`` is not a multiple of ``chunk_size``.
    """
    chunks = []
    for start in range(0, len(lst), chunk_size):
        chunks.append(lst[start:start + chunk_size])
    return chunks

We need to tell ChatGPT how it should process the list of documents returned by Vespa.
We ended up with this prompt through trial and error.
We found that the easiest format to receive the qrels in was as JSON.
ChatGPT sometimes returns invalid JSON; in that case, the results are simply discarded.

In [None]:
# System prompt for relevance judgement. It asks for a JSON list of {"id", "relevant"} objects,
# which generate_qrels() parses with json.loads().
qrel_prompt = """
You are an advanced relevance ranking system.
You will receive a query and a list of documents, and output whether each document is relevant to the query.
A document is relevant if it, in a perfect world, is supposed to show up in the retrieved documents list for a given search.
Relevant documents are denoted with the number 1, and non-relevant documents are denoted with the number 0.

### Output format ###
The output is a valid list of JSON objects with the following fields

[
    {
     "id": <id>,
     "relevant": <1 or 0>
    },
    {
     "id": <id>,
     "relevant": <1 or 0>
    }
]
"""

In [None]:
def generate_qrels(qid: str, query: str, qrels_per_query: int, qrels_file: Path) -> List[str]:
    """
    Generate (up to) a given number of qrels for a query and write to file.
    If the number of relevant documents retrieved by Vespa is lower than qrels_per_query,
    then the number of qrels generated will be a bit lower.

    The documents returned by Vespa are judged by ChatGPT in chunks of five. A chunk whose
    answer cannot be parsed is reported on stdout and discarded as a whole.

    :param qid: Query id
    :param query: Query text to generate qrels from
    :param qrels_per_query: Target number of qrels to generate for 'query'
    :param qrels_file: File that the generated qrel lines are appended to
    :return: The generated qrels
    """

    result = vespa_query(vespa_app, query, num_hits=qrels_per_query)
    docs = process_returned_vespa_docs(result)

    chunked_docs = split_into_chunks(docs, 5) # Processing in chunks to save money by using 4k context instead of 16k

    qrels = []
    for chunk in chunked_docs:
        chat_completion = completion_with_backoff(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": qrel_prompt},
                {"role": "user", "content": f"Query: {query}\nDocuments: " + str(chunk)},
            ]
        )

        completion_content = get_chat_completion_content(chat_completion)

        # Parse and validate the whole answer before touching the file, so that a malformed
        # answer never leaves a partially written chunk behind.
        try:
            comp = json.loads(completion_content)
            chunk_qrels = [f"{qid} 0 {doc['id']} 1\n" for doc in comp if doc["relevant"] == 1]
        except (ValueError, KeyError, TypeError):
            # ValueError covers invalid JSON; KeyError/TypeError cover JSON of the wrong shape.
            # Anything else (I/O errors, KeyboardInterrupt) is deliberately not swallowed.
            print(f"ChatGPT probably returned invalid JSON:\n{completion_content}")
            continue

        with open(qrels_file, "a+") as gpt_qrels_file:
            gpt_qrels_file.writelines(chunk_qrels)
        qrels.extend(chunk_qrels)
        print(comp)

    return qrels

## Prompt generation

Ideally, as little manual labor as possible should be necessary to generate data.
ChatGPT does require some guidance specific to each dataset, though.
In order to avoid having to hand craft prompts describing each dataset,
we've come up with a way to generate rules that describe the dataset, using ChatGPT.
ChatGPT is shown a random selection of query-document pairs, as well as queries by themselves,
and asked to describe the dataset with a bullet list.
This list is later injected into the prompt used to actually generate queries.

The generated rules can vary wildly between runs.
Presumably, some generated prompts will be better than others.
As of right now, we don't have a systematic way of determining which prompt is best.
Instead, we simply take a good look at the prompt to decide if it seems sound.

In [None]:
# Sample sizes; these numbers may be tweaked to show ChatGPT more or less data
num_query_doc_pairs = 10
num_ex_queries = 15

ex_qrels = get_random_qrels(dataset / qrels_file_name, dataset / queries_file_name, num_query_doc_pairs)
# Second tab-separated field of each sampled line, i.e. the query text
ex_queries = [line.split("\t")[1] for line in get_random_lines(dataset / queries_file_name, num_ex_queries)]

# Collect the pieces of the user message and join them once at the end.
example_parts = ["Example query-document pairs:\n"]
for qrel in ex_qrels:
    example_parts.append("Document: " + json.dumps(qrel.document_text, indent=4))
    example_parts.append("\nQuery: " + qrel.query_text + "\n\n")

example_parts.append("Example queries:\n")
for query in ex_queries:
    example_parts.append(query)

examples = "".join(example_parts)

In [None]:
# System prompt sent by autogenerate_prompt(); it asks ChatGPT to describe the dataset's queries as a bullet list of rules.
autogen_prompt = f"""
You are an AI system designed to describe datasets.
You will provide rules that describe how the queries of the dataset are formulated by looking at query-document pairs as well as queries.
Please include information about the length, style and formatting of the queries, among other things.
Focus on the style of the _query_ in the examples provided.

Please output a list of rules that describe the dataset as a bullet list like this:
- Queries contain x-y words
- Queries are [terse, elaborate, etc.]
- ...
"""

def autogenerate_prompt():
    """
    Ask ChatGPT to describe the dataset based on the sampled examples.

    Sends ``autogen_prompt`` as the system message and ``examples`` as the user message.

    :return: The content of the completion
    """
    chat_completion = completion_with_backoff(
        model="gpt-3.5-turbo-16k",
        messages=[
            {"role": "system", "content": autogen_prompt},
            {"role": "user", "content": examples},
        ],
    )
    return get_chat_completion_content(chat_completion)

In [None]:
# Generate the dataset rules and print them so that they can be inspected manually
rules = autogenerate_prompt()
print(rules)


## Query generation

This is the cool part.
To generate queries, ChatGPT is first shown a couple of examples of query-document pairs.
Then, it's shown a new document and asked to generate one or more queries that should return this document.

### Some setup

An experiment consists of a batch of generated queries and qrels, as well as information related to the experiment (like the prompt used, number of examples, etc.)
Experiments are saved to the **experiments** directory.

In [None]:
from datetime import datetime

def generate_new_experiment_dir() -> Path:
    """
    Create a new, timestamped experiment directory for the current dataset.

    The directory is created at <parent of the working directory>/experiments/<dataset_name>/<timestamp>,
    where the timestamp has the format YYYY-MM-DD_HH-MM-SS (local time).

    :return: Path of the experiment directory
    """
    now = datetime.now()
    timestamp = now.strftime("%Y-%m-%d_%H-%M-%S")
    exp_dir = Path.cwd().parent / "experiments" / dataset_name / timestamp
    exp_dir.mkdir(parents=True, exist_ok=True)
    return exp_dir

It can be useful to be able to load a previous experiment to do more work on it, for example if query generation crashes.
Enable the flag below to load the last experiment, or set the **experiment_dir** variable manually to load an arbitrary experiment.
If you're running this for the first time, though, you most likely want to ignore this.

In [None]:
load_previous_experiment = False

def previous_experiment() -> Path:
    """
    Return the most recent experiment directory of the current dataset.

    Experiment directories are named after their creation time (YYYY-MM-DD_HH-MM-SS), so the
    directory with the greatest name is the latest one. The listing order of the directory is
    arbitrary and must not be relied upon.

    :return: Path of the latest experiment directory
    :raises FileNotFoundError: If the experiments directory is missing or contains no directories
    """
    parent_dir = Path.cwd().parent / "experiments" / dataset_name
    dirs = [f for f in parent_dir.iterdir() if f.is_dir()]
    if not dirs:
        raise FileNotFoundError(f"No previous experiment found in {parent_dir}")
    return max(dirs, key=lambda d: d.name)

# experiment_dir is where the cells below write queries, qrels and run information
if load_previous_experiment:
    experiment_dir = previous_experiment()
    print(f"Loaded previous experiment: {experiment_dir}")
else:
    experiment_dir = generate_new_experiment_dir()
    print(f"Created experiment: {experiment_dir}")

The following options determine how many queries and qrels to generate,
as well as how many queries to generate per document.

In [None]:
target_queries = 100 # How many queries to end up with in total
queries_per_doc = 1 # Number of queries to generate per document
num_qrels_per_query = 1 # Number of qrels per query; values above 1 make the generation loop call generate_qrels()

num_queries = int(target_queries / queries_per_doc)  # How many calls to chatgpt we need to get to approx. that number

These options determine how many examples to show to ChatGPT for each query generation.
We believe a higher number of examples results in higher quality queries, but this costs more money.
In addition, if too many examples are shown, we may run out of tokens in the context window.
Setting **examples_per_doc** too high may cause example generation to fail if the original dataset
does not contain enough examples of documents having multiple queries.

In [None]:
# Few-shot settings passed to generate_random_examples() for every document in the generation loop
num_examples = 3 # Number of example documents shown
examples_per_doc = 1 # Number of queries shown per example document

In [None]:
# System prompt for query generation; the singular/plural wording follows queries_per_doc,
# and the rules generated earlier are appended under the "Rules" heading.
prompt = f"""
You are an AI system designed to write natural language search queries.
You will receive a document and write {queries_per_doc} {"queries" if queries_per_doc > 1 else "query"} for which this document is relevant.

### Rules ###
{rules}
"""

Useful information is written to a file to help document the experiment and make it reproducible.

In [None]:
info_file = experiment_dir / "info.txt"
info_file.parent.mkdir(parents=True, exist_ok=True)
info_file.touch()

# One "name = repr(value)" line per setting that describes this experiment
experiment_settings = [
    f"{prompt = }\n",
    f"{num_examples = }\n",
    f"{examples_per_doc = }\n",
    f"{queries_per_doc = }\n",
    f"{target_queries = }\n",
    f"{num_queries = }\n",
    f"{num_qrels_per_query = }\n",
]
with open(info_file, "w") as info:
    info.writelines(experiment_settings)

To generate unique IDs for GPT-generated queries, we use the SHA256 hash function.
The chance of a collision should be extremely low, and in case there ever is one,
it probably wouldn't affect the rest of the process that much (famous last words).

In [None]:
import hashlib

def get_hash_id(seed: str) -> str:
    """
    Derive a query id from a seed string.

    :param seed: Value to be hashed. Should be unique. Could be query text.
    :return: The new query id: "GPT-" followed by the hexadecimal SHA256 digest of the encoded seed
    """
    digest = hashlib.sha256(seed.encode()).hexdigest()
    return f"GPT-{digest}"


### Query generation
Run the cells below to generate queries.
The generated queries and qrels are written to file continuously in case an error interrupts the process.

In [None]:
# Progress counter for the generation loop in the cell below.
# It lives in its own cell so that it is not reset when that cell is rerun:
# if the query generation crashes, you can rerun the cell below to continue from where it crashed.
generated_so_far = 0

In [None]:
import ast

# Used to keep track of cost of query generation
prompt_tokens = []
completion_tokens = []

# Resume in case generation fails.
# generated_so_far counts queries, so the remaining number of ChatGPT calls is the
# remaining number of queries divided by the queries generated per call.
num_queries = max(int((target_queries - generated_so_far) / queries_per_doc), 0)

# get_random_doc_ids() returns None when there are too few documents; treat that as nothing to do.
sampled_doc_ids = get_random_doc_ids(num_queries) or []

for idx, did in enumerate(sampled_doc_ids):
    random_examples = generate_random_examples(num_examples=num_examples, queries_per_ex_doc=examples_per_doc)
    messages = [
        {"role": "system", "content": prompt},
        *random_examples,
        {"role": "user", "content": str(documents[did])},
    ]

    chat_completion = completion_with_backoff(
        model="gpt-3.5-turbo",
        messages=messages
    )

    # Look the counters up by name instead of relying on the order of the usage dictionary.
    usage = chat_completion.to_dict_recursive()["usage"]
    prompt_tokens.append(usage["prompt_tokens"])
    completion_tokens.append(usage["completion_tokens"])

    try:
        queries = ast.literal_eval(get_chat_completion_content(chat_completion)) # Get list from list literal. Should be safe (security wise), but can fail
    except (ValueError, TypeError, SyntaxError):
        print("Failed to parse generated queries, skipping this document.")
        continue  # Malformed output from ChatGPT. Skip to the next document

    # A bare string literal also parses, but iterating it would yield one "query" per character.
    if not isinstance(queries, (list, tuple)) or not all(isinstance(q, str) for q in queries):
        print("Failed to parse generated queries, skipping this document.")
        continue

    print(idx)
    print(documents[did])
    with open(experiment_dir / "gpt-queries", "a") as gpt_queries:
        with open(experiment_dir / "gpt-qrels", "a") as gpt_qrels:
            for query in queries:
                # Tabs and newlines inside a query would corrupt the tab-separated queries file.
                query = " ".join(query.split())
                if not query:
                    continue

                qid = get_hash_id(query)
                gpt_queries.write(f"{qid}\t{query}\n")
                gpt_qrels.write(f"{qid} 0 {did} 1\n")
                generated_so_far += 1

                print(query)
                if num_qrels_per_query > 1:
                    # generate_qrels() appends to the same qrels file through its own handle.
                    # Flush first so that the original qrel is stored before the extra ones.
                    gpt_queries.flush()
                    gpt_qrels.flush()
                    generate_qrels(qid, query, num_qrels_per_query, experiment_dir / "gpt-qrels")

The mean number of tokens used per ChatGPT call is tracked and written to a file.
This can be used to calculate the cost of query generation.

In [None]:
import numpy as np

# Mean token usage over the ChatGPT calls made by the generation loop
mean_comp = np.mean(completion_tokens)
mean_prompt = np.mean(prompt_tokens)
print("Completions tokens:", mean_comp)
print("Prompt tokens:", mean_prompt)

# Store the same numbers next to the other experiment files
tokens_file_path = experiment_dir / "tokens.txt"
tokens_file_path.touch()
token_report = [
    f"Completion tokens: {mean_comp}\n",
    f"Prompt tokens: {mean_prompt}",
]
with open(tokens_file_path, "w") as tokens_file:
    tokens_file.writelines(token_report)

### Post processing

Sometimes, it's useful to generate more qrels after queries have been generated, or to remove qrels after the fact.
The following snippets help with that.
Don't run this unless you know what you're doing.

Generate additional qrels.
These are written to a new file, in order to avoid cluttering the old one.
For training, you'll need to merge the two qrel files and filter out duplicates.

In [None]:
gen_qrels = False # Whether to generate more qrels or not
extra_qrels_per_query = 10 # Passed as num_qrels to generate_extra_qrels()

def generate_extra_qrels(num_qrels: int, resume_from: int=0):
    """
    Generates more qrels for GPT-generated queries.

    Every line of the experiment's "gpt-qrels" file is expanded into its query, and
    generate_qrels() is called for that query with "gpt-qrels-extra" as the output file.
    The value of num_qrels is recorded in "extra-qrels-info.txt".

    :param num_qrels: Target number of qrels passed to generate_qrels() for each processed line
    :param resume_from: Skips the lines with an index below this value. Useful if something crashes and you want to resume.
    :return: None
    """
    (experiment_dir / "extra-qrels-info.txt").write_text(f"{num_qrels = }\n")

    existing_qrels_path = experiment_dir / "gpt-qrels" # Existing qrels from initial generation
    extra_qrels_path = experiment_dir / "gpt-qrels-extra"
    extra_qrels_path.touch()

    with open(existing_qrels_path, "r") as qrel_file:
        existing_qrels = qrel_file.readlines()

    for line_index, qrel_line in enumerate(existing_qrels):
        if line_index >= resume_from:
            print(line_index)
            mapping = QueryDocumentMapping.from_qrel(experiment_dir / "gpt-queries", qrel_line)
            generate_qrels(mapping.qid, mapping.query_text, qrels_per_query=num_qrels, qrels_file=extra_qrels_path)

if gen_qrels:
    generate_extra_qrels(extra_qrels_per_query)

When generating a query, a qrel is automatically created.
In addition to this singular qrel, additional qrels can be generated with **generate_qrels()**.
This snippet removes all qrels except the one that was originally generated alongside the query.
This can be useful for benchmarking, in case you want to see how the embedder would perform if you have fewer qrels.
The filtered qrels are written to a new file, so as not to mess with the original data.

In [None]:
do_filter_qrels = False

def filter_qrels(qrel_in_file, qrel_out_file):
    """
    Keep only the first qrel for each query.

    Lines are copied unchanged and in their original order; blank lines are skipped.
    Every kept line is also printed.

    :param qrel_in_file: Qrels file to read; the query id is the first whitespace-separated field
    :param qrel_out_file: File to write the filtered qrels to (overwritten if it exists)
    """
    seen = set() # Query ids for queries whose first qrel has been found
    with open(qrel_in_file, "r") as source_file, open(qrel_out_file, "w") as output_file:
        for line in source_file:
            fields = line.split()
            if not fields:
                continue  # Blank line, e.g. left over from merging files by hand
            query_id = fields[0]
            if query_id in seen:
                continue
            print(f"{line = }")
            output_file.write(line)
            seen.add(query_id)

if do_filter_qrels:
    filter_qrels(experiment_dir / "gpt-qrels", experiment_dir / "gpt-one-qrel-per-query")