Skip to content

Commit

Permalink
Merge 6f6918a into e4d0453
Browse files Browse the repository at this point in the history
  • Loading branch information
SamMousa committed Dec 22, 2021
2 parents e4d0453 + 6f6918a commit bacc3e1
Show file tree
Hide file tree
Showing 28 changed files with 233 additions and 185 deletions.
3 changes: 2 additions & 1 deletion src/Command/AbstractCommand.php
Expand Up @@ -5,6 +5,7 @@
namespace Pheanstalk\Command;

use Pheanstalk\Contract\CommandInterface;
use Pheanstalk\Contract\ResponseInterface;
use Pheanstalk\Contract\ResponseParserInterface;
use Pheanstalk\Exception\CommandException;
use Pheanstalk\Response\ArrayResponse;
Expand Down Expand Up @@ -40,7 +41,7 @@ public function getResponseParser(): ResponseParserInterface
/**
* Creates a Response for the given data.
*/
protected function createResponse(string $name, array $data = []): ArrayResponse
protected function createResponse(string $name, array $data = []): ResponseInterface
{
return new ArrayResponse($name, $data);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Command/BuryCommand.php
Expand Up @@ -33,7 +33,7 @@ public function getCommandLine(): string
);
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
if ($responseLine == ResponseInterface::RESPONSE_NOT_FOUND) {
throw new Exception\JobNotFoundException(sprintf(
Expand Down
2 changes: 1 addition & 1 deletion src/Command/DeleteCommand.php
Expand Up @@ -20,7 +20,7 @@ public function getCommandLine(): string
return 'delete ' . $this->jobId;
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
if ($responseLine == ResponseInterface::RESPONSE_NOT_FOUND) {
throw new Exception\JobNotFoundException(sprintf(
Expand Down
17 changes: 7 additions & 10 deletions src/Command/IgnoreCommand.php
Expand Up @@ -8,6 +8,7 @@
use Pheanstalk\Contract\ResponseParserInterface;
use Pheanstalk\Exception;
use Pheanstalk\Response\ArrayResponse;
use Pheanstalk\ResponseLine;

/**
* The 'ignore' command.
Expand All @@ -20,18 +21,14 @@ public function getCommandLine(): string
return 'ignore ' . $this->tube;
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(ResponseLine $responseLine, ?string $responseData): ResponseInterface
{
if (preg_match('#^WATCHING (\d+)$#', $responseLine, $matches)) {
return $this->createResponse('WATCHING', [
'count' => (int) $matches[1],
]);
} elseif ($responseLine == ResponseInterface::RESPONSE_NOT_IGNORED) {
throw new Exception\ServerException(
$responseLine . ': cannot ignore last tube in watchlist'
);
if ($responseLine->getName() === ResponseInterface::RESPONSE_WATCHING) {
return $this->createResponse($responseLine->getName(), ['count' => (int)$responseLine->getArguments()[0]]);
} elseif ($responseLine->getName() === ResponseInterface::RESPONSE_NOT_IGNORED) {
throw new Exception\ServerException('Cannot ignore last tube in watchlist');
} else {
throw new Exception('Unhandled response: ' . $responseLine);
throw new Exception("Unhandled response: {$responseLine->getName()}");
}
}
}
2 changes: 1 addition & 1 deletion src/Command/KickCommand.php
Expand Up @@ -34,7 +34,7 @@ public function getCommandLine(): string
/* (non-phpdoc)
* @see ResponseParser::parseResponse()
*/
public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
list($code, $count) = explode(' ', $responseLine);

Expand Down
2 changes: 1 addition & 1 deletion src/Command/KickJobCommand.php
Expand Up @@ -29,7 +29,7 @@ public function getCommandLine(): string
/* (non-phpdoc)
* @see ResponseParser::parseResponse()
*/
public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
if ($responseLine == ResponseInterface::RESPONSE_NOT_FOUND) {
throw new Exception\JobNotFoundException(sprintf(
Expand Down
2 changes: 1 addition & 1 deletion src/Command/ListTubeUsedCommand.php
Expand Up @@ -19,7 +19,7 @@ public function getCommandLine(): string
return 'list-tube-used';
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
return $this->createResponse('USING', [
'tube' => preg_replace('#^USING (.+)$#', '$1', $responseLine),
Expand Down
18 changes: 8 additions & 10 deletions src/Command/PauseTubeCommand.php
Expand Up @@ -8,6 +8,7 @@
use Pheanstalk\Contract\ResponseParserInterface;
use Pheanstalk\Exception;
use Pheanstalk\Response\ArrayResponse;
use Pheanstalk\ResponseLine;

/**
* The 'pause-tube' command.
Expand Down Expand Up @@ -40,18 +41,15 @@ public function getCommandLine(): string
);
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(ResponseLine $responseLine, ?string $responseData): ResponseInterface
{
if ($responseLine == ResponseInterface::RESPONSE_NOT_FOUND) {
throw new Exception\ServerException(sprintf(
return match ($responseLine->getName()) {
ResponseInterface::RESPONSE_NOT_FOUND => throw new Exception\ServerException(sprintf(
'%s: tube %s does not exist.',
$responseLine,
$responseLine->getName(),
$this->tube
));
} elseif ($responseLine == ResponseInterface::RESPONSE_PAUSED) {
return $this->createResponse(ResponseInterface::RESPONSE_PAUSED);
} else {
throw new Exception('Unhandled response: "' . $responseLine . '"');
}
)),
ResponseInterface::RESPONSE_PAUSED => $this->createResponse($responseLine->getName())
};
}
}
28 changes: 8 additions & 20 deletions src/Command/PeekCommand.php
Expand Up @@ -8,6 +8,7 @@
use Pheanstalk\Contract\ResponseParserInterface;
use Pheanstalk\Exception;
use Pheanstalk\Response\ArrayResponse;
use Pheanstalk\ResponseLine;

/**
* The 'peek', 'peek-ready', 'peek-delayed' and 'peek-buried' commands.
Expand All @@ -28,10 +29,7 @@ class PeekCommand extends AbstractCommand implements ResponseParserInterface
self::TYPE_BURIED,
];

/**
* @var string
*/
private $subcommand;
private string $subcommand;

public function __construct(string $peekSubject)
{
Expand All @@ -50,22 +48,12 @@ public function getCommandLine(): string
return sprintf('peek-%s', $this->subcommand);
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(ResponseLine $responseLine, ?string $responseData): ResponseInterface
{
if ($responseLine == ResponseInterface::RESPONSE_NOT_FOUND) {
return $this->createResponse(ResponseInterface::RESPONSE_NOT_FOUND);
}

if (preg_match('#^FOUND (\d+) \d+$#', $responseLine, $matches)) {
return $this->createResponse(
ResponseInterface::RESPONSE_FOUND,
[
'id' => (int) $matches[1],
'jobdata' => $responseData,
]
);
}

throw new Exception\ServerException("Unexpected response");
return match($responseLine->getName()) {
ResponseInterface::RESPONSE_NOT_FOUND => $this->createResponse($responseLine->getName()),
ResponseInterface::RESPONSE_FOUND => $this->createResponse(
$responseLine->getName(), ['id' => (int) $responseLine->getArguments()[0], 'jobdata' => $responseData]),
};
}
}
2 changes: 1 addition & 1 deletion src/Command/PeekJobCommand.php
Expand Up @@ -21,7 +21,7 @@ public function getCommandLine(): string
return sprintf('peek %u', $this->jobId);
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
if ($responseLine == ResponseInterface::RESPONSE_NOT_FOUND) {
$message = sprintf(
Expand Down
41 changes: 15 additions & 26 deletions src/Command/PutCommand.php
Expand Up @@ -4,9 +4,12 @@

namespace Pheanstalk\Command;

use Pheanstalk\Contract\ResponseInterface;
use Pheanstalk\Contract\ResponseParserInterface;
use Pheanstalk\Exception;
use Pheanstalk\Exception\ServerOutOfMemoryException;
use Pheanstalk\Response\ArrayResponse;
use Pheanstalk\ResponseLine;

/**
* The 'put' command.
Expand Down Expand Up @@ -64,37 +67,23 @@ public function getDataLength(): int
return mb_strlen($this->data, '8bit');
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
if (preg_match('#^INSERTED (\d+)$#', $responseLine, $matches)) {
return $this->createResponse('INSERTED', [
'id' => (int) $matches[1],
]);
} elseif (preg_match('#^BURIED (\d)+$#', $responseLine, $matches)) {
throw new Exception\ServerOutOfMemoryException(sprintf(
'%s: server ran out of memory trying to grow the priority queue data structure.',
return match($responseLine->getName()) {
ResponseInterface::RESPONSE_INSERTED => $this->createResponse($responseLine->getName(), ['id' => (int) $responseLine->getArguments()]),
ResponseInterface::RESPONSE_BURIED => throw new ServerOutOfMemoryException(sprintf(
'%s: server ran out of memory trying to grow the priority queue data structure.',
$responseLine
));
} elseif (preg_match('#^JOB_TOO_BIG$#', $responseLine)) {
throw new Exception\JobTooBigException(sprintf(
)),
ResponseInterface::RESPONSE_JOB_TOO_BIG => throw new Exception\JobTooBigException(sprintf(
'%s: job data exceeds server-enforced limit',
$responseLine
));
} elseif (preg_match('#^EXPECTED_CRLF#', $responseLine)) {
throw new Exception\ClientBadFormatException(sprintf(
)),
ResponseInterface::RESPONSE_EXPECTED_CRLF => throw new Exception\ClientBadFormatException(sprintf(
'%s: CRLF expected',
$responseLine
));
} elseif (preg_match('#^DRAINING#', $responseLine)) {
throw new Exception\ServerDrainingException(sprintf(
'%s: server is in drain mode and no longer accepting new jobs',
$responseLine
));
} else {
throw new Exception(sprintf(
'Unhandled response: %s',
$responseLine
));
}
)),

};
}
}
9 changes: 5 additions & 4 deletions src/Command/ReleaseCommand.php
Expand Up @@ -9,6 +9,7 @@
use Pheanstalk\Contract\ResponseParserInterface;
use Pheanstalk\Exception;
use Pheanstalk\Response\ArrayResponse;
use Pheanstalk\ResponseLine;

/**
* The 'release' command.
Expand Down Expand Up @@ -40,24 +41,24 @@ public function getCommandLine(): string
);
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
if ($responseLine == ResponseInterface::RESPONSE_BURIED) {
if ($responseLine->getName() === ResponseInterface::RESPONSE_BURIED) {
throw new Exception\ServerOutOfMemoryException(sprintf(
'Job %u %s: out of memory trying to grow data structure',
$this->jobId,
$responseLine
));
}

if ($responseLine == ResponseInterface::RESPONSE_NOT_FOUND) {
if ($responseLine->getName() === ResponseInterface::RESPONSE_NOT_FOUND) {
throw new Exception\JobNotFoundException(sprintf(
'Job %u %s: does not exist or is not reserved by client',
$this->jobId,
$responseLine
));
}

return $this->createResponse($responseLine);
return $this->createResponse($responseLine->getName());
}
}
2 changes: 1 addition & 1 deletion src/Command/ReserveCommand.php
Expand Up @@ -21,7 +21,7 @@ public function getCommandLine(): string
return 'reserve';
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
if ($responseLine === ResponseInterface::RESPONSE_DEADLINE_SOON) {
throw new DeadlineSoonException();
Expand Down
10 changes: 5 additions & 5 deletions src/Command/ReserveJobCommand.php
Expand Up @@ -9,6 +9,7 @@
use Pheanstalk\Contract\ResponseParserInterface;
use Pheanstalk\Exception\JobNotFoundException;
use Pheanstalk\Response\ArrayResponse;
use Pheanstalk\ResponseLine;

/**
* The 'reserve-job' command.
Expand All @@ -28,15 +29,14 @@ public function getCommandLine(): string
return sprintf('reserve-job %d', $this->job);
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(ResponseLine $responseLine, ?string $responseData): ResponseInterface
{
if ($responseLine === ResponseInterface::RESPONSE_NOT_FOUND) {
if ($responseLine->getName() === ResponseInterface::RESPONSE_NOT_FOUND) {
throw new JobNotFoundException();
}

list($code, $id) = explode(' ', $responseLine);
return $this->createResponse($code, [
'id' => (int) $id,
return $this->createResponse($responseLine->getName(), [
'id' => (int) $responseLine->getArguments()[0],
'jobdata' => $responseData,
]);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Command/ReserveWithTimeoutCommand.php
Expand Up @@ -32,7 +32,7 @@ public function getCommandLine(): string
return sprintf('reserve-with-timeout %s', $this->timeout);
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
if (
$responseLine === ResponseInterface::RESPONSE_DEADLINE_SOON
Expand Down
2 changes: 1 addition & 1 deletion src/Command/TouchCommand.php
Expand Up @@ -24,7 +24,7 @@ public function getCommandLine(): string
return sprintf('touch %u', $this->jobId);
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
if ($responseLine == ResponseInterface::RESPONSE_NOT_FOUND) {
throw new Exception\JobNotFoundException(sprintf(
Expand Down
6 changes: 3 additions & 3 deletions src/Command/UseCommand.php
Expand Up @@ -21,10 +21,10 @@ public function getCommandLine(): string
return 'use ' . $this->tube;
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
return $this->createResponse('USING', [
'tube' => preg_replace('#^USING (.+)$#', '$1', $responseLine),
return $this->createResponse($responseLine->getName(), [
'tube' => $responseLine->getArguments()[0]
]);
}
}
6 changes: 3 additions & 3 deletions src/Command/WatchCommand.php
Expand Up @@ -18,10 +18,10 @@ public function getCommandLine(): string
return 'watch ' . $this->tube;
}

public function parseResponse(string $responseLine, ?string $responseData): ArrayResponse
public function parseResponse(\Pheanstalk\ResponseLine $responseLine, ?string $responseData): \Pheanstalk\Contract\ResponseInterface
{
return $this->createResponse('WATCHING', [
'count' => preg_replace('#^WATCHING (.+)$#', '$1', $responseLine),
return $this->createResponse($responseLine->getName(), [
'count' => $responseLine->getArguments()[0],
]);
}
}

0 comments on commit bacc3e1

Please sign in to comment.