Skip to content

Commit

Permalink
guard json_encode on json rpc client & server
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Jul 7, 2017
1 parent 484d4a1 commit 7e26fa7
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 13 deletions.
6 changes: 5 additions & 1 deletion src/Driver/AmqpExtension/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ public function bind(string $exchangeName, string $routingKey = '', array $argum
*/
public function get(int $flags = Constants::AMQP_NOPARAM)
{
$envelope = $this->queue->get($flags);
try {
$envelope = $this->queue->get($flags);
} catch (\AMQPChannelException $e) {
throw new ChannelException($e->getMessage());
}

if ($envelope instanceof \AMQPEnvelope) {
$envelope = new Envelope($envelope);
Expand Down
25 changes: 14 additions & 11 deletions src/JsonRpc/JsonRpcClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ final class JsonRpcClient implements Client
*/
private $requestIds = [];

/**
* @var ResponseCollection
*/
private $responseCollection;

/**
* Milliseconds to wait between two tries when reply is not yet there
*
Expand Down Expand Up @@ -103,13 +98,21 @@ public function addRequest(Request $request)
$attributes = $this->createAttributes($request);

$exchange = $this->getExchange($request->server());
$exchange->publish(json_encode($request->params()), $request->routingKey(), Constants::AMQP_NOPARAM, $attributes);

$message = json_encode($request->params());

if (json_last_error() !== JSON_ERROR_NONE) {
throw new Exception\InvalidArgumentException('Error during json encoding');
}

$exchange->publish($message, $request->routingKey(), Constants::AMQP_NOPARAM, $attributes);


if (null !== $request->id()) {
$this->requestIds[] = $request->id();
}

if (0 != $request->expiration() && ceil($request->expiration() / 1000) > $this->timeout) {
if (0 !== $request->expiration() && ceil($request->expiration() / 1000) > $this->timeout) {
$this->timeout = ceil($request->expiration() / 1000);
}
}
Expand All @@ -127,26 +130,26 @@ public function getResponseCollection(float $timeout = 0) : ResponseCollection
}

$now = microtime(true);
$this->responseCollection = new JsonRpcResponseCollection();
$responseCollection = new JsonRpcResponseCollection();

do {
$message = $this->queue->get(Constants::AMQP_AUTOACK);

if ($message instanceof Envelope) {
$this->responseCollection->addResponse($this->responseFromEnvelope($message));
$responseCollection->addResponse($this->responseFromEnvelope($message));
} else {
usleep($this->waitMillis * 1000);
}
$time = microtime(true);
} while (
$this->responseCollection->count() < count($this->requestIds)
$responseCollection->count() < count($this->requestIds)
&& (0 == $timeout || ($timeout > 0 && (($time - $now) < $timeout)))
);

$this->requestIds = [];
$this->timeout = 0;

return $this->responseCollection;
return $responseCollection;
}

/**
Expand Down
13 changes: 12 additions & 1 deletion src/JsonRpc/JsonRpcServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,18 @@ protected function sendReply(Response $response, Envelope $envelope)
];
}

$this->exchange->publish(json_encode($payload), $envelope->getReplyTo(), Constants::AMQP_NOPARAM, $attributes);
$message = json_encode($payload);

if (json_last_error() !== JSON_ERROR_NONE) {
$message = json_encode([
'error' => [
'code' => JsonRpcError::ERROR_CODE_32603,
'message' => 'Internal error',
],
]);
}

$this->exchange->publish($message, $envelope->getReplyTo(), Constants::AMQP_NOPARAM, $attributes);
}

/**
Expand Down
96 changes: 96 additions & 0 deletions tests/JsonRpc/AbstractJsonRpcClientAndServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@

namespace HumusTest\Amqp\JsonRpc;

use Humus\Amqp\Connection;
use Humus\Amqp\ConnectionOptions;
use Humus\Amqp\Constants;
use Humus\Amqp\Envelope;
use Humus\Amqp\Exception\InvalidArgumentException;
use Humus\Amqp\Exchange;
use Humus\Amqp\JsonRpc\JsonRpcClient;
use Humus\Amqp\JsonRpc\JsonRpcError;
use Humus\Amqp\JsonRpc\JsonRpcResponse;
use Humus\Amqp\JsonRpc\Request;
use Humus\Amqp\JsonRpc\JsonRpcServer;
use Humus\Amqp\JsonRpc\JsonRpcRequest;
use Humus\Amqp\Queue;
use HumusTest\Amqp\Helper\CanCreateConnection;
use HumusTest\Amqp\Helper\DeleteOnTearDownTrait;
use HumusTest\Amqp\TestAsset\ArrayLogger;
Expand Down Expand Up @@ -636,4 +641,95 @@ public function it_handles_invalid_requests_and_responses()
$this->assertEquals(JsonRpcError::ERROR_CODE_32603, $response9->error()->code());
$this->assertEquals('Invalid JSON-RPC response', $response9->error()->message());
}

/**
* @test
*/
public function it_throws_exception_on_client_when_data_could_not_be_encoded_to_json()
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Error during json encoding');

$options = $this->prophesize(ConnectionOptions::class);
$options->getLogin()->willReturn('user123')->shouldBeCalled();

$connection = $this->prophesize(Connection::class);
$connection->getOptions()->willReturn($options->reveal())->shouldBeCalled();

$queue = $this->prophesize(Queue::class);
$queue->getName()->willReturn('test-queue')->shouldBeCalled();
$queue->getConnection()->willReturn($connection->reveal())->shouldBeCalled();

$exchane = $this->prophesize(Exchange::class);

$client = new JsonRpcClient($queue->reveal(), ['rpc-server' => $exchane->reveal()]);

$client->addRequest(new JsonRpcRequest(
'rpc-server',
'something',
"\xB1\x31"
));
}

/**
* @test
*/
public function it_returns_error_on_server_when_data_could_not_be_encoded_to_json()
{
$connection = $this->createConnection();
$channel = $connection->newChannel();
$channel2 = $connection->newChannel();

$clientExchange = $channel->newExchange();
$clientExchange->setType('direct');
$clientExchange->setName('rpc-client');
$clientExchange->delete();
$clientExchange->declareExchange();

$serverExchange = $channel2->newExchange();
$serverExchange->setType('direct');
$serverExchange->setName('rpc-server');
$serverExchange->delete();
$serverExchange->declareExchange();

$clientQueue = $channel->newQueue();
$clientQueue->setFlags(Constants::AMQP_AUTODELETE | Constants::AMQP_EXCLUSIVE);
$clientQueue->declareQueue();
$clientQueue->bind($clientExchange->getName());

$serverQueue = $channel2->newQueue();
$serverQueue->setName('rpc-server-queue');
$serverQueue->delete();
$serverQueue->declareQueue();
$serverQueue->bind($serverExchange->getName());

$this->addToCleanUp($clientExchange);
$this->addToCleanUp($serverExchange);
$this->addToCleanUp($clientQueue);
$this->addToCleanUp($serverQueue);

$client = new JsonRpcClient($clientQueue, ['rpc-server' => $serverExchange]);

$request1 = new JsonRpcRequest('rpc-server', 'first', 1, 'request-1');

$client->addRequest($request1);

$callback = function (Request $request) {
return JsonRpcResponse::withResult($request->id(), "\xB1\x31");
};

$logger = new NullLogger();
$server = new JsonRpcServer($serverQueue, $callback, $logger, 1.0);

$server->consume(1);

$responses = $client->getResponseCollection(2);

$this->assertCount(1, $responses);

$response1 = $responses->getResponse('request-1');
$this->assertTrue($response1->isError());
$this->assertSame(JsonRpcError::ERROR_CODE_32603, $response1->error()->code());
$this->assertSame('Internal error', $response1->error()->message());
}
}

0 comments on commit 7e26fa7

Please sign in to comment.