Skip to content

Commit

Permalink
Initialise managed context for user-supplied objects
Browse files Browse the repository at this point in the history
Inject all managed contexts into user supplied objects. This includes
the Spring context and Node which were previously not injected. The
fix also avoids cloning the object to inject the dependencies as that
can cause a performance issue.

Fixes : hazelcast#11410
  • Loading branch information
Matko Medenjak committed Sep 22, 2017
1 parent 2156b8e commit 5ef3c93
Showing 1 changed file with 58 additions and 51 deletions.
109 changes: 58 additions & 51 deletions hazelcast/src/main/java/com/hazelcast/map/impl/proxy/MapProxyImpl.java
Expand Up @@ -23,7 +23,6 @@
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import com.hazelcast.internal.cluster.Versions;
Expand Down Expand Up @@ -223,7 +222,7 @@ public boolean remove(Object key, Object value) {
@Override
public void removeAll(Predicate<K, V> predicate) {
checkNotNull(predicate, "predicate cannot be null");
handleHazelcastInstanceAwareParams(predicate);
initializeManagedContext(predicate);

removeAllInternal(predicate);
}
Expand Down Expand Up @@ -394,7 +393,7 @@ public void forceUnlock(K key) {
@Override
public String addInterceptor(MapInterceptor interceptor) {
checkNotNull(interceptor, "Interceptor should not be null!");
handleHazelcastInstanceAwareParams(interceptor);
initializeManagedContext(interceptor);

return addMapInterceptorInternal(interceptor);
}
Expand All @@ -409,15 +408,15 @@ public void removeInterceptor(String id) {
@Override
public String addLocalEntryListener(MapListener listener) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener);
initializeManagedContext(listener);

return addLocalEntryListenerInternal(listener);
}

@Override
public String addLocalEntryListener(EntryListener listener) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener);
initializeManagedContext(listener);

return addLocalEntryListenerInternal(listener);
}
Expand All @@ -426,7 +425,8 @@ public String addLocalEntryListener(EntryListener listener) {
public String addLocalEntryListener(MapListener listener, Predicate<K, V> predicate, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener, predicate);
initializeManagedContext(listener);
initializeManagedContext(predicate);

return addLocalEntryListenerInternal(listener, predicate, null, includeValue);
}
Expand All @@ -435,7 +435,8 @@ public String addLocalEntryListener(MapListener listener, Predicate<K, V> predic
public String addLocalEntryListener(EntryListener listener, Predicate<K, V> predicate, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener, predicate);
initializeManagedContext(listener);
initializeManagedContext(predicate);

return addLocalEntryListenerInternal(listener, predicate, null, includeValue);
}
Expand All @@ -444,7 +445,8 @@ public String addLocalEntryListener(EntryListener listener, Predicate<K, V> pred
public String addLocalEntryListener(MapListener listener, Predicate<K, V> predicate, K key, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener, predicate);
initializeManagedContext(listener);
initializeManagedContext(predicate);

return addLocalEntryListenerInternal(listener, predicate, toDataWithStrategy(key), includeValue);
}
Expand All @@ -453,23 +455,24 @@ public String addLocalEntryListener(MapListener listener, Predicate<K, V> predic
public String addLocalEntryListener(EntryListener listener, Predicate<K, V> predicate, K key, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener, predicate);
initializeManagedContext(listener);
initializeManagedContext(predicate);

return addLocalEntryListenerInternal(listener, predicate, toDataWithStrategy(key), includeValue);
}

@Override
public String addEntryListener(MapListener listener, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener);
initializeManagedContext(listener);

return addEntryListenerInternal(listener, null, includeValue);
}

@Override
public String addEntryListener(EntryListener listener, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener);
initializeManagedContext(listener);

