Skip to content

Commit

Permalink
M/R phases working
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Rowe authored and Brian Rowe committed Nov 14, 2011
1 parent 7073ffa commit da9d179
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 94 deletions.
1 change: 1 addition & 0 deletions app/bucket-mr.html.template
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<div id="mr_jobs">
<form action="#/mapred/<%= bucket %>" method="post">
<input type="hidden" id="mr_phases" name="phases" value=""/>
<select name="phase" multiple="multiple" title="Select map/reduce phases">
</select>
<button type="submit">Run</button>
Expand Down
3 changes: 3 additions & 0 deletions app/key-mr.html.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<tr>
<td colspan="7"><%= value %></td>
</tr>
118 changes: 75 additions & 43 deletions app/rekon.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,51 @@
// Create a closure that acts as a continuation for a sequence of jQuery get
// calls
function phaseContinuation(count, options, finalizer) {
// Holds a list of the phase specs
phases = [ ];
return function(job_name, phase_data) {
console.log("Adding phase "+job_name);
count--;
json_ready = phase_data
.replace(/[\n\t\r]/g,' ')
phases[phases.length] = {name:job_name, spec:jQuery.parseJSON(json_ready)};
if (count == 0) { finalizer(phases, options); }
}
}

// This is the finalizer function of the continuation
function runPhases(phases, options) {
bucket = options['bucket']
context = options['context']
var mapper = new RiakMapper(Rekon.client, bucket);
// p is an object {name:'job_name.map', spec:<json>}
phases.map(function(p) { addJob(mapper,p) });
mapper.run(null, function(status, list, xmlrequest) {
if (! status) {
context.render('bucket-err.html.template')
.replace('#keys tbody')
.wait();
return;
}

keyRows = list.map(function(obj) {
s = "";
// TODO Make formatting a custom function
for (k in obj) { s += k + " : " + obj[k] + "\n"; }
return {value:s}
});
context.renderEach('key-mr.html.template', keyRows)
.replace('#keys tbody')
.then(function(){ searchable('#bucket table tbody tr'); })
});
}

function addJob(mapper, p) {
if (/\.map$/i.test(p.name)) { mapper.map(p.spec); }
else if (/\.red$/i.test(p.name)) { mapper.reduce(p.spec); }
else { console.log("Skipping unknown file extension for "+p.name); }
}

