Skip to content
Browse files

added wukong decorator class

  • Loading branch information...
1 parent f93f2f0 commit d69137da7b7235389dad306ed36498c230cb5819 @kornypoet kornypoet committed Nov 3, 2011
Showing with 29 additions and 0 deletions.
  1. +1 −0 lib/wukong.rb
  2. +28 −0 lib/wukong/decorator.rb
View
1 lib/wukong.rb
@@ -10,6 +10,7 @@ module Wukong
autoload :Streamer, 'wukong/streamer'
autoload :Store, 'wukong/store'
autoload :FilenamePattern, 'wukong/filename_pattern'
+ autoload :Decorator, 'wukong/decorator'
def self.run mapper, reducer=nil, options={}
Wukong::Script.new(mapper, reducer, options).run
View
28 lib/wukong/decorator.rb
@@ -0,0 +1,28 @@
+require 'java'
+
+java_import 'com.cloudera.flume.core.Event'
+java_import 'com.cloudera.flume.core.EventImpl'
+java_import 'com.cloudera.flume.core.EventSinkDecorator'
+
+module Wukong
+ class Decorator < EventSinkDecorator
+
+ def initialize(mapper, reducer=nil, options={})
+ super(nil)
+ @mapper = mapper.new
+ end
+
+ def append(e)
+ line = String.from_java_bytes(e.getBody)
+ record = @mapper.recordize(line.chomp)
+ @mapper.process(*record) do |output|
+ processed = output.to_flat.join("\t")
+ event = EventImpl.new(processed.to_java_bytes, e.getTimestamp, e.getPriority, e.getNanos, e.getHost, e.getAttrs)
+ super event
+ end
+ end
+
+ def run() self ; end
+
+ end
+end

0 comments on commit d69137d

Please sign in to comment.
Something went wrong with that request. Please try again.