Skip to content

Commit

Permalink
ISPN-12684 Lazy start SearchMapping for protobuf caches
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Fernandes committed Feb 3, 2021
1 parent 95199a1 commit 0a219a1
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 36 deletions.
Expand Up @@ -118,5 +118,9 @@ public void shouldPreventNonIndexedEntities() {
"</infinispan>";

remoteCacheManager.administration().withFlags(VOLATILE).createCache(CACHE_NAME, new XMLStringConfiguration(config));

// The SearchMapping is started lazily for protobuf caches, so the exception only happens after first usage
RemoteCache<Object, Object> cache = remoteCacheManager.getCache(CACHE_NAME);
cache.put("1", new Entity("name"));
}
}
@@ -0,0 +1,145 @@
package org.infinispan.query.remote.impl;

import static org.infinispan.commons.dataconversion.MediaType.APPLICATION_PROTOSTREAM;

import java.util.Collection;
import java.util.Set;

import org.hibernate.search.engine.reporting.FailureHandler;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commons.CacheConfigurationException;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.configuration.cache.IndexingConfiguration;
import org.infinispan.encoding.DataConversion;
import org.infinispan.protostream.SerializationContext;
import org.infinispan.query.impl.EntityLoader;
import org.infinispan.query.remote.impl.logging.Log;
import org.infinispan.query.remote.impl.mapping.SerializationContextSearchMapping;
import org.infinispan.query.remote.impl.util.LazyRef;
import org.infinispan.search.mapper.mapping.SearchIndexedEntity;
import org.infinispan.search.mapper.mapping.SearchMapping;
import org.infinispan.search.mapper.mapping.SearchMappingBuilder;
import org.infinispan.search.mapper.mapping.SearchMappingCommonBuilding;
import org.infinispan.search.mapper.scope.SearchScope;
import org.infinispan.search.mapper.session.SearchSession;
import org.infinispan.search.mapper.work.SearchIndexer;

/**
* @since 12.0
*/
public class LazySearchMapping implements SearchMapping {

private static final Log log = LogFactory.getLog(LazySearchMapping.class, Log.class);

private final Cache<?, ?> cache;
private final ProtobufMetadataManagerImpl protobufMetadataManager;
private final SearchMappingCommonBuilding commonBuilding;
private final EntityLoader<?> entityLoader;
private final SerializationContext serCtx;
private final LazyRef<SearchMapping> searchMappingRef = new LazyRef<>(this::createMapping);

public LazySearchMapping(SearchMappingCommonBuilding commonBuilding, EntityLoader<?> entityLoader,
SerializationContext serCtx, AdvancedCache<?, ?> cache,
ProtobufMetadataManagerImpl protobufMetadataManager) {
this.commonBuilding = commonBuilding;
this.entityLoader = entityLoader;
this.serCtx = serCtx;
this.cache = cache;
this.protobufMetadataManager = protobufMetadataManager;
}

@Override
public <E> SearchScope<E> scope(Collection<? extends Class<? extends E>> types) {
return searchMappingRef.get().scope(types);
}

@Override
public SearchScope<?> scopeAll() {
return searchMappingRef.get().scopeAll();
}

@Override
public FailureHandler getFailureHandler() {
return searchMappingRef.get().getFailureHandler();
}

@Override
public void close() {
searchMappingRef.get().close();
}

@Override
public boolean isClose() {
return searchMappingRef.get().isClose();
}

@Override
public SearchSession getMappingSession() {
return searchMappingRef.get().getMappingSession();
}

@Override
public SearchIndexer getSearchIndexer() {
return searchMappingRef.get().getSearchIndexer();
}

@Override
public SearchIndexedEntity indexedEntity(Class<?> entityType) {
return searchMappingRef.get().indexedEntity(entityType);
}

@Override
public SearchIndexedEntity indexedEntity(String entityName) {
return searchMappingRef.get().indexedEntity(entityName);
}

@Override
public Collection<? extends SearchIndexedEntity> allIndexedEntities() {
return searchMappingRef.get().allIndexedEntities();
}

@Override
public Set<String> allIndexedEntityNames() {
return searchMappingRef.get().allIndexedEntityNames();
}

@Override
public Set<Class<?>> allIndexedEntityJavaClasses() {
return searchMappingRef.get().allIndexedEntityJavaClasses();
}

@Override
public Class<?> toConvertedEntityJavaClass(Object value) {
return searchMappingRef.get().toConvertedEntityJavaClass(value);
}

private SearchMapping createMapping() {
IndexingConfiguration indexingConfiguration = cache.getCacheConfiguration().indexing();
Set<String> indexedEntityTypes = indexingConfiguration.indexedEntityTypes();
DataConversion valueDataConversion = cache.getAdvancedCache().getValueDataConversion();

SearchMapping searchMapping = null;
if (commonBuilding != null) {
SearchMappingBuilder builder = SerializationContextSearchMapping.createBuilder(commonBuilding, entityLoader, indexedEntityTypes, serCtx);
searchMapping = builder != null ? builder.build() : null;
}
if (indexingConfiguration.enabled()) {
if (valueDataConversion.getStorageMediaType().match(APPLICATION_PROTOSTREAM)) {
// Try to resolve the indexed type names to protobuf type names.
Set<String> knownTypes = protobufMetadataManager.getSerializationContext().getGenericDescriptors().keySet();
for (String typeName : indexedEntityTypes) {
if (!knownTypes.contains(typeName)) {
if (searchMapping != null) searchMapping.close();
throw new CacheConfigurationException("The declared indexed type '" + typeName + "' is not known. Please register its proto schema file first.");
}
if (searchMapping == null || searchMapping.indexedEntity(typeName) == null) {
if (searchMapping != null) searchMapping.close();
throw log.typeNotIndexed(typeName);
}
}
}
}
return searchMapping;
}
}
Expand Up @@ -5,13 +5,11 @@
import static org.infinispan.query.remote.client.ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME;

