Commit c7b5eaa: Fix code style

Kirill380 committed Oct 3, 2016 (1 parent: afde88a)
Showing 19 changed files with 380 additions and 316 deletions.
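
Most of the churn in the first file below is one mechanical fix: the redundant public static final modifiers are dropped from the constants in the ConfigurationConstants interface, since every field declared in a Java interface is implicitly public, static, and final. A minimal sketch of the equivalence (a hypothetical interface, not part of this commit):

public interface Example {
  // Redundant modifiers; checkstyle flags these inside an interface.
  public static final String VERBOSE = "verbose";
  // Identical semantics; the terse form this commit converges on.
  String TERSE = "terse";
}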
ConfigurationConstants.java (org.kaaproject.kaa.server.flume)

@@ -13,79 +13,80 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.kaaproject.kaa.server.flume;

 import org.apache.hadoop.hdfs.DFSConfigKeys;

 public interface ConfigurationConstants {

-  public static final String CONFIG_ROOT_HDFS_PATH = "rootHdfsPath";
-  public static final String DEFAULT_ROOT_HDFS_PATH = "hdfs://localhost:8020/logs";
+  String CONFIG_ROOT_HDFS_PATH = "rootHdfsPath";
+  String DEFAULT_ROOT_HDFS_PATH = "hdfs://localhost:8020/logs";

-  public static final String CONFIG_STATISTICS_INTERVAL = "statisticsInterval";
-  public static final long DEFAULT_STATISTICS_INTERVAL = 60; //seconds
+  String CONFIG_STATISTICS_INTERVAL = "statisticsInterval";
+  long DEFAULT_STATISTICS_INTERVAL = 60; //seconds

-  public static final String CONFIG_HDFS_TXN_EVENT_MAX = "hdfs.txnEventMax";
-  public static final long DEFAULT_HDFS_TXN_EVENT_MAX = 100;
+  String CONFIG_HDFS_TXN_EVENT_MAX = "hdfs.txnEventMax";
+  long DEFAULT_HDFS_TXN_EVENT_MAX = 100;

-  public static final String CONFIG_HDFS_THREAD_POOL_SIZE = "hdfs.threadsPoolSize";
-  public static final int DEFAULT_HDFS_THREAD_POOL_SIZE = 10;
+  String CONFIG_HDFS_THREAD_POOL_SIZE = "hdfs.threadsPoolSize";
+  int DEFAULT_HDFS_THREAD_POOL_SIZE = 10;

-  public static final String CONFIG_HDFS_WRITER_EXPIRATION_INTERVAL = "hdfs.writerExpirationInterval";
-  public static final int DEFAULT_HDFS_WRITER_EXPIRATION_INTERVAL = 60 * 60;
+  String CONFIG_HDFS_WRITER_EXPIRATION_INTERVAL = "hdfs.writerExpirationInterval";
+  int DEFAULT_HDFS_WRITER_EXPIRATION_INTERVAL = 60 * 60;

-  public static final String CONFIG_HDFS_CALL_TIMEOUT = "hdfs.callTimeout";
-  public static final long DEFAULT_HDFS_CALL_TIMEOUT = 10000;
+  String CONFIG_HDFS_CALL_TIMEOUT = "hdfs.callTimeout";
+  long DEFAULT_HDFS_CALL_TIMEOUT = 10000;

-  public static final String CONFIG_HDFS_DEFAULT_BLOCK_SIZE = "hdfs.default.blockSize";
-  public static final long DEFAULT_HDFS_DEFAULT_BLOCK_SIZE = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+  String CONFIG_HDFS_DEFAULT_BLOCK_SIZE = "hdfs.default.blockSize";
+  long DEFAULT_HDFS_DEFAULT_BLOCK_SIZE = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;

-  public static final String CONFIG_HDFS_ROLL_TIMER_POOL_SIZE = "hdfs.rollTimerPoolSize";
-  public static final int DEFAULT_HDFS_ROLL_TIMER_POOL_SIZE = 1;
+  String CONFIG_HDFS_ROLL_TIMER_POOL_SIZE = "hdfs.rollTimerPoolSize";
+  int DEFAULT_HDFS_ROLL_TIMER_POOL_SIZE = 1;

-  public static final String CONFIG_HDFS_MAX_OPEN_FILES = "hdfs.maxOpenFiles";
-  public static final int DEFAULT_HDFS_MAX_OPEN_FILES = 5000;
+  String CONFIG_HDFS_MAX_OPEN_FILES = "hdfs.maxOpenFiles";
+  int DEFAULT_HDFS_MAX_OPEN_FILES = 5000;

-  public static final String CONFIG_HDFS_CACHE_CLEANUP_INTERVAL = "hdfs.cacheCleanupInterval";
-  public static final int DEFAULT_HDFS_CACHE_CLEANUP_INTERVAL = 10 * 60;
+  String CONFIG_HDFS_CACHE_CLEANUP_INTERVAL = "hdfs.cacheCleanupInterval";
+  int DEFAULT_HDFS_CACHE_CLEANUP_INTERVAL = 10 * 60;

-  public static final String CONFIG_HDFS_ROLL_INTERVAL = "hdfs.rollInterval";
-  public static final long DEFAULT_HDFS_ROLL_INTERVAL = 30;
+  String CONFIG_HDFS_ROLL_INTERVAL = "hdfs.rollInterval";
+  long DEFAULT_HDFS_ROLL_INTERVAL = 30;

-  public static final String CONFIG_HDFS_ROLL_SIZE = "hdfs.rollSize";
-  public static final long DEFAULT_HDFS_ROLL_SIZE = 1024;
+  String CONFIG_HDFS_ROLL_SIZE = "hdfs.rollSize";
+  long DEFAULT_HDFS_ROLL_SIZE = 1024;

-  public static final String CONFIG_HDFS_ROLL_COUNT = "hdfs.rollCount";
-  public static final long DEFAULT_HDFS_ROLL_COUNT = 10;
+  String CONFIG_HDFS_ROLL_COUNT = "hdfs.rollCount";
+  long DEFAULT_HDFS_ROLL_COUNT = 10;

-  public static final String CONFIG_HDFS_BATCH_SIZE = "hdfs.batchSize";
-  public static final long DEFAULT_HDFS_BATCH_SIZE = 1;
+  String CONFIG_HDFS_BATCH_SIZE = "hdfs.batchSize";
+  long DEFAULT_HDFS_BATCH_SIZE = 1;

-  public static final String CONFIG_HDFS_FILE_PREFIX = "hdfs.filePrefix";
-  public static final String DEFAULT_HDFS_FILE_PREFIX = "data";
+  String CONFIG_HDFS_FILE_PREFIX = "hdfs.filePrefix";
+  String DEFAULT_HDFS_FILE_PREFIX = "data";

-  public static final String CONFIG_HDFS_KERBEROS_PRINCIPAL = "hdfs.kerberosPrincipal";
+  String CONFIG_HDFS_KERBEROS_PRINCIPAL = "hdfs.kerberosPrincipal";

-  public static final String CONFIG_HDFS_KERBEROS_KEYTAB = "hdfs.kerberosKeytab";
+  String CONFIG_HDFS_KERBEROS_KEYTAB = "hdfs.kerberosKeytab";

-  public static final String CONFIG_HDFS_PROXY_USER = "hdfs.proxyUser";
+  String CONFIG_HDFS_PROXY_USER = "hdfs.proxyUser";

-  public static final String CONFIG_AVRO_EVENT_SERIALIZER_SCHEMA_SOURCE = "avro.schema.source";
+  String CONFIG_AVRO_EVENT_SERIALIZER_SCHEMA_SOURCE = "avro.schema.source";

-  public static final String SCHEMA_SOURCE_REST = "rest";
-  public static final String SCHEMA_SOURCE_LOCAL = "local";
+  String SCHEMA_SOURCE_REST = "rest";
+  String SCHEMA_SOURCE_LOCAL = "local";

-  public static final String DEFAULT_AVRO_EVENT_SERIALIZER_SCHEMA_SOURCE = SCHEMA_SOURCE_REST;
+  String DEFAULT_AVRO_EVENT_SERIALIZER_SCHEMA_SOURCE = SCHEMA_SOURCE_REST;

-  public static final String CONFIG_KAA_REST_HOST = "kaa.rest.host";
-  public static final String DEFAULT_KAA_REST_HOST = "localhost";
+  String CONFIG_KAA_REST_HOST = "kaa.rest.host";
+  String DEFAULT_KAA_REST_HOST = "localhost";

-  public static final String CONFIG_KAA_REST_PORT = "kaa.rest.port";
-  public static final int DEFAULT_KAA_REST_PORT = 8080;
+  String CONFIG_KAA_REST_PORT = "kaa.rest.port";
+  int DEFAULT_KAA_REST_PORT = 8080;

-  public static final String CONFIG_KAA_REST_USER = "kaa.rest.user";
-  public static final String CONFIG_KAA_REST_PASSWORD = "kaa.rest.password";
+  String CONFIG_KAA_REST_USER = "kaa.rest.user";
+  String CONFIG_KAA_REST_PASSWORD = "kaa.rest.password";

-  public static final String CONFIG_AVRO_EVENT_SERIALIZER_SCHEMA_LOCAL_ROOT = "avro.schema.local.root";
+  String CONFIG_AVRO_EVENT_SERIALIZER_SCHEMA_LOCAL_ROOT = "avro.schema.local.root";

 }
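
The remaining files show the commit's other recurring fixes: over-long lines are wrapped to the length limit, caught exceptions are renamed from e to ex, static imports replace repeated class qualifiers, and explicit generic type arguments give way to the diamond operator. For the diamond change, a self-contained sketch (hypothetical names, not from the commit):

import java.util.HashMap;
import java.util.Map;

class DiamondDemo {
  // Pre-Java 7 style: type arguments spelled out on both sides.
  Map<String, Integer> before = new HashMap<String, Integer>();
  // Java 7+ diamond operator: the compiler infers the arguments.
  Map<String, Integer> after = new HashMap<>();
}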
AvroKaaEventSerializer.java (org.kaaproject.kaa.server.flume.sink.hdfs)

@@ -13,12 +13,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.kaaproject.kaa.server.flume.sink.hdfs;

 import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.COMPRESSION_CODEC;
 import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_COMPRESSION_CODEC;
 import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_SYNC_INTERVAL_BYTES;
 import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.SYNC_INTERVAL_BYTES;
+import static org.kaaproject.kaa.server.common.log.shared.RecordWrapperSchemaGenerator.RECORD_HEADER_FIELD;
+import static org.kaaproject.kaa.server.common.log.shared.RecordWrapperSchemaGenerator.generateRecordWrapperSchema;

 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;

@@ -46,11 +49,11 @@
 import java.util.HashMap;
 import java.util.Map;

-public class AvroKaaEventSerializer implements EventSerializer, Configurable,
-    EventConstants {
+public class AvroKaaEventSerializer
+    implements EventSerializer, Configurable, EventConstants {

   private static final Logger LOG = LoggerFactory.getLogger(AvroKaaEventSerializer.class);
-  private static Map<KaaSinkKey, Schema> schemaCache = new HashMap<KaaSinkKey, Schema>();
+  private static Map<KaaSinkKey, Schema> schemaCache = new HashMap<>();
   private final OutputStream out;
   private DatumReader<GenericRecord> datumReader;
   private BinaryDecoder binaryDecoder;

@@ -100,23 +103,22 @@ public void write(Event event) throws IOException {
     binaryDecoder = DecoderFactory.get().binaryDecoder(kaaRecordEvent.getBody(), binaryDecoder);
     GenericRecord recordData = datumReader.read(null, binaryDecoder);

-    wrapperRecord.put(RecordWrapperSchemaGenerator.RECORD_HEADER_FIELD, kaaRecordEvent.getRecordHeader());
+    wrapperRecord.put(RECORD_HEADER_FIELD, kaaRecordEvent.getRecordHeader());
     wrapperRecord.put(RecordWrapperSchemaGenerator.RECORD_DATA_FIELD, recordData);

     dataFileWriter.append(wrapperRecord);
   }

   private void initialize(Event event) throws IOException {
-    Schema schema = null;
-    Schema wrapperSchema = null;
+    Schema schema;
     KaaSinkKey key = new KaaSinkKey(event.getHeaders());
     schema = schemaCache.get(key);
     if (schema == null) {
       try {
         schema = schemaSource.loadByKey(key);
-      } catch (Exception e) {
+      } catch (Exception ex) {
         LOG.error("Unable to load schema by key {}", key);
-        LOG.error("Caused by: ", e);
+        LOG.error("Caused by: ", ex);
         throw new FlumeException("Could not find schema for event "
             + event);
       }

@@ -133,23 +135,23 @@ private void initialize(Event event) throws IOException {
       schema = new Schema.Parser().parse(schemaString);
     }

-    datumReader = new GenericDatumReader<GenericRecord>(schema);
+    datumReader = new GenericDatumReader<>(schema);

-    wrapperSchema = RecordWrapperSchemaGenerator.generateRecordWrapperSchema(schema.toString());
+    Schema wrapperSchema = generateRecordWrapperSchema(schema.toString());

-    writer = new GenericDatumWriter<Object>(wrapperSchema);
-    dataFileWriter = new DataFileWriter<Object>(writer);
+    writer = new GenericDatumWriter<>(wrapperSchema);
+    dataFileWriter = new DataFileWriter<>(writer);

     dataFileWriter.setSyncInterval(syncIntervalBytes);

     try {
       CodecFactory codecFactory = CodecFactory
           .fromString(compressionCodec);
       dataFileWriter.setCodec(codecFactory);
-    } catch (AvroRuntimeException e) {
+    } catch (AvroRuntimeException ex) {
       LOG.warn("Unable to instantiate avro codec with name ("
           + compressionCodec
-          + "). Compression disabled. Exception follows.", e);
+          + "). Compression disabled. Exception follows.", ex);
     }

     dataFileWriter.create(wrapperSchema, out);
AvroSchemaSource.java (org.kaaproject.kaa.server.flume.sink.hdfs)

@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.kaaproject.kaa.server.flume.sink.hdfs;

 import com.google.common.base.Preconditions;

@@ -48,7 +49,8 @@ public class AvroSchemaSource implements Configurable, ConfigurationConstants {

   public static final String SCHEMA_SOURCE = "flume.avro.schema.source";
   private static final String KAA_ADMIN_REST_API_LOG_SCHEMA = "/kaaAdmin/rest/api/logSchema/";
-  private static final String KAA_ADMIN_REST_API_CTL_SCHEMA = "/kaaAdmin/rest/api/CTL/getFlatSchemaByCtlSchemaId?id=";
+  private static final String KAA_ADMIN_REST_API_CTL_SCHEMA = "/kaaAdmin/rest/api/CTL/"
+      + "getFlatSchemaByCtlSchemaId?id=";
   private String schemaSourceType;
   private String kaaRestHost;
   private int kaaRestPort;

@@ -62,24 +64,30 @@ public class AvroSchemaSource implements Configurable, ConfigurationConstants {

   @Override
   public void configure(Context context) {
-    schemaSourceType = context.getString(CONFIG_AVRO_EVENT_SERIALIZER_SCHEMA_SOURCE, DEFAULT_AVRO_EVENT_SERIALIZER_SCHEMA_SOURCE);
+    schemaSourceType = context
+        .getString(CONFIG_AVRO_EVENT_SERIALIZER_SCHEMA_SOURCE,
+            DEFAULT_AVRO_EVENT_SERIALIZER_SCHEMA_SOURCE);
     if (schemaSourceType.equals(SCHEMA_SOURCE_REST)) {
       kaaRestHost = context.getString(CONFIG_KAA_REST_HOST, DEFAULT_KAA_REST_HOST);
       kaaRestPort = context.getInteger(CONFIG_KAA_REST_PORT, DEFAULT_KAA_REST_PORT);
       kaaRestUser = context.getString(CONFIG_KAA_REST_USER);
       kaaRestPassword = context.getString(CONFIG_KAA_REST_PASSWORD);

       Preconditions.checkArgument(kaaRestUser != null && kaaRestUser.length() > 0,
-          CONFIG_KAA_REST_USER + " must be specified for " + SCHEMA_SOURCE_REST + " avro schema source");
+          CONFIG_KAA_REST_USER + " must be specified for "
+              + SCHEMA_SOURCE_REST + " avro schema source");
+
       Preconditions.checkArgument(kaaRestPassword != null && kaaRestPassword.length() > 0,
-          CONFIG_KAA_REST_PASSWORD + " must be specified for " + SCHEMA_SOURCE_REST + " avro schema source");
+          CONFIG_KAA_REST_PASSWORD + " must be specified for "
+              + SCHEMA_SOURCE_REST + " avro schema source");

       initHttpRestClient();
     } else {
       schemaLocalRoot = context.getString(CONFIG_AVRO_EVENT_SERIALIZER_SCHEMA_LOCAL_ROOT);

       Preconditions.checkArgument(schemaLocalRoot != null && schemaLocalRoot.length() > 0,
-          CONFIG_AVRO_EVENT_SERIALIZER_SCHEMA_LOCAL_ROOT + " must be specified for " + SCHEMA_SOURCE_LOCAL + " avro schema source");
+          CONFIG_AVRO_EVENT_SERIALIZER_SCHEMA_LOCAL_ROOT
+              + " must be specified for " + SCHEMA_SOURCE_LOCAL + " avro schema source");
     }
   }

@@ -107,17 +115,20 @@ public Schema loadByKey(KaaSinkKey key) throws Exception {
     String schemaString = null;
     String logSchema = null;
     if (schemaSourceType.equals(SCHEMA_SOURCE_REST)) {
-      HttpGet getRequest = new HttpGet(KAA_ADMIN_REST_API_LOG_SCHEMA + key.getApplicationToken() + "/" + key.getSchemaVersion());
+      HttpGet getRequest = new HttpGet(KAA_ADMIN_REST_API_LOG_SCHEMA + key.getApplicationToken()
+          + "/" + key.getSchemaVersion());
       HttpResponse httpResponse = httpClient.execute(restHost, getRequest, httpContext);
       HttpEntity entity = httpResponse.getEntity();
       if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK && entity != null) {
         String content = EntityUtils.toString(entity);
         ObjectMapper mapper = new ObjectMapper();
         LogSchemaDto logSchemaDto = mapper.readValue(content, LogSchemaDto.class);
-        HttpGet getCtlRequest = new HttpGet(KAA_ADMIN_REST_API_CTL_SCHEMA + logSchemaDto.getCtlSchemaId());
+        HttpGet getCtlRequest = new HttpGet(KAA_ADMIN_REST_API_CTL_SCHEMA
+            + logSchemaDto.getCtlSchemaId());
         HttpResponse httpCtlResponse = httpClient.execute(restHost, getCtlRequest, httpContext);
         HttpEntity ctlEntity = httpCtlResponse.getEntity();
-        if (httpCtlResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK && ctlEntity != null) {
+        if (httpCtlResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK
+            && ctlEntity != null) {
           String ctlContent = EntityUtils.toString(entity);
           ObjectMapper ctlMapper = new ObjectMapper();
           logSchema = ctlMapper.readValue(ctlContent, String.class);

@@ -129,7 +140,8 @@ public Schema loadByKey(KaaSinkKey key) throws Exception {
       String applicationToken = key.getApplicationToken();
       int version = key.getSchemaVersion();
       String separator = System.getProperty("file.separator");
-      File schemaFile = new File(schemaLocalRoot + separator + applicationToken + separator + "schema_v" + version);
+      File schemaFile = new File(schemaLocalRoot + separator + applicationToken
+          + separator + "schema_v" + version);
       if (schemaFile.exists()) {
         schemaString = FileUtils.readFileToString(schemaFile);
       }
