Skip to content
Browse files

Linter

  • Loading branch information...
1 parent 5bd80d1 commit 5e3e9e3f326e356a56ee1013bda0671e6f9058b5 @sholiday committed Apr 9, 2012
Showing with 106 additions and 98 deletions.
  1. +2 −2 Makefile
  2. +71 −64 odin.py
  3. +33 −32 odinTest.py
View
4 Makefile
@@ -1,11 +1,11 @@
test: all odin-test
-all: proto odin-machine
+all: odin_pb2.py odin-machine
odin-test:
/usr/local/bin/python -u odinTest.py
odin-machine:
/usr/local/bin/python -u odin.py
-proto:
+odin_pb2.py : odin.proto
/usr/local/bin/protoc --python_out=. odin.proto
View
135 odin.py
@@ -12,10 +12,8 @@
Not because I think it's the best, but because it's a standard.
"""
-import json
import logging
import odin_pb2
-import os
import random
import signal
import sys
@@ -24,68 +22,73 @@
import xmlrpclib
import zookeeper
-logger = logging.getLogger()
+logger = logging.getLogger()
+
+ZOO_OPEN_ACL_UNSAFE = {"perms": 0x1f, "scheme": "world", "id": "anyone"}
-ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"};
def path_join(a, b):
return a + '/' + b
-
+
+
def path_pop(a):
return a.split('/').pop()
+
def path_cell(a):
parts = a.split('/')
return '/' + parts[1] + '/' + parts[2]
+
class Odin():
'''
A machine in the cell exists via a node like:
/odin/cell/machines/m-??????
These are ephemeral. If we are creating a node, we will use create
sequence.
-
+
A job exists in the cell via a node like:
/odin/cell/jobs/j-?????
Maybe not ephemeral, but deleted when completed. Create sequence.
-
+
A task that is to be run on a machine is at:
/odin/cell/tasks/m-????/task-????
Where m-??? is the machine and the task is sequential. The contents is
in the job. Any new tasks under there will start a job on the machine.
-
+
'''
_cell = None
_cell_default = '/odin/test'
_z = None
- def __init__(self, zookeeper_servers = 'localhost:2181',
- cell = _cell_default):
+
+ def __init__(self, zookeeper_servers='localhost:2181',
+ cell=_cell_default):
if cell is None:
self._cell = self._cell_default
else:
self._cell = cell
logger.debug(self._cell)
-
+
zookeeper.set_debug_level(zookeeper.LOG_LEVEL_ERROR)
self._z = zookeeper.init(zookeeper_servers)
- self._recursivelyCreatePath('%s/machines/'%self._cell)
- self._recursivelyCreatePath('%s/jobs/'%self._cell)
- self._recursivelyCreatePath('%s/tasks/'%self._cell)
- self._recursivelyCreatePath('%s/politicians/'%self._cell)
+ self._recursivelyCreatePath('%s/machines/' % self._cell)
+ self._recursivelyCreatePath('%s/jobs/' % self._cell)
+ self._recursivelyCreatePath('%s/tasks/' % self._cell)
+ self._recursivelyCreatePath('%s/politicians/' % self._cell)
def __del__(self):
zookeeper.close(self._z)
-
- def create_machine(self, data, cell = None):
+
+ def create_machine(self, data, cell=None):
if not isinstance(data, odin_pb2.Machine):
raise TypeError('data must be a protobuffer Machine')
if cell is None:
cell = self._cell
machine = zookeeper.create(self._z,
- '%s/machines/m-'%cell,
+ '%s/machines/m-' % cell,
data.SerializeToString(),
[ZOO_OPEN_ACL_UNSAFE],
zookeeper.SEQUENCE | zookeeper.EPHEMERAL)
@@ -94,11 +97,11 @@ def create_machine(self, data, cell = None):
cell_name = path_cell(machine)
zookeeper.create(self._z,
- '%s/tasks/%s'%(cell_name, machine_name),
+ '%s/tasks/%s' % (cell_name, machine_name),
'',
[ZOO_OPEN_ACL_UNSAFE])
return machine
-
+
def register_machine(self, machine, data):
if not isinstance(data, odin_pb2.Machine):
raise TypeError('data must be a protobuffer Machine')
@@ -122,15 +125,15 @@ def get_machine(self, machine):
def get_tasks(self, machine):
machine_name = path_pop(machine)
cell = path_cell(machine)
- tasks_dir = '%s/tasks/%s'%(cell, machine_name)
+ tasks_dir = '%s/tasks/%s' % (cell, machine_name)
children = zookeeper.get_children(self._z, tasks_dir)
-
+
tasks = list()
for child_id in children:
- child = '%s/%s'%(tasks_dir, child_id)
+ child = '%s/%s' % (tasks_dir, child_id)
tasks.append(self.get_task(child))
-
+
return tasks
def get_task(self, task):
@@ -139,41 +142,41 @@ def get_task(self, task):
def watch_tasks(self, machine, watcher):
machine_name = path_pop(machine)
cell = path_cell(machine)
- tasks_dir = '%s/tasks/%s'%(cell, machine_name)
+ tasks_dir = '%s/tasks/%s' % (cell, machine_name)
children = zookeeper.get_children(self._z, tasks_dir, watcher)
tasks = list()
for child_id in children:
tasks.append(child_id)
-
+
return tasks
- def get_politicians(self, watcher = None, cell = None):
+ def get_politicians(self, watcher=None, cell=None):
if cell is None:
cell = self._cell
if watcher is None:
- children = zookeeper.get_children(self._z, '%s/politicians'%(cell))
+ children = zookeeper.get_children(self._z, '%s/politicians' % (cell))
else:
- children = zookeeper.get_children(self._z, '%s/politicians'%(cell),
+ children = zookeeper.get_children(self._z, '%s/politicians' % (cell),
watcher)
-
+
children = sorted(children)
politicians = list()
for child in children:
- politicians.append('%s/politicians/%s'%(cell, child))
+ politicians.append('%s/politicians/%s' % (cell, child))
return sorted(politicians)
- def add_politician(self, data, cell = None):
+ def add_politician(self, data, cell=None):
if not isinstance(data, odin_pb2.Politician):
raise TypeError('data must be a protobuffer Politician')
if cell is None:
cell = self._cell
return zookeeper.create(self._z,
- '%s/politicians/p-'%(cell),
+ '%s/politicians/p-' % (cell),
data.SerializeToString(),
[ZOO_OPEN_ACL_UNSAFE],
zookeeper.SEQUENCE | zookeeper.EPHEMERAL)
@@ -185,12 +188,12 @@ def add_task_to_machine(self, task, machine):
cell_name = path_cell(machine)
return zookeeper.create(self._z,
- '%s/tasks/%s/t-'%(cell_name, machine_name),
+ '%s/tasks/%s/t-' % (cell_name, machine_name),
task.SerializeToString(),
[ZOO_OPEN_ACL_UNSAFE],
zookeeper.SEQUENCE)
- def _recursivelyCreatePath(self, path, intermediate_data = ''):
+ def _recursivelyCreatePath(self, path, intermediate_data=''):
'''
Should do a check to see if this dir already exists.
'''
@@ -204,7 +207,8 @@ def _recursivelyCreatePath(self, path, intermediate_data = ''):
intermediate_data,
[ZOO_OPEN_ACL_UNSAFE])
except zookeeper.NodeExistsException:
- pass
+ pass
+
class OdinMachine(threading.Thread):
'''
@@ -219,20 +223,20 @@ class OdinMachine(threading.Thread):
_tasks = {}
_tasks_lock = threading.Lock()
- def __init__(self, machine = None, cell = None,
- supervisor_url = "http://localhost:9001/RPC2"):
+ def __init__(self, machine=None, cell=None,
+ supervisor_url="http://localhost:9001/RPC2"):
threading.Thread.__init__(self)
self._machine = machine
- self._supervisor = xmlrpclib.ServerProxy(supervisor_url)
+ self._supervisor = xmlrpclib.ServerProxy(supervisor_url)
self._supervisor_methods = self._supervisor.system.listMethods()
self.reset_group()
#rint self._supervisor_methods
# Create an Odin connection.
logger.debug(cell)
- self._odin = Odin(cell = cell)
+ self._odin = Odin(cell=cell)
machine_data = odin_pb2.Machine()
if self._machine is None:
@@ -244,19 +248,19 @@ def __init__(self, machine = None, cell = None,
def has_twiddler(self):
return 'twiddler.getAPIVersion' in self._supervisor_methods
- def create_process(self, name, config, group = _supervisor_default_group):
+ def create_process(self, name, config, group=_supervisor_default_group):
return self._supervisor.twiddler.addProgramToGroup(group, name, config)
-
- def remove_process(self, name, group = _supervisor_default_group):
+
+ def remove_process(self, name, group=_supervisor_default_group):
return self._supervisor.supervisor.removeProcessFromGroup(group, name)
-
- def get_process_info(self, name, group = _supervisor_default_group):
- return self._supervisor.supervisor.getProcessInfo('%s:%s'%(group, name))
-
- def get_processes(self, group = _supervisor_default_group):
+
+ def get_process_info(self, name, group=_supervisor_default_group):
+ return self._supervisor.supervisor.getProcessInfo('%s:%s' % (group, name))
+
+ def get_processes(self, group=_supervisor_default_group):
return self._supervisor.supervisor.getAllProcessInfo()
- def get_log(self, name, group = _supervisor_default_group):
+ def get_log(self, name, group=_supervisor_default_group):
return self._supervisor.supervisor.readProcessStderrLog(
'%s:%s' % (group, name), 0, 0)
@@ -271,7 +275,7 @@ def run(self):
time.sleep(60)
logger.debug('Heartbeat.')
- def reset_group(self, group = _supervisor_default_group):
+ def reset_group(self, group=_supervisor_default_group):
'''
Stops all tasks and removes the group, adding it back.
'''
@@ -304,15 +308,15 @@ def task_to_config(self, task):
def get_task(self, task_id):
machine_name = path_pop(self._machine)
cell = path_cell(self._machine)
- task = '%s/tasks/%s/%s'%(cell, machine_name, task_id)
+ task = '%s/tasks/%s/%s' % (cell, machine_name, task_id)
return self._odin.get_task(task)
def run_tasks(self, tasks):
logger.debug('run_tasks on %s tasks.' % len(tasks))
self._tasks_lock.acquire()
for task_id in tasks:
- if self._tasks.has_key(task_id):
+ if task_id in self._tasks:
logger.debug('Already have task %s.' % task_id)
else:
logger.debug('New task %s. Let\'s start it.' % task_id)
@@ -328,14 +332,15 @@ def _tasks_watcher(self, z, event, state, path):
'''
Watcher for a change in tasks.
'''
-
+
#line = (z, type(event), type(state), path)
#logger.debug(line)
logger.debug('Task List changed for %s.' % path)
self.run_tasks(self._odin.watch_tasks(self._machine,
self._tasks_watcher))
+
class OdinPolitician(threading.Thread):
'''
A possible scheduler of an Odin cell.
@@ -348,11 +353,11 @@ class OdinPolitician(threading.Thread):
_odin = None
_is_leader = False
- def __init__(self, cell = None):
+ def __init__(self, cell=None):
threading.Thread.__init__(self)
# Create an Odin connection.
- self._odin = Odin(cell = cell)
+ self._odin = Odin(cell=cell)
politician = odin_pb2.Politician()
@@ -375,7 +380,7 @@ def _politicians_watcher(self, z, event, state, path):
'''
Watcher for a change in leaders.
'''
-
+
#line = (z, type(event), type(state), path)
#logger.debug(line)
logger.debug('Task List changed for %s.' % path)
@@ -390,13 +395,15 @@ def run(self):
while True:
time.sleep(10)
- logger.debug('Heartbeat: _is_leader:%s'%self._is_leader)
+ logger.debug('Heartbeat: _is_leader:%s' % self._is_leader)
+
def signal_handler(self, signal, frame):
print 'Kill signal received'
sys.exit(0)
-def main(argv = None):
+
+def main(argv=None):
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGTSTP, signal_handler)
@@ -412,25 +419,25 @@ def main(argv = None):
logger.info('Starting OdinMachine.')
- cell = '/odin/test%s'%random.randint(0,9999)
+ cell = '/odin/test%s' % random.randint(0, 9999)
- odin_politician = OdinPolitician(cell = cell)
+ odin_politician = OdinPolitician(cell=cell)
odin_politician.start()
- odin_politician2 = OdinPolitician(cell = cell)
+ odin_politician2 = OdinPolitician(cell=cell)
odin_politician2.start()
- odin_machine = OdinMachine(cell = cell)
+ odin_machine = OdinMachine(cell=cell)
odin_machine.start()
task_1 = odin_pb2.Task()
task_1.runnable.command = 'ls'
task_1.runnable.startsecs = 0
task_name = odin_machine._odin.add_task_to_machine(task_1,
odin_machine._machine)
- logger.debug('Added task %s'%task_name)
+ logger.debug('Added task %s' % task_name)
odin_machine.join()
odin_politician.join()
logger.info('Threads joined.')
if __name__ == '__main__':
- main()
+ main()
View
65 odinTest.py
@@ -17,68 +17,69 @@
import random
import unittest
+
class OdinTest(unittest.TestCase):
def setUp(self):
- self.odin = odin.Odin(cell = '/odin/test%s'%random.randint(0,9999))
+ self.odin = odin.Odin(cell='/odin/test%s' % random.randint(0, 9999))
self.machine_data = odin_pb2.Machine()
self.machine_data_2 = odin_pb2.Machine()
self.task_1 = odin_pb2.Task()
self.task_1.id = 2
def test_create_machine(self):
- machine = self.odin.create_machine(self.machine_data)
- self.assertEquals(self.machine_data, self.odin.get_machine(machine))
+ machine = self.odin.create_machine(self.machine_data)
+ self.assertEquals(self.machine_data, self.odin.get_machine(machine))
def test_remove_machine(self):
- machine = self.odin.create_machine(self.machine_data)
- self.assertTrue(self.odin.remove_machine(machine))
+ machine = self.odin.create_machine(self.machine_data)
+ self.assertTrue(self.odin.remove_machine(machine))
def test_register_machine(self):
- machine = self.odin.create_machine(self.machine_data)
- self.assertTrue(self.odin.remove_machine(machine))
- self.odin.register_machine(machine, self.machine_data_2)
- self.assertEquals(self.machine_data_2, self.odin.get_machine(machine))
+ machine = self.odin.create_machine(self.machine_data)
+ self.assertTrue(self.odin.remove_machine(machine))
+ self.odin.register_machine(machine, self.machine_data_2)
+ self.assertEquals(self.machine_data_2, self.odin.get_machine(machine))
def test_add_task(self):
- machine = self.odin.create_machine(self.machine_data)
- task = self.odin.add_task_to_machine(self.task_1, machine)
- task_name = odin.path_pop(task)
- self.assertEquals(self.task_1, self.odin.get_tasks(machine)[0])
+ machine = self.odin.create_machine(self.machine_data)
+ self.odin.add_task_to_machine(self.task_1, machine)
+ self.assertEquals(self.task_1, self.odin.get_tasks(machine)[0])
def test_add_politician(self):
- politician = self.odin.add_politician(odin_pb2.Politician())
- politicians = self.odin.get_politicians()
+ politician = self.odin.add_politician(odin_pb2.Politician())
+ politicians = self.odin.get_politicians()
+
+ self.assertIn(politician, politicians)
- self.assertIn(politician, politicians)
class OdinMachineTest(unittest.TestCase):
def setUp(self):
- self.machine = odin.OdinMachine(cell =
- '/odin/test%s'%random.randint(0,9999))
+ self.machine = odin.OdinMachine(
+ cell='/odin/test%s' % random.randint(0, 9999))
self.task_1 = odin_pb2.Task()
self.task_1.id = 2
def test_has_twiddler(self):
- self.assertTrue(self.machine.has_twiddler())
+ self.assertTrue(self.machine.has_twiddler())
def test_task_to_config(self):
- task = odin_pb2.Task()
- task.runnable.command = 'ls'
- task.runnable.startsecs = 0
+ task = odin_pb2.Task()
+ task.runnable.command = 'ls'
+ task.runnable.startsecs = 0
- expected_conf = {'command' : 'ls', 'startsecs' : 0}
- generted_conf = self.machine.task_to_config(task)
+ expected_conf = {'command': 'ls', 'startsecs': 0}
+ generted_conf = self.machine.task_to_config(task)
- self.assertItemsEqual(generted_conf, expected_conf)
+ self.assertItemsEqual(generted_conf, expected_conf)
def test_get_processes(self):
- self.machine.get_processes()
+ self.machine.get_processes()
def test_get_task(self):
- task = self.machine._odin.add_task_to_machine(self.task_1,
- self.machine._machine)
- task_id = odin.path_pop(task)
- self.assertEquals(self.task_1, self.machine.get_task(task_id))
-
+ task = self.machine._odin.add_task_to_machine(self.task_1,
+ self.machine._machine)
+ task_id = odin.path_pop(task)
+ self.assertEquals(self.task_1, self.machine.get_task(task_id))
+
if __name__ == '__main__':
- unittest.main()
+ unittest.main()

0 comments on commit 5e3e9e3

Please sign in to comment.
Something went wrong with that request. Please try again.