In [1]:
# Imports go here
import os

from granite_io import make_backend, make_io_processor
from granite_io.backend.vllm_server import LocalVLLMServer
from granite_io.io.base import ChatCompletionInputs, ChatCompletionResults
from granite_io.io.granite_3_2.input_processors.granite_3_2_input_processor import (
    Granite3Point2Inputs,
)
from granite_io.io.query_rewrite import QueryRewriteIOProcessor

In [2]:
# Constants go here
# Name of the base model; passed to the vLLM server / OpenAI backend and to
# make_io_processor() in later cells.
base_model_name = "ibm-granite/granite-3.2-8b-instruct"
# Name of the query-rewrite LoRA adapter; used both as the adapter name and as
# the adapter location when a local server is started.
lora_model_name = "ibm-granite/granite-3.2-8b-lora-rag-query-rewrite"
# True: start a local vLLM server from this notebook.
# False: connect to an already-running OpenAI-compatible server.
run_server = False

In [None]:
# Create two backends that the rest of the notebook relies on:
#   backend                     -> talks to the base model
#   query_rewrite_lora_backend  -> talks to the query-rewrite LoRA adapter
if run_server:
    # Start by firing up a local vLLM server and connecting a backend instance to it.
    server = LocalVLLMServer(
        base_model_name, lora_adapters=[(lora_model_name, lora_model_name)]
    )
    # NOTE(review): 200 is presumably a startup timeout in seconds -- confirm
    # against LocalVLLMServer.wait_for_startup.
    server.wait_for_startup(200)
    query_rewrite_lora_backend = server.make_lora_backend(lora_model_name)
    backend = server.make_backend()
else:  # if not run_server
    # Use an existing server.
    # The endpoint and key are read from notebook-specific environment
    # variables so that real credentials never have to be written into (and
    # committed with) the notebook. The defaults reproduce the previous
    # hard-coded local-development values.
    openai_base_url = os.environ.get(
        "GRANITE_OPENAI_BASE_URL", "http://localhost:55555/v1"
    )
    openai_api_key = os.environ.get(
        "GRANITE_OPENAI_API_KEY", "granite_intrinsics_1234"
    )
    openai_base_model_name = base_model_name
    openai_lora_model_name = lora_model_name

    # Both backends point at the same server; only the model name differs.
    openai_connection = {
        "openai_base_url": openai_base_url,
        "openai_api_key": openai_api_key,
    }
    backend = make_backend(
        "openai",
        {"model_name": openai_base_model_name, **openai_connection},
    )
    query_rewrite_lora_backend = make_backend(
        "openai",
        {"model_name": openai_lora_model_name, **openai_connection},
    )

In [4]:
# Example conversation, written as (role, content) turns and expanded into the
# list-of-dicts message format consumed by ChatCompletionInputs.
input_messages = [
    {"role": speaker, "content": text}
    for speaker, text in (
        ("assistant", "Welcome to the California State Parks help desk."),
        ("user", "I'm a student. Do you have internships?"),
        (
            "assistant",
            "The California State Parks hires Student Assistants "
            "to perform a variety of tasks that require limited or no previous "
            "work experience.",
        ),
        ("user", "Cool, how do I sign up?"),
    )
]

def format_chat_history(messages):
    """Render a conversation as plain text, one ``role: content`` line per message.

    :param messages: Iterable of mappings, each with a ``"role"`` and a
        ``"content"`` key.
    :returns: The rendered lines joined with newlines (empty string when there
        are no messages).
    :raises KeyError: If a message lacks ``"role"`` or ``"content"``.
    """
    lines = [f"{turn['role']}: {turn['content']}" for turn in messages]
    return "\n".join(lines)

# Package the conversation together with its generation settings
# (temperature 0.0, up to 4096 new tokens) and show the resulting object.
chat_input = ChatCompletionInputs(
    messages=input_messages,
    generate_inputs={
        "temperature": 0.0,
        "max_tokens": 4096,
    },
)
print("Inputs for chat completion:", chat_input)

Inputs for chat completion: messages=[AssistantMessage(content='Welcome to the California State Parks help desk.', role='assistant', tool_calls=[], reasoning_content=None, citations=None, documents=None, hallucinations=None, stop_reason=None), UserMessage(content="I'm a student. Do you have internships?", role='user'), AssistantMessage(content='The California State Parks hires Student Assistants to perform a variety of tasks that require limited or no previous work experience.', role='assistant', tool_calls=[], reasoning_content=None, citations=None, documents=None, hallucinations=None, stop_reason=None), UserMessage(content='Cool, how do I sign up?', role='user')] tools=[] generate_inputs=GenerateInputs(prompt=None, model=None, best_of=None, echo=None, frequency_penalty=None, logit_bias=None, logprobs=None, max_tokens=4096, n=None, presence_penalty=None, stop=None, stream=None, stream_options=None, suffix=None, temperature=0.0, top_p=None, user=None, extra_headers=None, extra_body={})

