Skip to content

Commit

Permalink
ISPN-9569 Indexing is not working for clustered caches if it is trans…
Browse files Browse the repository at this point in the history
…actional

* TransactionalIndexingTest highlights the issue
* remove ProtobufValueWrapperInterceptor; its role is now taken by ProtobufValueWrapperSearchWorkCreator
* move ProtobufValueWrapperAnalyzerDiscriminator and ProtobufValueWrapperIndexingInterceptor
  to org.infinispan.query.remote.impl.indexing package
  • Loading branch information
anistor authored and Gustavo Fernandes committed Oct 10, 2018
1 parent 5235b7a commit 23353cc
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 177 deletions.
Expand Up @@ -44,9 +44,13 @@ public class MultiHotRodServerQueryTest extends MultiHotRodServersTest {
protected RemoteCache<Integer, User> remoteCache0; protected RemoteCache<Integer, User> remoteCache0;
protected RemoteCache<Integer, User> remoteCache1; protected RemoteCache<Integer, User> remoteCache1;


protected boolean useTransactions() {
return false;
}

@Override @Override
protected void createCacheManagers() throws Throwable { protected void createCacheManagers() throws Throwable {
ConfigurationBuilder builder = hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, false)); ConfigurationBuilder builder = hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, useTransactions()));
builder.indexing().index(Index.ALL) builder.indexing().index(Index.ALL)
.addProperty("default.directory_provider", "local-heap") .addProperty("default.directory_provider", "local-heap")
.addProperty("lucene_version", "LUCENE_CURRENT"); .addProperty("lucene_version", "LUCENE_CURRENT");
Expand Down Expand Up @@ -113,7 +117,7 @@ protected void populateCache() throws Exception {
client(0).getCache().put("dummy", "a primitive value cannot be queried"); client(0).getCache().put("dummy", "a primitive value cannot be queried");
} }


