This repository has been archived by the owner on Sep 23, 2020. It is now read-only.
/
cluster.py
155 lines (119 loc) · 4.6 KB
/
cluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import hashlib
from libcloud.base import NodeImage, NodeSize
from libcloud.drivers import ec2
from nimboss.nimbus import NimbusClusterDocument
from nimboss.node import NimbusNodeDriver, EC2NodeDriver
class Cluster(object):
"""A collection of Nodes.
All Nodes of a given Cluster instance
are defined in a given "cluster document" that
contains Node details, and inter-Node relationships.
"""
def __init__(self, id, driver, cluster_type=None, name=None):
self.id = id # id is actually a context URI
self.driver = driver
self.cluster_type = cluster_type
self.name = name or ''
self.uuid = self.get_uuid()
self.nodes = {}
def add_node(self, node):
"""Add a Node to this Cluster's "nodes" attribute.
The "nodes" attribute exists only for the lifetime
of the Cluster instance, and is not persisted in
any way.
"""
if isinstance(node, (list, tuple)):
for n in node:
self.nodes[n.uuid] = n
else:
self.nodes[node.uuid] = node
def get_uuid(self):
"""Unique id, created by hashing Cluster id, and the Node Driver type.
"""
return hashlib.sha1("%s" % (self.id)).hexdigest() #FIXME
def get_status(self):
"""Cluster status, as return by the Context Broker.
"""
resp = self.driver.broker_client.get_status(self.id)
return resp
def destroy(self):
"""Terminate all Nodes in this Cluster.
"""
self.driver.destroy_cluster(self)
def __repr__(self):
args = (self.uuid, self.name, len(self.nodes.keys()))
return "Cluster: uuid=%s, name=%s, total nodes=%d" % args
class ClusterDriver(object):
"""Logic to manage resource that make up a Cluster.
Contains references to the Broker Client,
and the Node Driver.
"""
def __init__(self, broker_client=None, node_driver=None):
self.broker_client = broker_client
self.node_driver = node_driver
def create_cluster(self, clusterdoc, context=None, **kwargs):
"""Create a new cluster of nodes
@keyword keyname: The name of the key pair
@type keyname: C{str}
@keyword securitygroup: Name of security group
@type securitygroup: C{str}
"""
if isinstance(clusterdoc, str):
nimbuscd = NimbusClusterDocument(clusterdoc)
if context is None:
context = self.broker_client.create_context()
nodes_specs = clusterdoc.build_specs(context)
cluster = self.new_bare_cluster(id=context.uri)
for spec in nodes_specs:
cluster.add_node(launch_node_spec(spec, self.node_driver,
**kwargs))
return cluster
def new_bare_cluster(self, id):
return Cluster(id, driver=self)
def launch_node_spec(self, spec, driver, **kwargs):
"""Launches a single node group.
Returns a single Node or a list of Nodes.
"""
node_data = self._create_node_data(spec, driver, **kwargs)
node = driver.create_node(**node_data)
if isinstance(node, (list, tuple)):
for n in node:
n.ctx_name = spec.name
else:
node.ctx_name = spec.name
return node
def _create_node_data(self, spec, driver, **kwargs):
"""Utility to get correct form of data to create a Node.
"""
image = NodeImage(spec.image, spec.name, driver)
sizes = driver.list_sizes()
size = None
for asize in sizes:
if asize.id == spec.size:
size = asize
break
if size is None:
raise KeyError("Node size %s not found for driver %s" %
(spec.size, self.node_driver))
node_data = {
'name':spec.name,
'size':size,
'image':image,
'ex_mincount':str(spec.count),
'ex_maxcount':str(spec.count),
'ex_userdata':spec.userdata,
'ex_keyname':spec.keyname,
}
node_data.update(kwargs)
# libcloud doesn't like args with None values
return dict(pair for pair in node_data.iteritems() if pair[1] is not None)
def destroy_cluster(self, cluster):
"""Terminate all Nodes from this Cluster.
"""
for (id, node) in cluster.nodes.iteritems():
node.destroy()
def reboot_cluster(self, cluster):
"""Reboot all Nodes from this Cluster.
"""
for (id, node) in cluster.nodes.iteritems():
node.destroy()