import java.util.Map;
import java.util.Set;

import javax.management.ObjectName;

import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commons.CacheConfigurationException;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.dataconversion.ByteArrayWrapper;
import org.infinispan.commons.dataconversion.MediaType;
Expand Down Expand Up @@ -47,7 +45,6 @@
import org.infinispan.query.remote.impl.filter.IckleProtobufCacheEventFilterConverter;
import org.infinispan.query.remote.impl.filter.IckleProtobufFilterAndConverter;
import org.infinispan.query.remote.impl.logging.Log;
import org.infinispan.query.remote.impl.mapping.SerializationContextSearchMapping;
import org.infinispan.query.remote.impl.persistence.PersistenceContextInitializerImpl;
import org.infinispan.query.stats.impl.LocalIndexStatistics;
import org.infinispan.registry.InternalCacheRegistry;
Expand Down Expand Up @@ -159,33 +156,16 @@ public void cacheStarting(ComponentRegistry cr, Configuration cfg, String cacheN
.withWrapping(ByteArrayWrapper.class, ProtobufWrapper.class);
KeyTransformationHandler keyTransformationHandler = ComponentRegistryUtils.getKeyTransformationHandler(cache);

searchMapping = SerializationContextSearchMapping.buildMapping(commonBuilding,
new EntityLoader<>(queryStatistics, cache, keyTransformationHandler),
cache.getCacheConfiguration().indexing().indexedEntityTypes(), serCtx);
EntityLoader<?> entityLoader = new EntityLoader<>(queryStatistics, cache, keyTransformationHandler);

if (searchMapping != null) {
cr.registerComponent(searchMapping, SearchMapping.class);
BasicComponentRegistry bcr = cr.getComponent(BasicComponentRegistry.class);
bcr.replaceComponent(IndexStatistics.class.getName(), new LocalIndexStatistics(), true);
bcr.rewire();
}
}
searchMapping = new LazySearchMapping(commonBuilding, entityLoader, serCtx, cache, protobufMetadataManager);

