Skip to content

Commit

Permalink
Merge pull request #123 from bimpression/scheduler_fix
Browse files Browse the repository at this point in the history
Scheduler fix
  • Loading branch information
ngr committed Jun 30, 2019
2 parents 63f569c + 7892a61 commit ee18e02
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 31 deletions.
20 changes: 20 additions & 0 deletions sosw/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, custom_config=None, **kwargs):

self.lambda_context = kwargs.pop('context', None)
if self.lambda_context:
logger.warning("DEPRECATED: Processor.lambda_context is deprecated. Use global_vars.lambda_context instead")
self.aws_account = trim_arn_to_account(self.lambda_context.invoked_function_arn)

self.config = self.DEFAULT_CONFIG or {}
Expand Down Expand Up @@ -322,6 +323,22 @@ def __exit__(self, exc_type, exc_val, exc_tb):


class LambdaGlobals:
"""
Global placeholder for global_vars that we want to preserve in the lifetime of the Lambda Container.
e.g. once the given Processor is initialised, we keep it alive in the container to minimize warm-run time.
This namespace also contains the lambda_context which should be reset by `get_lambda_handler` method.
See Worker examples in documentation for more info.
"""

def __init__(self) -> None:
    """
    Reset the lambda context for every reinitialization.
    The Processor may stay alive in the scope of Lambda container, but the context is unique per invocation.
    The Lambda Globals should also be reset by `get_lambda_handler` method.
    """
    # The context is stored in a module-level global (not an instance attribute),
    # so it can outlive any single LambdaGlobals instance within a warm container.
    # Resetting it to None here forces a fresh context to be installed per
    # invocation — presumably by `get_lambda_handler` via the `lambda_context`
    # property setter (not visible in this view; confirm against the full file).
    global _lambda_context
    _lambda_context = None

@property
def lambda_context(self):
Expand Down Expand Up @@ -398,3 +415,6 @@ def lambda_handler(event, context):


return lambda_handler

# Global placeholder for global_vars.
global_vars = LambdaGlobals()
21 changes: 9 additions & 12 deletions sosw/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,20 @@
__license__ = "MIT"
__status__ = "Development"

import boto3
import datetime
import json
import logging
import math
import os
import re
import time

from importlib import import_module
from collections import Counter
from collections import defaultdict
from collections import OrderedDict
from collections import Iterable
from copy import deepcopy
from typing import List, Set, Tuple, Union, Optional, Dict

from sosw.app import Processor
from sosw.app import Processor, LambdaGlobals
from sosw.components.helpers import get_list_of_multiple_or_one_or_empty_from_dict, trim_arn_to_name, chunks
from sosw.components.siblings import SiblingsManager
from sosw.labourer import Labourer
from sosw.managers.task import TaskManager


Expand Down Expand Up @@ -116,6 +109,8 @@ def __call__(self, event):

self.process_file()

super().__call__(event)


def parse_job_to_file(self, job: Dict):
"""
Expand Down Expand Up @@ -570,11 +565,11 @@ def process_file(self):
# Spawning another sibling to continue the processing
try:
payload = dict(file_name=file_name)
self.siblings_client.spawn_sibling(self.lambda_context, payload=payload)
self.siblings_client.spawn_sibling(global_vars.lambda_context, payload=payload)
self.stats['siblings_spawned'] += 1

except Exception as err:
logger.exception(f"Could not spawn sibling with context: {self.lambda_context}, payload: {payload}")
logger.exception(f"Could not spawn sibling with context: {global_vars.lambda_context}, payload: {payload}")

self.upload_and_unlock_queue_file()
self.clean_tmp()
Expand Down Expand Up @@ -648,7 +643,7 @@ def sufficient_execution_time_left(self) -> bool:
Return if there is a sufficient execution time for processing ('shutdown period' is in seconds).
"""

return self.lambda_context.get_remaining_time_in_millis() > self.config['shutdown_period'] * 1000
return global_vars.lambda_context.get_remaining_time_in_millis() > self.config['shutdown_period'] * 1000


def get_and_lock_queue_file(self) -> str:
Expand Down Expand Up @@ -716,7 +711,7 @@ def set_queue_file(self, name: str = None):
if name is None:
filename_parts = self.config['queue_file'].rsplit('.', 1)
assert len(filename_parts) == 2, "Got bad file name"
self._queue_file_name = f"{filename_parts[0]}_{self.lambda_context.aws_request_id}.{filename_parts[1]}"
self._queue_file_name = f"{filename_parts[0]}_{global_vars.lambda_context.aws_request_id}.{filename_parts[1]}"
else:
self._queue_file_name = name

