From 093a4e5fcf5a0ff6d6d8520401046c82464e16a0 Mon Sep 17 00:00:00 2001 From: Maciej Mazur Date: Fri, 20 Mar 2015 16:56:30 +0100 Subject: [PATCH] Prototype --- src/Riak/Client/Command/DataType/FetchMap.php | 11 +++++--- .../DataType/FetchDataTypeOperation.php | 20 ++++++++++---- src/Riak/Client/Core/RiakCluster.php | 16 +++++++++--- src/Riak/Client/Core/RiakHttpTransport.php | 26 +++++++++++++++++++ src/Riak/Client/Core/RiakNode.php | 18 +++++++++++++ .../Core/Transport/Http/DataType/HttpGet.php | 20 +++++++++++--- src/Riak/Client/RiakClient.php | 12 +++++++++ 7 files changed, 108 insertions(+), 15 deletions(-) diff --git a/src/Riak/Client/Command/DataType/FetchMap.php b/src/Riak/Client/Command/DataType/FetchMap.php index 22e419e..6b06992 100644 --- a/src/Riak/Client/Command/DataType/FetchMap.php +++ b/src/Riak/Client/Command/DataType/FetchMap.php @@ -40,14 +40,19 @@ public function __construct(RiakLocation $location = null, array $options = []) */ public function execute(RiakCluster $cluster) { - $config = $cluster->getRiakConfig(); - $converter = $config->getCrdtResponseConverter(); - $operation = new FetchMapOperation($converter, $this->location, $this->options); + $operation = $this->createOperation($cluster); $response = $cluster->execute($operation); return $response; } + public function createOperation(RiakCluster $cluster) + { + $config = $cluster->getRiakConfig(); + $converter = $config->getCrdtResponseConverter(); + return new FetchMapOperation($converter, $this->location, $this->options); + } + /** * @param \Riak\Client\Core\Query\RiakLocation $location * @param array $options diff --git a/src/Riak/Client/Core/Operation/DataType/FetchDataTypeOperation.php b/src/Riak/Client/Core/Operation/DataType/FetchDataTypeOperation.php index c39ad29..4c135ee 100644 --- a/src/Riak/Client/Core/Operation/DataType/FetchDataTypeOperation.php +++ b/src/Riak/Client/Core/Operation/DataType/FetchDataTypeOperation.php @@ -48,12 +48,9 @@ public function __construct(CrdtResponseConverter $converter, RiakLocation $loca */ public function execute(RiakTransport $adapter) { - $getRequest = $this->createGetRequest(); + $getRequest = $this->createRequest(); $getResponse = $adapter->send($getRequest); - $datatype = $this->converter->convert($getResponse); - $response = $this->createDataTypeResponse($datatype, $getResponse->context); - - return $response; + return $this->createResponse($getResponse); } /** @@ -75,6 +72,19 @@ private function createGetRequest() return $request; } + public function createRequest() + { + return $this->createGetRequest(); + } + + public function createResponse($response) + { + $datatype = $this->converter->convert($response); + $response = $this->createDataTypeResponse($datatype, $response->context); + + return $response; + } + /** * @param \Riak\Client\Core\Query\Crdt\DataType $datatype * @param string $context diff --git a/src/Riak/Client/Core/RiakCluster.php b/src/Riak/Client/Core/RiakCluster.php index 9b90f47..0270972 100644 --- a/src/Riak/Client/Core/RiakCluster.php +++ b/src/Riak/Client/Core/RiakCluster.php @@ -97,6 +97,18 @@ public function getRiakConfig() * @return \Riak\Client\RiakResponse */ public function execute(RiakOperation $operation) + { + $node = $this->selectNode(); + return $node->execute($operation); + } + + public function batch(array $operations) + { + $node = $this->selectNode(); + return $node->batch($operations); + } + + private function selectNode() { if (empty($this->nodes)) { throw new RiakException('There are no nodes in the cluster.'); @@ -104,8 +116,6 @@ public function execute(RiakOperation $operation) $size = count($this->nodes); $index = mt_rand(0, $size - 1); - $node = $this->nodes[$index]; - - return $node->execute($operation); + return $this->nodes[$index]; } } diff --git a/src/Riak/Client/Core/RiakHttpTransport.php b/src/Riak/Client/Core/RiakHttpTransport.php index 7e80c19..5abb509 100644 --- a/src/Riak/Client/Core/RiakHttpTransport.php +++ b/src/Riak/Client/Core/RiakHttpTransport.php @@ -91,4 +91,30 @@ public function send(Request $request) throw RiakTransportException::httpRequestException($exc); } } + + public function batch(array $requests) + { + $strategies = []; + foreach ($requests as $key => $req) { + $strategies[$key] = $this->createAdapterStrategyFor($req); + } + + $strategiesRequests = []; + foreach ($strategies as $key => $strategy) { + $strategiesRequests[$key] = $strategy->buildRequest($requests[$key]); + } + + $results = \GuzzleHttp\Pool::batch($this->client, $strategiesRequests); + + $strategiesResponses = []; + foreach($strategiesRequests as $key => $strategyRequest) { + $strategiesResponses[$key] = $results->getResult($strategyRequest); + } + + $responses = []; + foreach ($strategiesResponses as $key => $strategyResponse) { + $responses[$key] = $strategies[$key]->buildResponse($strategiesResponses[$key]); + } + return $responses; + } } diff --git a/src/Riak/Client/Core/RiakNode.php b/src/Riak/Client/Core/RiakNode.php index 4d61a42..99b34fe 100644 --- a/src/Riak/Client/Core/RiakNode.php +++ b/src/Riak/Client/Core/RiakNode.php @@ -40,4 +40,22 @@ public function execute(RiakOperation $operation) { return $operation->execute($this->adapter); } + + public function batch(array $operations) + { + $requests = array_map( + function ($operation) { + return $operation->createRequest(); + }, + $operations + ); + + $adapterResponses = $this->adapter->batch($requests); + + $responses = []; + foreach ($adapterResponses as $key => $resp) { + $responses[$key] = $operations[$key]->createResponse($resp); + } + return $responses; + } } diff --git a/src/Riak/Client/Core/Transport/Http/DataType/HttpGet.php b/src/Riak/Client/Core/Transport/Http/DataType/HttpGet.php index 920c03c..6da057a 100644 --- a/src/Riak/Client/Core/Transport/Http/DataType/HttpGet.php +++ b/src/Riak/Client/Core/Transport/Http/DataType/HttpGet.php @@ -1,5 +1,5 @@ createHttpRequest($request); + $httpRequest = $this->buildRequest($request); try { $httpResponse = $this->client->send($httpRequest); @@ -78,6 +77,19 @@ public function send(Request $request) throw $e; } + return $this->buildResponse($httpResponse); + } + + public function buildRequest(Request $request) + { + return $this->createHttpRequest($request); + } + + public function buildResponse($httpResponse) + { + $response = new GetResponse(); + + $code = $httpResponse->getStatusCode(); if ( ! isset($this->validResponseCodes[$code])) { throw RiakTransportException::unexpectedStatusCode($code); } @@ -91,6 +103,6 @@ public function send(Request $request) $response->context = $context; $response->value = $this->opConverter->fromArray($type, $value); - return $response; + return $response; } } diff --git a/src/Riak/Client/RiakClient.php b/src/Riak/Client/RiakClient.php index 35a1ff4..cef0484 100644 --- a/src/Riak/Client/RiakClient.php +++ b/src/Riak/Client/RiakClient.php @@ -60,4 +60,16 @@ public function execute(RiakCommand $command) { return $command->execute($this->cluster); } + + public function batch(array $commands) + { + $cluster = $this->cluster; + $operations = array_map( + function($command) use ($cluster) { + return $command->createOperation($cluster); + }, + $commands + ); + return $this->cluster->batch($operations); + } }