Skip to content

Commit

Permalink
Rewrite docs/internals::Loading #13
Browse files Browse the repository at this point in the history
  • Loading branch information
numberoverzero committed Jul 3, 2016
1 parent 77babe5 commit bb2c5b6
Show file tree
Hide file tree
Showing 2 changed files with 495 additions and 153 deletions.
153 changes: 47 additions & 106 deletions bloop/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
}


def list_of(objs):
"""wrap single elements in a list"""
def set_of(objs):
"""wrap single elements in a set"""
if isinstance(objs, str): # pragma: no cover
return [objs]
return {objs}
elif isinstance(objs, collections.abc.Iterable):
return objs
return set(objs)
else:
return [objs]
return {objs}


def value_of(column):
Expand Down Expand Up @@ -75,88 +75,6 @@ def config(engine, key, value):
return value


class LoadManager:
"""
The load operation involves a double indexing to provide O(1)
lookup from a table name and dictionary of attributes.
Besides the lookups to associate a blob of attributes with an
instance of a model, loading involves manipulating these blobs
into real python values, and modifying the tracking for each object.
This class exists to keep the more complex of the three pieces
separated, and easier to maintain.
"""
def __init__(self, engine, consistent):
self.engine = engine
self.consistent = config(engine, "consistent", consistent)

# TableIndex = {<T>: <KS>}
self.table_index = {}

# ObjectIndex = {<T>: {<I>: set(<O>)}}
self.object_index = {}

# Wire = {<T>: {"Keys": [<K>]}}
self.wire = {}

def add(self, obj):
# 0. Build indexes
table_name = obj.Meta.table_name
key = dump_key(self.engine, obj)
index = index_for(key)

# 1. New table, create empty dicts to add key/index
if table_name not in self.object_index:
# TableIndex[<T>] -> <KS>
self.table_index[table_name] = list(key.keys())
# ObjectIndex[<T>] -> new {<I>: set(<O>)}
self.object_index[table_name] = {}
# Wire[<T>] -> new {"Keys": [<K>]}
self.wire[table_name] = {
"Keys": [],
"ConsistentRead": self.consistent}

# 2. Associate key, index

# First time seeing this <I>, add <K> to Wire and new [<O>] to ObjectIndex[<I>]
if index not in self.object_index[table_name]:
self.wire[table_name]["Keys"].append(key)
self.object_index[table_name][index] = {obj}
# Already seen <I>, just add <O> to ObjectIndex[<I>]
else:
self.object_index[table_name][index].add(obj)

def pop(self, table_name, item):
# 0. Reconstruct index
# <T> -> <KS>
# <KS> x <Item> -> <I>
key_shape = self.table_index[table_name]
key = extract_key(key_shape, item)
index = index_for(key)

# 1. Get list of indexed objects
objects = self.object_index[table_name].pop(index)

# 2. Clean up empty table indexes
if not self.object_index[table_name]:
self.object_index.pop(table_name)

return objects

def not_loaded(self):
# We're going to flatten {<T>: {<I>: set(<O>)}} into set(<O>). Yep.
objects = set()

# This is an iterable of {<I>: set(<O>)}
for index in self.object_index.values():
# This is an iterable of set(<O>)
for index_set in index.values():
# Push this index's not-loaded set into the flat set
objects.update(index_set)
return objects


class Engine:
client = None

Expand Down Expand Up @@ -249,7 +167,7 @@ def is_validated(model):
self.type_engine.bind(context={"engine": self})

def delete(self, objs, *, condition=None, atomic=None):
objs = list_of(objs)
objs = set_of(objs)
for obj in objs:
if obj.Meta.abstract:
raise bloop.exceptions.AbstractModelException(obj)
Expand All @@ -272,18 +190,15 @@ def load(self, objs, consistent=None):
"""
Populate objects from dynamodb, optionally using consistent reads.
If any objects are not found, throws ObjectsNotFound with the attribute
`missing` containing a list of the objects that were not loaded.
If any objects are not found, raises NotModified with the attribute
`objects` containing a list of the objects that were not loaded.
Example
-------
Base = new_base()
engine = Engine()
class HashOnly(Base):
class HashOnly(bloop.new_base()):
user_id = Column(NumberType, hash_key=True)
class HashAndRange(Base):
class HashAndRange(bloop.new_base()):
user_id = Column(NumberType, hash_key=True)
game_title = Column(StringType, range_key=True)
Expand All @@ -294,26 +209,52 @@ class HashAndRange(Base):
engine.load(hash_only, consistent=True)
# Load multiple instances
engine.load(hash_only, hash_and_range)
engine.load([hash_only, hash_and_range])
"""
objs = list_of(objs)
# For an in-depth breakdown of the loading algorithm,
# see docs/dev/internal.rst::Loading
consistent = config(self, "consistent", consistent)
objs = set_of(objs)
for obj in objs:
if obj.Meta.abstract:
raise bloop.exceptions.AbstractModelException(obj)
request = LoadManager(self, consistent=consistent)

table_index, object_index, request = {}, {}, {}

for obj in objs:
request.add(obj)
response = self.client.batch_get_items(request.wire)
table_name = obj.Meta.table_name
key = dump_key(self, obj)
index = index_for(key)

if table_name not in object_index:
table_index[table_name] = list(sorted(key.keys()))
object_index[table_name] = {}
request[table_name] = {"Keys": [], "ConsistentRead": consistent}

if index not in object_index[table_name]:
request[table_name]["Keys"].append(key)
object_index[table_name][index] = set()
object_index[table_name][index].add(obj)

response = self.client.batch_get_items(request)

for table_name, items in response.items():
for item in items:
objects_to_load = request.pop(table_name, item)
for obj in objects_to_load:
key_shape = table_index[table_name]
key = extract_key(key_shape, item)
index = index_for(key)

for obj in object_index[table_name].pop(index):
self._update(obj, item, obj.Meta.columns)
bloop.tracking.sync(obj, self)

not_loaded = request.not_loaded()
if not_loaded:
if not object_index[table_name]:
object_index.pop(table_name)

if object_index:
not_loaded = set()
for index in object_index.values():
for index_set in index.values():
not_loaded.update(index_set)
raise bloop.exceptions.NotModified("load", not_loaded)

def query(self, obj, consistent=None):
Expand All @@ -331,7 +272,7 @@ def query(self, obj, consistent=None):
consistent=config(self, "consistent", consistent))

def save(self, objs, *, condition=None, atomic=None):
objs = list_of(objs)
objs = set_of(objs)
for obj in objs:
if obj.Meta.abstract:
raise bloop.exceptions.AbstractModelException(obj)
Expand Down
Loading

0 comments on commit bb2c5b6

Please sign in to comment.