Skip to content

Commit

Permalink
Merge pull request #173 from sosw/0_7_29
Browse files Browse the repository at this point in the history
v0.7.29
  • Loading branch information
nimirium committed Jan 6, 2020
2 parents 192b640 + bdca99f commit 92f66e2
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 27 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
long_description = f.read()

setup(name='sosw',
version='0.7.27',
version='0.7.29',
description='Serverless Orchestrator of Serverless Workers',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
1 change: 1 addition & 0 deletions sosw/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, custom_config=None, **kwargs):
logger.info(f"Final {self.__class__.__name__} processor config: {self.config}")

self.stats = defaultdict(int)
self.result = defaultdict(int)

self.register_clients(self.config.get('init_clients', []))

Expand Down
58 changes: 48 additions & 10 deletions sosw/components/dynamo_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
__version__ = "1.6"

import boto3
import datetime
import logging
import json
import os
Expand Down Expand Up @@ -84,10 +85,8 @@ def __init__(self, config):

self.config = config

if not str(config.get('table_name')).startswith('autotest_mock_'):
self.dynamo_client = boto3.client('dynamodb')
else:
logger.info(f"Initialized DynamoClient without boto3 client for table {config.get('table_name')}")
# create a dynamodb client
self.dynamo_client = boto3.client('dynamodb', region_name=config.get('region_name'))