public void testAttributeQuery() throws Exception { public void testAttributeQuery() {
// get user back from remote cache and check its attributes // get user back from remote cache and check its attributes
User fromCache = remoteCache0.get(1); User fromCache = remoteCache0.get(1);
assertNotNull(fromCache); assertNotNull(fromCache);
Expand All @@ -131,7 +135,7 @@ public void testAttributeQuery() throws Exception {
assertUser1(list.get(0)); assertUser1(list.get(0));
} }


public void testGroupByQuery() throws Exception { public void testGroupByQuery() {
// get user back from remote cache and check its attributes // get user back from remote cache and check its attributes
User fromCache = remoteCache0.get(1); User fromCache = remoteCache0.get(1);
assertNotNull(fromCache); assertNotNull(fromCache);
Expand All @@ -154,7 +158,7 @@ public void testGroupByQuery() throws Exception {
assertEquals("Tom", list.get(1)[0]); assertEquals("Tom", list.get(1)[0]);
} }


public void testEmbeddedAttributeQuery() throws Exception { public void testEmbeddedAttributeQuery() {
// get user back from remote cache via query and check its attributes // get user back from remote cache via query and check its attributes
QueryFactory qf = Search.getQueryFactory(remoteCache1); QueryFactory qf = Search.getQueryFactory(remoteCache1);
Query query = qf.from(UserPB.class) Query query = qf.from(UserPB.class)
Expand All @@ -168,7 +172,7 @@ public void testEmbeddedAttributeQuery() throws Exception {
} }


@Test(expectedExceptions = HotRodClientException.class, expectedExceptionsMessageRegExp = ".*ISPN028503: Property addresses can not be selected from type sample_bank_account.User since it is an embedded entity.") @Test(expectedExceptions = HotRodClientException.class, expectedExceptionsMessageRegExp = ".*ISPN028503: Property addresses can not be selected from type sample_bank_account.User since it is an embedded entity.")
public void testInvalidEmbeddedAttributeQuery() throws Exception { public void testInvalidEmbeddedAttributeQuery() {
QueryFactory qf = Search.getQueryFactory(remoteCache1); QueryFactory qf = Search.getQueryFactory(remoteCache1);


Query q = qf.from(UserPB.class) Query q = qf.from(UserPB.class)
Expand All @@ -177,7 +181,7 @@ public void testInvalidEmbeddedAttributeQuery() throws Exception {
q.list(); // exception expected q.list(); // exception expected
} }


public void testProjections() throws Exception { public void testProjections() {
// get user back from remote cache and check its attributes // get user back from remote cache and check its attributes
User fromCache = remoteCache0.get(1); User fromCache = remoteCache0.get(1);
assertUser1(fromCache); assertUser1(fromCache);
Expand Down
@@ -0,0 +1,15 @@
package org.infinispan.client.hotrod.query;

import org.testng.annotations.Test;

/**
* @since 9.4
*/
@Test(testName = "client.hotrod.query.TransactionalIndexingTest", groups = "functional")
public class TransactionalIndexingTest extends MultiHotRodServerQueryTest {

@Override
protected boolean useTransactions() {
return true;
}
}
@@ -1,10 +1,14 @@
package org.infinispan.query.backend; package org.infinispan.query.backend;


//todo [anistor] this class will be removed in Infinispan 10

/** /**
* Add extra methods. * Add extra methods.
* *
* @author Ales Justin * @author Ales Justin
* @deprecated without replacement
*/ */
@Deprecated
public interface ExtendedSearchWorkCreator extends SearchWorkCreator { public interface ExtendedSearchWorkCreator extends SearchWorkCreator {
boolean shouldRemove(SearchWorkCreatorContext context); boolean shouldRemove(SearchWorkCreatorContext context);
} }
Expand Up @@ -329,7 +329,7 @@ public void purgeIndex(Class<?> entityType) {
} }


/** /**
* Remove entries from all indexes by key * Remove entries from all indexes by key.
*/ */
void removeFromIndexes(TransactionContext transactionContext, Object key) { void removeFromIndexes(TransactionContext transactionContext, Object key) {
Stream<IndexedTypeIdentifier> typeIdentifiers = getKnownClasses().stream() Stream<IndexedTypeIdentifier> typeIdentifiers = getKnownClasses().stream()
Expand Down
@@ -1,10 +1,14 @@
package org.infinispan.query.backend; package org.infinispan.query.backend;


//todo [anistor] this class will be removed in Infinispan 10

/** /**
* Use context pattern, so it can be easily extended / changed. * Use context pattern, so it can be easily extended / changed.
* *
* @author Ales Justin * @author Ales Justin
* @deprecated without replacement
*/ */
@Deprecated
public class SearchWorkCreatorContext { public class SearchWorkCreatorContext {
private Object previousValue; private Object previousValue;
private Object currentValue; private Object currentValue;
Expand Down
Expand Up @@ -18,26 +18,20 @@
import org.infinispan.commons.marshall.AdvancedExternalizer; import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.util.ServiceFinder; import org.infinispan.commons.util.ServiceFinder;
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.ContentTypeConfiguration; import org.infinispan.configuration.cache.ContentTypeConfiguration;
import org.infinispan.configuration.cache.CustomInterceptorsConfigurationBuilder;
import org.infinispan.configuration.cache.InterceptorConfiguration;
import org.infinispan.configuration.cache.InterceptorConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfiguration; import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalJmxStatisticsConfiguration; import org.infinispan.configuration.global.GlobalJmxStatisticsConfiguration;
import org.infinispan.factories.ComponentRegistry; import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry; import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.components.ComponentMetadataRepo; import org.infinispan.factories.components.ComponentMetadataRepo;
import org.infinispan.factories.components.ManageableComponentMetadata; import org.infinispan.factories.components.ManageableComponentMetadata;
import org.infinispan.factories.impl.BasicComponentRegistry; import org.infinispan.factories.impl.BasicComponentRegistry;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.interceptors.impl.BatchingInterceptor;
import org.infinispan.interceptors.impl.InvocationContextInterceptor;
import org.infinispan.jmx.ResourceDMBean; import org.infinispan.jmx.ResourceDMBean;
import org.infinispan.lifecycle.ModuleLifecycle; import org.infinispan.lifecycle.ModuleLifecycle;
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.core.EncoderRegistry; import org.infinispan.marshall.core.EncoderRegistry;
import org.infinispan.protostream.SerializationContext; import org.infinispan.protostream.SerializationContext;
import org.infinispan.query.backend.QueryInterceptor;
import org.infinispan.query.remote.ProtobufMetadataManager; import org.infinispan.query.remote.ProtobufMetadataManager;
import org.infinispan.query.remote.client.Externalizers.QueryRequestExternalizer; import org.infinispan.query.remote.client.Externalizers.QueryRequestExternalizer;
import org.infinispan.query.remote.client.ProtostreamSerializationContextInitializer; import org.infinispan.query.remote.client.ProtostreamSerializationContextInitializer;
Expand All @@ -53,7 +47,7 @@
import org.infinispan.query.remote.impl.filter.IckleProtobufCacheEventFilterConverter; import org.infinispan.query.remote.impl.filter.IckleProtobufCacheEventFilterConverter;
import org.infinispan.query.remote.impl.filter.IckleProtobufFilterAndConverter; import org.infinispan.query.remote.impl.filter.IckleProtobufFilterAndConverter;
import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapper; import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapper;
import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapperInterceptor; import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapperSearchWorkCreator;
import org.infinispan.query.remote.impl.logging.Log; import org.infinispan.query.remote.impl.logging.Log;
import org.infinispan.registry.InternalCacheRegistry; import org.infinispan.registry.InternalCacheRegistry;
import org.kohsuke.MetaInfServices; import org.kohsuke.MetaInfServices;
Expand Down Expand Up @@ -174,40 +168,6 @@ public void cacheStarting(ComponentRegistry cr, Configuration cfg, String cacheN
ProtobufMetadataManagerImpl protobufMetadataManager = ProtobufMetadataManagerImpl protobufMetadataManager =
(ProtobufMetadataManagerImpl) gcr.getComponent(ProtobufMetadataManager.class).running(); (ProtobufMetadataManagerImpl) gcr.getComponent(ProtobufMetadataManager.class).running();
protobufMetadataManager.addCacheDependency(cacheName); protobufMetadataManager.addCacheDependency(cacheName);

if (cfg.indexing().index().isEnabled()) {
log.infof("Registering ProtobufValueWrapperInterceptor for cache %s", cacheName);
EmbeddedCacheManager cacheManager = gcr.getComponent(EmbeddedCacheManager.class).running();
createProtobufValueWrapperInterceptor(cr, cfg, cacheManager);
}
}
}

private void createProtobufValueWrapperInterceptor(ComponentRegistry cr, Configuration cfg, EmbeddedCacheManager cacheManager) {
ProtobufValueWrapperInterceptor wrapperInterceptor = cr.getComponent(ProtobufValueWrapperInterceptor.class);
if (wrapperInterceptor == null) {
SerializationContext serCtx = ProtobufMetadataManagerImpl.getSerializationContext(cacheManager);
wrapperInterceptor = new ProtobufValueWrapperInterceptor(serCtx);

// Interceptor registration not needed, core configuration handling
// already does it for all custom interceptors - UNLESS the InterceptorChain already exists in the component registry!
AsyncInterceptorChain ic = cr.getComponent(AsyncInterceptorChain.class);

ConfigurationBuilder builder = new ConfigurationBuilder().read(cfg);
InterceptorConfigurationBuilder interceptorBuilder = builder.customInterceptors().addInterceptor();
interceptorBuilder.interceptor(wrapperInterceptor);

if (cfg.invocationBatching().enabled()) {
if (ic != null) ic.addInterceptorAfter(wrapperInterceptor, BatchingInterceptor.class);
interceptorBuilder.after(BatchingInterceptor.class);
} else {
if (ic != null) ic.addInterceptorAfter(wrapperInterceptor, InvocationContextInterceptor.class);
interceptorBuilder.after(InvocationContextInterceptor.class);
}
if (ic != null) {
cr.registerComponent(wrapperInterceptor, ProtobufValueWrapperInterceptor.class);
}
cfg.customInterceptors().interceptors(builder.build().customInterceptors().interceptors());
} }
} }


Expand Down Expand Up @@ -241,45 +201,19 @@ public void cacheStarted(ComponentRegistry cr, String cacheName) {
InternalCacheRegistry icr = cr.getGlobalComponentRegistry().getComponent(InternalCacheRegistry.class); InternalCacheRegistry icr = cr.getGlobalComponentRegistry().getComponent(InternalCacheRegistry.class);
if (!icr.isInternalCache(cacheName)) { if (!icr.isInternalCache(cacheName)) {
Configuration cfg = cr.getComponent(Configuration.class); Configuration cfg = cr.getComponent(Configuration.class);
if (cfg.indexing().index().isEnabled()) {
if (!verifyChainContainsProtobufValueWrapperInterceptor(cr)) {
throw new IllegalStateException("It was expected to find the ProtobufValueWrapperInterceptor registered in the InterceptorChain but it wasn't found");
}
} else if (verifyChainContainsProtobufValueWrapperInterceptor(cr)) {
throw new IllegalStateException("It was NOT expected to find the ProtobufValueWrapperInterceptor registered in the InterceptorChain as indexing was disabled, but it was found");
}

ProtobufMetadataManagerImpl protobufMetadataManager = (ProtobufMetadataManagerImpl) cr.getGlobalComponentRegistry().getComponent(ProtobufMetadataManager.class); ProtobufMetadataManagerImpl protobufMetadataManager = (ProtobufMetadataManagerImpl) cr.getGlobalComponentRegistry().getComponent(ProtobufMetadataManager.class);
SerializationContext serCtx = protobufMetadataManager.getSerializationContext(); SerializationContext serCtx = protobufMetadataManager.getSerializationContext();


RemoteQueryManager remoteQueryManager = buildQueryManager(cfg, serCtx, cr); RemoteQueryManager remoteQueryManager = buildQueryManager(cfg, serCtx, cr);


cr.registerComponent(remoteQueryManager, RemoteQueryManager.class); cr.registerComponent(remoteQueryManager, RemoteQueryManager.class);
}
}

private boolean verifyChainContainsProtobufValueWrapperInterceptor(ComponentRegistry cr) {
AsyncInterceptorChain interceptorChain = cr.getComponent(AsyncInterceptorChain.class);
return interceptorChain != null && interceptorChain.containsInterceptorType(ProtobufValueWrapperInterceptor.class, true);
}


@Override if (cfg.indexing().index().isEnabled()) {
public void cacheStopped(ComponentRegistry cr, String cacheName) { log.debugf("Wrapping the SearchWorkCreator for indexed cache %s", cacheName);
Configuration cfg = cr.getComponent(Configuration.class); QueryInterceptor queryInterceptor = cr.getComponent(QueryInterceptor.class);
removeProtobufValueWrapperInterceptor(cfg); queryInterceptor.setSearchWorkCreator(new ProtobufValueWrapperSearchWorkCreator(queryInterceptor.getSearchWorkCreator(), serCtx).get());
}

private void removeProtobufValueWrapperInterceptor(Configuration cfg) {
ConfigurationBuilder builder = new ConfigurationBuilder();
CustomInterceptorsConfigurationBuilder customInterceptorsBuilder = builder.customInterceptors();

for (InterceptorConfiguration interceptorConfig : cfg.customInterceptors().interceptors()) {
if (!(interceptorConfig.asyncInterceptor() instanceof ProtobufValueWrapperInterceptor)) {
customInterceptorsBuilder.addInterceptor().read(interceptorConfig);
} }
} }

cfg.customInterceptors().interceptors(builder.build().customInterceptors().interceptors());
} }


@Override @Override
Expand Down
Expand Up @@ -7,7 +7,9 @@
import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.commons.logging.LogFactory; import org.infinispan.commons.logging.LogFactory;
import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapper; import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapper;
import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapperAnalyzerDiscriminator;
import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapperFieldBridge; import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapperFieldBridge;
import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapperIndexingInterceptor;
import org.infinispan.query.remote.impl.logging.Log; import org.infinispan.query.remote.impl.logging.Log;
import org.infinispan.query.spi.ProgrammaticSearchMappingProvider; import org.infinispan.query.spi.ProgrammaticSearchMappingProvider;
import org.kohsuke.MetaInfServices; import org.kohsuke.MetaInfServices;
Expand Down
Expand Up @@ -26,14 +26,24 @@ public final class ProtobufValueWrapper implements WrappedBytes {


public static final IndexedTypeIdentifier INDEXING_TYPE = PojoIndexedTypeIdentifier.convertFromLegacy(ProtobufValueWrapper.class); public static final IndexedTypeIdentifier INDEXING_TYPE = PojoIndexedTypeIdentifier.convertFromLegacy(ProtobufValueWrapper.class);


/**
* Max number of bytes to include in {@link #toString()}.
*/
private static final int MAX_BYTES_IN_TOSTRING = 40; private static final int MAX_BYTES_IN_TOSTRING = 40;


// The protobuf encoded payload /**
* The protobuf encoded payload.
*/
private final byte[] binary; private final byte[] binary;


/**
* The lazily computed hashCode. Transient field!
*/
private transient int hashCode = 0; private transient int hashCode = 0;


// The Descriptor of the message (if it's a Message and not a primitive value). Transient field! /**
* The Descriptor of the message (if it's a Message and not a primitive value, or null otherwise). Transient field!
*/
private transient Descriptor messageDescriptor; private transient Descriptor messageDescriptor;


public ProtobufValueWrapper(byte[] binary) { public ProtobufValueWrapper(byte[] binary) {
Expand All @@ -43,21 +53,30 @@ public ProtobufValueWrapper(byte[] binary) {
this.binary = binary; this.binary = binary;
} }


/**
* Gets the internal byte array. Callers should not modify the contents of the array.
*
* @return the wrapped byte array
*/
public byte[] getBinary() { public byte[] getBinary() {
return binary; return binary;
} }


/** /**
* Returns the Protobuf descriptor of the message type of the payload. * Returns the Protobuf descriptor of the message type of the payload.
*
* @see #setMessageDescriptor
*/ */
public Descriptor getMessageDescriptor() { public Descriptor getMessageDescriptor() {
return messageDescriptor; return messageDescriptor;
} }


/** /**
* Sets the Protobuf descriptor of the message type of the payload. * Sets the Protobuf descriptor of the message type of the payload.
*
* @see ProtobufValueWrapperSearchWorkCreator#discoverMessageType
*/ */
public void setMessageDescriptor(Descriptor messageDescriptor) { void setMessageDescriptor(Descriptor messageDescriptor) {
this.messageDescriptor = messageDescriptor; this.messageDescriptor = messageDescriptor;
} }


Expand Down Expand Up @@ -117,6 +136,16 @@ public byte getByte(int offset) {
return binary[offset]; return binary[offset];
} }


@Override
public boolean equalsWrappedBytes(WrappedBytes other) {
if (other == null) return false;
if (other.getLength() != binary.length) return false;
for (int i = 0; i < binary.length; ++i) {
if (binary[i] != other.getByte(i)) return false;
}
return true;
}

public static final class Externalizer extends AbstractExternalizer<ProtobufValueWrapper> { public static final class Externalizer extends AbstractExternalizer<ProtobufValueWrapper> {


@Override @Override
Expand Down
@@ -1,11 +1,8 @@
package org.infinispan.query.remote.impl; package org.infinispan.query.remote.impl.indexing;


import org.hibernate.search.analyzer.Discriminator; import org.hibernate.search.analyzer.Discriminator;
import org.infinispan.protostream.descriptors.Descriptor; import org.infinispan.protostream.descriptors.Descriptor;
import org.infinispan.query.logging.Log; import org.infinispan.query.logging.Log;
import org.infinispan.query.remote.impl.indexing.FieldMapping;
import org.infinispan.query.remote.impl.indexing.IndexingMetadata;
import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapper;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;


/** /**
Expand All @@ -25,6 +22,10 @@ public String getAnalyzerDefinitionName(Object value, Object entity, String fiel
Descriptor messageDescriptor = wrapper.getMessageDescriptor(); Descriptor messageDescriptor = wrapper.getMessageDescriptor();
if (messageDescriptor != null) { if (messageDescriptor != null) {
return getAnalyzerForField(messageDescriptor, field); return getAnalyzerForField(messageDescriptor, field);
} else {
// this is either a scalar value (not indexed, why are we asked for analyzer -> bug)
// or this entry was not run through ProtobufValueWrapperSearchWorkCreator (bug again)
throw new IllegalStateException("Message descriptor not initialized for " + wrapper);
} }
} }
return null; return null;
Expand Down
@@ -1,10 +1,8 @@
package org.infinispan.query.remote.impl; package org.infinispan.query.remote.impl.indexing;


import org.hibernate.search.indexes.interceptor.EntityIndexingInterceptor; import org.hibernate.search.indexes.interceptor.EntityIndexingInterceptor;
import org.hibernate.search.indexes.interceptor.IndexingOverride; import org.hibernate.search.indexes.interceptor.IndexingOverride;
import org.infinispan.protostream.descriptors.Descriptor; import org.infinispan.protostream.descriptors.Descriptor;
import org.infinispan.query.remote.impl.indexing.IndexingMetadata;
import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapper;


/** /**
* Hibernate Search interceptor for conditional indexing of protobuf message types based on the value of the * Hibernate Search interceptor for conditional indexing of protobuf message types based on the value of the
Expand Down

0 comments on commit 23353cc

Please sign in to comment.