Skip to content
This repository
Browse code

Polymorphic MapReduce!

  • Loading branch information...
commit 640886ceb662ae212cf3edc7f400142af3aa7bbf 1 parent 5c0cc47
authored August 25, 2011
1  src/comp/util/ppaux.rs
@@ -26,6 +26,7 @@ fn mode_str(m: &ty::mode) -> str {
26 26
       mo_val. { "" }
27 27
       mo_alias(false) { "&" }
28 28
       mo_alias(true) { "&mutable " }
  29
+      mo_move. { "-" }
29 30
     }
30 31
 }
31 32
 
328  src/test/bench/task-perf-word-count-generic.rs
... ...
@@ -0,0 +1,328 @@
  1
+/**
  2
+   A parallel word-frequency counting program.
  3
+
  4
+   This is meant primarily to demonstrate Rust's MapReduce framework.
  5
+
  6
+   It takes a list of files on the command line and outputs a list of
  7
+   words along with how many times each word is used.
  8
+
  9
+*/
  10
+
  11
+use std;
  12
+
  13
+import option = std::option::t;
  14
+import std::option::some;
  15
+import std::option::none;
  16
+import std::str;
  17
+import std::treemap;
  18
+import std::vec;
  19
+import std::io;
  20
+
  21
+import std::time;
  22
+import std::u64;
  23
+
  24
+import std::task;
  25
+import std::task::joinable_task;
  26
+import std::comm;
  27
+import std::comm::chan;
  28
+import std::comm::port;
  29
+import std::comm::recv;
  30
+import std::comm::send;
  31
+
  32
+fn map(filename: &[u8], emit: &map_reduce::putter<[u8], int>) {
  33
+    let f = io::file_reader(str::unsafe_from_bytes(filename));
  34
+
  35
+    while true {
  36
+        alt read_word(f) {
  37
+          some(w) { emit(str::bytes(w), 1); }
  38
+          none. { break; }
  39
+        }
  40
+    }
  41
+}
  42
+
  43
+fn reduce(word: &[u8], get: &map_reduce::getter<int>) {
  44
+    let count = 0;
  45
+
  46
+    while true { alt get() { some(_) { count += 1; } none. { break } } }
  47
+}
  48
+
  49