# storage for table description(s)
self._table_descriptions: Optional[Dict[str, Dict]] = {}
Expand Down Expand Up @@ -384,7 +383,8 @@ def dict_to_dynamo(self, row_dict, add_prefix=None, strict=True):
def get_by_query(self, keys: Dict, table_name: Optional[str] = None, index_name: Optional[str] = None,
comparisons: Optional[Dict] = None, max_items: Optional[int] = None,
filter_expression: Optional[str] = None, strict: bool = None, return_count: bool = False,
desc: bool = False, fetch_all_fields: bool = None) -> Union[List[Dict], int]:
desc: bool = False, fetch_all_fields: bool = None, expr_attrs_names: list = None) \
-> Union[List[Dict], int]:
"""
Get an item from a table, by some keys. Can specify an index.
If an index is not specified, will query the table.
Expand Down Expand Up @@ -416,6 +416,12 @@ def get_by_query(self, keys: Dict, table_name: Optional[str] = None, index_name:
To reverse the order set the argument `desc = True`.
:param bool fetch_all_fields: If False, will only get the attributes specified in the row mapper.
If True, will get all attributes. Default is False.
:param list expr_attrs_names: List of attribute names that must be referenced through expression attribute
names. This is required when an attribute name begins with a number, contains a space or a special
character, or is a reserved word.
Example: if the list ['session', 'key'] is received, the following dict is assigned to
`ExpressionAttributeNames`:
{'#session': 'session', '#key': 'key'}
:return: List of items from the table, each item in key-value format
OR the count if `return_count` is True
Expand All @@ -432,6 +438,10 @@ def get_by_query(self, keys: Dict, table_name: Optional[str] = None, index_name:
cond_expr_parts = []

for key_attr_name in keys:
# Check if the key attribute name is in `expr_attrs_names` and, if so, prefix it with `#`
# so that it is resolved through `ExpressionAttributeNames`.
expr_attr_prefix = '#' if expr_attrs_names and key_attr_name in expr_attrs_names else ''

# Find comparison for key. The formatting of conditions could be different, so a little spaghetti.
if key_attr_name.startswith('st_between_'): # This is just a marker to construct a custom expression later
compr = 'between'
Expand All @@ -443,14 +453,15 @@ def get_by_query(self, keys: Dict, table_name: Optional[str] = None, index_name:
compr = '='

if compr == 'begins_with':
cond_expr_parts.append(f"begins_with ({key_attr_name}, :{key_attr_name})")
cond_expr_parts.append(f"begins_with ({expr_attr_prefix}{key_attr_name}, :{key_attr_name})")

elif compr == 'between':
key = key_attr_name[11:]
cond_expr_parts.append(f"{key} between :st_between_{key} and :en_between_{key}")
expr_attr_prefix = '#' if expr_attrs_names and key in expr_attrs_names else ''
cond_expr_parts.append(f"{expr_attr_prefix}{key} between :st_between_{key} and :en_between_{key}")
else:
assert compr in ('=', '<', '<=', '>', '>='), f"Comparison not valid: {compr} for {key_attr_name}"
cond_expr_parts.append(f"{key_attr_name} {compr} :{key_attr_name}")
cond_expr_parts.append(f"{expr_attr_prefix}{key_attr_name} {compr} :{key_attr_name}")

cond_expr = " AND ".join(cond_expr_parts)

Expand All @@ -464,6 +475,11 @@ def get_by_query(self, keys: Dict, table_name: Optional[str] = None, index_name:
'KeyConditionExpression': cond_expr # Ex: "key1_name = :key1_name AND ..."
}

# If any attribute name is a DynamoDB reserved word (or otherwise needs an expression attribute name),
# pass the `ExpressionAttributeNames` mapping to the query.
if expr_attrs_names:
query_args['ExpressionAttributeNames'] = {f'#{el}': el for el in expr_attrs_names}

# In case we have a filter expression, we parse it and add variables (values) to the ExpressionAttributeValues
# Expression is also transformed to use these variables.
if filter_expression:
Expand Down Expand Up @@ -844,8 +860,9 @@ def update(self, keys: Dict, attributes_to_update: Optional[Dict] = None,
if condition_expression:
expr, values = self._parse_filter_expression(condition_expression)
update_item_query['ConditionExpression'] = expr
update_item_query['ExpressionAttributeValues'] = update_item_query.get('ExpressionAttributeValues', {})
update_item_query['ExpressionAttributeValues'].update(values)
if values:
update_item_query['ExpressionAttributeValues'] = update_item_query.get('ExpressionAttributeValues', {})
update_item_query['ExpressionAttributeValues'].update(values)

logger.debug(f"Updating an item, query: {update_item_query}")
response = self.dynamo_client.update_item(**update_item_query)
Expand Down Expand Up @@ -965,6 +982,27 @@ def get_capacity(self, table_name=None):
return self._table_capacity[table_name]


def sleep_db(self, last_action_time: datetime.datetime, action: str):
    """
    Sleeps between calls to dynamodb (if it needs to).
    Uses the table's capacity to decide how long it needs to sleep.

    :param last_action_time: Last time when we did this action (read/write) to this dynamo table
    :param action: "read" or "write"
    :raises KeyError: If `action` is not a key of the mapping returned by `get_capacity()`.
    """

    capacity = self.get_capacity()[action]  # Capacity per second

    # A non-positive capacity gives no rate to pace against and would break the division below.
    # NOTE(review): presumably on-demand tables report 0 here - confirm against `get_capacity()`.
    if capacity <= 0:
        return

    time_between_actions = 1 / capacity
    time_elapsed = datetime.datetime.now().timestamp() - last_action_time.timestamp()
    time_to_sleep = time_between_actions - time_elapsed

    # Only wait for the remainder of the interval; if enough time has already passed, do not sleep.
    if time_to_sleep > 0:
        logging.debug(f"Sleeping {time_to_sleep} sec")
        time.sleep(time_to_sleep)


def reset_stats(self):
"""
Cleans statistics.
Expand Down
46 changes: 40 additions & 6 deletions sosw/components/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,20 @@
'trim_arn_to_name',
'trim_arn_to_account',
'make_hash',
'to_bool'
'to_bool',
'get_message_dict_from_sns_event',
'is_event_from_sns'
]

import re
import collections
import uuid
import datetime
from datetime import timezone
import json
import re
import uuid

from collections import defaultdict, Hashable
from copy import deepcopy
from datetime import timezone
from typing import Iterable, Callable, Dict, Mapping, List, Optional


