Commit

Address miscellaneous review nits for batch-uploads work. (#52); r=Natim
* Address miscellaneous review nits for batch-uploads work.
* Add a test for emitting batch-upload db timers.
rfk committed Nov 5, 2016
1 parent 85871a7 commit c670709
Showing 13 changed files with 279 additions and 183 deletions.
2 changes: 0 additions & 2 deletions Makefile
@@ -8,15 +8,13 @@ BUILD_TMP = /tmp/syncstorage-build.${USER}
PYPI = https://pypi.python.org/simple

export MOZSVC_SQLURI = sqlite:///:memory:
#export MOZSVC_SQLURI = mysql://user:password@localhost/my_sync_db

# Hackety-hack around OSX system python bustage.
# The need for this should go away with a future osx/xcode update.
ARCHFLAGS = -Wno-error=unused-command-line-argument-hard-error-in-future

INSTALL = ARCHFLAGS=$(ARCHFLAGS) $(PIP) install -U -i $(PYPI)


.PHONY: all build test

all: build
10 changes: 2 additions & 8 deletions loadtest/stress.py
@@ -178,14 +178,8 @@ def test_storage_session(self):

# POST requests with several WBOs batched together
num_requests = self._pick_weighted_count(post_count_distribution)
# Shallower distribution (for now) of "transactional" style batch
# uploads. Looking for roughly 10% of batch POSTs to be
# "transactional". Increase the frequency of "transactional" requests
# by changing the modulo 10 to something like modulo 5 (~20%) or
# modulo 3 (~30%).
# transact = int(random.paretovariate(1) * 100) % 10 == 0
#
# Let's do roughly 50% for fun.

# Let's do roughly 50% transactional batches.
transact = random.randint(0, 1)
batch_id = None
committing = False
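Note: the new code flips a coin with random.randint(0, 1), so roughly half of the batch POSTs are "transactional"; the removed comment describes the earlier ~10% approach based on a Pareto draw taken modulo 10. A minimal illustrative helper (hypothetical, not part of the loadtest) showing both ways of picking the fraction:

import random

def pick_transact(fifty_fifty=True):
    # Decide whether this batch POST should be "transactional".
    if fifty_fifty:
        return bool(random.randint(0, 1))  # ~50% of requests
    # Older approach: modulo 10 gives ~10%; use modulo 5 or 3 for ~20%/~30%.
    return int(random.paretovariate(1) * 100) % 10 == 0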
3 changes: 3 additions & 0 deletions syncstorage/storage/__init__.py
@@ -16,6 +16,9 @@

logger = logging.getLogger("syncstorage.storage")

# Rough guesstimate of the maximum reasonable life span of a batch.
BATCH_LIFETIME = 60 * 60 * 2 # 2 hours, in seconds


class StorageError(Exception):
"""Base class for exceptions raised from the storage backend."""
60 changes: 30 additions & 30 deletions syncstorage/storage/memcached.py
@@ -59,7 +59,8 @@
CollectionNotFoundError,
ItemNotFoundError,
InvalidOffsetError,
InvalidBatch)
InvalidBatch,
BATCH_LIFETIME)

from pyramid.settings import aslist

@@ -944,23 +945,31 @@ def del_item(self, userid, item):
raise ItemNotFoundError
return modified

def get_cached_batches(self, userid):
return self.cache.gets(self.get_batches_key(userid))
def get_cached_batches(self, userid, ts=None):
if ts is None:
ts = get_timestamp()
ts = int(ts)
bdata, bcasid = self.cache.gets(self.get_batches_key(userid))
# Remove any expired batches, but let the
# calling code write it back out to memcache.
if bdata:
for batchid, batch in bdata.items():
if batch["created"] + BATCH_LIFETIME < ts:
del bdata[batchid]
return bdata, bcasid

def create_batch(self, userid):
bdata, bcasid = self.get_cached_batches(userid)
batch = get_timestamp()
batchid = int(batch * 1000)
ts = get_timestamp()
bdata, bcasid = self.get_cached_batches(userid, ts)
batchid = int(ts * 1000)
if not bdata:
bdata = {}
if batchid in bdata:
raise ConflictError
bdata[batchid] = {"user": userid,
"modified": batch,
# FIXME Rough guesstimate of the maximum
# reasonable life span of a batch
"expires": int(batch) + 2 * 3600,
"items": []}
bdata[batchid] = {
"created": int(ts),
"items": []
}
key = self.get_batches_key(userid)
if not self.cache.cas(key, bdata, bcasid):
raise ConflictError
@@ -969,27 +978,20 @@ def create_batch(self, userid):
def valid_batch(self, userid, batch):
ts = get_timestamp()
batchid = str(batch)
bdata, bcasid = self.get_cached_batches(userid)

if not bdata or batchid not in bdata:
return False
if bdata[batchid]["expires"] < ts:
self.close_batch(userid, batchid)
bdata, bcasid = self.get_cached_batches(userid, ts)
if not bdata:
return False
return True
return (batchid in bdata)

def append_items_to_batch(self, userid, batch, items):
modified = get_timestamp()
batchid = str(batch)
bdata, bcasid = self.get_cached_batches(userid)
bdata, bcasid = self.get_cached_batches(userid, modified)
# Invalid, closed, or expired batch
if (not bdata or
batchid not in bdata or
bdata[batchid]["expires"] <= int(modified)):
raise InvalidBatch(batch, modified, bdata)
if not bdata or batchid not in bdata:
raise InvalidBatch(batch)

bdata[batchid]["items"].extend(items)
bdata[batchid]["modified"] = modified
key = self.get_batches_key(userid)
if not self.cache.cas(key, bdata, bcasid):
raise ConflictError
@@ -998,12 +1000,10 @@ def append_items_to_batch(self, userid, batch, items):
def apply_batch(self, userid, batch):
modified = get_timestamp()
batchid = str(batch)
bdata, bcasid = self.get_cached_batches(userid)
bdata, bcasid = self.get_cached_batches(userid, modified)
# Invalid, closed, or expired batch
if (not bdata or
batchid not in bdata or
bdata[batchid]["expires"] <= int(modified)):
raise InvalidBatch(batch, modified, bdata)
if not bdata or batchid not in bdata:
raise InvalidBatch(batch)

data, casid = self.get_cached_data(userid)
self._set_items(userid, bdata[batchid]["items"], modified, data, casid)
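Note: the memcached backend now stores, per user, a dict of batch records keyed by millisecond batch ID, each carrying an integer "created" timestamp; get_cached_batches() drops records older than BATCH_LIFETIME and lets the callers write the pruned dict back out via cache.cas(), as the diff above shows. A standalone sketch of that pruning step (the helper name is illustrative, not the module's API):

import time

BATCH_LIFETIME = 60 * 60 * 2  # 2 hours, matching syncstorage.storage.BATCH_LIFETIME

def drop_expired_batches(bdata, now=None):
    # Prune expired batch records; the caller is expected to write the
    # result back to memcache with a compare-and-swap, as the diff above does.
    now = int(time.time() if now is None else now)
    if not bdata:
        return bdata
    for batchid in list(bdata):  # copy the keys; we delete while iterating
        if bdata[batchid]["created"] + BATCH_LIFETIME < now:
            del bdata[batchid]
    return bdata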
188 changes: 111 additions & 77 deletions syncstorage/storage/sql/__init__.py
@@ -30,7 +30,8 @@
ConflictError,
CollectionNotFoundError,
ItemNotFoundError,
InvalidOffsetError)
InvalidOffsetError,
BATCH_LIFETIME)

from syncstorage.storage.sql.dbconnect import (DBConnector, MAX_TTL,
BackendError)
@@ -493,6 +494,10 @@ def create_batch(self, session, userid, collection):
@with_session
def valid_batch(self, session, userid, collection, batchid):
"""Checks to see if the batch ID is valid and still open"""
# Avoid hitting the db for batches that are obviously too old.
# Recall that the batchid is a millisecond timestamp.
if (batchid / 1000 + BATCH_LIFETIME) < session.timestamp:
return False
collectionid = self._get_collection_id(session, collection)
params = {
"batch": batchid,
@@ -502,6 +507,7 @@ def valid_batch(self, session, userid, collection, batchid):
valid = session.query_scalar("VALID_BATCH", params=params)
return valid

@metrics_timer("syncstorage.storage.sql.append_items_to_batch")
@with_session
def append_items_to_batch(self, session, userid, collection, batchid,
items):
@@ -514,7 +520,7 @@ def append_items_to_batch(self, session, userid, collection, batchid,
session.insert_or_update("batch_upload_items", rows)
return session.timestamp

@metrics_timer("syncstorage.storage.sql.db.execute.apply_batch")
@metrics_timer("syncstorage.storage.sql.apply_batch")
@with_session
def apply_batch(self, session, userid, collection, batchid):
collectionid = self._get_collection_id(session, collection)
@@ -692,90 +698,118 @@ def delete_item(self, session, userid, collection, item):
# Administrative/maintenance methods.
#
def purge_expired_items(self, grace_period=0, max_per_loop=1000):
"""Purges items with an expired TTL from the bso and batch_upload_items
table(s).
"""Purges expired items from the bso and batch-related tables."""
res = self._purge_expired_bsos(grace_period, max_per_loop)
num_bso_rows_purged = res["num_purged"]
is_complete = res["is_complete"]

The grace period for batch_upload_items is hard-coded to 1.5 times
the current upper limit (2 hours) for a batch session's lifetime.
"""
res = self._purge_expired_batches(grace_period, max_per_loop)
num_batches_purged = res["num_purged"]
is_complete = is_complete and res["is_complete"]

res = self._purge_expired_batch_items(grace_period, max_per_loop)
num_bui_rows_purged = res["num_purged"]
is_complete = is_complete and res["is_complete"]

table_sets = {"bso": {"replace": "bso",
"query": "PURGE_SOME_EXPIRED_ITEMS",
"total_affected": 0},
"batch_upload_items": {"replace": "bui",
"query": "PURGE_BATCH_CONTENTS",
"total_affected": 0,
"grace_period": 3 * 60 * 60}}
return {
"num_batches_purged": num_batches_purged,
"num_bso_rows_purged": num_bso_rows_purged,
"num_bui_rows_purged": num_bui_rows_purged,
"is_complete": is_complete,
}

total_batches_purged = 0
batches_purged = max_per_loop + 1
def _purge_expired_bsos(self, grace_period=0, max_per_loop=1000):
"""Purges BSOs with an expired TTL from the database."""
# Get the set of all BSO tables in the database.
# This will be different depending on whether sharding is done.
if not self.dbconnector.shard:
tables = set(("bso",))
else:
tables = set(self.dbconnector.get_bso_table(i).name
for i in xrange(self.dbconnector.shardsize))
assert len(tables) == self.dbconnector.shardsize
# Purge each table in turn, summing rowcounts.
num_purged = 0
is_incomplete = False
for table in sorted(tables):
res = self._purge_items_loop(table, "PURGE_SOME_EXPIRED_ITEMS", {
"bso": table,
"grace": grace_period,
"maxitems": max_per_loop,
})
num_purged += res["num_purged"]
is_incomplete = is_incomplete or not res["is_complete"]
return {
"num_purged": num_purged,
"is_complete": not is_incomplete,
}

for table_type in ("bso", "batch_upload_items"):
tabula = table_sets[table_type]
if table_type == "batch_upload_items":
# Tidy up the batch_uploads table first
if batches_purged <= max_per_loop:
with self._get_or_create_session() as session:
batches_purged = session.query("PURGE_BATCHES", {
"grace": tabula["grace_period"],
"maxitems": max_per_loop,
})
total_batches_purged += batches_purged

if not self.dbconnector.shard:
tables = set((table_type,))
else:
shard_func = self.dbconnector.get_bso_table
if table_type == "batch_upload_items":
shard_func = self.dbconnector.get_batch_item_table
tables = set(shard_func(i).name
for i in xrange(self.dbconnector.shardsize))
assert len(tables) == self.dbconnector.shardsize
# Purge each table in turn, summing rowcounts.
# We set an upper limit on the number of iterations, to avoid
# getting stuck indefinitely on a single table.
for table in sorted(tables):
logger.info("Purging expired items from %s", table)
num_iters = 1
num_affected = 0
with self._get_or_create_session() as session:
rowcount = session.query(tabula["query"], {
tabula["replace"]: table,
"grace": tabula.get("grace_period", grace_period),
"maxitems": max_per_loop,
})
while rowcount > 0:
num_affected += rowcount
logger.debug("After %d iterations, %s items purged",
num_iters, num_affected)
num_iters += 1
if num_iters > 100:
logger.debug("Too many iterations on %s, bailing outXS"
% table)
is_incomplete = True
break
with self._get_or_create_session() as session:
rowcount = session.query(tabula["query"], {
tabula["replace"]: table,
"grace": tabula.get("grace_period", grace_period),
"maxitems": max_per_loop,
})
logger.info("Purged %d expired items from %s",
num_affected, table)
tabula["total_affected"] += num_affected

logger.info("Purged %d expired batches from "
"batch_uploads" % total_batches_purged)

# Return the required data to the caller.
def _purge_expired_batches(self, grace_period=0, max_per_loop=1000):
return self._purge_items_loop("batch_uploads", "PURGE_BATCHES", {
"lifetime": BATCH_LIFETIME,
"grace": grace_period,
"maxitems": max_per_loop,
})

def _purge_expired_batch_items(self, grace_period=0, max_per_loop=1000):
# Get the set of all BUI tables in the database.
# This will be different depending on whether sharding is done.
if not self.dbconnector.shard:
tables = set(("batch_upload_items",))
else:
tables = set(self.dbconnector.get_batch_item_table(i).name
for i in xrange(self.dbconnector.shardsize))
assert len(tables) == self.dbconnector.shardsize
# Purge each table in turn, summing rowcounts.
num_purged = 0
is_incomplete = False
for table in sorted(tables):
res = self._purge_items_loop(table, "PURGE_BATCH_CONTENTS", {
"bui": table,
"lifetime": BATCH_LIFETIME,
"grace": grace_period,
"maxitems": max_per_loop,
})
num_purged += res["num_purged"]
is_incomplete = is_incomplete or not res["is_complete"]
return {
"num_purged": num_purged,
"is_complete": not is_incomplete,
}

def _purge_items_loop(self, table, query, params):
"""Helper function to incrementally purge items in a loop."""
# Purge some items, a few at a time, in a loop.
# We set an upper limit on the number of iterations, to avoid
# getting stuck indefinitely on a single table.
logger.info("Purging expired items from %s", table)
MAX_ITERS = 100
num_iters = 1
num_purged = 0
is_incomplete = False
# Note that we take a new session for each run of the query.
# This avoids holding open a long-running transaction, so
# that other jobs can run in between the incremental purges.
with self._get_or_create_session() as session:
rowcount = session.query(query, params)
while rowcount > 0:
num_purged += rowcount
logger.debug("After %d iterations, %s items purged",
num_iters, num_purged)
num_iters += 1
if num_iters > MAX_ITERS:
logger.debug("Too many iterations, bailing out.")
is_incomplete = True
break
with self._get_or_create_session() as session:
rowcount = session.query(query, params)
logger.info("Purged %d expired items from %s", num_purged, table)
# We use "is_incomplete" rather than "is_complete" in the code above
# because we expect that, most of the time, the purge will complete.
# So it's more efficient to flag the case when it doesn't.
# But the caller really wants to know is_complete.
return {
"batches_purged": total_batches_purged,
"num_bso_rows_purged": table_sets["bso"]["total_affected"],
"num_bui_rows_purged": table_sets["batch_upload_items"]["total_affected"], # noqa
"num_purged": num_purged,
"is_complete": not is_incomplete,
}

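Note: purge_expired_items() now reports per-category counts plus an is_complete flag, so a maintenance job can tell whether another pass is needed. A hedged usage sketch (the surrounding snippet and the grace period value are assumptions, not part of this change; `storage` is an instance of the SQL backend):

# Hypothetical maintenance-job snippet.
res = storage.purge_expired_items(grace_period=86400, max_per_loop=1000)
print("purged %d batches, %d bso rows, %d batch_upload_items rows" % (
    res["num_batches_purged"],
    res["num_bso_rows_purged"],
    res["num_bui_rows_purged"]))
if not res["is_complete"]:
    # Some table hit the iteration cap; run the purge again later to finish.
    pass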
