Skip to content

Commit

Permalink
KAA-526: Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvayka committed May 30, 2015
1 parent 82dfe38 commit 41e70e8
Show file tree
Hide file tree
Showing 14 changed files with 1,357 additions and 216 deletions.
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -1359,6 +1359,10 @@ Copyright 2014 CyberVision, Inc.
<id>central</id> <id>central</id>
<url>http://repo1.maven.org/maven2/</url> <url>http://repo1.maven.org/maven2/</url>
</repository> </repository>
<repository>
<id>twitter-twttr</id>
<url>http://maven.twttr.com/</url>
</repository>
<repository> <repository>
<id>repository.kaaproject</id> <id>repository.kaaproject</id>
<url>http://repository.kaaproject.org/repository/internal/</url> <url>http://repository.kaaproject.org/repository/internal/</url>
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,26 +16,30 @@


package org.kaaproject.kaa.server.appenders.cassandra.appender; package org.kaaproject.kaa.server.appenders.cassandra.appender;


import com.datastax.driver.core.ResultSet; import java.io.IOException;
import com.datastax.driver.core.exceptions.UnsupportedFeatureException; import java.util.ArrayList;
import com.google.common.util.concurrent.FutureCallback; import java.util.List;
import com.google.common.util.concurrent.Futures; import java.util.concurrent.ExecutorService;
import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.Executors;

import org.apache.avro.generic.GenericRecord;
import org.kaaproject.kaa.common.avro.GenericAvroConverter;
import org.kaaproject.kaa.common.dto.logs.LogAppenderDto; import org.kaaproject.kaa.common.dto.logs.LogAppenderDto;
import org.kaaproject.kaa.common.dto.logs.LogEventDto;
import org.kaaproject.kaa.server.appenders.cassandra.config.gen.CassandraConfig; import org.kaaproject.kaa.server.appenders.cassandra.config.gen.CassandraConfig;
import org.kaaproject.kaa.server.appenders.cassandra.config.gen.CassandraExecuteRequestType; import org.kaaproject.kaa.server.appenders.cassandra.config.gen.CassandraExecuteRequestType;
import org.kaaproject.kaa.server.common.log.shared.appender.AbstractLogAppender; import org.kaaproject.kaa.server.common.log.shared.appender.AbstractLogAppender;
import org.kaaproject.kaa.server.common.log.shared.appender.LogDeliveryCallback; import org.kaaproject.kaa.server.common.log.shared.appender.LogDeliveryCallback;
import org.kaaproject.kaa.server.common.log.shared.appender.LogEvent;
import org.kaaproject.kaa.server.common.log.shared.appender.LogEventPack; import org.kaaproject.kaa.server.common.log.shared.appender.LogEventPack;
import org.kaaproject.kaa.server.common.log.shared.avro.gen.RecordHeader; import org.kaaproject.kaa.server.common.log.shared.avro.gen.RecordHeader;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.IOException; import com.datastax.driver.core.ResultSet;
import java.util.List; import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
import java.util.concurrent.ExecutorService; import com.google.common.util.concurrent.FutureCallback;
import java.util.concurrent.Executors; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;


