Skip to content

Commit

Permalink
Fix code style
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill380 committed Sep 30, 2016
1 parent 3948511 commit 28b70aa
Show file tree
Hide file tree
Showing 17 changed files with 210 additions and 184 deletions.
Expand Up @@ -50,7 +50,8 @@ public AvroAsyncRpcClient(Properties starterProp, int numberOfClientThreads) {
} }


LOG.info("Number of Threads:" + numberOfClientThreads); LOG.info("Number of Threads:" + numberOfClientThreads);
executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfClientThreads)); executorService = MoreExecutors
.listeningDecorator(Executors.newFixedThreadPool(numberOfClientThreads));
} }


public AvroAsyncRpcClient(String hostname, Integer port, int numberOfThreads) { public AvroAsyncRpcClient(String hostname, Integer port, int numberOfThreads) {
Expand All @@ -64,18 +65,21 @@ public AvroAsyncRpcClient(String hostname, Integer port, int numberOfThreads) {
} }


LOG.info("Number of Threads:" + numberOfClientThreads); LOG.info("Number of Threads:" + numberOfClientThreads);
executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfClientThreads)); executorService = MoreExecutors
.listeningDecorator(Executors.newFixedThreadPool(numberOfClientThreads));
} }


public ListenableFuture<AppendAsyncResultPojo> appendAsync(final Event event) throws EventDeliveryException { public ListenableFuture<AppendAsyncResultPojo> appendAsync(final Event event)
ListenableFuture<AppendAsyncResultPojo> future = executorService.submit(new Callable<AppendAsyncResultPojo>() { throws EventDeliveryException {
public AppendAsyncResultPojo call() throws Exception { ListenableFuture<AppendAsyncResultPojo> future = executorService.submit(
RpcClient client = clientQueue.poll(); new Callable<AppendAsyncResultPojo>() {
client.append(event); public AppendAsyncResultPojo call() throws Exception {
clientQueue.add(client); RpcClient client = clientQueue.poll();
return new AppendAsyncResultPojo(true, event); client.append(event);
} clientQueue.add(client);
}); return new AppendAsyncResultPojo(true, event);
}
});
return future; return future;
} }