In [5]:
# Spin up an IO processor for the base model
io_processor = make_io_processor(base_model_name, backend=backend)
# Wrap the backend that targets the query-rewrite LoRA adapter in the
# query-rewrite IO processor.
rewrite_io_proc = QueryRewriteIOProcessor(query_rewrite_lora_backend)

In [None]:
from granite_io.io.base import InputOutputProcessor
from granite_io.types import (
    GenerateInputs, GenerateResults, ChatCompletionInputs, ChatCompletionResult, ChatCompletionResults, UserMessage
)
import asyncio

class QueryExpansionIOProcessor(InputOutputProcessor):
    """Expand the last user turn of a conversation into several retrieval queries.

    For one conversation, five strings are produced and returned as user messages,
    in this order:

    0. the output of the query-rewrite IO processor,
    1. a synonymous reformulation obtained by prompting the generation backend,
    2. a list of search keywords obtained by prompting the generation backend,
    3. an answer sampled from the chat IO processor,
    4. a question reverse-engineered from that sampled answer.

    Items 0-3 are requested concurrently; item 4 needs item 3 and runs afterwards.
    """

    def __init__(
        self,
        io_proc,
        rewrite_request_proc,
        generation_backend=None,
    ):
        """
        :param io_proc: IO processor used to sample an answer to the conversation.
        :param rewrite_request_proc: IO processor that performs the query rewrite.
        :param generation_backend: Backend whose ``pipeline()`` coroutine runs the
            raw-prompt generations. When ``None``, the notebook-level ``backend``
            variable is used so that existing two-argument construction keeps
            working.
        """
        self.io_proc = io_proc
        self.rewrite_request_proc = rewrite_request_proc
        # The global is only looked up when no backend was injected.
        self.generation_backend = (
            backend if generation_backend is None else generation_backend
        )

    @staticmethod
    def _synonym_prompt(conversation):
        """Build the prompt asking for a synonymous standalone rewrite of the last query."""
        return (
            "You are given a multi-turn conversation between a user and an assistant. "
            "Reformulate the last-turn user query into a synonymous standalone query by "
            "replacing key terms with appropriate synonyms or closely related phrases, "
            "while preserving the original intent and meaning. "
            "This rewritten query will be used to retrieve relevant passages from a corpus, "
            "so it must remain faithful to the user's information need. "
            "Only output the rewritten query."
            f"\n\n[[Input]]\n{conversation}\n\n[[Output]]\n"
        )

    @staticmethod
    def _keyword_prompt(conversation):
        """Build the prompt asking for search keywords for the last query."""
        return (
            "Your task is to generate a list of short, highly relevant technical keywords "
            "or search queries based on the conversation history, focusing on the user's "
            "last query. These keywords will be used to retrieve the most relevant "
            "technical passages. Use domain-specific terms, abbreviations, and key phrases "
            "where appropriate. Do not answer the question."
            f"\n\n[[Conversation]]\n{conversation}\n\n[[Search Keywords]]"
        )

    @staticmethod
    def _reverse_question_prompt(answer):
        """Build the one-shot prompt asking for a question that ``answer`` would answer."""
        # The answer goes on its own line after "[[Answer]]" so that the slot
        # has the same layout as the worked example that precedes it.
        return (
            "Generate a single question for the given answer.\n"
            "[[Answer]]\nAlbert Einstein was born in Germany.\n"
            "[[Question]]\nWhere was Albert Einstein born?\n"
            f"[[Answer]]\n{answer}\n[[Question]]\n"
        )

    async def acreate_chat_completion(
        self, inputs: ChatCompletionInputs
    ) -> ChatCompletionResults:
        """Generate the expanded queries for ``inputs``.

        :param inputs: The conversation to expand. Its messages are used for every
            sub-request; its own generation settings are only seen by the
            query-rewrite processor, the other requests use fixed sampling
            settings.
        :returns: ``ChatCompletionResults`` holding five ``UserMessage`` results in
            the order described in the class docstring.
        """
        # Use the conversation that was passed in, not a notebook global, so the
        # processor works for any input.
        input_conversation_string = format_chat_history(
            inputs.model_dump()["messages"]
        )
        print(f"Input Conversation: {input_conversation_string}")

        # Build every request before creating any coroutine, so that a failure
        # while preparing inputs cannot leave coroutines that are never awaited.
        synonym_inputs = GenerateInputs(
            max_tokens=512,
            top_p=1,
            temperature=1,
            stop="[[Input]]",
            prompt=self._synonym_prompt(input_conversation_string),
        )
        keyword_inputs = GenerateInputs(
            max_tokens=512,
            top_p=1,
            temperature=1,
            prompt=self._keyword_prompt(input_conversation_string),
        )
        chat_input_ans_v1 = ChatCompletionInputs(
            messages=inputs.messages,
            generate_inputs={"temperature": 1, "top_p": 1, "max_tokens": 512},
        )

        coroutines = [
            # Query rewrite
            self.rewrite_request_proc.acreate_chat_completion(inputs),
            # Synonymous query
            self.generation_backend.pipeline(synonym_inputs),
            # Query enrichment (keywords)
            self.generation_backend.pipeline(keyword_inputs),
            # Sampled answer
            self.io_proc.acreate_chat_completion(chat_input_ans_v1),
        ]
        print("coroutines", coroutines)

        # Merge results from parallel invocations; gather() returns them in the
        # same order as the coroutines above.
        sub_results = await asyncio.gather(*coroutines)
        rewrite_result, synonym_result, keyword_result, answer_result = sub_results
        print("sub_results", sub_results)
        print("QUERY REWRITE:", rewrite_result)
        print("SYNONMOUS QUERY:", synonym_result)
        print("ENRICHED QUERIES:", keyword_result)
        print("SAMPLED ANSWER:", answer_result)

        query_answer_v1 = answer_result.results[0].next_message.content
        print("query_answer_v1", query_answer_v1)

        # Reverse-engineer a question from the sampled answer. This depends on
        # the answer above, so it cannot join the parallel batch.
        reverse_inputs = GenerateInputs(
            max_tokens=512,
            top_p=1,
            temperature=1,
            stop="[[Answer]]",
            prompt=self._reverse_question_prompt(query_answer_v1),
        )
        RevQ_v1_output = await self.generation_backend.pipeline(reverse_inputs)

        query_str_list = [
            rewrite_result.results[0].next_message.content,  # 0
            synonym_result.results[0].completion_string,  # 1
            keyword_result.results[0].completion_string,  # 2
            query_answer_v1,  # 3
            RevQ_v1_output.results[0].completion_string,  # 4
        ]
        print("\nList of Query Strings:", query_str_list)

        # Every expanded query is returned as a user message.
        results = [
            ChatCompletionResult(next_message=UserMessage(content=cur_query))
            for cur_query in query_str_list
        ]
        print("results", results)

        return ChatCompletionResults(results=results)
    

