AnyEvent::Stomper - Flexible non-blocking STOMP client
use AnyEvent;
use AnyEvent::Stomper;
my $stomper = AnyEvent::Stomper->new(
host => 'localhost',
prot => '61613',
login => 'guest',
passcode => 'guest',
);
my $cv = AE::cv;
$stomper->subscribe(
id => 'foo',
destination => '/queue/foo',
on_receipt => sub {
my $err = $_[1];
if ( defined $err ) {
warn $err->message . "\n";
$cv->send;
return;
}
$stomper->send(
destination => '/queue/foo',
body => 'Hello, world!',
);
},
on_message => sub {
my $msg = shift;
my $body = $msg->body;
print "Consumed: $body\n";
$cv->send;
},
);
$cv->recv;
AnyEvent::Stomper is flexible non-blocking STOMP client. Supports following STOMP versions: 1.0, 1.1, 1.2.
Is recommended to read STOMP protocol specification before using the client: https://stomp.github.io/index.html
my $stomper = AnyEvent::Stomper->new(
host => 'localhost',
port => '61613',
login => 'guest',
passcode => 'guest',
vhost => '/',
heartbeat => [ 5000, 5000 ],
connection_timeout => 5,
lazy => 1,
reconnect_interval => 5,
on_connect => sub {
# handling...
},
on_disconnect => sub {
# handling...
},
on_error => sub {
my $err = shift;
# error handling...
},
);
-
host => $host
Server hostname (default: localhost)
-
port => $port
Server port (default: 61613)
-
login => $login
The user identifier used to authenticate against a secured STOMP server.
-
passcode => $passcode
The password used to authenticate against a secured STOMP server.
-
vhost => $vhost
The name of a virtual host that the client wishes to connect to.
-
heartbeat => \@heartbeat
Heart-beating can optionally be used to test the healthiness of the underlying TCP connection and to make sure that the remote end is alive and kicking. The first number sets interval in milliseconds between outgoing heart-beats to the STOMP server.
0
means, that the client will not send heart-beats. The second number sets interval in milliseconds between incoming heart-beats from the STOMP server.0
means, that the client does not want to receive heart-beats.heartbeat => [ 5000, 5000 ],
Not set by default.
-
connection_timeout => $connection_timeout
Specifies connection timeout. If the client could not connect to the server after specified timeout, the
on_error
callback is called with theE_CANT_CONN
error. The timeout specifies in seconds and can contain a fractional part.connection_timeout => 10.5,
By default the client use kernel's connection timeout.
-
lazy => $boolean
If enabled, the connection establishes at time when you will send the first command to the server. By default the connection establishes after calling of the
new
method.Disabled by default.
-
reconnect_interval => $reconnect_interval
If the connection to the server was lost, the client will try to restore the connection when you execute next command. By default reconnection is performed immediately, on next command execution. If the
reconnect_interval
parameter is specified, the client will try to reconnect only after this interval and commands executed between reconnections will be queued.The client will try to reconnect only once and, if attempt fails, the error object is passed to command callback. If you need several attempts of the reconnection, you must retry a command from the callback as many times, as you need.
reconnect_interval => 5,
Not set by default.
-
handle_params => \%params
Specifies AnyEvent::Handle parameters.
handle_params => { autocork => 1, linger => 60, }
Enabling of the
autocork
parameter can improve performance. See documentation on AnyEvent::Handle for more information. -
default_headers => \%headers
Specifies default headers for all outgoing frames.
default_headers => { 'x-foo' => 'foo_value', 'x-bar' => 'bar_value', }
-
command_headers
Specifies default headers for particular commands.
command_headers => { SEND => { receipt => 'auto', }, SUBSCRIBE => { durable => 'true', ack => 'client', }, }
-
on_connect => $cb->()
The
on_connect
callback is called when the connection is successfully established.Not set by default.
-
on_disconnect => $cb->()
The
on_disconnect
callback is called when the connection is closed by any reason.Not set by default.
-
on_error => $cb->( $err )
The
on_error
callback is called when occurred an error, which was affected on entire client (e. g. connection error or authentication error). Also theon_error
callback is called on command errors if the command callback is not specified. If theon_error
callback is not specified, the client just print an error messages toSTDERR
.
To execute the STOMP command you must call appropriate method. STOMP headers
can be specified as command parameters. The client automatically adds
content-length
header to all outgoing frames. Every command method can also
accept two additional parameters: the body
parameter where you can specify
the body of the frame, and the on_receipt
parameter that is the alternative
way to specify the command callback.
If you want to receive RECEIPT
frame, you must specify receipt
header.
The receipt
header can take the special value auto
. If it set, the
receipt identifier will be generated automatically by the client. The
RECEIPT
frame is passed to the command callback in first argument as the
object of the class AnyEvent::Stomper::Frame. If the receipt
header is
not specified the first argument of the command callback will be undef
.
For commands SUBSCRIBE
, UNSUBSCRIBE
, DISCONNECT
the client
automatically adds receipt
header for internal usage.
The command callback is called in one of two cases depending on the presence of
the receipt
header. First case, when the command was successfully written to
the socket. Second case, when the RECEIPT
frame will be received. In first
case on_receipt
callback can be called synchronously. If any error occurred
during the command execution, the error object is passed to the callback in
second argument. Error object is the instance of the class
AnyEvent::Stomper::Error.
The command callback is optional. If it is not specified and any error
occurred, the on_error
callback of the client is called.
The full list of all available headers for every command you can find in STOMP protocol specification and in documentation on your STOMP server. For various versions of STOMP protocol and various STOMP servers they can be differ.
Sends a message to a destination in the messaging system.
$stomper->send(
destination => '/queue/foo',
body => 'Hello, world!',
);
$stomper->send(
destination => '/queue/foo',
body => 'Hello, world!',
sub {
my $err = $_[1];
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
}
);
$stomper->send(
destination => '/queue/foo',
receipt => 'auto',
body => 'Hello, world!',
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
# receipt handling...
}
);
The method is used to register to listen to a given destination. The
subscribe
method require the on_message
callback, which is called on
every received MESSAGE
frame from the server. The MESSAGE
frame is passed
to the on_message
callback in first argument as the object of the class
AnyEvent::Stomper::Frame. If the subscribe
method is called with one
callback, this callback will be act as on_message
callback.
$stomper->subscribe(
id => 'foo',
destination => '/queue/foo',
sub {
my $msg = shift;
my $headers = $msg->headers;
my $body = $msg->body;
# message handling...
},
);
$stomper->subscribe(
id => 'foo',
destination => '/queue/foo',
ack => 'client',
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
return;
}
# receipt handling...
},
on_message => sub {
my $msg = shift;
my $headers = $msg->headers;
my $body = $msg->body;
# message handling...
},
);
The method is used to remove an existing subscription.
$stomper->unsubscribe(
id => 'foo',
destination => '/queue/foo',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
return;
}
# receipt handling...
}
);
The method is used to acknowledge consumption of a message from a subscription
using client
or client-individual
acknowledgment. Any messages received
from such a subscription will not be considered to have been consumed until the
message has been acknowledged via an ack()
method. Method ack()
must be
called with required parameter message
in which must be specified the
MESSAGE
frame.
$stomper->ack( message => $msg );
$stomper->ack(
message => $msg,
receipt => 'auto',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
}
# receipt handling...
}
);
The nack
method is the opposite of ack
method. It is used to tell the
server that the client did not consume the message. Method nack()
must be
called with required parameter message
in which must be specified the
MESSAGE
frame.
$stomper->nack( message => $msg );
$stomper->nack(
message => $msg,
receipt => 'auto',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
}
# receipt handling...
}
);
The method begin
is used to start a transaction.
The method commit
is used to commit a transaction.
The method abort
is used to roll back a transaction.
A client can disconnect from the server at anytime by closing the socket but
there is no guarantee that the previously sent frames have been received by
the server. To do a graceful shutdown, where the client is assured that all
previous frames have been received by the server, you must call disconnect
method and wait for the RECEIPT
frame.
An alternative method to execute commands. In some cases it can be more convenient.
$stomper->execute( 'SEND',
destination => '/queue/foo',
receipt => 'auto',
body => 'Hello, world!',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
# receipt handling...
}
);
Every error object, passed to callback, contain error code, which can be used for programmatic handling of errors. AnyEvent::Stomper provides constants for error codes. They can be imported and used in expressions.
use AnyEvent::Stomper qw( :err_codes );
-
E_CANT_CONN
Can't connect to the server. All operations were aborted.
-
E_IO
Input/Output operation error. The connection to the STOMP server was closed and all operations were aborted.
-
E_CONN_CLOSED_BY_REMOTE_HOST
The connection closed by remote host. All operations were aborted.
-
E_CONN_CLOSED_BY_CLIENT
Connection closed by client prematurely. Uncompleted operations were aborted
-
E_OPRN_ERROR
Operation error. For example, missing required header.
-
E_UNEXPECTED_DATA
The client received unexpected data from the server. The connection to the STOMP server was closed and all operations were aborted.
-
E_READ_TIMEDOUT
Read timed out. The connection to the STOMP server was closed and all operations were aborted.
Gets current host of the client.
Gets current port of the client.
Gets or sets the connection_timeout
of the client. The undef
value resets
the connection_timeout
to default value.
Gets or sets reconnect_interval
of the client.
Gets or sets the on_connect
callback.
Gets or sets the on_disconnect
callback.
Gets or sets the on_error
callback.
The method for forced disconnection. All uncompleted operations will be aborted.
If you have the cluster of STOMP servers, you can use AnyEvent::Stomper::Cluster to work with it.
Eugene Ponizovsky, ponizovsky@gmail.com
Sponsored by SMS Online, dev.opensource@sms-online.com
Copyright (c) 2016-2017, Eugene Ponizovsky, SMS Online. All rights reserved.
This module is free software; you can redistribute it and/or modify it under the same terms as Perl itself.