Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

cluster: fix plugin pickling bug

The recent plugins kwarg refactored the code to use a list of plugin
objects rather than plugin settings from the config. The plugin objects
were then being serialized and stored in each node's userdata when
adding nodes.

This worked, however, if the plugin stored anything that's not
serializable after its first run, such as the list of nodes, this leads
to errors when later adding nodes in the same process. The loadbalancer
in particular is affected by this bug.

Fixed this by storing the __init__ kwargs and fully qualified class path
used to create the plugin in a __plugin_metadata__ attribute which gets
dynamically created via ClusterSetup.__new__. Updated Cluster class to
serialize a list of __plugin_metadata__ attributes (stored in
Cluster._plugins) rather than the plugin object list itself (stored in
Cluster.plugins). Updated Node.get_plugins to iterate through the
__plugin_metadata__ attributes and recreate the plugin objects.

closes gh-165
  • Loading branch information...
commit 1d5726cf156437fa5e3b08a4bf325314e6e7e1d4 1 parent 24ca7f8
@jtriley authored
View
7 starcluster/cluster.py
@@ -385,7 +385,6 @@ def __init__(self,
self._zone = None
self._master = None
self._nodes = []
- self._plugins = plugins
self._pool = None
self._progress_bar = None
@@ -428,6 +427,10 @@ def _get_cluster_zone(self):
zone = self.ec2.get_zone(common_zone)
return zone
+ @property
+ def _plugins(self):
+ return [p.__plugin_metadata__ for p in self.plugins]
+
def load_plugins(self, plugins):
if plugins and isinstance(plugins[0], dict):
warnings.warn("In a future release the plugins kwarg for Cluster "
@@ -713,7 +716,7 @@ def create_node(self, alias, image_id=None, instance_type=None, zone=None,
def _get_cluster_userdata(self, aliases):
alias_file = utils.string_to_file('\n'.join(['#ignored'] + aliases),
static.UD_ALIASES_FNAME)
- plugins = utils.dump_compress_encode(self.plugins)
+ plugins = utils.dump_compress_encode(self._plugins)
plugins_file = utils.string_to_file('\n'.join(['#ignored', plugins]),
static.UD_PLUGINS_FNAME)
udfiles = [alias_file, plugins_file]
View
13 starcluster/clustersetup.py
@@ -3,6 +3,7 @@
"""
import posixpath
+from starcluster import utils
from starcluster import threadpool
from starcluster.utils import print_timing
from starcluster.logger import log
@@ -52,6 +53,18 @@ def run(self, nodes, master, user, user_shell, volumes):
"""
raise NotImplementedError('run method not implemented')
+ def __new__(typ, *args, **kwargs):
+ """
+ DO NOT OVERRIDE!
+
+ This is an internal method used for plugin accounting.
+ Do not override! If you *must* don't forget to call super!
+ """
+ plugin = super(ClusterSetup, typ).__new__(typ)
+ plugin_class_name = utils.get_fq_class_name(plugin)
+ plugin.__plugin_metadata__ = (plugin_class_name, args, kwargs)
+ return plugin
+
class DefaultClusterSetup(ClusterSetup):
"""
View
18 starcluster/node.py
@@ -131,7 +131,23 @@ def alias(self):
def get_plugins(self):
plugstxt = self.user_data.get(static.UD_PLUGINS_FNAME)
payload = plugstxt.split('\n', 2)[2]
- return utils.decode_uncompress_load(payload)
+ plugins_metadata = utils.decode_uncompress_load(payload)
+ plugs = []
+ for klass, args, kwargs in plugins_metadata:
+ mod_path, klass_name = klass.rsplit('.', 1)
+ try:
+ mod = __import__(mod_path, fromlist=[klass_name])
+ except SyntaxError, e:
+ raise exception.PluginSyntaxError(
+ "Plugin %s (%s) contains a syntax error at line %s" %
+ (klass_name, e.filename, e.lineno))
+ except ImportError, e:
+ raise exception.PluginLoadError(
+ "Failed to import plugin %s: %s" %
+ (klass_name, e[0]))
+ plug = getattr(mod, klass_name)(*args, **kwargs)
+ plugs.append(plug)
+ return plugs
def _remove_all_tags(self):
tags = self.tags.keys()[:]
Please sign in to comment.
Something went wrong with that request. Please try again.