Skip to content

Commit

Permalink
backoff when conflicting, don't double-parse docs
Browse files Browse the repository at this point in the history
  • Loading branch information
danielrichman committed Aug 26, 2017
1 parent 63c6e5b commit 12adf09
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions habitat/parser_daemon.py
Expand Up @@ -24,6 +24,8 @@
import restkit
import copy
import statsd
import time
import random

from . import parser
from .utils import immortal_changes
Expand Down Expand Up @@ -53,6 +55,7 @@ def __init__(self, config, daemon_name="parserdaemon"):
self.couch_server = couchdbkit.Server(config["couch_uri"])
self.db = self.couch_server[config["couch_db"]]
self.last_seq = self.db.info()["update_seq"]
self.last_id = None

self.parser = parser.Parser(config)

Expand All @@ -71,12 +74,20 @@ def _couch_callback(self, result):
to Parser.parse, then saves the result.
"""
self.last_seq = result['seq']
doc = self.parser.parse(result['doc'])
doc = result['doc']

if self.last_id == doc["_id"]:
logger.debug("Destuttering: ignoring change for id {0}, since we "
"just processed it".format(self.last_id))
return

self.last_id = doc["_id"]
doc = self.parser.parse(doc)
if doc:
self._save_updated_doc(doc)

@statsd.StatsdTimer.wrap('parser_daemon.save_time')
def _save_updated_doc(self, doc, attempts=0):
def _save_updated_doc(self, doc, attempts=1):
"""
Save doc to the database, retrying with a merge in the event of
resource conflicts. This should definitely be a method of some Telem
Expand All @@ -86,7 +97,8 @@ class thing.
latest['data'].update(doc['data'])
try:
self.db.save_doc(latest)
logger.debug("Saved doc {0} successfully".format(doc["_id"]))
logger.debug("Saved doc {0} successfully after {1} attempts" \
.format(doc["_id"], attempts))
statsd.increment("parser_daemon.saved")
except couchdbkit.exceptions.ResourceConflict:
attempts += 1
Expand All @@ -97,8 +109,10 @@ class thing.
statsd.increment("parser_daemon.save_error")
raise RuntimeError(err)
else:
logger.debug("Save conflict, trying again (#{0})" \
.format(attempts))
delay = random.uniform(0.01, 0.1)
logger.debug("Save conflict (doc {0}, attempt #{1}, delay {2}s)" \
.format(doc["_id"], attempts, delay))
time.sleep(delay)
statsd.increment("parser_daemon.save_conflict")
self._save_updated_doc(doc, attempts)
except restkit.errors.Unauthorized as e:
Expand Down

0 comments on commit 12adf09

Please sign in to comment.