addnode options / loadbalancer stuff #20

Open
wants to merge 18 commits into master
154 changes: 105 additions & 49 deletions starcluster/balancers/sge/__init__.py
@@ -19,25 +19,62 @@
import re
import time
import datetime
import traceback
import string
import xml.dom.minidom


from starcluster import utils
from starcluster import static
from starcluster import exception
from starcluster.balancers import LoadBalancer
from starcluster.logger import log

from starcluster import sge_utils

SGE_STATS_DIR = os.path.join(static.STARCLUSTER_CFG_DIR, 'sge')
DEFAULT_STATS_DIR = os.path.join(SGE_STATS_DIR, '%s')
DEFAULT_STATS_DIR = os.path.join(SGE_STATS_DIR, '%s_%s')
DEFAULT_STATS_FILE = os.path.join(DEFAULT_STATS_DIR, 'sge-stats.csv')

##changes:
# added queue_name, image_id, instance_id, zone, spot_bid params (see the sketch after this list)
# --> and corresponding arguments to commands.loadbalancer
# added queue_name to the default stats dir
# added queue_name param to the qhost, qacct, and qstat commands
# --> but using "sge_utils.get_qstat" instead of the original qstat command
# to handle the case of 0 nodes
# moved the SGE command output parsing functions (and other SGE utilities) to sge_utils.py
# removed the slot-number consistency check; default slots for the instance type are used instead
# --> the way this is handled should be improved
# modified the load balancer logic to handle the case of 0 nodes in the queue
# _find_node_for_removal ensures the removed node belongs to the queue being balanced
# creates the queue if it doesn't already exist
# added host_group param, to specify whether nodes should be added to a host group instead of directly to the queue
# --> creates the host group if it doesn't already exist and adds the host group to the specified queue
# upon adding a node, adds it to the specified queue/host group if it hasn't already been added
# added a slots option, so the number of slots new nodes take in the queue can be specified
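To illustrate the new options, here is a minimal sketch of driving the balancer with them. The constructor signature isn't shown in this diff, so the options are set as plain attributes, and the queue name, host group, slot count, and bid are made-up example values:

```python
from starcluster.balancers.sge import SGELoadBalancer

def balance_render_queue(cluster):
    # cluster: a running Cluster object, e.g. from ClusterManager.get_cluster()
    lb = SGELoadBalancer()
    lb.queue_name = 'render.q'  # balance this queue (created if missing)
    lb.host_group = '@render'   # add nodes via this host group, not directly
    lb.slots = 4                # slots each added node contributes to the queue
    lb.spot_bid = 0.50          # bid to use when adding spot nodes
    lb.run(cluster)             # poll and grow/shrink the queue's nodes
```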

DEFAULT_SLOTS = {
't1.micro': 1,
'm1.small': 1,
'm1.large': 2,
'm1.xlarge': 4,
'c1.medium': 2,
'c1.xlarge': 8,
'm2.xlarge': 2,
'm2.2xlarge': 4,
'm2.4xlarge': 8,
'cc1.4xlarge': 8,
'cg1.4xlarge': 8,
}
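The changes note above says the per-host slot consistency check was dropped in favor of these per-instance-type defaults. A minimal sketch of that lookup, under the assumption that an explicit slots setting wins and unknown types fall back to 1 slot (the helper name is illustrative, not part of the patch):

```python
def slots_for_instance(instance_type, configured_slots=None):
    # Explicit configuration takes precedence; otherwise use the table
    # above, assuming 1 slot for instance types it doesn't list.
    if configured_slots is not None:
        return configured_slots
    return DEFAULT_SLOTS.get(instance_type, 1)

slots_for_instance('m1.xlarge')      # -> 4, from DEFAULT_SLOTS
slots_for_instance('c1.xlarge', 2)   # -> 2, explicit setting wins
```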

class SGEStats(object):
"""
SunGridEngine stats parser
"""
def __init__(self):

def __init__(self, default_slots, queue_name=None):
self.queue_name = queue_name
self.default_slots = default_slots
self.jobstat_cachesize = 200
self.hosts = []
self.jobs = []
@@ -127,13 +164,6 @@ def _count_tasks(self, jdict):
(num_tasks, tasks))
return num_tasks

def qacct_to_datetime_tuple(self, qacct):
"""
Takes the SGE qacct formatted time and makes a datetime tuple
format is:
Tue Jul 13 16:24:03 2010
"""
return datetime.datetime.strptime(qacct, "%a %b %d %H:%M:%S %Y")
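For reference, the qacct timestamp format this removed helper handled, parsed with strptime directly; the same parsing presumably now lives in sge_utils.parse_qacct:

```python
import datetime

# "Tue Jul 13 16:24:03 2010" is the example from the original docstring.
datetime.datetime.strptime("Tue Jul 13 16:24:03 2010", "%a %b %d %H:%M:%S %Y")
# -> datetime.datetime(2010, 7, 13, 16, 24, 3)
```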

def parse_qacct(self, string, dtnow):
"""
@@ -142,40 +172,15 @@ def parse_qacct(self, string, dtnow):
Takes the string to parse, and a datetime object of the remote
host's current time.
"""
job_id = None
qd = None
start = None
end = None
counter = 0
lines = string.split('\n')
for l in lines:
l = l.strip()
if l.find('jobnumber') != -1:
job_id = int(l[13:len(l)])
if l.find('qsub_time') != -1:
qd = self.qacct_to_datetime_tuple(l[13:len(l)])
if l.find('start_time') != -1:
if l.find('-/-') > 0:
start = dtnow
else:
start = self.qacct_to_datetime_tuple(l[13:len(l)])
if l.find('end_time') != -1:
if l.find('-/-') > 0:
end = dtnow
else:
end = self.qacct_to_datetime_tuple(l[13:len(l)])
if l.find('==========') != -1:
if qd is not None:
self.max_job_id = job_id
hash = {'queued': qd, 'start': start, 'end': end}
self.jobstats[job_id % self.jobstat_cachesize] = hash
qd = None
start = None
end = None
counter = counter + 1
log.debug("added %d new jobs" % counter)
log.debug("There are %d items in the jobstats cache" %
len(self.jobstats))
jobstats, num_new_jobs = sge_utils.parse_qacct(string, dtnow)
for (jobid, hash) in jobstats.items():
self.max_job_id = jobid
self.jobstats[jobid % self.jobstat_cachesize] = hash

log.debug("added %d new jobs." % num_new_jobs)
log.debug("There are %d items in the jobstats cache." %
len(self.jobstats))

return self.jobstats
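A standalone sketch (not from this patch) of the bounded cache used above: keying by job id modulo jobstat_cachesize keeps at most 200 recent entries, with older job ids overwritten in place instead of the cache growing without bound.

```python
jobstat_cachesize = 200
jobstats = {}

def record_job(job_id, stats):
    # Same indexing as above: a job's slot is its id mod the cache size.
    jobstats[job_id % jobstat_cachesize] = stats

record_job(1, {'queued': 'earlier'})
record_job(201, {'queued': 'later'})       # 201 % 200 == 1: overwrites job 1
assert jobstats[1] == {'queued': 'later'}
```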

def is_jobstats_empty(self):
@@ -228,6 +233,7 @@ def slots_per_host(self):
inconsistent, this will return -1 for example, if you have m1.large and
m1.small in the same cluster
"""

total = self.count_total_slots()
if total == 0:
return total
@@ -376,8 +382,8 @@ class SGELoadBalancer(LoadBalancer):
approximately how long an instance will take to start up.
wait_time = 900

Keep this at 1 - your master, for now.
min_nodes = 1
Keep this at 0 - assumes starting with an empty queue (for now)
min_nodes = 0

This would allow the master to be killed when the queue empties. UNTESTED.
kill_cluster = False
@@ -543,6 +549,44 @@ def get_stats(self):
"Failed to retrieve SGE stats after trying %d times, exiting..." %
retries)

def create_queue(self):
"""Create the balanced queue if it doesn't already exist."""
sge_utils.create_queue(self._cluster.master_node, self.queue_name)

def create_host_group(self):
"""Create the configured host group if it doesn't already exist."""
if self.host_group:
sge_utils.create_host_group(self._cluster.master_node,
self.host_group)

def hosts_in_queue(self):
"""Return the hosts currently attached to the balanced queue."""
return sge_utils.get_hosts(self._cluster.master_node,
qname=self.queue_name)

def add_host_group_to_queue(self):
"""Attach the configured host group to the balanced queue."""
master = self._cluster.master_node
return sge_utils.add_to_queue(master, self.queue_name, [self.host_group])

def add_to_queue(self, aliases):
"""Add the given node aliases to the queue, each with its slot count."""
master = self._cluster.master_node
slots = self.slots
if slots is None:
nodes = [self._cluster.get_node_by_alias(alias) for alias in aliases]
slots = dict([(node.alias, get_num_procs(node)) for node in nodes])
return sge_utils.add_to_queue_with_slots(master, self.queue_name,
aliases, slots=slots)

def add_to_host_group(self, aliases):
"""Add the aliases to the host group and assign their slots in the queue."""
master = self._cluster.master_node
slots = self.slots
if slots is None:
nodes = [self._cluster.get_node_by_alias(alias) for alias in aliases]
slots = dict([(node.alias, get_num_procs(node)) for node in nodes])
M1 = sge_utils.add_to_host_group(master, self.host_group, aliases)
M2 = sge_utils.add_slots_to_queue(master, self.queue_name,
aliases, slots=slots)
return M1 + M2
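An illustrative helper (not part of this patch) showing how these new methods are meant to compose, following the changes note and the queue/host-group setup added to run() further down: the queue and host group are created if missing, then new node aliases are registered either through the host group or directly in the queue.

```python
def register_new_nodes(balancer, aliases):
    # balancer: an SGELoadBalancer; aliases: freshly added nodes, e.g. ['node001']
    balancer.create_queue()                 # no-op if the queue already exists
    if balancer.host_group:
        balancer.create_host_group()        # no-op if the group already exists
        balancer.add_host_group_to_queue()
        balancer.add_to_host_group(aliases)
    else:
        balancer.add_to_queue(aliases)
```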

def run(self, cluster):
"""
This function will loop indefinitely, using SGELoadBalancer.get_stats()
@@ -551,16 +595,19 @@ def run(self, cluster):
durations (currently doesn't)
"""
self._cluster = cluster
qname = self.queue_name
if self.max_nodes is None:
self.max_nodes = cluster.cluster_size
use_default_stats_file = self.dump_stats and not self.stats_file
use_default_plots_dir = self.plot_stats and not self.plot_output_dir
if use_default_stats_file or use_default_plots_dir:
self._mkdir(DEFAULT_STATS_DIR % cluster.cluster_tag, makedirs=True)
self._mkdir(
DEFAULT_STATS_DIR % (cluster.cluster_tag, self.queue_name),
makedirs=True)
if not self.stats_file:
self.stats_file = DEFAULT_STATS_FILE % cluster.cluster_tag
self.stats_file = DEFAULT_STATS_FILE % (cluster.cluster_tag, qname)
if not self.plot_output_dir:
self.plot_output_dir = DEFAULT_STATS_DIR % cluster.cluster_tag
self.plot_output_dir = DEFAULT_STATS_DIR % (cluster.cluster_tag, qname)
if not cluster.is_cluster_up():
raise exception.ClusterNotRunning(cluster.cluster_tag)
if self.dump_stats:
@@ -588,6 +635,12 @@ def run(self, cluster):
log.info("Writing stats to file: %s" % self.stats_file)
if self.plot_stats:
log.info("Plotting stats to directory: %s" % self.plot_output_dir)

self.create_queue()
if self.host_group:
self.create_host_group()
self.add_host_group_to_queue()

while(self._keep_polling):
if not cluster.is_cluster_up():
log.info("Waiting for all nodes to come up...")
@@ -684,6 +737,7 @@ def _eval_add_node(self):
self.longest_allowed_queue_time)
max_add = self.max_nodes - len(self._cluster.running_nodes)
need_to_add = min(self.add_nodes_per_iteration, need_to_add, max_add)

if need_to_add > 0:
log.warn("Adding %d nodes at %s" %
(need_to_add, str(datetime.datetime.utcnow())))
@@ -792,3 +846,5 @@ def _minutes_uptime(self, node):
now = self.get_remote_time()
timedelta = now - dt
return timedelta.seconds / 60


87 changes: 62 additions & 25 deletions starcluster/cluster.py
@@ -42,6 +42,10 @@
from starcluster.templates import user_msgs
from starcluster.logger import log

##changes:
# added spot_bid option to Cluster.create_nodes
# added the options available on Cluster.create_nodes to the Cluster.add_node(s)
# and ClusterManager.add_node(s) methods (see the usage sketch below)
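A hedged usage sketch of the widened ClusterManager.add_nodes signature shown below; the config bootstrapping follows the usual StarCluster pattern, and the cluster name, AMI id, zone, and bid are placeholder values:

```python
from starcluster.config import StarClusterConfig

cfg = StarClusterConfig().load()
cm = cfg.get_cluster_manager()

# Add two spot nodes of a specific AMI/instance type to a running cluster.
cm.add_nodes('mycluster', 2,
             image_id='ami-12345678',
             instance_type='m1.xlarge',
             zone='us-east-1a',
             spot_bid=0.50)
```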

class ClusterManager(managers.Manager):
"""
@@ -164,26 +168,41 @@ def _get_cluster_name(self, cluster_name):
cluster_name = static.SECURITY_GROUP_TEMPLATE % cluster_name
return cluster_name

def add_node(self, cluster_name, alias=None, no_create=False,
image_id=None, instance_type=None, zone=None,
placement_group=None, spot_bid=None):
def add_node(self, cluster_name,
alias=None,
image_id=None,
instance_type=None,
zone=None,
placement_group=None,
spot_bid=None,
no_create=False):
cl = self.get_cluster(cluster_name)
return cl.add_node(alias=alias, image_id=image_id,
instance_type=instance_type, zone=zone,
placement_group=placement_group, spot_bid=spot_bid,
no_create=no_create)

def add_nodes(self, cluster_name, num_nodes, aliases=None, no_create=False,
image_id=None, instance_type=None, zone=None,
placement_group=None, spot_bid=None):
return cl.add_node(alias=alias,
image_id=image_id,
instance_type=instance_type,
zone=zone,
placement_group=placement_group,
spot_bid=spot_bid,
no_create=no_create)

def add_nodes(self, cluster_name, num_nodes, aliases=None,
image_id=None,
instance_type=None,
zone=None,
placement_group=None,
spot_bid=None,
no_create=False):
"""
Add one or more nodes to cluster
"""
cl = self.get_cluster(cluster_name)
return cl.add_nodes(num_nodes, aliases=aliases, image_id=image_id,
instance_type=instance_type, zone=zone,
placement_group=placement_group, spot_bid=spot_bid,
no_create=no_create)
return cl.add_nodes(num_nodes, aliases=aliases,
image_id=image_id,
instance_type=instance_type,
zone=zone,
placement_group=placement_group,
spot_bid=spot_bid,
no_create=no_create)

def remove_node(self, cluster_name, alias, terminate=True):
"""
@@ -860,23 +879,39 @@ def _get_next_node_num(self):
log.debug("Highest node number is %d. choosing %d." % (highest, next))
return next

def add_node(self, alias=None, no_create=False, image_id=None,
instance_type=None, zone=None, placement_group=None,
spot_bid=None):
def add_node(self,
alias=None,
image_id=None,
instance_type=None,
zone=None,
placement_group=None,
spot_bid=None,
no_create=False):

"""
Add a single node to this cluster
"""
aliases = None
if alias:
aliases = [alias]
return self.add_nodes(1, aliases=aliases, image_id=image_id,
instance_type=instance_type, zone=zone,
placement_group=placement_group,
spot_bid=spot_bid, no_create=no_create)

def add_nodes(self, num_nodes, aliases=None, image_id=None,
instance_type=None, zone=None, placement_group=None,
spot_bid=None, no_create=False):
return self.add_nodes(1, aliases=aliases,
image_id=image_id,
instance_type=instance_type,
zone=zone,
placement_group=placement_group,
spot_bid=spot_bid,
no_create=no_create)

def add_nodes(self,
num_nodes,
aliases=None,
image_id=None,
instance_type=None,
zone=None,
placement_group=None,
spot_bid=None,
no_create=False):
"""
Add new nodes to this cluster

@@ -913,6 +948,8 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None,
for alias in aliases:
node = self.get_node_by_alias(alias)
self.run_plugins(method_name="on_add_node", node=node)

return aliases

def remove_node(self, node, terminate=True):
"""
3 changes: 3 additions & 0 deletions starcluster/clustersetup.py
@@ -24,8 +24,11 @@
from starcluster import threadpool
from starcluster.utils import print_timing
from starcluster.logger import log
from starcluster import sge_utils
from starcluster import exception

##changes:
# replaced the body of _remove_node_from_sge with a more general sge_utils function

class ClusterSetup(object):
"""