Permalink
Browse files

Merge pull request #1 from russelldb/zd820_fetchMeta

Fix fetchMeta to handle siblings zd://820
  • Loading branch information...
2 parents 4788e02 + c86fc52 commit ee2b93c21237a7ab0c26c9e83de797957f71cd85 @eriksoe eriksoe committed Sep 14, 2011
View
88 src/main/java/com/basho/riak/client/RiakClient.java
@@ -243,34 +243,50 @@ public StoreResponse store(RiakObject object) {
return store(object, null);
}
- /**
- * Fetch metadata (e.g. vclock, last modified, vtag) for the
- * {@link RiakObject} stored at <code>bucket</code> and <code>key</code>.
- *
- * @param bucket
- * The bucket containing the {@link RiakObject} to fetch.
- * @param key
- * The key of the {@link RiakObject} to fetch.
- * @param meta
- * Extra metadata to attach to the request such as an r- value
- * for the request, HTTP headers, and other query parameters. See
- * {@link RequestMeta#readParams(int)}.
- *
- * @return {@link FetchResponse} containing HTTP response information and a
- * {@link RiakObject} containing only metadata and no value.
- *
- * @throws RiakIORuntimeException
- * If an error occurs during communication with the Riak server.
- * @throws RiakResponseRuntimeException
- * If the Riak server returns a malformed response.
- */
- public FetchResponse fetchMeta(String bucket, String key, RequestMeta meta) {
- try {
- return getFetchResponse(helper.fetchMeta(bucket, key, meta));
- } catch (RiakResponseRuntimeException e) {
- return new FetchResponse(helper.toss(e), this);
- }
- }
+ /**
+ * Fetch metadata (e.g. vclock, last modified, vtag) for the
+ * {@link RiakObject} stored at <code>bucket</code> and <code>key</code>.
+ * <p>
+ * NOTE: if there a sibling values (HTTP status code 300), then a full
+ * {@link RiakClient#fetch(String, String, RequestMeta)} is executed as well
+ * to get all meta values. Examine the {@link FetchResponse#hasSiblings()}
+ * value to determine if you need to perform conflict resolution.
+ * </p>
+ *
+ * @param bucket
+ * The bucket containing the {@link RiakObject} to fetch.
+ * @param key
+ * The key of the {@link RiakObject} to fetch.
+ * @param meta
+ * Extra metadata to attach to the request such as an r- value
+ * for the request, HTTP headers, and other query parameters. See
+ * {@link RequestMeta#readParams(int)}.
+ *
+ * @return {@link FetchResponse} containing HTTP response information and a
+ * {@link RiakObject} containing only metadata and no value.
+ *
+ * @throws RiakIORuntimeException
+ * If an error occurs during communication with the Riak server.
+ * @throws RiakResponseRuntimeException
+ * If the Riak server returns a malformed response.
+ */
+ public FetchResponse fetchMeta(String bucket, String key, RequestMeta meta) {
+ try {
+ if (meta == null) {
+ meta = new RequestMeta();
+ }
+ setAcceptHeader(meta);
+ HttpResponse resp = helper.fetchMeta(bucket, key, meta);
+
+ if (resp.getStatusCode() != 300) {
+ return getFetchResponse(resp);
+ } else {
+ return fetch(bucket, key, meta);
+ }
+ } catch (RiakResponseRuntimeException e) {
+ return new FetchResponse(helper.toss(e), this);
+ }
+ }
public FetchResponse fetchMeta(String bucket, String key) {
return fetchMeta(bucket, key, null);
@@ -338,12 +354,7 @@ FetchResponse fetch(String bucket, String key, RequestMeta meta, boolean streamR
meta = new RequestMeta();
}
- String accept = meta.getHeader(Constants.HDR_ACCEPT);
- if (accept == null) {
- meta.setHeader(Constants.HDR_ACCEPT, Constants.CTYPE_ANY + ", " + Constants.CTYPE_MULTIPART_MIXED);
- } else {
- meta.setHeader(Constants.HDR_ACCEPT, accept + ", " + Constants.CTYPE_MULTIPART_MIXED);
- }
+ setAcceptHeader(meta);
HttpResponse r = helper.fetch(bucket, key, meta, streamResponse);
@@ -355,6 +366,15 @@ FetchResponse fetch(String bucket, String key, RequestMeta meta, boolean streamR
}
+ private void setAcceptHeader(RequestMeta meta) {
+ String accept = meta.getHeader(Constants.HDR_ACCEPT);
+ if (accept == null) {
+ meta.setHeader(Constants.HDR_ACCEPT, Constants.CTYPE_ANY + ", " + Constants.CTYPE_MULTIPART_MIXED);
+ } else {
+ meta.setHeader(Constants.HDR_ACCEPT, accept + ", " + Constants.CTYPE_MULTIPART_MIXED);
+ }
+ }
+
/**
* Fetch and process the object stored at <code>bucket</code> and
* <code>key</code> as a stream.
View
3 src/main/java/com/basho/riak/client/response/FetchResponse.java
@@ -71,8 +71,9 @@ public FetchResponse(HttpResponse r, RiakClient riak) throws RiakResponseRuntime
if (r.getStatusCode() == 300) {
String contentType = headers.get(Constants.HDR_CONTENT_TYPE);
- if (contentType == null || !(contentType.trim().toLowerCase().startsWith(Constants.CTYPE_MULTIPART_MIXED)))
+ if (contentType == null || !(contentType.trim().toLowerCase().startsWith(Constants.CTYPE_MULTIPART_MIXED))) {
throw new RiakResponseRuntimeException(r, "multipart/mixed content expected when object has siblings");
+ }
if (r.isStreamed()) {
try {
View
21 src/test/java/com/basho/riak/client/TestRiakClient.java
@@ -79,6 +79,7 @@
impl.store(object, meta);
verify(mockHelper).store(object, meta);
+ when(mockHelper.fetchMeta(bucket, key, meta)).thenReturn(mockHttpResponse);
impl.fetchMeta(bucket, key, meta);
verify(mockHelper).fetchMeta(bucket, key, meta);
@@ -116,6 +117,7 @@
impl.store(object);
verify(impl).store(object, null);
+ when(mockHelper.fetchMeta(eq(bucket), eq(key), any(RequestMeta.class))).thenReturn(mockHttpResponse);
impl.fetchMeta(bucket, key);
verify(impl).fetchMeta(bucket, key, null);
@@ -163,6 +165,7 @@
verify(mockHelper).toss(any(RiakResponseRuntimeException.class));
reset(mockHelper);
+ when(mockHelper.fetchMeta(bucket, key, meta)).thenReturn(mockHttpResponse);
impl.fetchMeta(bucket, key, meta);
verify(mockHelper).toss(any(RiakResponseRuntimeException.class));
reset(mockHelper);
@@ -266,4 +269,22 @@ public HttpResponse answer(InvocationOnMock invocation) throws Throwable {
assertSame(impl, r.getSteps().get(0).get(0).getRiakClient());
}
+
+ @Test public void fetch_meta_issues_a_fetch_if_siblings_present() {
+ when(mockHttpResponse.getStatusCode()).thenReturn(300);
+ when(mockHelper.fetchMeta(bucket, key, meta)).thenReturn(
+ mockHttpResponse);
+ impl.fetchMeta(bucket, key, meta);
+ verify(mockHelper).fetchMeta(bucket, key, meta);
+ verify(mockHelper).fetch(bucket, key, meta, false);
+ }
+
+ @Test public void fetch_meta_does_not_issue_a_fetch_if_siblings_not_present() {
+ when(mockHttpResponse.getStatusCode()).thenReturn(200);
+ when(mockHelper.fetchMeta(bucket, key, meta)).thenReturn(
+ mockHttpResponse);
+ impl.fetchMeta(bucket, key, meta);
+ verify(mockHelper).fetchMeta(bucket, key, meta);
+ verify(mockHelper, never()).fetch(bucket, key, meta, false);
+ }
}
View
40 src/test/java/com/basho/riak/client/itest/ITestBasic.java
@@ -16,15 +16,19 @@
import static com.basho.riak.client.itest.Utils.*;
import static org.junit.Assert.*;
+import java.util.UUID;
+
import org.junit.Test;
import com.basho.riak.client.RiakBucketInfo;
import com.basho.riak.client.RiakClient;
import com.basho.riak.client.RiakLink;
import com.basho.riak.client.RiakObject;
+import com.basho.riak.client.request.RequestMeta;
import com.basho.riak.client.response.BucketResponse;
import com.basho.riak.client.response.FetchResponse;
import com.basho.riak.client.response.StoreResponse;
+import com.basho.riak.client.util.Constants;
/**
* Basic exercises such as store, fetch, and modify objects for the Riak client.
@@ -130,4 +134,40 @@
bucketInfo = bucketresp.getBucketInfo();
assertEquals(nval + 1, bucketInfo.getNVal().intValue());
}
+
+ @Test public void fetch_meta_with_siblings() {
+ final RiakClient c1 = new RiakClient(RIAK_URL);
+ final RiakClient c2 = new RiakClient(RIAK_URL);
+ final String bucket = UUID.randomUUID().toString();
+ final String key = UUID.randomUUID().toString();
+
+ RiakBucketInfo bucketInfo = new RiakBucketInfo();
+ bucketInfo.setAllowMult(true);
+ assertSuccess(c1.setBucketSchema(bucket, bucketInfo));
+
+ c1.store(new RiakObject(c1, bucket, key, "v".getBytes()));
+
+ final FetchResponse fm1 = c1.fetchMeta(bucket, key);
+
+ assertTrue(fm1.hasObject());
+ assertNull(fm1.getObject().getValue());
+ assertFalse(fm1.hasSiblings());
+
+ final FetchResponse fr = c1.fetch(bucket, key);
+
+ RiakObject ro = fr.getObject();
+
+ RiakObject ro2 = new RiakObject(c2, bucket, key);
+ ro2.copyData(ro);
+ ro2.setValue("v2".getBytes());
+
+ c1.store(ro);
+ c2.store(ro2);
+
+ FetchResponse fm2 = c1.fetchMeta(bucket, key);
+ // since there are siblings fetchMeta will do a full fetch to get them
+ assertTrue(fm2.hasObject());
+ assertNotNull(fm2.getObject().getValue());
+ assertTrue(fm2.hasSiblings());
+ }
}

0 comments on commit ee2b93c

Please sign in to comment.