Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Wukong Script as jRubyDecorator #9

Merged
merged 1 commit into from

2 participants

Travis Dempsey Philip (flip) Kromer
Travis Dempsey
Owner

Small class addition to allow this

Philip (flip) Kromer mrflip merged commit 41c3e0a into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 29 additions and 0 deletions.
  1. +1 −0  lib/wukong.rb
  2. +28 −0 lib/wukong/decorator.rb
1  lib/wukong.rb
View
@@ -10,6 +10,7 @@ module Wukong
autoload :Streamer, 'wukong/streamer'
autoload :Store, 'wukong/store'
autoload :FilenamePattern, 'wukong/filename_pattern'
+ autoload :Decorator, 'wukong/decorator'
def self.run mapper, reducer=nil, options={}
Wukong::Script.new(mapper, reducer, options).run
28 lib/wukong/decorator.rb
View
@@ -0,0 +1,28 @@
+require 'java'
+
+java_import 'com.cloudera.flume.core.Event'
+java_import 'com.cloudera.flume.core.EventImpl'
+java_import 'com.cloudera.flume.core.EventSinkDecorator'
+
+module Wukong
+ class Decorator < EventSinkDecorator
+
+ def initialize(mapper, reducer=nil, options={})
+ super(nil)
+ @mapper = mapper.new
+ end
+
+ def append(e)
+ line = String.from_java_bytes(e.getBody)
+ record = @mapper.recordize(line.chomp)
+ @mapper.process(*record) do |output|
+ processed = output.to_flat.join("\t")
+ event = EventImpl.new(processed.to_java_bytes, e.getTimestamp, e.getPriority, e.getNanos, e.getHost, e.getAttrs)
+ super event
+ end
+ end
+
+ def run() self ; end
+
+ end
+end
Something went wrong with that request. Please try again.