public class CassandraLogAppender extends AbstractLogAppender<CassandraConfig> { public class CassandraLogAppender extends AbstractLogAppender<CassandraConfig> {


Expand All @@ -59,18 +63,20 @@ public void doAppend(LogEventPack logEventPack, RecordHeader header, LogDelivery
if (!closed) { if (!closed) {
try { try {
LOG.debug("[{}] appending {} logs to cassandra collection", tableName, logEventPack.getEvents().size()); LOG.debug("[{}] appending {} logs to cassandra collection", tableName, logEventPack.getEvents().size());
List<LogEventDto> dtoList = generateLogEvent(logEventPack, header); GenericAvroConverter<GenericRecord> eventConverter = getConverter(logEventPack.getLogSchema().getSchema());
GenericAvroConverter<GenericRecord> headerConverter = getConverter(header.getSchema().toString());
List<CassandraLogEventDto> dtoList = generateCassandraLogEvent(logEventPack, header, eventConverter);
LOG.debug("[{}] saving {} objects", tableName, dtoList.size()); LOG.debug("[{}] saving {} objects", tableName, dtoList.size());
if (!dtoList.isEmpty()) { if (!dtoList.isEmpty()) {
switch (executeRequestType) { switch (executeRequestType) {
case ASYNC: case ASYNC:
ListenableFuture<ResultSet> result = logEventDao.saveAsync(dtoList, tableName); ListenableFuture<ResultSet> result = logEventDao.saveAsync(dtoList, tableName, eventConverter, headerConverter);
Futures.addCallback(result, new Callback(listener), callbackExecutor); Futures.addCallback(result, new Callback(listener), callbackExecutor);
break; break;
case SYNC: case SYNC:
logEventDao.save(dtoList, tableName); logEventDao.save(dtoList, tableName, eventConverter, headerConverter);
listener.onSuccess(); listener.onSuccess();
break; break;
} }
LOG.debug("[{}] appended {} logs to cassandra collection", tableName, logEventPack.getEvents().size()); LOG.debug("[{}] appended {} logs to cassandra collection", tableName, logEventPack.getEvents().size());
} else { } else {
Expand All @@ -88,9 +94,9 @@ public void doAppend(LogEventPack logEventPack, RecordHeader header, LogDelivery


@Override @Override
protected void initFromConfiguration(LogAppenderDto appender, CassandraConfig configuration) { protected void initFromConfiguration(LogAppenderDto appender, CassandraConfig configuration) {
LOG.info("Initializing new instance of Cassandra log appender"); LOG.info("Initializing new appender instance using {}", configuration);
try { try {
checkExecuteRequestType(configuration); setExecuteRequestType(configuration);
logEventDao = new CassandraLogEventDao(configuration); logEventDao = new CassandraLogEventDao(configuration);
createTable(appender.getApplicationToken()); createTable(appender.getApplicationToken());
Integer callbackPoolSize = configuration.getCallbackThreadPoolSize(); Integer callbackPoolSize = configuration.getCallbackThreadPoolSize();
Expand All @@ -107,12 +113,7 @@ protected void initFromConfiguration(LogAppenderDto appender, CassandraConfig co
} }


private void createTable(String applicationToken) { private void createTable(String applicationToken) {
if (tableName == null) { tableName = logEventDao.createTable(applicationToken);
tableName = LOG_TABLE_PREFIX + applicationToken;
logEventDao.createTable(tableName);
} else {
LOG.warn("Appender is already initialized..");
}
} }


@Override @Override
Expand All @@ -130,7 +131,28 @@ public void close() {
LOG.info("Cassandra log appender stoped."); LOG.info("Cassandra log appender stoped.");
} }


private void checkExecuteRequestType(CassandraConfig configuration) { protected List<CassandraLogEventDto> generateCassandraLogEvent(LogEventPack logEventPack, RecordHeader header,
GenericAvroConverter<GenericRecord> eventConverter) throws IOException {
LOG.debug("Generate LogEventDto objects from LogEventPack [{}] and header [{}]", logEventPack, header);
List<CassandraLogEventDto> events = new ArrayList<>(logEventPack.getEvents().size());
try {
for (LogEvent logEvent : logEventPack.getEvents()) {
LOG.debug("Convert log events [{}] to dto objects.", logEvent);
if (logEvent == null | logEvent.getLogData() == null) {
continue;
}
LOG.trace("Avro record converter [{}] with log data [{}]", eventConverter, logEvent.getLogData());
GenericRecord decodedLog = eventConverter.decodeBinary(logEvent.getLogData());
events.add(new CassandraLogEventDto(header, decodedLog));
}
} catch (IOException e) {
LOG.error("Unexpected IOException while decoding LogEvents", e);
throw e;
}
return events;
}

private void setExecuteRequestType(CassandraConfig configuration) {
CassandraExecuteRequestType requestType = configuration.getCassandraExecuteRequestType(); CassandraExecuteRequestType requestType = configuration.getCassandraExecuteRequestType();
if (CassandraExecuteRequestType.ASYNC.equals(requestType)) { if (CassandraExecuteRequestType.ASYNC.equals(requestType)) {
executeRequestType = CassandraExecuteRequestType.ASYNC; executeRequestType = CassandraExecuteRequestType.ASYNC;
Expand Down
Loading

0 comments on commit 41e70e8

Please sign in to comment.