Skip to content

Commit

Permalink
Fixed some warns.
Browse files Browse the repository at this point in the history
  • Loading branch information
Acarus committed Oct 1, 2016
1 parent d1982fd commit 993a6ad
Show file tree
Hide file tree
Showing 47 changed files with 659 additions and 415 deletions.
Expand Up @@ -251,20 +251,24 @@ public String toString() {
*/
public boolean requireImmediateReply() {
ServerSync response = getResponse();
if (response.getProfileSync() != null && response.getProfileSync().getResponseStatus() != SyncResponseStatus.NO_DELTA) {
if (response.getProfileSync() != null
&& response.getProfileSync().getResponseStatus() != SyncResponseStatus.NO_DELTA) {
return true;
}
if (response.getConfigurationSync() != null && response.getConfigurationSync().getResponseStatus() != SyncResponseStatus.NO_DELTA) {
if (response.getConfigurationSync() != null
&& response.getConfigurationSync().getResponseStatus() != SyncResponseStatus.NO_DELTA) {
return true;
}
if (response.getNotificationSync() != null && response.getNotificationSync().getResponseStatus() != SyncResponseStatus.NO_DELTA) {
if (response.getNotificationSync() != null
&& response.getNotificationSync().getResponseStatus() != SyncResponseStatus.NO_DELTA) {
return true;
}
if (response.getEventSync() != null) {
if (response.getEventSync().getEventSequenceNumberResponse() != null) {
return true;
}
if (response.getEventSync().getEvents() != null && !response.getEventSync().getEvents().isEmpty()) {
if (response.getEventSync().getEvents() != null
&& !response.getEventSync().getEvents().isEmpty()) {
return true;
}
if (response.getEventSync().getEventListenersResponses() != null
Expand All @@ -274,10 +278,12 @@ public boolean requireImmediateReply() {
}
if (response.getUserSync() != null) {
UserServerSync userResponse = response.getUserSync();
if (userResponse.getEndpointAttachResponses() != null && !userResponse.getEndpointAttachResponses().isEmpty()) {
if (userResponse.getEndpointAttachResponses() != null
&& !userResponse.getEndpointAttachResponses().isEmpty()) {
return true;
}
if (userResponse.getEndpointDetachResponses() != null && !userResponse.getEndpointDetachResponses().isEmpty()) {
if (userResponse.getEndpointDetachResponses() != null
&& !userResponse.getEndpointDetachResponses().isEmpty()) {
return true;
}
if (userResponse.getUserAttachResponse() != null) {
Expand Down
Expand Up @@ -41,10 +41,10 @@ public GetDeltaException(String message) {
/**
* Instantiates a new delta exception.
*
* @param e the e
* @param ex exception
*/
public GetDeltaException(Exception e) {
super(e);
public GetDeltaException(Exception ex) {
super(ex);
}


Expand Down
Expand Up @@ -102,9 +102,12 @@ public void initActorSystem() {
LOG.info("Initializing Akka system...");
akka = ActorSystem.create(EPS, context.getConfig());
LOG.info("Initializing Akka EPS actor...");
opsActor = akka.actorOf(Props.create(new OperationsServerActor.ActorCreator(context)).withDispatcher(CORE_DISPATCHER_NAME), EPS);
opsActor = akka.actorOf(Props.create(
new OperationsServerActor.ActorCreator(context))
.withDispatcher(CORE_DISPATCHER_NAME), EPS);
LOG.info("Lookup platform protocols");
Set<String> platformProtocols = PlatformLookup.lookupPlatformProtocols(PlatformLookup.DEFAULT_PROTOCOL_LOOKUP_PACKAGE_NAME);
Set<String> platformProtocols = PlatformLookup.lookupPlatformProtocols(
PlatformLookup.DEFAULT_PROTOCOL_LOOKUP_PACKAGE_NAME);
LOG.info("Initializing Akka io router...");
ioRouter = akka.actorOf(
new RoundRobinPool(context.getIOWorkerCount())
Expand Down Expand Up @@ -156,10 +159,12 @@ public void onRedirectionRule(RedirectionRule redirectionRule) {
*/
@Override
public void onNotification(Notification notification) {
ApplicationDto applicationDto = context.getApplicationService().findAppById(notification.getAppId());
ApplicationDto applicationDto = context.getApplicationService()
.findAppById(notification.getAppId());
if (applicationDto != null) {
LOG.debug("Sending message {} to EPS actor", notification);
opsActor.tell(new ThriftNotificationMessage(applicationDto.getApplicationToken(), notification), ActorRef.noSender());
opsActor.tell(new ThriftNotificationMessage(
applicationDto.getApplicationToken(), notification), ActorRef.noSender());
} else {
LOG.warn("Can't find corresponding application for: {}", notification);
}
Expand Down Expand Up @@ -189,7 +194,8 @@ public void onUserConfigurationUpdate(UserConfigurationUpdate update) {
}

@Override
public void setStatusListener(final AkkaStatusListener listener, final long statusUpdateFrequency) {
public void setStatusListener(final AkkaStatusListener listener,
final long statusUpdateFrequency) {
this.statusListenerThread = new StatusListenerThread(listener, statusUpdateFrequency);
this.statusListenerThread.start();
}
Expand Down Expand Up @@ -218,9 +224,9 @@ public void run() {
while (!stopped) {
try {
Thread.sleep(statusUpdateFrequency);
} catch (InterruptedException e) {
} catch (InterruptedException ex) {
if (!stopped) {
LOG.warn("Status update thread was interrupted", e);
LOG.warn("Status update thread was interrupted", ex);
} else {
break;
}
Expand Down
Expand Up @@ -53,7 +53,8 @@ public class ApplicationLogActorMessageProcessor {
/**
* The Constant LOG.
*/
private static final Logger LOG = LoggerFactory.getLogger(ApplicationLogActorMessageProcessor.class);
private static final Logger LOG =
LoggerFactory.getLogger(ApplicationLogActorMessageProcessor.class);

private final LogAppenderService logAppenderService;
private final CacheService cacheService;
Expand Down Expand Up @@ -81,7 +82,9 @@ public ApplicationLogActorMessageProcessor(AkkaContext context, String applicati
this.cacheService = context.getCacheService();
this.ctlService = context.getCtlService();
this.applicationToken = applicationToken;
this.applicationId = context.getApplicationService().findAppByApplicationToken(applicationToken).getId();
this.applicationId = context.getApplicationService()
.findAppByApplicationToken(applicationToken)
.getId();
this.logAppenders = new HashMap<>();
this.logAppendersCache = new HashMap<>();
this.logSchemas = new HashMap<>();
Expand All @@ -94,7 +97,8 @@ public ApplicationLogActorMessageProcessor(AkkaContext context, String applicati
}

protected void processLogEventPack(ActorContext context, LogEventPackMessage message) {
LOG.debug("[{}] Processing a log event pack with {} appenders", applicationToken, logAppenders.size());
LOG.debug("[{}] Processing a log event pack with {} appenders",
applicationToken, logAppenders.size());
fetchSchemas(message);
LogSchema logSchema = message.getLogSchema();
List<LogAppender> required = filterAppenders(logSchema.getVersion(), true);
Expand All @@ -106,15 +110,18 @@ protected void processLogEventPack(ActorContext context, LogEventPackMessage mes
} else {
LogDeliveryCallback callback;
if (required.size() == 1) {
callback = new SingleLogDeliveryCallback(message.getOriginator(), message.getRequestId());
callback = new SingleLogDeliveryCallback(
message.getOriginator(), message.getRequestId());
} else {
callback = new MultiLogDeliveryCallback(message.getOriginator(), message.getRequestId(), required.size());
callback = new MultiLogDeliveryCallback(
message.getOriginator(), message.getRequestId(), required.size());
}
required.forEach(appender -> {
try {
appender.doAppend(message.getLogEventPack(), callback);
} catch (Exception cause) {
String text = String.format("Failed to append logs using [%s] (ID: %s)", appender.getName(), appender.getAppenderId());
String text = String.format("Failed to append logs using [%s] (ID: %s)",
appender.getName(), appender.getAppenderId());
LOG.warn(text, cause);
sendErrorMessageToEndpoint(message, LogDeliveryErrorCode.APPENDER_INTERNAL_ERROR);
}
Expand Down Expand Up @@ -149,7 +156,8 @@ public List<LogAppender> filterAppenders(int schemaVersion, boolean confirmDeliv
if (result == null) {
result = new ArrayList<LogAppender>();
for (LogAppender appender : logAppenders.values()) {
if (appender.isSchemaVersionSupported(schemaVersion) && appender.isDeliveryConfirmationRequired() == confirmDelivery) {
if (appender.isSchemaVersionSupported(schemaVersion)
&& appender.isDeliveryConfirmationRequired() == confirmDelivery) {
result.add(appender);
}
}
Expand All @@ -171,7 +179,8 @@ private void fetchSchemas(LogEventPackMessage message) {
EndpointProfileDataDto profileDto = logPack.getProfileDto();
ProfileInfo clientProfile = logPack.getClientProfile();
if (clientProfile == null) {
AppVersionKey key = new AppVersionKey(applicationToken, profileDto.getClientProfileVersion());
AppVersionKey key = new AppVersionKey(
applicationToken, profileDto.getClientProfileVersion());
BaseSchemaInfo schemaInfo = clientProfileSchemas.get(key);
if (schemaInfo == null) {
EndpointProfileSchemaDto profileSchema = cacheService.getProfileSchemaByAppAndVersion(key);
Expand All @@ -180,26 +189,33 @@ private void fetchSchemas(LogEventPackMessage message) {
schemaInfo = new BaseSchemaInfo(ctlSchemaDto.getId(), schema);
clientProfileSchemas.put(key, schemaInfo);
}
logPack.setClientProfile(new BaseProfileInfo(schemaInfo, profileDto.getClientProfileBody()));
logPack.setClientProfile(
new BaseProfileInfo(schemaInfo, profileDto.getClientProfileBody()));
}
ProfileInfo serverProfile = logPack.getServerProfile();
if (serverProfile == null) {
AppVersionKey key = new AppVersionKey(applicationToken, profileDto.getServerProfileVersion());
AppVersionKey key = new AppVersionKey(
applicationToken, profileDto.getServerProfileVersion());
BaseSchemaInfo schemaInfo = serverProfileSchemas.get(key);
if (schemaInfo == null) {
ServerProfileSchemaDto serverProfileSchema = cacheService.getServerProfileSchemaByAppAndVersion(key);
CTLSchemaDto ctlSchemaDto = cacheService.getCtlSchemaById(serverProfileSchema.getCtlSchemaId());
ServerProfileSchemaDto serverProfileSchema =
cacheService.getServerProfileSchemaByAppAndVersion(key);
CTLSchemaDto ctlSchemaDto = cacheService.getCtlSchemaById(
serverProfileSchema.getCtlSchemaId());
String schema = ctlService.flatExportAsString(ctlSchemaDto);
schemaInfo = new BaseSchemaInfo(ctlSchemaDto.getId(), schema);
serverProfileSchemas.put(key, schemaInfo);
}
logPack.setServerProfile(new BaseProfileInfo(schemaInfo, profileDto.getServerProfileBody()));
logPack.setServerProfile(
new BaseProfileInfo(schemaInfo, profileDto.getServerProfileBody()));
}
}

protected void sendErrorMessageToEndpoint(LogEventPackMessage message, LogDeliveryErrorCode errorCode) {
protected void sendErrorMessageToEndpoint(LogEventPackMessage message,
LogDeliveryErrorCode errorCode) {
if (message.getOriginator() != null) {
message.getOriginator().tell(new LogDeliveryMessage(message.getRequestId(), false, errorCode), ActorRef.noSender());
message.getOriginator().tell(new LogDeliveryMessage(
message.getRequestId(), false, errorCode), ActorRef.noSender());
} else {
LOG.warn("[{}] Can't send error message to unknown originator.", applicationToken);
}
Expand All @@ -220,36 +236,43 @@ protected void processLogAppenderNotification(Notification notification) {
addLogAppender(appenderId);
break;
default:
LOG.debug("[{}][{}] Operation [{}] is not supported.", applicationToken, appenderId, notification.getOp());
LOG.debug("[{}][{}] Operation [{}] is not supported.",
applicationToken, appenderId, notification.getOp());
}
}

protected void stop() {
for (LogAppender logAppender : logAppenders.values()) {
LOG.info("[{}] Closing appender [{}] with name {}", applicationToken, logAppender.getAppenderId(), logAppender.getName());
LOG.info("[{}] Closing appender [{}] with name {}",
applicationToken, logAppender.getAppenderId(), logAppender.getName());
logAppender.close();
}
}

private void addLogAppender(String appenderId) {
LOG.info("[{}] Adding log appender with id [{}].", applicationId, appenderId);
LOG.info("[{}] Adding log appender with id [{}].",
applicationId, appenderId);
if (!logAppenders.containsKey(appenderId)) {
LogAppender logAppender = logAppenderService.getApplicationAppender(appenderId);
if (logAppender != null) {
addAppender(appenderId, logAppender);
LOG.info("[{}] Log appender [{}] registered.", applicationId, appenderId);
LOG.info("[{}] Log appender [{}] registered.",
applicationId, appenderId);
}
} else {
LOG.info("[{}] Log appender [{}] is already registered.", applicationId, appenderId);
LOG.info("[{}] Log appender [{}] is already registered.",
applicationId, appenderId);
}
}

private void removeLogAppender(String appenderId) {
if (logAppenders.containsKey(appenderId)) {
LOG.info("[{}] Closing log appender with id [{}].", applicationToken, appenderId);
LOG.info("[{}] Closing log appender with id [{}].",
applicationToken, appenderId);
removeAppender(appenderId).close();
} else {
LOG.warn("[{}] Can't remove unregistered appender with id [{}]", applicationToken, appenderId);
LOG.warn("[{}] Can't remove unregistered appender with id [{}]",
applicationToken, appenderId);
}
}

Expand Down
Expand Up @@ -90,16 +90,20 @@ public TopicActor(NotificationDeltaService notificationService) {
* @param calendar the calendar
* @return the list
*/
public static List<NotificationDto> filterMap(SortedMap<Integer, NotificationDto> pendingNotificationMap, int systemNfSchemaVersion,
int userNfSchemaVersion, Calendar calendar) {
public static List<NotificationDto> filterMap(
SortedMap<Integer, NotificationDto> pendingNotificationMap,
int systemNfSchemaVersion,
int userNfSchemaVersion,
Calendar calendar) {
List<NotificationDto> pendingNotifications = new ArrayList<>(pendingNotificationMap.size());

long now = calendar.getTimeInMillis();

List<NotificationDto> expiredNotifications = null;
for (NotificationDto dto : pendingNotificationMap.values()) {
LOG.trace("Filtering notification {} using system schema version {} and user schema version {}", dto, systemNfSchemaVersion,
userNfSchemaVersion);
LOG.trace("Filtering notification {} using system schema version "
+ "{} and user schema version {}",
dto, systemNfSchemaVersion, userNfSchemaVersion);
Date date = dto.getExpiredAt();
if (date != null && date.getTime() > now) {
if (isSchemaVersionMatch(dto, systemNfSchemaVersion, userNfSchemaVersion)) {
Expand All @@ -110,12 +114,14 @@ public static List<NotificationDto> filterMap(SortedMap<Integer, NotificationDto
expiredNotifications = new ArrayList<>();
}
expiredNotifications.add(dto);
LOG.trace("Detected expired notification: {}, nfTime: {}, curTime: {}", dto, date == null ? date : date.getTime(), now);
LOG.trace("Detected expired notification: {}, nfTime: {}, curTime: {}",
dto, date == null ? date : date.getTime(), now);
}
}

if (expiredNotifications != null) {
LOG.trace("Removing {} notifications from pendingNotificationMap", expiredNotifications.size());
LOG.trace("Removing {} notifications from pendingNotificationMap",
expiredNotifications.size());
pendingNotificationMap.values().removeAll(expiredNotifications);
}
return pendingNotifications;
Expand All @@ -129,7 +135,9 @@ public static List<NotificationDto> filterMap(SortedMap<Integer, NotificationDto
* @param userNfVersion the user nf version
* @return true, if is schema version match
*/
public static boolean isSchemaVersionMatch(NotificationDto notificationDto, int systemNfVersion, int userNfVersion) {
public static boolean isSchemaVersionMatch(NotificationDto notificationDto,
int systemNfVersion,
int userNfVersion) {
if (notificationDto.getType() == NotificationTypeDto.SYSTEM) {
return notificationDto.getNfVersion() == systemNfVersion;
} else if (notificationDto.getType() == NotificationTypeDto.USER) {
Expand Down Expand Up @@ -168,18 +176,22 @@ public void onReceive(Object message) throws Exception {
private void processEndpointRegistration(TopicSubscriptionMessage message) {
ActorRef endpointActor = message.getOriginator();
Integer seqNum = message.getSeqNumber();
SortedMap<Integer, NotificationDto> pendingNotificationMap = notificationCache.tailMap(seqNum, false);
SortedMap<Integer, NotificationDto> pendingNotificationMap = notificationCache.tailMap(
seqNum, false);
Calendar calendar = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
List<NotificationDto> pendingNotifications = filterMap(pendingNotificationMap, message.getSystemNfSchemaVersion(),
List<NotificationDto> pendingNotifications = filterMap(
pendingNotificationMap, message.getSystemNfSchemaVersion(),
message.getUserNfSchemaVersion(), calendar);
if (!pendingNotifications.isEmpty()) {
LOG.debug("Detected new messages during endpoint subscription!");
NotificationMessage notificationMessage = NotificationMessage.fromNotifications(pendingNotifications);
NotificationMessage notificationMessage = NotificationMessage.fromNotifications(
pendingNotifications);
endpointActor.tell(notificationMessage, self());
} else {
LOG.debug("No new messages detected. Subscribing endpoint actor to topic actor");
String endpointKey = message.getOriginator().path().name();
ActorInfo actorInfo = new ActorInfo(endpointActor, message.getSystemNfSchemaVersion(), message.getUserNfSchemaVersion());
ActorInfo actorInfo = new ActorInfo(
endpointActor, message.getSystemNfSchemaVersion(), message.getUserNfSchemaVersion());
if (endpointSessions.put(endpointKey, actorInfo) != null) {
LOG.warn("Detected duplication of registration message: {}", message);
}
Expand Down Expand Up @@ -208,10 +220,13 @@ private void broadcastToAllEndpoints(ThriftNotificationMessage message) {
LOG.warn("Can't find notification by id {}. Probably it has already expired!");
} else {
notificationCache.put(notificationDto.getSecNum(), notificationDto);
LOG.debug("[{}] Put notification to topic actor cache {}", notificationDto.getTopicId(), notificationDto);
NotificationMessage notificationMessage = NotificationMessage.fromNotifications(Collections.singletonList(notificationDto));
LOG.debug("[{}] Put notification to topic actor cache {}",
notificationDto.getTopicId(), notificationDto);
NotificationMessage notificationMessage = NotificationMessage.fromNotifications(
Collections.singletonList(notificationDto));
for (ActorInfo endpoint : endpointSessions.values()) {
if (isSchemaVersionMatch(notificationDto, endpoint.getSystemNfVersion(), endpoint.getUserNfVersion())) {
if (isSchemaVersionMatch(
notificationDto, endpoint.getSystemNfVersion(), endpoint.getUserNfVersion())) {
endpoint.getActorRef().tell(notificationMessage, self());
}
}
Expand Down

0 comments on commit 993a6ad

Please sign in to comment.