Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 309 lines (221 sloc) 9.172 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
package POE::Queue;

use vars qw($VERSION);
$VERSION = '1.281'; # NOTE - Should be #.### (three decimal places)

use Carp qw(croak);

sub new {
  my $type = shift;
  croak "$type is a virtual base class and not meant to be used directly";
}

1;

__END__

=head1 NAME

POE::Queue - a flexible, generic priority queue API

=head1 SYNOPSIS

POE::Queue specifies additional methods not illustrated here.

#!perl

use warnings;
use strict;
use POE::Queue::Array;

my $pqa = POE::Queue::Array->new();

# Enqueue a few items.

foreach my $priority (505, 404, 303, 202, 101) {
$pqa->enqueue($priority, "payload $priority");
}

# Dequeue until the queue is drained.

while (1) {
my ($priority, $queue_id, $payload) = $pqa->dequeue_next();
last unless defined $priority;

print(
"dequeued id($queue_id) ",
"priority($priority) ",
"payload($payload)\n",
);
}

Sample output:

dequeued id(5) priority(101) payload(payload 101)
dequeued id(4) priority(202) payload(payload 202)
dequeued id(3) priority(303) payload(payload 303)
dequeued id(2) priority(404) payload(payload 404)
dequeued id(1) priority(505) payload(payload 505)

=head1 DESCRIPTION

Priority queues may be implemented a number of ways, but they tend to
behave similiar to lists that are kept in order by some kind of
"priority". Enqueued items are stored such that the "next" item to be
retrieved is the one with the highest priority. Subsequent fetches
return the next lowest priority, and so on, until the queue is
emptied.

Priority queues (also known as priority heaps) attempt to do this
while consuming the fewest resources. Go read about it! It's
fascinating stuff!

=head2 POE::Queue Items

POE::Queue items consist of three fields: A priority, a unique ID
assigned at enqueue time, and a payload. The priority and payload are
specified by the caller, and the unique ID is generated by POE::Queue
when an item is enqueued.

POE::Queue imposes two limitations on priorities: Priorities must be
numeric, and lower numbers indicate higher priorities. Aside from
that, POE::Queue doesn't care what the numbers mean.

Unique IDs are handles into the queue. POE::Queue generates and
returns them as new items are enqueued. Some methods manipulate
items, and they take IDs to identify the items to alter.

Item payloads are arbitrary application data. POE::Queue does not
examine or alter payloads itself. Any methods that need to examine
payloads will accept a filter function (see L<Filter Functions>).
Filter functions examine payloads so POE::Queue need not.

=head1 Public Methods

POE::Queue is an API specification. Subclasses like
L<POE::Queue::Array> provide actual implementations.

=head2 new

Creates a new priority queue. Returns a reference to the queue.

my $queue = POE::Queue::Array->new();

=head2 enqueue PRIORITY, PAYLOAD

Enqueues a PAYLOAD, which can be just about anything that will fit
into a Perl scalar, at a particular PRIORITY level. enqueue() returns
a unique ID which can be used to manipulate the payload or its
priority directly.

Following the UNIX tradition, lower priority numbers indicate higher
priorities. The payload with the lowest priority number will be
dequeued first. If two payloads have the same PRIORITY, then they
will be dequeued in the order in which they were enqueued.

In this example, a queue is used to manage a number of alarms. The
"next" alarm will be the one due soonest.

my $payload_id = $queue->enqueue($alarm_time, [ "stuff" ]);

=head2 dequeue_next

Removes the next item from the queue, returning it as three fields:
priority, ID and payloade.

The "next" item is the one with the lowest priority number. If
multiple items exist with the same priority, dequeue_next() will
return the one that was given the priority first.

ITEM: while (1) {
my ($priority, $id, $payload) = $queue->dequeue_next();
last ITEM unless defined $priority;
...;
}

=head2 get_next_priority

Returns the priority of the item at the nead of the queue. This is
the lowest numeric priority in the queue.

get_next_priority() can be useful for checking the queue to see if
it's time to dequeue some items. In this case, the queue manages
multiple alarms, and there's nothing to do if the next alarm isn't due
yet.

ALARM: while (1) {
my $next_alarm_time = $queue->get_next_priority();
last ALARM unless defined $next_alarm_time;

if ($next_alarm_time - time() > 0) {
sleep($next_alarm_time - time());
}

my ($priority, $id, $payload) = $queue->dequeue_next();
...;
}

=head2 get_item_count

Returns the number of items in the queue. It's another way to tell
whether the queue has been fully drained. Here's an alternative
version of the example for get_next_priority().

