From ca15960e5caa8e0af4c943593586eacfca3a1c73 Mon Sep 17 00:00:00 2001 From: Adrian Nistor Date: Mon, 29 Mar 2021 11:40:38 +0300 Subject: [PATCH] ISPN-12895 Upgrade to protostream 4.4.0.Beta3 * register by default some generated schemas for useful java types(collections mostly) from protostream-types * update some server tasks tests to ensure protobuf marshalling of ArrayLists works with ServerTasks --- build-configuration/bom/pom.xml | 5 ++ build-configuration/pom.xml | 2 +- .../client/hotrod/RemoteCacheManager.java | 23 +++++++++ .../infinispan/client/hotrod/logging/Log.java | 4 ++ commons/all/pom.xml | 5 ++ core/pom.xml | 5 ++ .../SerializationContextRegistryImpl.java | 48 ++++++++++--------- query-core/pom.xml | 6 +++ remote-query/remote-query-client/pom.xml | 5 ++ remote-query/remote-query-server/pom.xml | 10 ++++ .../impl/ProtobufMetadataManagerImpl.java | 9 ++++ server/hotrod/pom.xml | 4 ++ .../DistributedHelloServerTask.java | 23 +++++++-- .../server/extensions/HelloServerTask.java | 19 +++++++- .../server/extensions/ServerTasks.java | 8 +++- 15 files changed, 146 insertions(+), 30 deletions(-) diff --git a/build-configuration/bom/pom.xml b/build-configuration/bom/pom.xml index 4e9d615a9bb9..2fd9debb0dd9 100644 --- a/build-configuration/bom/pom.xml +++ b/build-configuration/bom/pom.xml @@ -403,6 +403,11 @@ protostream ${version.protostream} + + org.infinispan.protostream + protostream-types + ${version.protostream} + org.infinispan.protostream protostream-processor diff --git a/build-configuration/pom.xml b/build-configuration/pom.xml index 588d46b75abd..e2636d9deac6 100644 --- a/build-configuration/pom.xml +++ b/build-configuration/pom.xml @@ -187,7 +187,7 @@ 1.0.12.Final 5.0.3.Final 2.5.5.SP12 - 4.4.0.Beta1 + 4.4.0.Beta3 1.6.2 1.0.3 6.15.5 diff --git a/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java b/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java index 4cb1cf6acb61..0415ce44fef4 100644 --- a/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java +++ b/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java @@ -405,12 +405,35 @@ private void registerProtoStreamMarshaller() { private void initializeProtoStreamMarshaller(ProtoStreamMarshaller protoMarshaller) { SerializationContext ctx = protoMarshaller.getSerializationContext(); + + // First things first! Register some useful builtin schemas, which the user can override later. + registerDefaultSchemas(ctx, + "org.infinispan.protostream.types.java.CommonContainerTypesSchema", + "org.infinispan.protostream.types.java.CommonTypesSchema"); + + // Register the configured schemas. for (SerializationContextInitializer sci : configuration.getContextInitializers()) { sci.registerSchema(ctx); sci.registerMarshallers(ctx); } } + private static void registerDefaultSchemas(SerializationContext ctx, String... classNames) { + for (String className : classNames) { + SerializationContextInitializer sci; + try { + Class clazz = Class.forName(className); + Object instance = clazz.getDeclaredConstructor().newInstance(); + sci = (SerializationContextInitializer) instance; + } catch (Exception e) { + log.failedToCreatePredefinedSerializationContextInitializer(className, e); + continue; + } + sci.registerSchema(ctx); + sci.registerMarshallers(ctx); + } + } + public ChannelFactory createChannelFactory() { return new ChannelFactory(); } diff --git a/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/logging/Log.java b/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/logging/Log.java index 9810af0c6f40..c7367b763fac 100644 --- a/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/logging/Log.java +++ b/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/logging/Log.java @@ -367,4 +367,8 @@ public interface Log extends BasicLogger { @Message(value = "Near cache with bloom filter requires pool max active to be 1, was %s, and exhausted action to be WAIT, was %s", id = 4103) CacheConfigurationException bloomFilterRequiresMaxActiveOneAndWait(int maxActive, ExhaustedAction action); + + @LogMessage(level = WARN) + @Message(value = "Failed to load and create an optional ProtoStream serialization context initializer: %s", id = 4104) + void failedToCreatePredefinedSerializationContextInitializer(String className, @Cause Throwable throwable); } diff --git a/commons/all/pom.xml b/commons/all/pom.xml index ed5fabc2594d..6c2daf51ac10 100644 --- a/commons/all/pom.xml +++ b/commons/all/pom.xml @@ -19,6 +19,11 @@ protostream + + org.infinispan.protostream + protostream-types + + org.infinispan.protostream protostream-processor diff --git a/core/pom.xml b/core/pom.xml index fe483bd50243..2b1fc8d0c02e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -28,6 +28,11 @@ protostream + + org.infinispan.protostream + protostream-types + + org.infinispan.protostream protostream-processor diff --git a/core/src/main/java/org/infinispan/marshall/protostream/impl/SerializationContextRegistryImpl.java b/core/src/main/java/org/infinispan/marshall/protostream/impl/SerializationContextRegistryImpl.java index 6ce588481f38..0afd9e53b4e6 100644 --- a/core/src/main/java/org/infinispan/marshall/protostream/impl/SerializationContextRegistryImpl.java +++ b/core/src/main/java/org/infinispan/marshall/protostream/impl/SerializationContextRegistryImpl.java @@ -26,6 +26,8 @@ import org.infinispan.protostream.ProtobufUtil; import org.infinispan.protostream.SerializationContext; import org.infinispan.protostream.SerializationContextInitializer; +import org.infinispan.protostream.types.java.CommonContainerTypesSchema; +import org.infinispan.protostream.types.java.CommonTypesSchema; @Scope(Scopes.GLOBAL) public class SerializationContextRegistryImpl implements SerializationContextRegistry { @@ -40,6 +42,11 @@ public class SerializationContextRegistryImpl implements SerializationContextReg @Start public void start() { + CommonTypesSchema commonTypesSchema = new CommonTypesSchema(); + CommonContainerTypesSchema commonContainerTypesSchema = new CommonContainerTypesSchema(); + register(commonTypesSchema, user); + register(commonContainerTypesSchema, user); + // Add user configured SCIs Collection initializers = globalConfig.serialization().contextInitializers(); if (initializers == null || initializers.isEmpty()) { @@ -50,18 +57,20 @@ public void start() { String messageName = PersistenceContextInitializer.getFqTypeName(MarshallableUserObject.class); BaseMarshaller userObjectMarshaller = new MarshallableUserObject.Marshaller(messageName, userMarshaller.wired()); - update(GLOBAL, ctx -> - ctx.addContextIntializer(new PersistenceContextInitializerImpl()) - // Register Commons util so that KeyValueWithPrevious can be used with JCache remote - .addContextIntializer(new org.infinispan.commons.GlobalContextInitializerImpl()) - .addMarshaller(userObjectMarshaller) - .update() + update(GLOBAL, ctx -> ctx.addContextInitializer(commonTypesSchema) + .addContextInitializer(commonContainerTypesSchema) + .addContextInitializer(new PersistenceContextInitializerImpl()) + // Register Commons util so that KeyValueWithPrevious can be used with JCache remote + .addContextInitializer(new org.infinispan.commons.GlobalContextInitializerImpl()) + .addMarshaller(userObjectMarshaller) + .update() ); - update(PERSISTENCE, ctx -> - ctx.addContextIntializer(new PersistenceContextInitializerImpl()) - .addMarshaller(userObjectMarshaller) - .update() + update(PERSISTENCE, ctx -> ctx.addContextInitializer(commonTypesSchema) + .addContextInitializer(commonContainerTypesSchema) + .addContextInitializer(new PersistenceContextInitializerImpl()) + .addMarshaller(userObjectMarshaller) + .update() ); } @@ -82,7 +91,7 @@ public ImmutableSerializationContext getUserCtx() { @Override public void addContextInitializer(MarshallerType type, SerializationContextInitializer sci) { - update(type, ctx -> ctx.addContextIntializer(sci).update()); + update(type, ctx -> ctx.addContextInitializer(sci).update()); } @Override @@ -107,27 +116,20 @@ private void update(MarshallerType type, Consumer consumer) { } } - private static void register(SerializationContextInitializer sci, SerializationContext... ctxs) { - for (SerializationContext ctx : ctxs) { - sci.registerSchema(ctx); - sci.registerMarshallers(ctx); - } + private static void register(SerializationContextInitializer sci, SerializationContext ctx) { + sci.registerSchema(ctx); + sci.registerMarshallers(ctx); } // Required until IPROTO-136 is resolved to ensure that custom marshaller implementations are not overridden by // non-core modules registering their SerializationContextInitializer(s) which depend on a core initializer. - static class MarshallerContext { + private static final class MarshallerContext { private final List initializers = new ArrayList<>(); private final List schemas = new ArrayList<>(); private final List> marshallers = new ArrayList<>(); private final SerializationContext ctx = ProtobufUtil.newSerializationContext(); - MarshallerContext addContextIntializers(List scis) { - initializers.addAll(scis); - return this; - } - - MarshallerContext addContextIntializer(SerializationContextInitializer sci) { + MarshallerContext addContextInitializer(SerializationContextInitializer sci) { initializers.add(sci); return this; } diff --git a/query-core/pom.xml b/query-core/pom.xml index d7ce49220fb6..b9bda50c84c2 100644 --- a/query-core/pom.xml +++ b/query-core/pom.xml @@ -41,6 +41,12 @@ true + + org.infinispan.protostream + protostream-types + true + + org.infinispan.protostream protostream-processor diff --git a/remote-query/remote-query-client/pom.xml b/remote-query/remote-query-client/pom.xml index 366501e092ed..eb4f7954a992 100644 --- a/remote-query/remote-query-client/pom.xml +++ b/remote-query/remote-query-client/pom.xml @@ -23,6 +23,11 @@ protostream + + org.infinispan.protostream + protostream-types + + org.infinispan infinispan-commons-test diff --git a/remote-query/remote-query-server/pom.xml b/remote-query/remote-query-server/pom.xml index 155c3d057a99..7fa8dcdbc626 100644 --- a/remote-query/remote-query-server/pom.xml +++ b/remote-query/remote-query-server/pom.xml @@ -42,6 +42,16 @@ infinispan-remote-query-client + + org.infinispan.protostream + protostream + + + + org.infinispan.protostream + protostream-types + + org.infinispan infinispan-server-core diff --git a/remote-query/remote-query-server/src/main/java/org/infinispan/query/remote/impl/ProtobufMetadataManagerImpl.java b/remote-query/remote-query-server/src/main/java/org/infinispan/query/remote/impl/ProtobufMetadataManagerImpl.java index e56eac7417e4..e1e87485fae0 100644 --- a/remote-query/remote-query-server/src/main/java/org/infinispan/query/remote/impl/ProtobufMetadataManagerImpl.java +++ b/remote-query/remote-query-server/src/main/java/org/infinispan/query/remote/impl/ProtobufMetadataManagerImpl.java @@ -39,6 +39,8 @@ import org.infinispan.protostream.SerializationContext; import org.infinispan.protostream.SerializationContextInitializer; import org.infinispan.protostream.config.Configuration; +import org.infinispan.protostream.types.java.CommonContainerTypesSchema; +import org.infinispan.protostream.types.java.CommonTypesSchema; import org.infinispan.query.remote.ProtobufMetadataManager; import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants; import org.infinispan.query.remote.client.impl.MarshallerRegistration; @@ -86,6 +88,13 @@ public ProtobufMetadataManagerImpl() { } catch (DescriptorParserException e) { throw new CacheException("Failed to initialise the Protobuf serialization context", e); } + register(new CommonTypesSchema()); + register(new CommonContainerTypesSchema()); + } + + void register(SerializationContextInitializer initializer) { + initializer.registerSchema(getSerializationContext()); + initializer.registerMarshallers(getSerializationContext()); } @Start diff --git a/server/hotrod/pom.xml b/server/hotrod/pom.xml index 0fb67719c439..7fef79721a61 100644 --- a/server/hotrod/pom.xml +++ b/server/hotrod/pom.xml @@ -53,6 +53,10 @@ org.infinispan.protostream protostream + + org.infinispan.protostream + protostream-types + io.projectreactor.tools blockhound diff --git a/server/tests/src/test/java/org/infinispan/server/extensions/DistributedHelloServerTask.java b/server/tests/src/test/java/org/infinispan/server/extensions/DistributedHelloServerTask.java index 6b34641c9c35..d88d122e155d 100644 --- a/server/tests/src/test/java/org/infinispan/server/extensions/DistributedHelloServerTask.java +++ b/server/tests/src/test/java/org/infinispan/server/extensions/DistributedHelloServerTask.java @@ -1,6 +1,9 @@ package org.infinispan.server.extensions; -import org.infinispan.manager.EmbeddedCacheManager; +import java.util.ArrayList; +import java.util.Collection; + +import org.infinispan.remoting.transport.Address; import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; import org.infinispan.tasks.TaskExecutionMode; @@ -10,6 +13,7 @@ * @since 11.0 **/ public class DistributedHelloServerTask implements ServerTask { + private TaskContext taskContext; @Override @@ -19,9 +23,22 @@ public void setTaskContext(TaskContext taskContext) { @Override public Object call() { - EmbeddedCacheManager cacheManager = taskContext.getCacheManager(); + Address address = taskContext.getCacheManager().getAddress(); Object greetee = taskContext.getParameters().get().get("greetee"); - return String.format("Hello %s from %s", greetee == null ? "world" : greetee, cacheManager.getAddress()); + + // if we're dealing with a Collections of greetees we'll greet them individually + if (greetee instanceof Collection) { + ArrayList messages = new ArrayList<>(); + for (Object o : (Collection) greetee) { + messages.add(greet(o, address)); + } + return messages; + } + return greet(greetee, address); + } + + private String greet(Object greetee, Address address) { + return String.format("Hello %s from %s", greetee == null ? "world" : greetee, address); } @Override diff --git a/server/tests/src/test/java/org/infinispan/server/extensions/HelloServerTask.java b/server/tests/src/test/java/org/infinispan/server/extensions/HelloServerTask.java index c7220ece5756..68a07eb34267 100644 --- a/server/tests/src/test/java/org/infinispan/server/extensions/HelloServerTask.java +++ b/server/tests/src/test/java/org/infinispan/server/extensions/HelloServerTask.java @@ -1,5 +1,8 @@ package org.infinispan.server.extensions; +import java.util.ArrayList; +import java.util.Collection; + import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; @@ -8,6 +11,7 @@ * @since 10.0 **/ public class HelloServerTask implements ServerTask { + private TaskContext taskContext; @Override @@ -18,6 +22,20 @@ public void setTaskContext(TaskContext taskContext) { @Override public Object call() { Object greetee = taskContext.getParameters().get().get("greetee"); + + // if we're dealing with a Collections of greetees we'll greet them individually + if (greetee instanceof Collection) { + ArrayList messages = new ArrayList<>(); + for (Object o : (Collection) greetee) { + messages.add(greet(o)); + } + return messages; + } + + return greet(greetee); + } + + private String greet(Object greetee) { return greetee == null ? "Hello world" : "Hello " + greetee; } @@ -25,5 +43,4 @@ public Object call() { public String getName() { return "hello"; } - } diff --git a/server/tests/src/test/java/org/infinispan/server/extensions/ServerTasks.java b/server/tests/src/test/java/org/infinispan/server/extensions/ServerTasks.java index cabe99c1b953..2feb4e0f6b38 100644 --- a/server/tests/src/test/java/org/infinispan/server/extensions/ServerTasks.java +++ b/server/tests/src/test/java/org/infinispan/server/extensions/ServerTasks.java @@ -3,6 +3,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -36,8 +38,10 @@ public void testServerTaskNoParameters() { @Test public void testServerTaskWithParameters() { RemoteCache cache = SERVER_TEST.hotrod().create(); - String hello = cache.execute("hello", Collections.singletonMap("greetee", "my friend")); - assertEquals("Hello my friend", hello); + ArrayList messages = cache.execute("hello", Collections.singletonMap("greetee", new ArrayList<>(Arrays.asList("nurse", "kitty")))); + assertEquals(2, messages.size()); + assertEquals("Hello nurse", messages.get(0)); + assertEquals("Hello kitty", messages.get(1)); } @Test