/
rclref_object.erl
100 lines (81 loc) · 2.96 KB
/
rclref_object.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
-module(rclref_object).
-export_type([key/0, value/0, riak_object/0, vnode_error/0]).
-type key() :: term().
-type value() :: term().
-type vclock() :: vectorclock:vectorclock().
-record(r_content, {value :: value(), vclock = vectorclock:new() :: vclock()}).
-record(r_object,
{key :: key(),
r_content :: #r_content{},
partition :: non_neg_integer() | undefined,
node :: node() | undefined}).
-record(vnode_error,
{reason :: term(),
partition :: non_neg_integer() | undefined,
node :: node() | undefined}).
-opaque riak_object() :: #r_object{}.
-opaque vnode_error() :: #vnode_error{}.
-export([key/1, content/1, partition/1, node/1, value/1, vclock/1, error_reason/1,
increment_vclock/2, new_vclock/0, merge/1, new/2, new/4, new_content/2, new_error/3]).
-spec key(riak_object()) -> key().
key(#r_object{key = Key}) ->
Key.
-spec content(riak_object()) -> #r_content{}.
content(#r_object{r_content = Content}) ->
Content.
-spec partition(riak_object()) -> non_neg_integer().
partition(#r_object{partition = Partition}) ->
Partition.
-spec node(riak_object()) -> node().
node(#r_object{node = Node}) ->
Node.
-spec value(riak_object() | #r_content{}) -> value().
value(#r_object{r_content = #r_content{value = Value}}) ->
Value;
value(#r_content{value = Value}) ->
Value.
-spec vclock(riak_object() | #r_content{}) -> vclock().
vclock(#r_object{r_content = #r_content{vclock = VClock}}) ->
VClock;
vclock(#r_content{vclock = VClock}) ->
VClock.
-spec error_reason(vnode_error()) -> term().
error_reason(#vnode_error{reason = Reason}) ->
Reason.
-spec new_vclock() -> vclock().
new_vclock() ->
vectorclock:new().
-spec increment_vclock(node(), vclock()) -> vclock().
increment_vclock(Node, VClock) ->
vectorclock:update_with(Node,
fun (X) ->
X + 1
end,
1,
VClock).
-spec new_content(value(), vclock()) -> #r_content{}.
new_content(Value, VClock) ->
#r_content{value = Value, vclock = VClock}.
-spec new_error(term(), non_neg_integer(), node()) -> vnode_error().
new_error(Reason, Partition, Node) ->
#vnode_error{reason = Reason, partition = Partition, node = Node}.
-spec new(key(), value()) -> riak_object().
new(Key, Value) ->
#r_object{key = Key, r_content = #r_content{value = Value}}.
-spec new(key(), #r_content{}, non_neg_integer(), node()) -> riak_object().
new(Key, Content, Partition, Node) ->
#r_object{key = Key, r_content = Content, partition = Partition, node = Node}.
-spec merge([riak_object()]) -> riak_object().
merge([RObj]) ->
RObj;
merge([RObj0, RObj1 | RObjs]) ->
VClock0 = vclock(RObj0),
VClock1 = vclock(RObj1),
NewRObj =
case vectorclock:le(VClock0, VClock1) of
true ->
RObj1;
false ->
RObj0
end,
merge([NewRObj] ++ RObjs).