Skip to content

Commit

Permalink
Added client base class.
Browse files Browse the repository at this point in the history
New client class to interact with workers as a client and not using
the HTTP interface.

New worker to stream data to client (random.php)

New sample client to demonstrate stream abilities.
  • Loading branch information
markkimsal committed Sep 10, 2014
1 parent eeacad0 commit 851f540
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 0 deletions.
46 changes: 46 additions & 0 deletions sample_clients/stream_data.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php
chdir( dirname(dirname(__FILE__)) );

include_once ('src/client_base.php');


class Zmws_Sample_Client_Stream extends Zmws_Client_Base {

public function start() {
$this->stream('RANDSTREAM',
array('charset'=>'abcde1234567890'),
array($this, 'callback')
)->execute();

echo "sleeping for 3 and running again ....\n";
sleep(3);

$this->stream('RANDSTREAM',
array('charset'=>'abcde1234567890'),
array($this, 'callback')
)->execute();
}

public function callback($data, $status, $sv, $jobid) {
printf ("Got %s from %s [%s] with data ? %s\n", $status, $sv, $jobid, empty($data)?'NO': $data);
//COMPLETE streams have no data
if ($status == 'CONT')
$rev = $this->reverseString($data);
}

public function callbackRev($data, $status, $sv, $jobid) {
static $c=0;
$c++;
printf ("Got %s from %s [%s] with data ? %s\n", $status, $sv, $jobid, empty($data)?'NO': $data);
}

public function reverseString($str) {
$this->task('REV',
array('str'=>$str),
array($this, 'callbackRev')
)->execute();
}
}

$client = new Zmws_Sample_Client_Stream();
$client->start();
31 changes: 31 additions & 0 deletions sample_workers/random.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

chdir( dirname(dirname(__FILE__)) );

include_once ('src/worker_base.php');

class Zmws_Worker_Random extends Zmws_Worker_Base {

public function work($jobid, $param='') {
for ($x=0; $x<100; $x++) {
$answer = new Zmws_Worker_Answer();
$answer->status = TRUE;
$answer->retval = $this->randString(32);
$this->sendAnswer($answer, 'CONT');
}
return TRUE;
}

public function randString($length){
$charset='ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
$str = '';
$count = strlen($charset);
while ($length--) {
$str .= $charset[mt_rand(0, $count-1)];
}
return $str;
}
}

$w = new Zmws_Worker_Random();
while($w->loop()) {}
133 changes: 133 additions & 0 deletions src/client_base.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
<?php

include_once(dirname(__FILE__).'/zmsg.php');
class Zmws_Client_Base {

public $context = NULL;
public $frontend = NULL;
public $_identity = '';

protected $listFeSrv = array();
protected $listCallbacks = array();
protected $listJobs = array();
protected $streamReq = NULL;
protected $taskReq = NULL;
protected $recving = FALSE;

public function __construct($host='localhost', $port=5555) {
$this->addServer($host, $port);
}

public function addServer($host='localhost', $port=5555) {
$this->listFeSrv[] = array('host'=>$host, 'port'=>$port);

if ($this->frontend) {
$this->frontendConnect();
}
}

public function frontendSocket() {
$this->context = new ZMQContext();
$this->frontend = new ZMQSocket($this->context, ZMQ::SOCKET_DEALER);
// Configure socket to not wait at close time
// $this->frontend->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);

//connect
$this->frontendConnect();
}

public function frontendConnect() {
//dealer sockets round robin send to all connections
// and fair-queue receive from all connectsion
foreach ($this->listFeSrv as $_srv) {
$this->frontend->connect("tcp://".$_srv['host'].":".$_srv['port']);
}
}

public function task($sv, $param, $callback) {

if (!$this->frontend) {
$this->frontendSocket();
}
//$this->listTaskCb[$sv] = $callback;
$this->listCallbacks[$sv] = $callback;
$this->listJobs[$sv]++;
$this->taskReq = new Zmsg($this->frontend);
$this->taskReq->body_set('JOB: SYNC-'.$sv);
$this->taskReq->push( 'PARAM-JSON: '.json_encode($param) );
return $this;
}

public function stream($sv, $param, $callback) {

if (!$this->frontend) {
$this->frontendSocket();
}
$this->listCallbacks[$sv] = $callback;
$this->listJobs[$sv]++;
$this->streamReq = new Zmsg($this->frontend);
$this->streamReq->body_set('JOB: SYNC-'.$sv);
$this->streamReq->push( 'PARAM-JSON: '.json_encode($param) );
return $this;
}

public function execute() {
if (is_object($this->streamReq)) {
$this->streamReq->send();
$this->streamReq = NULL;
}
if (is_object($this->taskReq)) {
$this->taskReq->send();
$this->taskReq = NULL;
}

if ($this->recving) return;

$listener = new Zmsg($this->frontend);
//while ( count($this->listCallbacks) && $reply = $this->streamReq->recv()) {
while ( $reply = $listener->recv()) {
$this->recving = TRUE;
//ID, null, message (for rep/req sockets)
$spacer = $reply->unwrap();
$body = $reply->unwrap();
list ($status, $sv, $jobid) = explode(' ', $body);
$status = substr($status, 0, -1);
$jobid = rtrim($jobid, ']');
$jobid = ltrim($jobid, '[');

$param = $reply->unwrap();
if (substr($param, 0, 10) == 'PARAM-JSON') {
$param = json_decode( substr($param, 12) );
}
if ( isset($this->listCallbacks[$sv]) ) {
$callback = $this->listCallbacks[$sv];

if (is_callable($callback) && !is_array($callback)) {
$callback($param, $status, $sv, $jobid);
}
if (is_array($callback)) {
$callback[0]->$callback[1]($param, $status, $sv, $jobid);
}
}

if ($status === 'FAIL') {
$this->listJobs[$sv]--;
}
if ($status === 'COMPLETE') {
$this->listJobs[$sv]--;
}

foreach ($this->listJobs as $sv =>$cnt) {
if ($cnt > 0) continue;
unset ($this->listJobs[$sv]);
unset ($this->listCallbacks[$sv]);
}
if (!count($this->listJobs)) {
$this->recving = FALSE;
break;
}
}
echo "done listening \n";
}
}

0 comments on commit 851f540

Please sign in to comment.