## RayPipeline with detached component actors 

When you run `RayPipeline` under the hood it creates [ray actors](https://docs.ray.io/en/latest/ray-core/actors.html) for each component in the pipeline (`ComponentActor` instances). 
`ComponentActor` exposes `run_component` method which can be remotely executed by `RayPipelineProcessor` - another actor which orchestrates pipeline execution logic (e.g. runs components according to the order defined by pipeline connections). Lifetime of Haystack component instance is same as actor's. Please notice each Haystack component might have its own logic which could be not idempotent - we can not control it.

In case Haystack component requires `warm_up` - it will perform it during component's first run (first time `run_component` is called). Sometimes `warm_up` might be an expensive operation and ideally we would like to avoid it when we call `run_component` again. Internally ComponentActor will store a flag in case component has already been warmed up so that next time it runs we skip this step.

Generally, whenever you run pipeline with `pipeline.run` the following happens:

1. Pipeline is validated for correctness, along with given inputs if any
2. `RayPipelineProcessor` actor is created
3. `ComponentActor` is created for each component in pipeline
4. `RayPipelineProcessor` starts pipeline execution
5. When pipeline execution finishes all actors will be destroyed by default. Next time you call `pipeline.run` steps will be repeated, so **actor instances will be re-created**. 

In cases you would like to keep component actors alive between sequential pipeline runs that can be achieved with [Named Actors](https://docs.ray.io/en/latest/ray-core/actors/named-actors.html) and "detached" [Actor Lifetimes](https://docs.ray.io/en/latest/ray-core/actors/named-actors.html#actor-lifetimes). `ray-haystack` allows you to control actor names and lifetimes with `RayPipelineSettings`. So in order to reuse actor instances between pipeline runs we could configure actors as "detached". Detached actors are not destroyed until manually killed with `ray.kill` or cluster shutdown.

Lets explore "detached" named component actors in this notebook.

### Install Dependencies

In [None]:
%%bash

pip install "ray[default]" # Ray library with dashboard included
pip install ray-haystack
pip install "datasets>=2.6.1" # Required by pipeline
pip install "sentence-transformers>=3.0.0" # Required by pipeline

### Running a pipeline with default settings

Setup required env variables

In [2]:
import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

Start local Ray cluster

In [None]:
import ray

# Shutdown cluster if any is running at the moment
if ray.is_initialized():
    ray.shutdown()

ray.init()

Prepare DocumentStore by writing Documents with embeddings.

`RayInMemoryDocumentStore` is actually runs within an actor behind the scenes so we could share a single instance of it in Ray cluster

In [None]:
from datasets import load_dataset
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.dataclasses import Document

from ray_haystack.components import RayInMemoryDocumentStore

# RayInMemoryDocumentStore is a singleton with one Actor created in a cluster
document_store = RayInMemoryDocumentStore()

doc_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
doc_embedder.warm_up()

dataset = load_dataset("bilgeyucel/seven-wonders", split="train")
docs = [Document(content=doc["content"], meta=doc["meta"]) for doc in dataset]

docs_with_embeddings = doc_embedder.run(docs)
document_store.write_documents(docs_with_embeddings["documents"])

Create RayPipeline for basic RAG

In [None]:
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.embedders import (
    SentenceTransformersTextEmbedder,
)
from haystack.components.generators import OpenAIGenerator

from ray_haystack import RayPipeline
from ray_haystack.components import RayInMemoryEmbeddingRetriever

template = """
Given the following information, answer the question.

Context:
{% for document in documents %}
    {{ document.content }}
{% endfor %}

Question: {{question}}
Answer:
"""

text_embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
retriever = RayInMemoryEmbeddingRetriever(document_store) # document_store is created above
generator = OpenAIGenerator(model="gpt-3.5-turbo")

prompt_builder = PromptBuilder(template=template)

basic_rag_pipeline = RayPipeline()

basic_rag_pipeline.add_component("text_embedder", text_embedder)
basic_rag_pipeline.add_component("retriever", retriever)
basic_rag_pipeline.add_component("prompt_builder", prompt_builder)
basic_rag_pipeline.add_component("llm", generator)

basic_rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
basic_rag_pipeline.connect("retriever", "prompt_builder.documents")
basic_rag_pipeline.connect("prompt_builder", "llm")

Run pipeline with named actors

We will start pipeline ensuring that actors will get same name as components. By default actors are not named, but with pipeline settings we can instruct pipeline manager component actor names (same as component name in pipeline)

Note: While pipeline is running navigate to http://127.0.0.1:8265/#/actors to see all actors being created for component execution, and then destroyed when pipeline finishes 

In [None]:
from ray_haystack import RayPipelineSettings

pipeline_settings: RayPipelineSettings = {"components": {"use_component_name_for_actor": True}}

question = "What does Rhodes Statue look like?"

basic_rag_pipeline.run(
    {"text_embedder": {"text": question}, "prompt_builder": {"question": question}},
    ray_settings=pipeline_settings,
)

Lets check if any actor is still available after pipeline finishes execution, which should result in error
("Failed to look up actor with name 'llm'.")

In [None]:
# This will result in exception as actor does not exist (in DEAD state)

ray.get_actor(name="llm")

Note: There could be some delay until actor is actually destroyed by Ray (e.g. have gone out of scope in Python). It could be a problem if you immediately create actor with same name (with "non_detached" lifetime) - it will result in error as no actors with same name can exist in same namespace. 

### Pipeline with detached component actors

`RayPipeline` can be configured with particular [actor options](https://docs.ray.io/en/latest/ray-core/api/doc/ray.actor.ActorClass.options.html) for all components within the pipeline.

So it should be possible to specify the lifetime of actors in case we don't want those to be destroyed when pipeline finishes.

Lets create same pipeline using settings to detach component actors and reuse same actor instances next time pipeline runs

In [8]:
from ray_haystack import RayPipelineSettings

ray_settings: RayPipelineSettings = {
    "components": {
        "actor_options": {
            "lifetime": "detached",  # Don't kill the actor after pipeline finishes
            "get_if_exists": True,  # When creating actor get one that already exists (with same name)
        },
        "use_component_name_for_actor": True, # Use component name for actor
    }
}

Run pipeline with common component actor settings

In [None]:
question = "Why did people build Great Pyramid of Giza?"

basic_rag_pipeline.run(
    {"text_embedder": {"text": question}, "prompt_builder": {"question": question}},
    ray_settings=ray_settings,
)

Get actor by name, it should be still available in detached state

In [None]:
ray.get_actor(name="llm")

Lets run pipeline once again with same settings, it should reuse previously created component actors

In [None]:
question = "How did Colossus of Rhodes collapse?"

basic_rag_pipeline.run(
    {"text_embedder": {"text": question}, "prompt_builder": {"question": question}}, ray_settings=ray_settings
)

The above pipeline execution should reuse component actors, potentially improving performance as there is no additional overhead. It should be quite useful when component needs to "warm up" and we would like to avoid repeating this step each time `run` method is called.

Note: You should no longer see the "Warming up component text_embedder..." coming from SentenceTransformersTextEmbedder component as it has previously warmed up during first run (considering it is detached and named ofc)

#### Terminate detached actors

According to [documentation](https://docs.ray.io/en/latest/ray-core/actors/terminating-actors.html) detached actors must be manually destroyed. 

So you will need a way to clean up actors so that those do not get stuck running and consuming resources once you no longer need working/running a pipeline.

One way to do it is to get actor handle and then use `ray.kill` util to terminate the actor, e.g.

```py
actor = ray.get_actor("component_name") # Assuming you did not change default name for the actor
ray.kill(actor)
```

Below is a more generic version of actor "cleanup" logic:

In [None]:
# This code is just an example how to iterate over component names and kill actors assuming all have been detached
# No need to run as below you will find a more generic and recommended way to terminate actors
# for component_name in basic_rag_pipeline.get_component_names():
#     ray.kill(ray.get_actor(component_name))

Note: If actor can not be found by name `ray.get_actor` will raise an error (ValueError). 

Please also consider cases when you manually provide name for a component actor - a custom name can be given using `actor_options` which does not have to be the same as component name in pipeline. In such case you will not be able to retrieve actor by component name (e.g. `ray.get_actor("component_name")`)

The `cleanup` method provided by `RayPipeline` can be used to kill detached actors as internally it tracks such actors regardless of its name

In [14]:
basic_rag_pipeline.cleanup() # kill actors belonging to the pipeline which were created as detached

In [None]:
# This should no longer work after `cleanup`
ray.get_actor(name="llm")

### Shutdown Ray cluster

In [16]:
ray.shutdown()