Permalink
Browse files

Switch map-reduce control protocol to use pipes. This exposed a bug in the pipe compiler, which is now fixed.

Use hashmaps in MapReduce

Tweak word-count difficulty
  • Loading branch information...
1 parent d5b8bbb commit 1a276dba52b5e717d12ab410832bdc13c28e9a67 @eholk eholk committed Jul 11, 2012
Showing with 101 additions and 65 deletions.
  1. +2 −25 src/libsyntax/ext/pipes/pipec.rs
  2. +99 −40 src/test/bench/task-perf-word-count-generic.rs
@@ -56,34 +56,11 @@ impl methods for message {
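// Return the type parameters for this message. After this change that is all of `this.ty_params`; they are no longer filtered down to the ones the message actually uses.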
fn get_params() -> ~[ast::ty_param] {
- let mut used = ~[];
alt self {
- message(_, tys, this, _, next_tys) {
- let parms = this.ty_params;
- for vec::append(tys, next_tys).each |ty| {
- alt ty.node {
- ast::ty_path(path, _) {
- if path.idents.len() == 1 {
- let id = path.idents[0];
-
- let found = parms.find(|p| id == p.ident);
-
- alt found {
- some(p) {
- if !used.contains(p) {
- vec::push(used, p);
- }
- }
- none { }
- }
- }
- }
- _ { }
- }
- }
+ message(_, _, this, _, _) {
+ this.ty_params
}
}
- used
}
fn gen_send(cx: ext_ctxt) -> @ast::item {
@@ -14,7 +14,8 @@ import option = option;
import option::some;
import option::none;
import str;
-import std::treemap;
+import std::map;
+import std::map::hashmap;
import vec;
import io;
import io::{reader_util, writer_util};
@@ -30,10 +31,30 @@ import comm::recv;
import comm::send;
import comm::methods;
// `move!{x}` expands to an unsafe block that binds a fresh `y` with `<-`
// from the value read through `ptr::addr_of(x)` and evaluates to `y`.
// NOTE(review): presumably a workaround for moving a value out of a place
// the compiler would otherwise refuse to move from; nothing here marks `x`
// as consumed afterwards — confirm callers never touch `x` again.
+macro_rules! move {
+ { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
+}
+
trait word_reader {
fn read_word() -> option<str>;
}
// Operations a key type must provide to be stored in a map built by
// `mk_hash`: a hash of the receiver and an equality test against another
// value of the same type.
+trait hash_key {
// Hash value of the receiver.
+ fn hash() -> uint;
// True when the receiver equals the argument (the `str` impl in this file
// forwards to `str::eq`).
+ fn eq(self) -> bool;
+}
+
// Builds an empty `map::hashmap<K, V>` whose hash function and equality
// test are both delegated to the key's `hash_key` implementation.
+fn mk_hash<K: const hash_key, V: copy>() -> map::hashmap<K, V> {
// Named wrapper around `k.hash()` so it can be passed to `map::hashmap`
// as the hash function, instantiated explicitly at `K`.
+ fn hashfn<K: const hash_key>(k: K) -> uint { k.hash() }
+
// Equality is supplied as a closure that calls the key's own `eq`.
+ map::hashmap(hashfn::<K>, |x, y| x.eq(y))
+}
+
// `hash_key` for strings: both methods forward to the corresponding
// functions in the `str` module, so `str` can be used as a `mk_hash` key.
+impl of hash_key for str {
+ fn hash() -> uint { str::hash(self) }
+ fn eq(&&x: str) -> bool { str::eq(self, x) }
+}
+
// These used to be in task, but they disappeared.
type joinable_task = port<()>;
fn spawn_joinable(+f: fn~()) -> joinable_task {
@@ -79,6 +100,23 @@ fn reduce(&&word: str, get: map_reduce::getter<int>) {
io::println(#fmt("%s\t%?", word, count));
}
// Holder for a single value stored as `option<T>`, so the value can be
// taken out and a replacement put back through a mutable field.
+class box<T> {
// `some(value)` while the box is full; `unwrap` leaves `none` behind.
+ let mut contents: option<T>;
// Starts out full, holding `x`.
+ new(+x: T) { self.contents = some(x); }
+
// Takes the current value out, passes it to `f`, and stores whatever `f`
// returns as the new contents.
// NOTE(review): `option::unwrap(tmp)` presumably fails if the box has
// already been emptied by `unwrap` — confirm.
+ fn swap(f: fn(+T) -> T) {
+ let mut tmp = none;
// `<->` exchanges the two values: `tmp` receives the contents and the
// field is left as `none` until the assignment below.
+ self.contents <-> tmp;
+ self.contents = some(f(option::unwrap(tmp)));
+ }
+
// Takes the value out and returns it, leaving the box holding `none`.
+ fn unwrap() -> T {
+ let mut tmp = none;
+ self.contents <-> tmp;
+ option::unwrap(tmp)
+ }
+}
+
mod map_reduce {
export putter;
export getter;
@@ -99,54 +137,74 @@ mod map_reduce {
mapper_done
}
+
// Control protocol between a mapper task (client side, see `map_task`) and
// the master loop (server side, see `map_reduce`).
//
// States as declared below:
//   open             - the client sends either `find_reducer(key)`, moving
//                      to `reducer_response`, or `mapper_done`, moving to
//                      `terminated`.
//   reducer_response - the client receives `reducer(chan)` carrying the
//                      channel for that key's reducer, then returns to
//                      `open`.
//   terminated       - declares no messages.
+ proto! ctrl_proto {
+ open: send<K: copy send, V: copy send> {
+ find_reducer(K) -> reducer_response<K, V>,
+ mapper_done -> terminated
+ }
+
+ reducer_response: recv<K: copy send, V: copy send> {
+ reducer(chan<reduce_proto<V>>) -> open<K, V>
+ }
+
+ terminated: send { }
+ }
+
enum reduce_proto<V: copy send> { emit_val(V), done, ref, release }
- fn start_mappers<K1: copy send, K2: copy send, V: copy send>(
+ fn start_mappers<K1: copy send, K2: const copy send hash_key,
+ V: copy send>(
map: mapper<K1, K2, V>,
- ctrl: chan<ctrl_proto<K2, V>>, inputs: ~[K1])
+ &ctrls: ~[ctrl_proto::server::open<K2, V>],
+ inputs: ~[K1])
-> ~[joinable_task]
{
let mut tasks = ~[];
for inputs.each |i| {
+ let (ctrl, ctrl_server) = ctrl_proto::init();
+ let ctrl = box(ctrl);
vec::push(tasks, spawn_joinable(|| map_task(map, ctrl, i) ));
+ vec::push(ctrls, ctrl_server);
}
ret tasks;
}
- fn map_task<K1: copy send, K2: copy send, V: copy send>(
+ fn map_task<K1: copy send, K2: const copy send hash_key, V: copy send>(
map: mapper<K1, K2, V>,
- ctrl: chan<ctrl_proto<K2, V>>,
+ ctrl: box<ctrl_proto::client::open<K2, V>>,
input: K1)
{
// log(error, "map_task " + input);
- let intermediates = treemap::treemap();
+ let intermediates = mk_hash();
- fn emit<K2: copy send, V: copy send>(
- im: treemap::treemap<K2, chan<reduce_proto<V>>>,
- ctrl: chan<ctrl_proto<K2, V>>, key: K2, val: V)
- {
- let c;
- alt treemap::find(im, key) {
- some(_c) { c = _c; }
+ do map(input) |key, val| {
+ let mut c = none;
+ alt intermediates.find(key) {
+ some(_c) { c = some(_c); }
none {
- let p = port();
- send(ctrl, find_reducer(key, chan(p)));
- c = recv(p);
- treemap::insert(im, key, c);
- send(c, ref);
+ do ctrl.swap |ctrl| {
+ let ctrl = ctrl_proto::client::find_reducer(ctrl, key);
+ alt pipes::recv(ctrl) {
+ ctrl_proto::reducer(c_, ctrl) {
+ c = some(c_);
+ move!{ctrl}
+ }
+ }
+ }
+ intermediates.insert(key, c.get());
+ send(c.get(), ref);
}
}
- send(c, emit_val(val));
+ send(c.get(), emit_val(val));
}
- map(input, {|a,b|emit(intermediates, ctrl, a, b)});
-
// Sends `release` on the given reducer channel; the key argument is ignored.
// NOTE(review): the `treemap::traverse(intermediates, finish)` call that used
// this helper is removed by this change (replaced by an `each_value` loop),
// so it appears to be unused now — confirm and delete.
fn finish<K: copy send, V: copy send>(_k: K, v: chan<reduce_proto<V>>)
{
send(v, release);
}
- treemap::traverse(intermediates, finish);
- send(ctrl, mapper_done);
+ for intermediates.each_value |v| { send(v, release) }
+ ctrl_proto::client::mapper_done(ctrl.unwrap());
}
fn reduce_task<K: copy send, V: copy send>(
@@ -184,30 +242,32 @@ mod map_reduce {
reduce(key, || get(p, ref_count, is_done) );
}
- fn map_reduce<K1: copy send, K2: copy send, V: copy send>(
+ fn map_reduce<K1: copy send, K2: const copy send hash_key, V: copy send>(
map: mapper<K1, K2, V>,
reduce: reducer<K2, V>,
inputs: ~[K1])
{
- let ctrl = port();
+ let mut ctrl = ~[];
// This task becomes the master control task. It task::_spawns
// to do the rest.
- let reducers = treemap::treemap();
- let mut tasks = start_mappers(map, chan(ctrl), inputs);
+ let reducers = mk_hash();
+ let mut tasks = start_mappers(map, ctrl, inputs);
let mut num_mappers = vec::len(inputs) as int;
while num_mappers > 0 {
- alt recv(ctrl) {
- mapper_done {
+ let (_ready, message, ctrls) = pipes::select(ctrl);
+ alt option::unwrap(message) {
+ ctrl_proto::mapper_done(_) {
// #error("received mapper terminated.");
num_mappers -= 1;
+ ctrl = ctrls;
}
- find_reducer(k, cc) {
+ ctrl_proto::find_reducer(k, cc) {
let c;
// log(error, "finding reducer for " + k);
- alt treemap::find(reducers, k) {
+ alt reducers.find(k) {
some(_c) {
// log(error,
// "reusing existing reducer for " + k);
@@ -221,19 +281,17 @@ mod map_reduce {
vec::push(tasks,
spawn_joinable(|| reduce_task(r, kk, ch) ));
c = recv(p);
- treemap::insert(reducers, k, c);
+ reducers.insert(k, c);
}
}
- send(cc, c);
+ ctrl = vec::append_one(
+ ctrls,
+ ctrl_proto::server::reducer(move!{cc}, c));
}
}
}
- fn finish<K: copy send, V: copy send>(_k: K, v: chan<reduce_proto<V>>)
- {
- send(v, done);
- }
- treemap::traverse(reducers, finish);
+ for reducers.each_value |v| { send(v, done) }
for tasks.each |t| { join(t); }
}
@@ -254,7 +312,7 @@ fn main(argv: ~[str]) {
}
else {
let num_readers = 50;
- let words_per_reader = 1000;
+ let words_per_reader = 600;
vec::from_fn(
num_readers,
|_i| fn~() -> word_reader {
@@ -301,7 +359,8 @@ class random_word_reader: word_reader {
fn read_word() -> option<str> {
if self.remaining > 0 {
self.remaining -= 1;
- some(self.rng.gen_str(5))
+ let len = self.rng.gen_uint_range(1, 4);
+ some(self.rng.gen_str(len))
}
else { none }
}

0 comments on commit 1a276db

Please sign in to comment.