In [None]:
from typing import Optional, Any, Dict
from operator import add
import requests, json
from starlette.requests import Request
import numpy as np
import torch

from sentence_transformers import SentenceTransformer
from langchain.embeddings.base import Embeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.document_loaders import WikipediaLoader
from langchain import HuggingFacePipeline
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate

from transformers import pipeline as hf_pipeline

import ray
from ray import serve

# Productionizing LLM Q&A Application with Ray Serve

In this notebook, we'll see how to productionize our Q&A application and its database service

<div class="alert alert-block alert-info">

__Initial Productionizing Roadmap__
1. Port the vector db service to a Ray Serve deployment and test it out
1. Port the Q&A service to a deployment and integrate with the vector db service
1. Extend the vector db service:
    1. Specify an autoscaling configuration so that the service can adjust to traffic load
    1. Build the index faster using Ray tasks to scale out
1. Extend the Q&A service by adding replicas (service instances)
</div>

<img src='https://technical-training-assets.s3.us-west-2.amazonaws.com/LLMs/QA_App.png' width=700/>

## Create a Ray Serve deployment for vector db functionality

In [None]:
class LocalHuggingFaceEmbeddings(Embeddings):
    """LangChain ``Embeddings`` adapter backed by a local SentenceTransformer model."""

    def __init__(self, model_id):
        # model_id is passed straight through to SentenceTransformer
        self.model = SentenceTransformer(model_id)

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Embed a batch of texts and return one vector (list of floats) per text."""
        # encode() hands back an array-like, not nested Python lists; convert so the
        # return value actually matches the declared list[list[float]] type
        return self.model.encode(texts).tolist()

    def embed_query(self, text: str) -> list[float]:
        """Embed a single query string and return its vector as a list of floats."""
        embedding = self.model.encode(text)
        return list(map(float, embedding))

__To convert our vector db logic into a Serve deployment__

<div class="alert alert-block alert-info">
    
1. Wrap the ad-hoc logic in a Python class
1. Add the `@serve.deployment` decorator
1. Add conditional logic to create the index and store it locally if it has not already been created on the node
1. Prepare to run the service -- effectively set up a constructor call -- by creating a "bound deployment"

</div>

In [None]:
# place logic inside a Python class and add Ray Serve deployment decorator
@serve.deployment
class VectorDBDeployment:
    """Ray Serve deployment that answers similarity searches from a FAISS index.

    On construction the index is loaded from ``FAISS_INDEX_PATH``; if loading
    fails, the index is built from Wikipedia articles and saved to that path.
    """

    FAISS_INDEX_PATH = "/home/ray/faiss_index"

    def __init__(self):
        self.embeddings = LocalHuggingFaceEmbeddings("multi-qa-mpnet-base-dot-v1")

        # try to load the index from the local node's filesystem
        try:
            self.db = FAISS.load_local(self.FAISS_INDEX_PATH, self.embeddings)
        except Exception:
            # if the index is not local (or cannot be read), run the setup logic;
            # catching Exception rather than using a bare except lets
            # KeyboardInterrupt / SystemExit propagate
            self.setup_db()

    def setup_db(self):
        """Download the topic articles, chunk them, build the FAISS index and save it."""
        topics = ["The Eras Tour", "2023 XFL season"]
        loaders = [WikipediaLoader(query=topic, load_max_docs=20) for topic in topics]
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=20, length_function=len,)
        # flatten the documents of every loader into one list; unlike operator.add,
        # this works for any number of topics, not exactly two
        docs = [doc for loader in loaders for doc in loader.load()]
        print([d.metadata["title"] for d in docs])
        chunks = text_splitter.create_documents(
            [doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs]
        )
        self.db = FAISS.from_documents(chunks, self.embeddings)
        self.db.save_local(self.FAISS_INDEX_PATH)

    def similarity_search(self, query):
        """Return the index's similarity-search results for ``query``."""
        return self.db.similarity_search(query)

# create a "bound deployment" -- a wrapped (deferred) call to the constructor, which will be invoked by Ray Serve
vecdb_deployment = VectorDBDeployment.bind()

We can test this service out by itself by launching a Ray Serve application

In [None]:
# run vecdb_deployment as a Serve application named "db" and keep the returned handle for direct calls
handle = serve.run(vecdb_deployment, name="db")

Run a quick test of this module by itself

In [None]:
# call similarity_search on the deployment through the handle; ray.get blocks until the result is available
ray.get(handle.similarity_search.remote("When did the XFL start?"))

We'll shut this down, since the db service will not be public in our final app