ALARM: while ($queue->get_item_count()) {
my $next_alarm_time = $queue->get_next_priority();
if ($next_alarm_time - time() > 0) {
sleep($next_alarm_time - time());
}

my ($priority, $id, $payload) = $queue->dequeue_next();
...;
}

=head2 remove_item ITEM_ID, FILTER_FUNCTION

Removes a single item by its ID, but only if a FILTER_FUNCTION
approves of the item's payload.

If a payload is found with the given ITEM_ID, it is passed to
FILTER_FUNCTION for examination. If FILTER_FUNCTION returns true, the
item is removed from the queue and is returned as three fields.

my ($priority, $id, $payload) = $queue->remove_item(
$target_id, \&monkeys
);

sub monkeys {
my $payload = shift;
$payload->{type} eq "monkey";
}

The returned $priority will be undef on failure, and $! will be set to
the reason why the item couldn't be removed. That will be ESRCH if
the ITEM_ID was not found in the queue, or EPERM if the filter
function returned false.

=head2 remove_items FILTER_FUNCTION [, MAX_ITEM_COUNT ]

Removes and returns items from the queue that match a FILTER_FUNCTION.
remove_items() will return immediately if MAX_ITEM_COUNT items is
specified and that many items have been removed from the queue.
MAX_ITEM_COUNT is a bit of optimization if the application knows in
advance how many items will match the FILTER_FUNCTION.

Returns a list of items that were removed. Each item is an array
reference containing a priority, item ID, and payload. Returns
nothing if FILTER_FUNCTION matched nothing.

# Remove up to 12 monkeys.
my @monkeys = $queue->remove_items(\&monkeys, 12);
foreach my $monkey (@monkeys) {
my ($priority, $id, $payload) = @$monkey;
print(
"Removed monkey:\n",
" priority = $priority\n",
" queue id = $id\n",
" payload = $payload\n",
);
}

There is no guarantee which items will be removed if MAX_ITEM_COUNT is
specified too low.

See L<Filter Functions> for more details about filter functions.

=head2 peek_items FILTER_FUNCTION [, MAX_ITEM_COUNT ]

peek_items() returns up to MAX_ITEM_COUNT items that match a given
FILTER_FUNCTION without removing them from the queue.

my @entire_queue = $queue->peek_items(sub { 1 });
foreach my $item (@entire_queue) {
my ($priority, $id, $payload) = @$item;
print(
"Item:\n",
" priority = $priority\n",
" queue id = $id\n",
" payload = $payload\n",
);
}

=head2 adjust_priority ITEM_ID, FILTER_FUNCTION, DELTA

Changes the priority of an item by DELTA. The item is identified by
its ITEM_ID, and the change will only happen if the item's payload
satisfies a FILTER_FUNCTION. Returns the new priority, which is the
previous priority + DELTA. DELTA may be negative.

my $new_priority = $queue->adjust_priority(
$item_id, \&one_of_mine, 100
);

sub one_of_mine {
my $payload = shift;
return $payload->{owner} == $me;
}

Returns undef if the item's priority could not be adjusted, and sets
$! to explain why: ESRCH means that the ITEM_ID could not be found,
and EPERM means that the FILTER_FUNCTION was not satsified.

=head2 set_priority ITEM_ID, FILTER_FUNCTION, ABSOLUTE_PRIORITY

Sets an item's priority to a new ABSOLUTE_PRIORITY. The item is
identified by its ITEM_ID, and the change will only be allowed to
happen if the item's payload satisfies a FILTER_FUNCTION. Returns the
new priority, which should match ABSOLUTE_PRIORITY.

Returns undef if the item's priority could not be set, and sets $! to
explain why: ESRCH means that the ITEM_ID could not be found, and
EPERM means that the FILTER_FUNCTION was not satsified.

my $new_priority = $queue->set_priority(
$item_id, \&one_of_mine, time() + 60
);

unless (defined $new_priority) {
die "one of our submarines is missing: $item_id" if $! == ESRCH;
die "set_priority disallowed for item $item_id" if $! == EPERM;
die $!;
}

sub one_of_mine {
$_[0]{owner} == $me;
}

=head1 SEE ALSO

L<POE>, L<POE::Queue::Array>

=head1 BUGS

None known.

TODO - Should set_priority return the old priority instead of the new
one?

TODO - Rename and repackage as its own distribution.

=head1 AUTHORS & COPYRIGHTS

Please see L<POE> for more information about authors, contributors,
and POE's licensing.

=cut

# rocco // vim: ts=2 sw=2 expandtab
# TODO - Edit.
Something went wrong with that request. Please try again.