Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

initial commit

  • Loading branch information...
commit 4a167afe11b37999288a7b4d346365830517f506 0 parents
@neilmock authored
1  .gitignore
@@ -0,0 +1 @@
+*.pyc
87 README.md
@@ -0,0 +1,87 @@
+Understudy
+==========
+
+Understudy is a framework for distributed computing in Python. It allows
+you to write Python code and transparently execute it on any number of
+listening nodes.
+
+Overview
+--------
+
+Understudy provides a decorator for enabling Python classes to
+distribute work to an understudy:
+
+ class Job(object):
+ @understudy("calculator")
+ def add(self, num1, num2):
+ return num1 + num2
+
+The "understudy" decorator takes an understudy name as the first argument.
+This essentially sends the method to a named understudy process that is
+listening for requests.
+
+Here's an example of starting an understudy:
+
+ understudy = Understudy("calculator")
+ understudy.start()
+
+These will typically run daemonized on compute nodes.
+
+Example
+-------
+
+Continuing the example above, here's how to add two numbers:
+
+ >>> job = Job()
+ >>> result = job.add(1,1)
+ >>> result.check()
+ None
+ >>> result.check()
+ 2
+
+The add() method immediately returns a Result class, from which the result
+store can be polled and ultimately returned with a value. "None" is returned
+if the understudy has not yet completed the task.
+
+Virtual Environments
+--------------------
+
+Understudy requires that all imports used in the module are available on
+the Python environment of the understudy node(s).
+
+To facilitate this, Understudy has support for automatically bootstrapping
+a virtual environment and executing the called method within the context of
+that environment.
+
+Here's an example:
+
+ from understudy.decorators import understudy
+ from boto.s3 import Connection
+
+ class Downloader(object):
+ @understudy("s3_understudy", packages=['boto>=1.9b'])
+ def download(key):
+ connection = Connection()
+ key = connection.get_bucket("mybucket").get_key(key)
+ key.get_contents_to_filename("/path/to/myfile")
+
+This bootstraps a virtual environment on the understudy node by installing
+boto via PIP before executing the download() method. Packages are defined
+in PIP requirements.txt fashion.
+
+Redis
+-----
+
+Understudy is built on Redis, and both the decorator and the Understudy
+constructor take standard redis-py connection parameters as arguments:
+
+ class Job(object):
+ @understudy("calculator", host="127.0.0.1", port=6332, db=12)
+ def add(self, num1, num2):
+ return num1 + num2
+
+Disclaimer
+----------
+
+This is a work in progress and should be considered alpha as in
+"please dont use" until further notice.
2  requirements.txt
@@ -0,0 +1,2 @@
+simplejson
+redis==2.0.0
40 setup.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+
+"""
+@file setup.py
+@author Neil Mock
+@date 07/02/2010
+@brief setuptools config for understudy
+"""
+
+version = '0.0.1'
+
+sdict = {
+ 'name' : 'understudy',
+ 'version' : version,
+ 'description' : 'Distributed computing framework for Python.',
+ 'long_description' : 'Distributed computing framework for Python.',
+ 'url': 'http://github.com/neilmock/understudy',
+ 'download_url' : 'http://cloud.github.com/downloads/neilmock/understudy/understudy-%s.tar.gz' % version,
+ 'author' : 'Neil Mock',
+ 'author_email' : 'neilmock@gmail.com',
+ 'maintainer' : 'Neil Mock',
+ 'maintainer_email' : 'neilmock@gmail.com',
+ 'keywords' : ['understudy', 'distributed', 'cloud'],
+ 'license' : 'MIT',
+ 'packages' : ['understudy'],
+ 'classifiers' : [
+ 'Development Status :: 4 - Beta',
+ 'Environment :: Console',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: MIT License',
+ 'Operating System :: OS Independent',
+ 'Programming Language :: Python'],
+}
+
+try:
+ from setuptools import setup
+except ImportError:
+ from distutils.core import setup
+
+setup(**sdict)
67 tests/test_decorator.py
@@ -0,0 +1,67 @@
+import unittest
+from multiprocessing import Process
+from understudy import Understudy, Result
+from understudy.exceptions import NoUnderstudiesError
+from understudy.decorators import understudy
+
+
+def start_understudy(channel):
+ understudy = Understudy(channel, db=9)
+ understudy.start()
+
+class Job(object):
+ @understudy("test", db=9)
+ def add(self, num1, num2):
+ return num1 + num2
+
+class JobWithRequirements(object):
+ @understudy("test", db=9, packages=['boto>=1.9b'])
+ def do_import(self):
+ try:
+ import boto
+ return True
+ except ImportError, e:
+ return e
+
+class TestDecorator(unittest.TestCase):
+ def setUp(self):
+ # fork the understudy
+ self.p = Process(target=start_understudy, args=("test",))
+ self.p.start()
+
+ def tearDown(self):
+ self.p.terminate()
+
+ def test_without_requirements(self):
+ while True:
+ try:
+ job = Job()
+ result = job.add(1,1)
+ break
+ except NoUnderstudiesError:
+ pass
+
+ self.assertTrue(isinstance(result, Result))
+
+ actual_result = result.check()
+ while not actual_result:
+ actual_result = result.check()
+
+ self.assertEquals(actual_result, "2")
+
+ def test_with_requirements(self):
+ while True:
+ try:
+ job = JobWithRequirements()
+ result = job.do_import()
+ break
+ except NoUnderstudiesError:
+ pass
+
+ self.assertTrue(isinstance(result, Result))
+
+ actual_result = result.check()
+ while not actual_result:
+ actual_result = result.check()
+
+ self.assertEquals(actual_result, "True")
47 tests/test_lead.py
@@ -0,0 +1,47 @@
+import re
+import unittest
+import simplejson
+from multiprocessing import Process
+from redis import Redis
+from understudy import Lead, Understudy, Result
+from understudy.exceptions import NoUnderstudiesError
+
+
+UUID = r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}'
+
+def start_understudy(channel):
+ understudy = Understudy(channel, db=9)
+ understudy.start()
+
+class TestLead(unittest.TestCase):
+ def setUp(self):
+ self.lead = Lead("test", db=9)
+
+ # fork the understudy
+ self.p = Process(target=start_understudy, args=("test",))
+ self.p.start()
+
+ def tearDown(self):
+ self.p.terminate()
+
+ def test_shell(self):
+ # non-blocking
+ while True:
+ try:
+ result = self.lead.shell("echo \"test\"")
+ break
+ except NoUnderstudiesError:
+ pass
+
+ self.assertTrue(isinstance(result, Result))
+ self.assertTrue(re.match(UUID, result.uuid))
+
+ actual_result = result.check()
+ while not actual_result:
+ actual_result = result.check()
+
+ self.assertEquals("test\n", actual_result)
+
+ # blocking
+ result = self.lead.shell("echo \"test\"", block=True)
+ self.assertEquals("test\n", result)
8 understudy/__init__.py
@@ -0,0 +1,8 @@
+from understudy.core import Lead, Understudy, Result
+from understudy.exceptions import UnderstudyError, NoUnderstudiesError
+
+
+__all__ = [
+ 'Lead', 'Understudy', 'Result',
+ 'UnderstudyError', 'NoUnderstudiesError',
+ ]
158 understudy/core.py
@@ -0,0 +1,158 @@
+import subprocess
+import simplejson
+import pickle
+import tempfile
+import shutil
+from uuid import uuid4
+from redis import Redis
+from understudy.exceptions import NoUnderstudiesError
+
+
+class Result(object):
+ def __init__(self, uuid, redis):
+ self.uuid = uuid
+ self.redis = Redis(host=redis.host,
+ port=redis.port,
+ db=redis.db,
+ password=redis.connection.password)
+
+ def check(self):
+ result = self.redis.get("result-%s" % self.uuid)
+ if result:
+ self.redis.delete("result-%s" % self.uuid)
+
+ return result
+
+ return None
+
+class Understudy(object):
+ "Redis subscriber."
+ def __init__(self, channel,
+ host='localhost', port=6379, db=0, password=None):
+ self.channel = channel
+ self.redis = Redis(host=host, port=port, db=db, password=password)
+
+ self.redis.subscribe(self.channel)
+
+ def _mkvirtualenv(self, packages):
+ directory_name = tempfile.mkdtemp()
+ cmd = "virtualenv %s --no-site-packages" % directory_name
+
+ subprocess.Popen(cmd, shell=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ close_fds=True).communicate()[0]
+
+ if packages:
+ packages = ["\"%s\"" % package for package in packages]
+ cmd = "source %s/bin/activate && pip install %s" % (directory_name,
+ " ".join(packages))
+ subprocess.Popen(cmd, shell=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ close_fds=True).communicate()[0]
+
+ activation_file = "%s/bin/activate_this.py" % directory_name
+ execfile(activation_file, dict(__file__=activation_file))
+
+ return directory_name
+
+ def _rmvirtualenv(self, directory):
+ shutil.rmtree(directory)
+
+ def shell(self, command):
+ return subprocess.Popen(command, shell=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ close_fds=True).communicate()[0]
+
+
+ def perform(self, serialized):
+ packages = serialized['packages']
+
+ virtualenv = self._mkvirtualenv(packages)
+
+ kwargs = serialized['kwargs']
+ args = serialized['args']
+ cls = pickle.loads(serialized['cls'])
+ funkt = getattr(cls, serialized['func'])
+
+ kwargs['__understudy__'] = True
+ retval = funkt(*args, **kwargs)
+
+ self._rmvirtualenv(virtualenv)
+
+ return retval
+
+ def start(self):
+ for message in self.redis.listen():
+ if message['type'] == 'subscribe':
+ continue
+
+ message = simplejson.loads(message['data'])
+ uuid = message['uuid']
+
+ _redis = Redis(host=self.redis.host,
+ port=self.redis.port,
+ db=self.redis.db,
+ password=self.redis.connection.password)
+
+ directive = message['directive']
+
+ for action, args in directive.items():
+ func = getattr(self, action)
+ retval = func(args)
+
+ _redis.set("result-%s" % uuid, retval)
+ _redis.publish(uuid, "COMPLETE")
+
+
+ def stop(self):
+ self.redis.unsubscribe(self.channel)
+
+class Lead(object):
+ "Redis publisher."
+ def __init__(self, channel,
+ host='localhost', port=6379, db=0, password=None):
+ self.channel = channel
+ self.redis = Redis(host=host, port=port, db=db, password=password)
+
+ def _block(self, uuid):
+ self.redis.subscribe(uuid)
+
+ for message in self.redis.listen():
+ if message['type'] == 'message':
+ self.redis.unsubscribe(uuid)
+
+ retval = self.redis.get("result-%s" % uuid)
+ self.redis.delete("result-%s" % uuid)
+
+ return retval
+
+ def _handle(self, directive, block):
+ understudies = self.redis.publish(self.channel,
+ simplejson.dumps(directive))
+
+ if not understudies:
+ raise NoUnderstudiesError("No understudies found for this channel!")
+
+ if block:
+ return self._block(directive['uuid'])
+ else:
+ return Result(directive['uuid'], self.redis)
+
+ def shell(self, command, block=False):
+ uuid = str(uuid4())
+ directive = {'uuid':uuid, 'directive':{'shell':command}}
+
+ return self._handle(directive, block)
+
+ def perform(self, action, block=False):
+ uuid = str(uuid4())
+
+ directive = {'uuid':uuid, 'directive':{'perform':action}}
+
+ return self._handle(directive, block)
34 understudy/decorators.py
@@ -0,0 +1,34 @@
+import pickle
+from understudy import Lead
+
+
+def understudy(*uargs, **ukwargs):
+ def _understudy(func):
+ def dispatch(cls, *args, **kwargs):
+ if '__understudy__' in kwargs:
+ del(kwargs['__understudy__'])
+ return func(cls, *args, **kwargs)
+
+ channel = uargs[0]
+ host = ukwargs.get('host', 'localhost')
+ port = ukwargs.get('port', 6379)
+ db = ukwargs.get('db', 0)
+ password = ukwargs.get('password', None)
+
+ packages = ukwargs.get('packages', [])
+
+ action = {'cls':pickle.dumps(cls),
+ 'func':func.__name__,
+ 'args':args,
+ 'kwargs':kwargs,
+ 'packages':packages}
+
+ lead = Lead(channel, host=host, port=port, db=db, password=password)
+ result = lead.perform(action)
+
+ return result
+
+ return dispatch
+
+ return _understudy
+
5 understudy/exceptions.py
@@ -0,0 +1,5 @@
+class UnderstudyError(Exception):
+ pass
+
+class NoUnderstudiesError(UnderstudyError):
+ pass
Please sign in to comment.
Something went wrong with that request. Please try again.