Skip to content

Commit

Permalink
Implement request response
Browse files Browse the repository at this point in the history
  • Loading branch information
adriacidre committed Oct 8, 2015
1 parent 5ecbe8c commit 88d5ca2
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 1 deletion.
12 changes: 12 additions & 0 deletions examples/reqres/req.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php
require_once __DIR__ . "/../../vendor/autoload.php";

$connectionOptions = new \Nats\ConnectionOptions();
$connectionOptions->setHost('localhost')->setPort(4222);

$c = new Nats\Connection($connectionOptions);
$c->connect();

$c->request('sayhello', 'Marty McFly', function ($response) {
echo $response->getBody();
});
17 changes: 17 additions & 0 deletions examples/reqres/res.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php
require_once __DIR__ . "/../../vendor/autoload.php";

$connectionOptions = new \Nats\ConnectionOptions();
$connectionOptions
->setHost('localhost')
->setPort(4222);
$c = new Nats\Connection($connectionOptions);
$c->connect();

$sid = $c->subscribe("sayhello", function ($res) {
$res->reply("Hello, " . $res->getBody() . " !!!");
});

$c->wait(2);

$c->unsubscribe($sid);
28 changes: 27 additions & 1 deletion src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,25 @@ public function ping()
$this->pings += 1;
}

/**
* Request does a request and executes a callback with the response.
*
* @param string $subject Message topic.
* @param string $payload Message data.
* @param resource $callback Closure to be executed as callback.
* @param integer $quantity Number of messages to wait for.
*/
public function request($subject, $payload, $callback, $wait = 1) {
$inbox = uniqid('_INBOX.');
$this->subscribe($inbox, $callback);

$msg = 'PUB '.$subject.' '.$inbox.' '.strlen($payload);
$this->send($msg . "\r\n" . $payload);
$this->pubs += 1;

$this->wait($wait);
}

/**
* Publish publishes the data argument to the given subject.
*
Expand Down Expand Up @@ -264,14 +283,21 @@ private function handlePING()
private function handleMSG($line)
{
$parts = explode(' ', $line);
$subject = null;
$length = $parts[3];
$sid = $parts[2];

if (count($parts) == 5) {
$length = $parts[5];
$subject = $parts[3];
}

$payload = $this->receive($length);
$msg = new Message($subject, $payload, $sid, $this);

$func = $this->subscriptions[$sid];
if (is_callable($func)) {
$func($payload);
$func($msg);
} else {
return new \Exception('not callable');
}
Expand Down
150 changes: 150 additions & 0 deletions src/Message.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
<?php
namespace Nats;

/**
* Message Class.
*/
class Message
{
/**
* Message Subject
*
* @var string
*/
private $subject;

/**
* Message Body
*
* @var string
*/
private $body;

/**
* Message Ssid
*
* @var string
*/
private $sid;

/**
* Message related connection
*
* @var string
*/
private $conn;

/**
* Message constructor
* @param string $subject
* @param string $body
* @param string $sid
* @param Connection $conn
*/
public function __construct($subject, $body, $sid, $conn){
$this->setSubject($subject);
$this->setBody($body);
$this->setSid($sid);
$this->setConn($conn);
}

/**
* Set subject
*
* @param string $subject Subject
* @return $this
*/
public function setSubject($subject) {
$this->subject = $subject;
return $this;
}

/**
* Get subject
*
* @return string
*/
public function getSubject() {
return $this->subject;
}

/**
* Set body
*
* @param string $body Body
* @return $this
*/
public function setBody($body) {
$this->body = $body;
return $this;
}

/**
* Get body
*
* @return string
*/
public function getBody() {
return $this->body;
}

/**
* Set Ssid
*
* @param string $sid Ssid
* @return $this
*/
public function setSid($sid) {
$this->sid = $sid;
return $this;
}

/**
* Get Ssid
*
* @return string
*/
public function getSid() {
return $this->sid;
}

/**
* String representation of a message
*
* @return string
*/
public function __toString() {
return $this->getBody();
}

/**
* Set Conn
*
* @param Connection $conn
* @return $this
*/
public function setConn($conn) {
$this->conn = $conn;
return $this;
}

/**
* Get Conn
*
* @return string
*/
public function getConn() {
return $this->conn;
}

/**
* Allows you reply the message with a specific body
*
* @param string $body
*/
public function reply($body) {
$this->getConn()->publish(
$this->getSubject(), $body
);
}
}
17 changes: 17 additions & 0 deletions tests/Unit/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,21 @@ public function testSubscription()
// time_nanosleep(1, 0);
$this->c->wait(1);
}

/**
* Test Request command
*
* @return void
*/
public function testRequest()
{
$this->c->subscribe("sayhello", function ($res) {
$res->reply("Hello, ".$res->getBody(). " !!!");
});

$this->c->request('sayhello', 'McFly', function ($message) {
$this->assertNotNull($message);
$this->assertEquals($message, 'Hello, McFly !!!');
});
}
}
38 changes: 38 additions & 0 deletions tests/Unit/MessageTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php
namespace Nats\tests\Unit;

use Nats\Connection;
use Nats\Message;

/**
* Class MessageTest
*/
class MessageTest extends \PHPUnit_Framework_TestCase
{
/**
* Tests Message getters and setters. Only necessary for code coverage.
*
* @return void
*/
public function testSettersAndGetters()
{
$conn = new Connection();

$msg = new Message('subject', 'body', 'sid', $conn);

$this->assertEquals('subject', $msg->getSubject());
$this->assertEquals('body', $msg->getBody());
$this->assertEquals('sid', $msg->getSid());
$this->assertEquals($conn, $msg->getConn());

$msg
->setSubject('subject2')
->setBody('body2')
->setSid('sid2');

$this->assertEquals('subject2', $msg->getSubject());
$this->assertEquals('body2', $msg->getBody());
$this->assertEquals('sid2', $msg->getSid());

}
}

0 comments on commit 88d5ca2

Please sign in to comment.