Skip to content

Commit

Permalink
Fixed some warns.
Browse files Browse the repository at this point in the history
  • Loading branch information
Acarus committed Oct 5, 2016
1 parent 4e11ae4 commit d95c886
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 4 deletions.
Expand Up @@ -98,6 +98,11 @@ private UserVerifier createUserVerifier(UserVerifierDto verifierDto) throws Exce
} }
} }


/**
* Verifies a user.
*
* @param message user verification request message
*/
public void verifyUser(UserVerificationRequestMessage message) { public void verifyUser(UserVerificationRequestMessage message) {
UserVerifier verifier = userVerifiers.get(message.getVerifierId()); UserVerifier verifier = userVerifiers.get(message.getVerifierId());
if (verifier != null) { if (verifier != null) {
Expand All @@ -111,6 +116,11 @@ public void verifyUser(UserVerificationRequestMessage message) {
} }
} }


/**
* Process a notification.
*
* @param notification notification
*/
public void processNotification(Notification notification) { public void processNotification(Notification notification) {
LOG.debug("Process user verifier notification [{}]", notification); LOG.debug("Process user verifier notification [{}]", notification);
String verifierToken = notification.getUserVerifierToken(); String verifierToken = notification.getUserVerifierToken();
Expand Down
Expand Up @@ -78,6 +78,12 @@ public long getInactivityTimeout() {
return inactivityTimeout; return inactivityTimeout;
} }


/**
* Process an actor timeout message.
*
* @param context actor context
* @param message actor timeout message
*/
public void processActorTimeoutMessage(ActorContext context, ActorTimeoutMessage message) { public void processActorTimeoutMessage(ActorContext context, ActorTimeoutMessage message) {
if (state.getLastActivityTime() <= message.getLastActivityTime()) { if (state.getLastActivityTime() <= message.getLastActivityTime()) {
LOG.debug("[{}][{}] Request stop of endpoint actor due to inactivity timeout", LOG.debug("[{}][{}] Request stop of endpoint actor due to inactivity timeout",
Expand All @@ -94,6 +100,12 @@ protected void tellActor(ActorContext context, ActorRef target, Object message)
target.tell(message, context.self()); target.tell(message, context.self());
} }


/**
* Process an endpoint actor message.
*
* @param context actor context
* @param msg endpoint actor message
*/
public void processEndpointActorMsg(ActorContext context, EndpointActorMsg msg) { public void processEndpointActorMsg(ActorContext context, EndpointActorMsg msg) {
if (msg instanceof ThriftEndpointActorMsg) { if (msg instanceof ThriftEndpointActorMsg) {
processThriftMsg(context, (ThriftEndpointActorMsg<?>) msg); processThriftMsg(context, (ThriftEndpointActorMsg<?>) msg);
Expand Down
Expand Up @@ -22,6 +22,9 @@ public abstract class AbstractEndpointActorState {
protected final String actorKey; protected final String actorKey;
private long lastActivityTime; private long lastActivityTime;


/**
* All-args constructor.
*/
public AbstractEndpointActorState(String endpointKey, String actorKey) { public AbstractEndpointActorState(String endpointKey, String actorKey) {
super(); super();
this.endpointKey = endpointKey; this.endpointKey = endpointKey;
Expand Down
Expand Up @@ -49,6 +49,9 @@ public class GlobalEndpointActorMessageProcessor
private final ClusterService clusterService; private final ClusterService clusterService;
private final OperationsService operationsService; private final OperationsService operationsService;


/**
* All-args constructor.
*/
public GlobalEndpointActorMessageProcessor(AkkaContext context, String appToken, public GlobalEndpointActorMessageProcessor(AkkaContext context, String appToken,
EndpointObjectHash key, String actorKey) { EndpointObjectHash key, String actorKey) {
super(new GlobalEndpointActorState(Base64Util.encode(key.getData()), actorKey), super(new GlobalEndpointActorState(Base64Util.encode(key.getData()), actorKey),
Expand All @@ -62,6 +65,11 @@ public GlobalEndpointActorMessageProcessor(AkkaContext context, String appToken,
routes = new RouteTable<>(nodeId); routes = new RouteTable<>(nodeId);
} }


/**
* Process an endpoint route message.
*
* @param message endpoint route message
*/
public void processRouteMessage(EndpointRouteMessage message) { public void processRouteMessage(EndpointRouteMessage message) {
LOG.debug("[{}] Processing {} operation for address {}", LOG.debug("[{}] Processing {} operation for address {}",
endpointKey, message.getOperation(), message.getAddress()); endpointKey, message.getOperation(), message.getAddress());
Expand All @@ -78,6 +86,11 @@ public void processRouteMessage(EndpointRouteMessage message) {
} }
} }


/**
* Process a cluster update.
*
* @param context actor context
*/
public void processClusterUpdate(ActorContext context) { public void processClusterUpdate(ActorContext context) {
if (!clusterService.isMainEntityNode(key)) { if (!clusterService.isMainEntityNode(key)) {
LOG.debug("[{}] No longer a global endpoint node for {}", endpointKey); LOG.debug("[{}] No longer a global endpoint node for {}", endpointKey);
Expand Down
Expand Up @@ -108,7 +108,9 @@ public class LocalEndpointActorMessageProcessor


private final Map<UUID, UserVerificationResponseMessage> userAttachResponseMap; private final Map<UUID, UserVerificationResponseMessage> userAttachResponseMap;



/**
* All-args constructor.
*/
public LocalEndpointActorMessageProcessor(AkkaContext context, public LocalEndpointActorMessageProcessor(AkkaContext context,
String appToken, String appToken,
EndpointObjectHash key, EndpointObjectHash key,
Expand All @@ -123,10 +125,22 @@ public LocalEndpointActorMessageProcessor(AkkaContext context,
this.userAttachResponseMap = new LinkedHashMap<>(); this.userAttachResponseMap = new LinkedHashMap<>();
} }


/**
* Process an endpoint sync.
*
* @param context actor context
* @param message sync request message
*/
public void processEndpointSync(ActorContext context, SyncRequestMessage message) { public void processEndpointSync(ActorContext context, SyncRequestMessage message) {
sync(context, message); sync(context, message);
} }


/**
* Process an endpoint event receive message.
*
* @param context actor context
* @param message endpoint event receive message
*/
public void processEndpointEventReceiveMessage(ActorContext context, public void processEndpointEventReceiveMessage(ActorContext context,
EndpointEventReceiveMessage message) { EndpointEventReceiveMessage message) {
EndpointEventDeliveryMessage response; EndpointEventDeliveryMessage response;
Expand All @@ -153,6 +167,11 @@ public void processEndpointEventReceiveMessage(ActorContext context,
tellParent(context, response); tellParent(context, response);
} }


/**
* Process a thrift notification.
*
* @param context actor context.
*/
public void processThriftNotification(ActorContext context) { public void processThriftNotification(ActorContext context) {
Set<ChannelMetaData> channels = state.getChannelsByTypes( Set<ChannelMetaData> channels = state.getChannelsByTypes(
TransportType.CONFIGURATION, TransportType.NOTIFICATION); TransportType.CONFIGURATION, TransportType.NOTIFICATION);
Expand All @@ -161,6 +180,12 @@ public void processThriftNotification(ActorContext context) {
syncChannels(context, channels, true, true); syncChannels(context, channels, true, true);
} }


/**
* Process a user configuration update message.
*
* @param context actor context
* @param message endpoint user configuration update message
*/
public void processUserConfigurationUpdate(ActorContext context, public void processUserConfigurationUpdate(ActorContext context,
EndpointUserConfigurationUpdateMessage message) { EndpointUserConfigurationUpdateMessage message) {
if (message.getUserConfigurationUpdate() != null) { if (message.getUserConfigurationUpdate() != null) {
Expand Down Expand Up @@ -210,6 +235,12 @@ private void processEndpointDeregistrationMessage(
} }
} }


/**
* Process a notification message.
*
* @param context actor context
* @param message notification message
*/
public void processNotification(ActorContext context, NotificationMessage message) { public void processNotification(ActorContext context, NotificationMessage message) {
LOG.debug("[{}][{}] Processing notification message {}", endpointKey, actorKey, message); LOG.debug("[{}][{}] Processing notification message {}", endpointKey, actorKey, message);


Expand Down Expand Up @@ -242,6 +273,12 @@ public void processNotification(ActorContext context, NotificationMessage messag
} }
} }


/**
* Process a request timeout message.
*
* @param context actor context
* @param message request timeout message
*/
public void processRequestTimeoutMessage(ActorContext context, RequestTimeoutMessage message) { public void processRequestTimeoutMessage(ActorContext context, RequestTimeoutMessage message) {
ChannelMetaData channel = state.getChannelByRequestId(message.getRequestId()); ChannelMetaData channel = state.getChannelByRequestId(message.getRequestId());
if (channel != null) { if (channel != null) {
Expand Down Expand Up @@ -338,7 +375,7 @@ private SyncContext sync(ClientSync request) throws GetDeltaException {
if (context.getStatus() != SyncStatus.SUCCESS) { if (context.getStatus() != SyncStatus.SUCCESS) {
return context; return context;
} }
if (state.isUcfHashRequiresIntialization()) { if (state.isUcfHashRequiresInitialization()) {
byte[] hash = operationsService.fetchUcfHash(appToken, state.getProfile()); byte[] hash = operationsService.fetchUcfHash(appToken, state.getProfile());
LOG.debug("[{}][{}] Initialized endpoint user configuration hash {}", LOG.debug("[{}][{}] Initialized endpoint user configuration hash {}",
endpointKey, context.getRequestHash(), endpointKey, context.getRequestHash(),
Expand Down Expand Up @@ -779,6 +816,12 @@ protected void sendEventsIfPresent(ActorContext context, EventClientSync request
} }
} }


/**
* Process an endpoint user action message.
*
* @param context actor context
* @param message endpoint user action message
*/
public void processEndpointUserActionMessage(ActorContext context, public void processEndpointUserActionMessage(ActorContext context,
EndpointUserActionMessage message) { EndpointUserActionMessage message) {
Set<ChannelMetaData> eventChannels = state.getChannelsByTypes( Set<ChannelMetaData> eventChannels = state.getChannelsByTypes(
Expand Down Expand Up @@ -836,6 +879,13 @@ public void processEndpointUserActionMessage(ActorContext context,
} }
} }


/**
* Process disconnect message.
*
* @param context actor context
* @param message channel aware message
* @return true if channel is disconnected otherwise false
*/
public boolean processDisconnectMessage(ActorContext context, ChannelAware message) { public boolean processDisconnectMessage(ActorContext context, ChannelAware message) {
LOG.debug("[{}][{}] Received disconnect message for channel [{}]", LOG.debug("[{}][{}] Received disconnect message for channel [{}]",
endpointKey, actorKey, message.getChannelUuid()); endpointKey, actorKey, message.getChannelUuid());
Expand All @@ -851,6 +901,13 @@ public boolean processDisconnectMessage(ActorContext context, ChannelAware messa
} }
} }


/**
* Process a ping message.
*
* @param context actor context
* @param message channel aware message
* @return true if channel is found otherwise false
*/
public boolean processPingMessage(ActorContext context, ChannelAware message) { public boolean processPingMessage(ActorContext context, ChannelAware message) {
LOG.debug("[{}][{}] Received ping message for channel [{}]", LOG.debug("[{}][{}] Received ping message for channel [{}]",
endpointKey, actorKey, message.getChannelUuid()); endpointKey, actorKey, message.getChannelUuid());
Expand All @@ -870,6 +927,14 @@ public boolean processPingMessage(ActorContext context, ChannelAware message) {
} }
} }


/**
*
* Process a timeout message.
*
* @param context actor context
* @param message channel timeout message
* @return true if channel is removed otherwise false
*/
public boolean processChannelTimeoutMessage(ActorContext context, public boolean processChannelTimeoutMessage(ActorContext context,
ChannelTimeoutMessage message) { ChannelTimeoutMessage message) {
LOG.debug("[{}][{}] Received channel timeout message for channel [{}]", LOG.debug("[{}][{}] Received channel timeout message for channel [{}]",
Expand Down Expand Up @@ -900,6 +965,12 @@ public boolean processChannelTimeoutMessage(ActorContext context,
} }
} }


/**
* Process a log delivery message.
*
* @param context actor context
* @param message log delivery message
*/
public void processLogDeliveryMessage(ActorContext context, LogDeliveryMessage message) { public void processLogDeliveryMessage(ActorContext context, LogDeliveryMessage message) {
LOG.debug("[{}][{}] Received log delivery message for request [{}] with status {}", LOG.debug("[{}][{}] Received log delivery message for request [{}] with status {}",
endpointKey, actorKey, message.getRequestId(), endpointKey, actorKey, message.getRequestId(),
Expand All @@ -921,6 +992,12 @@ public void processLogDeliveryMessage(ActorContext context, LogDeliveryMessage m
logUploadResponseMap.clear(); logUploadResponseMap.clear();
} }


/**
* Process a user verification message.
*
* @param context actor context
* @param message user verification response message
*/
public void processUserVerificationMessage(ActorContext context, public void processUserVerificationMessage(ActorContext context,
UserVerificationResponseMessage message) { UserVerificationResponseMessage message) {
LOG.debug("[{}][{}] Received user verification message for request [{}] with status {}", LOG.debug("[{}][{}] Received user verification message for request [{}] with status {}",
Expand Down
Expand Up @@ -49,6 +49,9 @@ public class LocalEndpointActorState extends AbstractEndpointActorState {
private boolean ucfHashIntialized; private boolean ucfHashIntialized;
private byte[] ucfHash; private byte[] ucfHash;


/**
* All-args constructor.
*/
public LocalEndpointActorState(String endpointKey, String actorKey) { public LocalEndpointActorState(String endpointKey, String actorKey) {
super(endpointKey, actorKey); super(endpointKey, actorKey);
this.channelMap = new ChannelMap(endpointKey, actorKey); this.channelMap = new ChannelMap(endpointKey, actorKey);
Expand Down Expand Up @@ -171,12 +174,22 @@ public void setSubscriptionStates(Map<String, Integer> subscriptionStates) {
this.subscriptionStates = new HashMap<>(subscriptionStates); this.subscriptionStates = new HashMap<>(subscriptionStates);
} }


public boolean isUcfHashRequiresIntialization() { /**
* Returns whether hash requires initialization.
*
* @return true if hash requires initialization otherwise false
*/
public boolean isUcfHashRequiresInitialization() {
return isValidForUser() && !ucfHashIntialized; return isValidForUser() && !ucfHashIntialized;
} }


/**
* Returns whether user configuration update is pending.
*
* @return true if user configuration update is pending otherwise false
*/
public boolean isUserConfigurationUpdatePending() { public boolean isUserConfigurationUpdatePending() {
if (!isValidForUser() || isUcfHashRequiresIntialization()) { if (!isValidForUser() || isUcfHashRequiresInitialization()) {
return false; return false;
} }
return !Arrays.equals(ucfHash, endpointProfile.getUserConfigurationHash()); return !Arrays.equals(ucfHash, endpointProfile.getUserConfigurationHash());
Expand Down
Expand Up @@ -249,6 +249,11 @@ private NotificationClientSync diff(NotificationClientSync oldRequest,
} }
} }


/**
* Returns whether a transport a type is valid.
* @param type transport type
* @return true if type is valid otherwise false
*/
public boolean isValid(TransportType type) { public boolean isValid(TransportType type) {
switch (type) { switch (type) {
case EVENT: case EVENT:
Expand Down
Expand Up @@ -62,7 +62,9 @@ public List<EndpointEvent> getEndpointEvents() {
return events; return events;
} }


//CHECKSTYLE:OFF
public List<Event> getEvents() { public List<Event> getEvents() {
//CHECKSTYLE:ON
List<Event> result = new ArrayList<>(events.size()); List<Event> result = new ArrayList<>(events.size());
for (EndpointEvent event : events) { for (EndpointEvent event : events) {
result.add(event.getEvent()); result.add(event.getEvent());
Expand Down
Expand Up @@ -41,6 +41,9 @@ public EndpointUserConfigurationUpdate(String tenantId, String userId, String ap
this.hash = hash; this.hash = hash;
} }


/**
* All-args constructor.
*/
public static EndpointUserConfigurationUpdate fromThrift( public static EndpointUserConfigurationUpdate fromThrift(
org.kaaproject.kaa.server.common.thrift.gen.operations.EndpointStateUpdate notification) { org.kaaproject.kaa.server.common.thrift.gen.operations.EndpointStateUpdate notification) {
return new EndpointUserConfigurationUpdate(notification.getTenantId(), notification.getUserId(), return new EndpointUserConfigurationUpdate(notification.getTenantId(), notification.getUserId(),
Expand Down
Expand Up @@ -39,6 +39,9 @@ public UserConfigurationUpdate(String tenantId, String userId, String applicatio
this.hash = hash; this.hash = hash;
} }


/**
* All-args constructor.
*/
public static UserConfigurationUpdate fromThrift( public static UserConfigurationUpdate fromThrift(
org.kaaproject.kaa.server.common.thrift.gen.operations.UserConfigurationUpdate notification) { org.kaaproject.kaa.server.common.thrift.gen.operations.UserConfigurationUpdate notification) {
return new UserConfigurationUpdate(notification.getTenantId(), notification.getUserId(), return new UserConfigurationUpdate(notification.getTenantId(), notification.getUserId(),
Expand Down
Expand Up @@ -177,6 +177,12 @@ public boolean isDeliveryRequired(String serverId, RouteTableAddress address) {
return servers == null || !servers.contains(serverId); return servers == null || !servers.contains(serverId);
} }


/**
* Remove a local address.
*
* @param endpoint endpoint object hash
* @return removed address
*/
public RouteTableAddress removeLocal(EndpointObjectHash endpoint) { public RouteTableAddress removeLocal(EndpointObjectHash endpoint) {
clearRoutes(endpoint); clearRoutes(endpoint);
RouteTableAddress addressToRemove = null; RouteTableAddress addressToRemove = null;
Expand Down Expand Up @@ -252,6 +258,11 @@ private void clearReportedAddressMap(String serverId) {
} }
} }


/**
* Remove a route table address.
*
* @param address address
*/
public void removeByAddress(RouteTableAddress address) { public void removeByAddress(RouteTableAddress address) {
Set<Entry<RouteTableKey, Map<String, RouteTableAddress>>> entrySet = routes.entrySet(); Set<Entry<RouteTableKey, Map<String, RouteTableAddress>>> entrySet = routes.entrySet();
Iterator<Entry<RouteTableKey, Map<String, RouteTableAddress>>> iterator = entrySet.iterator(); Iterator<Entry<RouteTableKey, Map<String, RouteTableAddress>>> iterator = entrySet.iterator();
Expand Down
Expand Up @@ -70,6 +70,10 @@ public UpdateUuidsMigration(Connection connection, Options options) {
this.nosql = options.getNoSql(); this.nosql = options.getNoSql();
} }


/**
* Change encoding of uuids from Latin1 to Base64 in relational and NoSQL databases.
*
*/
public void transform() throws IOException, SQLException { public void transform() throws IOException, SQLException {
QueryRunner run = new QueryRunner(); QueryRunner run = new QueryRunner();
ResultSetHandler<List<Configuration>> rsHandler = new BeanListHandler<>(Configuration.class); ResultSetHandler<List<Configuration>> rsHandler = new BeanListHandler<>(Configuration.class);
Expand Down

0 comments on commit d95c886

Please sign in to comment.