In [None]:
# delete the "db" application that was started with serve.run(..., name="db")
serve.delete("db")

## Create a Ray Serve deployment for Q&A service functionality

We'll prepare the helper code and data (e.g., the prompt template)

In [None]:
class StableLMPipeline(HuggingFacePipeline):
    """HuggingFacePipeline subclass with fixed generation settings.

    Class is temporary, we are working with the authors of LangChain to make these unnecessary.
    """

    def _call(self, prompt: str, stop: Optional[list[str]] = None) -> str:
        """Run the pipeline on ``prompt`` and return the text after the prompt.

        ``stop`` is accepted for interface compatibility but is not used.
        """
        generation_args = {"temperature": 0.1, "max_new_tokens": 256, "do_sample": True}
        response = self.pipeline(prompt, **generation_args)
        print(f"Response is: {response}")
        # drop the first len(prompt) characters of the generated text
        # (presumably the echoed prompt -- confirm against the pipeline's output)
        generated = response[0]["generated_text"]
        return generated[len(prompt):]

    @classmethod
    def from_model_id(
        cls,
        model_id: str,
        task: str,
        device: Optional[str] = None,
        model_kwargs: Optional[dict] = None,
        **kwargs: Any,
    ):
        """Build a transformers pipeline for ``model_id`` / ``task`` and wrap it in this class."""
        hf_pipe = hf_pipeline(
            model=model_id,
            task=task,
            device=device,
            model_kwargs=model_kwargs,
        )
        return cls(
            pipeline=hf_pipe,
            model_id=model_id,
            model_kwargs=model_kwargs,
            **kwargs,
        )

In [None]:
# Prompt text with two placeholders, {context} and {question}, that are filled in
# through the PromptTemplate below. The <|SYSTEM|>, <|USER|> and <|ASSISTANT|>
# markers are part of the literal prompt text.
template = """
<|SYSTEM|># StableLM Tuned (Alpha version)
- You are a helpful, polite, fact-based agent for answering questions. 
- Your answers include enough detail for someone to follow through on your suggestions. 
<|USER|>
If you don't know the answer, just say that you don't know. Don't try to make up an answer.
Please answer the following question using the context provided. 

CONTEXT: 
{context}
=========
QUESTION: {question} 
ANSWER: <|ASSISTANT|>"""

# input_variables must name the placeholders used in the template above
PROMPT = PromptTemplate(template=template, input_variables=["context", "question"])

__To create a Ray Serve deployment from our existing QA class__
<div class="alert alert-block alert-info">

1. Add the `@serve.deployment` decorator
1. Specify `num_gpus` as 1.0 to ensure that this deployment always has access to a GPU (and all of its memory)
1. Add the vector db service as a constructor param
1. Prepare to provide the db service to the deployment constructor by passing it into `.bind(...)`

</div>

In [None]:
# add decorator and specify GPU resource requirement
@serve.deployment(ray_actor_options={"num_gpus": 1.0})
class QADeployment:
    """Ray Serve deployment that answers questions using the vector db service and an LLM chain."""

    # take vector db bound deployment instance as a constructor param
    def __init__(self, db):
        self.embeddings = LocalHuggingFaceEmbeddings("multi-qa-mpnet-base-dot-v1")
        self.db = db

        # keyword arguments forwarded to the underlying Hugging Face model loader
        llm_kwargs = {
            "torch_dtype": torch.float16,
            "device_map": "auto",
            "cache_dir": "/mnt/local_storage",
        }
        self.llm = StableLMPipeline.from_model_id(
            model_id="stabilityai/stablelm-tuned-alpha-7b",
            task="text-generation",
            model_kwargs=llm_kwargs,
        )
        self.chain = load_qa_chain(llm=self.llm, chain_type="stuff", prompt=PROMPT)

    async def qa(self, query):
        """Look up documents for ``query`` in the db service and return the chain's answer text."""
        # when we run the QADeployment, the vector db bound deployment will become a "live" serve handle,
        # so that we can call remote methods on it
        pending = await self.db.similarity_search.remote(query)

        # the remote call hands back a Ray ObjectRef (future/promise); awaiting it yields the actual Python object
        documents = await pending
        print(f"Results from db are: {documents}")

        chain_inputs = {"input_documents": documents, "question": query}
        answer = self.chain(chain_inputs)
        print(f"Result is: {answer}")
        return answer["output_text"]

# create bound deployment, taking the vector db bound deployment as a parameter
qa_deployment = QADeployment.bind(vecdb_deployment)

And that's it for an initial port to Ray Serve!

