diff --git a/sample_clients/stream_data.php b/sample_clients/stream_data.php new file mode 100644 index 0000000..914fb94 --- /dev/null +++ b/sample_clients/stream_data.php @@ -0,0 +1,46 @@ +stream('RANDSTREAM', + array('charset'=>'abcde1234567890'), + array($this, 'callback') + )->execute(); + + echo "sleeping for 3 and running again ....\n"; + sleep(3); + + $this->stream('RANDSTREAM', + array('charset'=>'abcde1234567890'), + array($this, 'callback') + )->execute(); + } + + public function callback($data, $status, $sv, $jobid) { + printf ("Got %s from %s [%s] with data ? %s\n", $status, $sv, $jobid, empty($data)?'NO': $data); + //COMPLETE streams have no data + if ($status == 'CONT') + $rev = $this->reverseString($data); + } + + public function callbackRev($data, $status, $sv, $jobid) { + static $c=0; + $c++; + printf ("Got %s from %s [%s] with data ? %s\n", $status, $sv, $jobid, empty($data)?'NO': $data); + } + + public function reverseString($str) { + $this->task('REV', + array('str'=>$str), + array($this, 'callbackRev') + )->execute(); + } +} + +$client = new Zmws_Sample_Client_Stream(); +$client->start(); diff --git a/sample_workers/random.php b/sample_workers/random.php new file mode 100644 index 0000000..170d7ce --- /dev/null +++ b/sample_workers/random.php @@ -0,0 +1,31 @@ +status = TRUE; + $answer->retval = $this->randString(32); + $this->sendAnswer($answer, 'CONT'); + } + return TRUE; + } + + public function randString($length){ + $charset='ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'; + $str = ''; + $count = strlen($charset); + while ($length--) { + $str .= $charset[mt_rand(0, $count-1)]; + } + return $str; + } +} + +$w = new Zmws_Worker_Random(); +while($w->loop()) {} diff --git a/src/client_base.php b/src/client_base.php new file mode 100644 index 0000000..58c83b2 --- /dev/null +++ b/src/client_base.php @@ -0,0 +1,133 @@ +addServer($host, $port); + } + + public function addServer($host='localhost', $port=5555) { + $this->listFeSrv[] = array('host'=>$host, 'port'=>$port); + + if ($this->frontend) { + $this->frontendConnect(); + } + } + + public function frontendSocket() { + $this->context = new ZMQContext(); + $this->frontend = new ZMQSocket($this->context, ZMQ::SOCKET_DEALER); + // Configure socket to not wait at close time +// $this->frontend->setSockOpt(ZMQ::SOCKOPT_LINGER, 0); + + //connect + $this->frontendConnect(); + } + + public function frontendConnect() { + //dealer sockets round robin send to all connections + // and fair-queue receive from all connectsion + foreach ($this->listFeSrv as $_srv) { + $this->frontend->connect("tcp://".$_srv['host'].":".$_srv['port']); + } + } + + public function task($sv, $param, $callback) { + + if (!$this->frontend) { + $this->frontendSocket(); + } + //$this->listTaskCb[$sv] = $callback; + $this->listCallbacks[$sv] = $callback; + $this->listJobs[$sv]++; + $this->taskReq = new Zmsg($this->frontend); + $this->taskReq->body_set('JOB: SYNC-'.$sv); + $this->taskReq->push( 'PARAM-JSON: '.json_encode($param) ); + return $this; + } + + public function stream($sv, $param, $callback) { + + if (!$this->frontend) { + $this->frontendSocket(); + } + $this->listCallbacks[$sv] = $callback; + $this->listJobs[$sv]++; + $this->streamReq = new Zmsg($this->frontend); + $this->streamReq->body_set('JOB: SYNC-'.$sv); + $this->streamReq->push( 'PARAM-JSON: '.json_encode($param) ); + return $this; + } + + public function execute() { + if (is_object($this->streamReq)) { + $this->streamReq->send(); + $this->streamReq = NULL; + } + if (is_object($this->taskReq)) { + $this->taskReq->send(); + $this->taskReq = NULL; + } + + if ($this->recving) return; + + $listener = new Zmsg($this->frontend); + //while ( count($this->listCallbacks) && $reply = $this->streamReq->recv()) { + while ( $reply = $listener->recv()) { + $this->recving = TRUE; + //ID, null, message (for rep/req sockets) + $spacer = $reply->unwrap(); + $body = $reply->unwrap(); + list ($status, $sv, $jobid) = explode(' ', $body); + $status = substr($status, 0, -1); + $jobid = rtrim($jobid, ']'); + $jobid = ltrim($jobid, '['); + + $param = $reply->unwrap(); + if (substr($param, 0, 10) == 'PARAM-JSON') { + $param = json_decode( substr($param, 12) ); + } + if ( isset($this->listCallbacks[$sv]) ) { + $callback = $this->listCallbacks[$sv]; + + if (is_callable($callback) && !is_array($callback)) { + $callback($param, $status, $sv, $jobid); + } + if (is_array($callback)) { + $callback[0]->$callback[1]($param, $status, $sv, $jobid); + } + } + + if ($status === 'FAIL') { + $this->listJobs[$sv]--; + } + if ($status === 'COMPLETE') { + $this->listJobs[$sv]--; + } + + foreach ($this->listJobs as $sv =>$cnt) { + if ($cnt > 0) continue; + unset ($this->listJobs[$sv]); + unset ($this->listCallbacks[$sv]); + } + if (!count($this->listJobs)) { + $this->recving = FALSE; + break; + } + } + echo "done listening \n"; + } +} +