Skip to content

Commit

Permalink
Adding to README
Browse files Browse the repository at this point in the history
  • Loading branch information
Philip (flip) Kromer committed Feb 16, 2009
1 parent 28ea53b commit 4dc43dd
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 22 deletions.
161 changes: 161 additions & 0 deletions README-tutorial.textile
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
Here's a script to count words in a text stream:

require 'wukong'
module WordCount
class Mapper < Wukong::Streamer::LineStreamer
# Emit each word in the line.
def process line
words = line.strip.split(/\W+/).reject(&:blank?)
words.each{|word| yield [word, 1] }
end
end

class Reducer < Wukong::Streamer::ListReducer
def finalize
yield [ key, values.map(&:last).map(&:to_i).sum ]
end
end
end

Wukong::Script.new(
WordCount::Mapper,
WordCount::Reducer
).run # Execute the script

The first class, the Mapper, eats lines and craps @[word, count]@ records. Here
the /key/ is the word, and the /value/ is its count.

The second class is an example of an accumulated list reducer. The values for
each key are stacked up into a list; then the record(s) yielded by @#finalize@
are emitted.

Here's another way to write the Reducer: accumulate the count of each line, then
yield the sum in @#finalize@:

class Reducer2 < Wukong::Streamer::AccumulatingReducer
attr_accessor :key_count
def reset!() self.key_count = 0 end
def accumulate(word, count)
self.key_count += count.to_i
end
def finalize
yield [ key, key_count ]
end
end

Of course you can be really lazy (that is, smart) and write your script instead as

class Script < Wukong::Script
def reducer_command
'uniq -c'
end
end


h2. Structured data

All of these deal with unstructured data. Wukong also lets you view your data
as a stream of structured objects.

Let's say you have a blog; its records look like

Post = Struct.new( :id, :created_at, :user_id, :title, :body, :link )
Comment = Struct.new( :id, :created_at, :post_id, :user_id, :body )
User = Struct.new( :id, :username, :fullname, :homepage, :description )
UserLoc = Struct.new( :user_id, :text, :lat, :lng )

You've been using "twitter":http://twitter.com for a long time, and you've
written something that from now on will inject all your tweets as Posts, and all
replies to them as Comments (by a common 'twitter_bot' account on your blog).
What about the past two years' worth of tweets? Let's assume you're so chatty that
a Map/Reduce script is warranted to handle the volume.

Cook up something that scrapes your tweets and all replies to your tweets:

Tweet = Struct.new( :id, :created_at, :twitter_user_id,
:in_reply_to_user_id, :in_reply_to_status_id, :text )
TwitterUser = Struct.new( :id, :username, :fullname,
:homepage, :location, :description )

Now we'll just process all those in a big pile, converting to Posts, Comments
and Users as appropriate. Serialize your scrape results so that each Tweet and
each TwitterUser is a single lines containing first the class name ('tweet' or
'twitter_user') followed by its constituent fields, in order, separated by tabs.

The RecordStreamer takes each such line, constructs its corresponding class, and
instantiates it with the

require 'wukong'
require 'my_blog' #defines the blog models
module TwitBlog
class Mapper < Wukong::Streamer::RecordStreamer
# Watch for tweets by me
MY_USER_ID = 24601
# structs for our input objects
Tweet = Struct.new( :id, :created_at, :twitter_user_id,
:in_reply_to_user_id, :in_reply_to_status_id, :text )
TwitterUser = Struct.new( :id, :username, :fullname,
:homepage, :location, :description )
#
# If this is a tweet is by me, convert it to a Post.
#
# If it is a tweet not by me, convert it to a Comment that
# will be paired with the correct Post.
#
# If it is a TwitterUser, convert it to a User record and
# a user_location record
#
def process record
case record
when TwitterUser
user = MyBlog::User.new.merge(record) # grab the fields in common
user_loc = MyBlog::UserLoc.new(record.id, record.location, nil, nil)
yield user
yield user_loc
when Tweet
if record.twitter_user_id == MY_USER_ID
post = MyBlog::Post.new.merge record
post.link = "http://twitter.com/statuses/show/#{record.id}"
post.body = record.text
post.title = record.text[0..65] + "..."
yield post
else
comment = MyBlog::Comment.new.merge record
comment.body = record.text
comment.post_id = record.in_reply_to_status_id
yield comment
end
end
end
end
end
Wukong::Script.new( TwitBlog::Mapper, nil ).run # identity reducer

h2. Uniqifying