Expand All @@ -739,3 +734,5 @@ def remote_queue_locked_file(self):
Concurrent processes should not touch it.
"""
return f"{self.config['s3_prefix'].strip('/')}/locked_{self._queue_file_name}"

global_vars = LambdaGlobals()
13 changes: 7 additions & 6 deletions sosw/test/integration/test_scheduler_i.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from unittest.mock import MagicMock, patch

from sosw.scheduler import Scheduler
from sosw.scheduler import Scheduler, global_vars
from sosw.labourer import Labourer
from sosw.test.variables import TEST_SCHEDULER_CONFIG
from sosw.test.helpers_test import line_count
Expand Down Expand Up @@ -68,12 +68,13 @@ def setUp(self):
self.get_config_patch = self.patcher.start()

self.custom_config = self.TEST_CONFIG.copy()
self.lambda_context = types.SimpleNamespace()
self.lambda_context.aws_request_id = 'AWS_REQ_ID'
self.lambda_context.invoked_function_arn = 'arn:aws:lambda:us-west-2:000000000000:function:some_function'
self.lambda_context.get_remaining_time_in_millis = MagicMock(side_effect=[100000, 100])
lambda_context = types.SimpleNamespace()
lambda_context.aws_request_id = 'AWS_REQ_ID'
lambda_context.invoked_function_arn = 'arn:aws:lambda:us-west-2:000000000000:function:some_function'
lambda_context.get_remaining_time_in_millis = MagicMock(side_effect=[100000, 100])
global_vars.lambda_context = lambda_context

self.scheduler = Scheduler(self.custom_config, context=self.lambda_context)
self.scheduler = Scheduler(self.custom_config)

self.s3_client = boto3.client('s3')

Expand Down
7 changes: 3 additions & 4 deletions sosw/test/unit/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
from unittest import mock
from unittest.mock import MagicMock, patch

os.environ["STAGE"] = "test"
os.environ["autotest"] = "True"

from sosw.app import Processor, LambdaGlobals, get_lambda_handler, logger
from sosw.components.sns import SnsManager
from sosw.components.siblings import SiblingsManager


os.environ["STAGE"] = "test"
os.environ["autotest"] = "True"


class app_UnitTestCase(unittest.TestCase):
TEST_CONFIG = {'test': True}

Expand Down
20 changes: 11 additions & 9 deletions sosw/test/unit/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from unittest import mock
from unittest.mock import MagicMock, PropertyMock, patch

from sosw.scheduler import Scheduler, InvalidJob
from sosw.scheduler import Scheduler, InvalidJob, global_vars
from sosw.labourer import Labourer
from sosw.components.helpers import chunks
from sosw.test.variables import TEST_SCHEDULER_CONFIG
Expand Down Expand Up @@ -79,13 +79,15 @@ def setUp(self):
'auto_spawning': True
}

self.lambda_context = types.SimpleNamespace()
self.lambda_context.aws_request_id = 'AWS_REQ_ID'
self.lambda_context.invoked_function_arn = 'arn:aws:lambda:us-west-2:000000000000:function:some_function'
self.lambda_context.get_remaining_time_in_millis = MagicMock(side_effect=[100000, 100])
lambda_context = types.SimpleNamespace()
lambda_context.aws_request_id = 'AWS_REQ_ID'
lambda_context.invoked_function_arn = 'arn:aws:lambda:us-west-2:000000000000:function:some_function'
lambda_context.get_remaining_time_in_millis = MagicMock(side_effect=[100000, 100])
global_vars.lambda_context = lambda_context
self.custom_lambda_context = lambda_context # This is to access from tests.

with patch('boto3.client'):
self.scheduler = module.Scheduler(self.custom_config, context=self.lambda_context)
self.scheduler = module.Scheduler(self.custom_config)

self.scheduler.s3_client = MagicMock()
self.scheduler.sns_client = MagicMock()
Expand Down Expand Up @@ -130,7 +132,7 @@ def test_init__chunkable_attrs_not_end_with_s(self):
config['job_schema']['chunkable_attrs'] = [('bad_name_ending_with_s', {})]

with patch('boto3.client'):
self.assertRaises(AssertionError, Scheduler, custom_config=config, context=self.lambda_context)
self.assertRaises(AssertionError, Scheduler, custom_config=config)


def test_get_next_chunkable_attr(self):
Expand All @@ -147,13 +149,13 @@ def test__queue_bucket(self):

def test__remote_queue_file(self):
    """
    The remote queue file key must contain the configured S3 prefix and the
    AWS request id taken from the lambda-context fixture set up in setUp().
    """
    self.assertIn(f"{self.scheduler.config['s3_prefix'].strip('/')}", self.scheduler.remote_queue_file)
    # NOTE(review): the next two assertions appear to be the removed/added pair of
    # a diff (self.lambda_context vs self.custom_lambda_context); in the merged
    # file only the `custom_lambda_context` line should remain — confirm upstream.
    self.assertIn(self.lambda_context.aws_request_id, self.scheduler.remote_queue_file)
    self.assertIn(self.custom_lambda_context.aws_request_id, self.scheduler.remote_queue_file)


def test__remote_queue_locked_file(self):
    """
    The locked variant of the remote queue file key must contain the configured
    S3 prefix, the 'locked_' marker, and the AWS request id from the fixture.
    """
    self.assertIn(f"{self.scheduler.config['s3_prefix'].strip('/')}", self.scheduler.remote_queue_locked_file)
    self.assertIn('locked_', self.scheduler.remote_queue_locked_file)
    # NOTE(review): the next two assertions appear to be the removed/added pair of
    # a diff (self.lambda_context vs self.custom_lambda_context); in the merged
    # file only the `custom_lambda_context` line should remain — confirm upstream.
    self.assertIn(self.lambda_context.aws_request_id, self.scheduler.remote_queue_locked_file)
    self.assertIn(self.custom_lambda_context.aws_request_id, self.scheduler.remote_queue_locked_file)


### Tests of file operations ###
Expand Down

0 comments on commit ee18e02

Please sign in to comment.