if (cfg.indexing().enabled()) {
AdvancedCache<?, ?> cache = cr.getComponent(Cache.class).getAdvancedCache();
if (cache.getValueDataConversion().getStorageMediaType().match(MediaType.APPLICATION_PROTOSTREAM)) {
// Try to resolve the indexed type names to protobuf type names.
Set<String> knownTypes = protobufMetadataManager.getSerializationContext().getGenericDescriptors().keySet();
for (String typeName : cfg.indexing().indexedEntityTypes()) {
if (!knownTypes.contains(typeName)) {
throw new CacheConfigurationException("The declared indexed type '" + typeName + "' is not known. Please register its proto schema file first.");
}
if (searchMapping == null || searchMapping.indexedEntity(typeName) == null) {
throw log.typeNotIndexed(typeName);
}
}
}
cr.registerComponent(searchMapping, SearchMapping.class);
BasicComponentRegistry bcr = cr.getComponent(BasicComponentRegistry.class);
bcr.replaceComponent(IndexStatistics.class.getName(), new LocalIndexStatistics(), true);
bcr.rewire();
}

}
}

Expand Down
Expand Up @@ -10,7 +10,6 @@
import org.infinispan.query.remote.impl.mapping.reference.GlobalReferenceHolder;
import org.infinispan.query.remote.impl.mapping.typebridge.ProtobufMessageBinder;
import org.infinispan.search.mapper.common.EntityReference;
import org.infinispan.search.mapper.mapping.SearchMapping;
import org.infinispan.search.mapper.mapping.SearchMappingBuilder;
import org.infinispan.search.mapper.mapping.SearchMappingCommonBuilding;

