From c4b56084b8217ace37612b2d6c307401c4456967 Mon Sep 17 00:00:00 2001 From: Marsell Kukuljevic Date: Mon, 5 Dec 2011 16:33:39 +1100 Subject: [PATCH] Update riak_kv_test_backend and generated app.config so they work with Riak 1.0+. The riak_kv_test_backend is a modified riak_kv_memory_backend with reset/0, provided by the Ripple project. --- erl_src/riak_kv_test_backend.beam | Bin 3844 -> 5936 bytes erl_src/riak_kv_test_backend.erl | 573 ++++++++++++++++++++++++------ src/test_server.coffee | 10 +- 3 files changed, 468 insertions(+), 115 deletions(-) diff --git a/erl_src/riak_kv_test_backend.beam b/erl_src/riak_kv_test_backend.beam index 1d13e677f8f4dd44819ea7381a7fa725f57dcb80..7508eb754b3bec849225701e0a1082a60ce616d0 100644 GIT binary patch literal 5936 zcma)AO>7&-72ee>#TBKAS6oVRY$slO8Ot#(YZX$qDMj{(D~oab6Iyf9f*ji38VF5+7Hv_W>7i&3L5l)C6h+WBKvDG2 z?=6=WogW}8JnhVzH*dc8GjE1F`^?;!Bn|D)oR~f{?KPGqiTy~DqzkcTwX{@NS}S;t z>lNlp%3W!m_j6XkP1Ctyi3dYTc=HJIz|Dz9=8qR;_yu z7Abg*Lapk0ee{nyC17Tb=d!wUg_L=Y;(Cph?ut`$Jcj{OcdgM_T3zY#+|WX)<~oYk zTy;WDv)O3&)Ei!5p@HSt%BsiejVp3O9*Mw4%wBftUf)8aRw>M{!g|k*l-z6eGURKO z5H0U^1Ma-j4EcNov#oldXuv+K$QBtop94t$W2q7gAr`cU;6yTrkYTb33USGLUU#Kp&tazvuv37z) zN-Haci%xCDX@+UOQ(xvPOnlvnbN!vinm$pMQUqAfNmxV*!8&MY@%Ay`@T}-5zYM z5X=}&1=9M6zY|BTky<(avNbve6C}xQR2)g-uVepeAbQq8UjxNutt+}$l8rt|3QJOY zoK1`~Bf}&;!!$E7uFnaRg*8pl_zf|~qLMCKPcd0^MPHTVoXAQ{my2D|Hw<~D&ocTI z!Q$dACiIwM@Uw!MBBqP?m@xf*;_GZvZ0cguocfl0>t;wZ*lQ9C-inB0AuFs2k=69D zk)zp)F5d-Tn2ONj7}bP!qGyfiCwi82NJG>hQEfw;E$Z2bo@H4j8_&ww*sQ^_b6GuV z>GDQbG!oO~chm2%iFa7BZCr*K$qcjX#5;OK@3V5p;vc|PVJjjT3u{IB{p0b2e^oY* z#eXkitY7K(nZG?Amp>v$K<~%UJ7~$Omj1+?_{1z`OevW$H9KSG@`+E95uFRGD9jX~ z3lVIn8x@0%T-t z$<5f>Z7^IgTqEO3Z)RNW?F84~jBHQw0TZo!4FAe;_)fgd#BBwU$Xk4nO)@?>Bq9&< z{>Tm!7I#Pbns~=t3z<*BRN^!p5{Zab;vHl);w%2 zXKVvDTemZ|cr#lF0 z44mRSWc6XKD9svXHnKg7K(MTtjR_>Lx??l&PCY-(jGj@|34Y+YY$m*UnH;_X{vWOhAkweHMqnt?!H^B`X?knD+g!U6NCGs;O zc|O7SmN$i~3ztptxUBBgtXsm3iL#%yDP(QFm9>xgS)1aIA#+V+u1T4zF?8TD)AtyS znCO^BdrKs^nT%{FyKG%3w4cMq`)KqlC@cl|U?b)m?=S5ajTp>{YnFjhLdFw-875*g z>%yR^C@YG&ZhWDK9z&5*_rV=|;f~=n-v@W>+j2*O?;|(R_C#gD-i!msbgSZsh}i=8 zTZ<=X*CeoOcJTe!IgiWgep)(?%ET6>ATW`At;nDxsE>!Gj{ zI=~DJQweGpC{$GS0HIA#1P=o8jXXaWkJt=7W6oQ9``{f0XikC$xcY03p&*)@dcpA3G`urKBUR&ku*Py zz#M+?U`X)8bSfBvDmU^Wsf0DP{~$lAZ2DfNo~$0#EYuV#p|Pj|Z=|8qg3gIFPeW&V zOJ{r<6@C z=U0Zvc$S^eyO>{{r&@_1bV3}XlM&k|a@dzZ(xCv3XO$BsDhbu{V_5fyC9B6Z%W7F- zf{WulaujH$7fB|AUfPgVJHco4%|$YFhPE;i&SZ;}^(P=P0g2fJ&q6|gmEDraq5Bjh zCb7;*VJ0ciNw!!7`)RPBMCP30T2agzHVy`0)U$ThgaMF(p9T{AG>hzj+t)RT%Ef0h z^)&6DGthMmAkN|x>$Z4KR-Yjb7R~h;7HxEfy`X9deh$|VzkrCyai&`8AZDHO12~5n z8T9gfhT-qC7@Z`FzOHGQI;0&jaD~#e5*wCE)xQmEFFT=OX;>SU+V4WBI4m(lb~{sA z9zGA1voP`52Q4(SHq1K<^JMioh-88H`J4PgZbP^8`}qZy$29>Sy3m)yO$h!@i6r@( z?kCvuERJLS1v)$wW>cAoC2?e+M22vtK-E{E@+lbd!u?yBj>83481>-Jm;wdlJiN&Go1SGo}r>Q0^cFB8YBxXXfGvfYFA^8~F7XlEo@ zt%xW)tEszW)gcbRE|7D;G*q1Mi(ozjb1vRb;j`AqvRVZ@hxlGf^C}Fh`i9xKJ?Fu& z!LV_+F5)^<#&rhhm&!nH3mu2CAJ4pbO3<}hU2321HE2ACg_hI228}geBd)4+HLRh& z){@Ye!^zI*c1W7a(BzWot{o_=L&qFYHPXBe9rYF}bbzQ%h;r=TUgKmj?08Y1UT~K6{yI!Rk)aW9y7f(cQMm#&vY@^PIUorbr z>zai`JcX?r_cu86lT+OtR1BqcPHS-X6$<=SKztq$uk~3Ll^ZJc@vQ@##A#>YD%?82 zufvY(z8$TieVxkAD^PMllhqqX`AycRY153M@_y!*3LO}&O>zUmH&gr-j=C=xFajo3OO?kW3u(7>L@#AXQVj=wu zL_R$OBHar?8!xE>Is&3~j)PJll6ewj1?>sE4+U+K9RVpIvh(qvO}xE9n|w*JBAGp) zM?fLacF-V*)*?JH5Ute<>H!gU$`2Jp56vMSJzcY_b+YA$Ad-0=M0&1)=wYF6ULaqS z4Dme>>6Yc|e;}DWi16GO^r2d8X9Efl>Ks4T^48QOvyw3&wWxQ!z?z*UN;++fn zm+jz+dJ*;w6W8! z)6hfu3!tY!^AzbF?oAd;aqkAVmS>83XEDUT`7NS<ERgt!}KLO9TzW!dKpxP(vv_C%f^(r(i+*tAOCD+|rsC=ceB$=bn z;%lKh>MgAQ$=EFE_!*84kW3FG-=)p(BUJ zNsB@s5-)boBm4hY>MMotH&7b!^}CJLX4zSip8OJke1M<+>U{`kFHO(89_*I>4^o0k AI{*Lx literal 3844 zcmbVPPfQ!x8K3bRRHkDRdRZ;(_mmYFxkGsmHm!5KHRrS!H;G!c zBmMlo_r34`_vSs%;*FJ2K{)a3a>jGqJ;zl7My6COZ--sSbNm*^E#=CaitB6^J>PMiLbwFNYuPB4Fb!3< zrDD10w-(F3v+1}QzXD~?55?nA8fl(G6q4tZHspp8=J*~fJ0FsJ-#NUMD;G*mCZF^C zNV(!V)zaMqUa8bla7vEvuuaE59N1jnMesi2gP^ogDHT|C*AEj>l8au(*{S+>rGi_j zN+d3MJ}9sFJqM9)RM^baOsr-xlPi@nmG#?B-uJ?e>sDMT50>n?dznDVcXyo#&EUGF zRH`Kr(nX!?e1U~DD`Z$P zwqy#T8H%k4qIHA|n}Wrph`=qTSrYq*3lUwHxO&t2l*I&1Tn@#4$;IsC8k=L9F}`L* zxo&BUbht@6w!k8~72(T=vRXv1)@^VT94iFQJ)Y%@x@Iey%@%cLe`;%Fw;GBmZMt}m zn>uloW45M=55R0U#YZVM#xAgj9hUwZ^B>}qG!v3(#^=q4$?4?7j;zx02c`3mO4nb+ zr%`yB*DQkql62MN7GE^s?6}Qr330}_CO&&b$as^`^H+o}zDejsBa$@JSL@MOO1H|D z6Y5b>=21Os%`?l8tkt$no>DudDL_w4f|i!J5m!4kMYGzpqiqrM!D|k5O_YyDIZOlz zts0b*sh)jJ5IM;)^C6o?i5@9EsZVRy4WoKbwF$f66Uk(M0o(qpK2?T0ZvQwVu7M)gHxO zs`Zeo=iiubN3+PuH;QyxHfJ(_u8VS4o#di?9DAw{W4ERBX-oPGQ9c3AX>d-a)Lu3P z=xD2k4Tc8wqCruCf{E~FakYmjXX-D4+;f?UaxWQP;?G4{hm}sa>I-(TVJn?Bidyq< z3TtPvW~9_Xco+;k81RsctAhqSaLYe-Y!ggA$SID=O$t_~#?O%=#uO36<%xZplVlU2r6$!WG{Zrpy{YS$Yz#KW+uf|OM|3sX*24aC|f*> z1J*zb$qcixQIj~fMR^XMaXh7iw7+yD(X`-MtVq3~v*#`DOk2jeA6>8VwJ z0i0fN7A>W>#<9WaHbwbzW7Bm9J4hK1Hl8WBiSiYQoP)?!yj=+98m8w_tLy)E9msEk zJpuMoP3i`wlTcc&OW`Vk?zh0DZm!VP#^qz8yhKZD( zcCgpMNP>~nX}hPB!fHCn^Hi%1z<&x z>SGJW2tbbv90EcB@uPrPr&Wv%{*fkrD`NLp=)@jnG|EmrA@g{n>i6-5K4CPC7LzhOfIR!NGBAR4~f3eP|I?Mv0g*#L? z;Txb0NCBirJfi7Y0VV;my$nnMWN!&LU#G8QJO_}>RiL|0U&h!Akj+J)r%qqONPQwZ z7XTd~nQ@?z-w5av0NFeZbOI#bR|mrhX;>lBd2=eMyG)&=5d5o~?uU20(Qu`IB?L?_%8fK?%QXD^w8Jo>!)cSmwfS zZ+%_)=LenN{P648{d2!kyKj?#yzXq}_KFoZkSYkRjn6Rp^N%mEHwra+@I2 z8}M;5@GFtoNTuo*D`k(YhJrxy9~QQSvBVm_O1y;U3^$SJ!|I!Z19`NbV)t7RSM&YyCJ oa#~K!b>>b7?!3ybn|HQ_YyW$=K?<*a#*QEts?*%M=VK851(->uF8}}l diff --git a/erl_src/riak_kv_test_backend.erl b/erl_src/riak_kv_test_backend.erl index 1affe9a..4ddf745 100644 --- a/erl_src/riak_kv_test_backend.erl +++ b/erl_src/riak_kv_test_backend.erl @@ -1,8 +1,8 @@ %% ------------------------------------------------------------------- %% -%% riak_kv_test_backend: storage engine based on ETS tables +%% riak_memory_backend: storage engine using ETS tables %% -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -20,154 +20,503 @@ %% %% ------------------------------------------------------------------- -% @doc riak_kv_test_backend is a Riak storage backend using ets that exposes a reset function for efficiently clearing stored data. +%% @doc riak_kv_memory_backend is a Riak storage backend that uses ets +%% tables to store all data in memory. +%% +%% === Configuration Options === +%% +%% The following configuration options are available for the memory backend. +%% The options should be specified in the `memory_backend' section of your +%% app.config file. +%% +%%
    +%%
  • `ttl' - The time in seconds that an object should live before being expired.
  • +%%
  • `max_memory' - The amount of memory in megabytes to limit the backend to.
  • +%%
