Skip to content

Commit

Permalink
ISPN-12976 Reindexing OFF_HEAP cache leads to ClassCastException when…
Browse files Browse the repository at this point in the history
… querying
  • Loading branch information
Gustavo committed May 11, 2021
1 parent 484ee2b commit 0daee31
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 31 deletions.
36 changes: 19 additions & 17 deletions core/src/main/java/org/infinispan/encoding/DataConversion.java
Expand Up @@ -238,8 +238,10 @@ public Object extractIndexable(Object stored) {
Wrapper wrapper = getWrapper();
if (isKey) return wrapper.unwrap(stored);

// If the value wrapper is indexable, just use it
if (wrapper.isFilterable()) return stored;
if (wrapper.isFilterable()) {
// If the value wrapper is indexable, return the already wrapped value or wrap it otherwise
return stored.getClass().equals(wrapperClass) ? stored : wrapper.wrap(stored);
}

// Otherwise convert to the request format
Object unencoded = encoder.fromStorage(wrapper.unwrap(stored));
Expand Down Expand Up @@ -287,26 +289,26 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
DataConversion that = (DataConversion) o;
return isKey == that.isKey &&
Objects.equals(encoder, that.encoder) &&
Objects.equals(customWrapper, that.customWrapper) &&
Objects.equals(transcoder, that.transcoder) &&
Objects.equals(requestMediaType, that.requestMediaType);
Objects.equals(encoder, that.encoder) &&
Objects.equals(customWrapper, that.customWrapper) &&
Objects.equals(transcoder, that.transcoder) &&
Objects.equals(requestMediaType, that.requestMediaType);
}

@Override
public String toString() {
return "DataConversion{" +
"encoderClass=" + encoderClass +
", wrapperClass=" + wrapperClass +
", requestMediaType=" + requestMediaType +
", storageMediaType=" + storageMediaType +
", encoderId=" + encoderId +
", wrapperId=" + wrapperId +
", encoder=" + encoder +
", wrapper=" + customWrapper +
", isKey=" + isKey +
", transcoder=" + transcoder +
'}';
"encoderClass=" + encoderClass +
", wrapperClass=" + wrapperClass +
", requestMediaType=" + requestMediaType +
", storageMediaType=" + storageMediaType +
", encoderId=" + encoderId +
", wrapperId=" + wrapperId +
", encoder=" + encoder +
", wrapper=" + customWrapper +
", isKey=" + isKey +
", transcoder=" + transcoder +
'}';
}

@Override
Expand Down
Expand Up @@ -11,7 +11,6 @@
import java.util.stream.Stream;

import org.infinispan.AdvancedCache;
import org.infinispan.commons.dataconversion.Wrapper;
import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.time.TimeService;
import org.infinispan.container.entries.CacheEntry;
Expand Down Expand Up @@ -47,10 +46,8 @@ public final class IndexWorker implements Function<EmbeddedCacheManager, Void> {
public Void apply(EmbeddedCacheManager embeddedCacheManager) {
AdvancedCache<Object, Object> cache = SecurityActions.getUnwrappedCache(embeddedCacheManager.getCache(cacheName)).getAdvancedCache();
DataConversion valueDataConversion = cache.getValueDataConversion();
Wrapper valueWrapper = valueDataConversion.getWrapper();
boolean valueFilterable = valueWrapper.isFilterable();

AdvancedCache<Object, Object> reindexCache = valueFilterable ? cache.withStorageMediaType() : cache;
AdvancedCache<Object, Object> reindexCache = cache.withStorageMediaType();

SearchMapping searchMapping = ComponentRegistryUtils.getSearchMapping(cache);
TimeService timeService = ComponentRegistryUtils.getTimeService(cache);
Expand All @@ -59,7 +56,6 @@ public Void apply(EmbeddedCacheManager embeddedCacheManager) {
IndexUpdater indexUpdater = new IndexUpdater(searchMapping);
KeyPartitioner keyPartitioner = ComponentRegistryUtils.getKeyPartitioner(cache);

DataConversion keyDataConversion = reindexCache.getKeyDataConversion();
if (keys == null || keys.size() == 0) {
preIndex(cache, indexUpdater);
MassIndexerProgressState progressState = new MassIndexerProgressState(notifier);
Expand All @@ -68,26 +64,23 @@ public Void apply(EmbeddedCacheManager embeddedCacheManager) {
.cacheEntrySet().stream()) {
stream.forEach(entry -> {
Object key = entry.getKey();
Object storedKey = keyDataConversion.toStorage(key);
Object value = entry.getValue();
if (valueFilterable) {
value = valueWrapper.wrap(value);
}
int segment = keyPartitioner.getSegment(storedKey);
Object value = valueDataConversion.extractIndexable(entry.getValue());
int segment = keyPartitioner.getSegment(key);

if (value != null && indexedTypes.contains(indexUpdater.toConvertedEntityJavaClass(value))) {
progressState.addItem(entry.getKey(), value,
indexUpdater.updateIndex(entry.getKey(), value, segment));
progressState.addItem(key, value,
indexUpdater.updateIndex(key, value, segment));
}
});
}
}
postIndex(indexUpdater, progressState, notifier);
} else {
DataConversion keyDataConversion = cache.getKeyDataConversion();
Set<Class<?>> classSet = new HashSet<>();
for (Object key : keys) {
Object storedKey = keyDataConversion.toStorage(key);
Object unwrappedKey = keyDataConversion.getWrapper().unwrap(storedKey);
Object unwrappedKey = keyDataConversion.extractIndexable(storedKey);
Object value = cache.get(key);
if (value != null) {
indexUpdater.updateIndex(unwrappedKey, value, keyPartitioner.getSegment(storedKey));
Expand Down

0 comments on commit 0daee31

Please sign in to comment.