From c6707098a4dac0e134fadf964ecca7c5eb66ee1f Mon Sep 17 00:00:00 2001 From: Ryan Kelly Date: Sat, 5 Nov 2016 13:01:36 +1100 Subject: [PATCH] Address miscellaneous review nits for batch-uploads work. (#52); r=Natim * Address miscellaneous review nits for batch-uploads work. * Add a test for emitting batch-upload db timers. --- Makefile | 2 - loadtest/stress.py | 10 +- syncstorage/storage/__init__.py | 3 + syncstorage/storage/memcached.py | 60 +++---- syncstorage/storage/sql/__init__.py | 188 ++++++++++++-------- syncstorage/storage/sql/queries_generic.py | 23 ++- syncstorage/storage/sql/queries_mysql.py | 22 ++- syncstorage/storage/sql/queries_postgres.py | 27 ++- syncstorage/storage/sql/queries_sqlite.py | 18 +- syncstorage/tests/test_scripts.py | 66 ++++--- syncstorage/tests/test_wsgiapp.py | 40 +++++ syncstorage/tests/tests-hostname.ini | 1 + syncstorage/views/__init__.py | 2 - 13 files changed, 279 insertions(+), 183 deletions(-) diff --git a/Makefile b/Makefile index 351ef999..091514ac 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,6 @@ BUILD_TMP = /tmp/syncstorage-build.${USER} PYPI = https://pypi.python.org/simple export MOZSVC_SQLURI = sqlite:///:memory: -#export MOZSVC_SQLURI = mysql://user:password@localhost/my_sync_db # Hackety-hack around OSX system python bustage. # The need for this should go away with a future osx/xcode update. @@ -16,7 +15,6 @@ ARCHFLAGS = -Wno-error=unused-command-line-argument-hard-error-in-future INSTALL = ARCHFLAGS=$(ARCHFLAGS) $(PIP) install -U -i $(PYPI) - .PHONY: all build test all: build diff --git a/loadtest/stress.py b/loadtest/stress.py index 18d0d213..8dd78837 100644 --- a/loadtest/stress.py +++ b/loadtest/stress.py @@ -178,14 +178,8 @@ def test_storage_session(self): # POST requests with several WBOs batched together num_requests = self._pick_weighted_count(post_count_distribution) - # Shallower distribution(for now) of "transactional" style batch - # uploads. Looking for roughly 10% of batch POSTs to be - # "transactional". Increase the frequency of "transactional" requests - # by changing the modulo 10 to something like modulo 5(~20%) or - # modulo 3(~30%). - # transact = int(random.paretovariate(1) * 100) % 10 == 0 - # - # Let's do roughly 50% for fun. + + # Let's do roughly 50% transactional batches. transact = random.randint(0, 1) batch_id = None committing = False diff --git a/syncstorage/storage/__init__.py b/syncstorage/storage/__init__.py index 71eeedfe..0ca8af7a 100644 --- a/syncstorage/storage/__init__.py +++ b/syncstorage/storage/__init__.py @@ -16,6 +16,9 @@ logger = logging.getLogger("syncstorage.storage") +# Rough guesstimate of the maximum reasonable life span of a batch. 
+BATCH_LIFETIME = 60 * 60 * 2 # 2 hours, in seconds + class StorageError(Exception): """Base class for exceptions raised from the storage backend.""" diff --git a/syncstorage/storage/memcached.py b/syncstorage/storage/memcached.py index 1f409b9b..bcf5a084 100644 --- a/syncstorage/storage/memcached.py +++ b/syncstorage/storage/memcached.py @@ -59,7 +59,8 @@ CollectionNotFoundError, ItemNotFoundError, InvalidOffsetError, - InvalidBatch) + InvalidBatch, + BATCH_LIFETIME) from pyramid.settings import aslist @@ -944,23 +945,31 @@ def del_item(self, userid, item): raise ItemNotFoundError return modified - def get_cached_batches(self, userid): - return self.cache.gets(self.get_batches_key(userid)) + def get_cached_batches(self, userid, ts=None): + if ts is None: + ts = get_timestamp() + ts = int(ts) + bdata, bcasid = self.cache.gets(self.get_batches_key(userid)) + # Remove any expired batches, but let the + # calling code write it back out to memcache. + if bdata: + for batchid, batch in bdata.items(): + if batch["created"] + BATCH_LIFETIME < ts: + del bdata[batchid] + return bdata, bcasid def create_batch(self, userid): - bdata, bcasid = self.get_cached_batches(userid) - batch = get_timestamp() - batchid = int(batch * 1000) + ts = get_timestamp() + bdata, bcasid = self.get_cached_batches(userid, ts) + batchid = int(ts * 1000) if not bdata: bdata = {} if batchid in bdata: raise ConflictError - bdata[batchid] = {"user": userid, - "modified": batch, - # FIXME Rough guesstimate of the maximum - # reasonable life span of a batch - "expires": int(batch) + 2 * 3600, - "items": []} + bdata[batchid] = { + "created": int(ts), + "items": [] + } key = self.get_batches_key(userid) if not self.cache.cas(key, bdata, bcasid): raise ConflictError @@ -969,27 +978,20 @@ def create_batch(self, userid): def valid_batch(self, userid, batch): ts = get_timestamp() batchid = str(batch) - bdata, bcasid = self.get_cached_batches(userid) - - if not bdata or batchid not in bdata: - return False - if bdata[batchid]["expires"] < ts: - self.close_batch(userid, batchid) + bdata, bcasid = self.get_cached_batches(userid, ts) + if not bdata: return False - return True + return (batchid in bdata) def append_items_to_batch(self, userid, batch, items): modified = get_timestamp() batchid = str(batch) - bdata, bcasid = self.get_cached_batches(userid) + bdata, bcasid = self.get_cached_batches(userid, modified) # Invalid, closed, or expired batch - if (not bdata or - batchid not in bdata or - bdata[batchid]["expires"] <= int(modified)): - raise InvalidBatch(batch, modified, bdata) + if not bdata or batchid not in bdata: + raise InvalidBatch(batch) bdata[batchid]["items"].extend(items) - bdata[batchid]["modified"] = modified key = self.get_batches_key(userid) if not self.cache.cas(key, bdata, bcasid): raise ConflictError @@ -998,12 +1000,10 @@ def append_items_to_batch(self, userid, batch, items): def apply_batch(self, userid, batch): modified = get_timestamp() batchid = str(batch) - bdata, bcasid = self.get_cached_batches(userid) + bdata, bcasid = self.get_cached_batches(userid, modified) # Invalid, closed, or expired batch - if (not bdata or - batchid not in bdata or - bdata[batchid]["expires"] <= int(modified)): - raise InvalidBatch(batch, modified, bdata) + if not bdata or batchid not in bdata: + raise InvalidBatch(batch) data, casid = self.get_cached_data(userid) self._set_items(userid, bdata[batchid]["items"], modified, data, casid) diff --git a/syncstorage/storage/sql/__init__.py b/syncstorage/storage/sql/__init__.py index 
1151cf0e..2e62b3a4 100644 --- a/syncstorage/storage/sql/__init__.py +++ b/syncstorage/storage/sql/__init__.py @@ -30,7 +30,8 @@ ConflictError, CollectionNotFoundError, ItemNotFoundError, - InvalidOffsetError) + InvalidOffsetError, + BATCH_LIFETIME) from syncstorage.storage.sql.dbconnect import (DBConnector, MAX_TTL, BackendError) @@ -493,6 +494,10 @@ def create_batch(self, session, userid, collection): @with_session def valid_batch(self, session, userid, collection, batchid): """Checks to see if the batch ID is valid and still open""" + # Avoid hitting the db for batches that are obviously too old. + # Recall that the batchid is a millisecond timestamp. + if (batchid / 1000 + BATCH_LIFETIME) < session.timestamp: + return False collectionid = self._get_collection_id(session, collection) params = { "batch": batchid, @@ -502,6 +507,7 @@ def valid_batch(self, session, userid, collection, batchid): valid = session.query_scalar("VALID_BATCH", params=params) return valid + @metrics_timer("syncstorage.storage.sql.append_items_to_batch") @with_session def append_items_to_batch(self, session, userid, collection, batchid, items): @@ -514,7 +520,7 @@ def append_items_to_batch(self, session, userid, collection, batchid, session.insert_or_update("batch_upload_items", rows) return session.timestamp - @metrics_timer("syncstorage.storage.sql.db.execute.apply_batch") + @metrics_timer("syncstorage.storage.sql.apply_batch") @with_session def apply_batch(self, session, userid, collection, batchid): collectionid = self._get_collection_id(session, collection) @@ -692,90 +698,118 @@ def delete_item(self, session, userid, collection, item): # Administrative/maintenance methods. # def purge_expired_items(self, grace_period=0, max_per_loop=1000): - """Purges items with an expired TTL from the bso and batch_upload_items - table(s). + """Purges expired items from the bso and batch-related tables.""" + res = self._purge_expired_bsos(grace_period, max_per_loop) + num_bso_rows_purged = res["num_purged"] + is_complete = res["is_complete"] - The grace period for batch_upload_items is hard coded to 1.5 time - the current upper limit(2 hours) for a batch session's lifetime. - """ + res = self._purge_expired_batches(grace_period, max_per_loop) + num_batches_purged = res["num_purged"] + is_complete = is_complete and res["is_complete"] + + res = self._purge_expired_batch_items(grace_period, max_per_loop) + num_bui_rows_purged = res["num_purged"] + is_complete = is_complete and res["is_complete"] - table_sets = {"bso": {"replace": "bso", - "query": "PURGE_SOME_EXPIRED_ITEMS", - "total_affected": 0}, - "batch_upload_items": {"replace": "bui", - "query": "PURGE_BATCH_CONTENTS", - "total_affected": 0, - "grace_period": 3 * 60 * 60}} + return { + "num_batches_purged": num_batches_purged, + "num_bso_rows_purged": num_bso_rows_purged, + "num_bui_rows_purged": num_bui_rows_purged, + "is_complete": is_complete, + } - total_batches_purged = 0 - batches_purged = max_per_loop + 1 + def _purge_expired_bsos(self, grace_period=0, max_per_loop=1000): + """Purges BSOs with an expired TTL from the database.""" + # Get the set of all BSO tables in the database. + # This will be different depending on whether sharding is done. + if not self.dbconnector.shard: + tables = set(("bso",)) + else: + tables = set(self.dbconnector.get_bso_table(i).name + for i in xrange(self.dbconnector.shardsize)) + assert len(tables) == self.dbconnector.shardsize + # Purge each table in turn, summing rowcounts. 
+ num_purged = 0 is_incomplete = False + for table in sorted(tables): + res = self._purge_items_loop(table, "PURGE_SOME_EXPIRED_ITEMS", { + "bso": table, + "grace": grace_period, + "maxitems": max_per_loop, + }) + num_purged += res["num_purged"] + is_incomplete = is_incomplete or not res["is_complete"] + return { + "num_purged": num_purged, + "is_complete": not is_incomplete, + } - for table_type in ("bso", "batch_upload_items"): - tabula = table_sets[table_type] - if table_type == "batch_upload_items": - # Tidy up the batch_uploads table first - if batches_purged <= max_per_loop: - with self._get_or_create_session() as session: - batches_purged = session.query("PURGE_BATCHES", { - "grace": tabula["grace_period"], - "maxitems": max_per_loop, - }) - total_batches_purged += batches_purged - - if not self.dbconnector.shard: - tables = set((table_type,)) - else: - shard_func = self.dbconnector.get_bso_table - if table_type == "batch_upload_items": - shard_func = self.dbconnector.get_batch_item_table - tables = set(shard_func(i).name - for i in xrange(self.dbconnector.shardsize)) - assert len(tables) == self.dbconnector.shardsize - # Purge each table in turn, summing rowcounts. - # We set an upper limit on the number of iterations, to avoid - # getting stuck indefinitely on a single table. - for table in sorted(tables): - logger.info("Purging expired items from %s", table) - num_iters = 1 - num_affected = 0 - with self._get_or_create_session() as session: - rowcount = session.query(tabula["query"], { - tabula["replace"]: table, - "grace": tabula.get("grace_period", grace_period), - "maxitems": max_per_loop, - }) - while rowcount > 0: - num_affected += rowcount - logger.debug("After %d iterations, %s items purged", - num_iters, num_affected) - num_iters += 1 - if num_iters > 100: - logger.debug("Too many iterations on %s, bailing outXS" - % table) - is_incomplete = True - break - with self._get_or_create_session() as session: - rowcount = session.query(tabula["query"], { - tabula["replace"]: table, - "grace": tabula.get("grace_period", grace_period), - "maxitems": max_per_loop, - }) - logger.info("Purged %d expired items from %s", - num_affected, table) - tabula["total_affected"] += num_affected - - logger.info("Purged %d expired batches from " - "batch_uploads" % total_batches_purged) - - # Return the required data to the caller. + def _purge_expired_batches(self, grace_period=0, max_per_loop=1000): + return self._purge_items_loop("batch_uploads", "PURGE_BATCHES", { + "lifetime": BATCH_LIFETIME, + "grace": grace_period, + "maxitems": max_per_loop, + }) + + def _purge_expired_batch_items(self, grace_period=0, max_per_loop=1000): + # Get the set of all BUI tables in the database. + # This will be different depending on whether sharding is done. + if not self.dbconnector.shard: + tables = set(("batch_upload_items",)) + else: + tables = set(self.dbconnector.get_batch_item_table(i).name + for i in xrange(self.dbconnector.shardsize)) + assert len(tables) == self.dbconnector.shardsize + # Purge each table in turn, summing rowcounts. 
+        num_purged = 0
+        is_incomplete = False
+        for table in sorted(tables):
+            res = self._purge_items_loop(table, "PURGE_BATCH_CONTENTS", {
+                "bui": table,
+                "lifetime": BATCH_LIFETIME,
+                "grace": grace_period,
+                "maxitems": max_per_loop,
+            })
+            num_purged += res["num_purged"]
+            is_incomplete = is_incomplete or not res["is_complete"]
+        return {
+            "num_purged": num_purged,
+            "is_complete": not is_incomplete,
+        }
+
+    def _purge_items_loop(self, table, query, params):
+        """Helper function to incrementally purge items in a loop."""
+        # Purge some items, a few at a time, in a loop.
+        # We set an upper limit on the number of iterations, to avoid
+        # getting stuck indefinitely on a single table.
+        logger.info("Purging expired items from %s", table)
+        MAX_ITERS = 100
+        num_iters = 1
+        num_purged = 0
+        is_incomplete = False
+        # Note that we take a new session for each run of the query.
+        # This avoids holding open a long-running transaction, so that
+        # other jobs can interleave with the incremental purge.
+        with self._get_or_create_session() as session:
+            rowcount = session.query(query, params)
+        while rowcount > 0:
+            num_purged += rowcount
+            logger.debug("After %d iterations, %s items purged",
+                         num_iters, num_purged)
+            num_iters += 1
+            if num_iters > MAX_ITERS:
+                logger.debug("Too many iterations, bailing out.")
+                is_incomplete = True
+                break
+            with self._get_or_create_session() as session:
+                rowcount = session.query(query, params)
+        logger.info("Purged %d expired items from %s", num_purged, table)
         # We use "is_incomplete" rather than "is_complete" in the code above
         # because we expect that, most of the time, the purge will complete.
         # So it's more efficient to flag the case when it doesn't.
+        # But the caller really wants to know is_complete.
         return {
-            "batches_purged": total_batches_purged,
-            "num_bso_rows_purged": table_sets["bso"]["total_affected"],
-            "num_bui_rows_purged": table_sets["batch_upload_items"]["total_affected"],  # noqa
+            "num_purged": num_purged,
             "is_complete": not is_incomplete,
         }
diff --git a/syncstorage/storage/sql/queries_generic.py b/syncstorage/storage/sql/queries_generic.py
index e81a9585..57ba10ef 100644
--- a/syncstorage/storage/sql/queries_generic.py
+++ b/syncstorage/storage/sql/queries_generic.py
@@ -251,14 +251,19 @@ def FIND_ITEMS(bso, params):
 # The idea is to delete them in small batches to keep overhead low.
 # Unfortunately there's no generic way to achieve this in SQL so the default
 # case winds up deleting all expired items.  There is a MySQL-specific
-# version using DELETE LIMIT 1000.
-PURGE_SOME_EXPIRED_ITEMS = "DELETE FROM %(bso)s "\
-                           "WHERE ttl < (UNIX_TIMESTAMP() - :grace) " \
-                           "LIMIT :maxitems"
+# version using DELETE LIMIT 1000.
-PURGE_BATCHES = "DELETE FROM batch_uploads WHERE batch < " \
-                " (UNIX_TIMESTAMP() - :grace) * 1000 LIMIT :maxitems"
+PURGE_SOME_EXPIRED_ITEMS = """
+    DELETE FROM %(bso)s
+    WHERE ttl < (UNIX_TIMESTAMP() - :grace)
+"""
+
+PURGE_BATCHES = """
+    DELETE FROM batch_uploads
+    WHERE batch < (UNIX_TIMESTAMP() - :lifetime - :grace) * 1000
+"""
 
-PURGE_BATCH_CONTENTS = "DELETE FROM %(bui)s " \
-                       "WHERE batch < (UNIX_TIMESTAMP() - :grace) * 1000" \
-                       "LIMIT :maxitems"
+PURGE_BATCH_CONTENTS = """
+    DELETE FROM %(bui)s
+    WHERE batch < (UNIX_TIMESTAMP() - :lifetime - :grace) * 1000
+"""
diff --git a/syncstorage/storage/sql/queries_mysql.py b/syncstorage/storage/sql/queries_mysql.py
index 1e2c1c0d..19ad7815 100644
--- a/syncstorage/storage/sql/queries_mysql.py
+++ b/syncstorage/storage/sql/queries_mysql.py
@@ -10,13 +10,23 @@
 # MySQL's non-standard DELETE ORDER BY LIMIT is incredibly useful here.
 
-PURGE_SOME_EXPIRED_ITEMS = "DELETE FROM %(bso)s "\
-                           "WHERE ttl < (UNIX_TIMESTAMP() - :grace) " \
-                           "ORDER BY ttl LIMIT :maxitems"
+PURGE_SOME_EXPIRED_ITEMS = """
+    DELETE FROM %(bso)s
+    WHERE ttl < (UNIX_TIMESTAMP() - :grace)
+    ORDER BY ttl LIMIT :maxitems
+"""
+
+PURGE_BATCHES = """
+    DELETE FROM batch_uploads
+    WHERE batch < (UNIX_TIMESTAMP() - :lifetime - :grace) * 1000
+    ORDER BY batch LIMIT :maxitems
+"""
 
-PURGE_BATCH_CONTENTS = "DELETE FROM %(bui)s " \
-                       "WHERE batch < (UNIX_TIMESTAMP() - :grace) * 1000 " \
-                       "ORDER BY batch LIMIT :maxitems"
+PURGE_BATCH_CONTENTS = """
+    DELETE FROM %(bui)s
+    WHERE batch < (UNIX_TIMESTAMP() - :lifetime - :grace) * 1000
+    ORDER BY batch LIMIT :maxitems
+"""
 
 # MySQL's non-standard ON DUPLICATE KEY UPDATE means we can
 # apply a batch efficiently with a single query.
diff --git a/syncstorage/storage/sql/queries_postgres.py b/syncstorage/storage/sql/queries_postgres.py
index d3347ec5..1be2b673 100644
--- a/syncstorage/storage/sql/queries_postgres.py
+++ b/syncstorage/storage/sql/queries_postgres.py
@@ -55,14 +55,23 @@
 """.strip()
 
-PURGE_SOME_EXPIRED_ITEMS = \
-    "DELETE FROM %(bso)s "\
-    "WHERE ttl < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) - :grace)"
+# Use correct timestamp functions for postgres.
 
-PURGE_BATCHES = "DELETE FROM batch_uploads WHERE batch < " \
-                "(SELECT EXTRACT(EPOCH FROM CURRENT_TIMSTAMP) - :grace) " \
-                " * 1000"
+PURGE_SOME_EXPIRED_ITEMS = """
+    DELETE FROM %(bso)s
+    WHERE ttl < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) - :grace)
+"""
+
+PURGE_BATCHES = """
+    DELETE FROM batch_uploads
+    WHERE batch < (
+        SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) - :lifetime - :grace
+    ) * 1000
+"""
 
-PURGE_BATCH_CONTENTS = "DELETE FROM %(bui)s WHERE batch < " \
-                       " (SELECT EXTRACT(EPOCH FROM CURRENT_TIMSTAMP) " \
-                       " - :grace) * 1000"
+PURGE_BATCH_CONTENTS = """
+    DELETE FROM %(bui)s
+    WHERE batch < (
+        SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) - :lifetime - :grace
+    ) * 1000
+"""
diff --git a/syncstorage/storage/sql/queries_sqlite.py b/syncstorage/storage/sql/queries_sqlite.py
index c0828c9b..ca268f1e 100644
--- a/syncstorage/storage/sql/queries_sqlite.py
+++ b/syncstorage/storage/sql/queries_sqlite.py
@@ -22,14 +22,20 @@
 
 # Use the correct timestamp-handling functions for sqlite.
-PURGE_SOME_EXPIRED_ITEMS = "DELETE FROM %(bso)s "\ - "WHERE ttl < (strftime('%%s', 'now') - :grace) " +PURGE_SOME_EXPIRED_ITEMS = """ + DELETE FROM %(bso)s + WHERE ttl < (strftime('%%s', 'now') - :grace) +""" -PURGE_BATCHES = "DELETE FROM batch_uploads WHERE batch < " \ - " (SELECT strftime('%%s', 'now') - :grace) * 1000" +PURGE_BATCHES = """ + DELETE FROM batch_uploads + WHERE batch < (strftime('%s', 'now') - :lifetime - :grace) * 1000 +""" -PURGE_BATCH_CONTENTS = "DELETE FROM %(bui)s WHERE batch < " \ - "(SELECT strftime('%%s', 'now') - :grace) * 1000" +PURGE_BATCH_CONTENTS = """ + DELETE FROM %(bui)s + WHERE batch < (strftime('%%s', 'now') - :lifetime - :grace) * 1000 +""" # We can use INSERT OR REPLACE to apply a batch in a single query. # However, to correctly cope with with partial data udpates, we need diff --git a/syncstorage/tests/test_scripts.py b/syncstorage/tests/test_scripts.py index 276e09fe..a4881155 100644 --- a/syncstorage/tests/test_scripts.py +++ b/syncstorage/tests/test_scripts.py @@ -12,7 +12,8 @@ from syncstorage.tests.support import StorageTestCase from syncstorage.storage import (load_storage_from_settings, - NotFoundError) + NotFoundError, + BATCH_LIFETIME) try: from syncstorage.storage.memcached import MemcachedStorage # NOQA @@ -116,8 +117,10 @@ def count_items(query): total_items = 0 for i in xrange(storage.dbconnector.shardsize): with storage.dbconnector.connect() as c: - res = c.execute(query % {"bso": "bso" + str(i), - "bui": "batch_upload_items" + str(i)}) # noqa + res = c.execute(query % { + "bso": "bso" + str(i), + "bui": "batch_upload_items" + str(i) + }) total_items += res.fetchall()[0][0] return total_items @@ -129,46 +132,42 @@ def count_bui_items(): return count_items("SELECT COUNT(*) FROM %(bui)s " "/* queryName=COUNT_BUI_ITEMS /*") + def count_batches(): + query = "SELECT * FROM batch_uploads "\ + "/* queryName=PRINT_BATCHES /*" + with storage.dbconnector.connect() as c: + res = c.execute(query) + query = "SELECT COUNT(*) FROM batch_uploads "\ + "/* queryName=COUNT_BATCHES /*" + with storage.dbconnector.connect() as c: + res = c.execute(query) + return res.fetchall()[0][0] + storage.set_item(1, "col", "test1", {"payload": "X", "ttl": 0}) storage.set_item(1, "col", "test2", {"payload": "X", "ttl": 0}) storage.set_item(1, "col", "test3", {"payload": "X", "ttl": 30}) self.assertEquals(count_bso_items(), 3) - # Have to get a little creative here to insert old enough batch IDs - # Three hours plus one second to make sure it'll be wiped - batchid = int((time.time() - ((3 * 60 * 60))) * 1000) + # Have to get a little creative here to insert old enough batch IDs. 
+ batchid = int((time.time() - BATCH_LIFETIME) * 1000) with storage.dbconnector.connect() as c: c.execute("INSERT INTO batch_uploads (batch, userid, " "collection) VALUES (:batch, :userid, :collection) " "/* queryName=purgeBatchId */", {"batch": batchid, "userid": 1, "collection": 1}) storage.append_items_to_batch(1, "col", batchid, - [{"id": "test1", "payload": "Y", - "ttl": 0}, - {"id": "test2", "payload": "Y", - "ttl": 0}, - {"id": "test3", "payload": "Y", - "ttl": 30}]) - batchid = int((time.time() + 2 - (3 * 60 * 60)) * 1000) - with storage.dbconnector.connect() as c: - c.execute("INSERT INTO batch_uploads (batch, userid, " - "collection) VALUES (:batch, :userid, :collection) " - "/* queryName=purgeBatchId */", - {"batch": batchid, "userid": 2, "collection": 1}) - storage.append_items_to_batch(2, "col", batchid, - [{"id": "test4", "payload": "A", - "ttl": 0}]) + [{"id": "test1", "payload": "Y"}, + {"id": "test2", "payload": "Y"}, + {"id": "test3", "payload": "Y"}]) batchid = storage.create_batch(3, "col") storage.append_items_to_batch(3, "col", batchid, - [{"id": "test5", "payload": "Z", - "ttl": 0}, - {"id": "test6", "payload": "Z", - "ttl": 0}, - {"id": "test7", "payload": "Z", - "ttl": 30}]) - self.assertEquals(count_bui_items(), 7) + [{"id": "test5", "payload": "Z"}, + {"id": "test6", "payload": "Z"}, + {"id": "test7", "payload": "Z"}]) + self.assertEquals(count_bui_items(), 6) + self.assertEquals(count_batches(), 2) - time.sleep(1) + time.sleep(1.1) # Long grace period == not purged ini_file = os.path.join(os.path.dirname(__file__), self.TEST_INI_FILE) @@ -179,12 +178,10 @@ def count_bui_items(): ini_file) assert proc.wait() == 0 self.assertEquals(count_bso_items(), 3) - self.assertEquals(count_bui_items(), 4) - - # Necessary for batch_upload_items purging to test reliably - time.sleep(2) + self.assertEquals(count_bui_items(), 6) + self.assertEquals(count_batches(), 2) - # Short grace period == not purged + # Short grace period == purged ini_file = os.path.join(os.path.dirname(__file__), self.TEST_INI_FILE) proc = spawn_script("purgettl.py", "--oneshot", @@ -194,3 +191,4 @@ def count_bui_items(): assert proc.wait() == 0 self.assertEquals(count_bso_items(), 1) self.assertEquals(count_bui_items(), 3) + self.assertEquals(count_batches(), 1) diff --git a/syncstorage/tests/test_wsgiapp.py b/syncstorage/tests/test_wsgiapp.py index 313ece77..bfb9404d 100644 --- a/syncstorage/tests/test_wsgiapp.py +++ b/syncstorage/tests/test_wsgiapp.py @@ -65,3 +65,43 @@ def new_do_request(req, *args, **kwds): break else: assert False, "metrics were not collected" + + def test_metrics_capture_for_batch_uploads(self): + app = TestApp(self.config.make_wsgi_app()) + + # Monkey-patch the app to make legitimate hawk-signed requests. 
+ user_id = 42 + auth_policy = self.config.registry.getUtility(IAuthenticationPolicy) + req = Request.blank("http://localhost/") + auth_token, auth_secret = auth_policy.encode_hawk_id(req, user_id) + + def new_do_request(req, *args, **kwds): + hawkauthlib.sign_request(req, auth_token, auth_secret) + return orig_do_request(req, *args, **kwds) + + orig_do_request = app.do_request + app.do_request = new_do_request + + collection = "/1.5/42/storage/col1" + + with testfixtures.LogCapture() as logs: + bso = {"id": "1", "payload": "x"} + res = app.post_json(collection + "?batch=true", [bso]) + batch = res.json["batch"] + + for r in logs.records: + if "syncstorage.storage.sql.append_items_to_batch" in r.__dict__: + break + else: + assert False, "timer metrics were not emitted" + + with testfixtures.LogCapture() as logs: + endpoint = collection + "?batch={0}&commit=true".format(batch) + app.post_json(endpoint, []) + + # DB timing metrics should have been generated in a log message. + for r in logs.records: + if "syncstorage.storage.sql.apply_batch" in r.__dict__: + break + else: + assert False, "timer metrics were not emitted" diff --git a/syncstorage/tests/tests-hostname.ini b/syncstorage/tests/tests-hostname.ini index 23660206..6b0bcf0c 100644 --- a/syncstorage/tests/tests-hostname.ini +++ b/syncstorage/tests/tests-hostname.ini @@ -3,6 +3,7 @@ backend = syncstorage.storage.sql.SQLStorage sqluri = sqlite:///:memory: quota_size = 5242880 create_tables = true +batch_upload_enabled = true [host:some-test-host] storage.sqluri = sqlite:////tmp/some-test-host-${MOZSVC_UUID}.db diff --git a/syncstorage/views/__init__.py b/syncstorage/views/__init__.py index e33972b3..f554673f 100644 --- a/syncstorage/views/__init__.py +++ b/syncstorage/views/__init__.py @@ -385,8 +385,6 @@ def post_collection_batch(request): try: batch = storage.create_batch(userid, collection) except ConflictError, e: - # ConflictError here means a client is spamming requests, - # I think. logger.error('Collision in batch creation!') logger.error(e) raise
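
A note on the expiry arithmetic used throughout this patch: batch IDs are millisecond timestamps (created as int(timestamp * 1000)), while BATCH_LIFETIME and the :lifetime/:grace query parameters are in seconds, so each check converts one side. A minimal standalone sketch of the condition, with a made-up helper name for illustration only (not part of the patch):

    import time

    BATCH_LIFETIME = 60 * 60 * 2  # 2 hours, in seconds

    def batch_is_purgeable(batchid, grace=0, now=None):
        """Return True if a millisecond batch ID is past its lifetime plus grace."""
        if now is None:
            now = time.time()
        created = batchid / 1000.0  # batch IDs are millisecond timestamps
        return created + BATCH_LIFETIME + grace < now

    # The SQL purge queries express the same condition in milliseconds:
    #   batch < (now_in_seconds - :lifetime - :grace) * 1000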