return addEntryListenerInternal(listener, null, includeValue);
}
Expand All @@ -478,7 +481,7 @@ public String addEntryListener(EntryListener listener, boolean includeValue) {
public String addEntryListener(MapListener listener, K key, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(key, NULL_KEY_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener);
initializeManagedContext(listener);

return addEntryListenerInternal(listener, toDataWithStrategy(key), includeValue);
}
Expand All @@ -487,7 +490,7 @@ public String addEntryListener(MapListener listener, K key, boolean includeValue
public String addEntryListener(EntryListener listener, K key, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(key, NULL_KEY_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener);
initializeManagedContext(listener);

return addEntryListenerInternal(listener, toDataWithStrategy(key), includeValue);
}
Expand All @@ -496,7 +499,8 @@ public String addEntryListener(EntryListener listener, K key, boolean includeVal
public String addEntryListener(MapListener listener, Predicate<K, V> predicate, K key, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener, predicate);
initializeManagedContext(listener);
initializeManagedContext(predicate);

return addEntryListenerInternal(listener, predicate, toDataWithStrategy(key), includeValue);
}
Expand All @@ -505,7 +509,8 @@ public String addEntryListener(MapListener listener, Predicate<K, V> predicate,
public String addEntryListener(EntryListener listener, Predicate<K, V> predicate, K key, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener, predicate);
initializeManagedContext(listener);
initializeManagedContext(predicate);

return addEntryListenerInternal(listener, predicate, toDataWithStrategy(key), includeValue);
}
Expand All @@ -514,7 +519,8 @@ public String addEntryListener(EntryListener listener, Predicate<K, V> predicate
public String addEntryListener(MapListener listener, Predicate<K, V> predicate, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener, predicate);
initializeManagedContext(listener);
initializeManagedContext(predicate);

return addEntryListenerInternal(listener, predicate, null, includeValue);
}
Expand All @@ -523,7 +529,8 @@ public String addEntryListener(MapListener listener, Predicate<K, V> predicate,
public String addEntryListener(EntryListener listener, Predicate<K, V> predicate, boolean includeValue) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener, predicate);
initializeManagedContext(listener);
initializeManagedContext(predicate);

return addEntryListenerInternal(listener, predicate, null, includeValue);
}
Expand All @@ -538,7 +545,7 @@ public boolean removeEntryListener(String id) {
@Override
public String addPartitionLostListener(MapPartitionLostListener listener) {
checkNotNull(listener, NULL_LISTENER_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(listener);
initializeManagedContext(listener);

return addPartitionLostListenerInternal(listener);
}
Expand Down Expand Up @@ -627,7 +634,7 @@ public Set<K> keySet() {
@Override
@SuppressWarnings("unchecked")
public Set<K> keySet(Predicate predicate) {
handleHazelcastInstanceAwareParams(predicate);
initializeManagedContext(predicate);
return executePredicate(predicate, IterationType.KEY, true);
}

Expand All @@ -638,7 +645,7 @@ public Set<Map.Entry<K, V>> entrySet() {

@Override
public Set<Map.Entry<K, V>> entrySet(Predicate predicate) {
handleHazelcastInstanceAwareParams(predicate);
initializeManagedContext(predicate);
return executePredicate(predicate, IterationType.ENTRY, true);
}

Expand All @@ -650,7 +657,7 @@ public Collection<V> values() {
@Override
@SuppressWarnings("unchecked")
public Collection<V> values(Predicate predicate) {
handleHazelcastInstanceAwareParams(predicate);
initializeManagedContext(predicate);
return executePredicate(predicate, IterationType.VALUE, false);
}

Expand Down Expand Up @@ -688,7 +695,7 @@ public Set<K> localKeySet() {
@SuppressWarnings("unchecked")
public Set<K> localKeySet(Predicate predicate) {
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(predicate);
initializeManagedContext(predicate);

MapQueryEngine queryEngine = getMapQueryEngine();
Query query = Query.of()
Expand All @@ -703,7 +710,7 @@ public Set<K> localKeySet(Predicate predicate) {
@Override
public Object executeOnKey(K key, EntryProcessor entryProcessor) {
checkNotNull(key, NULL_KEY_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(entryProcessor);
initializeManagedContext(entryProcessor);

Data result = executeOnKeyInternal(key, entryProcessor);
return toObject(result);
Expand All @@ -712,7 +719,7 @@ public Object executeOnKey(K key, EntryProcessor entryProcessor) {
@Override
public Map<K, Object> executeOnKeys(Set<K> keys, EntryProcessor entryProcessor) {
checkNotNull(keys, NULL_KEYS_ARE_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(entryProcessor);
initializeManagedContext(entryProcessor);

if (keys.isEmpty()) {
return emptyMap();
Expand All @@ -724,15 +731,16 @@ public Map<K, Object> executeOnKeys(Set<K> keys, EntryProcessor entryProcessor)
@Override
public void submitToKey(K key, EntryProcessor entryProcessor, ExecutionCallback callback) {
checkNotNull(key, NULL_KEY_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(entryProcessor, callback);
initializeManagedContext(entryProcessor);
initializeManagedContext(callback);

executeOnKeyInternal(key, entryProcessor, callback);
}

@Override
public ICompletableFuture submitToKey(K key, EntryProcessor entryProcessor) {
checkNotNull(key, NULL_KEY_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(entryProcessor);
initializeManagedContext(entryProcessor);

InternalCompletableFuture future = executeOnKeyInternal(key, entryProcessor, null);
return new DelegatingFuture(future, serializationService);
Expand All @@ -745,7 +753,8 @@ public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {

@Override
public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor, Predicate predicate) {
handleHazelcastInstanceAwareParams(entryProcessor, predicate);
initializeManagedContext(entryProcessor);
initializeManagedContext(predicate);
List<Data> result = new ArrayList<Data>();

executeOnEntriesInternal(entryProcessor, predicate, result);
Expand All @@ -769,8 +778,7 @@ public <R> R aggregate(Aggregator<Map.Entry<K, V>, R> aggregator) {
checkNotNull(aggregator, NULL_AGGREGATOR_IS_NOT_ALLOWED);

MapQueryEngine queryEngine = getMapQueryEngine();
// HazelcastInstanceAware handled by cloning
aggregator = serializationService.toObject(serializationService.toData(aggregator));
initializeManagedContext(aggregator);

Query query = Query.of()
.mapName(getName())
Expand All @@ -786,10 +794,9 @@ public <R> R aggregate(Aggregator<Map.Entry<K, V>, R> aggregator) {
public <R> R aggregate(Aggregator<Map.Entry<K, V>, R> aggregator, Predicate<K, V> predicate) {
checkNotNull(aggregator, NULL_AGGREGATOR_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(predicate);
initializeManagedContext(predicate);
initializeManagedContext(aggregator);

// HazelcastInstanceAware handled by cloning
aggregator = serializationService.toObject(serializationService.toData(aggregator));
MapQueryEngine queryEngine = getMapQueryEngine();
if (predicate instanceof PagingPredicate) {
throw new IllegalArgumentException("PagingPredicate now allowed with EntryAggregator.");
Expand All @@ -810,8 +817,7 @@ public <R> Collection<R> project(Projection<Map.Entry<K, V>, R> projection) {
checkNotNull(projection, NULL_PROJECTION_IS_NOT_ALLOWED);

MapQueryEngine queryEngine = getMapQueryEngine();
// HazelcastInstanceAware handled by cloning
projection = serializationService.toObject(serializationService.toData(projection));
initializeManagedContext(projection);

Query query = Query.of()
.mapName(getName())
Expand All @@ -827,10 +833,9 @@ public <R> Collection<R> project(Projection<Map.Entry<K, V>, R> projection) {
public <R> Collection<R> project(Projection<Map.Entry<K, V>, R> projection, Predicate<K, V> predicate) {
checkNotNull(projection, NULL_PROJECTION_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
handleHazelcastInstanceAwareParams(predicate);
initializeManagedContext(predicate);
initializeManagedContext(projection);

// HazelcastInstanceAware handled by cloning
projection = serializationService.toObject(serializationService.toData(projection));
MapQueryEngine queryEngine = getMapQueryEngine();

Query query = Query.of()
Expand Down Expand Up @@ -958,9 +963,8 @@ public <R> Iterator<R> iterator(int fetchSize, int partitionId, Projection<Map.E
}
checkNotNull(projection, NULL_PROJECTION_IS_NOT_ALLOWED);
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
// HazelcastInstanceAware handled by cloning
projection = serializationService.toObject(serializationService.toData(projection));
handleHazelcastInstanceAwareParams(predicate);
initializeManagedContext(projection);
initializeManagedContext(predicate);
return new MapQueryPartitionIterator<K, V, R>(this, fetchSize, partitionId, predicate, projection);
}

Expand All @@ -979,9 +983,8 @@ public <T> ICompletableFuture<ReadResultSet<T>> readFromEventJournal(
int partitionId,
com.hazelcast.util.function.Predicate<? super EventJournalMapEvent<K, V>> predicate,
Projection<? super EventJournalMapEvent<K, V>, T> projection) {
handleHazelcastInstanceAwareParams(predicate);
// HazelcastInstanceAware handled by cloning
projection = serializationService.toObject(serializationService.toData(projection));
initializeManagedContext(predicate);
initializeManagedContext(projection);
final MapEventJournalReadOperation<K, V, T> op = new MapEventJournalReadOperation<K, V, T>(
name, startSequence, minSize, maxSize, predicate, projection);
op.setPartitionId(partitionId);
Expand All @@ -1005,7 +1008,7 @@ public QueryCache<K, V> getQueryCache(String name, Predicate<K, V> predicate, bo
checkNotNull(name, "name cannot be null");
checkNotNull(predicate, "predicate cannot be null");
checkNotInstanceOf(PagingPredicate.class, predicate, "predicate");
handleHazelcastInstanceAwareParams(predicate);
initializeManagedContext(predicate);

return getQueryCacheInternal(name, null, predicate, includeValue, this);
}
Expand All @@ -1015,7 +1018,8 @@ public QueryCache<K, V> getQueryCache(String name, MapListener listener, Predica
checkNotNull(name, "name cannot be null");
checkNotNull(predicate, "predicate cannot be null");
checkNotInstanceOf(PagingPredicate.class, predicate, "predicate");
handleHazelcastInstanceAwareParams(listener, predicate);
initializeManagedContext(listener);
initializeManagedContext(predicate);

return getQueryCacheInternal(name, listener, predicate, includeValue, this);
}
Expand Down Expand Up @@ -1044,11 +1048,14 @@ private QueryCache<K, V> createQueryCache(QueryCacheRequest request) {
new NodeQueryCacheEndToEndConstructor(request));
}

private void handleHazelcastInstanceAwareParams(Object... objects) {
for (Object object : objects) {
if (object instanceof HazelcastInstanceAware) {
((HazelcastInstanceAware) object).setHazelcastInstance(getNodeEngine().getHazelcastInstance());
}
}
/**
* Handles injection of managed context into the provided object
* if any object implements a context-aware interface such as
* {@link com.hazelcast.core.HazelcastInstanceAware} or {@link com.hazelcast.spi.NodeAware}.
*
* @param o the objects which should have the contexts injected
*/
private void initializeManagedContext(Object o) {
serializationService.getManagedContext().initialize(o);
}
}

0 comments on commit 5ef3c93

Please sign in to comment.