### Haystack Deepset Integration

This notebook shows how to integrate deepset's Haystack with Aana SDK. `HaystackComponentDeployment` is a wrapper class that lets you run a Haystack component as a separate deployment. This is especially useful for components that wrap deep learning models, and it has two main advantages:
- The model is deployed only once and can be reused by multiple Haystack pipelines, which makes more efficient use of resources such as GPU memory.
- Haystack pipelines can be scaled to a cluster of machines with minimal effort.

In [1]:
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0"

Create an Aana SDK instance and connect to the cluster.

In [2]:
from aana.sdk import AanaSDK


aana_app = AanaSDK().connect()


2024-06-24 13:54:01,948	INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


Deploy Haystack components for embeddings

In [3]:
from aana.deployments.haystack_component_deployment import (
    HaystackComponentDeployment,
    HaystackComponentDeploymentConfig,
)

Deploy text embedding model

In [4]:
text_embedder_deployment = HaystackComponentDeployment.options(
    num_replicas=1,                                                                 # Number of replicas for the component
    ray_actor_options={"num_gpus": 0.25},                                           # Allocate 0.25 GPU, should be > 0 if the component requires GPU.
    user_config=HaystackComponentDeploymentConfig(
        component="haystack.components.embedders.SentenceTransformersTextEmbedder", # Path to the Haystack component class
        params={"model": "sentence-transformers/all-mpnet-base-v2"},                # Parameters of the Haystack component class
    ).model_dump(),
)
aana_app.register_deployment(
    name="text_embedder_deployment",    # Name of the deployment, which will be used to access it
    instance=text_embedder_deployment,  # Instance of the deployment that we just created above
    deploy=True                         # Tell Aana to deploy the component immediately instead of waiting for `aana_app.deploy()`
)

The new client HTTP config differs from the existing one in the following fields: ['location']. The new HTTP config is ignored.
2024-06-24 13:54:08,899	INFO handle.py:126 -- Created DeploymentHandle '0ie8tjwu' for Deployment(name='HaystackComponentDeployment', app='text_embedder_deployment').
2024-06-24 13:54:08,901	INFO handle.py:126 -- Created DeploymentHandle 'fu08b8jl' for Deployment(name='HaystackComponentDeployment', app='text_embedder_deployment').
2024-06-24 13:54:16,961	INFO handle.py:126 -- Created DeploymentHandle 'bs4h9h59' for Deployment(name='HaystackComponentDeployment', app='text_embedder_deployment').
2024-06-24 13:54:16,963	INFO api.py:584 -- Deployed app 'text_embedder_deployment' successfully.
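
The `component` field above is a dotted path to a class, and `params` holds its constructor arguments. As a rough illustration of how such a path can be turned into an object, here is a minimal sketch using the standard library. The helper names `resolve_component` and `build_component` are hypothetical, and this is not necessarily how Aana resolves the path internally. A standard-library class stands in for the Haystack component so the sketch runs without Haystack installed.

```python
import importlib


def resolve_component(path: str):
    """Resolve a dotted path such as 'package.module.ClassName' to the object it names."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected 'module.ClassName', got {path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


def build_component(path: str, params: dict):
    """Instantiate the resolved class, passing `params` as keyword arguments."""
    return resolve_component(path)(**params)


# Stand-in for a Haystack component: fractions.Fraction from the standard library.
fraction = build_component("fractions.Fraction", {"numerator": 1, "denominator": 4})
print(fraction)  # 1/4
```

Keeping the component as a string plus a plain dictionary means the whole configuration can be serialized, which is why the config object is passed through `.model_dump()` before it reaches the deployment.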


Deploy the document embedding model in the same way as the text embedding model.

In [5]:
document_embedder_deployment = HaystackComponentDeployment.options(
    num_replicas=1,
    max_concurrent_queries=1000,
    ray_actor_options={"num_gpus": 0.25},
    user_config=HaystackComponentDeploymentConfig(
        component="haystack.components.embedders.SentenceTransformersDocumentEmbedder",
        params={"model": "sentence-transformers/all-mpnet-base-v2"},
    ).model_dump(),
)
aana_app.register_deployment(
    name="document_embedder_deployment", 
    instance=document_embedder_deployment, 
    deploy=True
)

The new client HTTP config differs from the existing one in the following fields: ['location']. The new HTTP config is ignored.
2024-06-24 13:54:16,999	INFO handle.py:126 -- Created DeploymentHandle 'eg65u5ye' for Deployment(name='HaystackComponentDeployment', app='document_embedder_deployment').
2024-06-24 13:54:17,001	INFO handle.py:126 -- Created DeploymentHandle 'kie5azxd' for Deployment(name='HaystackComponentDeployment', app='document_embedder_deployment').
2024-06-24 13:54:26,078	INFO handle.py:126 -- Created DeploymentHandle 'azhn85p7' for Deployment(name='HaystackComponentDeployment', app='document_embedder_deployment').
2024-06-24 13:54:26,080	INFO api.py:584 -- Deployed app 'document_embedder_deployment' successfully.
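
Both deployments reserve a quarter of a GPU through `ray_actor_options`. As a rough sketch of the bookkeeping, and assuming a single visible GPU (as set by `CUDA_VISIBLE_DEVICES` at the top of the notebook), the arithmetic below adds up the reserved fractions. Ray treats `num_gpus` as a logical reservation used for scheduling, so this says nothing about whether the models actually fit in GPU memory. The dictionary layout and the `reserved_gpus` helper are illustrative only.

```python
from fractions import Fraction

# Values copied from the two deployments above; stored as strings so that
# Fraction can represent them exactly.
deployments = {
    "text_embedder_deployment": {"num_replicas": 1, "num_gpus": "0.25"},
    "document_embedder_deployment": {"num_replicas": 1, "num_gpus": "0.25"},
}


def reserved_gpus(deployments: dict) -> Fraction:
    """Sum the GPU fraction reserved by every replica of every deployment."""
    return sum(
        (Fraction(d["num_gpus"]) * d["num_replicas"] for d in deployments.values()),
        Fraction(0),
    )


available = Fraction(1)  # assumption: exactly one GPU is visible to the cluster
total = reserved_gpus(deployments)
remaining = available - total
extra_replicas = remaining // Fraction("0.25")  # further 0.25-GPU replicas that still fit

print(float(total), float(remaining), int(extra_replicas))  # 0.5 0.5 2
```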


Now you can replace the original Haystack components with `RemoteHaystackComponent` instances. They run on the cluster and can be reused by different endpoints and workers.

In [6]:
from haystack import Document, Pipeline
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

from aana.deployments.haystack_component_deployment import RemoteHaystackComponent

document_store = InMemoryDocumentStore(embedding_similarity_function="cosine")

documents = [
    Document(content="My name is Wolfgang and I live in Berlin"),
    Document(content="I saw a black horse running"),
    Document(content="Germany has many big cities"),
]

document_embedder = RemoteHaystackComponent(
    "document_embedder_deployment"
)  # instead of SentenceTransformersDocumentEmbedder()
document_embedder.warm_up()
documents_with_embeddings = document_embedder.run(documents=documents)["documents"]
document_store.write_documents(documents_with_embeddings)

text_embedder = RemoteHaystackComponent(
    "text_embedder_deployment"
)  # instead of SentenceTransformersTextEmbedder()
text_embedder.warm_up()

query_pipeline = Pipeline()
query_pipeline.add_component("text_embedder", text_embedder)
query_pipeline.add_component(
    "retriever", InMemoryEmbeddingRetriever(document_store=document_store)
)
query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")

query = "Who lives in Berlin?"

result = query_pipeline.run({"text_embedder": {"text": query}})

print(result["retriever"]["documents"][0])

# Document(id=..., content: 'My name is Wolfgang and I live in Berlin', score: ...)

2024-06-24 13:54:26,131	INFO handle.py:126 -- Created DeploymentHandle 'lwf329xw' for Deployment(name='HaystackComponentDeployment', app='document_embedder_deployment').
2024-06-24 13:54:26,142	INFO pow_2_scheduler.py:260 -- Got updated replicas for Deployment(name='HaystackComponentDeployment', app='document_embedder_deployment'): {'y190ojim'}.


2024-06-24 13:54:26,170	INFO handle.py:126 -- Created DeploymentHandle 'jc770375' for Deployment(name='HaystackComponentDeployment', app='document_embedder_deployment').
2024-06-24 13:54:26,184	INFO handle.py:126 -- Created DeploymentHandle 'wad4u0zw' for Deployment(name='HaystackComponentDeployment', app='document_embedder_deployment').
2024-06-24 13:54:26,426	INFO handle.py:126 -- Created DeploymentHandle '726yltn1' for Deployment(name='HaystackComponentDeployment', app='text_embedder_deployment').
2024-06-24 13:54:26,437	INFO pow_2_scheduler.py:260 -- Got updated replicas for Deployment(name='HaystackComponentDeployment', app='text_embedder_deployment'): {'muzsj4w4'}.
2024-06-24 13:54:26,458	INFO handle.py:126 -- Created DeploymentHandle 'meuxfpwo' for Deployment(name='HaystackComponentDeployment', app='text_embedder_deployment').
2024-06-24 13:54:26,478	INFO handle.py:126 -- Created DeploymentHandle '7juh3dpv' for Deployment(name='HaystackComponentDeployment', app='text_embedder_de

Document(id=62fad790ad2af927af9432c87330ed2ea5e31332cdec8e9d6235a5105ab0aaf5, content: 'My name is Wolfgang and I live in Berlin', score: 0.5515621736829024)
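
The `score` in the output comes from the `cosine` similarity function configured on the document store. To make the ranking step concrete, here is a minimal pure-Python sketch of cosine-similarity retrieval. The three-dimensional vectors are made-up toy values, not real model embeddings (which have far more dimensions), and the code is an illustration of the idea rather than Haystack's implementation.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Hypothetical toy embeddings for the three documents used in the notebook.
toy_index = {
    "My name is Wolfgang and I live in Berlin": [0.9, 0.1, 0.3],
    "I saw a black horse running": [0.0, 1.0, 0.1],
    "Germany has many big cities": [0.6, 0.0, 0.8],
}
# Hypothetical toy embedding for the query "Who lives in Berlin?".
query_vector = [1.0, 0.0, 0.2]

# Rank documents from most to least similar to the query.
ranked = sorted(
    toy_index.items(),
    key=lambda item: cosine_similarity(query_vector, item[1]),
    reverse=True,
)
print(ranked[0][0])  # My name is Wolfgang and I live in Berlin
```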


We can also package the pipeline as an endpoint and deploy it.

In [7]:
from typing import TypedDict

from haystack import Document, Pipeline
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

from aana.api.api_generation import Endpoint
from aana.deployments.haystack_component_deployment import RemoteHaystackComponent


class HaystackTestEndpointOutput(TypedDict):
    """Output of the HaystackTestEndpoint."""
    response: str


class HaystackTestEndpoint(Endpoint):
    """Endpoint to deploy a Haystack pipeline."""
    async def initialize(self):
        """Initialize the endpoint by creating a Haystack pipeline."""
        document_store = InMemoryDocumentStore(embedding_similarity_function="cosine")

        documents = [
            Document(content="My name is Wolfgang and I live in Berlin"),
            Document(content="I saw a black horse running"),
            Document(content="Germany has many big cities"),
        ]

        document_embedder = RemoteHaystackComponent("document_embedder_deployment")
        document_embedder.warm_up()
        documents_with_embeddings = document_embedder.run(documents=documents)[
            "documents"
        ]
        document_store.write_documents(documents_with_embeddings)

        text_embedder = RemoteHaystackComponent("text_embedder_deployment")
        text_embedder.warm_up()

        self.query_pipeline = Pipeline()
        self.query_pipeline.add_component("text_embedder", text_embedder)
        self.query_pipeline.add_component(
            "retriever", InMemoryEmbeddingRetriever(document_store=document_store)
        )
        self.query_pipeline.connect(
            "text_embedder.embedding", "retriever.query_embedding"
        )
        await super().initialize()

    async def run(self, query: str) -> HaystackTestEndpointOutput:
        """Query the pipeline with the given text."""
        result = self.query_pipeline.run({"text_embedder": {"text": query}})
        return {"response": result["retriever"]["documents"][0].content}
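
The endpoint above follows an "initialize once, run per request" pattern: the expensive setup happens in `initialize`, and `run` only reuses what was built. The sketch below imitates that lifecycle with plain `asyncio` so it can be tried without a cluster. `StandInEndpoint` is a hypothetical stand-in, not Aana's `Endpoint` class, and the word-overlap lookup is a deliberately crude substitute for embedding retrieval.

```python
import asyncio


class StandInEndpoint:
    """Hypothetical base class used only for this sketch."""

    def __init__(self):
        self.initialized = False

    async def initialize(self):
        self.initialized = True


class KeywordEndpoint(StandInEndpoint):
    async def initialize(self):
        # One-time setup: build the state that every request will reuse.
        self.documents = [
            "My name is Wolfgang and I live in Berlin",
            "I saw a black horse running",
            "Germany has many big cities",
        ]
        self.setup_calls = getattr(self, "setup_calls", 0) + 1
        # The parent method is a coroutine, so it must be awaited to take effect.
        await super().initialize()

    async def run(self, query: str) -> dict:
        # Per-request work: return the document sharing the most words with the query.
        query_words = set(query.lower().replace("?", "").split())
        best = max(
            self.documents,
            key=lambda doc: len(query_words & set(doc.lower().split())),
        )
        return {"response": best}


async def main():
    endpoint = KeywordEndpoint()
    await endpoint.initialize()  # runs once
    answers = [await endpoint.run(q) for q in ("Who lives in Berlin?", "black horse")]
    return endpoint, answers


endpoint, answers = asyncio.run(main())
print(answers[0]["response"])  # My name is Wolfgang and I live in Berlin
```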

Now we register the endpoint with Aana and deploy it.

In [8]:
aana_app.register_endpoint(
    name="haystack_test_endpoint",
    summary="A test endpoint for Haystack",
    path="/query",
    endpoint_cls=HaystackTestEndpoint,
)

In [9]:
aana_app.deploy(blocking=False)

The new client HTTP config differs from the existing one in the following fields: ['location']. The new HTTP config is ignored.
2024-06-24 13:54:26,801	INFO handle.py:126 -- Created DeploymentHandle 'hg71i9xj' for Deployment(name='TaskQueueDeployment', app='task_queue_deployment').
2024-06-24 13:54:26,802	INFO handle.py:126 -- Created DeploymentHandle '3yojks31' for Deployment(name='TaskQueueDeployment', app='task_queue_deployment').
2024-06-24 13:54:29,839	INFO handle.py:126 -- Created DeploymentHandle 'zrroj18d' for Deployment(name='TaskQueueDeployment', app='task_queue_deployment').
2024-06-24 13:54:29,840	INFO api.py:584 -- Deployed app 'task_queue_deployment' successfully.
The new client HTTP config differs from the existing one in the following fields: ['location']. The new HTTP config is ignored.
2024-06-24 13:54:29,891	INFO handle.py:126 -- Created DeploymentHandle 'apqinbvo' for Deployment(name='RequestHandler', app='app').
2024-06-24 13:54:29,892	INFO handle.py:126 -- Created

Let's try a few requests.

In [10]:
import requests, json

data = {"query": "Who lives in Berlin?"}
response = requests.post(
    "http://127.0.0.1:8000/query",
    data={"body": json.dumps(data)},
)
print(response.json())

{'response': 'My name is Wolfgang and I live in Berlin'}


In [11]:
import requests, json

data = {"query": "What is the interesting fact about Germany?"}
response = requests.post(
    "http://127.0.0.1:8000/query",
    data={"body": json.dumps(data)},
)
print(response.json())

{'response': 'Germany has many big cities'}
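
Both requests send the query as a form field named `body` whose value is a JSON string. The sketch below uses only the standard library to show what that double encoding looks like on the wire and how a receiver could undo it. It assumes the usual form encoding that `requests` applies to a `data=` dictionary and does not contact the endpoint.

```python
import json
from urllib.parse import parse_qs, urlencode

data = {"query": "Who lives in Berlin?"}

# Layer 1: serialize the arguments as JSON.
# Layer 2: place that string in a form field called "body" and URL-encode it.
form_fields = {"body": json.dumps(data)}
encoded = urlencode(form_fields)
print(encoded)

# A receiver reverses both layers: parse the form, then parse the JSON.
decoded_fields = parse_qs(encoded)
recovered = json.loads(decoded_fields["body"][0])
print(recovered)  # {'query': 'Who lives in Berlin?'}
```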


It works! The Haystack pipeline is deployed with Aana and both queries return the expected document. 🚀
You can scale it to a cluster of machines or extend it with more components and pipelines.