KAA-1279: Fixed some log appenders and verifiers errors
sashadidukh committed Oct 6, 2016
1 parent f43d862 commit fb3de14
Showing 11 changed files with 119 additions and 94 deletions.
CassandraLogAppender.java

@@ -78,7 +78,9 @@ protected Map<String, GenericAvroConverter<GenericRecord>> initialValue() {
  };

+  /**
+   * Instantiates a new CassandraLogAppender.
+   */
  public CassandraLogAppender() {
    super(CassandraConfig.class);
    scheduler.scheduleWithFixedDelay(new Runnable() {
@@ -137,13 +139,15 @@ public void run() {
            listener.onSuccess();
            cassandraSuccessLogCount.getAndAdd(logCount);
            break;
+          default:
+            break;
        }
        LOG.debug("[{}] appended {} logs to cassandra collection", tableName, logEventPack.getEvents().size());
      } else {
        listener.onInternalError();
      }
-    } catch (Exception e) {
-      LOG.warn("Got exception. Can't process log events", e);
+    } catch (Exception ex) {
+      LOG.warn("Got exception. Can't process log events", ex);
      listener.onInternalError();
    }
  }
@@ -167,8 +171,8 @@ protected void initFromConfiguration(LogAppenderDto appender, CassandraConfig co
      executor = Executors.newFixedThreadPool(executorPoolSize);
      callbackExecutor = Executors.newFixedThreadPool(callbackPoolSize);
      LOG.info("Cassandra log appender initialized");
-    } catch (Exception e) {
-      LOG.error("Failed to init cassandra log appender: ", e);
+    } catch (Exception ex) {
+      LOG.error("Failed to init cassandra log appender: ", ex);
    }
  }

@@ -229,9 +233,9 @@ protected List<CassandraLogEventDto> generateCassandraLogEvent(LogEventPack logE
        GenericRecord decodedLog = eventConverter.decodeBinary(logEvent.getLogData());
        events.add(new CassandraLogEventDto(header, decodedLog));
      }
-    } catch (IOException e) {
-      LOG.error("Unexpected IOException while decoding LogEvents", e);
-      throw e;
+    } catch (IOException ex) {
+      LOG.error("Unexpected IOException while decoding LogEvents", ex);
+      throw ex;
    }
    return events;
  }
@@ -286,12 +290,12 @@ public void onSuccess(ResultSet result) {
    }

    @Override
-    public void onFailure(Throwable t) {
+    public void onFailure(Throwable throwable) {
      cassandraFailureLogCount.getAndAdd(size);
-      LOG.warn("Failed to store record", t);
-      if (t instanceof UnsupportedFeatureException) {
+      LOG.warn("Failed to store record", throwable);
+      if (throwable instanceof UnsupportedFeatureException) {
        callback.onRemoteError();
-      } else if (t instanceof IOException) {
+      } else if (throwable instanceof IOException) {
        callback.onConnectionError();
      } else {
        callback.onInternalError();
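Note: the onSuccess/onFailure pair in the hunk above has the shape of Guava's FutureCallback, which the DataStax driver's async API is typically wired to. Below is a minimal sketch of that wiring, not taken from the commit — the session, insert, and callbackExecutor parameters are illustrative assumptions:

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.Executor;

// Sketch: attaching a callback to an asynchronous Cassandra write.
public class AsyncWriteSketch {
  static void writeAsync(Session session, Statement insert, Executor callbackExecutor) {
    ResultSetFuture future = session.executeAsync(insert);
    Futures.addCallback(future, new FutureCallback<ResultSet>() {
      @Override
      public void onSuccess(ResultSet result) {
        // success path: update success counters, notify the delivery listener
      }

      @Override
      public void onFailure(Throwable throwable) {
        // failure path: inspect the Throwable type and report a
        // remote, connection, or internal error accordingly
      }
    }, callbackExecutor);
  }
}

Running callbacks on a dedicated executor, as the appender does with callbackExecutor, keeps listener work off the driver's I/O threads.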
CassandraLogEventDao.java

@@ -87,6 +87,9 @@ public class CassandraLogEventDao implements LogEventDao {
  private String keyspaceName;
  private CassandraConfig configuration;

+  /**
+   * Instantiates a new CassandraLogEventDao.
+   */
  public CassandraLogEventDao(CassandraConfig configuration) throws UnknownHostException {
    if (configuration == null) {
      throw new IllegalArgumentException("Configuration shouldn't be null");
@@ -249,8 +252,10 @@ public List<CassandraLogEventDto> save(List<CassandraLogEventDto> logEventDtoLis

  @Override
  public ListenableFuture<ResultSet> saveAsync(List<CassandraLogEventDto> logEventDtoList, String tableName,
-      GenericAvroConverter<GenericRecord> eventConverter, GenericAvroConverter<GenericRecord> headerConverter,
-      GenericAvroConverter<GenericRecord> clientProfileConverter, GenericAvroConverter<GenericRecord> serverProfileConverter,
+      GenericAvroConverter<GenericRecord> eventConverter,
+      GenericAvroConverter<GenericRecord> headerConverter,
+      GenericAvroConverter<GenericRecord> clientProfileConverter,
+      GenericAvroConverter<GenericRecord> serverProfileConverter,
      String clientProfileJson, String serverProfileJson)
      throws IOException {
    LOG.debug("Execute async bath request for cassandra table {}", tableName);
@@ -388,6 +393,7 @@ private Insert[] prepareQuery(List<CassandraLogEventDto> logEventDtoList, String
          } else {
            throw new RuntimeException(ABSENT_SERVER_PROFILE_ERROR);
          }
+          break;
        case SERVER_BINARY:
          if (serverProfileBinary != null) {
            insert.value(element.getColumnName(), clientProfileBinary);
@@ -402,6 +408,8 @@
          reuseTsValue = formatTs(reuseTsValue, element);
          insert.value(element.getColumnName(), reuseTsValue);
          break;
+        default:
+          break;
      }
    }

@@ -421,20 +429,20 @@ private String formatTs(String tsValue, ColumnMappingElement element) {
    if (pattern == null || pattern.isEmpty()) {
      tsValue = ts + "";
    } else {
-      ThreadLocal<SimpleDateFormat> formatterTL = dateFormatMap.get(pattern);
-      if (formatterTL == null) {
-        formatterTL = new ThreadLocal<SimpleDateFormat>() {
+      ThreadLocal<SimpleDateFormat> formatterTl = dateFormatMap.get(pattern);
+      if (formatterTl == null) {
+        formatterTl = new ThreadLocal<SimpleDateFormat>() {
          @Override
          protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat(pattern);
          }
        };
-        dateFormatMap.putIfAbsent(pattern, formatterTL);
+        dateFormatMap.putIfAbsent(pattern, formatterTl);
      }
-      SimpleDateFormat formatter = formatterTL.get();
+      SimpleDateFormat formatter = formatterTl.get();
      if (formatter == null) {
        formatter = new SimpleDateFormat(pattern);
-        formatterTL.set(formatter);
+        formatterTl.set(formatter);
      }
      tsValue = formatter.format(new Date(ts));
    }
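Note: the formatTs hunk above only renames formatterTL to formatterTl; the underlying pattern stays the same — one SimpleDateFormat per thread for each pattern, because SimpleDateFormat is not thread-safe. A minimal standalone sketch of that caching idea (class and method names here are illustrative, not from the commit):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a per-thread, per-pattern SimpleDateFormat cache. The outer
// map is shared across threads; each thread lazily builds its own
// formatter instance through the ThreadLocal.
public class TimestampFormatter {
  private static final ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>> CACHE =
      new ConcurrentHashMap<>();

  public static String format(long ts, final String pattern) {
    ThreadLocal<SimpleDateFormat> holder = CACHE.get(pattern);
    if (holder == null) {
      holder = ThreadLocal.withInitial(() -> new SimpleDateFormat(pattern));
      // putIfAbsent keeps the first holder if another thread raced us here
      CACHE.putIfAbsent(pattern, holder);
      holder = CACHE.get(pattern);
    }
    return holder.get().format(new Date(ts));
  }

  public static void main(String[] args) {
    System.out.println(format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
  }
}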
CassandraLogEventDto.java

@@ -24,6 +24,9 @@ public class CassandraLogEventDto {
  private final RecordHeader header;
  private final GenericRecord event;

+  /**
+   * Instantiates a new CassandraLogEventDto.
+   */
  public CassandraLogEventDto(RecordHeader header, GenericRecord event) {
    super();
    this.header = header;
KafkaLogAppender.java

@@ -66,6 +66,9 @@ protected Map<String, GenericAvroConverter<GenericRecord>> initialValue() {
  };

+  /**
+   * Instantiates a new KafkaLogAppender.
+   */
  public KafkaLogAppender() {
    super(KafkaConfig.class);
    scheduler.scheduleWithFixedDelay(new Runnable() {
@@ -123,8 +126,8 @@ public void run() {
      } else {
        listener.onInternalError();
      }
-    } catch (Exception e) {
-      LOG.warn("Got exception. Can't process log events", e);
+    } catch (Exception ex) {
+      LOG.warn("Got exception. Can't process log events", ex);
      listener.onInternalError();
    }
  }
@@ -145,8 +148,8 @@ protected void initFromConfiguration(LogAppenderDto appender, KafkaConfig config
      executor = Executors.newFixedThreadPool(executorPoolSize);
      topicName = configuration.getTopic();
      LOG.info("Kafka log appender initialized");
-    } catch (Exception e) {
-      LOG.error("Failed to init kafka log appender: ", e);
+    } catch (Exception ex) {
+      LOG.error("Failed to init kafka log appender: ", ex);
    }

  }
@@ -165,9 +168,9 @@ protected List<KafkaLogEventDto> generateKafkaLogEvent(LogEventPack logEventPack
        GenericRecord decodedLog = eventConverter.decodeBinary(logEvent.getLogData());
        events.add(new KafkaLogEventDto(header, decodedLog));
      }
-    } catch (IOException e) {
-      LOG.error("Unexpected IOException while decoding LogEvents", e);
-      throw e;
+    } catch (IOException ex) {
+      LOG.error("Unexpected IOException while decoding LogEvents", ex);
+      throw ex;
    }
    return events;
  }
@@ -208,14 +211,14 @@ private LogAppenderCallback(LogDeliveryCallback callback, AtomicInteger kafkaSuc
    }

    @Override
-    public void onCompletion(RecordMetadata record, Exception e) {
-      if (e == null) {
+    public void onCompletion(RecordMetadata record, Exception ex) {
+      if (ex == null) {
        kafkaSuccessLogCount.getAndAdd(size);
        callback.onSuccess();
      } else {
        kafkaFailureLogCount.getAndAdd(size);
-        LOG.warn("Failed to store record", e);
-        if (e instanceof IOException) {
+        LOG.warn("Failed to store record", ex);
+        if (ex instanceof IOException) {
          callback.onConnectionError();
        } else {
          callback.onInternalError();
KafkaLogEventDao.java

@@ -43,13 +43,16 @@ public class KafkaLogEventDao implements LogEventDao {
  private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
  private static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

-  private final Random RANDOM = new Random();
+  private static final Random RANDOM = new Random();

  private KafkaProducer<String, String> producer;
  private KafkaConfig configuration;
  private String topicName;
  private int partitionCount;

+  /**
+   * Instantiates a new KafkaLogEventDao.
+   */
  public KafkaLogEventDao(KafkaConfig configuration) {
    if (configuration == null) {
      throw new IllegalArgumentException("Configuration shouldn't be null");
@@ -58,13 +61,13 @@ public KafkaLogEventDao(KafkaConfig configuration) {
    this.configuration = configuration;
    this.topicName = configuration.getTopic();
    this.partitionCount = configuration.getPartitionCount();
-    Properties kafkaProperties = new Properties();
    StringBuilder serverList = new StringBuilder();
    for (KafkaServer server : configuration.getKafkaServers()) {
      serverList.append(server.getHost() + ":" + server.getPort() + ",");
    }
    serverList = serverList.deleteCharAt(serverList.length() - 1);
    LOG.info("Init kafka cluster with property {}={}", ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
+    Properties kafkaProperties = new Properties();
    kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList.toString());
    LOG.info("Init kafka cluster with property {}={}", ProducerConfig.ACKS_CONFIG,
        configuration.getKafkaAcknowledgement());
@@ -97,11 +100,11 @@ public List<Future<RecordMetadata>> save(List<KafkaLogEventDto> logEventDtoList,
    for (KafkaLogEventDto dto : logEventDtoList) {
      ProducerRecord<String, String> recordToWrite;
      if (configuration.getUseDefaultPartitioner()) {
-        recordToWrite = new ProducerRecord<String, String>(topicName, getKey(dto), formKafkaJSON(dto,
+        recordToWrite = new ProducerRecord<String, String>(topicName, getKey(dto), formKafkaJson(dto,
            eventConverter, headerConverter));
      } else {
-        recordToWrite = new ProducerRecord<String, String>(topicName, calculatePartitionID(dto), getKey(dto),
-            formKafkaJSON(dto, eventConverter, headerConverter));
+        recordToWrite = new ProducerRecord<String, String>(topicName, calculatePartitionId(dto), getKey(dto),
+            formKafkaJson(dto, eventConverter, headerConverter));
      }
      results.add(producer.send(recordToWrite, callback));
    }
@@ -116,7 +119,7 @@ public void close() {
    }
  }

-  private int calculatePartitionID(KafkaLogEventDto eventDto) {
+  private int calculatePartitionId(KafkaLogEventDto eventDto) {
    return eventDto.hashCode() % partitionCount;
  }

@@ -135,15 +138,15 @@ private String parseAcknowledgement(String record) {
    }
  }

-  private String formKafkaJSON(KafkaLogEventDto dto, GenericAvroConverter<GenericRecord> eventConverter,
+  private String formKafkaJson(KafkaLogEventDto dto, GenericAvroConverter<GenericRecord> eventConverter,
                               GenericAvroConverter<GenericRecord> headerConverter) throws IOException {
-    String eventJSON = eventConverter.encodeToJson(dto.getEvent());
-    String headerJSON = headerConverter.encodeToJson(dto.getHeader());
+    String eventJson = eventConverter.encodeToJson(dto.getEvent());
+    String headerJson = headerConverter.encodeToJson(dto.getHeader());
    StringBuilder result = new StringBuilder("{");
-    if (headerJSON != null && !headerJSON.isEmpty()) {
-      result.append("\"header\":" + headerJSON + ",");
+    if (headerJson != null && !headerJson.isEmpty()) {
+      result.append("\"header\":" + headerJson + ",");
    }
-    result.append("\"event\":" + eventJSON + "}");
+    result.append("\"event\":" + eventJson + "}");
    return result.toString();
  }
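Note: formKafkaJson (renamed from formKafkaJSON above) concatenates two already-encoded JSON strings into a small envelope instead of going through a JSON library. A minimal sketch of that assembly with hypothetical header and event payloads (in the appender both strings come from GenericAvroConverter.encodeToJson):

// Sketch of the envelope written to Kafka; the header part is optional,
// the event part is always present. All literal values are placeholders.
public class KafkaJsonEnvelopeDemo {
  static String formKafkaJson(String headerJson, String eventJson) {
    StringBuilder result = new StringBuilder("{");
    if (headerJson != null && !headerJson.isEmpty()) {
      result.append("\"header\":" + headerJson + ",");
    }
    result.append("\"event\":" + eventJson + "}");
    return result.toString();
  }

  public static void main(String[] args) {
    String header = "{\"endpointKeyHash\":\"abc123\"}";
    String event = "{\"level\":\"INFO\",\"message\":\"started\"}";
    // Prints: {"header":{"endpointKeyHash":"abc123"},"event":{"level":"INFO","message":"started"}}
    System.out.println(formKafkaJson(header, event));
  }
}

With a null or empty header the envelope degrades to {"event":...}, which is why the header append is guarded.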
KafkaLogEventDto.java

@@ -28,6 +28,9 @@ public class KafkaLogEventDto implements Serializable {
  private final RecordHeader header;
  private final GenericRecord event;

+  /**
+   * Instantiates a new KafkaLogEventDto.
+   */
  public KafkaLogEventDto(RecordHeader header, GenericRecord event) {
    super();
    this.header = header;
OracleNoSqlLogAppender.java

@@ -83,7 +83,7 @@ public OracleNoSqlLogAppender() {
  private static String getHostName() {
    try {
      return InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException e) {
+    } catch (UnknownHostException ex) {
      return "";
    }
  }
@@ -96,11 +96,11 @@ public void doAppend(LogEventPack logEventPack, RecordHeader header, LogDelivery
      try {
        doAppendGenericAvro(logEventPack, header);
        listener.onSuccess();
-      } catch (FaultException e) {
-        LOG.error("Unable to append logs due to remote exception!", e);
+      } catch (FaultException ex) {
+        LOG.error("Unable to append logs due to remote exception!", ex);
        listener.onRemoteError();
-      } catch (Exception e) {
-        LOG.error("Unable to append logs!", e);
+      } catch (Exception ex) {
+        LOG.error("Unable to append logs!", ex);
        listener.onInternalError();
      }
    } else {
@@ -117,8 +117,8 @@ public void doAppend(LogEventPack logEventPack, RecordHeader header, LogDelivery
  protected void initFromConfiguration(LogAppenderDto appender, OracleNoSqlConfig configuration) {
    try {
      kvStore = initKvStore(configuration);
-    } catch (Exception e) {
-      LOG.error("Failed to init kvStore: ", e);
+    } catch (Exception ex) {
+      LOG.error("Failed to init kvStore: ", ex);
    }
  }

@@ -141,9 +141,9 @@ private void doAppendGenericAvro(LogEventPack logEventPack, RecordHeader header)
    binaryDecoder = DecoderFactory.get().binaryDecoder(event.getLogData(), binaryDecoder);
    try {
      recordData = datumReader.read(recordData, binaryDecoder);
-    } catch (IOException e) {
-      LOG.error("[{}] Unable to read log event!", e);
-      throw e;
+    } catch (IOException ex) {
+      LOG.error("[{}] Unable to read log event!", ex);
+      throw ex;
    }
    wrapperRecord.put(RecordWrapperSchemaGenerator.RECORD_HEADER_FIELD, header);
    wrapperRecord.put(RecordWrapperSchemaGenerator.RECORD_DATA_FIELD, recordData);
@@ -165,9 +165,9 @@ private void initialize(LogEventPack logEventPack) throws Exception {
      Schema userSchema = new Schema.Parser().parse(logEventPack.getLogSchema().getSchema());
      datumReader = new GenericDatumReader<GenericRecord>(userSchema);
      wrapperRecord = new GenericData.Record(recordWrapperSchema);
-    } catch (Exception e) {
+    } catch (Exception ex) {
      LOG.error("[{}] Unable to initialize parameters for log event pack.", getName());
-      throw e;
+      throw ex;
    }

  }
@@ -231,8 +231,6 @@ private KVStore initKvStore(OracleNoSqlConfig configuration) throws Exception {
      helperHostPorts[i] = node.getHost() + ":" + node.getPort();
    }

-    KVStoreConfig config = new KVStoreConfig(configuration.getStoreName(), helperHostPorts);
-
    Properties securityProperties = new Properties();
    if (configuration.getUsername() != null) {
      username = configuration.getUsername();
@@ -270,6 +268,8 @@ private KVStore initKvStore(OracleNoSqlConfig configuration) throws Exception {
    if (configuration.getSslTrustStoreType() != null) {
      securityProperties.put(KVSecurityConstants.SSL_TRUSTSTORE_TYPE_PROPERTY, configuration.getSslTrustStoreType());
    }
+
+    KVStoreConfig config = new KVStoreConfig(configuration.getStoreName(), helperHostPorts);
    config.setSecurityProperties(securityProperties);

    KVStore kvStore = KVStoreFactory.getStore(config);
