Skip to content

Commit

Permalink
Add ability to stream multiple answers per job.
Browse files Browse the repository at this point in the history
New status of a job, CONT:
when CONT is encountered, the parameter is sent back to the client
and the client should be ready for more information.

The COMPLETE: status may or may not have a parameter attached to it,
that is left up to worker implementation.
  • Loading branch information
markkimsal committed Sep 10, 2014
1 parent 64a611c commit eeacad0
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 41 deletions.
39 changes: 31 additions & 8 deletions src/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ public function purgeStaleWorkers() {
if(microtime(true) > $jdef['hb']) {
$this->log ( sprintf("purging stale worker %s, havent seen HB in %d seconds.", $id, (HEARTBEAT_INTERVAL * HEARTBEAT_MAXTRIES) ), 'W');
unset($this->workerList[$serv][$id]);

//$this->activeJobList[$jid] = $_j;
}
}
}
Expand Down Expand Up @@ -225,7 +227,7 @@ public function startJobs() {
continue;
}

$this->log( sprintf("Starting job %s, [%s%s%s], for client: %s Queue size %d", $_j['service'], ($_j['sync'])? 'SYNC ':'', (strlen($_j['param']))?'PARAM ':'', $jid, $_j['clientid'], count($this->queueJobList)), 'I');
$this->log( sprintf("Starting job %s, [%s%s%s] to %s, for client: @%s Queue size %d", $_j['service'], ($_j['sync'])? 'SYNC ':'', (strlen($_j['param']))?'PARAM ':'', $jid, $wid, bin2hex($_j['clientid']), count($this->queueJobList)), 'I');
$zmsg = new Zmsg($this->backend);
$zmsg->body_set('JOB: '.$jid);
$zmsg->wrap( $_j['param'] );
Expand All @@ -237,6 +239,7 @@ public function startJobs() {

$zmsg->send();

$_j['worker'] = $wid;
$_j['startedat'] = microtime(true);
$this->activeJobList[$jid] = $_j;
//remove from array
Expand Down Expand Up @@ -284,13 +287,17 @@ public function handleBack($zmsg) {
$this->deleteWorker($identity, $service);
$this->appendWorker($identity, $service);
}
if( strtoupper(substr($zmsg->body(), 0, 4) == 'FAIL') ) {
$jobid = substr($zmsg->body(), 6);
$this->handleFinishJob($zmsg, $jobid, $identity, $service, FALSE, $retval);
}
if( strtoupper(substr($zmsg->body(), 0, 8) == 'COMPLETE') ) {
$jobid = substr($zmsg->body(), 10);
$this->handleFinishJob($zmsg, $jobid, $identity, $service, TRUE, $retval);
}
if( strtoupper(substr($zmsg->body(), 0, 4) == 'FAIL') ) {
if( strtoupper(substr($zmsg->body(), 0, 4) == 'CONT') ) {
$jobid = substr($zmsg->body(), 6);
$this->handleFinishJob($zmsg, $jobid, $identity, $service, FALSE, $retval);
$this->handleJobAnswer($zmsg, $jobid, $identity, $service, TRUE, $retval);
}
} else {
$this->log( sprintf ("got a response that might have a return value: (%d) parts", count($zmsg->parts())), 'I' );
Expand Down Expand Up @@ -350,7 +357,7 @@ public function handleFront($zmsg) {
$jobid = $this->handleWorkRequest($job, $client_id, $param, $sync);
if (!$sync) {
$zmsgReply = new Zmsg($this->frontend);
$zmsgReply->body_set("JOB: ".$jobid);
$zmsgReply->body_set("JOB: ".$jobid. " ".$job);
$zmsgReply->wrap( null );
$zmsgReply->wrap( $client_id );
$zmsgReply->send();
Expand All @@ -369,9 +376,9 @@ public function handleFinishJob($zmsg, $jobid, $identity, $service, $success=tru

$answer = 'COMPLETE';
if ($success)
$this->log( sprintf ("JOB COMPLETE: %s %s, client id: %s - took %0.4f sec", $_job['service'], $jobid, $_job['clientid'], (microtime(true) - $_job['startedat'])), 'I' );
$this->log( sprintf ("JOB COMPLETE: %s %s, client id: @%s - took %0.4f sec", $_job['service'], $jobid, bin2hex($_job['clientid']), (microtime(true) - $_job['startedat'])), 'I' );
else {
$this->log( sprintf ("JOB FAILED: %s %s, client id: %s - took %0.4f sec", $_job['service'], $jobid, $_job['clientid'], (microtime(true) - $_job['startedat'])), 'I' );
$this->log( sprintf ("JOB FAILED: %s %s, client id: @%s - took %0.4f sec", $_job['service'], $jobid, bin2hex($_job['clientid']), (microtime(true) - $_job['startedat'])), 'I' );
$answer = 'FAIL';
}

Expand All @@ -386,8 +393,8 @@ public function handleFinishJob($zmsg, $jobid, $identity, $service, $success=tru
//if sync, send reply now
if ($_job['sync'] == TRUE) {
$zmsgReply = new Zmsg($this->frontend);
$zmsgReply->body_set($answer.": ".$jobid);
$zmsgReply->wrap($retval);
$zmsgReply->body_set($retval);
$zmsgReply->wrap($answer.": " .$_job['service']. " [".$jobid."]" );
$zmsgReply->wrap( null );
$zmsgReply->wrap( $_job['clientid'] );
$zmsgReply->send();
Expand All @@ -397,6 +404,22 @@ public function handleFinishJob($zmsg, $jobid, $identity, $service, $success=tru
$this->appendWorker($identity, $service);
}


public function handleJobAnswer($zmsg, $jobid, $identity, $service, $success=true, $retval=NULL) {
$_job = $this->activeJobList[$jobid];

$this->log( sprintf ("JOB ANSWER: %s %s, client id: @%s - took %0.4f sec", $_job['service'], $jobid, bin2hex($_job['clientid']), (microtime(true) - $_job['startedat'])), 'I' );
//if sync, send reply now
if ($_job['sync'] == TRUE) {
$zmsgReply = new Zmsg($this->frontend);
$zmsgReply->body_set($retval);
$zmsgReply->wrap("CONT: " .$_job['service']. " [".$jobid."]" );
$zmsgReply->wrap(null);
$zmsgReply->wrap( $_job['clientid'] );
$zmsgReply->send();
}
}

/**
* Record the request for a new job
*/
Expand Down
75 changes: 42 additions & 33 deletions src/worker_base.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Zmws_Worker_Base {
public $log_level = 'W';

public $listBackendSrv = array('127.0.0.1');
protected $_socketCurrent = NULL;

public function __construct($backendPort='') {

Expand Down Expand Up @@ -69,7 +70,7 @@ public function frontendSocket($port=FALSE) {
}
$addrBackend = current($this->listBackendSrv);

$this->frontend = new ZMQSocket($this->context, ZMQ::SOCKET_REQ);
$this->frontend = new ZMQSocket($this->context, ZMQ::SOCKET_DEALER);

// Configure socket to not wait at close time
// $this->frontend->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);
Expand Down Expand Up @@ -139,10 +140,19 @@ public function loop() {
$this->log("poll done.", "D");
if($events > 0) {
foreach($read as $socket) {
$this->_socketCurrent = $socket;
$zmsg = new Zmsg($socket);
$zmsg->recv();

$jobid = $zmsg->body();

if ($jobid == 'HEARTBEAT') {
//any comms with server resets HB retries
//but we don't want to treat this message
// as a job, so continue
continue;
}

$client_id = $zmsg->address();
//this is just to remove the address and null to
// test for sizes (params)
Expand All @@ -165,18 +175,11 @@ public function loop() {
unset($v);
}

if ($jobid == 'HEARTBEAT') {
//any comms with server resets HB retries
//but we don't want to treat this message
// as a job, so continue
continue;
}
//got a real job, restart idle
$this->idleCount = 0;
$jobid = substr($jobid, 5);
$this->_jobidCurrent = $jobid;

$zanswer = new Zmsg($socket);
try {
//workers can return TRUE/FALSE, or an object
// with a status and a return value
$answer = $this->work($jobid, $p);
Expand All @@ -196,29 +199,7 @@ public function loop() {
}
$answer = $x;
}
//transform answer into zanswer
if ($answer->status) {
$zanswer->body_set("COMPLETE: ".$jobid);
$zanswer->wrap($this->serviceName);
if ($answer->retval !== NULL) {
$zanswer->push('PARAM-JSON: '. json_encode($answer->retval));
}
$this->log(sprintf("Job %s complete", $jobid), 'I');
} else {
$zanswer->body_set("FAIL: ".$jobid);
$zanswer->wrap($this->serviceName);
$this->log(sprintf("Job %s failed", $jobid), 'I');
}
} catch (Exception $e) {
$this->log($e->getMessage(), 'E');
$this->log(print_r($e->getTrace(),1), 'E');
$zanswer->body_set("FAIL: ".$jobid);
$zanswer->wrap($this->serviceName);
}
$zanswer->send();
//work may have taken longer than one HB interval,
//we should start timing new HBs from now
$this->hbAt = microtime(true) + HEARTBEAT_INTERVAL;
$this->sendAnswer($answer);
}
//communication with server, reset HB retries
$this->hbRetries = HEARTBEAT_RETRIES;
Expand Down Expand Up @@ -252,10 +233,38 @@ public function loop() {
//don't idle again until we get a job
$this->idleCount=-1;
}

return TRUE;
}

public function sendAnswer($answer, $header='COMPLETE') {
$jobid = $this->_jobidCurrent;
$zanswer = new Zmsg($this->_socketCurrent);
try {
//transform answer into zanswer
if ($answer->status) {
$zanswer->body_set($header.": ".$jobid);
$zanswer->wrap($this->serviceName);
if ($answer->retval !== NULL) {
$zanswer->push('PARAM-JSON: '. json_encode($answer->retval));
}
$this->log(sprintf("Job %s complete", $jobid), 'D');
} else {
$zanswer->body_set("FAIL: ".$jobid);
$zanswer->wrap($this->serviceName);
$this->log(sprintf("Job %s failed", $jobid), 'W');
}
} catch (Exception $e) {
$this->log($e->getMessage(), 'E');
$this->log(print_r($e->getTrace(),1), 'E');
$zanswer->body_set("FAIL: ".$jobid);
$zanswer->wrap($this->serviceName);
}
$zanswer->send();
//work may have taken longer than one HB interval,
//we should start timing new HBs from now
$this->hbAt = microtime(true) + HEARTBEAT_INTERVAL;
}

/**
* Called after 3 heartbeat intervals with no work requests.
*
Expand Down

0 comments on commit eeacad0

Please sign in to comment.