Permalink
Browse files

Merge branch 'rc/1.112100'

Signed-off-by: Alexei Znamensky <russoz@cpan.org>
  • Loading branch information...
2 parents f26cfbf + c4bdadc commit e03aa12ee65a14326d88ebee0db2ce7a3292b115 @russoz committed Jul 29, 2011
View
@@ -4,12 +4,10 @@ license = Perl_5
copyright_holder = Alexei Znamensky
copyright_year = 2011
-[AutoVersion]
[@Author::RUSSOZ]
+version = auto
twitter_tags = #opendata #dataflow
[Prereqs]
perl = 5.008
-[@Git]
-
View
@@ -8,10 +8,13 @@ use warnings;
# VERSION
use Moose;
+use Moose::Exporter;
with 'DataFlow::Role::Processor';
with 'DataFlow::Role::Dumper';
-use DataFlow::Types qw(ProcessorList);
+use DataFlow::Types qw(WrappedProcList);
+
+use Moose::Autobox;
use namespace::autoclean;
use Queue::Base 2.1;
@@ -23,7 +26,16 @@ with 'MooseX::OneArgNew' => { 'type' => 'DataFlow', 'init_arg' => 'procs', };
with 'MooseX::OneArgNew' =>
{ 'type' => 'DataFlow::Proc', 'init_arg' => 'procs', };
+Moose::Exporter->setup_import_methods( as_is => ['dataflow'] );
+
# attributes
+has 'default_channel' => (
+ 'is' => 'ro',
+ 'isa' => 'Str',
+ 'lazy' => 1,
+ 'default' => 'default',
+);
+
has 'auto_process' => (
'is' => 'ro',
'isa' => 'Bool',
@@ -33,7 +45,7 @@ has 'auto_process' => (
has 'procs' => (
'is' => 'ro',
- 'isa' => 'ProcessorList',
+ 'isa' => 'WrappedProcList',
'required' => 1,
'coerce' => 1,
'builder' => '_build_procs',
@@ -49,7 +61,7 @@ has '_queues' => (
'has_queued_data' =>
sub { return _count_queued_items( shift->_queues ) },
'_make_queues' => sub {
- return [ map { Queue::Base->new() } @{ shift->procs } ];
+ shift->procs->map( sub { Queue::Base->new() } );
},
},
);
@@ -61,21 +73,7 @@ has '_lastq' => (
'default' => sub { return Queue::Base->new },
);
-has 'dump_input' => (
- 'is' => 'ro',
- 'isa' => 'Bool',
- 'lazy' => 1,
- 'default' => 0,
- 'documentation' => 'Prints a dump of the input load to STDERR',
-);
-
-has 'dump_output' => (
- 'is' => 'ro',
- 'isa' => 'Bool',
- 'lazy' => 1,
- 'default' => 0,
- 'documentation' => 'Prints a dump of the output load to STDERR',
-);
+##############################################################################
sub _build_procs {
return;
@@ -86,7 +84,7 @@ sub _count_queued_items {
my $q = shift;
my $count = 0;
- map { $count = $count + $_->size } @{$q};
+ $q->map( sub { $count = $count + $_->size } );
return $count;
}
@@ -102,7 +100,8 @@ sub _process_queues {
sub _reduce {
my ( $p, @q ) = @_;
- map { _process_queues( $p->[$_], $q[$_], $q[ $_ + 1 ] ) } ( 0 .. $#q - 1 );
+ [ 0 .. $#q - 1 ]
+ ->map( sub { _process_queues( $p->[$_], $q[$_], $q[ $_ + 1 ] ) } );
return;
}
@@ -112,12 +111,19 @@ sub clone {
return DataFlow->new( procs => $self->procs );
}
-sub input {
- my ( $self, @args ) = @_;
+sub channel_input {
+ my ( $self, $channel, @args ) = @_;
$self->prefix_dumper( $self->has_name ? $self->name . ' <<' : '<<', @args )
if $self->dump_input;
- $self->_firstq->add(@args);
+ $self->_firstq->add(
+ @{ @args->map( sub { DataFlow::Item->itemize( $channel, $_ ) } ) } );
+ return;
+}
+
+sub input {
+ my ( $self, @args ) = @_;
+ $self->channel_input( $self->default_channel, @args );
return;
}
@@ -128,18 +134,40 @@ sub process_input {
return;
}
-sub output {
- my $self = shift;
+sub _unitem {
+ my ( $item, $channel ) = @_;
+ return unless defined $item;
+ return $item->get_data($channel);
+}
+sub _output_items {
+ my $self = shift;
$self->process_input if ( $self->_lastq->empty && $self->auto_process );
- my @res = wantarray ? $self->_lastq->remove_all : $self->_lastq->remove;
+ return wantarray ? $self->_lastq->remove_all : $self->_lastq->remove;
+}
+
+sub output_items {
+ my $self = shift;
+ my @res = wantarray ? $self->_output_items : scalar( $self->_output_items );
$self->prefix_dumper( $self->has_name ? $self->name . ' >>' : '>>', @res )
if $self->dump_output;
return wantarray ? @res : $res[0];
}
+sub output {
+ my $self = shift;
+ my $channel = shift || $self->default_channel;
+
+ my @res = wantarray ? $self->_output_items : scalar( $self->_output_items );
+ $self->prefix_dumper( $self->has_name ? $self->name . ' >>' : '>>', @res )
+ if $self->dump_output;
+ return wantarray
+ ? @{ @res->map( sub { _unitem( $_, $channel ) } ) }
+ : _unitem( $res[0], $channel );
+}
+
sub reset { ## no critic
- return map { $_->clear } @{ shift->_queues };
+ return shift->_queues->map( sub { $_->clear } );
}
sub flush {
@@ -160,12 +188,21 @@ sub process {
sub proc_by_index {
my ( $self, $index ) = @_;
- return $self->procs->[$index];
+ return unless $self->procs->[$index];
+ return $self->procs->[$index]->on_proc;
}
sub proc_by_name {
my ( $self, $name ) = @_;
- return ( grep { $_->name eq $name } @{ $self->procs } )[0];
+ return $self->procs->map( sub { $_->on_proc } )
+ ->grep( sub { $_->name eq $name } )->[0];
+
+ #return $procs[0];
+}
+
+sub dataflow (@) { ## no critic
+ my @args = @_;
+ return __PACKAGE__->new( procs => [@args] );
}
__PACKAGE__->meta->make_immutable;
@@ -227,6 +264,10 @@ caller.
(Str) A descriptive name for the dataflow. (OPTIONAL)
+=attr default_channel
+
+(Str) The name of the default communication channel. (DEFAULT: 'default')
+
=attr auto_process
(Bool) If there is data available in the output queue, and one calls the
@@ -277,17 +318,32 @@ Processors using the L<DataFlow::Policy::ProcessInto> policy (default) will
process the items inside an array reference, and the values (not the keys)
inside a hash reference.
+=method channel_input
+
+Accepts input data into a specific channel for the data flow:
+
+ $flow->channel_input( 'mydatachannel', qw/all the simple things/ );
+
=method process_input
Processes items in the array of queues and place at least one item in the
output (last) queue. One will typically call this to flush out some unwanted
data and/or if C<auto_process> has been disabled.
+=method output_items
+
+Fetches items, more specifically objects of the type L<DataFlow::Item>, from
+the data flow. If called in scalar context it will return one processed item
+from the flow. If called in list context it will return all the items from
+the last queue.
+
=method output
-Fetches data from the data flow. If called in scalar context it will return
-one processed item from the flow. If called in list context it will return all
-the elements in the last queue.
+Fetches data from the data flow. It accepts a parameter that points from which
+data channel the data must be fetched. If no channel is specified, it will
+default to the 'default' channel.
+If called in scalar context it will return one processed item from the flow.
+If called in list context it will return all the elements in the last queue.
=method reset
@@ -313,9 +369,21 @@ Expects a name (Str) as parameter. Returns the first processor in this
data flow, for which the C<name> attribute has the same value of the C<name>
parameter, or C<undef> otherwise.
+=func dataflow
+
+Syntax sugar function that can be used to instantiate a new flow. It can be
+used like this:
+
+ my $flow = dataflow
+ [ 'Proc' => p => sub { ... } ],
+ ...
+ [ 'CSV' => direction => 'CONVERT_TO' ];
+
+ $flow->process('bananas');
+
=head1 HISTORY
-This is a framework for data flow processing. It started as a spinoff project
+This is a framework for data flow processing. It started as a spin-off project
from the L<OpenData-BR|http://www.opendatabr.org/> initiative.
As of now (Mar, 2011) it is still a 'work in progress', and there is a lot of
Oops, something went wrong.

0 comments on commit e03aa12

Please sign in to comment.