Skip to content

Commit

Permalink
Merge pull request #23 from miguelaeh/configurable_socket_queues
Browse files Browse the repository at this point in the history
Support configurable socket queues
  • Loading branch information
miguelaeh committed Aug 29, 2023
2 parents fee1b45 + aa0d0c4 commit 1ee8195
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 21 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ To configure your app you can use either env vars or the config file (`config.ya
| Option | Description | Value(s)/Type | Env Var |
| ------ | ----------- | ------------- | ------- |
| `log_level` | Level of the logging| `DEBUG`, `INFO`, `WARN`, `ERROR` | `PIPELESS_LOG_LEVEL` |
| `n_workers` | Number of workers deployed | int | `PIPELESS_N_WORKERS` |
| `input.address.host` | Host where the input component is running | `localhost` (string) | `PIPELESS_INPUT_ADDRESS_HOST` |
| `input.address.port` | Port of the input component process | `1234` (int) | `PIPELESS_INPUT_ADDRESS_PORT` |
| `input.video.enable` | Whether to enable to video input | `true` (boolean) | `PIPELESS_INPUT_VIDEO_ENABLE` |
Expand All @@ -171,6 +170,9 @@ To configure your app you can use either env vars or the config file (`config.ya
| `output.address.port` | Port of the output component process | `1234` (int) | `PIPELESS_OUTPUT_ADDRESS_PORT` |
| `output.video.enable` | Whether to enable to video output | `true` (boolean) | `PIPELESS_OUTPUT_VIDEO_ENABLE` |
| `output.video.uri` | `screen` to see video directly on the device screen, or the URI where to send the processed output video. URIs **must** include the protocol (`file://`, `https://`, `rtmp://`, etc) | string | `PIPELESS_OUTPUT_VIDEO_URI` |
| `output.recv_buffer_size` | Size of the output socket reception buffer | `300` (int) | `PIPELESS_OUTPUT_RECV_BUFFER_SIZE` |
| `worker.recv_buffer_size` | Size of the worker socket reception buffer | `300` (int) | `PIPELESS_WORKER_RECV_BUFFER_SIZE` |
| `worker.n_workers` | Number of workers deployed | int | `PIPELESS_WORKER_N_WORKERS` |

## Ready to use models

Expand Down Expand Up @@ -250,3 +252,11 @@ This project is licensed under the [Apache License 2.0](LICENSE).
The Apache License 2.0 is a permissive open-source license that allows you to use, modify, and distribute this software for personal or commercial purposes. It comes with certain obligations, including providing attribution to the original authors and including the original license text in your distributions.

For the full license text, please refer to the [Apache License 2.0](LICENSE).

## Notable Changes

### Core version `0.1.6` and CLI version `0.1.5`

These versions include a new **optional** parameter to configure the reception buffers of the sockets, which is useful to adjust the buffer sizes to the processing time.

With the above change, we also moved the **n_workers** option to the `worker` configuration section, which constitutes a **breaking change**. Since Pipeless is still in alpha, we are not increasing the major version.
2 changes: 1 addition & 1 deletion cli/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pipeless-ai-cli"
version = "0.1.4"
version = "0.1.5"
description = "Pipeless is a framework to build and deploy multimodal perception apps in minutes without worrying about multimedia pipelines"
authors = ["Miguel Angel Cabrera Minagorri <devgorri@gmail.com>"]
license = "Apache-2.0"
Expand Down
6 changes: 4 additions & 2 deletions cli/src/pipeless_ai_cli/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def create_project(name: str):
# TODO: should be able to override default config via CLI options
default_config = {
'log_level': 'INFO',
'n_workers': 1,
'input': {
'video': {
'enable': True,
Expand All @@ -41,7 +40,10 @@ def create_project(name: str):
'host': 'localhost',
'port': 1236
},
}
},
'worker': {
'n_workers': 1,
},
}

try:
Expand Down
2 changes: 1 addition & 1 deletion core/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pipeless-ai"
version = "0.1.5"
version = "0.1.6"
description = "A framework to build and deploy multimodal perception apps in minutes without worrying about multimedia pipelines"
authors = ["Miguel Angel Cabrera Minagorri <devgorri@gmail.com>"]
license = "Apache-2.0"
Expand Down
33 changes: 24 additions & 9 deletions core/src/pipeless_ai/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

ENV_PREFIX = 'PIPELESS'

def prioritized_config(config, path, env_var_name, convert_to=str, required=False):
value = os.environ.get(env_var_name, None)
def prioritized_config(config, path, env_var_name, convert_to=str, required=False, default=None):
value = os.environ.get(env_var_name, default)
if value is None:
try:
value = config[path]
Expand Down Expand Up @@ -76,18 +76,33 @@ def get_address(self):

