Permalink
Browse files

Added FlatPack and SQL streamers

These streamers use their respective parsers
to automatically recordize lines.
  • Loading branch information...
1 parent c942834 commit d828af7a68f6540679ecfa4d5e4df89bae3c5b81 @dieterichlawson dieterichlawson committed Aug 31, 2012
Showing with 49 additions and 0 deletions.
  1. +2 −0 lib/wukong/streamer.rb
  2. +16 −0 lib/wukong/streamer/flatpack_streamer.rb
  3. +31 −0 lib/wukong/streamer/sql_streamer.rb
View
@@ -8,6 +8,8 @@ module Streamer
autoload :StructStreamer, 'wukong/streamer/struct_streamer'
autoload :StructRecordizer, 'wukong/streamer/struct_streamer'
autoload :InstanceStreamer, 'wukong/streamer/instance_streamer'
+ autoload :FlatPackStreamer, 'wukong/streamer/flatpack_streamer'
+ autoload :SQLStreamer, 'wukong/streamer/sql_streamer'
#
autoload :Filter, 'wukong/streamer/filter'
autoload :EncodingCleaner, 'wukong/streamer/encoding_cleaner'
@@ -0,0 +1,16 @@
+require 'wukong/parser/flatpack_parser'
+
+module Wukong
+ module Streamer
+ class FlatPackStreamer < Wukong::Streamer::Base
+
+ def self.format format
+ @@parser = Wukong::Parser::FlatPack.create_parser format
+ end
+
+ def recordize line
+ @@parser.parse line
+ end
+ end
+ end
+end
@@ -0,0 +1,31 @@
+require 'wukong/parser/sql_parser'
+
+module Wukong
+ module Streamer
+ class SQLStreamer < Wukong::Streamer::Base
+
+ def self.columns columns
+ @@sql_parser = Wukong::Parser::SQLParser.new columns
+ end
+
+ def stream
+ Log.info("Streaming on:\t%s" % [Script.input_file]) unless Script.input_file.blank?
+ before_stream
+ each_record do |line|
+ recordize(line.chomp) do |record|
+ next if record.nil?
+ process(*record) do |output_record|
+ emit output_record
+ end
+ track(record)
+ end
+ end
+ after_stream
+ end
+
+ def recordize line, &blk
+ @@sql_parser.parse line, &blk
+ end
+ end
+ end
+end

0 comments on commit d828af7

Please sign in to comment.