
Improve fairness/responsiveness of background services

The "changeset" and "highlight" background services use a shared base-class
(JSONJobServer) that implements request queueing: a client connects, writes
a list of object literals ("requests") in JSON form, and then waits for a
list of results (typically copies of the requests with some more
information in them.)
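
For illustration, a client of this protocol could look something like the
sketch below.  The socket path is hypothetical, and using shutdown() to mark
the end of the request list is an assumption about the framing, not
necessarily how the real clients signal end-of-input.

    import json
    import socket

    def run_requests(path, requests):
        # Hypothetical client: connect to the service's UNIX socket, send a
        # JSON-encoded list of requests, read back a JSON-encoded list of
        # results.  The path and end-of-input signalling are assumptions.
        connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        connection.connect(path)
        connection.sendall(json.dumps(requests).encode("utf-8"))
        connection.shutdown(socket.SHUT_WR)  # we won't write anything more
        data = b""
        while True:
            chunk = connection.recv(4096)
            if not chunk:
                break
            data += chunk
        connection.close()
        return json.loads(data.decode("utf-8"))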

Before these changes, the order in which requests were serviced was
"randomized" by storing all pending requests in a dictionary and popping
them one at a time to process them.  (The "randomization" achieved by this
scheme was not exactly intentional.)  Assuming the order in which requests
were serviced really was random, this meant that on average, all clients
would finish at roughly the same time, and that time would be determined by
the client with the longest list of requests.  If one client had a huge
list of requests, the background service would be unavailable to all other
clients for a long time.
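
As a toy model of the old policy (not the actual service code): with every
pending request from every client in a single dictionary, popitem() alone
decides the service order, so client boundaries and arrival order are
ignored.

    # All clients' requests end up in one dict; popitem() picks the next
    # one to run.  In the Python versions this code targeted, dict ordering
    # was effectively arbitrary, hence the unintentional "randomization".
    queued = {}
    for client in ("a", "b"):
        for n in range(3):
            queued[(client, n)] = {"client": client, "request": n}

    while queued:
        key, request = queued.popitem()
        print(request)  # interleaving decided by dict internals, not fairness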

This commit changes the queuing policy by keeping a queue of clients with
pending requests.  Each time a new job is to be started, a request is
picked from the client at the front of the queue, and the client is then
pushed to the back of the queue.  This should achieve a situation where the
time it takes for any given client to have all its requests processed is
roughly nr_requests x system_load, which seems fair enough.
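
In outline, the new policy amounts to the following sketch (simplified; the
real implementation is JSONJobServer.__startJobs in background/utils.py
below, and the names here are illustrative):

    from collections import deque

    class Client(object):
        def __init__(self, name, requests):
            self.name = name
            self.pending = list(requests)

    # Round-robin: take one request from the client at the front of the
    # queue; if it still has pending requests, push it to the back so the
    # other waiting clients each get a turn first.
    clients = deque([Client("small", ["x"]), Client("big", ["y1", "y2", "y3"])])
    while clients:
        client = clients.popleft()
        request = client.pending.pop(0)
        if client.pending:
            clients.append(client)
        print("%s: %s" % (client.name, request))

Note how the single-request client is done after the first pass instead of
waiting behind the big client's backlog.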

As a side-effect of the implementation changes, the strategy for dealing
with identical requests from multiple clients changed somewhat.  The base
class used to group such requests together and run only one job.  Now, it
first allows the sub-class to intercept a request before it is processed
(which lets the sub-class check whether the request is actually a no-op),
and second assumes that processing a request is always okay, even if an
identical request was processed while the first one was queued.
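
The interception point is a request_result() callback on the server,
roughly as in the sketch below (modeled on the hook added in
background/utils.py; the cache path scheme is made up for illustration):

    import os

    class JobServerBase(object):
        # Default hook: return None, meaning "not already finished", so a
        # job is always started for the request.
        def request_result(self, request):
            return None

    class HighlightLikeServer(JobServerBase):
        # Cheap interception in the spirit of the highlight service: if the
        # highlighted output already exists on disk, answer directly
        # instead of starting a job.  The path scheme here is hypothetical.
        def request_result(self, request):
            path = "/var/cache/highlight/%s.%s" % (request["sha1"],
                                                   request["language"])
            if os.path.isfile(path):
                result = request.copy()
                result["highlighted"] = True
                return result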

The syntax highlight service intercepts requests, since the check is very
cheap (checking whether a file exists).  The changeset service doesn't,
since it needs a database connection to check whether a changeset already
exists, and currently only connects to the database in the slave processes.
Changesets are fewer and bigger, though, so the overhead of starting a
slave process only to discover that the changeset is already in the
database is insignificant.
1 parent 4ccd7bc commit 379538edcad6c7da1da0bdbaf27cde64a4a676f8 @jensl committed Dec 5, 2012
Showing with 165 additions and 97 deletions.
  1. +7 −0 background/highlight.py
  2. +113 −79 background/utils.py
  3. +45 −18 changeset/create.py
background/highlight.py
@@ -35,6 +35,7 @@
     sys.stdout.write(json_encode(request))
 else:
     from background.utils import JSONJobServer
+    from syntaxhighlight import isHighlighted
     from syntaxhighlight.context import importCodeContexts
 
     import configuration
@@ -52,6 +53,12 @@ def __init__(self):
                 hour, minute = service["compact_at"]
                 self.register_maintenance(hour=hour, minute=minute, callback=self.__compact)
 
+        def request_result(self, request):
+            if isHighlighted(request["sha1"], request["language"]):
+                result = request.copy()
+                result["highlighted"] = True
+                return result
+
         def request_started(self, job, request):
             super(HighlightServer, self).request_started(job, request)
background/utils.py
@@ -320,70 +320,75 @@ def __destroy_listening_socket(self):
         try: os.unlink(self.__address)
         except: pass
 
-    def __run(self):
+    def run(self):
         try:
-            self.startup()
-
-            while not self.terminated:
-                self.interrupted = False
-
-                if self.restart_requested:
-                    if not self.__peers: break
-                    else: self.debug("restart delayed; have %d peers" % len(self.__peers))
-
-                poll = select.poll()
-                poll.register(self.__listening_socket, select.POLLIN)
-
-                for peer in self.__peers:
-                    if peer.writing(): poll.register(peer.writing(), select.POLLOUT)
-                    if peer.reading(): poll.register(peer.reading(), select.POLLIN)
-
-                def fileno(file):
-                    if file: return file.fileno()
-                    else: return None
+            try:
+                self.startup()
 
                 while not self.terminated:
-                    timeout = self.run_maintenance()
-
-                    if not (timeout is None or self.__peers):
-                        self.debug("next maintenance task check scheduled in %d seconds" % timeout)
-
-                    try:
-                        events = poll.poll(timeout * 1000 if timeout else None)
-                        break
-                    except select.error, error:
-                        if error[0] == errno.EINTR: continue
-                        else: raise
-
-                if self.terminated: break
-
-                def catch_error(fn):
-                    try: fn()
-                    except socket.error, error:
-                        if error[0] not in (errno.EAGAIN, errno.EINTR): raise
-                    except OSError, error:
-                        if error.errno not in (errno.EAGAIN, errno.EINTR): raise
-
-                for fd, event in events:
-                    if fd == self.__listening_socket.fileno():
-                        peersocket, peeraddress = self.__listening_socket.accept()
-                        peer = self.handle_peer(peersocket, peeraddress)
-                        if peer: self.__peers.append(peer)
+                    self.interrupted = False
+
+                    if self.restart_requested:
+                        if not self.__peers: break
+                        else: self.debug("restart delayed; have %d peers" % len(self.__peers))
+
+                    poll = select.poll()
+                    poll.register(self.__listening_socket, select.POLLIN)
+
+                    for peer in self.__peers:
+                        if peer.writing(): poll.register(peer.writing(), select.POLLOUT)
+                        if peer.reading(): poll.register(peer.reading(), select.POLLIN)
+
+                    def fileno(file):
+                        if file: return file.fileno()
+                        else: return None
+
+                    while not self.terminated:
+                        timeout = self.run_maintenance()
+
+                        if not (timeout is None or self.__peers):
+                            self.debug("next maintenance task check scheduled in %d seconds" % timeout)
+
+                        try:
+                            events = poll.poll(timeout * 1000 if timeout else None)
+                            break
+                        except select.error, error:
+                            if error[0] == errno.EINTR: continue
+                            else: raise
+
+                    if self.terminated: break
+
+                    def catch_error(fn):
+                        try: fn()
+                        except socket.error, error:
+                            if error[0] not in (errno.EAGAIN, errno.EINTR): raise
+                        except OSError, error:
+                            if error.errno not in (errno.EAGAIN, errno.EINTR): raise
+
+                    for fd, event in events:
+                        if fd == self.__listening_socket.fileno():
+                            peersocket, peeraddress = self.__listening_socket.accept()
+                            peer = self.handle_peer(peersocket, peeraddress)
+                            if peer: self.__peers.append(peer)
+                            else:
+                                try: peersocket.close()
+                                except: pass
                         else:
-                            try: peersocket.close()
-                            except: pass
-                    else:
-                        for peer in self.__peers[:]:
-                            if fd == fileno(peer.writing()) and event != select.POLLIN:
-                                catch_error(peer.do_write)
-                            if fd == fileno(peer.reading()) and event != select.POLLOUT:
-                                catch_error(peer.do_read)
-                            if peer.is_finished():
-                                peer.destroy()
-                                self.peer_destroyed(peer)
-                                self.__peers.remove(peer)
-
-            self.info("service shutting down ...")
+                            for peer in self.__peers[:]:
+                                if fd == fileno(peer.writing()) and event != select.POLLIN:
+                                    catch_error(peer.do_write)
+                                if fd == fileno(peer.reading()) and event != select.POLLOUT:
+                                    catch_error(peer.do_read)
+                                if peer.is_finished():
+                                    peer.destroy()
+                                    self.peer_destroyed(peer)
+                                    self.__peers.remove(peer)
+            except:
+                self.exception()
+                self.error("service crashed!")
+                sys.exit(1)
+            else:
+                self.info("service shutting down ...")
         finally:
             try: self.shutdown()
             except: self.exception()
@@ -397,10 +402,6 @@ def catch_error(fn):
     def add_peer(self, peer):
         self.__peers.append(peer)
 
-    def run(self):
-        try: self.__run()
-        except: self.exception()
-
     def handle_peer(self, peersocket, peeraddress):
         pass
@@ -438,9 +439,9 @@ def handle_peer(self, peersocket, peeraddress):
 class JSONJobServer(PeerServer):
     class Job(PeerServer.ChildProcess):
-        def __init__(self, server, clients, request):
+        def __init__(self, server, client, request):
             super(JSONJobServer.Job, self).__init__(server, [sys.executable, sys.argv[0], "--json-job"], stderr=subprocess.STDOUT)
-            self.clients = clients
+            self.clients = [client]
             self.request = request
             self.write(json_encode(request))
             self.close()
@@ -459,12 +460,19 @@ def handle_input(self, value):
             decoded = json_decode(value)
             if isinstance(decoded, list):
                 self.__requests = decoded
+                self.__pending_requests = map(freeze, decoded)
                 self.__results = []
-                self.server.add_requests(self, self.__requests)
+                self.server.add_requests(self)
             else:
                 assert isinstance(decoded, dict)
                 self.server.execute_command(self, decoded)
 
+        def has_requests(self):
+            return bool(self.__pending_requests)
+
+        def get_request(self):
+            return self.__pending_requests.pop()
+
         def add_result(self, result):
             self.__results.append(result)
             if len(self.__results) == len(self.__requests):
@@ -473,25 +481,49 @@ def add_result(self, result):
     def __init__(self, service):
         super(JSONJobServer, self).__init__(service)
-        self.__queued_requests = {}
+        self.__clients_with_requests = []
         self.__started_requests = {}
         self.__max_jobs = service.get("max_jobs", 4)
 
     def __startJobs(self):
-        while self.__queued_requests and len(self.__started_requests) < self.__max_jobs:
-            frozen, clients = self.__queued_requests.popitem()
-            request = thaw(frozen)
-            job = JSONJobServer.Job(self, clients, request)
-            self.add_peer(job)
-            self.request_started(job, request)
+        # Repeat "start a job" while there are jobs to start and we haven't
+        # reached the limit on number of concurrent jobs to run.
+        while self.__clients_with_requests and len(self.__started_requests) < self.__max_jobs:
+            # Fetch next request from first client in list of clients with
+            # pending requests.
+            client = self.__clients_with_requests.pop(0)
+            frozen = client.get_request()
+
+            if client.has_requests():
+                # Client has more pending requests, so put it back at the end of
+                # the list of clients with pending requests.
+                self.__clients_with_requests.append(client)
 
-    def add_requests(self, client, requests):
-        for request in requests:
-            frozen = freeze(request)
             if frozen in self.__started_requests:
-                self.__started_requests[frozen].clients.add(client)
+                # Another client has requested the same thing, piggy-back on
+                # that job instead of starting another.
+                self.__started_requests[frozen].clients.append(client)
+                continue
+
+            request = thaw(frozen)
+
+            # Check if this request is already finished. Default implementation
+            # of this callback always returns None.
+            result = self.request_result(request)
+
+            if result:
+                # Request is already finished; don't bother starting a child
+                # process, just report result directly to the client.
+                client.add_result(result)
             else:
-                self.__queued_requests.setdefault(frozen, set()).add(client)
+                # Start child process.
+                job = JSONJobServer.Job(self, client, request)
+                self.add_peer(job)
+                self.request_started(job, request)
+
+    def add_requests(self, client):
+        assert client.has_requests()
+        self.__clients_with_requests.append(client)
         self.__startJobs()
 
     def execute_command(self, client, command):
@@ -504,6 +536,8 @@ def handle_peer(self, peersocket, peeraddress):
     def peer_destroyed(self, peer):
         if isinstance(peer, JSONJobServer.Job): self.__startJobs()
 
+    def request_result(self, request):
+        pass
     def request_started(self, job, request):
         self.__started_requests[freeze(request)] = job
     def request_finished(self, job, request, result):
changeset/create.py
@@ -74,27 +74,54 @@ def insertChangeset(db, parent, child, files):
         return changeset_id
 
-    if changeset_type == "merge":
-        child = gitutils.Commit.fromSHA1(db, repository, request["child_sha1"])
-        changes = diff.merge.parseMergeDifferences(db, repository, child)
+    changeset_ids = request["changeset_ids"] = {}
+
+    child = gitutils.Commit.fromSHA1(db, repository, request["child_sha1"])
+
+    cursor = db.cursor()
+
+    if "parent_sha1" in request:
+        assert changeset_type in ("custom", "conflicts")
+
+        parent_sha1 = request["parent_sha1"]
+        parent = gitutils.Commit.fromSHA1(db, repository, parent_sha1)
+
+        cursor.execute("""SELECT id, %s
+                            FROM changesets
+                           WHERE type=%s
+                             AND parent=%s
+                             AND child=%s""",
+                       (parent_sha1, changeset_type, parent.getId(db), child.getId(db)))
     else:
-        if changeset_type == "direct":
-            child = gitutils.Commit.fromSHA1(db, repository, request["child_sha1"])
+        assert changeset_type in ("direct", "merge")
+
+        cursor.execute("""SELECT changesets.id, commits.sha1
+                            FROM changesets
+                 LEFT OUTER JOIN commits ON (commits.id=changesets.parent)
+                           WHERE type=%s
+                             AND child=%s""",
+                       (changeset_type, child.getId(db)))
+
+    rows = cursor.fetchall()
+
+    if rows:
+        # Changeset(s) already exists in database.
+
+        for changeset_id, parent_sha1 in rows:
+            changeset_ids[parent_sha1] = changeset_id
+    else:
+        # Parse diff and insert changeset(s) into the database.
+
+        if changeset_type == "merge":
+            changes = diff.merge.parseMergeDifferences(db, repository, child)
+        elif changeset_type == "direct":
             changes = diff.parse.parseDifferences(repository, commit=child)
-        elif changeset_type == "custom":
-            parent = gitutils.Commit.fromSHA1(db, repository, request["parent_sha1"])
-            child = gitutils.Commit.fromSHA1(db, repository, request["child_sha1"])
-            changes = diff.parse.parseDifferences(repository, from_commit=parent, to_commit=child)
         else:
-            parent = gitutils.Commit.fromSHA1(db, repository, request["parent_sha1"])
-            child = gitutils.Commit.fromSHA1(db, repository, request["child_sha1"])
             changes = diff.parse.parseDifferences(repository, from_commit=parent, to_commit=child)
 
-    changeset_ids = request["changeset_ids"] = {}
-
-    for parent_sha1, files in changes.items():
-        if parent_sha1 is None: parent = None
-        else: parent = gitutils.Commit.fromSHA1(db, repository, parent_sha1)
-        changeset_ids[parent_sha1] = insertChangeset(db, parent, child, files)
+        for parent_sha1, files in changes.items():
+            if parent_sha1 is None: parent = None
+            else: parent = gitutils.Commit.fromSHA1(db, repository, parent_sha1)
+            changeset_ids[parent_sha1] = insertChangeset(db, parent, child, files)
 
-    db.commit()
+    db.commit()