class Output():
def __init__(self, output_dict):
"""
When no output video URI is provided, the video is sent to the default
video output of the computer.
"""
self._video = Video(output_dict['video'], f'{ENV_PREFIX}_OUTPUT_VIDEO')
# Address where the output component is running
self._address = Address(output_dict['address'], f'{ENV_PREFIX}_OUTPUT_ADDRESS')
self._recv_buffer_size = prioritized_config(
output_dict, 'recv_buffer_size',
f'{ENV_PREFIX}_OUTPUT_RECV_BUFFER_SIZE', convert_to=int,
default=300) # 5 seconds of 60 pfs video

def get_video(self):
return self._video
def get_address(self):
return self._address
def get_recv_buffer_size(self):
return self._recv_buffer_size

class Worker():
def __init__(self, worker_dict):
self._n_workers = prioritized_config(worker_dict, 'n_workers', f'{ENV_PREFIX}_WORKER_N_WORKERS', convert_to=int, required=True)
self._recv_buffer_size = prioritized_config(
worker_dict, 'recv_buffer_size',
f'{ENV_PREFIX}_WORKER_RECV_BUFFER_SIZE', convert_to=int,
default=300) # 5 seconds of 60 pfs video

def get_n_workers(self):
return self._n_workers
def get_recv_buffer_size(self):
return self._recv_buffer_size

class Config(metaclass=Singleton):
def __init__(self, config):
Expand All @@ -99,14 +114,14 @@ def __init__(self, config):

self._input = Input(config['input'])
self._output = Output(config['output'])
self._n_workers = prioritized_config(config, 'n_workers', f'{ENV_PREFIX}_N_WORKERS', convert_to=int, required=True)
self._worker = Worker(config['worker'])
logger.debug('[green]Configuration parsed[/green]')

def get_input(self):
return self._input
def get_output(self):
return self._output
def get_worker(self):
return self._worker
def get_log_level(self):
return self._log_level
def get_n_workers(self):
return self._n_workers
7 changes: 4 additions & 3 deletions core/src/pipeless_ai/lib/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def __init__(self, timeout=500):
self._socket = Pull0()
self._socket.recv_timeout = timeout
self._socket.recv_max_size = 0 # Unlimited receive size
self._socket.recv_buffer_size = 180 # 3 seconds of 60 pfs video
self._socket.recv_buffer_size = config.get_worker().get_recv_buffer_size()
self._name = 'InputPullSocket'

wait_socket_dial(self._socket, self._addr)
Expand All @@ -231,12 +231,13 @@ class OutputPullSocket(metaclass=Singleton):
"""
def __init__(self, timeout=500):
config = Config(None) # Get the already existing config instance
address = config.get_output().get_address()
out_config = config.get_output()
address = out_config.get_address()
self._addr = f'tcp://{address.get_address()}'
self._socket = Pull0(listen=self._addr)
self._socket.recv_timeout = timeout
self._socket.recv_max_size = 0 # Unlimited receive size
self._socket.recv_buffer_size = 180 # 3 seconds of 60 pfs video
self._socket.recv_buffer_size = out_config.get_recv_buffer_size()
self._name = 'OutputPullSocket'

@recv_error_handler
Expand Down
2 changes: 1 addition & 1 deletion core/src/pipeless_ai/lib/input/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def on_bus_message(bus: Gst.Bus, msg: Gst.Message, loop: GLib.MainLoop):
m_msg = EndOfStreamMsg()
m_msg = m_msg.serialize()
config = Config(None)
for _ in range(config.get_n_workers()):
for _ in range(config.get_worker().get_n_workers()):
# The socket is round robin, send to all workers
# TODO: a broadcast socket for this is better for scaling
# by saving the n_workers config option
Expand Down
3 changes: 2 additions & 1 deletion examples/cats/config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
log_level: INFO
n_workers: 1
input:
address:
host: localhost
Expand All @@ -14,3 +13,5 @@ output:
video:
enable: true
uri: file:///home/example/path/pipeless/examples/cats/cats-output.mp4
worker:
n_workers: 1
3 changes: 2 additions & 1 deletion examples/pose/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ input:
enable: true
uri: file:///home/example/path/woman-walking.mp4
log_level: INFO
n_workers: 1
output:
address:
host: localhost
port: 1237
video:
enable: true
uri: file:///home/example/path/output.mp4
worker:
n_workers: 1
3 changes: 2 additions & 1 deletion examples/text-overlay/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ input:
enable: true
uri: file:///home/example/path/book-video.mp4
log_level: INFO
n_workers: 1
output:
address:
host: localhost
port: 1237
video:
enable: true
uri: screen
worker:
n_workers: 1

0 comments on commit 1ee8195

Please sign in to comment.