Expand Down Expand Up @@ -103,17 +107,17 @@ public int getBatchSize() {
public void append(Event event) throws EventDeliveryException { public void append(Event event) throws EventDeliveryException {
try { try {
this.appendAsync(event).get(); this.appendAsync(event).get();
} catch (Exception e) { } catch (Exception ex) {
throw new EventDeliveryException(e); throw new EventDeliveryException(ex);
} }
} }


@Override @Override
public void appendBatch(List<Event> events) throws EventDeliveryException { public void appendBatch(List<Event> events) throws EventDeliveryException {
try { try {
this.appendBatchAsync(events).get(); this.appendBatchAsync(events).get();
} catch (Exception e) { } catch (Exception ex) {
throw new EventDeliveryException(e); throw new EventDeliveryException(ex);
} }
} }


Expand Down
Expand Up @@ -16,6 +16,10 @@


package org.kaaproject.kaa.server.appenders.mongo.appender; package org.kaaproject.kaa.server.appenders.mongo.appender;


import static com.mongodb.util.JSON.parse;
import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoDaoUtil.decodeReservedCharacteres;
import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoDaoUtil.encodeReservedCharacteres;

import com.mongodb.DBObject; import com.mongodb.DBObject;
import com.mongodb.util.JSON; import com.mongodb.util.JSON;


Expand Down Expand Up @@ -45,10 +49,13 @@ public LogEvent() {


public LogEvent(LogEventDto dto, ProfileInfo clientProfile, ProfileInfo serverProfile) { public LogEvent(LogEventDto dto, ProfileInfo clientProfile, ProfileInfo serverProfile) {
this.id = dto.getId(); this.id = dto.getId();
this.header = MongoDaoUtil.encodeReservedCharacteres((DBObject) JSON.parse(dto.getHeader())); this.header = encodeReservedCharacteres((DBObject) parse(dto.getHeader()));
this.event = MongoDaoUtil.encodeReservedCharacteres((DBObject) JSON.parse(dto.getEvent())); this.event = encodeReservedCharacteres((DBObject) parse(dto.getEvent()));
this.clientProfile = (clientProfile != null) ? MongoDaoUtil.encodeReservedCharacteres((DBObject) JSON.parse(clientProfile.getBody())) : null; this.clientProfile = (clientProfile != null)
this.serverProfile = (serverProfile != null) ? MongoDaoUtil.encodeReservedCharacteres((DBObject) JSON.parse(serverProfile.getBody())) : null; ? encodeReservedCharacteres((DBObject) parse(clientProfile.getBody())) : null;

this.serverProfile = (serverProfile != null)
? encodeReservedCharacteres((DBObject) parse(serverProfile.getBody())) : null;
} }


public String getId() { public String getId() {
Expand All @@ -64,39 +71,43 @@ public DBObject getEvent() {
} }


public void setEvent(DBObject event) { public void setEvent(DBObject event) {
this.event = MongoDaoUtil.encodeReservedCharacteres(event); this.event = encodeReservedCharacteres(event);
} }


public DBObject getHeader() { public DBObject getHeader() {
return header; return header;
} }


public void setHeader(DBObject header) { public void setHeader(DBObject header) {
this.header = MongoDaoUtil.encodeReservedCharacteres(header); this.header = encodeReservedCharacteres(header);
} }


public DBObject getClientProfile() { public DBObject getClientProfile() {
return clientProfile; return clientProfile;
} }


public void setClientProfile(DBObject clientProfile) { public void setClientProfile(DBObject clientProfile) {
this.clientProfile = MongoDaoUtil.encodeReservedCharacteres(clientProfile); this.clientProfile = encodeReservedCharacteres(clientProfile);
} }


public DBObject getServerProfile() { public DBObject getServerProfile() {
return serverProfile; return serverProfile;
} }


public void setServerProfile(DBObject serverProfile) { public void setServerProfile(DBObject serverProfile) {
this.serverProfile = MongoDaoUtil.encodeReservedCharacteres(serverProfile); this.serverProfile = encodeReservedCharacteres(serverProfile);
} }


@Override @Override
public String toString() { public String toString() {
return "LogEvent [id=" + id + ", header=" + header != null ? MongoDaoUtil.decodeReservedCharacteres(header).toString() : "" + ", event=" + final StringBuilder sb = new StringBuilder("LogEvent[");
event != null ? MongoDaoUtil.decodeReservedCharacteres(event).toString() : "" + ", clientProfile=" + sb.append("id='").append(id).append('\'');
clientProfile != null ? MongoDaoUtil.decodeReservedCharacteres(clientProfile).toString() : "" + ", serverProfile=" + sb.append(", header=").append(header);
serverProfile != null ? MongoDaoUtil.decodeReservedCharacteres(serverProfile).toString() : "" + "]"; sb.append(", event=").append(event);
sb.append(", clientProfile=").append(clientProfile);
sb.append(", serverProfile=").append(serverProfile);
sb.append(']');
return sb.toString();
} }


} }
Expand Up @@ -58,7 +58,8 @@ public LogEventMongoDao(MongoDbConfig configuration) throws Exception {
List<MongoCredential> credentials = new ArrayList<>(); List<MongoCredential> credentials = new ArrayList<>();
if (configuration.getMongoCredentials() != null) { if (configuration.getMongoCredentials() != null) {
for (MongoDBCredential credential : configuration.getMongoCredentials()) { for (MongoDBCredential credential : configuration.getMongoCredentials()) {
credentials.add(MongoCredential.createMongoCRCredential(credential.getUser(), configuration.getDbName(), credentials.add(MongoCredential.createMongoCRCredential(credential.getUser(),
configuration.getDbName(),
credential.getPassword().toCharArray())); credential.getPassword().toCharArray()));
} }
} }
Expand All @@ -85,7 +86,8 @@ public LogEventMongoDao(MongoDbConfig configuration) throws Exception {


MongoDbFactory dbFactory = new SimpleMongoDbFactory(mongoClient, configuration.getDbName()); MongoDbFactory dbFactory = new SimpleMongoDbFactory(mongoClient, configuration.getDbName());


MappingMongoConverter converter = new MappingMongoConverter(dbFactory, new MongoMappingContext()); MappingMongoConverter converter = new MappingMongoConverter(dbFactory,
new MongoMappingContext());
converter.setTypeMapper(new DefaultMongoTypeMapper(null)); converter.setTypeMapper(new DefaultMongoTypeMapper(null));


mongoTemplate = new MongoTemplate(dbFactory, converter); mongoTemplate = new MongoTemplate(dbFactory, converter);
Expand All @@ -98,13 +100,14 @@ public void createCollection(String collectionName) {
if (!mongoTemplate.collectionExists(collectionName)) { if (!mongoTemplate.collectionExists(collectionName)) {
mongoTemplate.createCollection(collectionName); mongoTemplate.createCollection(collectionName);
} }
} catch (UncategorizedMongoDbException e) { } catch (UncategorizedMongoDbException ex) {
LOG.warn("Failed to create collection {} due to", collectionName, e); LOG.warn("Failed to create collection {} due to", collectionName, ex);
} }
} }


