[Monitor] Reduce localhost monitoring_interval and wait_dur_sec
JosepSampe committed May 11, 2024
1 parent e9766d8 commit 4a6b811
Showing 6 changed files with 67 additions and 51 deletions.
7 changes: 4 additions & 3 deletions lithops/constants.py
@@ -36,7 +36,8 @@
MODE_DEFAULT = SERVERLESS

MONITORING_DEFAULT = 'storage'
MONITORING_INTERVAL = 2
MONITORING_INTERVAL = 2 # seconds
MONITORING_INTERVAL_LH = 0.1 # seconds

SERVERLESS_BACKEND_DEFAULT = 'aws_lambda'
STANDALONE_BACKEND_DEFAULT = 'aws_ec2'
@@ -47,8 +48,8 @@
LOGS_PREFIX = "lithops.logs"
RUNTIMES_PREFIX = "lithops.runtimes"

EXECUTION_TIMEOUT_DEFAULT = 1800
EXECUTION_TIMEOUT_LOCALHOST_DEFAULT = 3600
EXECUTION_TIMEOUT_DEFAULT = 1800 # seconds
EXECUTION_TIMEOUT_LOCALHOST_DEFAULT = 3600 # seconds

LOCALHOST_RUNTIME_DEFAULT = os.path.basename(sys.executable)
LOCALHOST_SERVICE_IDLE_TIMEOUT = 3
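For context (not part of the commit): with storage-based monitoring, a finished task is only noticed at the next poll, so the polling interval bounds the detection latency. A rough sketch of what the lower localhost interval buys, assuming completions land uniformly within a polling window:

# Rough latency model (an assumption for illustration, not Lithops code):
# with completions spread uniformly inside a polling window, the average
# detection delay is interval/2 and the worst case is a full interval.
for interval in (2, 0.1):  # MONITORING_INTERVAL vs MONITORING_INTERVAL_LH
    print(f"interval={interval}s -> avg delay ~{interval / 2}s, worst case ~{interval}s")
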
10 changes: 5 additions & 5 deletions lithops/executors.py
@@ -30,7 +30,7 @@
from lithops.future import ResponseFuture
from lithops.invokers import create_invoker
from lithops.storage import InternalStorage
from lithops.wait import wait, ALL_COMPLETED, THREADPOOL_SIZE, WAIT_DUR_SEC, ALWAYS
from lithops.wait import wait, ALL_COMPLETED, THREADPOOL_SIZE, ALWAYS
from lithops.job import create_map_job, create_reduce_job
from lithops.config import default_config, \
extract_localhost_config, extract_standalone_config, \
@@ -399,7 +399,7 @@ def wait(
download_results: Optional[bool] = False,
timeout: Optional[int] = None,
threadpool_size: Optional[int] = THREADPOOL_SIZE,
wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
wait_dur_sec: Optional[int] = None,
show_progressbar: Optional[bool] = True
) -> Tuple[FuturesList, FuturesList]:
"""
@@ -416,7 +416,7 @@
:param download_results: Download results. Default false (Only get statuses)
:param timeout: Timeout of waiting for results
:param threadpool_size: Number of threads to use. Default 64
:param wait_dur_sec: Time interval between each check
:param wait_dur_sec: Time interval between each check. Default 1 second
:param show_progressbar: whether or not to show the progress bar.
:return: `(fs_done, fs_notdone)` where `fs_done` is a list of futures that have
@@ -470,7 +470,7 @@ def get_result(
throw_except: Optional[bool] = True,
timeout: Optional[int] = None,
threadpool_size: Optional[int] = THREADPOOL_SIZE,
wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
wait_dur_sec: Optional[int] = None,
show_progressbar: Optional[bool] = True
):
"""
@@ -480,7 +480,7 @@
:param throw_except: Reraise exception if call raised. Default True.
:param timeout: Timeout for waiting for results.
:param threadpool_size: Number of threads to use. Default 128
:param wait_dur_sec: Time interval between each check.
:param wait_dur_sec: Time interval between each check. Default 1 second
:param show_progressbar: whether or not to show the progress bar.
:return: The result of the future/s
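A minimal usage sketch (assuming a working Lithops configuration, not part of the commit) of the new default: leaving wait_dur_sec as None lets the executor pick the polling interval, 1 second against remote storage and a much shorter one on localhost.

import lithops

def double(x):
    return x * 2

fexec = lithops.FunctionExecutor()        # backend taken from the Lithops config
futures = fexec.map(double, range(4))

# wait_dur_sec is left at its new default (None), so the check interval
# is chosen automatically instead of always using WAIT_DUR_SEC.
fs_done, fs_notdone = fexec.wait(futures)
print(fexec.get_result(futures))          # [0, 2, 4, 6]
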
18 changes: 4 additions & 14 deletions lithops/localhost/v2/localhost.py
@@ -33,7 +33,6 @@
from lithops.constants import (
JOBS_DIR,
LOCALHOST_RUNTIME_DEFAULT,
RN_LOG_FILE,
TEMP_DIR,
LITHOPS_TEMP_DIR,
COMPUTE_CLI_MSG,
@@ -299,8 +298,7 @@ def run_task(self, job_key, call_id):
task_filename = os.path.join(JOBS_DIR, job_key, call_id + '.task')

cmd = [self.runtime_name, RUNNER_FILE, 'run_job', task_filename]
log = open(RN_LOG_FILE, 'a')
process = sp.Popen(cmd, stdout=log, stderr=log, start_new_session=True)
process = sp.Popen(cmd, start_new_session=True)
self.task_processes[job_key_call_id] = process
process.communicate() # blocks until the process finishes
del self.task_processes[job_key_call_id]
@@ -394,12 +392,8 @@ def start(self):
cmd += f'--rm -v {tmp_path}:/tmp -it --detach '
cmd += f'--entrypoint=/bin/bash {self.runtime_name}'

log = open(RN_LOG_FILE, 'a')
self.container_process = sp.Popen(
shlex.split(cmd), stdout=log,
stderr=log, start_new_session=True
)
self.container_process.communicate()
self.container_process = sp.Popen(shlex.split(cmd), start_new_session=True)
self.container_process.communicate() # blocks until the process finishes

super().start()

@@ -415,11 +409,7 @@ def run_task(self, job_key, call_id):
cmd += f'"python3 /tmp/{USER_TEMP_DIR}/localhost-runner.py '
cmd += f'run_job {docker_task_filename}"'

log = open(RN_LOG_FILE, 'a')
process = sp.Popen(
shlex.split(cmd), stdout=log,
stderr=log, start_new_session=True
)
process = sp.Popen(shlex.split(cmd), start_new_session=True)
self.task_processes[job_key_call_id] = process
process.communicate() # blocks until the process finishes
del self.task_processes[job_key_call_id]
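The Popen calls above drop the RN_LOG_FILE redirection, so the runner inherits the parent's stdout and stderr. A standalone sketch of that pattern (the command is a placeholder, not the actual runner invocation):

import shlex
import subprocess as sp

cmd = "python3 -c 'print(\"task finished\")'"    # placeholder command
# start_new_session=True detaches the child from this process group; without
# stdout/stderr arguments its output goes to the parent's streams.
process = sp.Popen(shlex.split(cmd), start_new_session=True)
process.communicate()                            # blocks until the process finishes
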
26 changes: 18 additions & 8 deletions lithops/monitor.py
@@ -25,7 +25,10 @@
import threading
import concurrent.futures as cf
from tblib import pickling_support
from lithops.constants import MONITORING_INTERVAL
from lithops.constants import (
MONITORING_INTERVAL,
MONITORING_INTERVAL_LH
)

pickling_support.install()

@@ -464,23 +467,30 @@ def __init__(self, executor_id, internal_storage, config=None):
self.executor_id = executor_id
self.internal_storage = internal_storage
self.config = config
self.backend = self.config['lithops']['monitoring'].lower() if config else 'storage'
self.backend_type = self.config['lithops']['monitoring'].lower() if config else 'storage'
self.storage_backend = self.internal_storage.backend
self.token_bucket_q = queue.Queue()
self.monitor = None
self.job_chunksize = {}

self.MonitorClass = getattr(
lithops.monitor,
f'{self.backend.capitalize()}Monitor'
f'{self.backend_type.capitalize()}Monitor'
)

def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
if self.backend == 'storage':
mi = self.config['lithops'].get('monitoring_interval', MONITORING_INTERVAL) \
if self.config else MONITORING_INTERVAL
bk_config = {'monitoring_interval': mi}
if self.backend_type == 'storage':
monitoring_interval = None
if self.config and 'lithops' in self.config:
monitoring_interval = self.config['lithops'].get('monitoring_interval')
if not monitoring_interval:
if self.storage_backend == 'localhost':
monitoring_interval = MONITORING_INTERVAL_LH
else:
monitoring_interval = MONITORING_INTERVAL
bk_config = {'monitoring_interval': monitoring_interval}
else:
bk_config = self.config.get(self.backend)
bk_config = self.config.get(self.backend_type)

if job_id:
self.job_chunksize[job_id] = chunksize
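A standalone restatement of the interval selection added to JobMonitor.start (pick_monitoring_interval is a hypothetical helper, not part of the module): an explicit monitoring_interval in the config wins, otherwise the localhost storage backend gets the faster constant.

MONITORING_INTERVAL = 2       # seconds, remote storage backends
MONITORING_INTERVAL_LH = 0.1  # seconds, localhost storage backend

def pick_monitoring_interval(config, storage_backend):
    interval = None
    if config and 'lithops' in config:
        interval = config['lithops'].get('monitoring_interval')
    if not interval:
        interval = MONITORING_INTERVAL_LH if storage_backend == 'localhost' else MONITORING_INTERVAL
    return interval

assert pick_monitoring_interval({}, 'localhost') == MONITORING_INTERVAL_LH
assert pick_monitoring_interval({'lithops': {'monitoring_interval': 5}}, 'aws_s3') == 5
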
46 changes: 30 additions & 16 deletions lithops/tests/test_map_reduce.py
@@ -114,24 +114,21 @@ def test_url_processing(self):
result = fexec.get_result()
assert result == self.words_in_files

def test_chunks_bucket(self):
def test_bucket_chunk_size(self):
"""tests the ability to create a separate function invocation
based on the following parameters: chunk_size creates [file_size//chunk_size]
invocations to process each chunk_size bytes, of a given object. chunk_number
creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each.
invocations to process each chunk_size bytes, of a given object.
"""

logger.info('Testing chunks on a bucket')
OBJ_CHUNK_SIZE = 1 * 800 ** 2 # create a new invocation
OBJ_CHUNK_NUMBER = 2
activations = 0

data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/'

fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
futures = fexec.map_reduce(my_map_function_obj, data_prefix,
my_reduce_function,
obj_chunk_size=OBJ_CHUNK_SIZE)
futures = fexec.map_reduce(
my_map_function_obj, data_prefix,
my_reduce_function, obj_chunk_size=OBJ_CHUNK_SIZE
)
result = fexec.get_result(futures)
assert result == self.words_in_files

@@ -140,27 +137,34 @@ def test_chunks_bucket(self):

assert len(futures) == activations + 1 # +1 due to the reduce function

def test_bucket_chunk_number(self):
"""tests the ability to create a separate function invocation
based on the following parameters: chunk_number
creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each.
"""
OBJ_CHUNK_NUMBER = 2

data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/'

fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
futures = fexec.map_reduce(my_map_function_obj, data_prefix,
my_reduce_function, obj_chunk_number=OBJ_CHUNK_NUMBER)
futures = fexec.map_reduce(
my_map_function_obj, data_prefix,
my_reduce_function, obj_chunk_number=OBJ_CHUNK_NUMBER
)
result = fexec.get_result(futures)
assert result == self.words_in_files

assert len(futures) == len(TEST_FILES_URLS) * OBJ_CHUNK_NUMBER + 1

def test_chunks_bucket_one_reducer_per_object(self):
def test_bucket_chunk_size_one_reducer_per_object(self):
"""tests the ability to create a separate function invocation based
on the following parameters, as well as create a separate invocation
of a reduce function for each object: chunk_size creates [file_size//chunk_size]
invocations to process each chunk_size bytes, of a given object. chunk_number
creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each.
"""

logger.info('Testing chunks on a bucket with one reducer per object')
OBJ_CHUNK_SIZE = 1 * 1024 ** 2
OBJ_CHUNK_NUMBER = 2
activations = 0

data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/'

fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
@@ -179,6 +183,16 @@ def test_chunks_bucket_one_reducer_per_object(self):
# + len(TEST_FILES_URLS) due to map_reduce activation per object
assert len(futures) == activations + len(TEST_FILES_URLS)

def test_bucket_chunk_number_one_reducer_per_object(self):
"""tests the ability to create a separate function invocation based
on the following parameters, as well as create a separate invocation
of a reduce function for each object: chunk_size creates [file_size//chunk_size]
invocations to process each chunk_size bytes, of a given object. chunk_number
creates 'chunk_number' invocations that process [file_size//chunk_number] bytes each.
"""
OBJ_CHUNK_NUMBER = 2
data_prefix = self.storage_backend + '://' + self.bucket + '/' + DATASET_PREFIX + '/'

fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
futures = fexec.map_reduce(
my_map_function_obj,
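A usage sketch of the two chunking modes the split tests now exercise separately (bucket name and prefix are placeholders; the map function assumes Lithops' data_stream interface for object processing):

import lithops

def count_words(obj):
    # obj.data_stream yields only the byte range assigned to this invocation
    return len(obj.data_stream.read().split())

def total(results):
    return sum(results)

fexec = lithops.FunctionExecutor()
data_prefix = 's3://my-bucket/dataset/'   # placeholder dataset location

# obj_chunk_size: one invocation per ~1 MiB slice of each object (+1 reducer)
futures = fexec.map_reduce(count_words, data_prefix, total, obj_chunk_size=1024 ** 2)
print(fexec.get_result(futures))

# obj_chunk_number: exactly 2 invocations per object, whatever its size
futures = fexec.map_reduce(count_words, data_prefix, total, obj_chunk_number=2)
print(fexec.get_result(futures))
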
11 changes: 6 additions & 5 deletions lithops/wait.py
@@ -49,7 +49,7 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
download_results: Optional[bool] = False,
timeout: Optional[int] = None,
threadpool_size: Optional[int] = THREADPOOL_SIZE,
wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
wait_dur_sec: Optional[int] = None,
show_progressbar: Optional[bool] = True,
futures_from_executor_wait: Optional[bool] = False) -> Tuple[FuturesList, FuturesList]:
"""
@@ -68,7 +68,7 @@
:param download_results: Download results. Default false (Only get statuses)
:param timeout: Timeout of waiting for results.
:param threadpool_size: Number of threads to use. Default 64
:param wait_dur_sec: Time interval between each check.
:param wait_dur_sec: Time interval between each check. Default 1 second
:param show_progressbar: whether or not to show the progress bar.
:return: `(fs_done, fs_notdone)`
@@ -131,7 +131,8 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
internal_storage=executor_data.internal_storage)
job_monitor.start(fs=executor_data.futures)

sleep_sec = wait_dur_sec if job_monitor.backend == 'storage' else 0.3
sleep_sec = wait_dur_sec or WAIT_DUR_SEC if job_monitor.backend_type == 'storage' \
and job_monitor.storage_backend != 'localhost' else 0.1

if return_when == ALWAYS:
for executor_data in executors_data:
@@ -186,7 +187,7 @@ def get_result(fs: Optional[Union[ResponseFuture, FuturesList, List[ResponseFutu
throw_except: Optional[bool] = True,
timeout: Optional[int] = None,
threadpool_size: Optional[int] = THREADPOOL_SIZE,
wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
wait_dur_sec: Optional[int] = None,
show_progressbar: Optional[bool] = True):
"""
For getting the results from all function activations
@@ -196,7 +197,7 @@ def get_result(fs: Optional[Union[ResponseFuture, FuturesList, List[ResponseFutu
:param throw_except: Reraise exception if call raised. Default True.
:param timeout: Timeout for waiting for results.
:param threadpool_size: Number of threads to use. Default 128
:param wait_dur_sec: Time interval between each check.
:param wait_dur_sec: Time interval between each check. Default 1 second
:param show_progressbar: whether or not to show the progress bar.
:return: The result of the future/s
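A standalone restatement of the new sleep_sec expression (pick_sleep_sec is a hypothetical helper; WAIT_DUR_SEC is the module default of 1 second): a caller-supplied wait_dur_sec, or the 1-second default, only applies to non-localhost storage monitoring; everything else polls every 0.1 seconds.

WAIT_DUR_SEC = 1  # seconds, assumed module default

def pick_sleep_sec(wait_dur_sec, backend_type, storage_backend):
    if backend_type == 'storage' and storage_backend != 'localhost':
        return wait_dur_sec or WAIT_DUR_SEC
    return 0.1

assert pick_sleep_sec(None, 'storage', 'aws_s3') == WAIT_DUR_SEC
assert pick_sleep_sec(5, 'storage', 'aws_s3') == 5
assert pick_sleep_sec(None, 'storage', 'localhost') == 0.1
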
