Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions rayvens/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ def _eval(context, f, data):

def init(mode=os.getenv('RAYVENS_MODE', 'auto'),
transport=os.getenv('RAYVENS_TRANSPORT', 'auto'),
release=os.getenv('RAYVENS_RELEASE', False),
check_port=True):
modes = ['auto', 'local', 'mixed', 'operator']
transports = ['auto', 'http', 'kafka', 'ray-serve']
Expand All @@ -337,19 +338,19 @@ def init(mode=os.getenv('RAYVENS_MODE', 'auto'),

if mode in ['auto', 'local']:
if transport in ['auto', 'http']:
_global_camel = start_http(mode, check_port)
_global_camel = start_http(mode, check_port, release)
elif transport == 'kafka':
_global_camel = start_kafka(mode, check_port)
_global_camel = start_kafka(mode, check_port, release)
else:
raise RuntimeError(
f'{transport} transport unsupported for mode {mode}.')
elif mode in ['mixed', 'operator']:
if transport in ['auto', 'http']:
_global_camel = start_operator_http(mode, check_port)
_global_camel = start_operator_http(mode, check_port, release)
elif transport == 'kafka':
_global_camel = start_operator_kafka(mode, check_port)
_global_camel = start_operator_kafka(mode, check_port, release)
elif transport in ['ray-serve']:
_global_camel = start_operator_ray_serve(mode, check_port)
_global_camel = start_operator_ray_serve(mode, check_port, release)
else:
raise RuntimeError(
f'{transport} transport unsupported for mode {mode}.')
Expand Down
3 changes: 2 additions & 1 deletion rayvens/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, data, headers={}):
self.headers = headers


def get_run_mode(camel_mode, check_port):
def get_run_mode(camel_mode, check_port, release):
if camel_mode == 'auto' or camel_mode == 'local':
mode.run_mode = RayvensMode.LOCAL
elif camel_mode == 'mixed':
Expand All @@ -47,6 +47,7 @@ def get_run_mode(camel_mode, check_port):
else:
raise RuntimeError("Unsupported camel mode.")
mode.check_port = check_port
mode.release = release
return mode


Expand Down
11 changes: 9 additions & 2 deletions rayvens/core/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ def _check_kamel_output(self,
end_condition,
with_output=False,
with_timeout=False):
# Decide whether to print logs or not. If this is release mode
# then no output will be printed:
with_output_final = with_output
if self.mode.release:
with_output_final = False

# Implicit 5 minute timout in hundredths of a second:
timout_duration = 5 * 60 * 100
if with_timeout:
Expand All @@ -175,10 +181,11 @@ def _check_kamel_output(self,
if with_timeout:
output = utils.print_log_from_queue(self.subprocess_name,
reading_thread.queue,
with_output)
with_output_final)
else:
output = utils.print_log_from_subprocess(
self.subprocess_name, self.process.stdout, with_output)
self.subprocess_name, self.process.stdout,
with_output_final)

# Use the Kamel output to decide when Kamel instance is
# ready to receive requests.
Expand Down
4 changes: 2 additions & 2 deletions rayvens/core/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from rayvens.core.integration import Integration


def start(camel_mode, check_port):
return Camel(get_run_mode(camel_mode, check_port))
def start(camel_mode, check_port, release):
return Camel(get_run_mode(camel_mode, check_port, release))


class Camel:
Expand Down
4 changes: 2 additions & 2 deletions rayvens/core/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from rayvens.core.common import get_run_mode, send_to, recv_from, await_start


def start(camel_mode, check_port):
return Camel(get_run_mode(camel_mode, check_port))
def start(camel_mode, check_port, release):
return Camel(get_run_mode(camel_mode, check_port, release))


class Camel:
Expand Down
1 change: 1 addition & 0 deletions rayvens/core/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self, run_mode=RayvensMode.LOCAL):
self.namespace = "ray"
self.transport = None
self.check_port = True
self.release = False

def server_address(self, integration):
return self._get_server_address(integration.integration_name,
Expand Down
4 changes: 2 additions & 2 deletions rayvens/core/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from rayvens.core.integration import Integration


def start(camel_mode, check_port):
return Camel(get_run_mode(camel_mode, check_port))
def start(camel_mode, check_port, release):
return Camel(get_run_mode(camel_mode, check_port, release))


class Camel:
Expand Down
4 changes: 2 additions & 2 deletions rayvens/core/operator_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from rayvens.core.integration import Integration


def start(camel_mode, check_port):
return Camel(get_run_mode(camel_mode, check_port))
def start(camel_mode, check_port, release):
return Camel(get_run_mode(camel_mode, check_port, release))


class Camel:
Expand Down
4 changes: 2 additions & 2 deletions rayvens/core/ray_serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from rayvens.core.integration import Integration


def start(camel_mode, check_port):
return Camel(common.get_run_mode(camel_mode, check_port))
def start(camel_mode, check_port, release):
return Camel(common.get_run_mode(camel_mode, check_port, release))


class Camel:
Expand Down