Permalink
Browse files

Introducing react/zmq

  • Loading branch information...
0 parents commit 2a06ebd12245b82343634774fc616a9b2b1682df @igorw igorw committed May 26, 2012
@@ -0,0 +1 @@
+vendor
@@ -0,0 +1,11 @@
+language: php
+
+php:
+ - 5.3
+ - 5.4
+
+before_script:
+ - curl -s http://getcomposer.org/installer | php
+ - php composer.phar install
+
+script: phpunit --coverage-text
@@ -0,0 +1,19 @@
+Copyright (c) 2012 Igor Wiedler
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
@@ -0,0 +1,73 @@
+# React-ZMQ
+
+ZeroMQ bindings for React.
+
+[![Build Status](https://secure.travis-ci.org/react-php/react.png)](http://travis-ci.org/react-php/react)
+
+## Install
+
+The recommended way to install react is [through composer](http://getcomposer.org).
+
+```JSON
+{
+ "require": {
+ "react/react": "dev-master"
+ }
+}
+```
+
+## Example
+
+Here is an example of a push socket:
+
+```php
+<?php
+
+$loop = React\EventLoop\Factory::create();
+
+$context = new React\Zmq\Context($loop);
+
+$push = $context->getSocket(ZMQ::SOCKET_PUSH);
+$push->connect('tcp://127.0.0.1:5555');
+
+$i = 0;
+$loop->addPeriodicTimer(1, function () use (&$i, $push) {
+ $i++;
+ echo "sending $i\n";
+ $push->send($i);
+});
+
+$loop->run();
+```
+
+And the pull socket that goes with it:
+
+```php
+<?php
+$loop = React\EventLoop\Factory::create();
+
+$context = new React\Zmq\Context($loop);
+
+$pull = $context->getSocket(ZMQ::SOCKET_PULL);
+$pull->bind('tcp://127.0.0.1:5555');
+
+$pull->on('error', function ($e) {
+ var_dump($e->getMessage());
+});
+
+$pull->on('message', function ($msg) {
+ echo "Received: $msg\n";
+});
+
+$loop->run();
+```
+
+## Tests
+
+To run the test suite, you need PHPUnit.
+
+ $ phpunit
+
+## License
+
+MIT, see LICENSE.
@@ -0,0 +1,15 @@
+{
+ "name": "react/zmq",
+ "description": "ZeroMQ bindings for React.",
+ "keywords": ["zmq", "zeromq"],
+ "license": "MIT",
+ "require": {
+ "php": ">=5.3.2",
+ "ext-zmq": "1.0.*",
+ "evenement/evenement": "dev-master",
+ "react/event-loop": "dev-master"
+ },
+ "autoload": {
+ "psr-0": { "React\\Zmq": "src" }
+ }
+}
@@ -0,0 +1,24 @@
+{
+ "hash": "79cb5af3dbfcf10015c40406d50b3664",
+ "packages": [
+ {
+ "package": "evenement/evenement",
+ "version": "dev-master",
+ "source-reference": "b3b308f4b124e643bb632e0a3cf5944894b7d952"
+ },
+ {
+ "package": "react/event-loop",
+ "version": "dev-master",
+ "source-reference": "76db13526c13e266f71e4ddd541d08f08ed2f84e"
+ }
+ ],
+ "packages-dev": null,
+ "aliases": [
+
+ ],
+ "minimum-stability": "dev",
+ "stability-flags": {
+ "evenement/evenement": 20,
+ "react/event-loop": 20
+ }
+}
@@ -0,0 +1,27 @@
+<?php
+
+require __DIR__.'/../vendor/autoload.php';
+
+$loop = React\EventLoop\Factory::create();
+
+$context = new React\Zmq\Context($loop);
+
+$sub = $context->getSocket(ZMQ::SOCKET_SUB);
+$sub->bind('tcp://127.0.0.1:5555');
+$sub->subscribe('foo');
+
+$sub->on('message', function ($msg) {
+ echo "Received: $msg\n";
+});
+
+$pub = $context->getSocket(ZMQ::SOCKET_PUB);
+$pub->connect('tcp://127.0.0.1:5555');
+
+$i = 0;
+$loop->addPeriodicTimer(1, function () use (&$i, $pub) {
+ $i++;
+ echo "publishing $i\n";
+ $pub->send('foo '.$i);
+});
+
+$loop->run();
@@ -0,0 +1,20 @@
+<?php
+
+require __DIR__.'/../vendor/autoload.php';
+
+$loop = React\EventLoop\Factory::create();
+
+$context = new React\Zmq\Context($loop);
+
+$pull = $context->getSocket(ZMQ::SOCKET_PULL);
+$pull->bind('tcp://127.0.0.1:5555');
+
+$pull->on('error', function ($e) {
+ var_dump($e->getMessage());
+});
+
+$pull->on('message', function ($msg) {
+ echo "Received: $msg\n";
+});
+
+$loop->run();
@@ -0,0 +1,23 @@
+<?php
+
+require __DIR__.'/../vendor/autoload.php';
+
+$loop = React\EventLoop\Factory::create();
+
+$context = new React\Zmq\Context($loop);
+
+$push = $context->getSocket(ZMQ::SOCKET_PUSH);
+$push->connect('tcp://127.0.0.1:5555');
+
+$push->on('error', function ($e) {
+ var_dump($e->getMessage());
+});
+
+$i = 0;
+$loop->addPeriodicTimer(1, function () use (&$i, $push) {
+ $i++;
+ echo "sending $i\n";
+ $push->send($i);
+});
+
+$loop->run();
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<phpunit backupGlobals="false"
+ backupStaticAttributes="false"
+ colors="true"
+ convertErrorsToExceptions="true"
+ convertNoticesToExceptions="true"
+ convertWarningsToExceptions="true"
+ processIsolation="false"
+ stopOnFailure="false"
+ syntaxCheck="false"
+ bootstrap="tests/bootstrap.php"
+>
+ <testsuites>
+ <testsuite name="React-ZMQ Test Suite">
+ <directory>./tests/React/</directory>
+ </testsuite>
+ </testsuites>
+
+ <filter>
+ <whitelist>
+ <directory>./src/</directory>
+ </whitelist>
+ </filter>
+</phpunit>
@@ -0,0 +1,69 @@
+<?php
+
+namespace React\Zmq;
+
+use Evenement\EventEmitter;
+use React\EventLoop\LoopInterface;
+
+class Buffer extends EventEmitter
+{
+ public $socket;
+ public $closed = false;
+ public $listening = false;
+ private $loop;
+ private $messages = array();
+
+ public function __construct(\ZMQSocket $socket, LoopInterface $loop)
+ {
+ $this->socket = $socket;
+ $this->loop = $loop;
+ }
+
+ public function send($message)
+ {
+ if ($this->closed) {
+ return;
+ }
+
+ $this->messages[] = $message;
+
+ if (!$this->listening) {
+ $fd = $this->socket->getSockOpt(\ZMQ::SOCKOPT_FD);
+ $that = $this;
+ $listener = function () use ($that) {
+ if ($that->socket->getSockOpt(\ZMQ::SOCKOPT_EVENTS) & \ZMQ::POLL_OUT) {
+ $that->handleWrite();
+ }
+ };
+ $this->loop->addWriteStream($fd, $listener);
+
+ $this->listening = true;
+ }
+ }
+
+ public function end()
+ {
+ $this->closed = true;
+
+ if (!$this->listening) {
+ $this->emit('end');
+ }
+ }
+
+ public function handleWrite()
+ {
+ $fd = $this->socket->getSockOpt(\ZMQ::SOCKOPT_FD);
+ foreach ($this->messages as $i => $message) {
+ try {
+ $sent = (bool) $this->socket->send($message, \ZMQ::MODE_DONTWAIT);
+ if ($sent) {
+ unset($this->messages[$i]);
+ $this->loop->removeWriteStream($fd);
+ $that->listening = false;
+ }
+ } catch (\ZMQSocketException $e) {
+ $that->emit('error', array($e));
+ }
+ }
+ }
+}
@@ -0,0 +1,43 @@
+<?php
+
+namespace React\Zmq;
+
+use React\EventLoop\LoopInterface;
+
+class Context
+{
+ private $loop;
+ private $context;
+
+ public function __construct(LoopInterface $loop, \ZMQContext $context = null)
+ {
+ $this->loop = $loop;
+ $this->context = $context ?: new \ZMQContext();
+ }
+
+ public function __call($method, $args)
+ {
+ $res = call_user_func_array(array($this->context, $method), $args);
+ if ($res instanceof \ZMQSocket) {
+ $res = $this->wrapSocket($res);
+ }
+ return $res;
+ }
+
+ private function wrapSocket(\ZMQSocket $socket)
+ {
+ $wrapped = new SocketWrapper($socket, $this->loop);
+
+ $fd = $socket->getSockOpt(\ZMQ::SOCKOPT_FD);
+ $this->loop->addReadStream($fd, function ($fd) use ($wrapped, $socket) {
+ while ($socket->getSockOpt(\ZMQ::SOCKOPT_EVENTS) & \ZMQ::POLL_IN) {
+ $message = $socket->recv(\ZMQ::MODE_DONTWAIT);
+ if (false !== $message) {
+ $wrapped->emit('message', array($message));
+ }
+ }
+ });
+
+ return $wrapped;
+ }
+}
Oops, something went wrong.

0 comments on commit 2a06ebd

Please sign in to comment.