44import java .io .EOFException ;
55import java .io .IOException ;
66import java .io .InputStream ;
7- import java .io .InputStreamReader ;
8- import java .util .Iterator ;
9- import java .util .NoSuchElementException ;
107
118import javax .ws .rs .core .MediaType ;
129
1310import org .slf4j .Logger ;
1411import org .slf4j .LoggerFactory ;
1512
13+ import com .cloudata .clients .StreamingRecordsetBase ;
14+ import com .cloudata .util .ByteStringMessageBodyWriter ;
1615import com .cloudata .util .Hex ;
17- import com .google .common .base .Throwables ;
1816import com .google .common .io .ByteStreams ;
19- import com .google .gson .JsonElement ;
20- import com .google .gson .JsonObject ;
21- import com .google .gson .JsonParser ;
2217import com .google .protobuf .ByteString ;
2318import com .sun .jersey .api .client .Client ;
2419import com .sun .jersey .api .client .ClientResponse ;
@@ -40,7 +35,6 @@ public KeyValueClient(String url) {
4035 private static Client buildClient () {
4136 ClientConfig config = new DefaultClientConfig ();
4237 config .getClasses ().add (ByteStringMessageBodyWriter .class );
43- config .getClasses ().add (GsonObjectMessageBodyHandler .class );
4438 Client client = Client .create (config );
4539 client .setFollowRedirects (true );
4640 return client ;
@@ -65,25 +59,6 @@ public void put(long storeId, ByteString key, ByteString value) throws Exception
6559 }
6660 }
6761
68- public void put (long storeId , ByteString key , JsonObject value ) throws Exception {
69- ClientResponse response = CLIENT .resource (url ).path (toUrlPath (storeId , key ))
70- .entity (value , MediaType .APPLICATION_JSON ).post (ClientResponse .class );
71-
72- try {
73- int status = response .getStatus ();
74-
75- switch (status ) {
76- case 200 :
77- break ;
78-
79- default :
80- throw new IllegalStateException ("Unexpected status: " + status );
81- }
82- } finally {
83- response .close ();
84- }
85- }
86-
8762 public KeyValueEntry increment (long storeId , ByteString key ) throws Exception {
8863 ClientResponse response = CLIENT .resource (url ).path (toUrlPath (storeId , key )).queryParam ("action" , "increment" )
8964 .post (ClientResponse .class );
@@ -132,30 +107,6 @@ public String toString() {
132107
133108 }
134109
135- public static class KeyValueJsonEntry {
136- private final ByteString key ;
137- private final JsonElement value ;
138-
139- public KeyValueJsonEntry (ByteString key , JsonElement value ) {
140- this .key = key ;
141- this .value = value ;
142- }
143-
144- public ByteString getKey () {
145- return key ;
146- }
147-
148- public JsonElement getValue () {
149- return value ;
150- }
151-
152- @ Override
153- public String toString () {
154- return "JsonElement [key=" + Hex .forDebug (key ) + ", value=" + value + "]" ;
155- }
156-
157- }
158-
159110 public KeyValueEntry read (long storeId , ByteString key ) throws IOException {
160111 ClientResponse response = CLIENT .resource (url ).path (toUrlPath (storeId , key )).get (ClientResponse .class );
161112
@@ -182,33 +133,6 @@ public KeyValueEntry read(long storeId, ByteString key) throws IOException {
182133 }
183134 }
184135
185- public KeyValueJsonEntry readJson (long storeId , ByteString key ) throws IOException {
186- ClientResponse response = CLIENT .resource (url ).path (toUrlPath (storeId , key ))
187- .accept (MediaType .APPLICATION_JSON_TYPE ).get (ClientResponse .class );
188-
189- try {
190- int status = response .getStatus ();
191-
192- switch (status ) {
193- case 200 :
194- break ;
195-
196- case 404 :
197- return null ;
198-
199- default :
200- throw new IllegalStateException ("Unexpected status: " + status );
201- }
202-
203- InputStream is = response .getEntityInputStream ();
204- JsonElement value = new JsonParser ().parse (new InputStreamReader (is ));
205-
206- return new KeyValueJsonEntry (key , value );
207- } finally {
208- response .close ();
209- }
210- }
211-
212136 public KeyValueRecordset query (long storeId ) throws IOException {
213137 ClientResponse response = CLIENT .resource (url ).path (toUrlPath (storeId )).get (ClientResponse .class );
214138
@@ -233,107 +157,7 @@ public KeyValueRecordset query(long storeId) throws IOException {
233157 }
234158 }
235159
236- public KeyValueJsonRecordset queryJson (long storeId ) throws IOException {
237- ClientResponse response = CLIENT .resource (url ).path (toUrlPath (storeId )).accept (MediaType .APPLICATION_JSON_TYPE )
238- .get (ClientResponse .class );
239-
240- try {
241- int status = response .getStatus ();
242-
243- switch (status ) {
244- case 200 :
245- break ;
246-
247- default :
248- throw new IllegalStateException ("Unexpected status: " + status );
249- }
250-
251- KeyValueJsonRecordset records = new KeyValueJsonRecordset (response );
252- response = null ;
253- return records ;
254- } finally {
255- if (response != null ) {
256- response .close ();
257- }
258- }
259- }
260-
261- static abstract class KeyValueRecordsetBase <V > implements AutoCloseable , Iterable <V > {
262- final ClientResponse response ;
263-
264- boolean read ;
265-
266- public KeyValueRecordsetBase (ClientResponse response ) {
267- this .response = response ;
268- }
269-
270- @ Override
271- public void close () {
272- response .close ();
273- }
274-
275- @ Override
276- public Iterator <V > iterator () {
277- if (read ) {
278- throw new IllegalStateException ();
279- }
280- read = true ;
281- InputStream is = response .getEntityInputStream ();
282-
283- final DataInputStream dis = new DataInputStream (is );
284-
285- return new Iterator <V >() {
286- V next ;
287- boolean done ;
288-
289- @ Override
290- public void remove () {
291- throw new UnsupportedOperationException ();
292- }
293-
294- @ Override
295- public V next () {
296- ensureHaveNext ();
297-
298- if (next != null ) {
299- V ret = next ;
300- next = null ;
301- return ret ;
302- } else {
303- throw new NoSuchElementException ();
304- }
305- }
306-
307- @ Override
308- public boolean hasNext () {
309- ensureHaveNext ();
310-
311- return next != null ;
312- }
313-
314- private void ensureHaveNext () {
315- if (next == null ) {
316- if (!done ) {
317- try {
318- next = read (dis );
319- } catch (IOException e ) {
320- throw Throwables .propagate (e );
321- }
322- if (next == null ) {
323- done = true ;
324- }
325- }
326- }
327- }
328-
329- };
330-
331- }
332-
333- protected abstract V read (DataInputStream dis ) throws IOException ;
334- }
335-
336- static class KeyValueRecordset extends KeyValueRecordsetBase <KeyValueEntry > {
160+ static class KeyValueRecordset extends StreamingRecordsetBase <KeyValueEntry > {
337161 public KeyValueRecordset (ClientResponse response ) {
338162 super (response );
339163 }
@@ -360,30 +184,6 @@ protected KeyValueEntry read(DataInputStream dis) throws IOException {
360184 }
361185 }
362186
363- static class KeyValueJsonRecordset extends KeyValueRecordsetBase <KeyValueJsonEntry > {
364- public KeyValueJsonRecordset (ClientResponse response ) {
365- super (response );
366- }
367-
368- @ Override
369- protected KeyValueJsonEntry read (DataInputStream dis ) throws IOException {
370- int keyLength = dis .readInt ();
371- if (keyLength == -1 ) {
372- return null ;
373- }
374-
375- ByteString key = ByteString .readFrom (ByteStreams .limit (dis , keyLength ));
376- if (key .size () != keyLength ) {
377- throw new EOFException ();
378- }
379-
380- int valueLength = dis .readInt ();
381- JsonElement object = new JsonParser ().parse (new InputStreamReader (ByteStreams .limit (dis , valueLength )));
382-
383- return new KeyValueJsonEntry (key , object );
384- }
385- }
386-
387187 private String toUrlPath (long storeId , ByteString key ) {
388188 return Long .toString (storeId ) + "/" + Hex .toHex (key );
389189 }
0 commit comments