Riak supports writing map/reduce query functions in Javascript, as well as specifying query execution over HTTP. This document will teach you how to use these features.
This section hits the ground running with a quick example to demonstrate what HTTP/Javascript map/reduce looks like in Riak. This example will store several chunks of text in Riak, and then compute word counts over the set of documents.
We will use the Riak HTTP interface to store the texts we want to process:
$ curl -X PUT -H "content-type: text/plain" \
 http://localhost:8098/riak/alice/p1 --data-binary @-
Alice was beginning to get very tired of sitting by her sister on the bank, and of having nothing to do: once or twice she had peeped into the book her sister was reading, but it had no pictures or conversations in it, 'and what is the use of a book,' thought Alice 'without pictures or conversation?'
^D
$ curl -X PUT -H "content-type: text/plain" \
 http://localhost:8098/riak/alice/p2 --data-binary @-
So she was considering in her own mind (as well as she could, for the hot day made her feel very sleepy and stupid), whether the pleasure of making a daisy-chain would be worth the trouble of getting up and picking the daisies, when suddenly a White Rabbit with pink eyes ran close by her.
^D
$ curl -X PUT -H "content-type: text/plain" \
 http://localhost:8098/riak/alice/p5 --data-binary @-
The rabbit-hole went straight on like a tunnel for some way, and then dipped suddenly down, so suddenly that Alice had not a moment to think about stopping herself before she found herself falling down a very deep well.
^D
With data loaded, we can now run a query:
$ curl -X POST -H "content-type: application/json" http://localhost:8098/mapred --data @-
{"inputs":[["alice","p1"],["alice","p2"],["alice","p5"]],"query":[{"map":{"language":"javascript","source":"function(v) { var m = v.values[0].data.toLowerCase().match('\\\\w*','g'); var r = []; for(var i in m) if (m[i] != '') { var o = {}; o[m[i]] = 1; r.push(o); } return r; }"}},{"reduce":{"language":"javascript","source":"function(v) { var r = {}; for (var i in v) { for(var w in v[i]) { if (w in r) r[w] += v[i][w]; else r[w] = v[i][w]; } } return [r]; }"}}]}
^D
And we end up with the word counts for the three documents.
[{"the":8,"rabbit":2,"hole":1,"went":1,"straight":1,"on":2,"like":1,"a":6,"tunnel":1,"for":2,"some":1,"way":1,"and":5,"then":1,"dipped":1,"suddenly":3,"down":2,"so":2,"that":1,"alice":3,"had":3,"not":1,"moment":1,"to":3,"think":1,"about":1,"stopping":1,"herself":2,"before":1,"she":4,"found":1,"falling":1,"very":3,"deep":1,"well":2,"was":3,"considering":1,"in":2,"her":5,"own":1,"mind":1,"as":2,"could":1,"hot":1,"day":1,"made":1,"feel":1,"sleepy":1,"stupid":1,"whether":1,"pleasure":1,"of":5,"making":1,"daisy":1,"chain":1,"would":1,"be":1,"worth":1,"trouble":1,"getting":1,"up":1,"picking":1,"daisies":1,"when":1,"white":1,"with":1,"pink":1,"eyes":1,"ran":1,"close":1,"by":2,"beginning":1,"get":1,"tired":1,"sitting":1,"sister":2,"bank":1,"having":1,"nothing":1,"do":1,"once":1,"or":3,"twice":1,"peeped":1,"into":1,"book":2,"reading":1,"but":1,"it":2,"no":1,"pictures":2,"conversations":1,"what":1,"is":1,"use":1,"thought":1,"without":1,"conversation":1}]
For more details about what each bit of syntax means, and other syntax options, read the following sections. As a quick explanation of how this example map/reduce query worked, though:
- The objects named p1, p2, and p5 from the alice bucket were given as inputs to the query.
- The map function from the phase was run on each object. The function:
function(v) {
  var m = v.values[0].data.toLowerCase().match('\\w*','g');
  var r = [];
  for(var i in m)
    if (m[i] != '') {
      var o = {};
      o[m[i]] = 1;
      r.push(o);
    }
  return r;
}
creates a list of JSON objects, one for each word (non-unique) in the text. Each object has the word as its only key, and the integer 1 as the value for that key.
- The reduce function from the phase was run on the outputs of the map functions. The function:
function(v) {
  var r = {};
  for (var i in v) {
    for(var w in v[i]) {
      if (w in r)
        r[w] += v[i][w];
      else
        r[w] = v[i][w];
    }
  }
  return [r];
}
looks at each JSON object in the input list. It steps through each key in each object, and produces a new object. That new object has a key for every key found in any input object, the value of that key being the sum of the values of that key across the input objects. It returns this new object in a list, because it may be run a second time on a list including that object and more inputs from the map phase.
- The final output is a list with one element: a JSON object with a key for each word in all of the documents (unique), with the value of that key being the number of times the word appeared in the documents.
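To experiment with the two functions outside Riak, the sketch below runs them against a hand-built stand-in for a Riak object. The fakeObject helper and the sample sentences are assumptions made for illustration. The map function here uses a regular-expression literal instead of the two-argument match call from the query, because the two-argument form is a non-standard extension that other Javascript engines may ignore.

```javascript
// Hypothetical stand-in for the object Riak hands to a map function;
// only the fields the word-count map reads are filled in.
function fakeObject(key, text) {
  return { bucket: "alice", key: key, values: [{ metadata: {}, data: text }] };
}

// Map: emit one {word: 1} object per word occurrence.
function mapWords(v) {
  var m = v.values[0].data.toLowerCase().match(/\w+/g) || [];
  var r = [];
  for (var i = 0; i < m.length; i++) {
    var o = {};
    o[m[i]] = 1;
    r.push(o);
  }
  return r;
}

// Reduce: merge a list of count objects into a single count object.
function reduceCounts(v) {
  var r = {};
  for (var i in v) {
    for (var w in v[i]) {
      if (w in r) r[w] += v[i][w];
      else r[w] = v[i][w];
    }
  }
  return [r];
}

var first = reduceCounts(mapWords(fakeObject("p1", "The cat saw the other cat")));
// Re-reduce: the earlier result is fed back in together with new map output.
var total = reduceCounts(first.concat(mapWords(fakeObject("p2", "A cat sat"))));
console.log(JSON.stringify(total));
// [{"the":2,"cat":3,"saw":1,"other":1,"a":1,"sat":1}]
```

Because the map output and the reduce output have the same shape (objects mapping words to counts), the second reduceCounts call can mix an earlier result with fresh map output.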
Map/Reduce queries are issued over HTTP via a POST to the /mapred resource. The body should be application/json of the form {"inputs":[...inputs...],"query":[...query...]}.
Map/Reduce queries have a default timeout of 60000 milliseconds (60 seconds). The default timeout can be overridden by supplying a different value, in milliseconds, in the JSON document:
{"inputs":[...inputs...],"query":[...query...],"timeout": 90000}
The list of input objects is given as a list of 2-element lists of the form [Bucket,Key] or 3-element lists of the form [Bucket,Key,KeyData].
You may also pass just the name of a bucket ({"inputs":"mybucket",...}), which is equivalent to passing all of the keys in that bucket as inputs (i.e. “a map/reduce across the whole bucket”). You should be aware that this triggers the somewhat expensive “list keys” operation, so you should use it sparingly.
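The request body forms described above can be assembled programmatically. The following is a minimal sketch: buildQuery is a hypothetical helper, and the bucket, key, and keydata values are placeholders rather than anything Riak requires.

```javascript
// Assemble a map/reduce request body (hypothetical helper).
function buildQuery(inputs, phases, timeoutMs) {
  var body = { inputs: inputs, query: phases };
  if (timeoutMs !== undefined) body.timeout = timeoutMs; // milliseconds
  return JSON.stringify(body);
}

var identityMap = {
  map: { language: "javascript", source: "function(v) { return [v]; }" }
};

// Explicit inputs: a [Bucket,Key] pair and a [Bucket,Key,KeyData] triple,
// with the default timeout overridden.
var byKeys = buildQuery(
  [["alice", "p1"], ["alice", "p2", "some-keydata"]],
  [identityMap],
  90000
);

// Whole-bucket input: triggers the expensive "list keys" operation.
var byBucket = buildQuery("alice", [identityMap]);

console.log(byKeys);
console.log(byBucket);
```

Either string could then be sent as the body of a POST to /mapred with a content-type of application/json.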
The query is given as a list of phases, each phase being of the form {PhaseType:{...spec...}}. Valid PhaseType values are “map”, “reduce”, and “link”.
Every phase spec may include a keep field, which must have a boolean value: true means that the results of this phase should be included in the final result of the map/reduce, false means the results of this phase should be used only by the next phase. Omitting the keep field accepts its default value, which is false for all phases except the final phase (Riak assumes that you are most interested in the results of the last phase of your map/reduce query).
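The default rule for keep can be modelled in a few lines. This is only an illustration of the rule as described above, not Riak's own code, and effectiveKeep is a hypothetical name.

```javascript
// For each phase in a query, report whether its results reach the final output.
function effectiveKeep(phases) {
  return phases.map(function (phase, index) {
    var type = Object.keys(phase)[0];     // "map", "reduce", or "link"
    var spec = phase[type];
    if (typeof spec.keep === "boolean") return spec.keep;
    return index === phases.length - 1;   // default: only the final phase
  });
}

var phases = [
  { map: { language: "javascript", name: "Riak.mapValues", keep: true } },
  { reduce: { language: "javascript", name: "Riak.reduceSort" } },
  { reduce: { language: "javascript", name: "Riak.reduceLimit", arg: 3 } }
];

console.log(effectiveKeep(phases)); // true, false, true
```

Here the first phase is kept because it asks to be, the second is dropped by default, and the last is kept by default.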
Map phases must be told where to find the code for the function to execute, and what language that function is in.
Function source can be specified directly in the query by using the “source” spec field. Function source can also be loaded from a pre-stored Riak object by providing “bucket” and “key” fields in the spec.
For example:
{"map":{"language":"javascript","source":"function(v) { return [v]; }","keep":true}}
would run the Javascript function given in the spec, and include the results in the final output of the m/r query.
{"map":{"language":"javascript","bucket":"myjs","key":"mymap","keep":false}}
would run the Javascript function declared in the content of the Riak object under mymap in the myjs bucket, and the results of the function would not be included in the final output of the m/r query.
Map phases may also be passed static arguments by using the “arg” spec field.
Reduce phases look exactly like map phases, but are labeled “reduce”.
Link phases accept bucket and tag fields that specify which links match the link query. The string “_” (underscore) in each field means “match all”, while any other string means “match exactly this string”. If either field is left out, it is considered to be set to “_” (match all).
For example:
{"link":{"bucket":"foo","keep":false}}
would follow all links pointing to objects in the foo bucket, regardless of their tag.
Map functions are passed three parameters: the object that the map is being applied to, the “keydata” for that object, and the static argument for the phase.
The object will be a JSON object of the form:
{
  "bucket":BucketAsString,
  "key":KeyAsString,
  "vclock":VclockAsString,
  "values":[
    {
      "metadata":{
        "X-Riak-VTag":VtagAsString,
        "X-Riak-Last-Modified":LastModAsString,
        ...other metadata...
      },
      "data":ObjectData
    },
    ...other metadata/data values (siblings)...
  ]
}
object.values[0].data is probably what you will be interested in most of the time, but the rest of the details of the object are provided for your use.
The “keydata” is the third element of the item from the input bucket/key list (called KeyData in the Inputs section above), or “undefined” if none was provided.
The static argument for the phase is the value of the arg field from the map spec in the query list.
A map phase should produce a list of results. You will see errors if the output of your map function is not a list. Return the empty list if your map function chooses not to produce output.
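A map function that uses all three parameters might look like the sketch below. The function name, the minLength field of the static argument, and the sample object are assumptions made for illustration; only the parameter order and the object layout come from the description above.

```javascript
// Map sketch: emit one result per word that is at least arg.minLength long.
// Every sibling is visited, not only values[0].
function mapLongWords(value, keyData, arg) {
  var minLength = (arg && arg.minLength) || 1;
  var results = [];
  for (var i = 0; i < value.values.length; i++) {
    var words = String(value.values[i].data).match(/\w+/g) || [];
    for (var j = 0; j < words.length; j++) {
      if (words[j].length >= minLength) {
        results.push({ key: value.key, word: words[j], keyData: keyData });
      }
    }
  }
  return results; // always a list, possibly empty
}

// Hand-built stand-in for the object Riak would pass in.
var sample = {
  bucket: "alice",
  key: "p1",
  vclock: "placeholder",
  values: [{ metadata: {}, data: "Alice was beginning" }]
};

console.log(mapLongWords(sample, "chapter1", { minLength: 5 }));
console.log(mapLongWords(sample, undefined, { minLength: 20 })); // empty list
```

The second call shows the function returning the empty list rather than some other value when it has nothing to emit.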
Reduce functions are passed two parameters: a list of inputs to reduce, and the static argument for the phase.
The list of inputs to reduce may contain values from previous executions of the reduce function. It will also contain results produced by the preceding map or reduce phase.
The static argument for the phase is the value of the arg field from the reduce spec in the query list.
A reduce phase should produce a list of results. You will see errors if the output of your reduce function is not a list. The function should return an empty list if it has no other output to produce.
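As an illustration of a reduce function that uses its static argument, the sketch below keeps the largest numbers seen so far, with arg giving how many to keep. The function name and sample numbers are hypothetical. The function is written so that it can safely receive its own earlier output among its inputs.

```javascript
// Reduce sketch: keep the arg largest numbers seen so far.
// The top N of (earlier top N + new inputs) equals the top N of everything,
// so feeding an earlier result back in does not change the answer.
function reduceTopN(values, arg) {
  var n = typeof arg === "number" ? arg : 1;
  var sorted = values.slice().sort(function (a, b) { return b - a; });
  return sorted.slice(0, n); // an empty input yields an empty list
}

var firstPass = reduceTopN([4, 9, 1, 7], 2);               // [9, 7]
var secondPass = reduceTopN(firstPass.concat([8, 2]), 2);  // [9, 8]
console.log(firstPass, secondPass);
```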
If you are storing data through the HTTP interface, and using the Link HTTP header, you do not need to worry about writing a link-extraction function. Just use the predefined raw_link_walker_resource:mapreduce_linkfun/3.
But, if you need to extract links from your data in some other manner, there are many ways to specify Javascript functions to do that. They all start with setting the linkfun bucket property.
Through the HTTP interface:
$ curl -X PUT -H "content-type: application/json" http://localhost:8098/riak/bucket \
> --data '{"props":{"linkfun":{...function...}}}'
The three ways to fill in the value of the linkfun key are:
- Quoted source code, as the value of the jsanon key: {"jsanon":"function(v,kd,bt) { return []; }"}
- The bucket and key of an object containing the function source: {"jsanon":{"bucket":Bucket,"key":Key}}
- The name of a predefined Javascript function: {"jsfun":FunctionName}
The function has basically the same contract as a map function. The first argument is the object from which links should be extracted. The second argument is the KeyData for the object. The third argument is a Javascript object representing the links to match. The two fields in the object, bucket and tag, will have the values given in the link phase spec from the query.
The link fun should return a list of the same form as the inputs
list: 2-item bucket/key lists, or 3-item bucket/key/keydata lists.
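A link-extraction function following this contract could be sketched as below. It assumes, purely for illustration, that each object's data is a JSON string containing a links array of {bucket, key, tag} entries; that storage layout and the function names are hypothetical. Treating a missing field in the third argument as “match all” is also an assumption.

```javascript
// True when a pattern from the link phase spec accepts the actual value.
// Assumption: a missing pattern behaves like "_" (match all).
function linkFieldMatches(pattern, actual) {
  return pattern === undefined || pattern === "_" || pattern === actual;
}

// Link-extraction sketch with the (object, keydata, bucket/tag) contract.
function extractLinks(v, kd, bt) {
  var doc = JSON.parse(v.values[0].data);
  var links = doc.links || [];
  var out = [];
  for (var i = 0; i < links.length; i++) {
    var l = links[i];
    if (linkFieldMatches(bt.bucket, l.bucket) && linkFieldMatches(bt.tag, l.tag)) {
      out.push([l.bucket, l.key]); // same shape as the query inputs list
    }
  }
  return out;
}

// Hand-built stand-in for a stored object.
var linked = {
  values: [{
    metadata: {},
    data: JSON.stringify({ links: [
      { bucket: "foo", key: "k1", tag: "friend" },
      { bucket: "bar", key: "k2", tag: "friend" }
    ] })
  }]
};

console.log(extractLinks(linked, undefined, { bucket: "foo", tag: "_" }));
```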
The main goal of Map/Reduce is to spread the processing of a query across many systems to take advantage of parallel processing power. This is generally done by dividing the query into several steps, dividing the dataset into several chunks, and then running those step/chunk pairs on separate physical hosts.
One step type is called “map”. Map functions take one piece of data as input, and produce zero or more results as output. If you’re familiar with “mapping over a list” in functional programming style, you’re already familiar with “map” steps in a map/reduce query.
Another step type is called “reduce”. The purpose of a “reduce” step is to combine the output of many “map” step evaluations into one result.
The common example of a map/reduce query involves a “map” step that takes a body of text as input, and produces a word count for that body of text. A reduce step then takes the word counts produced from many bodies of text and either sums them to provide a word count for the corpus, or filters them to produce a list of documents containing only certain counts.
Riak’s map/reduce has an additional goal: increasing data-locality. When processing a large dataset, it’s often much more efficient to take the computation to the data than it is to bring the data to the computation.
It is Riak’s solution to the data-locality problem that determines how Riak spreads the processing across the cluster. In the same way that any Riak node can coordinate a read or write by sending requests directly to the other nodes responsible for maintaining that data, any Riak node can also coordinate a map/reduce query by sending a map-step evaluation request directly to the node responsible for maintaining the input data. Map-step results are sent back to the coordinating node, where reduce-step processing can produce a unified result.
Put more simply: Riak runs map-step functions right on the node holding the input data for those functions, and it runs reduce-step functions on the node coordinating the map/reduce query.
Map/Reduce queries in Riak have two components: a list of inputs and a list of “steps”, or “phases”.
Each element of the input list is a bucket-key pair. This bucket-key pair may also be annotated with “key-data”, which will be passed as an argument to a map function, when evaluated on the object stored under that bucket-key pair.
Each element of the phases list is a description of a map function, a reduce function, or a link function. The description includes where to find the code for the phase function (for map and reduce phases), static data passed to the function every time it is executed during that phase, and a flag indicating whether or not to include the results of that phase in the final output of the query.
The phase list describes the chain of operations each input will flow through. That is, the initial inputs will be fed to the first phase in the list, and the output of that phase will be fed as input to the next phase in the list. This stream will continue through the final phase.
The input list to a map phase must be a list of (possibly annotated) bucket-key pairs. For each pair, Riak will send the request to evaluate the map function to the partition that is responsible for storing the data for that bucket-key. The vnode hosting that partition will look up the object stored under that bucket-key, and evaluate the map function with the object as an argument. The other arguments to the function will be the annotation included with the bucket-key, if any, and the static data for the phase, as specified in the query.
Reduce phases accept any list of data as input, and produce any list of data as output. They also receive a phase-static value, specified in the query definition.
The important thing to understand is that the function defining the reduce phase may be evaluated multiple times, and the input of later evaluations will include the output of earlier evaluations.
For example, a reduce phase may implement the “set-union” function. In that case, the first set of inputs might be [1,2,2,3], and the output would be [1,2,3]. When the phase receives more inputs, say [3,4,5], the function will be called with the concatenation of the two lists: [1,2,3,3,4,5].
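The set-union example can be written out as follows. This is a minimal sketch with a hypothetical function name; it replays the two calls described above.

```javascript
// Set-union reduce sketch: return the distinct inputs in first-seen order.
function reduceUnion(values) {
  var seen = {};
  var out = [];
  for (var i = 0; i < values.length; i++) {
    var k = JSON.stringify(values[i]); // JSON form serves as the membership key
    if (!Object.prototype.hasOwnProperty.call(seen, k)) {
      seen[k] = true;
      out.push(values[i]);
    }
  }
  return out;
}

var firstUnion = reduceUnion([1, 2, 2, 3]);                  // [1, 2, 3]
// Later call: the earlier output concatenated with the new inputs.
var secondUnion = reduceUnion(firstUnion.concat([3, 4, 5])); // [1, 2, 3, 4, 5]
console.log(firstUnion, secondUnion);
```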
Other systems refer to the second application of the reduce function as a “re-reduce”. There are at least a couple of reduce-query implementation strategies that work with Riak’s model.
One strategy is to implement the phase preceding the reduce phase, such that its output is “the same shape” as the output of the reduce phase. This is how the examples in this document are written, and the way that we have found produces cleaner code.
An alternate strategy is to make the output of a reduce phase recognizable, such that it can be extracted from the input list on subsequent applications. For example, if inputs from the preceding phase are numbers, outputs from the reduce phase could be objects or strings. This would allow the function to find the previous result, and apply new inputs to it.
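The alternate strategy can be sketched with a reduce function whose fresh inputs are plain numbers and whose own results are objects. The function name and the {count, sum} layout are assumptions chosen for illustration.

```javascript
// Reduce sketch: numbers are fresh inputs from the preceding phase,
// objects are results of earlier calls to this same function.
function reduceCountAndSum(values) {
  var acc = { count: 0, sum: 0 };
  for (var i = 0; i < values.length; i++) {
    var v = values[i];
    if (typeof v === "number") {
      acc.count += 1;
      acc.sum += v;
    } else {
      acc.count += v.count;
      acc.sum += v.sum;
    }
  }
  return [acc];
}

var partial = reduceCountAndSum([2, 4]);                 // [{count: 2, sum: 6}]
var merged = reduceCountAndSum(partial.concat([6, 8]));  // [{count: 4, sum: 20}]
console.log(JSON.stringify(merged));
```

Because the earlier result is an object, the function can tell it apart from new numeric inputs and fold both into one running total.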
Link phases find links matching patterns specified in the query definition. The patterns specify which buckets and tags links must have.
“Following a link” means adding it to the output list of this phase. The output of this phase is often most useful as input to a map phase, or another reduce phase.
Riak can also use pre-defined named functions for map and reduce phase processing. Named functions are invoked with the following form:
{"map": {"language": "javascript", "name": "Riak.mapValues", "keep": true}}
{"reduce": {"language": "javascript", "name": "Riak.reduceSort", "keep": true}}
The key name in both examples points to the name of the function to be used. Riak expects the function to be defined prior to the execution of the phase using it.
Defining a named function for Riak is a simple process.
- Create a Javascript source file containing the definitions for all the functions you would like Riak to pre-define.
- Edit the app.config of your Riak nodes and add the line {js_source_dir, <path_to_source_dir>} to the riak configuration block. <path_to_source_dir> should point to the directory where the file created in step #1 was saved.
- Start using the functions in your map/reduce jobs.
When js_source_dir is enabled, Riak scans the directory for files ending in .js. These files are then loaded into each Javascript VM when it is created.
NOTE: Named functions must be available on all nodes in a cluster for proper map/reduce results.
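A source file for named functions might look like the sketch below. The MyFuns object and its member names are made up for illustration, and grouping the functions under a single global object simply mirrors how the built-in functions hang off the global Riak object. Whether Riak resolves such a dotted name for user-defined objects exactly as it does for Riak.mapValues is an assumption here.

```javascript
// Contents of a hypothetical .js file saved in <path_to_source_dir>.
var MyFuns = {
  // Map: emit the length of each sibling's data.
  mapDataLength: function (value, keyData, arg) {
    var out = [];
    for (var i = 0; i < value.values.length; i++) {
      out.push(String(value.values[i].data).length);
    }
    return out;
  },

  // Reduce: sum numbers; a sum of sums is still correct on later calls.
  reduceTotal: function (values, arg) {
    var total = 0;
    for (var i = 0; i < values.length; i++) total += values[i];
    return [total];
  }
};

// Phases that would refer to the functions above by name.
var namedPhases = [
  { map: { language: "javascript", name: "MyFuns.mapDataLength" } },
  { reduce: { language: "javascript", name: "MyFuns.reduceTotal" } }
];
console.log(JSON.stringify(namedPhases));
```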
Named functions can be better than anonymous functions in certain situations. Since named functions live in a file, they can be managed using source code control and deployed automatically using tools such as Chef or Puppet. This can be a significant advantage when administering large Riak clusters.
More important, though, is the fact that named functions execute much faster than the equivalent anonymous functions. Invoking anonymous functions requires Riak to ensure the anonymous function is defined before invoking it. Named functions allow Riak to skip the definition check and execute the function call immediately.
Also, since named functions do not change between invocations, Riak is able to cache named function call results and short circuit the call entirely. Currently, Riak performs this optimization on named functions executed during map phases only.
In general, anonymous functions should be used during development and named functions should be used for production deployments where possible. This combination provides the maximum flexibility and performance.
Riak supplies several named functions out of the box. These functions are defined on a global Javascript object named Riak and should not be modified or overridden. These functions, along with notes on their use, are described in the next two sections.
Riak.mapValues(values, keyData, arg)
Extracts and returns only the values contained in a bucket and key.
Riak.mapValuesJson(values, keyData, arg)
Same as mapValues except the values are passed through a JSON decoder first.
Riak.reduceSum(values, arg)
Returns the sum of values.
Riak.reduceMin(values, arg)
Returns the minimum value from values.
Riak.reduceMax(values, arg)
Returns the maximum value from values.
Riak.reduceSort(values, arg)
Returns the sorted version of values. If arg is the source to a Javascript function, it will be eval’d and used to control the sort via Array.sort.
Riak.reduceLimit(values, arg)
Returns the leftmost n members of values, where arg is used as n.
Riak.reduceSlice(values, arg)
Returns a slice of the values array. arg must be a two-element array containing the starting and ending positions for the slice.
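The built-in functions can be chained in a single query. The sketch below builds one such request body. The bucket name mybucket and the slice positions are placeholders, and the bucket is assumed to hold JSON-encoded values.

```javascript
// Request body chaining three built-in functions (placeholder bucket and arg).
var builtinQuery = {
  inputs: "mybucket", // whole-bucket input, so this triggers "list keys"
  query: [
    // Decode each stored value from JSON.
    { map: { language: "javascript", name: "Riak.mapValuesJson" } },
    // Sort the decoded values.
    { reduce: { language: "javascript", name: "Riak.reduceSort" } },
    // Keep a slice of the sorted list; arg holds the start and end positions.
    { reduce: { language: "javascript", name: "Riak.reduceSlice", arg: [0, 5] } }
  ]
};

console.log(JSON.stringify(builtinQuery));
```

Only the final phase's results would be returned here, since no phase sets keep and the default applies.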