The script above uses the identity reducer: every record from the mapper is sent
to the output. But what if you had grabbed the replying user's record every time
you saw a reply?

Fine, so pass it through @uniq@. But what if a user updated their location or
description during this time? You'll want to probably use UniqByLastReducer

Location might want to take the most /frequent/, and might want as well to
geolocate the location text. Use a ListReducer, find the most frequent element,
then finally call the expensive geolocation method.

h2. A note about keys

Now we're going to write this using the synthetic keys already extant in the
twitter records, making the unwarranted assumption that they won't collide with
the keys in your database.

Map/Reduce paradigm does badly with synthetic keys. Synthetic keys demand
locality, and map/reduce's remarkable scaling comes from not assuming
locality. In general, write your map/reduce scripts to use natural keys (the scre

h1. More info

There are many useful examples (including an actually-useful version of this
WordCount script) in examples/ directory.

78 changes: 58 additions & 20 deletions README.textile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ command line.

h2. How to write a Wukong script


Here's a script to count words in a text stream:

require 'wukong'
Expand All @@ -25,43 +26,80 @@ Here's a script to count words in a text stream:
end
end

class Reducer < Wukong::Streamer::AccumulatingReducer
attr_accessor :key_count
def reset!() key_count = 0 end
def accumulate(*args) key_count += 1 end
class Reducer < Wukong::Streamer::ListReducer
def finalize
yield [ key, key_count ]
yield [ key, values.map(&:last).map(&:to_i).sum ]
end
end
end

Wukong::Script.new(
WordCount::Mapper,
WordCount::Reducer
).run # Execute the script

The first class, the Mapper, eats lines and craps words

The second class is an example of an accumulating reducer:
The first class, the Mapper, eats lines and craps @[word, count]@ records: word
is the /key/, its count is the /value/.

Here's another way to write the Reducer:
In the reducer, the values for each key are stacked up into a list; then the
record(s) yielded by @#finalize@ are emitted.

class Reducer < Wukong::Streamer::ListReducer
def finalize
values.map(&:last).map(&:to_i).sum
end
end
h3. Structured data stream

And yet another way:
You can also use structs to treat your dataset as a stream of objects:

class Reducer < Wukong::Streamer::CountKeys
require 'wukong'
require 'my_blog' #defines the blog models
# structs for our input objects
Tweet = Struct.new( :id, :created_at, :twitter_user_id,
:in_reply_to_user_id, :in_reply_to_status_id, :text )
TwitterUser = Struct.new( :id, :username, :fullname,
:homepage, :location, :description )
module TwitBlog
class Mapper < Wukong::Streamer::RecordStreamer
# Watch for tweets by me
MY_USER_ID = 24601
#
# If this is a tweet is by me, convert it to a Post.
#
# If it is a tweet not by me, convert it to a Comment that
# will be paired with the correct Post.
#
# If it is a TwitterUser, convert it to a User record and
# a user_location record
#
def process record
case record
when TwitterUser
user = MyBlog::User.new.merge(record) # grab the fields in common
user_loc = MyBlog::UserLoc.new(record.id, record.location, nil, nil)
yield user
yield user_loc
when Tweet
if record.twitter_user_id == MY_USER_ID
post = MyBlog::Post.new.merge record
post.link = "http://twitter.com/statuses/show/#{record.id}"
post.body = record.text
post.title = record.text[0..65] + "..."
yield post
else
comment = MyBlog::Comment.new.merge record
comment.body = record.text
comment.post_id = record.in_reply_to_status_id
yield comment
end
end
end
end
end
Wukong::Script.new( TwitBlog::Mapper, nil ).run # identity reducer


You can also write your

There are many useful examples (including an actually-useful version of this
h3. More info

There are many useful examples (including an actually-useful version of the
WordCount script) in examples/ directory.


h2. How to run a Wukong script

your/script.rb --go path/to/input_files path/to/output_dir
Expand Down
4 changes: 2 additions & 2 deletions wukong/streamer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
require 'wukong/streamer/accumulating_reducer'
require 'wukong/streamer/list_reducer'
require 'wukong/streamer/uniq_by_last_reducer'
require 'wukong/streamer/uniq_count_keys_reducer'
require 'wukong/streamer/uniq_count_lines_reducer'
require 'wukong/streamer/count_keys'
require 'wukong/streamer/count_lines'
File renamed without changes.
File renamed without changes.

0 comments on commit 4dc43dd

Please sign in to comment.