@Override @Override
public List<LogEvent> save(List<LogEventDto> logEventDtos, ProfileInfo clientProfile, ProfileInfo serverProfile, String collectionName) { public List<LogEvent> save(List<LogEventDto> logEventDtos, ProfileInfo clientProfile,
ProfileInfo serverProfile, String collectionName) {
List<LogEvent> logEvents = new ArrayList<>(logEventDtos.size()); List<LogEvent> logEvents = new ArrayList<>(logEventDtos.size());
for (LogEventDto logEventDto : logEventDtos) { for (LogEventDto logEventDto : logEventDtos) {
logEvents.add(new LogEvent(logEventDto, clientProfile, serverProfile)); logEvents.add(new LogEvent(logEventDto, clientProfile, serverProfile));
Expand Down
Expand Up @@ -50,28 +50,39 @@ public MongoDbLogAppender() {
} }


@Override @Override
public void doAppend(LogEventPack logEventPack, RecordHeader header, LogDeliveryCallback listener) { public void doAppend(LogEventPack logEventPack, RecordHeader header,
LogDeliveryCallback listener) {
if (!closed) { if (!closed) {
try { try {
ProfileInfo clientProfile = (this.includeClientProfile) ? logEventPack.getClientProfile() : null; ProfileInfo clientProfile = (this.includeClientProfile)
ProfileInfo serverProfile = (this.includeServerProfile) ? logEventPack.getServerProfile() : null; ? logEventPack.getClientProfile() : null;

ProfileInfo serverProfile = (this.includeServerProfile)
? logEventPack.getServerProfile() : null;

LOG.debug("[{}] appending {} logs to mongodb collection",
collectionName, logEventPack.getEvents().size());


LOG.debug("[{}] appending {} logs to mongodb collection", collectionName, logEventPack.getEvents().size());
List<LogEventDto> dtos = generateLogEvent(logEventPack, header); List<LogEventDto> dtos = generateLogEvent(logEventPack, header);
LOG.debug("[{}] saving {} objects", collectionName, dtos.size()); LOG.debug("[{}] saving {} objects", collectionName, dtos.size());
if (!dtos.isEmpty()) { if (!dtos.isEmpty()) {
logEventDao.save(dtos, clientProfile, serverProfile, collectionName); logEventDao.save(dtos, clientProfile, serverProfile, collectionName);
LOG.debug("[{}] appended {} logs to mongodb collection", collectionName, logEventPack.getEvents().size());
LOG.debug("[{}] appended {} logs to mongodb collection",
collectionName, logEventPack.getEvents().size());
} }
listener.onSuccess(); listener.onSuccess();
} catch (MongoSocketException e) { } catch (MongoSocketException ex) {
LOG.error(MessageFormat.format("[{0}] Attempted to append logs failed due to network error", getName()), e); LOG.error(MessageFormat.format("[{0}] Attempted to append logs failed "
+ "due to network error", getName()), ex);
listener.onConnectionError(); listener.onConnectionError();
} catch (MongoInternalException | MongoServerException e) { } catch (MongoInternalException | MongoServerException ex) {
LOG.error(MessageFormat.format("[{0}] Attempted to append logs failed due to remote error", getName()), e); LOG.error(MessageFormat.format("[{0}] Attempted to append logs failed "
+ "due to remote error", getName()), ex);
listener.onRemoteError(); listener.onRemoteError();
} catch (Exception e) { } catch (Exception ex) {
LOG.error(MessageFormat.format("[{0}] Attempted to append logs failed due to internal error", getName()), e); LOG.error(MessageFormat.format("[{0}] Attempted to append logs failed "
+ "due to internal error", getName()), ex);
listener.onInternalError(); listener.onInternalError();
} }
} else { } else {
Expand All @@ -88,8 +99,8 @@ protected void initFromConfiguration(LogAppenderDto appender, MongoDbConfig conf
this.includeClientProfile = configuration.getIncludeClientProfile(); this.includeClientProfile = configuration.getIncludeClientProfile();
this.includeServerProfile = configuration.getIncludeServerProfile(); this.includeServerProfile = configuration.getIncludeServerProfile();
createCollection(appender.getApplicationToken()); createCollection(appender.getApplicationToken());
} catch (Exception e) { } catch (Exception ex) {
LOG.error("Failed to init MongoDB log appender: ", e); LOG.error("Failed to init MongoDB log appender: ", ex);
} }
} }


Expand Down
Expand Up @@ -41,10 +41,10 @@
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;


public class HttpComponentsRequestFactoryBasicAuth extends public class HttpComponentsRequestFactoryBasicAuth extends HttpComponentsClientHttpRequestFactory {
HttpComponentsClientHttpRequestFactory {


private static final Logger LOG = LoggerFactory.getLogger(HttpComponentsRequestFactoryBasicAuth.class); private static final Logger LOG = LoggerFactory
.getLogger(HttpComponentsRequestFactoryBasicAuth.class);


private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 100; private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 100;


Expand All @@ -62,12 +62,12 @@ public HttpComponentsRequestFactoryBasicAuth(HttpHost host) {
} }


private static HttpClient createHttpClient() { private static HttpClient createHttpClient() {
CloseableHttpClient httpClient = HttpClientBuilder.create(). CloseableHttpClient httpClient = HttpClientBuilder.create()
setMaxConnTotal(DEFAULT_MAX_TOTAL_CONNECTIONS). .setMaxConnTotal(DEFAULT_MAX_TOTAL_CONNECTIONS)
setMaxConnPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE). .setMaxConnPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE)
setRetryHandler(new BasicHttpRequestRetryHandler(5, 10000)). .setRetryHandler(new BasicHttpRequestRetryHandler(5, 10000))
setServiceUnavailableRetryStrategy(new BaseServiceUnavailableRetryStrategy(3, 5000)). .setServiceUnavailableRetryStrategy(new BaseServiceUnavailableRetryStrategy(3, 5000))
build(); .build();
return httpClient; return httpClient;
} }


Expand Down Expand Up @@ -95,7 +95,7 @@ public void setCredentials(String username, String password) {
new UsernamePasswordCredentials(username, password)); new UsernamePasswordCredentials(username, password));
} }