+mod map_reduce {
  50
+    export putter;
  51
+    export getter;
  52
+    export mapper;
  53
+    export reducer;
  54
+    export map_reduce;
  55
+
  56
+    type putter<~K, ~V> = fn(&K, &V);
  57
+
  58
+    // FIXME: the first K1 parameter should probably be a -, but that
  59
+    // doesn't parse at the moment.
  60
+    type mapper<~K1, ~K2, ~V> = fn(&K1, &putter<K2, V>);
  61
+
  62
+    type getter<~V> = fn() -> option<V>;
  63
+
  64
+    type reducer<~K, ~V> = fn(&K, &getter<V>);
  65
+
  66
+    tag ctrl_proto<~K, ~V> {
  67
+        find_reducer(K, chan<chan<reduce_proto<V>>>);
  68
+        mapper_done;
  69
+    }
  70
+
  71
+    tag reduce_proto<~V> { emit_val(V); done; ref; release; }
  72
+
  73
+    fn start_mappers<~K1, ~K2, ~V>(map : mapper<K1, K2, V>,
  74
+                             ctrl: chan<ctrl_proto<K2, V>>, inputs: &[K1])
  75
+        -> [joinable_task] {
  76
+        let tasks = [];
  77
+        for i in inputs {
  78
+            let m = map, c = ctrl, ii = i;
  79
+            tasks += [task::spawn_joinable(bind map_task(m, c, ii))];
  80
+        }
  81
+        ret tasks;
  82
+    }
  83
+
  84
+    fn map_task<~K1, ~K2, ~V>(map : -mapper<K1,K2,V>,
  85
+                              ctrl: -chan<ctrl_proto<K2,V>>, input: -K1) {
  86
+        // log_err "map_task " + input;
  87
+        let intermediates = treemap::init();
  88
+
  89
+        fn emit<~K2, ~V>(im: &treemap::treemap<K2, chan<reduce_proto<V>>>,
  90
+                         ctrl: &chan<ctrl_proto<K2,V>>, key: &K2, val: &V) {
  91
+            let c;
  92
+            alt treemap::find(im, key) {
  93
+              some(_c) {
  94
+                c = _c
  95
+              }
  96
+              none. {
  97
+                let p = port();
  98
+                send(ctrl, find_reducer(key, chan(p)));
  99
+                c = recv(p);
  100
+                treemap::insert(im, key, c);
  101
+                send(c, ref);
  102
+              }
  103
+            }
  104
+            send(c, emit_val(val));
  105
+        }
  106
+
  107
+        map(input, bind emit(intermediates, ctrl, _, _));
  108
+
  109
+        fn finish<~K, ~V>(k : &K, v : &chan<reduce_proto<V>>) {
  110
+            send(v, release);
  111
+        }
  112
+        treemap::traverse(intermediates, finish);
  113
+        send(ctrl, mapper_done);
  114
+    }
  115
+
  116
+    fn reduce_task<~K, ~V>(reduce : -reducer<K,V>,
  117
+                           key: -K, out: -chan<chan<reduce_proto<V>>>) {
  118
+        let p = port();
  119
+
  120
+        send(out, chan(p));
  121
+
  122
+        let ref_count = 0;
  123
+        let is_done = false;
  124
+
  125
+        fn get<~V>(p: &port<reduce_proto<V>>, ref_count: &mutable int,
  126
+               is_done: &mutable bool) -> option<V> {
  127
+            while !is_done || ref_count > 0 {
  128
+                alt recv(p) {
  129
+                  emit_val(v) {
  130
+                    // log_err #fmt("received %d", v);
  131
+                    ret some(v);
  132
+                  }
  133
+                  done. {
  134
+                    // log_err "all done";
  135
+                    is_done = true;
  136
+                  }
  137
+                  ref. { ref_count += 1; }
  138
+                  release. { ref_count -= 1; }
  139
+                }
  140
+            }
  141
+            ret none;
  142
+        }
  143
+
  144
+        reduce(key, bind get(p, ref_count, is_done));
  145
+    }
  146
+
  147
+    fn map_reduce<~K1, ~K2, ~V>(map : mapper<K1,K2,V>,
  148
+                               reduce : reducer<K2, V>,
  149
+                               inputs: &[K1]) {
  150
+        let ctrl = port();
  151
+
  152
+        // This task becomes the master control task. It task::_spawns
  153
+        // to do the rest.
  154
+
  155
+        let reducers = treemap::init();
  156
+
  157
+        let tasks = start_mappers(map, chan(ctrl), inputs);
  158
+
  159
+        let num_mappers = vec::len(inputs) as int;
  160
+
  161
+        while num_mappers > 0 {
  162
+            alt recv(ctrl) {
  163
+              mapper_done. {
  164
+                // log_err "received mapper terminated.";
  165
+                num_mappers -= 1;
  166
+              }
  167
+              find_reducer(k, cc) {
  168
+                let c;
  169
+                // log_err "finding reducer for " + k;
  170
+                alt treemap::find(reducers, k) {
  171
+                  some(_c) {
  172
+                    // log_err "reusing existing reducer for " + k;
  173
+                    c = _c;
  174
+                  }
  175
+                  none. {
  176
+                    // log_err "creating new reducer for " + k;
  177
+                    let p = port();
  178
+                    let r = reduce, kk = k;
  179
+                    tasks +=
  180
+                        [task::spawn_joinable(bind reduce_task(r,
  181
+                                                               kk, chan(p)))];
  182
+                    c = recv(p);
  183
+                    treemap::insert(reducers, k, c);
  184
+                  }
  185
+                }
  186
+                send(cc, c);
  187
+              }
  188
+            }
  189
+        }
  190
+
  191
+        fn finish<~K, ~V>(k : &K, v : &chan<reduce_proto<V>>) {
  192
+            send(v, done);
  193
+        }
  194
+        treemap::traverse(reducers, finish);
  195
+
  196
+        for t in tasks { task::join(t); }
  197
+    }
  198
+}
  199
+
  200