Expand Down Expand Up @@ -604,11 +607,11 @@ def recursive_matches_extract(src, key, separator=None, **kwargs):
def dunder_to_dict(data: dict, separator=None):
"""
Converts the flat dict with keys using dunder notation for nesting elements to regular nested dictionary.
E.g.:
.. code-block:: python
data = {'a': 'v1', 'b__c': 'v2', 'b__d__e': 'v3'}
result = dunder_to_dict(data)
Expand Down Expand Up @@ -887,3 +890,34 @@ def to_bool(val):
elif val.lower() in ['false', '0']:
return False
raise Exception(f"Can't convert unexpected value to bool: {val}, type: {type(val)}")


def get_message_dict_from_sns_event(event):
    """
    Return the message of an SNS-triggered Lambda event, parsed from JSON.

    Only the first record of the event is read.

    :param dict event: Lambda SNS event (payload). The message must be a JSON document.
    :rtype: dict
    :return: The SNS message, converted to dict
    :raises ValueError: If `is_event_from_sns` does not recognise the event as coming from SNS.
    """

    # Guard clause: refuse anything that does not look like an SNS invocation.
    if not is_event_from_sns(event):
        raise ValueError("Event is not from SNS")

    sns_record = event['Records'][0]['Sns']
    return json.loads(sns_record['Message'])


def is_event_from_sns(event):
    """
    Check if the lambda invocation was by SNS.

    :param dict event: Lambda Event (payload)
    :rtype: bool
    :return: True if the first record of the event has a non-empty `Sns` element, otherwise False.
    """

    try:
        return bool(event['Records'][0]['Sns'])
    except (KeyError, IndexError, TypeError):
        # The payload does not have the `Records[0]['Sns']` structure (missing key, empty list,
        # or a non-subscriptable value such as None). Catching only these lookup errors keeps
        # KeyboardInterrupt / SystemExit from being silently swallowed.
        return False
12 changes: 11 additions & 1 deletion sosw/components/sns.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self, **kwargs):
self.subject = kwargs.get('subject')

self.queue = []
self.separator = "\n\n#####\n\n"

self.test = kwargs.get('test') or True if os.environ.get('STAGE') == 'test' else False
if self.test:
Expand Down Expand Up @@ -112,6 +113,15 @@ def set_subject(self, value):
self.set_client_attr('subject', value=str(value))


def set_separator(self, separator):
    """
    Replace the string stored in `self.separator`, which `commit()` uses to join the queued messages.

    :param str separator: The new separator. Anything other than a string fails the assertion.
    """

    assert isinstance(separator, str), f"Invalid format of separator: {separator}. Separator must be string."
    self.separator = separator


def commit(self):
"""
Combines messages from self.queue and pushes them to self.recipient.
Expand All @@ -128,7 +138,7 @@ def commit(self):
raise RuntimeError("You did not specify Subject for the message. "
"We don't want you to write code like this, please fix.")

message = "\n\n#####\n\n".join(self.queue)
message = self.separator.join(self.queue)

