SysV Errors after a while. #17

Closed
thefuriouscoder opened this issue Jul 6, 2018 · 2 comments
@thefuriouscoder

Hello,

I'm trying to set up a daemon using your package. The daemon pulls n jobs from a queue (beanstalk/sqs/whatever) and, for each job, calls the appropriate worker. For now the worker does nothing but create a response, fake an execution time, and return the response to the daemon. It all works fine until, after a while, I start getting SysV-related errors like these:

2018-07-06 14:51:30.9168: 11157   11157: Loop 52
2018-07-06 14:51:31.2001: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 513: Retrying. Error Code: 0 Success
2018-07-06 14:51:31.4774: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 513: Retrying. Error Code: 11 Resource temporarily unavailable
2018-07-06 14:51:31.7886: 11157   11157: Mediator [facebook_video]: Error: Method call failed: workIn({arg1}): Unable to put call #513 on queue
2018-07-06 14:51:31.9814: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 514: Retrying. Error Code: 11 Resource temporarily unavailable
2018-07-06 14:51:32.1996: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 514: Retrying. Error Code: 11 Resource temporarily unavailable
2018-07-06 14:51:32.5721: 11157   11157: Mediator [facebook_video]: Error: Method call failed: workIn({arg1}): Unable to put call #514 on queue
2018-07-06 14:51:32.7659: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 518: Retrying. Error Code: 11 Resource temporarily unavailable
2018-07-06 14:51:33.0003: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 518: Retrying. Error Code: 11 Resource temporarily unavailable
2018-07-06 14:51:33.3191: 11157   11157: Mediator [facebook_video]: Error: Method call failed: workIn({arg1}): Unable to put call #518 on queue
2018-07-06 14:51:33.5107: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 519: Retrying. Error Code: 11 Resource temporarily unavailable
2018-07-06 14:51:33.0786: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 519: Retrying. Error Code: 11 Resource temporarily unavailable
2018-07-06 14:51:34.1396: 11157   11157: Mediator [facebook_video]: Error: Method call failed: workIn({arg1}): Unable to put call #519 on queue
2018-07-06 14:51:34.3497: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 520: Retrying. Error Code: 11 Resource temporarily unavailable
2018-07-06 14:51:34.5784: 11157   11157: SysV Error: Lifo\Daemon\IPC\SysV::put failed for call 520: Retrying. Error Code: 11 Resource temporarily unavailable
2018-07-06 14:51:34.8872: 11157   11157: Mediator [facebook_video]: Error: Method call failed: workIn({arg1}): Unable to put call #520 on queue
2018-07-06 14:51:34.0888: 11157   11157: DEBUG: Daemon::wait: Loop took too long. [Interval=2.000000] [Duration=3.971637] [Extra=1.971637]

My main class looks as follows:

class QueueDaemon extends Daemon
{
    /**
     * @var QueueContainer
     */
    private $queues;

    public function execute()
    {
        $this->log("Loop %d", $this->getLoopIterations());
      
        try {
            $jobs = $this->queues->get('youtube_video')->get(10);

            foreach ($jobs as $job) {
                $this->worker($job->getJobType())->workIn($job)->then(function (JobResponse $response) {
                    $this->log('Received response from worker: ' . $response);
                    $job = $response->getJob();

                    if ($response->getStatus() == JobResponse::RESPONSE_STATUS_SUCCESS) {
                        $this->log('Job finished successfully. Deleting job with id: ' . $job->getQueueHandler());
                        $this->queues->get('youtube_video')->delete($job);

                    } else {
                        if ($this->shouldRetryJob($job, $this->queues->get('youtube_video'))) {
                            $this->log('Releasing job with id ' . $job->getQueueHandler() . ' back to queue for retry.');
                            $job->incrementRetries();
                            $this->queues->get('youtube_video')->release($job);

                        } else {
                            // Enqueue job in error/delayed queue
                        }
                    }
                });

            }

        } catch (WrongPayloadAttributeException $e) {
            $this->log($e->getMessage());
        } catch (QueueOperationException $e) {
            $this->log($e->getMessage());
        } catch (\InvalidArgumentException $e) {
            $this->log($e->getMessage());
        }

    }

    protected function initialize()
    {
        parent::initialize(); // TODO: Change the autogenerated stub
        $this->initializeQueueContainer();
    }

    protected function initializeQueueContainer()
    {
        $this->queues = new QueueContainer();
        $queues = ConfigLoader::get('queues');

        foreach ($queues as $queueId => $queueConfig) {
            $queue = QueueFactory::create($queueConfig);
            $queue->setAdapter(
                AdapterFactory::create(
                    $queueConfig[AbstractQueue::QUEUE_CONFIG_TYPE],
                    ConfigLoader::get($queueConfig[AbstractQueue::QUEUE_CONFIG_ADAPTER][AbstractQueue::QUEUE_CONFIG_CONNECTION])
                )
            );

            $this->queues->add($queue,$queueId);
        }
    }

    protected function shouldRetryJob(AbstractJob $job, AbstractQueue $queue) : bool
    {
        return $job->getRetries() <= $queue->getMaxRetries();
    }


}

As for the workers, they are set to autorestart with max_processes = 10. My worker code:

class Video extends QueueWorker
{
    use LogTrait;

    const WORKER_NAME = 'youtube_video';

    public function initialize()
    {
        // TODO: Implement initialize() method.
    }

    public function workIn(AbstractJob $job) : JobResponse
    {
        $jobResponse = new JobResponse();
        $jobResponse->setJob($job);
        $jobResponse->setStartedAd(new \DateTime());

        $this->log('Received job on worker with pid ' . posix_getpid() . ' jobId: ' . $job->getQueueHandler() . ' and type: ' . $job->getType() . ' and payload ' . json_encode($job->getPayload()));

        $statuses = [JobResponse::RESPONSE_STATUS_SUCCESS,JobResponse::RESPONSE_STATUS_ERROR];
        $jobResponse->setStatus($statuses[rand(0,1)]);

//        $fakeDelay = rand(0,1000);
//        $this->log('Faking execution delay: ' . $fakeDelay . ' microseconds');
//        usleep($fakeDelay);

        $jobResponse->setFinishedAt(new \DateTime());

        $this->log('Returning job response: ' . json_encode($jobResponse));

        return $jobResponse;

    }

}

And the daemon init code:

QueueDaemon::getInstance()
    ->setDaemonize(true)
    ->setLoopInterval(2)
    ->setVerbose(true)
    ->setDebug(true)
    ->setDebugLevel(3)
    ->setLogFile('/tmp/queue_daemon.log')
    ->run();

Hope you can help me.

Thanks in advance.
Diego.

@phuze

phuze commented Jul 14, 2018

I stayed away from workers due to limitations with the IPC layer. A few things the author has noted:

If you spend too much time polling in each iteration the rest of your daemon will suffer. So, try not to spend more than 1 second at a time in the execute loop.

One thing to be aware of. The daemon is not really meant for high-speed processing. The 'calling' mechanism in the back-end that communicates with sub-processes can sometimes break if you make too many calls too quickly. A few messages per second is fine, but hundreds per second will start to become unstable, most likely. I've never tested how much I could do within workers.

Looking at your logs, I think you're encountering those limitations:

DEBUG: Daemon::wait: Loop took too long.
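
For context, error code 11 ("Resource temporarily unavailable") in the log above is EAGAIN on Linux, which is what a non-blocking send to a System V message queue reports when the queue has no room left. The sketch below reproduces that condition with PHP's sysvmsg functions directly. It assumes the sysvmsg extension is loaded and that the package's SysV layer sends without blocking (an assumption, not confirmed from the package source); the function name, the random key, and the payload size are made up for the demo.

```php
<?php
/**
 * Fill a throwaway SysV message queue with non-blocking sends until one fails,
 * then report how many messages fit and which error code the failed send gave.
 * Hypothetical demo helper, not part of the daemon package.
 */
function fillQueueUntilFull(int $payloadBytes = 1024, int $maxSends = 100000): array
{
    if (!function_exists('msg_get_queue')) {
        throw new RuntimeException('The sysvmsg extension is required.');
    }

    $key = random_int(0x10000, 0x7fffffff); // arbitrary key for a private demo queue
    $queue = msg_get_queue($key, 0600);
    if ($queue === false) {
        throw new RuntimeException('Could not create the message queue.');
    }

    $payload = str_repeat('x', $payloadBytes);
    $sent = 0;
    $errorCode = 0;

    try {
        while ($sent < $maxSends) {
            // serialize = false, blocking = false: a full queue makes the call
            // return false right away instead of waiting for a reader.
            if (!@msg_send($queue, 1, $payload, false, false, $errorCode)) {
                break;
            }
            $sent++;
        }
        $stat = msg_stat_queue($queue);
    } finally {
        msg_remove_queue($queue); // always clean up the kernel object
    }

    return [
        'sent'     => $sent,               // sends that succeeded
        'error'    => $errorCode,          // errno of the failed send, 0 if none failed
        'queued'   => $stat['msg_qnum'],   // messages sitting in the queue
        'capacity' => $stat['msg_qbytes'], // queue size limit in bytes
    ];
}

$result = fillQueueUntilFull();
printf(
    "sent=%d queued=%d capacity=%d bytes error=%d\n",
    $result['sent'],
    $result['queued'],
    $result['capacity'],
    $result['error']
);
```

With nobody reading from the queue, the failed send should report MSG_EAGAIN. On a running system, `ipcs -q` lists the existing queues with their used bytes and message counts, which helps to check whether the daemon's queue is backing up.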

I'm not sure if it'll help your situation, but have you considered using tasks instead of workers? Tasks are the same thing, only they don't communicate back to the main daemon process, thereby avoiding the IPC bottleneck entirely. They are fire and forget.

In my use case, I've got 5 daemons running; each one monitors various Azure queues and fires off various processes (tasks) when there are messages in a queue. Each task runs as its own process in Linux -- they exit gracefully and I have been hammering these daemons pretty hard. I can process hundreds of messages per second using this approach and it's been super stable.

Just gotta make sure you keep the execute() logic light and have your tasks do all the heavy lifting. I process multiple messages per task before letting that task exit -- just so I'm not spawning a process for every single message in a queue.

Example:

/**
 * Main application logic.
 * Called every loop cycle.
 */
protected function execute()
{

    # broadcast a heartbeat every 5 minutes (based on run interval)
    if ($this->getLoopIterations() % 60 == 0) {
        $this->log("service status: okay");
    }

    try {
        # get order queue counts
        $createCount = AzureQ::queueCount('shopify-create');
        $ordersCount = AzureQ::queueCount('shopify-orders');
    } catch (Exception $e) {
        return;
    }

    # kick off appropriate processor
    if ($createCount > 0) { $this->task('OrderCreate'); }
    if ($ordersCount > 0) { $this->task('OrderProcess'); }

}

use Lifo\Daemon\Task\AbstractTask;

class OrderProcess extends AbstractTask
{
    # easy access to the daemon logging routines
    # so we don't have to use Daemon::getInstance()->log
    use \Lifo\Daemon\LogTrait;

    public function run()
    {
        # do stuff
    }
}
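
The "multiple messages per task" idea mentioned above could be sketched roughly as below. This is plain PHP with no dependency on the daemon package: `drainBatch`, `$fetchNext` and `$handle` are hypothetical names, and the two callables stand in for whatever queue client the task actually uses. A task's run() would call something like this once and then return, so one process handles a bounded batch instead of a single message.

```php
<?php
/**
 * Process messages until the queue is empty, the batch limit is reached,
 * or the time budget runs out. Returns the number of messages handled.
 *
 * $fetchNext: returns the next message, or null when the queue is empty.
 * $handle:    does the actual work for one message.
 */
function drainBatch(
    callable $fetchNext,
    callable $handle,
    int $maxMessages = 50,
    float $maxSeconds = 30.0
): int {
    $deadline = microtime(true) + $maxSeconds;
    $processed = 0;

    while ($processed < $maxMessages && microtime(true) < $deadline) {
        $message = $fetchNext();
        if ($message === null) {
            break; // nothing left, let the task exit
        }
        $handle($message);
        $processed++;
    }

    return $processed;
}

// Usage with an in-memory array standing in for a real queue.
$pending = ['order-1', 'order-2', 'order-3'];
$handled = [];

$count = drainBatch(
    function () use (&$pending) {
        return array_shift($pending); // null once the array is empty
    },
    function ($message) use (&$handled) {
        $handled[] = $message;
    },
    2 // stop after two messages even though three are waiting
);
```

The cap on messages and the time budget keep any single task short-lived, while still avoiding one process spawn per message.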

@lifo101
Owner

lifo101 commented Oct 25, 2018

Nothing I can do with this right now.

@lifo101 lifo101 closed this as completed Oct 25, 2018