Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
  • 7 commits
  • 7 files changed
  • 0 commit comments
  • 1 contributor
Commits on Mar 21, 2013
@jaredbischof Lots of changes. Mostly moving arg parsing beyond the first argument …
…and user/host checking out of MCP and into the subsystem class. Arg parsing will be done on subsets of the command at each level (main program, class, method).
3fecd74
@jaredbischof Moving part of init into subsystem base class. Other edits. 3221c07
@jaredbischof Moving part of init into subsystem base class. Other edits. b34752e
@jaredbischof Moving part of init into subsystem base class. Also, setting the log …
…levels is now done independently within each class (by the subsystem base class) rather than by the mlog class. We could add log level adjustment back into the mlog class later if we want to allow subsystems in logging that do not have a class in MCP.
f43fe61
@jaredbischof Moving part of init into subsystem base class. Other edits. 05b599b
@jaredbischof Added a bunch of stuff that was in MCP program into the base subsyste…
…m class including arg parsing (after the subsystem argument) and calling of the subsystem action with the run() method.
9cc12bc
@jaredbischof Moving part of init into subsystem base class. Fixed error in unlocki…
…ng upload page. Other edits.
2e198bc
Showing with 144 additions and 171 deletions.
  1. +21 −72 MCP
  2. +17 −18 mcp_modules/mcp_api.py
  3. +5 −10 mcp_modules/memcache.py
  4. +4 −29 mcp_modules/mlog.py
  5. +10 −17 mcp_modules/queue.py
  6. +76 −10 mcp_modules/subsystem.py
  7. +11 −15 mcp_modules/upload.py
