Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mcuadros committed Jul 8, 2014
1 parent 43560da commit 5152914
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 179 deletions.
231 changes: 59 additions & 172 deletions src/Skeetr/Client.php
Expand Up @@ -23,93 +23,28 @@ class Client
{
protected $memoryLimit = 67108864; //64mb
protected $interationsLimit;
protected $sleepTimeOnError = 5;

protected $methods = [];
protected $logger;
protected $socket;
protected $callback;
protected $journal;
protected $loop;

protected $waitingSince;

public function __construct(Socket $socket)
{
$this->id = uniqid(null, true);
$this->journal = new Journal();
$this->socket = $socket;

//TODO: Optional
Manager::auto();
}

/**
* Sets the client id
*
* @param string $id
* @return self The current Process instance
*/
public function setId($id)
{
$this->id = $id;

return $this;
}

/**
* Sets the Gearman worker instance
*
* @param Worker $worker
* @return self The current Process instance
*/
public function setWorker(Worker $worker)
{
$this->worker = $worker;

return $this;
}

/**
* Sets the journal instance
*
* @param Journal $journal
* @return self The current Process instance
*/
public function setJournal(Journal $journal)
{
$this->journal = $journal;

return $this;
}

/**
* Sets the request channel name, where the request will be attended
*
* @param string $channelName
* @return self The current Process instance
*/
public function setChannelName($channelName)
{
$this->channelName = $channelName;

return $this;
}

/**
* Sets the callback, this callback generate the result of the request
*
* @param string $channelName
* @return self The current Process instance
*/
public function setCallback($callback)
public function setCallback(Callable $callback)
{
if ( !is_callable($callback) ) {
throw new \InvalidArgumentException(
'Invalid argument $callback, must be callabe.'
);
}

$this->callback = $callback;

return $this;
Expand Down Expand Up @@ -141,19 +76,6 @@ public function setInterationsLimit($times)
return $this;
}

/**
* Sets the number of seconds to wait when the client lost the connection with the server
*
* @param integer $secs
* @return self The current Process instance
*/
public function setSleepTimeOnError($secs)
{
$this->sleepTimeOnError = $secs;

return $this;
}

/**
* Sets the logger instance
*
Expand All @@ -168,49 +90,37 @@ public function setLogger(LoggerInterface $logger = null)
}

/**
* Returns the client id
* Sets the methods
*
* @return string
* @param Array $methods
* @return self The current Process instance
*/
public function getId()
public function setMethods(Array $methods)
{
return $this->id;
}

/**
* Returns the Gearman worker instance
*
* @return Worker
*/
public function getWorker()
{
return $this->worker;
}
$this->methods = [];
foreach ($methods as $method) {
$this->addMethod($method);
}

/**
* Returns the journal instance
*
* @return Journal
*/
public function getJournal()
{
return $this->journal;
return $this;
}

/**
* Returns the request channel name
* Adds a method instance
*
* @return string
* @param RPC\Method $method
* @return self The current Process instance
*/
public function getChannelName()
public function addMethod(RPC\Method $method)
{
return $this->channelName;
$tmp = explode('\\', get_class($method));
$this->methods[end($tmp)] = $method;
}

/**
* Returns the callback
*
* @return callback
* @return callable
*/
public function getCallback()
{
Expand Down Expand Up @@ -238,60 +148,66 @@ public function getInterationsLimit()
}

/**
* Returns the number of seconds to wait when the client lost the connection with the server
* Returns the logger instance
*
* @return LoggerInterface
*/
public function getLogger()
{
return $this->logger;
}

/**
* Returns all the methods
*
* @return integer
* @return Array
*/
public function getSleepTimeOnError()
public function getMethods()
{
return $this->sleepTimeOnError;
return $this->methods;
}

/**
* Returns the logger instance
* Returns the method with the given name
*
* @return LoggerInterface
* @param string $name
* @return RPC\Method
*/
public function getLogger()
public function getMethod($name)
{
return $this->logger;
var_dump(array_keys($this->methods));

if (!isset($this->methods[$name])) {
throw new RuntimeException(sprintf(
'Unable to find RPC method %s', $name
));
}

return $this->methods[$name];
}

/**
* Wait for and perform requests
*/
public function work()
{
$this->createdMethods();
$this->socket->connect();
$this->log('notice', 'Waiting for client...');
$this->socket->waitForConnection();

$this->log('notice', 'Waiting for job...');
$this->initializeMethods();
$this->initializeSocket();
$this->loop();
}

private function createdMethods()
private function initializeMethods()
{
$process = new RPC\Method\Process($this->callback);
$this->addMethod($process);
}

public function addMethod(RPC\Method $method)
private function initializeSocket()
{
$tmp = explode('\\', get_class($method));
$this->methods[end($tmp)] = $method;
}

public function getMethod($name)
{
if (!isset($this->methods[$name])) {
throw new RuntimeException(sprintf(
'Unable to find RPC method %s', $name
));
}

return $this->methods[$name];
$this->socket->connect();
$this->log('notice', 'Waiting for client...');
$this->socket->waitForConnection();
$this->log('notice', 'Waiting for job...');
}

protected function loop()
Expand All @@ -309,6 +225,7 @@ protected function loop()
$this->socket->put($response->toJSON());

//$this->checkStatus();*/
var_dump('request');
}
}