Let's try it out

In [None]:
# run qa_deployment (bound to vecdb_deployment) as a Serve application named "qa" and keep the handle
handle = serve.run(qa_deployment, name="qa")

In [None]:
# a question outside the indexed topics ("The Eras Tour", "2023 XFL season"); the prompt tells the
# model to say it doesn't know rather than make up an answer
ray.get(handle.qa.remote("How many people live in San Francisco?"))

In [None]:
# a question about one of the indexed topics ("The Eras Tour")
ray.get(handle.qa.remote("When did Taylor Swift's Eras tour start?"))

In [None]:
# delete the "qa" application started with serve.run(..., name="qa")
serve.delete("qa")

## Extend the db service for performance and scale

<div class="alert alert-block alert-info">
To provide multiple replicas -- and autoscaling -- of the db service, add an autoscaling config to the deployment decorator
<br/><br/>
We can also speed up the index build by splitting up ("sharding") the array of document vectors and defining a Ray Task to create part of the index from that shard. 

1. Adding the `@ray.remote` decorator to the `process_shard` Python function makes it schedulable by Ray
1. Calling `process_shard.remote(...)` tells Ray to schedule these tasks -- ideally all in parallel (limited only by our compute capacity)
1. Use `ray.get(futures)` to wait for all of the shards to be processed into index chunks
1. Merge chunks using a local `for` loop, and write the results to disk
</div>

In [None]:
# set the vector db service to autoscale, starting with 2 replicas and scaling between 1 and 5
@serve.deployment(
    autoscaling_config={"min_replicas": 1, "initial_replicas": 2, "max_replicas": 5}
)
class ParallelBuildVectorDBDeployment:
    """Autoscaled Ray Serve deployment whose FAISS index is built in parallel Ray tasks.

    On construction the index is loaded from ``FAISS_INDEX_PATH``; if loading
    fails, the index is built shard-by-shard with Ray tasks, merged, and saved
    to that path.
    """

    FAISS_INDEX_PATH = "/home/ray/faiss_dist_built_index"

    def __init__(self):
        self.embeddings = LocalHuggingFaceEmbeddings("multi-qa-mpnet-base-dot-v1")
        try:
            self.db = FAISS.load_local(self.FAISS_INDEX_PATH, self.embeddings)
        except Exception:
            # index not available locally (or unreadable): build it; catching
            # Exception instead of a bare except lets KeyboardInterrupt / SystemExit propagate
            self.setup_db()

    def setup_db(self):
        """Download the topic articles, chunk them, and build/merge/save the index.

        Raises:
            ValueError: if no document chunks were produced, so there is nothing to index.
        """
        topics = ["The Eras Tour", "2023 XFL season"]
        loaders = [WikipediaLoader(query=topic, load_max_docs=20) for topic in topics]
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=20, length_function=len)
        # flatten the documents of every loader; works for any number of topics
        docs = [doc for loader in loaders for doc in loader.load()]
        chunks = text_splitter.create_documents(
            [doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs]
        )
        if not chunks:
            raise ValueError("No document chunks were produced; cannot build the FAISS index")

        # split dataset into contiguous, near-equal shards for parallel processing;
        # empty shards (fewer chunks than shards) are dropped so that no task is asked
        # to build an index from zero documents
        db_shards = 8
        print(f"Loading chunks into vector store ... using {db_shards} shards")
        base_size, extra = divmod(len(chunks), db_shards)
        shards = []
        start = 0
        for i in range(db_shards):
            # the first `extra` shards take one additional chunk
            stop = start + base_size + (1 if i < extra else 0)
            if stop > start:
                shards.append(chunks[start:stop])
            start = stop

        # create a Ray task to generate embeddings for a single shard
        @ray.remote
        def process_shard(shard):
            embeddings = LocalHuggingFaceEmbeddings("multi-qa-mpnet-base-dot-v1")
            result = FAISS.from_documents(shard, embeddings)
            return result

        # schedule shard processing on Ray and obtain ObjectRefs (futures/promises)
        futures = [process_shard.remote(shard) for shard in shards]

        # wait for all shards to be finished & retrieve Python objects
        results = ray.get(futures)

        # combine the partial indexes locally, merging everything into the first one
        self.db = results[0]
        for partial_index in results[1:]:
            self.db.merge_from(partial_index)

        self.db.save_local(self.FAISS_INDEX_PATH)

    def similarity_search(self, query):
        """Return the index's similarity-search results for ``query``."""
        return self.db.similarity_search(query)


vecdb_deployment = ParallelBuildVectorDBDeployment.bind()