View
93 MCP
@@ -1,13 +1,12 @@
#!/soft/packages/python/2.6/bin/python
+import argparse
import os
-import inspect
import json
-import sys, shlex, socket
-import getpass, commands, subprocess
+import sys
import re
-# Getting paths
+# Getting path to MCP
MCP_path = os.path.realpath(__file__)
matchObj = re.match(r'(^.*\/)', MCP_path)
MCP_dir = matchObj.group(0)
@@ -19,76 +18,26 @@ sys.path.insert(0, MCP_dir+"mcp_modules/")
json_conf_file = open(MCP_dir+'conf/conf.json')
json_conf = json.load(json_conf_file)
-__doc__ = """
-MG-RAST Control Program (MCP)
-
-SERVICE \tUSER@HOST\t\t\tACTIONS
-"""
-
-service_name_to_module = {}
-service_action_to_req_user = {}
-for module_name in json_conf["global"]["services"]:
- module_name = module_name.strip()
- module = __import__(module_name)
- service_name_to_module[module_name] = module
- for name, obj in inspect.getmembers(module):
- if inspect.isclass(obj) and name == module_name and hasattr(obj, 'actions'):
- userhost = ""
- if 'user' in json_conf[module_name] and 'host' in json_conf[module_name]:
- userhost = json_conf[module_name]['user'] + '@' + json_conf[module_name]['host']
- for action in obj.actions.split(','):
- service_action_to_req_user[name + " " + action.strip()] = userhost
-
- if userhost == '':
- userhost = 'ANY '
-
- __doc__ += name + " \t" + userhost + " \t" + obj.actions + "\n"
-
-def main(args):
- usage = "\nUsage: MCP service action\n" + __doc__
-
- # Checking number of inputs.
- if len(sys.argv) < 3:
- sys.stderr.write(usage)
- sys.stderr.write("\n")
- else:
- service = sys.argv[1]
- action = sys.argv[2]
-
- # Checking that user has entered a valid service-action pair.
- service_action = service + " " + action
- if service_action not in service_action_to_req_user:
- sys.stderr.write("\nERROR: service-action '" + service_action + "' not available\n" + usage)
- sys.stderr.write("\n")
- return 0
-
- # Checking that the command is being called by the correct user@hostname. Otherwise, pass
- # the command onto the appropriate machine as the appropriate user via ssh. For the time
- # being we're just going to call 'sudo -s', but this will change later.
- user = getpass.getuser()
- host = socket.gethostname()
- if service_action_to_req_user[service_action] != "" and (user + "@" + host) != service_action_to_req_user[service_action]:
- print "Handing off command to run as: " + service_action_to_req_user[service_action]
- array = commands.getstatusoutput("sudo -s ssh " + service_action_to_req_user[service_action] + " " + MCP_path + " " + " ".join(map(str,sys.argv[1:])))
- print array[1]
- return 0
-
- # Retrieve the appropriate "service" python class
- myclass = getattr(service_name_to_module[service], service)
+def main():
+ parser = argparse.ArgumentParser(description='MG-RAST Control Program (MCP)')
+ parser.add_argument('subsystem', metavar='subsystem',
+ help="An MG-RAST subsystem. The current list of subsystems includes: '" +
+ "', '".join(json_conf["global"]["subsystems"])+"'")
+ parser.add_argument('params', metavar='params', nargs=argparse.REMAINDER,
+ help='Additional arguments should specify an action followed by action parameters.')
+ args = parser.parse_args()
+ subsystem = args.subsystem
+ params = args.params
+
+ if subsystem in json_conf["global"]["subsystems"]:
+ # Retrieve the appropriate "subsystem" python module and class (module and class names should be identical)
+ module = __import__(subsystem)
+ myclass = getattr(module, subsystem)
# Create the python class instance dynamically
- myservice = myclass(MCP_dir)
-
- # Retrieve the appropriate "action" python method
- myaction = getattr(myservice, action)
- # Run the python method
- if(len(sys.argv) == 3):
- myaction()
- elif(len(sys.argv) == 4):
- myaction(sys.argv[3])
- elif(len(sys.argv) == 5):
- myaction(sys.argv[3], sys.argv[4])
+ mysubsystem = myclass(MCP_path)
+ mysubsystem.run(params)
return 0
if __name__ == "__main__":
- sys.exit( main(sys.argv) )
+ sys.exit( main() )
View
35 mcp_modules/mcp_api.py
@@ -1,17 +1,12 @@
-import glob, json, os, sys, time
+import glob, json, os, sys
from subsystem import subsystem
class mcp_api(subsystem):
- actions = "start, stop, restart"
+ actions = [ 'start', 'stop', 'restart', 'log' ]
- def __init__(self, MCP_dir):
- subsystem.__init__(self, MCP_dir)
- self.state = { 'resource':self.__class__.__name__,
- 'updated':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
- 'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.__class__.__name__,
- 'status': { 'site' : 'online' }
- }
- self.services = self.json_conf['global']['services']
+ def __init__(self, MCP_path):
+ subsystem.__init__(self, MCP_path)
+ self.state['status'] = { 'site' : 'online' }
def start(self):
files = glob.glob(self.apidir+"/*")
@@ -19,16 +14,17 @@ def start(self):
sys.stderr.write("ERROR: Cannot initialize mcp_api because API directory (" + self.apidir + ") is not empty\n")
return 0
- for service in self.services:
- service = service.strip();
- module = __import__(service)
- myclass = getattr(module, service)
- myservice = myclass(self.MCP_dir)
-
- jstate = json.dumps(myservice.get_state())
- f = open(self.apidir + "/" + service, 'w')
+ for subsystem in self.json_conf['global']['subsystems']:
+ subsystem = subsystem.strip();
+ module = __import__(subsystem)
+ myclass = getattr(module, subsystem)
+ mysubsystem = myclass(self.MCP_dir)
+ jstate = json.dumps(mysubsystem.state)
+ f = open(self.apidir + "/" + subsystem, 'w')
f.write(jstate)
+ return 1
+
def stop(self):
for file_object in os.listdir(self.apidir):
file_object_path = os.path.join(self.apidir, file_object)
@@ -37,6 +33,9 @@ def stop(self):
else:
shutil.rmtree(file_object_path)
+ return 1
+
def restart(self):
self.stop()
self.start()
+ return 1
View
15 mcp_modules/memcache.py
@@ -1,19 +1,14 @@
-import time
from subsystem import subsystem
class memcache(subsystem):
- actions = "clear"
+ actions = [ 'clear', 'log' ]
- def __init__(self, MCP_dir):
- subsystem.__init__(self, MCP_dir)
- self.state = { 'resource':self.__class__.__name__,
- 'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.__class__.__name__,
- 'updated':time.strftime("%Y-%m-%d %H:%M:%S")
- }
+ def __init__(self, MCP_path):
+ subsystem.__init__(self, MCP_path)
self.memhost = self.json_conf['memcache']['memhost']
def clear(self):
print "Clearing memcache:"
- sout, serr = self.run_cmd(self.MCP_dir + "bin/clear_memcache.pl " + self.memhost)
+ sout = self.run_cmd(self.MCP_dir + "bin/clear_memcache.pl " + self.memhost)
print "memcache cleared!"
- return 0
+ return 1
View
33 mcp_modules/mlog.py
@@ -1,33 +1,8 @@
-import json, sys, time
+import json, sys
from subsystem import subsystem
class mlog(subsystem):
- actions = "set"
+ actions = [ 'log' ]
- def __init__(self, MCP_dir):
- subsystem.__init__(self, MCP_dir)
- self.state = { 'resource':self.__class__.__name__,
- 'updated':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
- 'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.__class__.__name__,
- 'log_levels':{}
- }
-
- for component in self.json_conf['mlog']['log_levels']:
- self.state['log_levels'][component] = self.json_conf['mlog']['log_levels'][component]
-
- def set(self, component, level):
- if component not in self.json_conf['mlog']['log_levels']:
- sys.stderr.write("ERROR: '" + component + "' is not a valid logging component.\n")
- return 0
- else:
- try:
- int(level)
- except ValueError:
- sys.stderr.write("ERROR: '" + level + "' is not an integer value.\n")
- return 0
- self.state['log_levels'][component] = int(level)
- self.state['updated'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- jstate = json.dumps(self.get_state())
- f = open(self.apidir + "/" + self.__class__.__name__, 'w')
- f.write(jstate)
- return 1
+ def __init__(self, MCP_path):
+ subsystem.__init__(self, MCP_path)
View
27 mcp_modules/queue.py
@@ -1,37 +1,30 @@
-import time
from subsystem import subsystem
class queue(subsystem):
- actions = "start, stop"
+ actions = [ 'start', 'stop', 'log' ]
- def __init__(self, MCP_dir):
- subsystem.__init__(self, MCP_dir)
+ def __init__(self, MCP_path):
+ subsystem.__init__(self, MCP_path)
# getting status of queues
- sout, serr = self.run_cmd("/usr/local/bin/qstat -Q batch")
+ sout = self.run_cmd("/usr/local/bin/qstat -Q batch")
lines = sout.splitlines()
batch_status = 'online' if lines[len(lines)-1].split()[3] == 'yes' else 'offline'
- sout, serr = self.run_cmd("/usr/local/bin/qstat -Q fast")
+ sout = self.run_cmd("/usr/local/bin/qstat -Q fast")
lines = sout.splitlines()
fast_status = 'online' if lines[len(lines)-1].split()[3] == 'yes' else 'offline'
- self.state = { 'resource':self.__class__.__name__,
- 'updated':time.strftime("%Y-%m-%d %H:%M:%S"),
- 'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.__class__.__name__,
- 'status': { 'batch': batch_status,
- 'fast': fast_status
- }
- }
+ self.state['status'] = { 'batch': batch_status, 'fast': fast_status }
def start(self):
print "Starting nagasaki pipeline:"
- sout, serr = self.run_cmd("/usr/local/bin/qstart batch")
+ sout = self.run_cmd("/usr/local/bin/qstart batch")
print "nagasaki pipeline started!"
- return 0
+ return 1
def stop(self):
print "Stopping nagasaki pipeline:"
- sout, serr = self.run_cmd("/usr/local/bin/qstop batch")
+ sout = self.run_cmd("/usr/local/bin/qstop batch")
print "nagasaki pipeline stopped!"
- return 0
+ return 1
View
86 mcp_modules/subsystem.py
@@ -1,22 +1,88 @@
+import argparse
+import commands
import json
-import shlex, subprocess
+import getpass, re, socket
+import shlex, shutil, subprocess
+import sys, time
class subsystem (object):
- def __init__(self, MCP_dir):
- self.MCP_dir = MCP_dir
- self.state = {}
+ def __init__(self, MCP_path):
+ self.MCP_path = MCP_path
+ matchObj = re.match(r'(^.*\/)', self.MCP_path)
+ self.MCP_dir = matchObj.group(0)
+
+ self.subsystem = self.__class__.__name__
json_conf_file = open(self.MCP_dir+'conf/conf.json')
self.json_conf = json.load(json_conf_file)
self.apidir = self.json_conf['mcp_api']['dir'] + "/" + str(self.json_conf['mcp_api']['version'])
+ self.log_level_max = self.json_conf['global']['log_level_max']
+ self.log_level_min = self.json_conf['global']['log_level_min']
+
+ self.req_host = ""
+ if 'req_host' in self.json_conf[self.subsystem]:
+ self.req_host = self.json_conf[self.subsystem]['req_host']
+
+ self.req_user = ""
+ if 'req_user' in self.json_conf[self.subsystem]:
+ self.req_user = self.json_conf[self.subsystem]['req_user']
+
+ self.state = { 'log_level':self.json_conf['global']['default_log_level'],
+ 'subsystem':self.subsystem,
+ 'updated':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
+ 'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.subsystem
+ }
+
+ def run(self, params):
+ parser = argparse.ArgumentParser(prog='MCP ' + self.subsystem)
+ parser.add_argument('action', metavar='action',
+ help="An action for the subsystem '" + self.subsystem + "' to perform. Available actions include: '" + "', '".join(self.actions) + "'")
+ parser.add_argument('params', metavar='params', nargs=argparse.REMAINDER,
+ help="Additional arguments should specify an action followed by action parameters if required.")
+ args = parser.parse_args(params)
+ action = args.action
+ params = args.params
+
+ user = getpass.getuser()
+ host = socket.gethostname()
+ req_login = self.req_user + "@" + self.req_host
+ if self.req_host != "" and self.req_user != "" and (user + "@" + host) != req_login:
+ print "Handing off command to run as: " + req_login
+ sout = self.run_cmd("sudo -s ssh " + " ".join([req_login, self.MCP_path, self.subsystem, action]) + " " + " ".join(params))
+ return 1
+
+ myaction = getattr(self, action)
+ if params:
+ myaction(params)
+ else:
+ myaction()
+
+ def log(self, level):
+ try:
+ level = int(level)
+ except ValueError:
+ sys.stderr.write("ERROR: '" + level + "' is not an integer value.\n")
+ return 0
+
+ if level < self.log_level_min or level > self.log_level_max:
+ sys.stderr.write("ERROR: '" + str(level) + "' is not a valid logging level.\n")
+ return 0
+
+ self.state['log_level'] = level
+ self.state['updated'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+ jstate = json.dumps(self.state)
+ fname = self.apidir + "/" + self.__class__.__name__
+ f = open(fname + ".tmp", 'w')
+ f.write(jstate)
+ shutil.move(fname + ".tmp", fname)
- def get_state(self):
- return self.state
+ return 1
def run_cmd(self, cmd_str):
cmd = shlex.split(str(cmd_str))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = proc.communicate()
- if proc.returncode != 0:
- raise IOError("%s\n%s"%(" ".join(cmd), stderr))
+ sout, serr = proc.communicate()
+ if proc.returncode != 0 or serr != "":
+ raise IOError("%s\n%s"%(" ".join(cmd), serr))
+ return 0
- return stdout, stderr
+ return sout
View
26 mcp_modules/upload.py
@@ -1,22 +1,18 @@
-import os, json, sys, time
+import os, json, sys
from subsystem import subsystem
class upload(subsystem):
- actions = "lock_page, unlock_page"
+ actions = [ 'lock_page', 'unlock_page', 'log' ]
- def __init__(self, MCP_dir):
- subsystem.__init__(self, MCP_dir)
- self.state = { 'resource':self.__class__.__name__,
- 'updated':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
- 'url':self.json_conf['global']['apiurl'] + "/" + str(self.json_conf['mcp_api']['version']) + "/" + self.__class__.__name__,
- 'status':{}
- }
+ def __init__(self, MCP_path):
+ subsystem.__init__(self, MCP_path)
+ self.state['status'] = {}
self.lock_file = self.json_conf['upload']['lock_dir'] + "/upload.lock"
if os.path.isfile(self.lock_file):
- self.state['status']['page'] = "locked"
+ self.state['status']['page'] = "offline"
else:
- self.state['status']['page'] = "not locked"
+ self.state['status']['page'] = "online"
def lock_page(self):
fh = file(self.lock_file, 'a')
@@ -25,8 +21,8 @@ def lock_page(self):
finally:
fh.close()
- self.state['status']['page'] = "locked"
- jstate = json.dumps(self.get_state())
+ self.state['status']['page'] = "offline"
+ jstate = json.dumps(self.state)
f = open(self.apidir + "/" + self.__class__.__name__, 'w')
f.write(jstate)
return 1
@@ -35,8 +31,8 @@ def unlock_page(self):
if os.path.isfile(self.lock_file):
os.unlink(self.lock_file)
- self.state['status']['page'] = "not locked"
- jstate = json.dumps(self.get_state())
+ self.state['status']['page'] = "online"
+ jstate = json.dumps(self.state)
f = open(self.apidir + "/" + self.__class__.__name__, 'w')
f.write(jstate)
return 1

No commit comments for this range

Something went wrong with that request. Please try again.