Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit 55daf57556a77db31dcd18c43cead619cbf8d2c3 0 parents
@krestenkrab authored
166 README.md
@@ -0,0 +1,166 @@
+
+
+<h1>Link Indexing</h1>
+
+This module allows you to create simple secondary indexes
+in Riak based on Riaks' link model. The basic idea is thus:
+
+Assume we model people and companies as separate buckets:
+
+ /riak/person/Name
+ /riak/company/Name
+
+When you store a `/riak/person/Kresten` object, you describe the
+employment relation by including this link in the Kresten object:
+
+ X-Riak-Link: </riak/company/Trifork>; riaktag="idx@employs"
+
+The magic is that `riak_link_index` will then automatically add a link
+in the opposite direction; from `Trifork` to `Kresten`, and that link
+will have tag `employs`. The tag needs to bve start with `idx@` for
+`riak_link_index` to recognize it.
+
+Whenever you update a person object, you can pass in new (or multiple)
+such links, and the old reverse links will automatically be
+deleted/updated as appropriate.
+
+This module also allows you to install an `index_hook`, which can be
+used to extract links from your objects. Index hooks can be written in
+both JavaScript for Erlang.
+
+
+Installation
+============
+
+To install, you need to make the `ebin` directory containing
+`riak_link_index.beam` accessible to your Riak install. You can do that
+by adding a line like this to riaks `etc/vm.args`
+
+<pre>-pz /Path/to/riak_function_contrib/other/riak_link_index/ebin</pre>
+
+If you're an Erlang wiz there are other ways, but that should work.
+
+
+Next, you configure a bucket to support indexing. This involves two things:
+
+1. Install a set of commit hooks (indexing needs both a pre- and a
+ post-commit hook).
+
+2. (optionally) configure a function to extract index information
+ from your bucket data. We'll do that later, and start out with
+ the easy version.
+
+If your bucket is name `person`, it could be done thus:
+
+<pre>
+prompt$ cat > bucket_props.json
+{
+ "precommit" : [{"mod": "riak_link_index", "fun": "precommit"}],
+ "postcommit" : [{"mod": "riak_link_index", "fun": "postcommit"}]
+}
+^D
+prompt$ curl -X PUT --data @bucket_props.json \
+ -H 'Content-Type: application/json' \
+ http://127.0.0.1:8091/riak/person
+</pre>
+
+There you go: you're ready for some action.
+
+Explicit Indexing
+=================
+
+
+The simple indexer now works for the `person` bucket, by interpreting
+links on `/riak/person/XXX` objects that have tags starting with
+`idx@`. The special `idx@` prefix is recognized by the indexer, and
+it will create and maintain a link in the opposite direction, tagged
+with whatever comes after the `idx@` prefix.
+
+Let's say we add me:
+
+ curl -X PUT \
+ -H 'X-Riak-Link: </riak/company/Trifork>; riaktag="idx@employs"' \
+ -H 'Content-Type: application/json' \
+ --data '{ "name": "Kresten Krab Thorup", "employer":"Trifork" }' \
+ http://127.0.0.1:8091/riak/people/kresten
+
+As this gets written to Riak, the indexer will then
+create an object by the name of `/riak/company/Trifork`,
+which has a link pointing back to me:
+
+ curl -v -X GET http://127.0.0.1:8091/riak/company/Trifork
+ < 200 OK
+ < X-Riak-Links: </riak/people/Kresten>; riaktag="employs"
+ < Content-Length: 0
+
+If there was already an object at `/company/Trifork`, then the indexer
+would leave the contents alone, but still add the reverse link. If no
+such object existed, then it would be created with empty contents.
+
+Link Walking
+============
+
+The beauty of this is that you can now do link-walk queries to find
+your stuff. For instance, this link query should give you a list of
+people employed at Trifork. Lucky them :-)
+
+ curl http://localhost:8091/riak/company/Trifork/_,_,employs
+
+Using a link_index hook
+=======================
+
+You can also install an index hook as a bucket property, which designates
+a function that can be used to decide which index records to create. This way
+you can keep the index creation on the server side; and also more easily
+generate some more indexes.
+
+You install the index hook the same way you install a pre-commit hook; and the
+hook can be written in either Erlang or JavaScript, just like precommits.
+
+Assume you have this installed inside `priv/` in your riak setup
+
+ // Return list of [Bucket,Key] that will link to me
+ function employmentIndexing(metaData, contents) {
+ personData = JSON.parse(contents);
+ if(personData.employer) {
+ return [ ['company', personData.employer] ];
+ } else {
+ reuturn [];
+ }
+ }
+
+To install it as an indexer, you need to get install it as a bucket
+property in the person bucket. You can have more indexes, so it's a
+list of functions. Link-Index hooks can also be erlang functions.
+
+ prompt$ cat > bucket_props.json
+ {
+ "link_index" : [{"name": "employmentIndexing", "tag": "employs"}],
+ }
+ ^D
+ prompt$ curl -X PUT --data @bucket_props.json \
+ -H 'Content-Type: application/json' \
+ http://127.0.0.1:8091/riak/person
+
+Now, we can add objects to the person bucket *without* having to put
+the `idx@employs` link on the object. The index hook will do it for
+you. Happy you!
+
+
+Maintenance
+===========
+
+The indexer will also handle delete/update of your records as
+appropriate, and should work fine with `allow_mult` buckets too.
+
+
+Status
+======
+
+The code is at present completely untested, and does not work yet on
+0.14 (release) branch, only on the `master`, because it needs to
+communicate stuff from the precommit hook to the postcommit hook, and
+right now it just passes that in the pdict, but as I said, that only
+wortks on the current develepoment master.
+
+
7 ebin/riak_link_index.app
@@ -0,0 +1,7 @@
+{application,riak_link_index,
+ [{description,"Simple link-based indexing for riak"},
+ {vsn,"0.1"},
+ {modules,[riak_link_index]},
+ {registered,[]},
+ {applications,[kernel,stdlib]},
+ {env,[]}]}.
BIN  rebar
Binary file not shown
5 rebar.config
@@ -0,0 +1,5 @@
+{erl_opts, [debug_info]}.
+{deps, [{edown, ".*", {git, "git://github.com/esl/edown.git", "HEAD"}}]}.
+{edoc_opts, [{doclet, edown_doclet},
+ {src_path, ["src/"]},
+ {subpackages, true}]}.
9 src/riak_link_index.app.src
@@ -0,0 +1,9 @@
+{application, riak_link_index,
+ [{description, "Simple link-based indexing for riak"},
+ {vsn, "0.1"},
+ {modules, []},
+ {registered, []},
+ {applications, [kernel, stdlib]},
+ {env, []}
+ ]}.
+
346 src/riak_link_index.erl
@@ -0,0 +1,346 @@
+%% -------------------------------------------------------------------
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_link_index).
+-author("Kresten Krab Thorup <krab@trifork.com>").
+
+-export([precommit/1,postcommit/1]).
+
+-define(MD_LINKS,<<"Links">>).
+-define(MD_DELETED,<<"X-Riak-Deleted">>).
+-define(IDX_PREFIX,"idx@").
+-define(JSPOOL_HOOK, riak_kv_js_hook).
+
+precommit(Object) ->
+
+ {ok, StorageMod} = riak:local_client(),
+ Bucket = riak_object:bucket(Object),
+ Key = riak_object:key(Object),
+
+ %% Indexing works in two phases: precommit will use a hook to add links as
+ %%
+ %% </riak/IBucket/IKey>; riaktag="idx@Tag"
+ %%
+ %% to the object being stored. Then postcommit creates empty-contents
+ %% objects named IBucket/IKey, with links to this object thus:
+ %%
+ %% <riak/Bucket/Key>; riaktag="Tag"
+ %%
+
+ case riak_object:is_updated(Object) of
+ true ->
+ OldLinksToMe = get_index_links(riak_object:get_metadatas(Object)),
+ [{MD,_Value}] = index_contents(StorageMod,
+ Bucket,
+ [{ riak_object:get_update_metadata(Object),
+ riak_object:get_update_value(Object) }]),
+ IndexedObject = riak_object:update_metadata(Object, MD);
+
+ false ->
+ case StorageMod:get(Bucket, Key) of
+ {ok, OldRO} ->
+ OldLinksToMe = get_index_links(riak_object:get_metadatas(OldRO));
+ _ ->
+ OldLinksToMe = []
+ end,
+ MDVs = index_contents(StorageMod,
+ Bucket,
+ riak_object:get_contents(Object)),
+ IndexedObject = riak_object:set_contents(Object, MDVs)
+ end,
+
+ %% compute links to add/remove in postcommit
+ NewLinksToMe = get_index_links(IndexedObject),
+ LinksToRemove = ordsets:subtract(OldLinksToMe, NewLinksToMe),
+ LinksToAdd = ordsets:subtract(NewLinksToMe, OldLinksToMe),
+
+ %% this only works in recent riak_kv master branch
+ put(?MODULE, {LinksToAdd, LinksToRemove}),
+
+ IndexedObject.
+
+postcommit(Object) ->
+
+ case erlang:erase(?MODULE) of
+ {[], []} ->
+ ok;
+ {LinksToAdd, LinksToRemove} ->
+ {ok, StorageMod} = riak:local_client(),
+ Bucket = riak_object:bucket(Object),
+ Key = riak_object:key(Object),
+ add_links(StorageMod, LinksToAdd, Bucket, Key),
+ remove_links(StorageMod, LinksToRemove, Bucket, Key),
+ ok;
+ _ ->
+ error_logger:error_msg("error in pre/postcommit interaction", []),
+ ok
+ end.
+
+add_links(StorageMod, Links, Bucket, Key) ->
+ lists:foreach(fun({{IndexB,IndexK}, <<?IDX_PREFIX,Tag/binary>>}) ->
+ add_link(StorageMod, IndexB, IndexK, {{Bucket,Key},Tag})
+ end,
+ Links).
+
+add_link(StorageMod, Bucket, Key, Link) ->
+ update(
+ fun(Object) ->
+ Links = get_all_links(Object),
+ IO1 = riak_object:update_value(Object, <<>>),
+ riak_object:update_metadata
+ (IO1, dict:store(?MD_LINKS,
+ [Link] ++ Links,
+ riak_object:get_update_metadata(IO1)))
+ end,
+ StorageMod, Bucket, Key).
+
+remove_links(StorageMod, Links, Bucket, Key) ->
+ lists:foreach(fun({{IndexB,IndexK}, <<?IDX_PREFIX,Tag/binary>>}) ->
+ remove_link(StorageMod, IndexB, IndexK, {{Bucket,Key},Tag})
+ end,
+ Links).
+
+remove_link(StorageMod, Bucket, Key, Link) ->
+ update(
+ fun(Object) ->
+ Links = get_all_links(Object),
+ UpdLinks = ordsets:del_element(Link,Links),
+ IO1 = riak_object:update_value(Object, <<>>),
+ riak_object:update_metadata
+ (IO1, dict:store(?MD_LINKS,
+ UpdLinks,
+ riak_object:get_update_metadata(IO1)))
+ end,
+ StorageMod, Bucket, Key).
+
+update(Fun,StorageMod,Bucket,Key) ->
+ Updated =
+ case StorageMod:get(Bucket,Key) of
+ {ok, Object} ->
+ Fun(Object);
+ _ ->
+ Fun(riak_object:new(Bucket, Key, <<>>))
+ end,
+
+ StorageMod:put(Updated, 0).
+
+
+get_index_links(MDList) ->
+ ordsets:filter(fun({_, <<?IDX_PREFIX,_/binary>>}) ->
+ true;
+ (_) ->
+ false
+ end,
+ get_all_links(MDList)).
+
+get_all_links(Object) when element(1,Object) =:= r_object ->
+ get_all_links
+ (case riak_object:is_updated(Object) of
+ true ->
+ [riak_object:get_update_metadata(Object)]
+ ++ riak_object:get_metadatas(Object);
+ false ->
+ riak_object:get_metadatas(Object)
+ end);
+
+get_all_links(MetaDatas) when is_list(MetaDatas) ->
+ Links = lists:fold(fun(MetaData, Acc) ->
+ case dict:find(?MD_LINKS, MetaData) of
+ error ->
+ Acc;
+ {ok, LinksList} ->
+ LinksList ++ Acc
+ end
+ end,
+ [],
+ MetaDatas),
+
+ ordsets:from_list(Links).
+
+index_contents(StorageMod, Bucket, Contents) ->
+
+ %% grab indexes from bucket properties
+ {ok, IndexHooks} = get_index_hooks(StorageMod, Bucket),
+
+ lists:map
+ (fun({MD,Value}) ->
+ case dict:find(?MD_DELETED, MD) of
+ {ok, "true"} ->
+ {remove_idx_links(MD),Value};
+ _ ->
+ NewMD = compute_indexed_md(MD, Value, IndexHooks),
+ {NewMD, Value}
+ end
+ end,
+ Contents).
+
+remove_idx_links(MD) ->
+ %% remove any "idx#..." links
+ case dict:find(?MD_LINKS, MD) of
+ error ->
+ MD;
+ {ok, Links} ->
+ dict:store
+ (?MD_LINKS,
+ lists:filter(fun({_,<<?IDX_PREFIX,_/binary>>}) ->
+ false;
+ (_) ->
+ true
+ end,
+ Links),
+ MD)
+ end.
+
+
+compute_indexed_md(MD, Value, IndexHooks) ->
+ lists:foldl
+ (fun({struct, PropList}=IndexHook, MDAcc) ->
+ {<<"tag">>, Tag} = proplists:lookup(PropList, <<"tag">>),
+ Links = case dict:find(?MD_LINKS, MDAcc) of
+ error -> [];
+ {ok, MDLinks} -> MDLinks
+ end,
+ IdxTag = <<?IDX_PREFIX,Tag/binary>>,
+ KeepLinks =
+ lists:filter(fun({{_,_}, TagValue}) when TagValue =:= IdxTag ->
+ false;
+ (_) ->
+ true
+ end,
+ Links),
+ NewLinksSansTag =
+ try apply_index_hook(IndexHook, MD, Value) of
+ {erlang, _, {ok, IL}} when is_list(IL) ->
+ IL;
+ {js, _, {ok, IL}} when is_list(IL) ->
+ IL;
+ _Val ->
+ error_logger:error_msg
+ ("indexing function returned ~p", [_Val]),
+ []
+ catch
+ _:_ ->
+ error_logger:error_msg
+ ("exception invoking indexing function", []),
+ []
+ end,
+
+ ResultLinks =
+ lists:map(fun({Bucket,Key}) when is_binary(Bucket), is_binary(Key) ->
+ {{Bucket, Key}, Tag};
+ ([Bucket, Key]) when is_binary(Bucket), is_binary(Key) ->
+ {{Bucket, Key}, Tag}
+ end,
+ NewLinksSansTag)
+ ++
+ KeepLinks,
+
+ dict:store(?MD_LINKS, ResultLinks, MDAcc)
+ end,
+ MD,
+ IndexHooks).
+
+
+%%%%%% code from riak_kv_put_fsm %%%%%%
+
+
+get_index_hooks(_StorageMod, Bucket) ->
+
+ {ok,Ring} = riak_core_ring_manager:get_my_ring(),
+ BucketProps = riak_core_bucket:get_bucket(Bucket, Ring),
+
+ IndexHooks = proplists:get_value(link_index, BucketProps, []),
+ case IndexHooks of
+ <<"none">> ->
+ [];
+ {struct, Hook} ->
+ [{struct, Hook}];
+ IndexHooks when is_list(IndexHooks) ->
+ IndexHooks
+ end.
+
+
+-spec apply_index_hook(_,MD::dict(),Value::term()) ->
+ [{{binary(),binary()},binary()}].
+
+apply_index_hook({struct, Hook}, MD, Value) ->
+ Mod = proplists:get_value(<<"mod">>, Hook),
+ Fun = proplists:get_value(<<"fun">>, Hook),
+ JSName = proplists:get_value(<<"name">>, Hook),
+ invoke_hook(Mod, Fun, JSName, MD, Value);
+apply_index_hook(HookDef, _, _) ->
+ {error, {invalid_hook_def, HookDef}}.
+
+invoke_hook(Mod0, Fun0, undefined, MD, Value) when Mod0 /= undefined, Fun0 /= undefined ->
+ Mod = binary_to_atom(Mod0, utf8),
+ Fun = binary_to_atom(Fun0, utf8),
+ try
+ {erlang, {Mod, Fun}, Mod:Fun(MD, Value)}
+ catch
+ Class:Exception ->
+ {erlang, {Mod, Fun}, {'EXIT', Mod, Fun, Class, Exception}}
+ end;
+invoke_hook(undefined, undefined, JSName, MD, Value) when JSName /= undefined ->
+ {js, JSName, riak_kv_js_manager:blocking_dispatch
+ (?JSPOOL_HOOK, {{jsfun, JSName}, [jsonify_metadata(MD), Value]}, 5)};
+invoke_hook(_, _, _, _, _) ->
+ {error, {invalid_hook_def, no_hook}}.
+
+
+
+
+%%%%% code from riak_object %%%%%%
+
+jsonify_metadata(MD) ->
+ MDJS = fun({LastMod, Now={_,_,_}}) ->
+ % convert Now to JS-readable time string
+ {LastMod, list_to_binary(
+ httpd_util:rfc1123_date(
+ calendar:now_to_local_time(Now)))};
+ ({<<"Links">>, Links}) ->
+ {<<"Links">>, [ [B, K, T] || {{B, K}, T} <- Links ]};
+ ({Name, List=[_|_]}) ->
+ {Name, jsonify_metadata_list(List)};
+ ({Name, Value}) ->
+ {Name, Value}
+ end,
+ {struct, lists:map(MDJS, dict:to_list(MD))}.
+
+%% @doc convert strings to binaries, and proplists to JSON objects
+jsonify_metadata_list([]) -> [];
+jsonify_metadata_list(List) ->
+ Classifier = fun({Key,_}, Type) when (is_binary(Key) orelse is_list(Key)),
+ Type /= array, Type /= string ->
+ struct;
+ (C, Type) when is_integer(C), C >= 0, C =< 256,
+ Type /= array, Type /= struct ->
+ string;
+ (_, _) ->
+ array
+ end,
+ case lists:foldl(Classifier, undefined, List) of
+ struct -> {struct, [ {if is_list(Key) -> list_to_binary(Key);
+ true -> Key
+ end,
+ if is_list(Value) -> jsonify_metadata_list(Value);
+ true -> Value
+ end}
+ || {Key, Value} <- List]};
+ string -> list_to_binary(List);
+ array -> List
+ end.
Please sign in to comment.
Something went wrong with that request. Please try again.