Skip to content

Commit

Permalink
Fixing NPE issues when accessing endpoint on message task
Browse files Browse the repository at this point in the history
getEndpoint is renamed to initEndpoint to make it clearer that
it should not be called besides the constructor.

All accesses to endpoint is made via `endpoint` field instead of
`getEndpoint` method.

`endpoint` field being null is already handled in
AbstractMessageTask.run. And it is quaranteed to be non-null for
any MessageTask .

fixes hazelcast#13119
  • Loading branch information
sancar committed May 21, 2018
1 parent 2e2c694 commit ac96902
Show file tree
Hide file tree
Showing 46 changed files with 28 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.operations.OperationFactoryWrapper;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.instance.Node;
Expand All @@ -34,7 +33,6 @@ public AbstractAllPartitionsMessageTask(ClientMessage clientMessage, Node node,

@Override
protected void processMessage() throws Exception {
ClientEndpoint endpoint = getEndpoint();
OperationFactory operationFactory = new OperationFactoryWrapper(createOperationFactory(), endpoint.getUuid());
final InternalOperationService operationService = nodeEngine.getOperationService();
Map<Integer, Object> map = operationService.invokeOnAllPartitions(getServiceName(), operationFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.instance.Node;
Expand All @@ -32,7 +31,6 @@ protected AbstractInvocationMessageTask(ClientMessage clientMessage, Node node,

@Override
protected void processMessage() {
final ClientEndpoint endpoint = getEndpoint();
Operation op = prepareOperation();
op.setCallerUuid(endpoint.getUuid());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,18 @@ protected AbstractMessageTask(ClientMessage clientMessage, Node node, Connection
this.connection = connection;
this.clientEngine = node.clientEngine;
this.endpointManager = clientEngine.getEndpointManager();
this.endpoint = getEndpoint();
this.endpoint = initEndpoint();
}

@SuppressWarnings("unchecked")
public <S> S getService(String serviceName) {
return (S) node.nodeEngine.getService(serviceName);
}

protected ClientEndpoint getEndpoint() {
protected ClientEndpoint initEndpoint() {
return endpointManager.getEndpoint(connection);
}

protected int getClientVersion() {
return getEndpoint().getClientVersion();
}

protected abstract P decodeClientMessage(ClientMessage clientMessage);

protected abstract ClientMessage encodeResponse(Object response);
Expand All @@ -101,7 +97,6 @@ public void run() {
if (isAuthenticationMessage()) {
initializeAndProcessMessage();
} else {
ClientEndpoint endpoint = getEndpoint();
if (endpoint == null) {
handleMissingEndpoint();
} else if (!endpoint.isAuthenticated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.operations.OperationFactoryWrapper;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.instance.Node;
Expand All @@ -35,7 +34,6 @@ protected AbstractMultiPartitionMessageTask(ClientMessage clientMessage, Node no

@Override
protected Object call() throws Exception {
ClientEndpoint endpoint = getEndpoint();
OperationFactory operationFactory = new OperationFactoryWrapper(createOperationFactory(), endpoint.getUuid());

final InternalOperationService operationService = nodeEngine.getOperationService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public AddMembershipListenerMessageTask(ClientMessage clientMessage, Node node,
protected Object call() {
String serviceName = ClusterServiceImpl.SERVICE_NAME;
ClusterServiceImpl service = getService(serviceName);
ClientEndpoint endpoint = getEndpoint();
String registrationId = service.addMembershipListener(new MembershipListenerImpl(endpoint));
endpoint.addListenerDestroyAction(serviceName, serviceName, registrationId);
return registrationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ protected Object resolve(Object response) {
}

@Override
protected ClientEndpointImpl getEndpoint() {
protected ClientEndpoint initEndpoint() {
ClientEndpoint endpoint = endpointManager.getEndpoint(connection);
if (endpoint != null) {
return (ClientEndpointImpl) endpoint;
return endpoint;
}
return new ClientEndpointImpl(clientEngine, nodeEngine, connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected ClientMessage encodeResponse(Object response) {

@Override
protected Object call() throws Exception {
getEndpoint().setClientStatistics(parameters.stats);
endpoint.setClientStatistics(parameters.stats);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public CacheAddEntryListenerMessageTask(ClientMessage clientMessage, Node node,

@Override
protected Object call() {
ClientEndpoint endpoint = getEndpoint();
final CacheService service = getService(CacheService.SERVICE_NAME);
CacheEntryListener cacheEntryListener = new CacheEntryListener(endpoint, this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public CacheAddNearCacheInvalidationListenerTask(ClientMessage clientMessage, No

@Override
protected Object call() {
ClientEndpoint endpoint = getEndpoint();
CacheService cacheService = getService(CacheService.SERVICE_NAME);
CacheContext cacheContext = cacheService.getOrCreateCacheContext(parameters.name);
NearCacheInvalidationListener listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.hazelcast.cache.impl.event.CachePartitionLostEventFilter;
import com.hazelcast.cache.impl.event.CachePartitionLostListener;
import com.hazelcast.cache.impl.event.InternalCachePartitionLostListenerAdapter;
import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.CacheAddPartitionLostListenerCodec;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
Expand All @@ -44,8 +43,6 @@ public CacheAddPartitionLostListenerMessageTask(ClientMessage clientMessage, Nod

@Override
protected Object call() {
final ClientEndpoint endpoint = getEndpoint();

CachePartitionLostListener listener = new CachePartitionLostListener() {
@Override
public void partitionLost(CachePartitionLostEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected void processMessage() {
}

private CacheConfig extractCacheConfigFromMessage() {
int clientVersion = getClientVersion();
int clientVersion = endpoint.getClientVersion();
if (BuildInfo.UNKNOWN_HAZELCAST_VERSION == clientVersion) {
boolean compatibilityEnabled = nodeEngine.getProperties().getBoolean(GroupProperty.COMPATIBILITY_3_6_CLIENT_ENABLED);
if (compatibilityEnabled) {
Expand Down Expand Up @@ -135,7 +135,7 @@ public void onFailure(Throwable t) {

private Data serializeCacheConfig(Object response) {
Data responseData = null;
if (BuildInfo.UNKNOWN_HAZELCAST_VERSION == getClientVersion()) {
if (BuildInfo.UNKNOWN_HAZELCAST_VERSION == endpoint.getClientVersion()) {
boolean compatibilityEnabled = nodeEngine.getProperties().getBoolean(GroupProperty.COMPATIBILITY_3_6_CLIENT_ENABLED);
if (compatibilityEnabled) {
responseData = nodeEngine.toData(response == null ? null : new LegacyCacheConfig((CacheConfig) response));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public String getDistributedObjectName() {

private Data serializeCacheConfig(Object response) {
Data responseData = null;
if (BuildInfo.UNKNOWN_HAZELCAST_VERSION == getClientVersion()) {
if (BuildInfo.UNKNOWN_HAZELCAST_VERSION == endpoint.getClientVersion()) {
boolean compatibilityEnabled = nodeEngine.getProperties().getBoolean(GroupProperty.COMPATIBILITY_3_6_CLIENT_ENABLED);
if (compatibilityEnabled) {
responseData = nodeEngine.toData(response == null ? null : new LegacyCacheConfig((CacheConfig) response));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public Pre38CacheAddInvalidationListenerTask(ClientMessage clientMessage, Node n

@Override
protected Object call() {
ClientEndpoint endpoint = getEndpoint();
CacheService cacheService = getService(CacheService.SERVICE_NAME);
CacheContext cacheContext = cacheService.getOrCreateCacheContext(parameters.name);
String uuid = nodeEngine.getLocalMember().getUuid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected Operation prepareOperation() {
Data callableData = parameters.callable;
if (securityContext != null) {
Callable callable = serializationService.toObject(parameters.callable);
Subject subject = getEndpoint().getSubject();
Subject subject = endpoint.getSubject();
callable = securityContext.createSecureCallable(subject, callable);
callableData = serializationService.toData(callable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected Operation prepareOperation() {
SecurityContext securityContext = clientEngine.getSecurityContext();
Data callableData = parameters.callable;
if (securityContext != null) {
Subject subject = getEndpoint().getSubject();
Subject subject = endpoint.getSubject();
Callable callable = serializationService.toObject(parameters.callable);
callable = securityContext.createSecureCallable(subject, callable);
callableData = serializationService.toData(callable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected Operation prepareOperation() {
SecurityContext securityContext = clientEngine.getSecurityContext();
Data callableData = parameters.callable;
if (securityContext != null) {
Subject subject = getEndpoint().getSubject();
Subject subject = endpoint.getSubject();
Callable callable = serializationService.toObject(parameters.callable);
callable = securityContext.createSecureCallable(subject, callable);
callableData = serializationService.toData(callable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public ListAddListenerMessageTask(ClientMessage clientMessage, Node node, Connec

@Override
protected Object call() {
ClientEndpoint endpoint = getEndpoint();
Data partitionKey = serializationService.toData(parameters.name);
ItemListener listener = createItemListener(endpoint, partitionKey);
EventService eventService = clientEngine.getEventService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.map;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
import com.hazelcast.core.EntryEvent;
Expand Down Expand Up @@ -44,7 +43,6 @@ public AbstractMapAddEntryListenerMessageTask(ClientMessage clientMessage, Node

@Override
protected Object call() {
final ClientEndpoint endpoint = getEndpoint();
final MapService mapService = getService(MapService.SERVICE_NAME);

Object listener = newMapListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public MapAddListenerMessageTask(ClientMessage clientMessage, Node node, Connect

@Override
protected Object call() throws Exception {
ClientEndpoint endpoint = getEndpoint();
return registerListener(endpoint, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.map;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.MapAddPartitionLostListenerCodec;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
Expand All @@ -41,7 +40,6 @@ public MapAddPartitionLostListenerMessageTask(ClientMessage clientMessage, Node

@Override
protected Object call() {
final ClientEndpoint endpoint = getEndpoint();
final MapService mapService = getService(MapService.SERVICE_NAME);

final MapPartitionLostListener listener = new MapPartitionLostListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.map;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.operations.OperationFactoryWrapper;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.MapExecuteWithPredicateCodec;
Expand Down Expand Up @@ -54,7 +53,6 @@ public MapExecuteWithPredicateMessageTask(ClientMessage clientMessage, Node node

@Override
protected Object call() throws Exception {
ClientEndpoint endpoint = getEndpoint();
InternalOperationService operationService = nodeEngine.getOperationService();

Predicate predicate = serializationService.toObject(parameters.predicate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.map;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ContinuousQueryPublisherCreateCodec;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
Expand Down Expand Up @@ -69,7 +68,6 @@ protected Object call() throws Exception {

private void createInvocations(Collection<MemberImpl> members, List<Future> futures) {
final InternalOperationService operationService = nodeEngine.getOperationService();
final ClientEndpoint endpoint = getEndpoint();
for (MemberImpl member : members) {
Predicate predicate = serializationService.toObject(parameters.predicate);
AccumulatorInfo accumulatorInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.map;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ContinuousQueryPublisherCreateWithValueCodec;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
Expand Down Expand Up @@ -70,7 +69,6 @@ protected Object call() throws Exception {

private void createInvocations(Collection<MemberImpl> members, List<Future> futures) {
final InternalOperationService operationService = nodeEngine.getOperationService();
final ClientEndpoint endpoint = getEndpoint();
for (MemberImpl member : members) {
Predicate predicate = serializationService.toObject(parameters.predicate);
AccumulatorInfo accumulatorInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.multimap;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
import com.hazelcast.core.EntryAdapter;
Expand All @@ -41,7 +40,6 @@ public AbstractMultiMapAddEntryListenerMessageTask(ClientMessage clientMessage,

@Override
protected Object call() throws Exception {
final ClientEndpoint endpoint = getEndpoint();
final MultiMapService service = getService(MultiMapService.SERVICE_NAME);
EntryAdapter listener = new MultiMapListener();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.queue;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.QueueAddListenerCodec;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
Expand Down Expand Up @@ -45,7 +44,6 @@ public QueueAddListenerMessageTask(ClientMessage clientMessage, Node node, Conne

@Override
protected Object call() {
final ClientEndpoint endpoint = getEndpoint();
final QueueService service = getService(QueueService.SERVICE_NAME);
final Data partitionKey = serializationService.toData(parameters.name);
ItemListener listener = new ItemListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public SetAddListenerMessageTask(ClientMessage clientMessage, Node node, Connect

@Override
protected Object call() {
ClientEndpoint endpoint = getEndpoint();
Data partitionKey = serializationService.toData(parameters.name);
ItemListener listener = createItemListener(endpoint, partitionKey);
EventService eventService = clientEngine.getEventService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.topic;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.TopicAddMessageListenerCodec;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
Expand Down Expand Up @@ -50,7 +49,6 @@ public TopicAddMessageListenerMessageTask(ClientMessage clientMessage, Node node
protected Object call() throws Exception {
partitionKey = serializationService.toData(parameters.name);
TopicService service = getService(TopicService.SERVICE_NAME);
ClientEndpoint endpoint = getEndpoint();
String registrationId = service.addMessageListener(parameters.name, this, parameters.localOnly);
endpoint.addListenerDestroyAction(TopicService.SERVICE_NAME, parameters.name, registrationId);
return registrationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.transaction;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.XATransactionClearRemoteCodec;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
Expand Down Expand Up @@ -56,7 +55,6 @@ protected ClientMessage encodeResponse(Object response) {
protected Object call() throws Exception {
InternalOperationService operationService = nodeEngine.getOperationService();
InternalPartitionService partitionService = nodeEngine.getPartitionService();
ClientEndpoint endpoint = getEndpoint();

Data xidData = serializationService.toData(parameters.xid);
Operation op = new ClearRemoteTransactionOperation(xidData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.hazelcast.client.impl.protocol.task.transaction;

import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.XATransactionCreateCodec;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
Expand All @@ -38,7 +37,6 @@ public XATransactionCreateMessageTask(ClientMessage clientMessage, Node node, Co

@Override
protected Object call() throws Exception {
ClientEndpoint endpoint = getEndpoint();
XAService xaService = getService(getServiceName());
String ownerUuid = endpoint.getUuid();
TransactionContext context = xaService.newXATransactionContext(parameters.xid, ownerUuid, (int) parameters.timeout, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public TransactionalMapContainsKeyMessageTask(ClientMessage clientMessage, Node

@Override
protected Object innerCall() throws Exception {
final TransactionContext context = getEndpoint().getTransactionContext(parameters.txnId);
final TransactionContext context = endpoint.getTransactionContext(parameters.txnId);
final TransactionalMap map = context.getMap(parameters.name);
return map.containsKey(parameters.key);
}
Expand Down
Loading

0 comments on commit ac96902

Please sign in to comment.