Skip to content
Browse files

removed obsolete streamers

  • Loading branch information...
1 parent deea1a4 commit bfaccc9bbb56574e6f7b2873eed2bd7e5b29d2c9 Philip (flip) Kromer committed Jan 28, 2011
View
61 lib/wukong/streamer/cassandra_streamer.rb
@@ -1,61 +0,0 @@
-# Defines a base class for streaming data into a cassandra db connection.
-require 'cassandra' ; include Cassandra::Constants
-module Wukong
- module Streamer
-
- class CassandraStreamer < Wukong::Streamer::Base
- attr_accessor :batch_count, :batch_record_count, :batch_size, :column_space, :db_seeds, :cassandra_db
-
- def initialize *args
- super *args
- self.batch_count = 0
- self.batch_record_count = 0
- self.column_space ||= 'Twitter'
- self.batch_size ||= 100
- self.db_seeds ||= %w[10.244.191.178 10.243.19.223 10.243.17.219 10.245.70.85 10.244.206.241].map{ |s| s.to_s+':9160'}
- self.cassandra_db ||= Cassandra.new(self.column_space, self.db_seeds)
- end
-
- def stream
- while still_lines? do
- start_batch do
- while still_lines? && batch_not_full? do
- line = get_line
- record = recordize(line.chomp) or next
- next if record.blank?
- process(*record) do |output_record|
- emit output_record
- end
- self.batch_record_count += 1
- end
- end
- end
- end
-
- def process *args, &blk
- Raise "Overwrite this method to insert into cassandra db"
- end
-
- def start_batch &blk
- self.batch_record_count = 0
- self.batch_count += 1
- self.cassandra_db.batch(&blk)
- end
-
- def get_line
- $stdin.gets
- end
-
- def still_lines?
- !$stdin.eof?
- end
-
- def batch_not_full?
- self.batch_record_count < self.batch_size
- end
-
- end
- end
-
-end
-
View
11 lib/wukong/streamer/count_keys.rb
@@ -4,25 +4,20 @@ module Streamer
# Emit each unique key and the count of its occurrences
#
class CountKeys < Wukong::Streamer::AccumulatingReducer
- attr_accessor :key_count
-
- def formatted_key_count
- "%10d"%key_count.to_i
- end
# reset the counter to zero
def start! *args
- self.key_count = 0
+ @count = 0
end
# record one more for this key
def accumulate *vals
- self.key_count += 1
+ @count += 1
end
# emit each key field and the count, tab-separated.
def finalize
- yield [key, formatted_key_count]
+ yield [key, @count]
end
end
View
26 lib/wukong/streamer/count_lines.rb
@@ -1,26 +0,0 @@
-module Wukong
- module Streamer
- #
- # For each identical line in the map phase output, emit one representative
- # line followed by the count of occurrences (separated by a tab).
- #
- # (This is the functional equivalent of +'uniq -c'+)
- #
- class CountLines < Wukong::Streamer::Base
- def formatted_count item, key_count
- "%s\t%10d" % [item, key_count.to_i]
- end
-
- #
- # Delegate to +uniq -c+, but put the count last for idempotence.
- #
- def stream
- %x{/usr/bin/uniq -c}.split("\n").each do |line|
- key_count, item = line.chomp.strip.split(/\s+/, 2)
- puts formatted_count(item, key_count)
- end
- end
- end
-
- end
-end
View
25 lib/wukong/streamer/counting_reducer.rb
@@ -1,25 +0,0 @@
-module Wukong
- module Streamer
-
- #
- # Count the number of records for each key.
- #
- class CountingReducer < AccumulatingReducer
- attr_accessor :count
-
- # start the sum with 0 for each key
- def start! *_
- self.count = 0
- end
- # ... and count the number of records for this key
- def accumulate *_
- self.count += 1
- end
- # emit [key, count]
- def finalize
- yield [key, count].flatten
- end
- end
-
- end
-end
View
22 lib/wukong/streamer/preprocess_with_pipe_streamer.rb
@@ -1,22 +0,0 @@
-module Wukong
- module Streamer
- module PreprocessWithPipeStreamer
- #
- # Runs STDIN through a shell command and then begins processing.
- #
- # If you don't need to do anything to the output of the command, just
- # inherit from Wukong::Script and override the #map_command.
- #
- # You must provide a @preprocess_pipe_command@ method that returns a shell
- # command to run the input through.
- #
- def stream
- #
- `#{preprocess_pipe_command}`.each do |line|
- item = itemize(line) ; next if item.blank?
- process(*item)
- end
- end
- end
- end
-end

0 comments on commit bfaccc9

Please sign in to comment.
Something went wrong with that request. Please try again.