Permalink
Browse files

Add a 'taskOwner' who must recognise a task to keep it alive

The creator of a task is responsible for destroying it. To highlight
bugs where a task creator fails in their duty, we add the concept of
a task 'owner'. The owner must acknowledge their interest in active
tasks every 30s or so, otherwise the tasks are 'destroy' (i.e.
set to automatically disappear on completion). Note that 'destroy'
only removes the object *when the activity has finished*. It does
not auto-cancel.

Signed-off-by: David Scott <dave.scott@eu.citrix.com>
  • Loading branch information...
1 parent 6cfe9f6 commit 20cd4b2befdf53144453abea47d8153f2921afd9 David Scott committed Feb 11, 2014
Showing with 80 additions and 13 deletions.
  1. +67 −13 dbus/vm/python/dbus-resource-script.py
  2. +2 −0 dbus/vm/resource.xml
  3. +11 −0 dbus/vm/taskOwner.xml
@@ -10,8 +10,10 @@
import syslog
import threading
+import urlparse
import time
-import getopt
+import getopt
+import os
import sys
import subprocess
import gtk
@@ -68,6 +70,7 @@ def error(message):
next_task_id = 0
TASK_INTERFACE="org.xenserver.api.task"
+TASKOWNER_INTERFACE="org.xenserver.api.taskOwner"
class Canceller(threading.Thread):
"""A thread which attempts to cleanly shutdown a process, resorting
@@ -88,7 +91,7 @@ def run(self):
continue
sys.stdout.flush()
time.sleep(1)
- if self.process.poll() <> None:
+ if self.process.poll() == None:
info("%s: cancelling: sending SIGKILL to %d" % (self.task.path, self.process.pid))
try:
self.process.kill()
@@ -100,8 +103,55 @@ def run(self):
pass
info("%s: cancelling: process has terminated" % self.task.path)
+class TaskMonitor(threading.Thread):
+ def __init__(self, interval = 30):
+ threading.Thread.__init__(self)
+ self.owners = {}
+ self.interval = interval
+ self.daemon = True
+ self.start()
+ def add(self, task, owner_uri):
+ if owner_uri in self.owners:
+ self.owners[owner_uri].append(task)
+ else:
+ self.owners[owner_uri] = [ task ]
+ def remove(self, task, owner_uri):
+ if owner_uri in self.owners:
+ self.owners[owner_uri] = filter(lambda x:x <> task, self.owners[owner_uri])
+ if self.owners[owner_uri] == []:
+ del self.owners[owner_uri]
+ def run(self):
+ # Every <interval>, check if the owner still wants this task. If the
+ # owner doesn't recognise the task URI (eg because it crashed), then
+ # call destroy() to prevent a leak
+ while True:
+ time.sleep(self.interval)
+ bus = dbus.SessionBus()
+ for owner_uri in self.owners.keys():
+ try:
+ uri = urlparse.urlparse(owner_uri)
+ proxy = bus.get_object(uri.scheme, uri.path)
+ tasks = self.owners[owner_uri]
+ alive = proxy.ping(map(lambda x:x.uri, tasks), dbus_interface=TASKOWNER_INTERFACE)
+ for i in range(0, len(alive)):
+ if not(alive[i]):
+ task = tasks[i]
+ error("%s: owner %s has failed and orphaned this task. Deleting task now" % (task.uri, owner_uri))
+ self.remove(task, owner_uri)
+ task.destroy()
+ except Exception, e:
+ # If the owner has gone, then we delete the tasks. It's
+ # the owner's job to stay alive.
+ tasks = self.owners[owner_uri]
+ for task in tasks:
+ error("%s: owner %s could not be contacted (%s). Deleting task now." % (task.uri, owner_uri, str(e)))
+ self.remove(task, owner_uri)
+ task.destroy()
+
+taskMonitor = TaskMonitor()
+
class Task(dbus.service.Object, threading.Thread):
- def __init__(self, cmd):
+ def __init__(self, cmd, owner_uri):
threading.Thread.__init__(self)
self.bus = dbus.SessionBus()
@@ -111,14 +161,18 @@ def __init__(self, cmd):
self.task_id = next_task_id
next_task_id = next_task_id + 1
self.path = "/org/xenserver/task/" + str(self.task_id)
+ self.uri = name + "://" + self.path
self.completed = False
self.canceller = None
self.result = None
self.returncode = None
self.auto_destroy = False
+ self.owner_uri = owner_uri
+
+ taskMonitor.add(self, owner_uri)
try:
- self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+ self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=False)
except Exception, e:
error("%s: %s" % (" ".join(cmd), str(e)))
raise dbus.exceptions.DBusException(
@@ -162,12 +216,12 @@ def cancel(self):
@dbus.service.method(dbus_interface=TASK_INTERFACE)
def destroy(self):
self.auto_destroy = True
+ taskMonitor.remove(self, self.owner_uri)
+
if self.completed:
info("%s: destroy: task complete, removing object" % self.path)
self.remove_from_connection()
return
- info("%s: destroy: task is running, requesting cancel with auto-destroy" % self.path)
- self.cancel()
@dbus.service.method(dbus_interface=TASK_INTERFACE)
def getResult(self):
@@ -196,7 +250,7 @@ def GetAll(self, interface_name):
else:
raise dbus.exceptions.DBusException(
'com.example.UnknownInterface',
- 'The Foo object does not implement the %s interface'
+ 'The Task object does not implement the %s interface'
% interface_name)
RESOURCE_INTERFACE="org.xenserver.api.resource"
@@ -207,12 +261,12 @@ def __init__(self):
bus_name = dbus.service.BusName(name, bus=self.bus)
dbus.service.Object.__init__(self, bus_name, "/" + name.replace(".", "/"))
- @dbus.service.method(dbus_interface=RESOURCE_INTERFACE, in_signature="s", out_signature="o")
- def attach(self, global_uri):
- return Task([script, "attach", global_uri]).path
- @dbus.service.method(dbus_interface=RESOURCE_INTERFACE, in_signature="s", out_signature="")
- def detach(self, id):
- return Task([script, "detach", id]).path
+ @dbus.service.method(dbus_interface=RESOURCE_INTERFACE, in_signature="ss", out_signature="s")
+ def attach(self, global_uri, owner_uri):
+ return Task([script, "attach", global_uri], owner_uri).uri
+ @dbus.service.method(dbus_interface=RESOURCE_INTERFACE, in_signature="ss", out_signature="s")
+ def detach(self, id, owner_uri):
+ return Task([script, "detach", id], owner_uri).uri
gobject.threads_init()
DBusGMainLoop(set_as_default=True)
View
@@ -2,10 +2,12 @@
<interface name="org.xenserver.api.resource">
<method name="attach">
<arg name="global_uri" type="s" direction="in"/>
+ <arg name="owner_uri" type="s" direction="in"/>
<arg name="task" type="o" direction="out"/>
</method>
<method name="detach">
<arg name="id" type="s" direction="in"/>
+ <arg name="owner_uri" type="s" direction="in"/>
<arg name="task" type="o" direction="out"/>
</method>
</interface>
View
@@ -0,0 +1,11 @@
+<!DOCTYPE node PUBLIC
+"-//freedesktop//DTD D-BUS Object Introspection 1.0//EN"
+"http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
+<node name="/" xmlns:doc="http://www.freedesktop.org/dbus/1.0/doc.dtd">
+ <interface name="org.xenserver.api.taskOwner">
+ <method name="ping">
+ <arg name="tasks" type="as" direction="in"/>
+ <arg name="alive" type="ab" direction="out"/>
+ </method>
+ </interface>
+</node>

0 comments on commit 20cd4b2

Please sign in to comment.