rekonApp = Sammy(function() {

$container = $(this.$element);
Expand Down Expand Up @@ -68,10 +116,11 @@ rekonApp = Sammy(function() {
.wait();
renderPhase('#mr_jobs select[multiple]', find_mrs, function() {
$("#mr_jobs select[multiple]")
.asmSelect({sortable: true, animate: true, addItemTarget: 'top'});
.asmSelect({sortable: true, animate: true});
});
context.render('bucket.html.template', {bucket: name})
.appendTo('#main');
context.render('bucket.html.template', {bucket:name})
.appendTo('#main')
.wait();

bucket.keys(function(keys) {
if (keys.length > 0) {
Expand Down Expand Up @@ -408,50 +457,23 @@ rekonApp = Sammy(function() {
.attr('target', '_blank')
.text('Riak').addClass('action'));

context.render('bucket-mr.html.template', {bucket:name})
.appendTo('#main')
.wait();
renderPhase('#mr_jobs select[multiple]', find_mrs, function() {
$("#mr_jobs select[multiple]")
.asmSelect({sortable: true, animate: true});
});
context.render('bucket.html.template', {bucket:name})
.appendTo('#main')
.wait();

// TODO Assemble all phases together into a single job request
phase_url = Rekon.riakUrl('rekon.jobs/'+this.params['phase']);
jQuery.get(phase_url, function(data) {
phase_data = jQuery.parseJSON(data);
var mapper = new RiakMapper(Rekon.client, name);
mapper.map(phase_data);
mapper.run(null, function(status, list, xmlrequest) {
if (! status) {
context.render('bucket-err.html.template')
.replace('#keys tbody')
.wait();
return;
}

keyRows = list.map(
function(val) { return {bucket:name, key:val}; }
);
context.renderEach('key-row.html.template', keyRows)
.replace('#keys tbody')
.then(function(){ searchable('#bucket table tbody tr'); })
.then(function(){ renderPhase('#keys select#phase'); });

/*
switch(object.contentType) {
case 'image/png':
case 'image/jpeg':
case 'image/jpg':
case 'image/gif':
context.render('value-image.html.template', {bucket: name, key: key}).appendTo('#value');
return;
case 'application/json':
value = JSON.stringify(object, null, 4);
break;
default:
value = object;
break;
}
context.render('value-pre.html.template', {value: value}).appendTo('#value');
*/
});
raw_phases = jQuery.parseJSON(this.params['phases']);
options = {bucket:name,context:context};
continuation = phaseContinuation(raw_phases.length, options, runPhases);
raw_phases.map(function(p) {
phase_url = Rekon.riakUrl('rekon.jobs/'+p);
jQuery.get(phase_url, function(data) { continuation(p, data) });
});
});

Expand Down Expand Up @@ -527,6 +549,16 @@ Rekon = {

};

// Set the form field to contain all selected elements
$('#mr_jobs button').live('click', function(e){
var link = this;
//e.preventDefault();
var jobs = [ ];
$('#mr_jobs :selected')
.each(function(i, selected) { if (i>0) {jobs[i-1] = $(selected).val();} });
$('#mr_phases').val(JSON.stringify(jobs));
});

$('#keys a.move').live('click', function(e){
var link = this;
e.preventDefault();
Expand Down
5 changes: 5 additions & 0 deletions buckets/alice/p1
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Alice was beginning to get very tired of sitting by her sister on the
bank, and of having nothing to do: once or twice she had peeped into the
book her sister was reading, but it had no pictures or conversations in
it, 'and what is the use of a book,' thought Alice 'without pictures or
conversation?'
5 changes: 5 additions & 0 deletions buckets/alice/p2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
So she was considering in her own mind (as well as she could, for the
hot day made her feel very sleepy and stupid), whether the pleasure
of making a daisy-chain would be worth the trouble of getting up and
picking the daisies, when suddenly a White Rabbit with pink eyes ran
close by her.
4 changes: 4 additions & 0 deletions buckets/alice/p5
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
The rabbit-hole went straight on like a tunnel for some way, and then
dipped suddenly down, so suddenly that Alice had not a moment to think
about stopping herself before she found herself falling down a very deep
well.
3 changes: 2 additions & 1 deletion erlang/myjson.erl
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
-module(myjson).
-export([encode/3]).
-export([encode/3, encode/1]).

encode(RiakObject, _, _) ->
io:format("~p", [RiakObject]),
encode(riak_object:get_value(RiakObject)).

encode(<<"{}">>) -> [];
encode({M,S,N}) -> [M,S,N];
encode(Other) ->
Term = binary_to_term(Other),
[tl(tuple_to_list(Term))].
Expand Down
150 changes: 100 additions & 50 deletions install.sh
Original file line number Diff line number Diff line change
@@ -1,58 +1,108 @@
#!/bin/sh

# Allow running the script and specifiying an install target
# ./install node-address:host
if [ -n "$1" ] ; then
node=$1
else
node="127.0.0.1:8098"
fi

riak_url="http://$node/riak/rekon"

echo "Installing rekon to $node..."

base_dir="`dirname $0`/app"

# loop through everything in the app directory and put in in the rekon bucket
for f in $(ls $base_dir); do
# echo "Uploading $f to riak"
case $f in
go | *.html )
content_type="text/html"
;;
*.js )
content_type="application/javascript"
;;
*.css )
content_type="text/css"
;;
*.png )
content_type="image/png"
;;
*.gif )
content_type="image/gif"
;;
*.template )
content_type="application/x-sammy-template"
;;
#!/bin/bash
# Author: Brian Lee Yung Rowe
# Date: 2001.07.14
# Rewritten from the original version by Adam Hunter

do_exit()
{
echo $1
exit $2
}

do_usage()
{
do_exit "Usage: $0 [aejd] [host:port]" $1
}

content_type()
{
case $1 in
go | *.html) content_type="text/html";;
*.js) content_type="application/javascript";;
*.css) content_type="text/css";;
*.png) content_type="image/png";;
*.gif) content_type="image/gif";;
*.template) content_type="application/x-sammy-template";;
*) content_type="text/plain";;
esac
echo $content_type
}