if message:
self.client.publish(
Expand Down
38 changes: 38 additions & 0 deletions sosw/components/test/integration/test_dynamo_db_i.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,22 @@ def test_get_by_query__return_count(self):
self.assertEqual(result, 3)


def test_get_by_query__expr_attr(self):
    """
    Query by hash and range key with the hash column listed in `expr_attrs_names`
    and check that the first returned item is the matching row.
    """

    rows = [{self.HASH_COL: 'cat1', self.RANGE_COL: range_value} for range_value in (121, 122, 123)]

    for row in rows:
        self.dynamo_client.put(row, table_name=self.table_name)

    keys = {self.HASH_COL: 'cat1', self.RANGE_COL: 121}
    result = self.dynamo_client.get_by_query(keys, table_name=self.table_name,
                                             expr_attrs_names=[self.HASH_COL])

    self.assertEqual(result[0], rows[0])


def test_get_by_query__reverse(self):
rows = [
{self.HASH_COL: 'cat1', self.RANGE_COL: 121, 'some_col': 'test1'},
Expand Down Expand Up @@ -639,5 +655,27 @@ def test_update__remove_attrs__without_update(self):
self.assertNotIn('other_col', updated_row)


def test_patch__remove_attrs__without_update(self):
    """
    `patch` with an empty `attributes_to_update` and a list of `attributes_to_remove`
    must remove the listed attribute and leave the other attributes untouched.
    """

    keys = {self.HASH_COL: 'cat', self.RANGE_COL: '123'}
    row = {self.HASH_COL: 'cat', self.RANGE_COL: '123', 'some_col': 'no', 'other_col': 'foo'}

    self.dynamo_client.put(row, self.table_name)

    self.dynamo_client.patch(keys, attributes_to_update={}, table_name=self.table_name,
                             attributes_to_remove=['other_col'])

    # Read the row back with the raw boto3 client so the check does not rely on the client under test.
    updated_row = self.dynamo_boto3_client.get_item(
        Key={self.HASH_COL: {'S': row[self.HASH_COL]},
             self.RANGE_COL: {self.RANGE_COL_TYPE: str(row[self.RANGE_COL])}},
        TableName=self.table_name,
    )['Item']

    updated_row = self.dynamo_client.dynamo_to_dict(updated_row)

    self.assertIsNotNone(updated_row)
    # The message is passed as `msg`; previously it sat outside the call and was discarded.
    self.assertEqual(updated_row['some_col'], 'no', "Field was not supposed to be updated")
    self.assertNotIn('other_col', updated_row)


if __name__ == '__main__':
unittest.main()
53 changes: 53 additions & 0 deletions sosw/components/test/unit/test_dynamo_db.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import datetime
import logging
import time
import unittest
import os
from decimal import Decimal
Expand Down Expand Up @@ -218,6 +220,20 @@ def test_get_by_query__return_count(self):
self.dynamo_client.dynamo_client.get_paginator.assert_called()


def test_get_by_query__expr_attr(self):
    """
    Keys listed in `expr_attrs_names` must show up as `#`-prefixed placeholders both in
    `ExpressionAttributeNames` and in the `KeyConditionExpression` passed to the paginator.
    """

    self.dynamo_client = DynamoDbClient(config=self.TEST_CONFIG)
    self.dynamo_client.get_by_query(
        keys={'st_between_range_col': '3', 'en_between_range_col': '6', 'session': 'ses1'},
        expr_attrs_names=['range_col', 'session'],
    )

    _, call_kwargs = self.paginator_mock.paginate.call_args

    for placeholder in ('#range_col', '#session'):
        self.assertIn(placeholder, call_kwargs['ExpressionAttributeNames'])

    expected_condition = ('#range_col between :st_between_range_col and :en_between_range_col '
                          'AND #session = :session')
    self.assertIn(expected_condition, call_kwargs['KeyConditionExpression'])


def test__parse_filter_expression(self):
TESTS = {
'key = 42': ("key = :filter_key", {":filter_key": {'N': '42'}}),
Expand Down Expand Up @@ -318,5 +334,42 @@ def test_patch__transfers_attrs_to_remove(self):
condition_expression='attribute_exists hash_col')


def test_sleep_db__get_capacity_called(self):
    """
    `sleep_db` must ask `get_capacity` for the table capacity exactly once.
    """

    self.dynamo_client.get_capacity = MagicMock(return_value={'read': 10, 'write': 5})

    # `time.sleep` is not patched in this test. With a write capacity of 5 the interval is 0.2 sec,
    # so a last action one second ago keeps `sleep_db` from really sleeping.
    last_action_time = datetime.datetime.now() - datetime.timedelta(seconds=1)

    self.dynamo_client.sleep_db(last_action_time=last_action_time, action='write')
    self.dynamo_client.get_capacity.assert_called_once()


def test_sleep_db__wrong_action(self):
    """
    An action other than the ones present in the capacity mapping must raise KeyError.
    """

    with self.assertRaises(KeyError):
        self.dynamo_client.sleep_db(last_action_time=datetime.datetime.now(), action='call')

@patch.object(time, 'sleep')
def test_sleep_db__fell_asleep(self, mock_sleep):
    """
    When less time than `1 / capacity` has passed since the last action,
    `sleep_db` must call `time.sleep` once with roughly the remaining time.
    """

    self.dynamo_client.get_capacity = MagicMock(return_value={'read': 10, 'write': 5})

    # Check that went to sleep
    time_between_ms = 100
    last_action_time = datetime.datetime.now() - datetime.timedelta(milliseconds=time_between_ms)
    self.dynamo_client.sleep_db(last_action_time=last_action_time, action='write')
    self.assertEqual(mock_sleep.call_count, 1)
    args, _ = mock_sleep.call_args

    time_between_actions = 1 / self.dynamo_client.get_capacity()['write']  # seconds
    time_elapsed = time_between_ms / 1000  # convert to seconds so the units match

    # Should sleep around 1 / capacity second minus the elapsed time minus code execution time (20 ms allowed)
    self.assertGreater(args[0], time_between_actions - time_elapsed - 0.02)
    self.assertLess(args[0], time_between_actions)


@patch.object(time, 'sleep')
def test_sleep_db__(self, mock_sleep):
    """
    When more time than `1 / capacity` has already passed since the last action,
    `sleep_db` must not call `time.sleep` at all.
    """

    self.dynamo_client.get_capacity = MagicMock(return_value={'read': 10, 'write': 5})

    # 900 ms is well past the 200 ms interval implied by a write capacity of 5.
    elapsed = datetime.timedelta(milliseconds=900)
    self.dynamo_client.sleep_db(last_action_time=datetime.datetime.now() - elapsed, action='write')

    # Sleep function should not be called
    mock_sleep.assert_not_called()


if __name__ == '__main__':
unittest.main()

0 comments on commit 92f66e2

Please sign in to comment.