Skip to content

Commit

Permalink
Merge pull request #10 from bimpression/dynamo_batch_get_items
Browse files Browse the repository at this point in the history
Dynamo batch get items
  • Loading branch information
ngr committed Jan 28, 2019
2 parents d89bf67 + fea619e commit 149d8d8
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 1 deletion.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
long_description = f.read()

setup(name='sosw',
version='0.3.7',
version='0.3.9',
description='Serverless Orchestrator of Serverless Workers',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
70 changes: 70 additions & 0 deletions sosw/components/dynamo_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import json
import os
import time
from collections import defaultdict

from .benchmark import benchmark
Expand Down Expand Up @@ -316,6 +317,75 @@ def _build_scan_iterator(self, attrs=None, table_name=None, strict=True):
return response_iterator


def batch_get_items_one_table(self, keys_list, table_name=None, max_retries=0, retry_wait_base_time=0.2):
"""
Gets a batch of items from a single dynamo table.
Only accepts keys, can't query by other columns.
:param list keys_list: A list of the keys of the items we want to get. Gets the items that match the given keys.
If some key doesn't exist - it just skips it and gets the others.
e.g. [{'hash_col': '1, 'range_col': 2}, {'hash_col': 3}]
- will get a row where `hash_col` is 1 and `range_col` is 2, and also all rows where
`hash_col` is 3.
Optional
:param str table_name:
:param int max_retries: If failed to get some items, retry this many times. Waiting between retries is
multiplied by 2 after each retry, so `retries` shouldn't be a big number.
Default is 1.
:param int retry_wait_base_time: Wait this much time after first retry. Will wait twice longer in each retry.
:return: List of items from the table
:rtype: list
"""

table_name = self._get_validate_table_name(table_name)

# Convert given keys to dynamo syntax
query_keys = [self.dict_to_dynamo(item) for item in keys_list]

batch_get_item_query = {
'RequestItems': {
table_name: {
'Keys': query_keys
}
}
}

logger.debug(f"batch_get_item query: {batch_get_item_query}")

db_result = self.dynamo_client.batch_get_item(**batch_get_item_query)
logger.debug(f"batch_get_items_one_table response: {db_result}")

# Check if we skipped something - if we did, try again.
def is_action_incomplete(db_result):
return 'UnprocessedKeys' in db_result and db_result['UnprocessedKeys'] \
and table_name in db_result['UnprocessedKeys'] and db_result['UnprocessedKeys'][table_name]

if is_action_incomplete(db_result):
# Retry several times
retry_num = 0
wait_time = retry_wait_base_time
while is_action_incomplete(db_result) and retry_num < max_retries:
logger.warning(f"batch_get_item action did NOT finish successfully.")
time.sleep(wait_time)
db_result = self.dynamo_client.batch_get_item(**batch_get_item_query)
retry_num += 1
wait_time *= 2

# After the retries still we have a bad result... then raise Exception
if is_action_incomplete(db_result):
raise Exception(f"batch_get_items action failed for table {table_name}, keys_list {keys_list}")

items = db_result['Responses'][table_name]

result = []
for item in items:
result.append(self.dynamo_to_dict(item))

return result


def build_put_query(self, row, table_name=None):
table_name = self._get_validate_table_name(table_name)
dynamo_formatted_row = self.dict_to_dynamo(row, strict=False)
Expand Down
23 changes: 23 additions & 0 deletions sosw/components/test/test_dynamo_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,5 +364,28 @@ def test_get_by_scan__with_filter(self):
assert r in result, f"row not in result from dynamo scan: {r}"


def test_batch_get_items(self):
rows = [
{'hash_col': 'cat1', 'range_col': 121, 'some_col': 'test1'},
{'hash_col': 'cat1', 'range_col': 122, 'some_col': 'test2'},
{'hash_col': 'cat2', 'range_col': 122, 'some_col': 'test2'},
]
for x in rows:
self.dynamo_client.put(x, self.table_name)

keys_list_query = [
{'hash_col': 'cat1', 'range_col': 121},
{'hash_col': 'doesnt_exist', 'range_col': 40},
{'hash_col': 'cat2', 'range_col': 122},
]

result = self.dynamo_client.batch_get_items_one_table(keys_list_query)

self.assertEquals(len(result), 2)

self.assertIn(rows[0], result)
self.assertIn(rows[2], result)


if __name__ == '__main__':
unittest.main()

0 comments on commit 149d8d8

Please sign in to comment.