static class BasicHttpRequestRetryHandler extends DefaultHttpRequestRetryHandler { private static class BasicHttpRequestRetryHandler extends DefaultHttpRequestRetryHandler {


private final long connectRetryInterval; private final long connectRetryInterval;


Expand All @@ -109,9 +109,11 @@ public boolean retryRequest(IOException exception, int executionCount,
HttpContext context) { HttpContext context) {
if (executionCount <= getRetryCount()) { if (executionCount <= getRetryCount()) {
try { try {
LOG.warn("IOException '{}'. Wait for {} before next attempt to connect...", exception.getMessage(), connectRetryInterval); LOG.warn("IOException '{}'. Wait for {} before next attempt to connect...",
exception.getMessage(), connectRetryInterval);
Thread.sleep(connectRetryInterval); Thread.sleep(connectRetryInterval);
} catch (InterruptedException e) { } catch (InterruptedException ex) {
LOG.error("Thread was interrupted", ex);
} }
return true; return true;
} else { } else {
Expand All @@ -121,7 +123,8 @@ public boolean retryRequest(IOException exception, int executionCount,


} }


static class BaseServiceUnavailableRetryStrategy implements ServiceUnavailableRetryStrategy { private static class BaseServiceUnavailableRetryStrategy
implements ServiceUnavailableRetryStrategy {


/** /**
* Maximum number of allowed retries if the server responds with a HTTP code * Maximum number of allowed retries if the server responds with a HTTP code
Expand Down Expand Up @@ -151,8 +154,10 @@ public BaseServiceUnavailableRetryStrategy() {
this(1, 1000); this(1, 1000);
} }


public boolean retryRequest(final HttpResponse response, int executionCount, final HttpContext context) { public boolean retryRequest(final HttpResponse response, int executionCount,
return executionCount <= maxRetries && response.getStatusLine().getStatusCode() == HttpStatus.SC_SERVICE_UNAVAILABLE; final HttpContext context) {
return executionCount <= maxRetries
&& response.getStatusLine().getStatusCode() == HttpStatus.SC_SERVICE_UNAVAILABLE;
} }


public long getRetryInterval() { public long getRetryInterval() {
Expand Down
Expand Up @@ -358,7 +358,7 @@ public class DaoConstants {
public static final String LOG_SCHEMA_TABLE_NAME = "log_schems"; public static final String LOG_SCHEMA_TABLE_NAME = "log_schems";


/** /**
* Notification schems constants * Notification schems constants.
*/ */
public static final String NOTIFICATION_SCHEMA_TABLE_NAME = "notification_schems"; public static final String NOTIFICATION_SCHEMA_TABLE_NAME = "notification_schems";
public static final String NOTIFICATION_SCHEMA_TYPE_PROPERTY = "type"; public static final String NOTIFICATION_SCHEMA_TYPE_PROPERTY = "type";
Expand All @@ -372,7 +372,7 @@ public class DaoConstants {
public static final String LOG_APPENDER_CONFIRM_DELIVERY = "confirm_delivery"; public static final String LOG_APPENDER_CONFIRM_DELIVERY = "confirm_delivery";


/** /**
* SDK profile constants * SDK profile constants.
*/ */
public static final String SDK_PROFILE_APPLICATION_ID = APPLICATION_ID; public static final String SDK_PROFILE_APPLICATION_ID = APPLICATION_ID;
public static final String SDK_PROFILE_CONFIGURATION_SCHEMA_VERSION = public static final String SDK_PROFILE_CONFIGURATION_SCHEMA_VERSION =
Expand All @@ -390,7 +390,7 @@ public class DaoConstants {
public static final String SDK_PROFILE_TOKEN = "token"; public static final String SDK_PROFILE_TOKEN = "token";


/** /**
* CTL schems constants * CTL schems constants.
*/ */
public static final String CTL_SCHEMA_TABLE_NAME = "ctl"; public static final String CTL_SCHEMA_TABLE_NAME = "ctl";
public static final String CTL_SCHEMA_META_INFO_ID = "metainfo_id"; public static final String CTL_SCHEMA_META_INFO_ID = "metainfo_id";
Expand All @@ -415,7 +415,7 @@ public class DaoConstants {
+ "." + ID; + "." + ID;


/** /**
* CTL schems meta info constants * CTL schems meta info constants.
*/ */
public static final String CTL_SCHEMA_META_INFO_TABLE_NAME = "ctl_metainfo"; public static final String CTL_SCHEMA_META_INFO_TABLE_NAME = "ctl_metainfo";
public static final String CTL_SCHEMA_META_INFO_FQN = FQN; public static final String CTL_SCHEMA_META_INFO_FQN = FQN;
Expand Down

0 comments on commit 28b70aa

Please sign in to comment.