# Combine the base-model IO processor with the query-rewrite processor.
rag_io_proc = QueryExpansionIOProcessor(
    io_processor,
    rewrite_request_proc=rewrite_io_proc,
)

# Run query expansion on the example conversation (called without await) and
# show the raw result object.
qe_result = rag_io_proc.create_chat_completion(chat_input)
print(qe_result)

# Pull the text of every expanded query out of the result.
qe_result_strs = []
for completion in qe_result.results:
    qe_result_strs.append(completion.next_message.content)

# List the expanded queries, numbered from 1.
print("\nQuery Expansion Results:")
for position, query_text in enumerate(qe_result_strs, start=1):
    print(f"Result {position}: {query_text}")


Input Conversation: assistant: Welcome to the California State Parks help desk.
user: I'm a student. Do you have internships?
assistant: The California State Parks hires Student Assistants to perform a variety of tasks that require limited or no previous work experience.
user: Cool, how do I sign up?
coroutines [<coroutine object ModelDirectInputOutputProcessorWithGenerate.acreate_chat_completion at 0x7f130e3d7680>, <coroutine object Backend.pipeline at 0x7f130e3d7530>, <coroutine object Backend.pipeline at 0x7f130e3d7610>, <coroutine object ModelDirectInputOutputProcessor.acreate_chat_completion at 0x7f130e3d76f0>]
sub_results [ChatCompletionResults(results=[ChatCompletionResult(next_message=UserMessage(content='How do I sign up for the Student Assistant program at California State Parks?', role='user'))]), GenerateResults(results=[GenerateResult(completion_string='How to apply for the student assistant program at California State Parks?', completion_tokens=[], stop_reason='stop')]), 

In [33]:
# Show the raw result container, then each expanded query on its own line.
print(qe_result)
for completion in qe_result.results:
    print(completion.next_message.content)

# Dump the first returned message as indented JSON.
first_message = qe_result.results[0].next_message
print(first_message.model_dump_json(indent=2))

results=[ChatCompletionResult(next_message=UserMessage(content='How do I sign up for the Student Assistant program at California State Parks?', role='user')), ChatCompletionResult(next_message=UserMessage(content='How do I apply for a Student Assistant position at California State Parks?', role='user')), ChatCompletionResult(next_message=UserMessage(content='\n\n 1. California State Parks internship application\n 2. Student Assistant positions\n 3. Prerequisite-free internship programs\n 4. Applying for California State Parks internships\n 5. California State Parks student job opportunities\n 6. Internship requirements for California State Parks\n 7. How to become a Student Assistant\n 8. Submitting an application for California State Parks internships\n 9. Internship application process for California State Parks\n 10. Job application for Student Assistants in California State Parks', role='user')), ChatCompletionResult(next_message=UserMessage(content='California State Parks typicall