Skip to content
This repository has been archived by the owner on Dec 13, 2021. It is now read-only.

Commit

Permalink
Orchestrator must tolerant to unexpected deaths of processes. So add …
Browse files Browse the repository at this point in the history
…an unit test to /test. We should add many unit tests here in near future..

In current impl., an action being sent to dead process is lost. Even if process has revived, an action is lost because revived process has a different TCP socket.
  • Loading branch information
AkihiroSuda committed Jun 18, 2015
1 parent e07ff50 commit 5ee85d0
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 11 deletions.
44 changes: 33 additions & 11 deletions pyearthquake/orchestrator/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,22 @@ def start(self):
explorer_worker_handle = eventlet.spawn(self.explorer.worker)
flask_app = Flask(self.__class__.__name__)
flask_app.debug = True
# self.regist_sigpipe_handler()
self.regist_flask_routes(flask_app)
server_sock = eventlet.listen(('localhost', self.listen_port))
wsgi.server(server_sock, flask_app)
raise RuntimeError('should not reach here!')

# import signal
# def regist_sigpipe_handler(self):
# orig_handler = signal.getsignal(signal.SIGPIPE)
# def handler(signum, frame):
# LOG.debug('SIGPIPE handler called')
# if hasattr(orig_handler, '__call__'):
# return orig_handler(signum, frame)
# signal.signal(signal.SIGPIPE, handler)
# LOG.info('Installed SIGPIPE handler')

def regist_flask_routes(self, app):
LOG.debug('registering flask routes')
@app.route('/')
Expand Down Expand Up @@ -156,35 +167,46 @@ def api_v1_post():
self.explorer.send_event(ev)
return jsonify({})


@app.route('/api/v1/<process_id>', methods=['GET'])
def api_v1_get(process_id):
## FIXME: make sure single conn exists for a single <process_id>
assert self.validate_process_id(process_id)
if not process_id in self.processes:
self.regist_process(process_id)

## acquire sem
LOG.debug('Acquiring sem for %s', process_id)
sem_acquired = self.processes[process_id]['sem'].acquire(blocking=False)
if not sem_acquired:
err = 'Could not acquire semaphore for %s' % process_id
LOG.warn(err)
return err

return err #TODO set HTTP error code
LOG.debug('Acquired sem for %s', process_id)

try:
ret = Response(_api_v1_get(process_id), 'application/json')
finally:
## release sem
self.processes[process_id]['sem'].release()
LOG.debug('Released sem for %s', process_id)
assert ret
return ret

def _api_v1_get(process_id):
## wait for action from explorer
## (FIXME: we should break this wait and release the sem when the conn is closed)
## WARNING: conn may be closed while waiting in this blocking q.get
## FIXME: we should break this wait and release the sem when the conn is closed,
## but Flask has no support for conn close detection, so we should not rely on Flask, maybe
## http://stackoverflow.com/questions/17787023/python-how-to-catch-a-flask-except-like-this
LOG.debug('Dequeuing action for %s', process_id)
got = self.processes[process_id]['queue'].get()
action = got['action']
LOG.debug('Dequeued action %s', action)
LOG.debug('Dequeued action %s for %s', action, process_id)
assert isinstance(action, ActionBase)

## return action
action_jsdict = action.to_jsondict()
LOG.debug('API <== %s', action_jsdict)

## release sem
self.processes[process_id]['sem'].release()
return jsonify(action_jsdict)
ret = json.dumps(action_jsdict)
yield ret

def send_action(self, action):
"""
Expand Down
1 change: 1 addition & 0 deletions test/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# UNITTESTS GOES HERE
7 changes: 7 additions & 0 deletions test/http/dummy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "event",
"class": "InspectionEndEvent",
"process": "dummy",
"uuid": "1f13eaa6-4b92-45f0-a4de-1236081dc652",
"option": {}
}
6 changes: 6 additions & 0 deletions test/http/test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
$ curl http://localhost:10000/api/v1/dummy
^C (== KILL PROCESS)

$ curl --data @dummy.json http://localhost:10000/api/v1
==> WHAT HAPPENS?

0 comments on commit 5ee85d0

Please sign in to comment.