Skip to content

Commit

Permalink
ISPN-9180 Remote compat mode from remote query
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Fernandes authored and anistor committed Jul 30, 2018
1 parent 213146b commit 1edc1ae
Show file tree
Hide file tree
Showing 26 changed files with 318 additions and 380 deletions.
Expand Up @@ -213,9 +213,9 @@ public FaultTolerantPingOperation newFaultTolerantPingOperation() {
codec, channelFactory, cacheNameBytes, topologyId, flags(), cfg); codec, channelFactory, cacheNameBytes, topologyId, flags(), cfg);
} }


public QueryOperation newQueryOperation(RemoteQuery remoteQuery) { public QueryOperation newQueryOperation(RemoteQuery remoteQuery, DataFormat dataFormat) {
return new QueryOperation( return new QueryOperation(
codec, channelFactory, cacheNameBytes, topologyId, flags(), cfg, remoteQuery); codec, channelFactory, cacheNameBytes, topologyId, flags(), cfg, remoteQuery, dataFormat);
} }


public SizeOperation newSizeOperation() { public SizeOperation newSizeOperation() {
Expand Down
Expand Up @@ -8,6 +8,7 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.configuration.Configuration; import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.impl.protocol.Codec; import org.infinispan.client.hotrod.impl.protocol.Codec;
Expand Down Expand Up @@ -35,8 +36,8 @@ public final class QueryOperation extends RetryOnFailureOperation<QueryResponse>
private final RemoteQuery remoteQuery; private final RemoteQuery remoteQuery;


public QueryOperation(Codec codec, ChannelFactory channelFactory, byte[] cacheName, AtomicInteger topologyId, public QueryOperation(Codec codec, ChannelFactory channelFactory, byte[] cacheName, AtomicInteger topologyId,
int flags, Configuration cfg, RemoteQuery remoteQuery) { int flags, Configuration cfg, RemoteQuery remoteQuery, DataFormat dataFormat) {
super(QUERY_REQUEST, QUERY_RESPONSE, codec, channelFactory, cacheName, topologyId, flags, cfg, null); super(QUERY_REQUEST, QUERY_RESPONSE, codec, channelFactory, cacheName, topologyId, flags, cfg, dataFormat);
this.remoteQuery = remoteQuery; this.remoteQuery = remoteQuery;
} }


Expand Down
Expand Up @@ -71,7 +71,7 @@ private void executeQuery() {
if (results == null) { if (results == null) {
validateNamedParameters(); validateNamedParameters();


QueryOperation op = cache.getOperationsFactory().newQueryOperation(this); QueryOperation op = cache.getOperationsFactory().newQueryOperation(this, cache.getDataFormat());
QueryResponse response = await(op.execute()); QueryResponse response = await(op.execute());
totalResults = (int) response.getTotalResults(); totalResults = (int) response.getTotalResults();
results = unwrapResults(response.getProjectionSize(), response.getResults()); results = unwrapResults(response.getProjectionSize(), response.getResults());
Expand Down
Expand Up @@ -302,12 +302,12 @@ public int hashCode() {
return Objects.hash(encoderClass, wrapperClass, isKey); return Objects.hash(encoderClass, wrapperClass, isKey);
} }


public static DataConversion newKeyDataConversion(Class<? extends Encoder> encoderClass, Class<? extends Wrapper> wrapperClass, MediaType requestType) { public static DataConversion newKeyDataConversion(Class<? extends Encoder> encoderClass, Class<? extends Wrapper> wrapperClass, MediaType storageType) {
return new DataConversion(encoderClass, wrapperClass, requestType, null, true); return new DataConversion(encoderClass, wrapperClass, null, storageType, true);
} }


public static DataConversion newValueDataConversion(Class<? extends Encoder> encoderClass, Class<? extends Wrapper> wrapperClass, MediaType requestType) { public static DataConversion newValueDataConversion(Class<? extends Encoder> encoderClass, Class<? extends Wrapper> wrapperClass, MediaType storageType) {
return new DataConversion(encoderClass, wrapperClass, requestType, null, false); return new DataConversion(encoderClass, wrapperClass, null, storageType, false);
} }


private static boolean isDefault(DataConversion dataConversion) { private static boolean isDefault(DataConversion dataConversion) {
Expand Down
Expand Up @@ -17,6 +17,7 @@
<module name="org.infinispan.commons" slot="@infinispan.module.slot@" export="true"/> <module name="org.infinispan.commons" slot="@infinispan.module.slot@" export="true"/>
<module name="org.infinispan.core" slot="@infinispan.module.slot@" export="true" services="export"/> <module name="org.infinispan.core" slot="@infinispan.module.slot@" export="true" services="export"/>
<module name="org.infinispan.query" slot="@infinispan.module.slot@" export="true" services="import" optional="true" /> <module name="org.infinispan.query" slot="@infinispan.module.slot@" export="true" services="import" optional="true" />
<module name="org.infinispan.remote-query.client" slot="@infinispan.module.slot@" export="true"/>
<module name="org.infinispan.lucene-directory" slot="@infinispan.module.slot@" export="true" services="import" optional="true" /> <module name="org.infinispan.lucene-directory" slot="@infinispan.module.slot@" export="true" services="import" optional="true" />
<module name="org.infinispan.tasks" services="import" slot="@infinispan.module.slot@"/> <module name="org.infinispan.tasks" services="import" slot="@infinispan.module.slot@"/>
<module name="org.jboss.logging" /> <module name="org.jboss.logging" />
Expand Down
Expand Up @@ -146,6 +146,10 @@ protected SearchIntegrator getSearchFactory() {
return searchFactory; return searchFactory;
} }


public Class<? extends Matcher> getMatcherClass() {
return matcherImplClass;
}

protected BaseQuery buildQuery(QueryFactory queryFactory, IckleParsingResult<TypeMetadata> parsingResult, Map<String, Object> namedParameters, long startOffset, int maxResults) { protected BaseQuery buildQuery(QueryFactory queryFactory, IckleParsingResult<TypeMetadata> parsingResult, Map<String, Object> namedParameters, long startOffset, int maxResults) {
return buildQuery(queryFactory, parsingResult, namedParameters, startOffset, maxResults, IndexedQueryMode.FETCH); return buildQuery(queryFactory, parsingResult, namedParameters, startOffset, maxResults, IndexedQueryMode.FETCH);
} }
Expand Down
Expand Up @@ -15,7 +15,7 @@
* @author anistor@redhat.com * @author anistor@redhat.com
* @since 9.1 * @since 9.1
*/ */
final class Externalizers { public final class Externalizers {


private Externalizers() { private Externalizers() {
} }
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Expand Up @@ -7,10 +7,10 @@
import javax.management.ObjectName; import javax.management.ObjectName;


import org.infinispan.commons.CacheException; import org.infinispan.commons.CacheException;
import org.infinispan.commons.configuration.ClassWhiteList;
import org.infinispan.commons.dataconversion.MediaType; import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.logging.LogFactory; import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.AdvancedExternalizer; import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.util.ServiceFinder; import org.infinispan.commons.util.ServiceFinder;
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.ConfigurationBuilder;
Expand All @@ -32,11 +32,11 @@
import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.core.EncoderRegistry; import org.infinispan.marshall.core.EncoderRegistry;
import org.infinispan.objectfilter.Matcher;
import org.infinispan.protostream.SerializationContext; import org.infinispan.protostream.SerializationContext;
import org.infinispan.query.remote.ProtobufMetadataManager; import org.infinispan.query.remote.ProtobufMetadataManager;
import org.infinispan.query.remote.client.BaseProtoStreamMarshaller; import org.infinispan.query.remote.client.Externalizers.QueryRequestExternalizer;
import org.infinispan.query.remote.client.ProtostreamSerializationContextInitializer; import org.infinispan.query.remote.client.ProtostreamSerializationContextInitializer;
import org.infinispan.query.remote.client.QueryRequest;
import org.infinispan.query.remote.impl.dataconversion.ProtostreamBinaryTranscoder; import org.infinispan.query.remote.impl.dataconversion.ProtostreamBinaryTranscoder;
import org.infinispan.query.remote.impl.dataconversion.ProtostreamJsonTranscoder; import org.infinispan.query.remote.impl.dataconversion.ProtostreamJsonTranscoder;
import org.infinispan.query.remote.impl.dataconversion.ProtostreamObjectTranscoder; import org.infinispan.query.remote.impl.dataconversion.ProtostreamObjectTranscoder;
Expand Down Expand Up @@ -82,6 +82,8 @@ public void cacheManagerStarted(GlobalComponentRegistry gcr) {
EncoderRegistry encoderRegistry = gcr.getComponent(EncoderRegistry.class); EncoderRegistry encoderRegistry = gcr.getComponent(EncoderRegistry.class);
encoderRegistry.registerWrapper(ProtobufWrapper.INSTANCE); encoderRegistry.registerWrapper(ProtobufWrapper.INSTANCE);
initProtobufMetadataManager((DefaultCacheManager) cacheManager, gcr); initProtobufMetadataManager((DefaultCacheManager) cacheManager, gcr);
ClassWhiteList classWhiteList = cacheManager.getClassWhiteList();
classWhiteList.addClasses(QueryRequest.class, QueryRequestExternalizer.class);
} }


private void initProtobufMetadataManager(DefaultCacheManager cacheManager, GlobalComponentRegistry gcr) { private void initProtobufMetadataManager(DefaultCacheManager cacheManager, GlobalComponentRegistry gcr) {
Expand All @@ -95,7 +97,7 @@ private void initProtobufMetadataManager(DefaultCacheManager cacheManager, Globa
EncoderRegistry encoderRegistry = gcr.getComponent(EncoderRegistry.class); EncoderRegistry encoderRegistry = gcr.getComponent(EncoderRegistry.class);
encoderRegistry.registerTranscoder(new ProtostreamJsonTranscoder(serCtx)); encoderRegistry.registerTranscoder(new ProtostreamJsonTranscoder(serCtx));
encoderRegistry.registerTranscoder(new ProtostreamTextTranscoder(serCtx)); encoderRegistry.registerTranscoder(new ProtostreamTextTranscoder(serCtx));
encoderRegistry.registerTranscoder(new ProtostreamObjectTranscoder(serCtx)); encoderRegistry.registerTranscoder(new ProtostreamObjectTranscoder(serCtx, classLoader));
encoderRegistry.registerTranscoder(new ProtostreamBinaryTranscoder()); encoderRegistry.registerTranscoder(new ProtostreamBinaryTranscoder());
} }


Expand Down Expand Up @@ -194,22 +196,10 @@ private void createProtobufValueWrapperInterceptor(ComponentRegistry cr, Configu


private RemoteQueryManager buildQueryManager(Configuration cfg, SerializationContext ctx, ComponentRegistry cr) { private RemoteQueryManager buildQueryManager(Configuration cfg, SerializationContext ctx, ComponentRegistry cr) {
ContentTypeConfiguration valueEncoding = cfg.encoding().valueDataType(); ContentTypeConfiguration valueEncoding = cfg.encoding().valueDataType();
boolean compatEnabled = cfg.compatibility().enabled(); MediaType valueStorageMediaType = valueEncoding.mediaType();
if (!compatEnabled) { boolean isObjectStorage = valueStorageMediaType != null && valueStorageMediaType.match(MediaType.APPLICATION_OBJECT);
if (valueEncoding != null) { if (isObjectStorage) return new ObjectRemoteQueryManager(cr);
if (!valueEncoding.isEncodingChanged() || valueEncoding.mediaType() != null && valueEncoding.mediaType().equals(MediaType.APPLICATION_PROTOSTREAM)) { return new ProtobufRemoteQueryManager(ctx, cr);
return new ProtobufRemoteQueryManager(ctx, cr);
}
}
return new GenericCompatRemoteQueryManager(cr);

} else {
Marshaller compatMarshaller = cfg.compatibility().marshaller();
if (compatMarshaller instanceof BaseProtoStreamMarshaller) {
return new ProtostreamCompatRemoteQueryManager(cr);
}
return new GenericCompatRemoteQueryManager(cr);
}
} }


@Override @Override
Expand All @@ -230,8 +220,6 @@ public void cacheStarted(ComponentRegistry cr, String cacheName) {


RemoteQueryManager remoteQueryManager = buildQueryManager(cfg, serCtx, cr); RemoteQueryManager remoteQueryManager = buildQueryManager(cfg, serCtx, cr);


Matcher matcher = remoteQueryManager.getMatcher();
cr.registerComponent(matcher, matcher.getClass());
cr.registerComponent(remoteQueryManager, RemoteQueryManager.class); cr.registerComponent(remoteQueryManager, RemoteQueryManager.class);
} }
} }
Expand Down
@@ -0,0 +1,26 @@
package org.infinispan.query.remote.impl;

import org.hibernate.search.spi.SearchIntegrator;
import org.infinispan.objectfilter.impl.ReflectionMatcher;
import org.infinispan.objectfilter.impl.syntax.parser.EntityNameResolver;
import org.infinispan.query.dsl.embedded.impl.HibernateSearchPropertyHelper;

/**
* @since 9.4
*/
final class ObjectReflectionMatcher extends ReflectionMatcher {

private ObjectReflectionMatcher(HibernateSearchPropertyHelper hibernateSearchPropertyHelper) {
super(hibernateSearchPropertyHelper);
}

private ObjectReflectionMatcher(EntityNameResolver entityNameResolver) {
super(entityNameResolver);
}

static ObjectReflectionMatcher create(EntityNameResolver entityNameResolver, SearchIntegrator searchIntegrator) {
if (searchIntegrator == null) return new ObjectReflectionMatcher(entityNameResolver);
return new ObjectReflectionMatcher(new HibernateSearchPropertyHelper(searchIntegrator, entityNameResolver));
}

}
@@ -0,0 +1,16 @@
package org.infinispan.query.remote.impl;

import org.infinispan.AdvancedCache;
import org.infinispan.commons.dataconversion.IdentityEncoder;
import org.infinispan.objectfilter.Matcher;

/**
* @author anistor@redhat.com
* @since 9.0
*/
final class ObjectRemoteQueryEngine extends BaseRemoteQueryEngine {

ObjectRemoteQueryEngine(AdvancedCache<?, ?> cache, Class<? extends Matcher> matcherImplClass, boolean isIndexed) {
super(cache.getAdvancedCache().withEncoding(IdentityEncoder.class), isIndexed, matcherImplClass, null);
}
}

0 comments on commit 1edc1ae

Please sign in to comment.