From e95b1ee3a999f842f3b08a1f65d212d5cf33b666 Mon Sep 17 00:00:00 2001 From: Gheorghe Teodor Bercea Date: Wed, 5 Jan 2022 19:41:15 +0200 Subject: [PATCH 1/2] Add release flag to rayvens init. --- examples/cloud_object_storage/cos_sink_multitask.py | 2 +- rayvens/api.py | 11 ++++++----- rayvens/core/common.py | 3 ++- rayvens/core/invocation.py | 11 +++++++++-- rayvens/core/kafka.py | 4 ++-- rayvens/core/local.py | 4 ++-- rayvens/core/mode.py | 1 + rayvens/core/operator.py | 4 ++-- rayvens/core/operator_kafka.py | 4 ++-- rayvens/core/ray_serve.py | 4 ++-- 10 files changed, 29 insertions(+), 19 deletions(-) diff --git a/examples/cloud_object_storage/cos_sink_multitask.py b/examples/cloud_object_storage/cos_sink_multitask.py index fbe08f7..de13828 100644 --- a/examples/cloud_object_storage/cos_sink_multitask.py +++ b/examples/cloud_object_storage/cos_sink_multitask.py @@ -42,7 +42,7 @@ # Camel-K component for Kafka does not propagate message headers. This # will be fixed by Camel-K 1.8.0 release. # rayvens.init(transport="kafka") -rayvens.init() +rayvens.init(release=True) # Create an object stream stream = rayvens.Stream('upload-file') diff --git a/rayvens/api.py b/rayvens/api.py index a9e01df..9b128bf 100644 --- a/rayvens/api.py +++ b/rayvens/api.py @@ -322,6 +322,7 @@ def _eval(context, f, data): def init(mode=os.getenv('RAYVENS_MODE', 'auto'), transport=os.getenv('RAYVENS_TRANSPORT', 'auto'), + release=os.getenv('RAYVENS_RELEASE', False), check_port=True): modes = ['auto', 'local', 'mixed', 'operator'] transports = ['auto', 'http', 'kafka', 'ray-serve'] @@ -337,19 +338,19 @@ def init(mode=os.getenv('RAYVENS_MODE', 'auto'), if mode in ['auto', 'local']: if transport in ['auto', 'http']: - _global_camel = start_http(mode, check_port) + _global_camel = start_http(mode, check_port, release) elif transport == 'kafka': - _global_camel = start_kafka(mode, check_port) + _global_camel = start_kafka(mode, check_port, release) else: raise RuntimeError( f'{transport} transport unsupported for mode {mode}.') elif mode in ['mixed', 'operator']: if transport in ['auto', 'http']: - _global_camel = start_operator_http(mode, check_port) + _global_camel = start_operator_http(mode, check_port, release) elif transport == 'kafka': - _global_camel = start_operator_kafka(mode, check_port) + _global_camel = start_operator_kafka(mode, check_port, release) elif transport in ['ray-serve']: - _global_camel = start_operator_ray_serve(mode, check_port) + _global_camel = start_operator_ray_serve(mode, check_port, release) else: raise RuntimeError( f'{transport} transport unsupported for mode {mode}.') diff --git a/rayvens/core/common.py b/rayvens/core/common.py index 0025d7a..f0bab21 100644 --- a/rayvens/core/common.py +++ b/rayvens/core/common.py @@ -37,7 +37,7 @@ def __init__(self, data, headers={}): self.headers = headers -def get_run_mode(camel_mode, check_port): +def get_run_mode(camel_mode, check_port, release): if camel_mode == 'auto' or camel_mode == 'local': mode.run_mode = RayvensMode.LOCAL elif camel_mode == 'mixed': @@ -47,6 +47,7 @@ def get_run_mode(camel_mode, check_port): else: raise RuntimeError("Unsupported camel mode.") mode.check_port = check_port + mode.release = release return mode diff --git a/rayvens/core/invocation.py b/rayvens/core/invocation.py index 01e5816..dbe150b 100644 --- a/rayvens/core/invocation.py +++ b/rayvens/core/invocation.py @@ -153,6 +153,12 @@ def _check_kamel_output(self, end_condition, with_output=False, with_timeout=False): + # Decide whether to print logs or not. If this is release mode + # then no output will be printed: + with_output_final = with_output + if self.mode.release: + with_output_final = False + # Implicit 5 minute timout in hundredths of a second: timout_duration = 5 * 60 * 100 if with_timeout: @@ -175,10 +181,11 @@ def _check_kamel_output(self, if with_timeout: output = utils.print_log_from_queue(self.subprocess_name, reading_thread.queue, - with_output) + with_output_final) else: output = utils.print_log_from_subprocess( - self.subprocess_name, self.process.stdout, with_output) + self.subprocess_name, self.process.stdout, + with_output_final) # Use the Kamel output to decide when Kamel instance is # ready to receive requests. diff --git a/rayvens/core/kafka.py b/rayvens/core/kafka.py index 47a0f47..3123e2c 100644 --- a/rayvens/core/kafka.py +++ b/rayvens/core/kafka.py @@ -20,8 +20,8 @@ from rayvens.core.integration import Integration -def start(camel_mode, check_port): - return Camel(get_run_mode(camel_mode, check_port)) +def start(camel_mode, check_port, release): + return Camel(get_run_mode(camel_mode, check_port, release)) class Camel: diff --git a/rayvens/core/local.py b/rayvens/core/local.py index 7f7617e..8de9b8f 100644 --- a/rayvens/core/local.py +++ b/rayvens/core/local.py @@ -19,8 +19,8 @@ from rayvens.core.common import get_run_mode, send_to, recv_from, await_start -def start(camel_mode, check_port): - return Camel(get_run_mode(camel_mode, check_port)) +def start(camel_mode, check_port, release): + return Camel(get_run_mode(camel_mode, check_port, release)) class Camel: diff --git a/rayvens/core/mode.py b/rayvens/core/mode.py index 4d84ce2..1cdb502 100644 --- a/rayvens/core/mode.py +++ b/rayvens/core/mode.py @@ -53,6 +53,7 @@ def __init__(self, run_mode=RayvensMode.LOCAL): self.namespace = "ray" self.transport = None self.check_port = True + self.release = False def server_address(self, integration): return self._get_server_address(integration.integration_name, diff --git a/rayvens/core/operator.py b/rayvens/core/operator.py index d585a39..be83354 100644 --- a/rayvens/core/operator.py +++ b/rayvens/core/operator.py @@ -20,8 +20,8 @@ from rayvens.core.integration import Integration -def start(camel_mode, check_port): - return Camel(get_run_mode(camel_mode, check_port)) +def start(camel_mode, check_port, release): + return Camel(get_run_mode(camel_mode, check_port, release)) class Camel: diff --git a/rayvens/core/operator_kafka.py b/rayvens/core/operator_kafka.py index 38de180..ac4cbad 100644 --- a/rayvens/core/operator_kafka.py +++ b/rayvens/core/operator_kafka.py @@ -20,8 +20,8 @@ from rayvens.core.integration import Integration -def start(camel_mode, check_port): - return Camel(get_run_mode(camel_mode, check_port)) +def start(camel_mode, check_port, release): + return Camel(get_run_mode(camel_mode, check_port, release)) class Camel: diff --git a/rayvens/core/ray_serve.py b/rayvens/core/ray_serve.py index 9e8e84e..ef27f4b 100644 --- a/rayvens/core/ray_serve.py +++ b/rayvens/core/ray_serve.py @@ -24,8 +24,8 @@ from rayvens.core.integration import Integration -def start(camel_mode, check_port): - return Camel(common.get_run_mode(camel_mode, check_port)) +def start(camel_mode, check_port, release): + return Camel(common.get_run_mode(camel_mode, check_port, release)) class Camel: From 16a85a75c3ad272d49573b5905e8145ef263c87d Mon Sep 17 00:00:00 2001 From: Gheorghe Teodor Bercea Date: Fri, 14 Jan 2022 14:17:52 -0500 Subject: [PATCH 2/2] Clean-up. --- examples/cloud_object_storage/cos_sink_multitask.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/cloud_object_storage/cos_sink_multitask.py b/examples/cloud_object_storage/cos_sink_multitask.py index de13828..fbe08f7 100644 --- a/examples/cloud_object_storage/cos_sink_multitask.py +++ b/examples/cloud_object_storage/cos_sink_multitask.py @@ -42,7 +42,7 @@ # Camel-K component for Kafka does not propagate message headers. This # will be fixed by Camel-K 1.8.0 release. # rayvens.init(transport="kafka") -rayvens.init(release=True) +rayvens.init() # Create an object stream stream = rayvens.Stream('upload-file')