Skip to content

Commit

Permalink
Merge pull request #2 from lifeomic/RLP-1
Browse files Browse the repository at this point in the history
Support Detail-Type in EventProcessor
  • Loading branch information
mjtieman committed Jul 26, 2018
2 parents ad45b72 + bb5f1fb commit aad0178
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 6 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ def handler(event, context):
manager.process_event(event)
```

EventProcessors can also be configured with the event `detail-type`. The EventProcessor in the above example
can be tweaked to process EMR step changes.

```python
...
processor = EventProcessor('aws.emr', 'detail.stepId', type='EMR Step Status Change', predicate=predicate)
...
```

## Development

### Dependencies
Expand Down
24 changes: 19 additions & 5 deletions limiter/event_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class EventProcessor(object):
Args:
source (str): The origination this processor supports events for, e.g. aws.emr
id_path (str): Dot delineated path to the resource id value.
predicate (ProcessorPredicate): Prediate to test the event against before extracting the id. Defaults to None.
predicate (ProcessorPredicate): Predicate to test the event against before extracting the id. Defaults to None.
type (str): Event detail-type of the event this porcessor supports. Defaults to None.
Examples:
>>> event = {'source': 'aws.emr', 'detail': {'clusterId': 'j-1YONHTCP3YZKC', 'state': 'COMPLETED'}}
Expand All @@ -109,10 +110,11 @@ class EventProcessor(object):
>>> processor.test_and_get_id(event)
j-1YONHTCP3YZKC
"""
def __init__(self, source, id_path, predicate=None):
def __init__(self, source, id_path, predicate=None, type=None):
self.source = source
self.id_path = id_path
self.predicate = predicate
self.typ = type

def test_and_get_id(self, event):
"""
Expand Down Expand Up @@ -154,10 +156,12 @@ class EventProcessorManager(object):
>>> manager.process_event(event)
"""
def __init__(self, table_name=None, index_name=None, processors=None):
self.processors = {x.source: x for x in processors} if processors else {}
self.processors = {_build_processor_key(x.source, x.type): x for x in processors} if processors else {}
self.table_name = validate_table_env_fallback(table_name, 'NON_FUNGIBLE_TABLE', 'non-fungible-tokens')
self.index_name = validate_table_env_fallback(index_name, 'NON_FUNGIBLE_RES_INDEX', 'resource-index')

self.cache = []

self._client = None
self._table = None

Expand Down Expand Up @@ -207,7 +211,7 @@ def process_event(self, event):
"""
processor = self._get_processor(event)
resource_id = processor.test_and_get_id(event)
if resource_id:
if resource_id and resource_id not in self.cache:
token = self._get_resource_token(resource_id)
if token:
logger.info('Removing %s token %s from %s', processor.source, resource_id, self.table_name)
Expand All @@ -219,6 +223,7 @@ def process_event(self, event):
)
else:
logger.warn('Could not find a token for resoure %s', resource_id)
self.cache.append(resource_id)

def _get_processor(self, event):
"""
Expand All @@ -237,7 +242,13 @@ def _get_processor(self, event):
raise ValueError('Cannot process event, source is a required field. Event: ' + str(event))

source = event['source']
processor = self.processors.get(source, None)
type = event.get('detail-type', None)
source_type_key = _build_processor_key(source, type)

processor = self.processors.get(source_type_key, None)
if not processor:
processor = self.processors.get(source, None)

if not processor:
raise ValueError('No processor for event source: ' + source)
return processor
Expand Down Expand Up @@ -282,3 +293,6 @@ def _reduce_to_path(obj, path):
except Exception:
sys.exc_clear()
return None

def _build_processor_key(source, type=None):
return source + ':' + type.replace(' ', '').lower() if type else source
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools import setup, find_packages

setup(name='rate-limiter-py',
version='0.1.0',
version='0.2.0',
description='Rate-limiter module which leverages DynamoDB to enforce resource limits.',
keywords=['lifeomic', 'dynamodb', 'rate', 'limit'],
author='Matthew Tieman',
Expand Down
61 changes: 61 additions & 0 deletions test/test_event_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def test_no_event_source(self):

mock_processor = Mock()
mock_processor.source = random_string()
mock_processor.type = None

manager = EventProcessorManager(table_name=self.table_name,
index_name=self.index_name,
Expand All @@ -201,6 +202,7 @@ def test_no_processor(self):

mock_processor = Mock()
mock_processor.source = random_string()
mock_processor.type = None

manager = EventProcessorManager(table_name=self.table_name,
index_name=self.index_name,
Expand All @@ -224,6 +226,7 @@ def test_delete_token(self):

mock_processor = Mock()
mock_processor.source = event_source
mock_processor.type = None
mock_processor.test_and_get_id = MagicMock(return_value=self.resource_id)

mock_table = create_non_fung_table(self.table_name, self.index_name)
Expand All @@ -245,6 +248,7 @@ def test_delete_no_token_for_id(self):

mock_processor = Mock()
mock_processor.source = event_source
mock_processor.type = None
mock_processor.test_and_get_id = MagicMock(return_value=random_string())

mock_table = create_non_fung_table(self.table_name, self.index_name)
Expand All @@ -266,6 +270,7 @@ def test_delete_no_id_from_processor(self):

mock_processor = Mock()
mock_processor.source = event_source
mock_processor.type = None
mock_processor.test_and_get_id = MagicMock(return_value=None)

mock_table = create_non_fung_table(self.table_name, self.index_name)
Expand All @@ -280,6 +285,62 @@ def test_delete_no_id_from_processor(self):

self.assertEquals(1, self._get_resource_id_count(mock_table))

@mock_dynamodb2
def test_delete_on_type(self):
event_source = random_string()
detail_type = random_string()
event = {'source': event_source, 'detail-type': detail_type}

mock_default_processor = Mock()
mock_default_processor.source = event_source
mock_default_processor.type = None
mock_default_processor.test_and_get_id = MagicMock(side_effect=StandardError('Wrong processor invoked'))

mock_type_processor = Mock()
mock_type_processor.source = event_source
mock_type_processor.type = detail_type
mock_type_processor.test_and_get_id = MagicMock(return_value=self.resource_id)

mock_table = create_non_fung_table(self.table_name, self.index_name)
self._insert_token(mock_table)
self.assertEquals(1, self._get_resource_id_count(mock_table))

manager = EventProcessorManager(table_name=self.table_name,
index_name=self.index_name,
processors=[mock_default_processor, mock_type_processor])
manager._table = mock_table
manager.process_event(event)

self.assertEquals(0, self._get_resource_id_count(mock_table))

@mock_dynamodb2
def test_delete_fallback_no_type(self):
event_source = random_string()
detail_type = random_string()
event = {'source': event_source, 'detail-type': detail_type}

mock_default_processor = Mock()
mock_default_processor.source = event_source
mock_default_processor.type = None
mock_default_processor.test_and_get_id = MagicMock(return_value=self.resource_id)

mock_type_processor = Mock()
mock_type_processor.source = event_source
mock_type_processor.type = detail_type + random_string()
mock_type_processor.test_and_get_id = MagicMock(side_effect=StandardError('Wrong processor invoked'))

mock_table = create_non_fung_table(self.table_name, self.index_name)
self._insert_token(mock_table)
self.assertEquals(1, self._get_resource_id_count(mock_table))

manager = EventProcessorManager(table_name=self.table_name,
index_name=self.index_name,
processors=[mock_default_processor, mock_type_processor])
manager._table = mock_table
manager.process_event(event)

self.assertEquals(0, self._get_resource_id_count(mock_table))

def _get_resource_id_count(self, mock_table):
response = mock_table.query(IndexName=self.index_name,
KeyConditionExpression=Key('resourceId').eq(self.resource_id))
Expand Down

0 comments on commit aad0178

Please sign in to comment.