Skip to content

Commit

Permalink
ISPN-12895 Upgrade to protostream 4.4.0.Beta3
Browse files Browse the repository at this point in the history
* register by default some generated schemas for useful java types(collections mostly) from protostream-types
* update some server tasks tests to ensure protobuf marshalling of ArrayLists works with ServerTasks
  • Loading branch information
anistor committed Apr 3, 2021
1 parent bb103db commit ca15960
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 30 deletions.
5 changes: 5 additions & 0 deletions build-configuration/bom/pom.xml
Expand Up @@ -403,6 +403,11 @@
<artifactId>protostream</artifactId>
<version>${version.protostream}</version>
</dependency>
<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-types</artifactId>
<version>${version.protostream}</version>
</dependency>
<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-processor</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion build-configuration/pom.xml
Expand Up @@ -187,7 +187,7 @@
<version.org.wildfly.openssl>1.0.12.Final</version.org.wildfly.openssl>
<version.picketbox>5.0.3.Final</version.picketbox>
<version.picketlink>2.5.5.SP12</version.picketlink>
<version.protostream>4.4.0.Beta1</version.protostream>
<version.protostream>4.4.0.Beta3</version.protostream>
<version.protostuff>1.6.2</version.protostuff>
<version.reactivestreams>1.0.3</version.reactivestreams>
<version.rocksdb>6.15.5</version.rocksdb>
Expand Down
Expand Up @@ -405,12 +405,35 @@ private void registerProtoStreamMarshaller() {

private void initializeProtoStreamMarshaller(ProtoStreamMarshaller protoMarshaller) {
SerializationContext ctx = protoMarshaller.getSerializationContext();

// First things first! Register some useful builtin schemas, which the user can override later.
registerDefaultSchemas(ctx,
"org.infinispan.protostream.types.java.CommonContainerTypesSchema",
"org.infinispan.protostream.types.java.CommonTypesSchema");

// Register the configured schemas.
for (SerializationContextInitializer sci : configuration.getContextInitializers()) {
sci.registerSchema(ctx);
sci.registerMarshallers(ctx);
}
}

private static void registerDefaultSchemas(SerializationContext ctx, String... classNames) {
for (String className : classNames) {
SerializationContextInitializer sci;
try {
Class<?> clazz = Class.forName(className);
Object instance = clazz.getDeclaredConstructor().newInstance();
sci = (SerializationContextInitializer) instance;
} catch (Exception e) {
log.failedToCreatePredefinedSerializationContextInitializer(className, e);
continue;
}
sci.registerSchema(ctx);
sci.registerMarshallers(ctx);
}
}

public ChannelFactory createChannelFactory() {
return new ChannelFactory();
}
Expand Down
Expand Up @@ -367,4 +367,8 @@ public interface Log extends BasicLogger {

@Message(value = "Near cache with bloom filter requires pool max active to be 1, was %s, and exhausted action to be WAIT, was %s", id = 4103)
CacheConfigurationException bloomFilterRequiresMaxActiveOneAndWait(int maxActive, ExhaustedAction action);

@LogMessage(level = WARN)
@Message(value = "Failed to load and create an optional ProtoStream serialization context initializer: %s", id = 4104)
void failedToCreatePredefinedSerializationContextInitializer(String className, @Cause Throwable throwable);
}
5 changes: 5 additions & 0 deletions commons/all/pom.xml
Expand Up @@ -19,6 +19,11 @@
<artifactId>protostream</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-types</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-processor</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions core/pom.xml
Expand Up @@ -28,6 +28,11 @@
<artifactId>protostream</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-types</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-processor</artifactId>
Expand Down
Expand Up @@ -26,6 +26,8 @@
import org.infinispan.protostream.ProtobufUtil;
import org.infinispan.protostream.SerializationContext;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.types.java.CommonContainerTypesSchema;
import org.infinispan.protostream.types.java.CommonTypesSchema;

@Scope(Scopes.GLOBAL)
public class SerializationContextRegistryImpl implements SerializationContextRegistry {
Expand All @@ -40,6 +42,11 @@ public class SerializationContextRegistryImpl implements SerializationContextReg

@Start
public void start() {
CommonTypesSchema commonTypesSchema = new CommonTypesSchema();
CommonContainerTypesSchema commonContainerTypesSchema = new CommonContainerTypesSchema();
register(commonTypesSchema, user);
register(commonContainerTypesSchema, user);

// Add user configured SCIs
Collection<SerializationContextInitializer> initializers = globalConfig.serialization().contextInitializers();
if (initializers == null || initializers.isEmpty()) {
Expand All @@ -50,18 +57,20 @@ public void start() {

String messageName = PersistenceContextInitializer.getFqTypeName(MarshallableUserObject.class);
BaseMarshaller userObjectMarshaller = new MarshallableUserObject.Marshaller(messageName, userMarshaller.wired());
update(GLOBAL, ctx ->
ctx.addContextIntializer(new PersistenceContextInitializerImpl())
// Register Commons util so that KeyValueWithPrevious can be used with JCache remote
.addContextIntializer(new org.infinispan.commons.GlobalContextInitializerImpl())
.addMarshaller(userObjectMarshaller)
.update()
update(GLOBAL, ctx -> ctx.addContextInitializer(commonTypesSchema)
.addContextInitializer(commonContainerTypesSchema)
.addContextInitializer(new PersistenceContextInitializerImpl())
// Register Commons util so that KeyValueWithPrevious can be used with JCache remote
.addContextInitializer(new org.infinispan.commons.GlobalContextInitializerImpl())
.addMarshaller(userObjectMarshaller)
.update()
);

update(PERSISTENCE, ctx ->
ctx.addContextIntializer(new PersistenceContextInitializerImpl())
.addMarshaller(userObjectMarshaller)
.update()
update(PERSISTENCE, ctx -> ctx.addContextInitializer(commonTypesSchema)
.addContextInitializer(commonContainerTypesSchema)
.addContextInitializer(new PersistenceContextInitializerImpl())
.addMarshaller(userObjectMarshaller)
.update()
);
}

Expand All @@ -82,7 +91,7 @@ public ImmutableSerializationContext getUserCtx() {

@Override
public void addContextInitializer(MarshallerType type, SerializationContextInitializer sci) {
update(type, ctx -> ctx.addContextIntializer(sci).update());
update(type, ctx -> ctx.addContextInitializer(sci).update());
}

@Override
Expand All @@ -107,27 +116,20 @@ private void update(MarshallerType type, Consumer<MarshallerContext> consumer) {
}
}

private static void register(SerializationContextInitializer sci, SerializationContext... ctxs) {
for (SerializationContext ctx : ctxs) {
sci.registerSchema(ctx);
sci.registerMarshallers(ctx);
}
private static void register(SerializationContextInitializer sci, SerializationContext ctx) {
sci.registerSchema(ctx);
sci.registerMarshallers(ctx);
}

// Required until IPROTO-136 is resolved to ensure that custom marshaller implementations are not overridden by
// non-core modules registering their SerializationContextInitializer(s) which depend on a core initializer.
static class MarshallerContext {
private static final class MarshallerContext {
private final List<SerializationContextInitializer> initializers = new ArrayList<>();
private final List<FileDescriptorSource> schemas = new ArrayList<>();
private final List<BaseMarshaller<?>> marshallers = new ArrayList<>();
private final SerializationContext ctx = ProtobufUtil.newSerializationContext();

MarshallerContext addContextIntializers(List<SerializationContextInitializer> scis) {
initializers.addAll(scis);
return this;
}

MarshallerContext addContextIntializer(SerializationContextInitializer sci) {
MarshallerContext addContextInitializer(SerializationContextInitializer sci) {
initializers.add(sci);
return this;
}
Expand Down
6 changes: 6 additions & 0 deletions query-core/pom.xml
Expand Up @@ -41,6 +41,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-types</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-processor</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions remote-query/remote-query-client/pom.xml
Expand Up @@ -23,6 +23,11 @@
<artifactId>protostream</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-types</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-commons-test</artifactId>
Expand Down
10 changes: 10 additions & 0 deletions remote-query/remote-query-server/pom.xml
Expand Up @@ -42,6 +42,16 @@
<artifactId>infinispan-remote-query-client</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-types</artifactId>
</dependency>

<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-server-core</artifactId>
Expand Down
Expand Up @@ -39,6 +39,8 @@
import org.infinispan.protostream.SerializationContext;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.config.Configuration;
import org.infinispan.protostream.types.java.CommonContainerTypesSchema;
import org.infinispan.protostream.types.java.CommonTypesSchema;
import org.infinispan.query.remote.ProtobufMetadataManager;
import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants;
import org.infinispan.query.remote.client.impl.MarshallerRegistration;
Expand Down Expand Up @@ -86,6 +88,13 @@ public ProtobufMetadataManagerImpl() {
} catch (DescriptorParserException e) {
throw new CacheException("Failed to initialise the Protobuf serialization context", e);
}
register(new CommonTypesSchema());
register(new CommonContainerTypesSchema());
}

void register(SerializationContextInitializer initializer) {
initializer.registerSchema(getSerializationContext());
initializer.registerMarshallers(getSerializationContext());
}

@Start
Expand Down
4 changes: 4 additions & 0 deletions server/hotrod/pom.xml
Expand Up @@ -53,6 +53,10 @@
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream</artifactId>
</dependency>
<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-types</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.tools</groupId>
<artifactId>blockhound</artifactId>
Expand Down
@@ -1,6 +1,9 @@
package org.infinispan.server.extensions;

import org.infinispan.manager.EmbeddedCacheManager;
import java.util.ArrayList;
import java.util.Collection;

import org.infinispan.remoting.transport.Address;
import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;
import org.infinispan.tasks.TaskExecutionMode;
Expand All @@ -10,6 +13,7 @@
* @since 11.0
**/
public class DistributedHelloServerTask implements ServerTask {

private TaskContext taskContext;

@Override
Expand All @@ -19,9 +23,22 @@ public void setTaskContext(TaskContext taskContext) {

@Override
public Object call() {
EmbeddedCacheManager cacheManager = taskContext.getCacheManager();
Address address = taskContext.getCacheManager().getAddress();
Object greetee = taskContext.getParameters().get().get("greetee");
return String.format("Hello %s from %s", greetee == null ? "world" : greetee, cacheManager.getAddress());

// if we're dealing with a Collections of greetees we'll greet them individually
if (greetee instanceof Collection) {
ArrayList<String> messages = new ArrayList<>();
for (Object o : (Collection<?>) greetee) {
messages.add(greet(o, address));
}
return messages;
}
return greet(greetee, address);
}

private String greet(Object greetee, Address address) {
return String.format("Hello %s from %s", greetee == null ? "world" : greetee, address);
}

@Override
Expand Down
@@ -1,5 +1,8 @@
package org.infinispan.server.extensions;

import java.util.ArrayList;
import java.util.Collection;

import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;

Expand All @@ -8,6 +11,7 @@
* @since 10.0
**/
public class HelloServerTask implements ServerTask {

private TaskContext taskContext;

@Override
Expand All @@ -18,12 +22,25 @@ public void setTaskContext(TaskContext taskContext) {
@Override
public Object call() {
Object greetee = taskContext.getParameters().get().get("greetee");

// if we're dealing with a Collections of greetees we'll greet them individually
if (greetee instanceof Collection) {
ArrayList<String> messages = new ArrayList<>();
for (Object o : (Collection<?>) greetee) {
messages.add(greet(o));
}
return messages;
}

return greet(greetee);
}

private String greet(Object greetee) {
return greetee == null ? "Hello world" : "Hello " + greetee;
}

@Override
public String getName() {
return "hello";
}

}
Expand Up @@ -3,6 +3,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -36,8 +38,10 @@ public void testServerTaskNoParameters() {
@Test
public void testServerTaskWithParameters() {
RemoteCache<String, String> cache = SERVER_TEST.hotrod().create();
String hello = cache.execute("hello", Collections.singletonMap("greetee", "my friend"));
assertEquals("Hello my friend", hello);
ArrayList<String> messages = cache.execute("hello", Collections.singletonMap("greetee", new ArrayList<>(Arrays.asList("nurse", "kitty"))));
assertEquals(2, messages.size());
assertEquals("Hello nurse", messages.get(0));
assertEquals("Hello kitty", messages.get(1));
}

@Test
Expand Down

0 comments on commit ca15960

Please sign in to comment.