Skip to content
Browse files

AvroStorBusClientHttp now can negotiate Schemas (lazily) after being …

…constructed
  • Loading branch information...
1 parent d82f95e commit c0d6e5e6baf21463785bdfec285c5178734753ce @tingchen tingchen committed Apr 18, 2012
Showing with 74 additions and 14 deletions.
  1. +74 −14 krati-avro/src/main/java/krati/store/bus/client/AvroStoreBusClientHttp.java
View
88 krati-avro/src/main/java/krati/store/bus/client/AvroStoreBusClientHttp.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010-2012 LinkedIn, Inc
+ * Copyright (c) 2011 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -18,6 +18,7 @@
import java.net.URL;
+import krati.io.SerializationException;
import krati.io.Serializer;
import krati.store.avro.AvroGenericRecordSerializer;
import krati.store.avro.protocol.StoreKeys;
@@ -32,7 +33,6 @@
* @since 10/04, 2011
*/
public class AvroStoreBusClientHttp<K> extends StoreBusClientHttp<K, GenericRecord> {
- protected Schema _schema;
public AvroStoreBusClientHttp(URL serverURL, String source, Serializer<K> keySerializer) {
super(serverURL, source, keySerializer, null);
@@ -41,26 +41,86 @@ public AvroStoreBusClientHttp(URL serverURL, String source, Serializer<K> keySer
@Override
protected boolean init() {
boolean ret = super.init();
-
- try {
- String prop = getProperty(StoreKeys.KRATI_STORE_VALUE_SCHEMA);
- _schema = Schema.parse(prop);
- _valueSerializer = new AvroGenericRecordSerializer(_schema);
- } catch(Exception e) {
- ret = false;
- }
-
+ _valueSerializer = new LazyAvroGenericRecordSerializer();
return ret;
}
+
/**
* @return the Avro schema of a remote store.
*/
public final Schema getSchema() {
- if(_schema == null) {
- init();
+ return ((LazyAvroGenericRecordSerializer)_valueSerializer).getSchema();
+ }
+
+
+
+ /**
+ * A {@link Serializer} implementation that will lazily negotiate an avro {@link Schema} with
+ * the remote krati store.
+ *
+ * It will keep negotiating until it succeeds once. After that, it will ALWAYS use the same
+ * {@link Schema}.
+ * All {@link #serialize(GenericRecord)} and {@link #deserialize(byte[])} calls will throw an
+ * {@link IllegalStateException} if a {@link Schema} has not been negotiated yet.
+ *
+ * @author dbuthay
+ *
+ */
+ private class LazyAvroGenericRecordSerializer implements Serializer<GenericRecord> {
+ private AvroGenericRecordSerializer _delegate = null;
+ private Schema _schema = null;
+
+ @Override
+ public GenericRecord deserialize(byte[] bytes) throws SerializationException {
+ checkSchema();
+ return _delegate.deserialize(bytes);
+
+ }
+ @Override
+ public byte[] serialize(GenericRecord record) throws SerializationException {
+ checkSchema();
+ return _delegate.serialize(record);
+ }
+
+ /**
+ * Returns the {@link Schema} negotiated with the remote server or {@code null}
+ * if negotiation never succeeded.
+ *
+ * Reasons for negotiation not succeeding include
+ * <ul>
+ * <li>Network problems</li>
+ * <li>Schema String representation retrieved over the network is not parseable</li>
+ * <ul>
+ *
+ * NOTE: This method will NOT try to negotiate a Schema.
+ * @return the {@link Schema} negotiated with the remote server or {@code null} if negotiation never succeeded.
+ */
+ public Schema getSchema() {
+ return _schema;
+ }
+
+
+ /**
+ * Check if the {@link Schema} has already been negotiated.
+ * If not, try to negotiate and fail if not possible
+ *
+ * @throws IllegalStateException if there was a problem negotiating the Schema, or
+ * if the Schema is invalid.
+ */
+ private void checkSchema() {
+ if (_delegate == null) {
+ // try to create a delegate,
+ // First we need to negotiate a Schema
+ try {
+ String prop = getProperty(StoreKeys.KRATI_STORE_VALUE_SCHEMA);
+ _schema = Schema.parse(prop);
+ _delegate = new AvroGenericRecordSerializer(_schema);
+ } catch (Exception e) {
+ throw new IllegalStateException("while negotiating Schema: " + e.getMessage(), e);
+ }
+ }
}
- return _schema;
}
}

0 comments on commit c0d6e5e

Please sign in to comment.
Something went wrong with that request. Please try again.