+%% -module(riak_kv_test_backend). -behavior(riak_kv_backend). --behavior(gen_server). + +%% KV Backend API +-export([api_version/0, + start/2, + stop/1, + get/3, + put/5, + delete/4, + drop/1, + fold_buckets/4, + fold_keys/4, + fold_objects/4, + is_empty/1, + status/1, + callback/3, + reset/0]). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. --export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2, - is_empty/1, drop/1, fold/3, callback/3, reset/0]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-define(API_VERSION, 1). +-define(CAPABILITIES, [async_fold]). + +-record(state, {data_ref :: integer() | atom(), + time_ref :: integer() | atom(), + max_memory :: undefined | integer(), + used_memory=0 :: integer(), + ttl :: integer()}). -% @type state() = term(). --record(state, {t}). +-type state() :: #state{}. +-type config() :: []. -% @spec start(Partition :: integer(), Config :: proplist()) -> -% {ok, state()} | {{error, Reason :: term()}, state()} -start(Partition, _Config) -> - gen_server:start_link(?MODULE, [Partition], []). +%% =================================================================== +%% Public API +%% =================================================================== -% @spec reset() -> ok +%% TestServer reset + +-spec reset() -> ok | {error, timeout}. reset() -> - Pids = lists:filter(fun(Item) -> - lists:prefix("test_backend", atom_to_list(Item)) - end, registered()), - [gen_server:call(Pid, reset) || Pid <- Pids], + {ok, Ring} = riak_core_ring_manager:get_my_ring(), + [ ets:delete_all_objects(list_to_atom("kv" ++ integer_to_list(P))) || + P <- riak_core_ring:my_indices(Ring) ], ok. -%% @private -init([Partition]) -> - PName = list_to_atom("test_backend" ++ integer_to_list(Partition)), - register(PName, self()), - {ok, #state{t=ets:new(list_to_atom(integer_to_list(Partition)),[])}}. +%% KV Backend API -%% @private -handle_cast(_, State) -> {noreply, State}. +%% @doc Return the major version of the +%% current API and a capabilities list. +-spec api_version() -> {integer(), [atom()]}. +api_version() -> + {?API_VERSION, ?CAPABILITIES}. -%% @private -handle_call(stop,_From,State) -> {reply, srv_stop(State), State}; -handle_call({get,BKey},_From,State) -> {reply, srv_get(State,BKey), State}; -handle_call({put,BKey,Val},_From,State) -> - {reply, srv_put(State,BKey,Val),State}; -handle_call({delete,BKey},_From,State) -> {reply, srv_delete(State,BKey),State}; -handle_call(list,_From,State) -> {reply, srv_list(State), State}; -handle_call({list_bucket,Bucket},_From,State) -> - {reply, srv_list_bucket(State, Bucket), State}; -handle_call(is_empty, _From, State) -> - {reply, ets:info(State#state.t, size) =:= 0, State}; -handle_call(drop, _From, State) -> - ets:delete(State#state.t), - {reply, ok, State}; -handle_call({fold, Fun0, Acc}, _From, State) -> - Fun = fun({{B,K}, V}, AccIn) -> Fun0({B,K}, V, AccIn) end, - Reply = ets:foldl(Fun, Acc, State#state.t), - {reply, Reply, State}; -handle_call(reset, _From, State) -> - ets:delete_all_objects(State#state.t), - {reply, ok, State}. - -% @spec stop(state()) -> ok | {error, Reason :: term()} -stop(SrvRef) -> gen_server:call(SrvRef,stop). -srv_stop(State) -> - true = ets:delete(State#state.t), +%% @doc Start the memory backend +-spec start(integer(), config()) -> {ok, state()}. +start(Partition, Config) -> + TTL = config_value(ttl, Config), + MemoryMB = config_value(max_memory, Config), + case MemoryMB of + undefined -> + MaxMemory = undefined, + TimeRef = undefined; + _ -> + MaxMemory = MemoryMB * 1024 * 1024, + TimeRef = ets:new(list_to_atom(integer_to_list(Partition)), [ordered_set]) + end, + DataRef = ets:new(list_to_atom("kv" ++ integer_to_list(Partition)), [named_table, public]), + {ok, #state{data_ref=DataRef, + max_memory=MaxMemory, + time_ref=TimeRef, + ttl=TTL}}. + +%% @doc Stop the memory backend +-spec stop(state()) -> ok. +stop(#state{data_ref=DataRef, + max_memory=MaxMemory, + time_ref=TimeRef}) -> + catch ets:delete(DataRef), + case MaxMemory of + undefined -> + ok; + _ -> + catch ets:delete(TimeRef) + end, ok. -% get(state(), riak_object:bkey()) -> -% {ok, Val :: binary()} | {error, Reason :: term()} -% key must be 160b -get(SrvRef, BKey) -> gen_server:call(SrvRef,{get,BKey}). -srv_get(State, BKey) -> - case ets:lookup(State#state.t,BKey) of - [] -> {error, notfound}; - [{BKey,Val}] -> {ok, Val}; - Err -> {error, Err} +%% @doc Retrieve an object from the memory backend +-spec get(riak_object:bucket(), riak_object:key(), state()) -> + {ok, any(), state()} | + {ok, not_found, state()} | + {error, term(), state()}. +get(Bucket, Key, State=#state{data_ref=DataRef, + ttl=TTL}) -> + case ets:lookup(DataRef, {Bucket, Key}) of + [] -> {error, not_found, State}; + [{{Bucket, Key}, {{ts, Timestamp}, Val}}] -> + case exceeds_ttl(Timestamp, TTL) of + true -> + delete(Bucket, Key, undefined, State), + {error, not_found, State}; + false -> + {ok, Val, State} + end; + [{{Bucket, Key}, Val}] -> + {ok, Val, State}; + Error -> + {error, Error, State} end. -% put(state(), riak_object:bkey(), Val :: binary()) -> -% ok | {error, Reason :: term()} -% key must be 160b -put(SrvRef, BKey, Val) -> gen_server:call(SrvRef,{put,BKey,Val}). -srv_put(State,BKey,Val) -> - true = ets:insert(State#state.t, {BKey,Val}), - ok. +%% @doc Insert an object into the memory backend. +%% NOTE: The memory backend does not currently +%% support secondary indexing and the _IndexSpecs +%% parameter is ignored. +-type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}. +-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> + {ok, state()} | + {error, term(), state()}. +put(Bucket, PrimaryKey, _IndexSpecs, Val, State=#state{data_ref=DataRef, + max_memory=MaxMemory, + time_ref=TimeRef, + ttl=TTL, + used_memory=UsedMemory}) -> + Now = now(), + case TTL of + undefined -> + Val1 = Val; + _ -> + Val1 = {{ts, Now}, Val} + end, + case do_put(Bucket, PrimaryKey, Val1, DataRef) of + {ok, Size} -> + %% If the memory is capped update timestamp table + %% and check if the memory usage is over the cap. + case MaxMemory of + undefined -> + UsedMemory1 = UsedMemory; + _ -> + time_entry(Bucket, PrimaryKey, Now, TimeRef), + Freed = trim_data_table(MaxMemory, + UsedMemory + Size, + DataRef, + TimeRef, + 0), + UsedMemory1 = UsedMemory + Size - Freed + end, + {ok, State#state{used_memory=UsedMemory1}}; + {error, Reason} -> + {error, Reason, State} + end. -% delete(state(), riak_object:bkey()) -> -% ok | {error, Reason :: term()} -% key must be 160b -delete(SrvRef, BKey) -> gen_server:call(SrvRef,{delete,BKey}). -srv_delete(State, BKey) -> - true = ets:delete(State#state.t, BKey), - ok. +%% @doc Delete an object from the memory backend +%% NOTE: The memory backend does not currently +%% support secondary indexing and the _IndexSpecs +%% parameter is ignored. +-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> + {ok, state()}. +delete(Bucket, Key, _IndexSpecs, State=#state{data_ref=DataRef, + time_ref=TimeRef, + used_memory=UsedMemory}) -> + case TimeRef of + undefined -> + UsedMemory1 = UsedMemory; + _ -> + %% Lookup the object so we can delete its + %% entry from the time table and account + %% for the memory used. + [Object] = ets:lookup(DataRef, {Bucket, Key}), + case Object of + {_, {{ts, Timestamp}, _}} -> + ets:delete(TimeRef, Timestamp), + UsedMemory1 = UsedMemory - object_size(Object); + _ -> + UsedMemory1 = UsedMemory + end + end, + ets:delete(DataRef, {Bucket, Key}), + {ok, State#state{used_memory=UsedMemory1}}. + +%% @doc Fold over all the buckets. +-spec fold_buckets(riak_kv_backend:fold_buckets_fun(), + any(), + [], + state()) -> {ok, any()}. +fold_buckets(FoldBucketsFun, Acc, Opts, #state{data_ref=DataRef}) -> + FoldFun = fold_buckets_fun(FoldBucketsFun), + case lists:member(async_fold, Opts) of + true -> + BucketFolder = + fun() -> + {Acc0, _} = ets:foldl(FoldFun, {Acc, sets:new()}, DataRef), + Acc0 + end, + {async, BucketFolder}; + false -> + {Acc0, _} = ets:foldl(FoldFun, {Acc, sets:new()}, DataRef), + {ok, Acc0} + end. + +%% @doc Fold over all the keys for one or all buckets. +-spec fold_keys(riak_kv_backend:fold_keys_fun(), + any(), + [{atom(), term()}], + state()) -> {ok, term()} | {async, fun()}. +fold_keys(FoldKeysFun, Acc, Opts, #state{data_ref=DataRef}) -> + Bucket = proplists:get_value(bucket, Opts), + FoldFun = fold_keys_fun(FoldKeysFun, Bucket), + case lists:member(async_fold, Opts) of + true -> + {async, get_folder(FoldFun, Acc, DataRef)}; + false -> + Acc0 = ets:foldl(FoldFun, Acc, DataRef), + {ok, Acc0} + end. -% list(state()) -> [riak_object:bkey()] -list(SrvRef) -> gen_server:call(SrvRef,list). -srv_list(State) -> - MList = ets:match(State#state.t,{'$1','_'}), - list(MList,[]). -list([],Acc) -> Acc; -list([[K]|Rest],Acc) -> list(Rest,[K|Acc]). - -% list_bucket(term(), Bucket :: riak_object:bucket()) -> [Key :: binary()] -list_bucket(SrvRef, Bucket) -> - gen_server:call(SrvRef,{list_bucket, Bucket}). -srv_list_bucket(State, {filter, Bucket, Fun}) -> - MList = lists:filter(Fun, ets:match(State#state.t,{{Bucket,'$1'},'_'})), - list(MList,[]); -srv_list_bucket(State, Bucket) -> - case Bucket of - '_' -> MatchSpec = {{'$1','_'},'_'}; - _ -> MatchSpec = {{Bucket,'$1'},'_'} +%% @doc Fold over all the objects for one or all buckets. +-spec fold_objects(riak_kv_backend:fold_objects_fun(), + any(), + [{atom(), term()}], + state()) -> {ok, any()} | {async, fun()}. +fold_objects(FoldObjectsFun, Acc, Opts, #state{data_ref=DataRef}) -> + Bucket = proplists:get_value(bucket, Opts), + FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), + case lists:member(async_fold, Opts) of + true -> + {async, get_folder(FoldFun, Acc, DataRef)}; + false -> + Acc0 = ets:foldl(FoldFun, Acc, DataRef), + {ok, Acc0} + end. + +%% @doc Delete all objects from this memory backend +-spec drop(state()) -> {ok, state()}. +drop(State=#state{data_ref=DataRef, + time_ref=TimeRef}) -> + ets:delete_all_objects(DataRef), + case TimeRef of + undefined -> + ok; + _ -> + ets:delete_all_objects(TimeRef) end, - MList = ets:match(State#state.t,MatchSpec), - list(MList,[]). + {ok, State}. -is_empty(SrvRef) -> gen_server:call(SrvRef, is_empty). +%% @doc Returns true if this memory backend contains any +%% non-tombstone values; otherwise returns false. +-spec is_empty(state()) -> boolean(). +is_empty(#state{data_ref=DataRef}) -> + ets:info(DataRef, size) =:= 0. -drop(SrvRef) -> gen_server:call(SrvRef, drop). +%% @doc Get the status information for this memory backend +-spec status(state()) -> [{atom(), term()}]. +status(#state{data_ref=DataRef, + time_ref=TimeRef}) -> + DataStatus = ets:info(DataRef), + case TimeRef of + undefined -> + [{data_table_status, DataStatus}]; + _ -> + TimeStatus = ets:info(TimeRef), + [{data_table_status, DataStatus}, + {time_table_status, TimeStatus}] + end. -fold(SrvRef, Fun, Acc0) -> gen_server:call(SrvRef, {fold, Fun, Acc0}, infinity). +%% @doc Register an asynchronous callback +-spec callback(reference(), any(), state()) -> {ok, state()}. +callback(_Ref, _Msg, State) -> + {ok, State}. -%% Ignore callbacks for other backends so multi backend works -callback(_State, _Ref, _Msg) -> - ok. +%% =================================================================== +%% Internal functions +%% =================================================================== + +%% @TODO Some of these implementations may be suboptimal. +%% Need to do some measuring and testing to refine the +%% implementations. %% @private -handle_info(_Msg, State) -> {noreply, State}. +%% Return a function to fold over the buckets on this backend +fold_buckets_fun(FoldBucketsFun) -> + fun({{Bucket, _}, _}, {Acc, BucketSet}) -> + case sets:is_element(Bucket, BucketSet) of + true -> + {Acc, BucketSet}; + false -> + {FoldBucketsFun(Bucket, Acc), + sets:add_element(Bucket, BucketSet)} + end + end. %% @private -terminate(_Reason, _State) -> ok. +%% Return a function to fold over keys on this backend +fold_keys_fun(FoldKeysFun, undefined) -> + fun({{Bucket, Key}, _}, Acc) -> + FoldKeysFun(Bucket, Key, Acc) + end; +fold_keys_fun(FoldKeysFun, Bucket) -> + fun({{B, Key}, _}, Acc) -> + case B =:= Bucket of + true -> + FoldKeysFun(Bucket, Key, Acc); + false -> + Acc + end + end. %% @private -code_change(_OldVsn, State, _Extra) -> {ok, State}. +%% Return a function to fold over keys on this backend +fold_objects_fun(FoldObjectsFun, undefined) -> + fun({{Bucket, Key}, Value}, Acc) -> + FoldObjectsFun(Bucket, Key, Value, Acc) + end; +fold_objects_fun(FoldObjectsFun, Bucket) -> + fun({{B, Key}, Value}, Acc) -> + case B =:= Bucket of + true -> + FoldObjectsFun(Bucket, Key, Value, Acc); + false -> + Acc + end + end. + +%% @private +get_folder(FoldFun, Acc, DataRef) -> + fun() -> + ets:foldl(FoldFun, Acc, DataRef) + end. + +%% @private +do_put(Bucket, Key, Val, Ref) -> + Object = {{Bucket, Key}, Val}, + true = ets:insert(Ref, Object), + {ok, object_size(Object)}. + +%% @private +config_value(Key, Config) -> + config_value(Key, Config, undefined). + +%% @private +config_value(Key, Config, Default) -> + case proplists:get_value(Key, Config) of + undefined -> + app_helper:get_env(memory_backend, Key, Default); + Value -> + Value + end. + +%% Check if this timestamp is past the ttl setting. +exceeds_ttl(Timestamp, TTL) -> + Diff = (timer:now_diff(now(), Timestamp) / 1000 / 1000), + Diff > TTL. + +%% @private +time_entry(Bucket, Key, Now, TimeRef) -> + ets:insert(TimeRef, {Now, {Bucket, Key}}). + +%% @private +%% @doc Dump some entries if the max memory size has +%% been breached. +trim_data_table(MaxMemory, UsedMemory, _, _, Freed) when + (UsedMemory - Freed) =< MaxMemory -> + Freed; +trim_data_table(MaxMemory, UsedMemory, DataRef, TimeRef, Freed) -> + %% Delete the oldest object + OldestSize = delete_oldest(DataRef, TimeRef), + trim_data_table(MaxMemory, + UsedMemory, + DataRef, + TimeRef, + Freed + OldestSize). + +%% @private +delete_oldest(DataRef, TimeRef) -> + OldestTime = ets:first(TimeRef), + case OldestTime of + '$end_of_table' -> + 0; + _ -> + OldestKey = ets:lookup_element(TimeRef, OldestTime, 2), + ets:delete(TimeRef, OldestTime), + case ets:lookup(DataRef, OldestKey) of + [] -> + delete_oldest(DataRef, TimeRef); + [Object] -> + ets:delete(DataRef, OldestKey), + object_size(Object) + end + end. + +%% @private +object_size(Object) -> + case Object of + {{Bucket, Key}, {{ts, _}, Val}} -> + ok; + {{Bucket, Key}, Val} -> + ok + end, + size(Bucket) + size(Key) + size(Val). + +%% =================================================================== +%% EUnit tests +%% =================================================================== -%% -%% Test -%% -ifdef(TEST). -% @private -simple_test() -> +simple_test_() -> riak_kv_backend:standard_test(?MODULE, []). --ifdef(EQC). +ttl_test_() -> + Config = [{ttl, 15}], + {ok, State} = start(42, Config), + + Bucket = <<"Bucket">>, + Key = <<"Key">>, + Value = <<"Value">>, + + [ + %% Put an object + ?_assertEqual({ok, State}, put(Bucket, Key, [], Value, State)), + %% Wait 1 second to access it + ?_assertEqual(ok, timer:sleep(1000)), + ?_assertEqual({ok, Value, State}, get(Bucket, Key, State)), + %% Wait 3 seconds and access it again + ?_assertEqual(ok, timer:sleep(3000)), + ?_assertEqual({ok, Value, State}, get(Bucket, Key, State)), + %% Wait 15 seconds and it should expire + {timeout, 30000, ?_assertEqual(ok, timer:sleep(15000))}, + %% This time it should be gone + ?_assertEqual({error, not_found, State}, get(Bucket, Key, State)) + ]. + %% @private -eqc_test() -> - ?assertEqual(true, backend_eqc:test(?MODULE, true)). +max_memory_test_() -> + %% Set max size to 1.5kb + Config = [{max_memory, 1.5 * (1 / 1024)}], + {ok, State} = start(42, Config), + + Bucket = <<"Bucket">>, + Key1 = <<"Key1">>, + Value1 = list_to_binary(string:copies("1", 1024)), + Key2 = <<"Key2">>, + Value2 = list_to_binary(string:copies("2", 1024)), + + %% Write Key1 to the datastore + {ok, State1} = put(Bucket, Key1, [], Value1, State), + timer:sleep(timer:seconds(1)), + %% Write Key2 to the datastore + {ok, State2} = put(Bucket, Key2, [], Value2, State1), + + [ + %% Key1 should be kicked out + ?_assertEqual({error, not_found, State2}, get(Bucket, Key1, State2)), + %% Key2 should still be present + ?_assertEqual({ok, Value2, State2}, get(Bucket, Key2, State2)) + ]. + +-ifdef(EQC). + +eqc_test_() -> + {spawn, + [{inorder, + [{setup, + fun setup/0, + fun cleanup/1, + [ + {timeout, 60000, + [?_assertEqual(true, + backend_eqc:test(?MODULE, true))]} + ]}]}]}. + +setup() -> + application:load(sasl), + application:set_env(sasl, sasl_error_logger, {file, "riak_kv_memory_backend_eqc_sasl.log"}), + error_logger:tty(false), + error_logger:logfile({open, "riak_kv_memory_backend_eqc.log"}), + ok. + +cleanup(_) -> + ok. -endif. % EQC + -endif. % TEST diff --git a/src/test_server.coffee b/src/test_server.coffee index e3c31c8..b3509db 100644 --- a/src/test_server.coffee +++ b/src/test_server.coffee @@ -14,15 +14,17 @@ class TestServer extends EventEmitter @defaults = appConfig: riak_core: - web_ip: "127.0.0.1" - web_port: 9000 + http: + "127.0.0.1": {atom: 9000} handoff_port: 9001 ring_creation_size: 64 riak_kv: storage_backend: {atom: "riak_kv_test_backend"} pb_ip: "127.0.0.1" pb_port: 9002 - js_vm_count: 8 + map_js_vm_count: 8 + reduce_js_vm_count: 6 + hook_js_vm_count: 2 js_max_vm_mem: 8 js_thread_stack: 16 riak_kv_stat: true @@ -181,6 +183,8 @@ class TestServer extends EventEmitter printable = @toErlangConfig(value, depth+1) else printable = value.toString() + if !key.match(/^[a-z][a-zA-Z0-9@_]*$/) + key = "\"#{key}\"" "{#{key}, #{printable}}" values = values.join(",\n#{padding}")