+fn main(argv: [str]) {
  201
+    if vec::len(argv) < 2u {
  202
+        let out = io::stdout();
  203
+
  204
+        out.write_line(#fmt["Usage: %s <filename> ...", argv[0]]);
  205
+
  206
+        // TODO: run something just to make sure the code hasn't
  207
+        // broken yet. This is the unit test mode of this program.
  208
+
  209
+        ret;
  210
+    }
  211
+
  212
+    let iargs = [];
  213
+    for a in vec::slice(argv, 1u, vec::len(argv)) {
  214
+        iargs += [str::bytes(a)];
  215
+    }
  216
+
  217
+    // We can get by with 8k stacks, and we'll probably exhaust our
  218
+    // address space otherwise.
  219
+    task::set_min_stack(8192u);
  220
+
  221
+    let start = time::precise_time_ns();
  222
+
  223
+    map_reduce::map_reduce(map, reduce, iargs);
  224
+    let stop = time::precise_time_ns();
  225
+
  226
+    let elapsed = stop - start;
  227
+    elapsed /= 1000000u64;
  228
+
  229
+    log_err "MapReduce completed in " + u64::str(elapsed) + "ms";
  230
+}
  231
+
  232
+fn read_word(r: io::reader) -> option<str> {
  233
+    let w = "";
  234
+
  235
+    while !r.eof() {
  236
+        let c = r.read_char();
  237
+
  238
+
  239
+        if is_word_char(c) {
  240
+            w += str::from_char(c);
  241
+        } else { if w != "" { ret some(w); } }
  242
+    }
  243
+    ret none;
  244
+}
  245
+
  246
+fn is_digit(c: char) -> bool {
  247
+    alt c {
  248
+      '0' { true }
  249
+      '1' { true }
  250
+      '2' { true }
  251
+      '3' { true }
  252
+      '4' { true }
  253
+      '5' { true }
  254
+      '6' { true }
  255
+      '7' { true }
  256
+      '8' { true }
  257
+      '9' { true }
  258
+      _ { false }
  259
+    }
  260
+}
  261
+
  262
+fn is_alpha_lower(c: char) -> bool {
  263
+    alt c {
  264
+      'a' { true }
  265
+      'b' { true }
  266
+      'c' { true }
  267
+      'd' { true }
  268
+      'e' { true }
  269
+      'f' { true }
  270
+      'g' { true }
  271
+      'h' { true }
  272
+      'i' { true }
  273
+      'j' { true }
  274
+      'k' { true }
  275
+      'l' { true }
  276
+      'm' { true }
  277
+      'n' { true }
  278
+      'o' { true }
  279
+      'p' { true }
  280
+      'q' { true }
  281
+      'r' { true }
  282
+      's' { true }
  283
+      't' { true }
  284
+      'u' { true }
  285
+      'v' { true }
  286
+      'w' { true }
  287
+      'x' { true }
  288
+      'y' { true }
  289
+      'z' { true }
  290
+      _ { false }
  291
+    }
  292
+}
  293
+
  294
+fn is_alpha_upper(c: char) -> bool {
  295
+    alt c {
  296
+      'A' { true }
  297
+      'B' { true }
  298
+      'C' { true }
  299
+      'D' { true }
  300
+      'E' { true }
  301
+      'F' { true }
  302
+      'G' { true }
  303
+      'H' { true }
  304
+      'I' { true }
  305
+      'J' { true }
  306
+      'K' { true }
  307
+      'L' { true }
  308
+      'M' { true }
  309
+      'N' { true }
  310
+      'O' { true }
  311
+      'P' { true }
  312
+      'Q' { true }
  313
+      'R' { true }
  314
+      'S' { true }
  315
+      'T' { true }
  316
+      'U' { true }
  317
+      'V' { true }
  318
+      'W' { true }
  319
+      'X' { true }
  320
+      'Y' { true }
  321
+      'Z' { true }
  322
+      _ { false }
  323
+    }
  324
+}
  325
+
  326
+fn is_alpha(c: char) -> bool { is_alpha_upper(c) || is_alpha_lower(c) }
  327
+
  328
+fn is_word_char(c: char) -> bool { is_alpha(c) || is_digit(c) || c == '_' }

0 notes on commit 640886c

Please sign in to comment.
Something went wrong with that request. Please try again.