# Pipeline drivers

After starting an `InferenceProcess` on a `Runtime`, you can run predictions manually via the `InferenceProcess.predict` method: your inputs are sent to the pipeline's "virtual source" and the results are fetched from the pipeline's "virtual sink". While this is good for initial testing, a pipeline used in production needs to be connected to a stream or HTTP server so that predictions run automatically.

This is where **Pipeline drivers** come into play.

A `PipelineDriver` is a wrapper of the `InferenceProcess`.
The goal of the driver is to *drive* the pipeline with input generators (sources) and output processors (sinks).
Some examples:
* an HTTPServer that handles prediction requests and sends the result back as a response
* a driver that fetches images from a stream and writes the results to a database

![title](img/PipelineDriver.png)

## Reasoning behind pipeline drivers

* A 'source' and 'sink' are not part of the pipeline, so that a pipeline can easily be linked to a different stream or HTTP server without having to duplicate it or maintain variations of it. Instead, a pipeline has a virtual source and sink that can be linked with any driver to fit a specific use case.
* Multiple drivers can be added to the same `InferenceProcess` while sharing the same GPU resources. The SDK ensures that the `PipelineState` of these streams is not intertwined, as long as each driver sets the correct *state ids* for each prediction.
* Another advantage is that you don't need to start up these processes manually, since it's the runtime's task to start and stop the drivers.
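
The role of state ids can be illustrated with a small sketch that does not use the SDK. `CountingPipeline` and its `predict(value, sid)` signature are hypothetical stand-ins; the only point is that state is looked up per state id, so two streams sharing one pipeline do not see each other's counters.

```python
from collections import defaultdict
from typing import Dict


class CountingPipeline:
    """Hypothetical pipeline that keeps one prediction counter per state id."""

    def __init__(self) -> None:
        self._counters: Dict[str, int] = defaultdict(int)

    def predict(self, value: int, sid: str) -> Dict[str, int]:
        # State is selected by the state id, never shared across ids.
        self._counters[sid] += 1
        return {"output": value + 1, "counter": self._counters[sid]}


toy_pipeline = CountingPipeline()
a1 = toy_pipeline.predict(1, sid="stream-a")
a2 = toy_pipeline.predict(1, sid="stream-a")
b1 = toy_pipeline.predict(1, sid="stream-b")
print(a1["counter"], a2["counter"], b1["counter"])
```

If both streams used the same state id, the third call would report a counter of 3 instead of 1, which is the kind of intertwining the state ids are meant to prevent.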

## PipelineDriver interface

The separation between a source and a sink is not always clear; the two are sometimes tightly coupled. For example, for an HTTP server the HTTPRequest might represent the source and the HTTPResponse the sink. Therefore a simple `PipelineDriver` base class is provided with a `run` method that receives an `InferenceProcess` and leaves the source/sink design to the implementer:

```python
# pseudo implementation
from abc import ABCMeta, abstractmethod


class PipelineDriver(metaclass=ABCMeta):
    
    @abstractmethod
    def run(self, inference: "InferenceProcess"):
        """Start the driver."""
        raise NotImplementedError

    @abstractmethod
    def stop(self):
        """Stop the driver and cleanup resources."""
        raise NotImplementedError
```
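
To make the interface concrete, here is a minimal sketch of a driver that reads inputs from a list and collects the outputs in another list. It does not use the SDK: the local `PipelineDriver` copy, `FakeInference`, `ListDriver`, and the synchronous `predict` call are all hypothetical stand-ins, so read it as an illustration of the source/sink split rather than the SDK's actual behavior.

```python
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, List


class PipelineDriver(metaclass=ABCMeta):
    """Local copy of the pseudo interface above (not the SDK class)."""

    @abstractmethod
    def run(self, inference: Any) -> None:
        raise NotImplementedError

    @abstractmethod
    def stop(self) -> None:
        raise NotImplementedError


class FakeInference:
    """Hypothetical stand-in for an inference process: adds 1 to the input."""

    def predict(self, inputs: Dict[str, int]) -> Dict[str, int]:
        return {"output": inputs["input"] + 1}


class ListDriver(PipelineDriver):
    """Uses a list as the source and another list as the sink."""

    def __init__(self, inputs: List[Dict[str, int]]) -> None:
        self._inputs = inputs
        self._running = False
        self.results: List[Dict[str, int]] = []

    def run(self, inference: Any) -> None:
        self._running = True
        for item in self._inputs:
            if not self._running:
                break  # stop() was called while the loop was running
            self.results.append(inference.predict(item))
        self._running = False

    def stop(self) -> None:
        self._running = False


list_driver = ListDriver([{"input": 1}, {"input": 2}])
list_driver.run(FakeInference())
print(list_driver.results)
```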

## Usage

The SDK already provides a few drivers out of the box:
* `FastAPIDriver`, which provides an HTTP REST API server for the pipeline
* `SourceSinkDriver`, which can fetch inputs from arbitrary `Source`s (e.g. RTSPSource) and send the results to arbitrary `Sink`s (e.g. DBWriterSink)

But first we will create a minimal dummy pipeline to demonstrate the drivers concept.

In [None]:
!pip install -qqq rvai==1.1.0rc51 pygraphviz

In [None]:
from dataclasses import dataclass
from rvai.base.cell import Cell, cell
from rvai.base.data import (
    Inputs,
    Outputs,
    Parameters,
    State,
)
from rvai.base.context import InferenceContext
from rvai.base.pipeline import PipelineFactory
from rvai.types import Integer

# Cell
@dataclass
class MyInputs(Inputs):
    integer: Integer = Inputs.field(name="Dummy input", description="Some dummy input")


@dataclass
class MyOutputs(Outputs):
    integer: Integer = Outputs.field(name="Dummy output", description="Some dummy output")
    counter: Integer = Outputs.field(name="Counter", description="Prediction counter")


@dataclass
class MyParameters(Parameters):
    adder: Integer = Parameters.field(default=Integer(1), name="Adder")


@dataclass
class MyState(State):
    counter: Integer = State.field(default=Integer(0))


@cell(state=MyState)
class MyCell(Cell):

    @classmethod
    def call(
        cls, context: InferenceContext, parameters: MyParameters, inputs: MyInputs,
    ) -> MyOutputs:
        state: MyState = context.state
        state.counter = Integer(state.counter + 1)
        return MyOutputs(
            integer=Integer(inputs.integer + parameters.adder),
            counter=state.counter,
        )

# Pipeline
pf = PipelineFactory(name="MyPipeline")
my_cell = MyCell()
pf.add_cell(ref="my-cell", cell=my_cell)
pf.declare_input(ref="input", input=my_cell.inputs.integer)
pf.declare_output(ref="output", output=my_cell.outputs.integer)
pf.declare_output(ref="counter", output=my_cell.outputs.counter)

pipeline = pf.build()
pipeline.show()

As the pipeline's graph shows, a "virtual source" and a "virtual sink" have been added. We will now define a driver that connects these to an actual source and sink.

First, start an inference process:

In [None]:
from rvai.base.runtime import Inference, init

rt = init("debug")
inference = Inference(pipeline=pipeline)

proc = rt.start_inference(inference=inference)

### FastAPIDriver

Adding a `FastAPIDriver` to an inference process is quite easy. The method `InferenceProcess.add_driver` expects a driver reference, a callable that instantiates a driver and optional arguments:

In [None]:
from rvai.extensions.drivers.fast_api_driver import FastAPIDriver

fast_api_driver_proc = proc.add_driver("webapi", FastAPIDriver)
assert fast_api_driver_proc == proc.get_driver("webapi")

This method also returns a handle to the driver process that can be used to manage the spawned driver.

The driver process can also be fetched via `proc.get_driver("webapi")` or `proc.list_drivers()` to get an overview of all added drivers.

#### Driver services

Some drivers like the `FastAPIDriver` can expose a service that an external client can connect to remotely. Those services can be obtained via `PipelineDriverProcess.list_services()` and `PipelineDriverProcess.get_service()`:

In [None]:
# get the access points of this driver
print("FastAPIDriver services:", ", ".join([s["name"] for s in fast_api_driver_proc.list_services()]))

http_service = fast_api_driver_proc.get_service("http")
host = http_service["taggedAddresses"]["wan"]["address"]
port = http_service["taggedAddresses"]["wan"]["port"]
fast_api_url = f"http://{host}:{port}"
print(f"The API server should be available on {fast_api_url}")
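
The lookup in the cell above can be wrapped in a small helper. This is a sketch under assumptions: `service_url` is a hypothetical helper, and `example_service` is made-up data that only mirrors the keys used above (`name`, `taggedAddresses`, `wan`, `address`, `port`); a real service entry may contain more fields.

```python
from typing import Any, Dict


def service_url(service: Dict[str, Any], tag: str = "wan", scheme: str = "http") -> str:
    """Build a URL from the tagged address of a service entry."""
    tagged = service["taggedAddresses"][tag]
    return f"{scheme}://{tagged['address']}:{tagged['port']}"


# Made-up example entry with the same keys as used in the cell above.
example_service = {
    "name": "http",
    "taggedAddresses": {"wan": {"address": "127.0.0.1", "port": 8080}},
}
print(service_url(example_service))
```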

In [None]:
from IPython.display import IFrame
IFrame(fast_api_url, width=1000, height=500)

The SDK also provides a Python client. Instead of providing the host and port manually, you can pass a `PipelineDriverProcess`; the client will then automatically fetch the correct host and port:

In [None]:
from rvai.extensions.drivers.fast_api_driver import FastAPIClient, FastAPIClientError

client = FastAPIClient(process=fast_api_driver_proc)

# wait for the API server to be online
client.wait()

# regular predict
print("HTTP predict result:", client.predict({"input": Integer(1)}))

# prediction task
task_id = client.predict_task({"input": Integer(1)})
print("HTTP prediction task:", task_id)
print("HTTP prediction task result:", client.task_result(task_id))
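
The difference between a regular predict and a prediction task is that the task call returns an id immediately and the result is fetched later. The sketch below shows that pattern with the standard library only. `ToyTaskServer` is a hypothetical stand-in; how the `FastAPIDriver` implements tasks internally is not described here, so this is not a description of the SDK.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict
from uuid import uuid4


class ToyTaskServer:
    """Hypothetical server that runs predictions in the background."""

    def __init__(self) -> None:
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._tasks: Dict[str, Future] = {}

    @staticmethod
    def _predict(inputs: Dict[str, int]) -> Dict[str, int]:
        return {"output": inputs["input"] + 1}

    def predict_task(self, inputs: Dict[str, int]) -> str:
        # Submit the work and hand back an id right away.
        task_id = str(uuid4())
        self._tasks[task_id] = self._executor.submit(self._predict, inputs)
        return task_id

    def task_result(self, task_id: str, timeout: float = 5.0) -> Dict[str, int]:
        # Block until the result is ready, then forget the task.
        return self._tasks.pop(task_id).result(timeout=timeout)

    def shutdown(self) -> None:
        self._executor.shutdown()


toy_server = ToyTaskServer()
toy_task_id = toy_server.predict_task({"input": 1})
toy_result = toy_server.task_result(toy_task_id)
toy_server.shutdown()
print(toy_task_id, toy_result)
```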


Drivers can be added and removed at runtime. `InferenceProcess.add_driver` returns a `PipelineDriverProcess`, which can be stopped via `PipelineDriverProcess.stop` or by passing its `pid` to `InferenceProcess.remove_driver(driver_proc.pid)`. All running driver processes can be fetched via `InferenceProcess.list_drivers()`. While drivers are added or removed, the actual pipeline and all its resources remain available.

In [None]:
from rvai.base.exc import DriverNotFoundError
from rvai.types import Integer

try:
    # stop the driver
    proc.remove_driver(fast_api_driver_proc.pid)
except DriverNotFoundError as e:
    print('Could not remove driver:', e)
    
print('* manual inference still possible:', proc.predict({"input": Integer(1)}).result())

print("* client not responding (as expected):")
try:
    client.predict({"input": Integer(1)})
except FastAPIClientError as e:
    print(e)

try:
    client.wait(timeout=1)
except FastAPIClientError as e:
    print("Can also be tested via `client.wait()`:", e)

In [None]:
fast_api_driver_proc = proc.add_driver("webapi", FastAPIDriver)
client = FastAPIClient(process=fast_api_driver_proc)
client.wait()
print("Server back online!")

#### Driver parameters

Driver parameters can be accessed and changed via `get_parameters` and `set_parameters` methods on the driver process:

In [None]:
fast_api_params = fast_api_driver_proc.get_parameters()
print(f"Current FastAPI parameters: {fast_api_params}")
fast_api_params.task_ttl = Integer(10)
fast_api_driver_proc.set_parameters(fast_api_params)
print(f"Updated FastAPI parameters: {fast_api_driver_proc.get_parameters()}")

### SourceSinkDriver

The `SourceSinkDriver` can connect multiple arbitrary sources and sinks to the virtual sources and sinks.

To make it possible to reuse sources and sinks and to add new ones, the interface is standardized via the base classes `Source` and `Sink`. The API of those classes can be found in the docs: https://base.rvai.dev/rvai.base.drivers.html

For testing purposes the SDK already provides a `DummySource` and `DummySink`. The `DummySource` can be configured to emit any input format at any rate, while the `DummySink` takes a `drain` method (by default it will write the outputs to logging).
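
The idea of a rate-limited source and a sink with a `drain` callable can be sketched without the SDK. `toy_source` and `ToySink` below are hypothetical and do not follow the real `Source` and `Sink` base classes (see the docs linked above for those); they only show the division of work between the two.

```python
import time
from typing import Any, Callable, Dict, Iterator, List


def toy_source(count: int, rate: float) -> Iterator[Dict[str, int]]:
    """Yield `count` inputs at roughly `rate` items per second."""
    interval = 1.0 / rate
    for i in range(count):
        yield {"input": i}
        time.sleep(interval)


class ToySink:
    """Hands every output to a user-supplied `drain` callable."""

    def __init__(self, drain: Callable[[Dict[str, Any]], None]) -> None:
        self._drain = drain

    def write(self, outputs: Dict[str, Any]) -> None:
        self._drain(outputs)


drained: List[Dict[str, Any]] = []
toy_sink = ToySink(drain=drained.append)
for toy_inputs in toy_source(count=3, rate=100.0):
    # A real driver would call the pipeline here; we just add 1.
    toy_sink.write({"output": toy_inputs["input"] + 1})
print(drained)
```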

**WARNING:** *Drivers may only be instantiated by the runtime. Because the `SourceSinkDriver` expects `Source` and `Sink` instances to be added to it, we wrap the driver definition in a function that returns a driver instance. This works because the `add_driver` method accepts any `Callable` that produces a `PipelineDriver` instance.*
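
Why a wrapper function works can be seen in a small sketch. `ToySourceDriver` and `ToyProcess` are hypothetical stand-ins; the assumption is only that `add_driver` calls the given callable (with any keyword arguments) to create the driver, so a class, a plain function, and a `functools.partial` are all acceptable.

```python
from functools import partial
from typing import Any, Callable, Dict, List


class ToySourceDriver:
    """Hypothetical driver that needs sources attached after construction."""

    def __init__(self, name: str = "default") -> None:
        self.name = name
        self.sources: List[str] = []

    def add_source(self, source: str) -> None:
        self.sources.append(source)


class ToyProcess:
    """Hypothetical stand-in for the component that instantiates drivers."""

    def __init__(self) -> None:
        self.drivers: Dict[str, Any] = {}

    def add_driver(self, ref: str, factory: Callable[..., Any], **kwargs: Any) -> Any:
        # The driver object is created here, never by the caller.
        self.drivers[ref] = factory(**kwargs)
        return self.drivers[ref]


def create_driver() -> ToySourceDriver:
    driver = ToySourceDriver(name="wrapped")
    driver.add_source("camera-1")
    return driver


toy_proc = ToyProcess()
by_class = toy_proc.add_driver("a", ToySourceDriver, name="plain")
by_function = toy_proc.add_driver("b", create_driver)
by_partial = toy_proc.add_driver("c", partial(ToySourceDriver, name="partial"))
print(by_class.name, by_function.sources, by_partial.name)
```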

In [None]:
import time
from rvai.extensions.drivers.source_sink_driver import SourceSinkDriver
from rvai.extensions.sources.dummy_source import DummySource
from rvai.extensions.sinks.dummy_sink import DummySink
from rvai.types import FloatRange

# create driver
def create_source_sink_driver():
    driver = SourceSinkDriver()
    
    dummy_source = DummySource(format={"input": Integer}, params=DummySource.parameters(rate=FloatRange(10)))
    dummy_sink = DummySink(drain=lambda x: print("\rdraining ", x, end=""))
    
    driver.add_source(ref="source", source=dummy_source)
    driver.add_sink(ref="sink", sink=dummy_sink)
    return driver

# add to inference process
driver_proc = proc.add_driver("dummy-stream", create_source_sink_driver)
print(f"SourceSinkDriver parameters: {driver_proc.get_parameters()}")
print("SourceSinkDriver services:", ", ".join([s["name"] for s in driver_proc.list_services()]))
time.sleep(5)
print("")
print("Stopping source sink driver")
proc.remove_driver(driver_proc.pid)

### Custom drivers

While `SourceSinkDriver` covers many use cases thanks to its arbitrary `Source`s and `Sink`s, you will sometimes want to write a custom driver.

This can be done by subclassing the abstract class `PipelineDriver`. There are a few aspects to creating a driver:

#### 1. Defining parameters

A driver must define the parameters for the driver itself. This is done by creating a `DriverParameters` dataclass and implementing the `PipelineDriver.get_parameters` and `PipelineDriver.set_parameters` methods.

#### 2. Requesting resources

Optionally, a driver can request resources by overriding the `resources_request` method and returning a `DriverResourcesRequest` instance, as in the example below. Currently it's only possible to request ports.

#### 3. Implementing the `run` and `stop` methods

The 'loop' of the driver needs to be implemented in the `PipelineDriver.run` method. This method receives an inference process (a `RestrictedInferenceProcess` in the example below), which can be used to perform requests on the pipeline, and a resources instance (`DriverResources` in the example below), which contains the assigned resources that were requested.

Finally the `stop` method must stop the driver loop that has been started in the `run` method.
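
One common way to let `stop` end a loop started in `run` is a `threading.Event`. The sketch below is an SDK-independent illustration: `LoopDriver` is hypothetical, and running `run` in a separate thread is an assumption made only to show the cooperation between the two methods.

```python
import threading
import time


class LoopDriver:
    """Hypothetical driver whose loop ends when stop() is called."""

    def __init__(self) -> None:
        self._stop_event = threading.Event()
        self.iterations = 0

    def run(self) -> None:
        while not self._stop_event.is_set():
            self.iterations += 1
            # wait() doubles as an interruptible sleep between iterations.
            self._stop_event.wait(0.01)

    def stop(self) -> None:
        self._stop_event.set()


loop_driver = LoopDriver()
loop_thread = threading.Thread(target=loop_driver.run)
loop_thread.start()
time.sleep(0.05)
loop_driver.stop()
loop_thread.join(timeout=1.0)
print("stopped:", not loop_thread.is_alive(), "iterations:", loop_driver.iterations)
```

Compared with a plain boolean flag, the event lets the loop wake up immediately when `stop` is called instead of finishing a full sleep first.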


Let's create a custom driver that performs a fixed number of predictions and prints the average execution time, and add it to the inference process:

In [None]:
import time
import numpy as np
from dataclasses import dataclass, field
from uuid import uuid4
from typing import Optional
from rvai.base.drivers import DriverParameters, PipelineDriver
from rvai.base.resources import DriverResources, DriverResourcesRequest
from rvai.base.runtime import RestrictedInferenceProcess
from rvai.types import Integer

@dataclass
class CustomDriverParameters(DriverParameters):
    param: Integer = field(default=Integer(0))

class CustomDriver(PipelineDriver):
    """
        Stress test the inference process and print the average execution time.
    """
    def __init__(self, predictions: int = 10, params: Optional[CustomDriverParameters] = None):
        self._predictions = predictions
        self._params = params or CustomDriverParameters()
        self._running = False
    
    def resources_request(self) -> DriverResourcesRequest:  
        # Optionally request resources
        return DriverResourcesRequest(ports=["dummy-port"])    
        
    def run(self, inference: RestrictedInferenceProcess, resources: DriverResources):
        print("Starting custom driver")
        print("Assigned ports:", resources.ports)
        self._running = True
        
        # create new state id
        state_id = str(uuid4())   
        
        times = []
        results = []
        for _ in range(self._predictions):
            if not self._running:
                # respect the `stop` action and stop the loop
                print("Prematurely stopping test!")
                break
            start = time.time()
            result = inference.predict(inputs={"input": Integer.fake()}, sid=state_id).result()
            times.append(time.time() - start)
            results.append(result)
        
        if len(results) >= 2:
            print(f"First result: {results[0]}")
            print(f"Second result: {results[1]}")
        print(f"Average time over {len(times)} predictions: {np.round(np.mean(times) * 1000, 1)}ms")    
    
    def stop(self):
        self._running = False
        
    def get_parameters(self) -> CustomDriverParameters:
        """Get parameters."""
        return self._params

    def set_parameters(self, parameters: CustomDriverParameters):
        """Set parameters."""
        self._params = parameters

driver_proc = proc.add_driver("custom", CustomDriver, predictions=1000)
# test premature stop
# time.sleep(0.1)
# proc.remove_driver(driver_proc.pid)

Stop the inference process, including all drivers.

In [None]:
proc.stop()

Stop the runtime.

In [None]:
rt.stop()

## Packaging drivers, sources and sinks

Packaging of custom drivers, sources and sinks will be similar to how you register cells and pipelines.
**WORK IN PROGRESS**