Navigation Menu

Skip to content

Commit

Permalink
KAA-746: Add null checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohdan Khablenko committed Nov 30, 2015
1 parent a3486ca commit 2b63704
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 21 deletions.
Expand Up @@ -39,6 +39,7 @@
import org.kaaproject.kaa.server.common.log.shared.appender.LogEvent;
import org.kaaproject.kaa.server.common.log.shared.appender.LogEventPack;
import org.kaaproject.kaa.server.common.log.shared.appender.data.BaseLogEventPack;
import org.kaaproject.kaa.server.common.log.shared.appender.data.ProfileInfo;
import org.kaaproject.kaa.server.common.log.shared.avro.gen.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -100,10 +101,25 @@ public void run() {
LOG.debug("[{}] appending {} logs to cassandra collection", tableName, logEventPack.getEvents().size());
GenericAvroConverter<GenericRecord> eventConverter = getConverter(logEventPack.getLogSchema().getSchema());
GenericAvroConverter<GenericRecord> headerConverter = getConverter(header.getSchema().toString());
GenericAvroConverter<GenericRecord> clientProfileConverter = getConverter(logEventPack.getClientProfile().getSchema());
String clientProfileJson = logEventPack.getClientProfile().getBody();
GenericAvroConverter<GenericRecord> serverProfileConverter = getConverter(logEventPack.getServerProfile().getSchema());
String serverProfileJson = logEventPack.getServerProfile().getBody();

// Get client profile data
GenericAvroConverter<GenericRecord> clientProfileConverter = null;
String clientProfileJson = null;
ProfileInfo clientProfile = logEventPack.getClientProfile();
if (clientProfile != null) {
clientProfileConverter = getConverter(clientProfile.getSchema());
clientProfileJson = clientProfile.getBody();
}

// Get server profile data
GenericAvroConverter<GenericRecord> serverProfileConverter = null;
String serverProfileJson = null;
ProfileInfo serverProfile = logEventPack.getServerProfile();
if (serverProfile != null) {
serverProfileConverter = getConverter(serverProfile.getSchema());
serverProfileJson = serverProfile.getBody();
}

List<CassandraLogEventDto> dtoList = generateCassandraLogEvent(logEventPack, header, eventConverter);
LOG.debug("[{}] saving {} objects", tableName, dtoList.size());
if (!dtoList.isEmpty()) {
Expand Down
Expand Up @@ -74,6 +74,9 @@ public class CassandraLogEventDao implements LogEventDao {
private static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS $keyspace_name.$table_name ("
+ "$columns_definition PRIMARY KEY ( $primary_key_definition )) $clustering_definition;";

private static final String ABSENT_CLIENT_PROFILE_ERROR = "Client profile is not set!";
private static final String ABSENT_SERVER_PROFILE_ERROR = "Server profile is not set!";

private final ConcurrentMap<String, ThreadLocal<SimpleDateFormat>> dateFormatMap = new ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>>();

private Cluster cluster;
Expand Down Expand Up @@ -307,10 +310,23 @@ private Insert[] prepareQuery(List<CassandraLogEventDto> logEventDtoList, String
throws IOException {
String reuseTsValue = null;
Insert[] insertArray = new Insert[logEventDtoList.size()];
GenericRecord clientProfile = clientProfileConverter.decodeJson(clientProfileJson);
ByteBuffer clientProfileBinary = ByteBuffer.wrap(clientProfileConverter.encode(clientProfile));
GenericRecord serverProfile = serverProfileConverter.decodeJson(serverProfileJson);
ByteBuffer serverProfileBinary = ByteBuffer.wrap(serverProfileConverter.encode(serverProfile));

// Process client profile data
GenericRecord clientProfile = null;
ByteBuffer clientProfileBinary = null;
if (clientProfileConverter != null) {
clientProfile = clientProfileConverter.decodeJson(clientProfileJson);
clientProfileBinary = ByteBuffer.wrap(clientProfileConverter.encode(clientProfile));
}

// Process server profile data
GenericRecord serverProfile = null;
ByteBuffer serverProfileBinary = null;
if (serverProfileConverter != null) {
serverProfile = serverProfileConverter.decodeJson(serverProfileJson);
serverProfileBinary = ByteBuffer.wrap(serverProfileConverter.encode(serverProfile));
}

for (int i = 0; i < logEventDtoList.size(); i++) {
CassandraLogEventDto dto = logEventDtoList.get(i);
Insert insert = QueryBuilder.insertInto(keyspaceName, collectionName);
Expand All @@ -325,11 +341,19 @@ private Insert[] prepareQuery(List<CassandraLogEventDto> logEventDtoList, String
formatField(element.getColumnType(), dto.getEvent().get(element.getValue())));
break;
case CLIENT_FIELD:
insert.value(element.getColumnName(),
formatField(element.getColumnType(), clientProfile.get(element.getValue())));
if (clientProfile != null) {
insert.value(element.getColumnName(), formatField(element.getColumnType(), clientProfile.get(element.getValue())));
} else {
throw new RuntimeException(ABSENT_CLIENT_PROFILE_ERROR);
}
break;
case SERVER_FIELD:
insert.value(element.getColumnName(),
formatField(element.getColumnType(), serverProfile.get(element.getValue())));
if (serverProfile != null) {
insert.value(element.getColumnName(), formatField(element.getColumnType(), serverProfile.get(element.getValue())));
} else {
throw new RuntimeException(ABSENT_SERVER_PROFILE_ERROR);
}
break;
case HEADER_JSON:
insert.value(element.getColumnName(), headerConverter.encodeToJson(dto.getHeader()));
break;
Expand All @@ -343,16 +367,33 @@ private Insert[] prepareQuery(List<CassandraLogEventDto> logEventDtoList, String
insert.value(element.getColumnName(), ByteBuffer.wrap(eventConverter.encode(dto.getEvent())));
break;
case CLIENT_JSON:
insert.value(element.getColumnName(), clientProfileJson);
if (clientProfileJson != null) {
insert.value(element.getColumnName(), clientProfileJson);
}
else {
throw new RuntimeException(ABSENT_CLIENT_PROFILE_ERROR);
}
break;
case CLIENT_BINARY:
insert.value(element.getColumnName(), clientProfileBinary);
if (clientProfileBinary != null) {
insert.value(element.getColumnName(), clientProfileBinary);
} else {
throw new RuntimeException(ABSENT_CLIENT_PROFILE_ERROR);
}
break;
case SERVER_JSON:
insert.value(element.getColumnName(), serverProfileJson);
break;
if (serverProfileJson != null) {
insert.value(element.getColumnName(), serverProfileJson);
}
else {
throw new RuntimeException(ABSENT_SERVER_PROFILE_ERROR);
}
case SERVER_BINARY:
insert.value(element.getColumnName(), serverProfileBinary);
if (serverProfileBinary != null) {
insert.value(element.getColumnName(), clientProfileBinary);
} else {
throw new RuntimeException(ABSENT_SERVER_PROFILE_ERROR);
}
break;
case UUID:
insert.value(element.getColumnName(), UUID.randomUUID());
Expand Down

0 comments on commit 2b63704

Please sign in to comment.