Move FS updates to their own process
This removes fileserver updating from the maintenance process and puts
it in a dedicated process. It also makes a couple of changes reverting
some unicode changes that I made a couple of months ago. A class's name
is never unicode in PY2.
terminalmage committed Jan 10, 2018
1 parent 16096bf commit 958c99b
Showing 1 changed file with 154 additions and 15 deletions.
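
For orientation, the sketch below (plain Python, outside of Salt) condenses the pattern this commit implements in the diff that follows: a dedicated process that starts one update thread per configured interval so each fileserver backend's cache is refreshed on its own schedule. The names here (BUCKETS, run_updates, updater_main) are illustrative only; the actual change subclasses salt.utils.process.SignalHandlingMultiprocessingProcess and waits on a threading.Condition, as shown in the diff.

import multiprocessing
import threading
import time


def update_roots():
    # Stand-in for a backend update function, e.g. roots.update().
    print('updating roots backend cache')


def update_gitfs():
    # Stand-in for a backend update function, e.g. gitfs.update().
    print('updating gitfs backend cache')


# Hypothetical "buckets": update interval in seconds -> update callables.
BUCKETS = {
    60: [update_roots],
    30: [update_gitfs],
}


def run_updates(interval, funcs):
    # Thread target: run every update for this interval, then sleep and repeat.
    while True:
        for func in funcs:
            func()
        time.sleep(interval)


def updater_main(buckets):
    # Process target: one daemon thread per interval, then keep the process alive.
    for interval, funcs in buckets.items():
        thread = threading.Thread(target=run_updates, args=(interval, funcs))
        thread.daemon = True
        thread.start()
    while True:
        time.sleep(60)


if __name__ == '__main__':
    multiprocessing.Process(target=updater_main, args=(BUCKETS,)).start()
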
169 changes: 154 additions & 15 deletions salt/master.py
@@ -18,6 +18,7 @@
import logging
import collections
import multiprocessing
import threading
import salt.serializers.msgpack
import threading

@@ -79,10 +80,11 @@
import salt.utils.platform
import salt.utils.process
import salt.utils.schedule
import salt.utils.ssdp
import salt.utils.user
import salt.utils.verify
import salt.utils.zeromq
import salt.utils.ssdp
from salt.config import DEFAULT_INTERVAL
from salt.defaults import DEFAULT_TARGET_DELIM
from salt.config import DEFAULT_MASTER_OPTS
from salt.exceptions import FileserverConfigError
@@ -91,6 +93,7 @@
enable_sigusr1_handler, enable_sigusr2_handler, inspect_stack
)
from salt.utils.event import tagify
from salt.utils.odict import OrderedDict

try:
import resource
@@ -188,9 +191,6 @@ def _post_fork_init(self):
in the parent process, then once the fork happens you'll start getting
errors like "WARNING: Mixing fork() and threads detected; memory leaked."
'''
# Avoid circular import
import salt.fileserver
self.fileserver = salt.fileserver.Fileserver(self.opts)
# Load Runners
ropts = dict(self.opts)
ropts['quiet'] = True
@@ -227,15 +227,13 @@ def run(self):
This is where any data that needs to be cleanly maintained from the
master is maintained.
'''
salt.utils.process.appendproctitle('Maintenance')
salt.utils.process.appendproctitle(self.__class__.__name__)

# init things that need to be done after the process is forked
self._post_fork_init()

# Make Start Times
last = int(time.time())
# Clean out the fileserver backend cache
salt.daemons.masterapi.clean_fsbackend(self.opts)

old_present = set()
while True:
@@ -249,7 +247,6 @@ def run(self):
self.handle_key_cache()
self.handle_presence(old_present)
self.handle_key_rotate(now)
salt.daemons.masterapi.fileserver_update(self.fileserver)
salt.utils.verify.check_max_open_files(self.opts)
last = now
time.sleep(self.loop_interval)
@@ -360,6 +357,144 @@ def handle_presence(self, old_present):
old_present.update(present)


class FileserverUpdate(salt.utils.process.SignalHandlingMultiprocessingProcess):
'''
A process from which to update any dynamic fileserver backends
'''
def __init__(self, opts, log_queue=None):
super(FileserverUpdate, self).__init__(log_queue=log_queue)
self.opts = opts
self.update_threads = {}
# Avoid circular import
import salt.fileserver
self.fileserver = salt.fileserver.Fileserver(self.opts)
self.fill_buckets()

# __setstate__ and __getstate__ are only used on Windows.
# We do this so that __init__ will be invoked on Windows in the child
# process so that a register_after_fork() equivalent will work on Windows.
def __setstate__(self, state):
self._is_child = True
self.__init__(state['opts'], log_queue=state['log_queue'])

def __getstate__(self):
return {'opts': self.opts,
'log_queue': self.log_queue}

def fill_buckets(self):
'''
Get the configured backends and the intervals for any backend which
supports them, and set up the update "buckets". There will be one
bucket for each thing being updated at a given interval.
'''
update_intervals = self.fileserver.update_intervals()
self.buckets = {}
for backend in self.fileserver.backends():
fstr = '{0}.update'.format(backend)
try:
update_func = self.fileserver.servers[fstr]
except KeyError:
log.debug(
'No update function for the %s fileserver backend',
backend
)
continue
if backend in update_intervals:
# Variable intervals are supported for this backend
for id_, interval in six.iteritems(update_intervals[backend]):
if not interval:
# Don't allow an interval of 0
interval = DEFAULT_INTERVAL
log.debug(
'An update_interval of 0 is not supported, '
'falling back to %s', interval
)
i_ptr = self.buckets.setdefault(interval, OrderedDict())
# Backend doesn't technically need to be present in the
# key, all we *really* need is the function reference, but
# having it there makes it easier to provide meaningful
# debug logging in the update threads.
i_ptr.setdefault((backend, update_func), []).append(id_)
else:
# Variable intervals are not supported for this backend, so
# fall back to the global interval for that fileserver. Since
# this backend doesn't support variable updates, we have
# nothing to pass to the backend's update func, so we'll just
# set the value to None.
try:
interval_key = '{0}_update_interval'.format(backend)
interval = self.opts[interval_key]
except KeyError:
interval = DEFAULT_INTERVAL
log.error(
'%s key missing from master configuration. This is '
'a bug, please report it. Falling back to default '
'interval of %d seconds', interval_key, interval
)
self.buckets.setdefault(
interval, OrderedDict())[(backend, update_func)] = None

def update_fileserver(self, interval, backends):
'''
Threading target which handles all updates for a given wait interval
'''
def _do_update():
log.debug(
'Performing fileserver updates for items with an update '
'interval of %d', interval
)
for backend, update_args in six.iteritems(backends):
backend_name, update_func = backend
try:
if update_args:
log.debug(
'Updating %s fileserver cache for the following '
'targets: %s', backend_name, update_args
)
args = (update_args,)
else:
log.debug('Updating %s fileserver cache', backend_name)
args = ()

update_func(*args)
except Exception as exc:
log.exception(
'Uncaught exception while updating %s fileserver '
'cache', backend_name
)

log.debug(
'Completed fileserver updates for items with an update '
'interval of %d, waiting %d seconds', interval, interval
)

condition = threading.Condition()
_do_update()
while True:
with condition:
condition.wait(interval)
_do_update()

def run(self):
'''
Start the update threads
'''
salt.utils.process.appendproctitle(self.__class__.__name__)
# Clean out the fileserver backend cache
salt.daemons.masterapi.clean_fsbackend(self.opts)

for interval in self.buckets:
self.update_threads[interval] = threading.Thread(
target=self.update_fileserver,
args=(interval, self.buckets[interval]),
)
self.update_threads[interval].start()

# Keep the process alive
while True:
time.sleep(60)


class Master(SMaster):
'''
The salt master server
@@ -609,16 +744,20 @@ def start(self):
name='ReqServer')

# Fire up SSDP discovery publisher
if self.opts[u'discovery']:
if self.opts['discovery']:
if salt.utils.ssdp.SSDPDiscoveryServer.is_available():
self.process_manager.add_process(salt.utils.ssdp.SSDPDiscoveryServer(
port=self.opts[u'discovery'].get(u'port', DEFAULT_MASTER_OPTS[u'discovery'][u'port']),
listen_ip=self.opts[u'interface'],
answer={u'mapping': self.opts[u'discovery'].get(u'mapping', {})}).run)
port=self.opts['discovery'].get('port', DEFAULT_MASTER_OPTS['discovery']['port']),
listen_ip=self.opts['interface'],
answer={'mapping': self.opts['discovery'].get('mapping', {})}).run)
else:
log.error(u'Unable to load SSDP: asynchronous IO is not available.')
log.error('Unable to load SSDP: asynchronous IO is not available.')
if sys.version_info.major == 2:
log.error(u'You are using Python 2, please install "trollius" module to enable SSDP discovery.')
log.error('You are using Python 2, please install "trollius" module to enable SSDP discovery.')

self.process_manager.add_process(
FileserverUpdate,
args=(self.opts,))

# Install the SIGINT/SIGTERM handlers if not done so far
if signal.getsignal(signal.SIGINT) is signal.SIG_DFL:
@@ -1824,7 +1963,7 @@ def wheel(self, clear_load):
jid = salt.utils.jid.gen_jid(self.opts)
fun = clear_load.pop('fun')
tag = tagify(jid, prefix='wheel')
data = {'fun': u"wheel.{0}".format(fun),
data = {'fun': "wheel.{0}".format(fun),
'jid': jid,
'tag': tag,
'user': username}