Expand All @@ -333,41 +250,6 @@ public function shutdown($message = null)
return true;
}

protected function error()
{
$msg = $this->worker->lastError();
$this->log('notice', sprintf('Gearman error: "%s"', $msg));

$this->journal->addError($msg);
}

protected function timeout()
{
$this->log('notice', 'Timeout');
$this->journal->addTimeout();
}

protected function success($secs)
{
$this->log('notice', sprintf('Executed job in %f sec(s)', $secs));
$this->journal->addSuccess($secs);
}

protected function idle()
{
$this->log('debug', 'Waiting for job...');
$this->journal->addIdle();
}

protected function disconnected()
{
$this->log('notice', sprintf('Connection lost, waiting %s seconds ...', $this->sleepTimeOnError));

$this->journal->addLostConnection($this->sleepTimeOnError);
sleep($this->sleepTimeOnError);
$this->idle();
}

protected function checkStatus()
{
if ($this->memoryLimit) {
Expand Down Expand Up @@ -395,7 +277,12 @@ protected function checkStatus()

protected function log($type, $message)
{
if ( !$this->logger ) return false;
var_dump($type);

if (!$this->logger) {
return false;
}

return $this->logger->$type($message);
}
}
3 changes: 2 additions & 1 deletion src/Skeetr/Client/RPC/Request.php
Expand Up @@ -27,8 +27,9 @@ class Request
public static function fromJSON($json)
{
$request = new static();
print_r($json);
$data = json_decode($json, true);

print_r($data);
if (!$data) {
throw new UnexpectedValueException(sprintf(
'Unexpected message, invalid JSON from nginx: "%s"', $json
Expand Down
5 changes: 4 additions & 1 deletion src/Skeetr/Client/Socket.php
Expand Up @@ -32,7 +32,10 @@ public function connect()
private function throwExceptionIfPathExists()
{
if (file_exists($this->path)) {
throw new RuntimeException('Temporary socket already exists.');
throw new RuntimeException(sprintf(
'Temporary socket (%s) already exists.',
$this->path
));
}
}

Expand Down
5 changes: 0 additions & 5 deletions tests/Skeetr/Tests/Client/MethodTest.php
Expand Up @@ -46,8 +46,3 @@ public function testRegister()
$this->assertTrue($channel->register($worker));
}
}

class MethodMock extends Method
{
public function process(\GearmanJob $job) {}
}

0 comments on commit 5152914

Please sign in to comment.