Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

first commit

  • Loading branch information...
commit 211668f8780b3ad5a63b05b93a88c61327c91b60 0 parents
naoya authored
0  README
No changes.
40 lib/Hadoop/Mapper.pm
@@ -0,0 +1,40 @@
+package Hadoop::Mapper;
+use Moose::Role;
+
+use IO::Handle;
+use Params::Validate qw/validate_pos/;
+
+# Compose the emitter role, whose required run/emit methods are supplied by
+# this role, and oblige every consuming class to implement map().
+with 'Hadoop::Role::Emitter';
+requires qw/map/;
+
+# Class-method entry point for a mapper: instantiates the consuming class and
+# passes every line of STDIN, chomped, to its map() method. The key argument
+# handed to map() is always undef.
+sub run {
+    my $class = shift;
+    my $self  = $class->new;
+
+    ## FIXME: adapt the processing to the input format in use
+    # A method call in a while condition gets no implicit defined() test, so
+    # check explicitly; otherwise a final unterminated line of "0" would be
+    # mistaken for end of input.
+    while (defined(my $line = STDIN->getline)) {
+        chomp $line;
+
+        ## SequenceFileAsTextInputFormat
+        #my ($key, $value) = split /\t/, $line;
+
+        $self->map(undef, $line);
+    }
+}
+
+# Writes one key/value pair through put(); if put() dies, the error is
+# reported with warn instead of being propagated to the caller.
+sub emit {
+    my $self = shift;
+    my ($key, $value) = @_;
+
+    eval { $self->put($key, $value) };
+    warn $@ if $@;
+}
+
+# Prints "key<TAB>value<NEWLINE>" to the currently selected output handle.
+# validate_pos insists that invocant, key and value are all supplied.
+sub put {
+    my @checked = validate_pos(@_, 1, 1, 1);
+    printf "%s\t%s\n", @checked[1, 2];
+}
+
+1;
42 lib/Hadoop/Reducer.pm
@@ -0,0 +1,42 @@
+package Hadoop::Reducer;
+use Moose::Role;
+
+use IO::Handle;
+use Params::Validate qw/validate_pos/;
+use Hadoop::Reducer::Input;
+
+# Compose the emitter role once, and oblige every consuming class to
+# implement reduce().
+with 'Hadoop::Role::Emitter';
+requires qw/reduce/;
+
+# Class-method entry point for a reducer: instantiates the consuming class,
+# wraps STDIN in a Hadoop::Reducer::Input and calls reduce() once per
+# (key, values-iterator) pair produced by the input's iterator.
+sub run {
+    my $class   = shift;
+    my $reducer = $class->new;
+
+    my $groups = Hadoop::Reducer::Input->new(handle => \*STDIN)->iterator;
+
+    GROUP: while ($groups->has_next) {
+        my ($key, $values) = $groups->next
+            or last GROUP;
+        $reducer->reduce($key, $values);
+    }
+}
+
+# Writes one key/value pair through put(); if put() dies, the error is
+# reported with warn instead of being propagated to the caller.
+sub emit {
+    my $self = shift;
+    my ($key, $value) = @_;
+
+    eval { $self->put($key, $value) };
+    warn $@ if $@;
+}
+
+# Prints "key<TAB>value<NEWLINE>" to the currently selected output handle.
+# validate_pos insists that invocant, key and value are all supplied.
+sub put {
+    my @checked = validate_pos(@_, 1, 1, 1);
+    printf "%s\t%s\n", @checked[1, 2];
+}
+
+1;
+
55 lib/Hadoop/Reducer/Input.pm
@@ -0,0 +1,55 @@
+package Hadoop::Reducer::Input;
+use Moose;
+use Hadoop::Reducer::Input::Iterator;
+
+# Filehandle the reducer input is read from; must be supplied at construction.
+# FileHandle is a type constraint rather than a role, so it is declared with
+# isa.
+has handle => (
+    is       => 'ro',
+    isa      => 'FileHandle',
+    required => 1,
+);
+
+# One-line look-ahead slot: holds a line that has been read from the handle
+# but not yet consumed through getline.
+has 'buffer' => ( is => 'rw' );
+
+# Peeks at the key (text before the first tab) of the next unread line
+# without consuming it. Reads a line into the buffer when none is pending.
+# Returns nothing when no line is available.
+sub next_key {
+    my $self = shift;
+    # Test definedness, as getline does, so a buffered line of "0" is not
+    # mistaken for an empty buffer.
+    my $line = defined $self->buffer ? $self->buffer : $self->next_line;
+    return if not defined $line;
+
+    my ($key) = split /\t/, $line;
+    return $key;
+}
+
+# Reads one line from the handle into the buffer and returns it; returns
+# nothing once the handle reports end of file. The line stays buffered.
+sub next_line {
+    my $self = shift;
+    my $fh   = $self->handle;
+    return if $fh->eof;
+
+    $self->buffer( $fh->getline );
+    return $self->buffer;
+}
+
+# Consumes and returns the next line: the buffered one if present, otherwise
+# a fresh line from the handle. Returns nothing at end of input.
+sub getline {
+    my $self = shift;
+    my $line = defined $self->buffer ? $self->buffer : $self->next_line;
+
+    # next_line stores what it reads in the buffer; clear it in both cases so
+    # a consumed line can never be handed out a second time.
+    $self->buffer(undef);
+
+    return $line if defined $line;
+    return;
+}
+
+# Builds a key-level iterator that reads from this input object.
+sub iterator {
+    my ($self) = @_;
+    return Hadoop::Reducer::Input::Iterator->new(input => $self);
+}
+
+# Consumes the next line and returns it as a (key, value) list, split at the
+# first tab. Returns nothing at end of input.
+sub each {
+    my $self = shift;
+    my $line = $self->getline;
+    # Definedness, not truth: a final unterminated line of "0" is still data.
+    return if not defined $line;
+    chomp $line;
+    # Limit of 2 keeps tabs inside the value instead of discarding everything
+    # after the second field.
+    split /\t/, $line, 2;
+}
+
+1;
54 lib/Hadoop/Reducer/Input/Iterator.pm
@@ -0,0 +1,54 @@
+package Hadoop::Reducer::Input::Iterator;
+use Moose;
+with 'Hadoop::Role::Iterator';
+
+use Hadoop::Reducer::Input::ValuesIterator;
+
+# The Hadoop::Reducer::Input object this iterator draws its lines from.
+has 'input' => (
+    required => 1,
+    isa      => 'Hadoop::Reducer::Input',
+    is       => 'ro',
+);
+
+# Key of the group most recently returned by next(). Str is a type constraint
+# rather than a role, so it is declared with isa.
+has current_key => (
+    is  => 'rw',
+    isa => 'Str',
+);
+
+# True when another key group is available.
+#
+# The handle's eof state is deliberately not consulted here: the final line
+# may already sit in the input's look-ahead buffer while the handle itself is
+# exhausted, and that line must still be delivered. next_key covers both
+# cases (buffered line, or a fresh read that fails at end of file).
+sub has_next {
+    my $self = shift;
+    return if not defined $self->input->next_key;
+    1;
+}
+
+# Advances to the next key group and returns (key, values-iterator), or
+# nothing when the input is exhausted.
+#
+# If the upcoming line already belongs to a new key (or no group has been
+# started yet) that line opens the group. Otherwise the caller left values of
+# the current key unread, so lines are skipped until the key changes.
+sub next {
+    my $self = shift;
+
+    if (not defined $self->current_key
+        or $self->current_key ne $self->input->next_key) {
+        my ($key, $value) = $self->input->each or return;
+        $self->current_key($key);
+        return $self->retval($key, $value);
+    }
+
+    # Discard the unread remainder of the current group. Both fields must be
+    # captured: the key drives the loop condition and the value becomes the
+    # first value of the new group.
+    my ($key, $value);
+    do {
+        ($key, $value) = $self->input->each or return;
+    } while ($self->current_key eq $key);
+
+    $self->current_key( $key );
+    return $self->retval($key, $value);
+}
+
+# Packages a key with a fresh values iterator that is seeded with the group's
+# first value and reads further values through this iterator.
+sub retval {
+    my ($self, $key, $value) = @_;
+    my $values = Hadoop::Reducer::Input::ValuesIterator->new(
+        input_iter => $self,
+        first      => $value,
+    );
+    return ($key, $values);
+}
+
+1;
32 lib/Hadoop/Reducer/Input/ValuesIterator.pm
@@ -0,0 +1,32 @@
+package Hadoop::Reducer::Input::ValuesIterator;
+use Moose;
+with 'Hadoop::Role::Iterator';
+
+# The key-level iterator that created this object; it gives access to the
+# shared input and to the key of the group being iterated.
+has 'input_iter' => (
+    required => 1,
+    does     => 'Hadoop::Role::Iterator',
+    is       => 'ro',
+);
+
+# Value taken from the line that opened the group; next() hands it out once
+# and then resets this slot to undef.
+has 'first' => ( is => 'rw' );
+
+# True while the current key still has values to hand out: either the first
+# value has not been taken yet, or the next unread line carries the same key
+# as the group being iterated.
+sub has_next {
+    my $self = shift;
+    # The first value is held by this object, not by the input, so it must
+    # count even when no further line shares the key.
+    return 1 if defined $self->first;
+
+    my $group    = $self->input_iter;
+    my $upcoming = $group->input->next_key;
+    # Definedness, not truth: "0" is a legitimate key.
+    return if not defined $upcoming;
+    return $group->current_key eq $upcoming;
+}
+
+# Returns the next value: the stored first value once, then the value field
+# of each following input line.
+sub next {
+    my $self = shift;
+    # Definedness, not truth, so a first value of "0" or "" is not skipped.
+    my $first = $self->first;
+    if (defined $first) {
+        $self->first( undef );
+        return $first;
+    }
+    my ($key, $value) = $self->input_iter->input->each;
+    return $value;
+}
+
+1;
+
6 lib/Hadoop/Role/Emitter.pm
@@ -0,0 +1,6 @@
+package Hadoop::Role::Emitter;
+use Moose::Role;
+
+# Anything consuming this role must provide run() and emit().
+requires qw/run emit/;
+
+1;
7 lib/Hadoop/Role/Iterator.pm
@@ -0,0 +1,7 @@
+package Hadoop::Role::Iterator;
+use Moose::Role;
+
+# Anything consuming this role must provide has_next() and next().
+requires qw/has_next next/;
+
+1;
+
Please sign in to comment.
Something went wrong with that request. Please try again.