We'll spin up the whole application -- but this time using our new, faster definition of the db service

In [None]:
# re-bind the Q&A deployment so it receives the vecdb_deployment that now refers to
# ParallelBuildVectorDBDeployment
qa_deployment = QADeployment.bind(vecdb_deployment)
# run it as the "qa" application and keep the handle for the test call
handle = serve.run(qa_deployment, name="qa")

In [None]:
# same out-of-scope question as before, now served with the parallel-built db deployment
ray.get(handle.qa.remote("How many people live in San Francisco?"))

In [None]:
# delete the "qa" application before redefining QADeployment
serve.delete("qa")

## Extend the Q&A service for perf, scale and HTTP traffic

<div class="alert alert-block alert-info">
Specify additional replicas in the decorator and add a JSON/HTTP handler in preparation for production deployment
</div>

In [None]:
# specify 2 replicas of this service (in place of the default 1 replica)
@serve.deployment(ray_actor_options={"num_gpus": 1.0}, num_replicas=2)
class QADeployment:
    """Q&A deployment with two replicas, callable via a handle (``qa``) or via HTTP (``__call__``)."""

    def __init__(self, db):
        self.embeddings = LocalHuggingFaceEmbeddings("multi-qa-mpnet-base-dot-v1")
        self.db = db
        self.llm = StableLMPipeline.from_model_id(model_id="stabilityai/stablelm-tuned-alpha-7b", task="text-generation",
            model_kwargs={"torch_dtype": torch.float16, "device_map": "auto", "cache_dir": "/mnt/local_storage",},)
        self.chain = load_qa_chain(llm=self.llm, chain_type="stuff", prompt=PROMPT)

    async def qa(self, query):
        """Look up documents for ``query`` in the db service and return the chain's answer text."""
        # the first await yields an ObjectRef for the remote call; the second resolves it to the result
        search_results_ref = await self.db.similarity_search.remote(query)
        search_results = await search_results_ref
        result = self.chain({"input_documents": search_results, "question": query})
        return result["output_text"]

    # add a handler for HTTP requests
    async def __call__(self, request: Request) -> Dict:
        """Handle an HTTP request whose JSON body carries the question under ``"user_input"``.

        The body may be a JSON object, or a JSON string that itself contains the
        serialized object (what a client produces by passing an already-dumped
        string through ``json=``). Returns ``{"result": <answer text>}``.
        """
        # decode incoming request as JSON
        data = await request.json()

        # a double-encoded payload decodes to a str and needs a second pass; a plain
        # JSON object is already a dict, and json.loads on it would raise TypeError
        if isinstance(data, str):
            data = json.loads(data)

        # call into existing qa method implementation and await async output
        output = await self.qa(data["user_input"])
        return {"result": output}

Start the service and test via Python

In [None]:
# bind the redefined QADeployment (2 replicas, HTTP handler) to the vector db deployment
qa_deployment = QADeployment.bind(vecdb_deployment)
# run it as the "qa" application and keep the handle for the Python-side test
handle = serve.run(qa_deployment, name="qa")

In [None]:
# exercise the qa method directly through the handle (no HTTP involved)
ray.get(handle.qa.remote("How many people live in San Francisco?"))

In production, we expect to receive HTTP traffic, so we'll make sure that execution path works

In [None]:
message = "When did Taylor Swift's Eras tour start?"

# NOTE: json_doc is already a JSON-encoded string, and passing it through `json=` below
# serializes it a second time. QADeployment.__call__ runs json.loads on the decoded
# request body, which is what undoes this second layer of encoding.
json_doc = json.dumps({"user_input": message})

# POST to the HTTP endpoint and parse the JSON reply ({"result": ...} per __call__)
requests.post("http://localhost:8000/", json=json_doc).json()

In [None]:
# delete the "qa" application now that both the handle and HTTP paths have been tested
serve.delete("qa")

## Discussion ideas: extending the architecture

We might want to extend our current vector db service in several ways
* Allow indexing additional topics/documents without rebuilding the entire index and service
* Ensure that all replicas of the db server are using the same (and most up-to-date) index

The FAISS architecture as a non-distributed database makes this an interesting project: we can use Ray's Actor API and Object Store to create a single service that manages updating the index and providing the authoritative index to all vector db deployment replicas.

While the local FAISS architecture may not reach the performance of a fully-distributed-by-design vector database, using Ray allows us to decouple our document-serving architecture from our index update service, and lets us choose how to balance read and write performance as we consider moving to a scale-out vector db such as Milvus.