#echo "Uploading $f"
curl -X PUT -H"Content-Type: $content_type" $riak_url/$f --data-binary @$base_dir/$f
done
do_install_rekon()
{
echo "Installing rekon to $node..."
riak_url="http://$node/riak/rekon"
base_dir="`dirname $0`/app"

for f in $(ls $base_dir); do
[ -n "$verbose" ] && echo "Uploading $f to rekon"
content_type=$(content_type $f)
curl -X PUT -H"Content-Type: $content_type" $riak_url/$f --data-binary @$base_dir/$f
done
}

do_install_jobs()
{
echo "Installing jobs"
riak_url="http://$node/riak/rekon.jobs"
base_dir="`dirname $0`/jobs"
for f in $(ls $base_dir)
do
[ -n "$verbose" ] && echo "Uploading job $f to rekon.jobs"
content_type="application/javascript"
curl -X PUT -H"Content-Type: $content_type" $riak_url/$f --data-binary @$base_dir/$f
done
}

do_install_data()
{
echo "Installing data"
riak_url="http://$node/riak"
base_dir=$(dirname $0)/buckets
for bucket in $(ls $base_dir)
do
dir=$base_dir/$bucket
[ -n "$verbose" ] && echo "Adding bucket $bucket"
for f in $(ls $dir)
do
[ -n "$verbose" ] && echo "Adding file as $bucket/$f"
content_type=$(content_type $f)
curl -X PUT -H"Content-Type: $content_type" $riak_url/$bucket/$f --data-binary @$dir/$f
done
done
}

riak_url="http://$node/riak/rekon.jobs"
base_dir="`dirname $0`/jobs"
for f in $(ls $base_dir)
do_install_erlang()
{
echo "Installing erlang modules to $module_dir"
sudo mkdir -p $module_dir
sudo erlc -o $module_dir erlang/*.erl

}

node="127.0.0.1:8098"
module_dir=/etc/riak/erlang
while getopts "aeE:jdv?" option
do
content_type="application/javascript"
#echo "Uploading $f"
curl -X PUT -H"Content-Type: $content_type" $riak_url/$f --data-binary @$base_dir/$f
case $option in
a) erlang=yes; jobs=yes; data=yes;;
e) erlang=yes;;
E) erlang=yes; module_dir=$OPTARG;;
j) jobs=yes;;
d) data=yes;;
v) verbose=yes;;
'?') usage 0;;
*) usage 1;;
esac
done
shift $(($OPTIND - 1))

[ -n "$1" ] && node=$1

do_install_rekon
[ -n "$jobs" ] && do_install_jobs
[ -n "$data" ] && do_install_data
[ -n "$erlang" ] && do_install_erlang

module_dir=/etc/riak/erlang
sudo mkdir -p $module_dir
sudo erlc -o $module_dir erlang/*.erl

echo "Installed, now visit: $riak_url/go"
14 changes: 14 additions & 0 deletions jobs/count_words.map
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{"language":"javascript","source":"
function(v) {
var m = v.values[0].data.toLowerCase().split(' ');
var r = [];
for(var i in m) {
if(m[i] != '') {
var o = {};
o[m[i]] = 1;
r.push(o);
}
}
return r;
}
"}
6 changes: 6 additions & 0 deletions jobs/list_to_table.map
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"language":"javascript", "source":"
function(v,k,a) {
print(v);
return [ x ];
}
"}
12 changes: 12 additions & 0 deletions jobs/sum_words.red
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{"language":"javascript","source":"
function(v) {
var r = {};
for(var i in v) {
for(var w in v[i]) {
if(w in r) r[w] += v[i][w];
else r[w] = v[i][w];
}
}
return [r];
}
"}

0 comments on commit da9d179

Please sign in to comment.