Expand All @@ -19,10 +18,10 @@ public final class SerializationContextSearchMapping {
private SerializationContextSearchMapping() {
}

public static SearchMapping buildMapping(SearchMappingCommonBuilding commonBuilding,
EntityLoader<EntityReference, ?> entityLoader,
Set<String> indexedEntityTypes,
SerializationContext serializationContext) {
public static SearchMappingBuilder createBuilder(SearchMappingCommonBuilding commonBuilding,
EntityLoader<EntityReference, ?> entityLoader,
Set<String> indexedEntityTypes,
SerializationContext serializationContext) {
GlobalReferenceHolder globalReferenceHolder = new GlobalReferenceHolder(serializationContext.getGenericDescriptors());
ProtobufBootstrapIntrospector introspector = new ProtobufBootstrapIntrospector();
SearchMappingBuilder builder = commonBuilding.builder(introspector);
Expand All @@ -46,6 +45,6 @@ public static SearchMapping buildMapping(SearchMappingCommonBuilding commonBuild
builder.addEntityType(byte[].class, fullName);
}

return existIndexedEntities ? builder.build() : null;
return existIndexedEntities ? builder : null;
}
}
@@ -1,6 +1,7 @@
package org.infinispan.rest.resources;

import static org.infinispan.client.rest.configuration.Protocol.HTTP_11;
import static org.infinispan.commons.dataconversion.MediaType.APPLICATION_OBJECT_TYPE;
import static org.infinispan.rest.RequestHeader.KEY_CONTENT_TYPE_HEADER;
import static org.infinispan.rest.helper.RestServerHelper.CLIENT_KEY_STORE;
import static org.infinispan.rest.helper.RestServerHelper.SERVER_KEY_STORE;
Expand Down Expand Up @@ -121,7 +122,7 @@ protected void createCacheManagers() throws Exception {
addClusterEnabledCacheManager(new GlobalConfigurationBuilder().read(configForNode.build()), getDefaultCacheBuilder(), TransportFlags.minimalXsiteFlags());
}
cacheManagers.forEach(this::defineCaches);
cacheManagers.forEach(cm -> cm.defineConfiguration("invalid", getDefaultCacheBuilder().indexing().enabled(true).addIndexedEntities("invalid").build()));
cacheManagers.forEach(cm -> cm.defineConfiguration("invalid", getDefaultCacheBuilder().encoding().mediaType(APPLICATION_OBJECT_TYPE).indexing().enabled(true).addIndexedEntities("invalid").build()));
for (EmbeddedCacheManager cm : cacheManagers) {
cm.getClassAllowList().addClasses(TestClass.class);
waitForClusterToForm(cm.getCacheNames().stream().filter(name -> {
Expand Down
Expand Up @@ -355,6 +355,7 @@ public void testCreateInvalidCache() {
String invalidConfig = "<infinispan>\n" +
" <cache-container>\n" +
" <replicated-cache name=\"books\">\n" +
" <encoding media-type=\"application/x-java-object\"/>\n" +
" <indexing>\n" +
" <indexed-entities>\n" +
" <indexed-entity>Dummy</indexed-entity>\n" +
Expand All @@ -365,7 +366,7 @@ public void testCreateInvalidCache() {
"</infinispan>";

CompletionStage<RestResponse> response = client.cache("CACHE").createWithConfiguration(RestEntity.create(APPLICATION_XML, invalidConfig));
assertThat(response).isBadRequest().hasReturnedText("The declared indexed type 'Dummy' is not known. Please register its proto schema file first.");
assertThat(response).isBadRequest().hasReturnedText("Unable to instantiate class Dummy");

response = client.cache("CACHE").exists();
assertThat(response).isOk();
Expand Down Expand Up @@ -742,10 +743,54 @@ public void testSearchStatistics() {
assertThat(statJson.at("index").at("types").at("Entity").at("size").asLong())
.isGreaterThan(MIN_NON_EMPTY_INDEX_SIZE);
assertThat(statJson.at("index").at("types").at("Another").at("size").asLong())
.isGreaterThan(MIN_NON_EMPTY_INDEX_SIZE);
.isGreaterThan(MIN_NON_EMPTY_INDEX_SIZE);
assertFalse(statJson.at("index").at("reindexing").asBoolean());
}

@Test
public void testLazySearchMapping() {
String proto = " package future;\n" +
" /* @Indexed */\n" +
" message Entity {\n" +
" /* @Field */\n" +
" optional string name=1;\n" +
" }";

String value = Json.object().set("_type", "future.Entity").set("name", "Kim").toString();
RestEntity restEntity = RestEntity.create(APPLICATION_JSON, value);

// Create a cache with a declared, not yet registered protobuf entity
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.indexing().enable().storage(LOCAL_HEAP).addIndexedEntities("future.Entity");
String cacheConfig = new JsonWriter().toJSON(builder.build());

RestCacheClient cacheClient = client.cache("index-lazy");
RestEntity config = RestEntity.create(APPLICATION_JSON, cacheConfig);

CompletionStage<RestResponse> response = cacheClient.createWithConfiguration(config);
assertThat(response).isOk();

// Queries should return error
RestResponse restResponse = join(cacheClient.query("From future.Entity"));
assertThat(restResponse).containsReturnedText("Unknown type name : future.Entity");

// Writes too
restResponse = join(cacheClient.put("key", restEntity));
assertThat(restResponse).containsReturnedText("Unknown type name : future.Entity");

// Register the protobuf
restResponse = join(client.schemas().put("future.proto", proto));
assertThat(restResponse).isOk();

// All operations should work
restResponse = join(cacheClient.put("key", restEntity));
assertThat(restResponse).isOk();

restResponse = join(cacheClient.query("From future.Entity"));
assertThat(restResponse).isOk();
assertThat(restResponse).containsReturnedText("Kim");
}

private void assertQueryStatEmpty(Json queryTypeStats) {
assertEquals(0, queryTypeStats.at("count").asInteger());
assertEquals(0, queryTypeStats.at("max").asInteger());
Expand Down
Expand Up @@ -70,6 +70,11 @@ public void shouldPreventNonIndexedEntities() {

RestCacheClient cacheClient = client.cache(CACHE_NAME);
response = cacheClient.createWithConfiguration(configEntity, VOLATILE);
// The SearchMapping is started lazily, creating and starting the cache will not throw errors
ResponseAssertion.assertThat(response).isOk();

// Force initialization of the SearchMapping
response = cacheClient.query("FROM NonIndexed");
ResponseAssertion.assertThat(response).isBadRequest();
ResponseAssertion.assertThat(response).containsReturnedText("The configured indexed-entity type 'NonIndexed' must be indexed. Please annotate it with @Indexed");
}
Expand Down

0 comments on commit 0a219a1

Please sign in to comment.