Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Two methods added to AMQPConnection: #89

Closed
wants to merge 3 commits into from
Closed

Two methods added to AMQPConnection: #89

wants to merge 3 commits into from

Conversation

mkaczanowski
Copy link

  • attachSignal - setup signal handler
  • detachSignal - setup default handler

What is this slight modification about?
Worker/Cosumer listens for messages. I'd like to have a possibility to
check current status of the script.
Thanks to that modification, I can declare my own callback which
performs desirable action in response to signal ex. SIGUSR1.
I can't do it with pcntl extension (consume is blocking method).

This neat feature makes administrative work a bit easier.

+ attachSignal - setup signal handler
+ detachSignal - setup default handler

What is this slight modification about?
Worker/Cosumer listens for messages. I'd like to have a possibility to
check current status of the script.
Thanks to that modification, I can declare my own callback which
performs desirable action in response to signal ex. SIGUSR1.
I can't do it with pcntl extension (consume is blocking method).

This neat feature makes administrative work a bit easier.
@pinepain
Copy link
Contributor

Can you specify which exact problem does it solve? Maybe some demo, etc.?

P.S.: and I guess the extension will be broken if no pcntl extension loaded first.

@mkaczanowski
Copy link
Author

Example problems:

  1. I do want to print info about current work (there is no need to log it, just only to see what is happening)
  2. Some crucial actions cannot be interrupted, but administrator need to restart machine where workers are consuming, the easy solution will be to wait until the work will be done.

Quick code (may have errors):

<?php
$lastMessageRecivedTime = "";
$lastErrorMsg = "";
$lastStepAchieved = 0;
$canExit = 0;

function messageCallback($message, $queue){
    if($canExit == 0){
        $lastMessageRecivedTime = time();

        $ret_obj = do_some_work($message->getBody());
        $lastStepAchieved = 1;

        if($ret_obj->status){
            // This action takes a lot of time, 
            // ie. generates some statistical reports
            perform_slow_action();

            $lastStepAchieved = 2;

            // This action cannot be interrupted
            perform_very_important_action();

            $lastStepAchieved = 3;
        }else{
            $lastErrorMsg = $ret->errorMsg;
            $queue->nack($message->getDeliveryTag());
            return;
        }

        $queue->ack($message->getDeliveryTag());

        // Clean above parameters
        clean_params();
    }else{
        die("Stopping...");
    }
}

function getInfo($signo) use ($lastMessageRecivedTime, $lastErrorMsg, $lastStepAchieved){
    echo "Last Message Recieved Time: $lastMessageRecivedTime\n";
    echo "Last Error Message: $lastErrorMsg\n";
    echo "Last Step Achieved: $lastStepAchieved\n";
}

// Instead of instant exit, we wait for crucial action to be finished
function terminateCall($signo) use (&$canExit){
    $canExit = 1;
}

// Create a connection
$cnn = new AMQPConnection();
$cnn->setHost('127.0.0.1');
$cnn->setLogin('guest');
$cnn->setPassword('guest');
$cnn->connect();

// SIGUSR1
$cnn->attachSignal(10, getInfo);

// SIGINT
$cnn->attachSignal(2, terminateCall);

// Create a channel
$ch = new AMQPChannel($cnn);

// Declare a new exchange
$ex = new AMQPExchange($ch);
$ex->setName('ex1');
$ex->setType(AMQP_EX_TYPE_FANOUT);
$ex->declare();

// Create a new queue
$q = new AMQPQueue($ch);
$q->setName('queue1');
$q->declare();

// Read from the queue
$q->consume(messageCallback);

?>

@mkaczanowski
Copy link
Author

P.S.: and I guess the extension will be broken if no pcntl extension loaded first.

Could you explain it a bit more? :)
In amqp signal.h is included, so it should work.
Anyway thanks for notice I'll check it!

@pinepain
Copy link
Contributor

My bad, I was confused with php_signal_table usage in pcntl extension header file

@pinepain
Copy link
Contributor

In fact, I can't understand how does it works (sorry), can you explain (here or by mail)?

@mkaczanowski
Copy link
Author

Ok. I will write more detailed email to you, this evening :)
Thanks for fast response!

@pinepain
Copy link
Contributor

Am I right that you simply use raw C signal() instead of Zend mechanism?

@mkaczanowski
Copy link
Author

Yep.
is it bad? If, so I can change it.

@pinepain
Copy link
Contributor

I guess such signal handling shouldn't be in amqp extension but i would love to see this feature to bypass php signal handling mechanism as a separate extension while the use cases for such magic is much wider.

@mkaczanowski
Copy link
Author

According to Zend signal: https://wiki.php.net/rfc/zendsignals.

I see:
The pcntl extension allows signal handlers to be defined in PHP userspace via pctnl_signal(). The handler is installed via signal and any previously registered handlers for the specified signal are ignored

So, my solution is not so bas I think.

It could be in separate extension but my question is "Creating extension for two functions only have any sense?".

I had contact with two other firms which have this problem too (no possibility to check current status).
So thet did many "ugly" code tricks, to bypass it.
I'm pretty sure that this feature will be commonly used and that solution is directly connected to consumer.

There is detach function so if anyone will have problem with that, can remove the handler.

@pinepain
Copy link
Contributor

What you've done is awesome, but as a separate extension it gives support for those who need it. Note, that proctitle extension also provides few functions (on php5.5 it doesn't work properly for now).

I'm not one of those who make decision on include or reject modification, but I have to say that having weirdly designed application support in generic extension is not a good practice.

@mkaczanowski
Copy link
Author

Example problem:
Let us assume that we have some workers/consumers (denoted by "W"). Some of these workers are performing critical actions (i.e process, analyze clients data) so interrupting them is highly discouraged or even prohibited. Some of them does less important work but that effort takes a lot of time (that is why the task is in AMQP queue).

From the experience we know that we cannot foresee every efficiency related problem. What if one of our workers is simply killing database (someone i.e Developer wrote terrible SQL statement!)? Would it be a good idea to have possibility to check current status of the worker? Yes, indeed.

Thanks to that slight modification you'll be able to check what is happening in worker's flesh :)
What's more if you decide to send kill signal, then an appropriate callback will be called. So, developers and administrators are sure that no data are broken (in response to i.e SIGINT action).

Picture below presents general idea:
amqp module.

@pinepain
Copy link
Contributor

The pic is great and the idea is awesome, but this changes has nothing related to amqp. Your changes bypass PHP signal handling mechanism, that's all. Such changes are too general to be included in every extension that doing blocking read/write. As I note above, this MAY be a separate extension (experimental only) and probably other users will found it useful, incl. myself.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants