From dba0479bf9bb9b41fca2132b7529aaafd6420e4f Mon Sep 17 00:00:00 2001 From: atezs82 Date: Fri, 21 May 2021 15:07:40 +0200 Subject: [PATCH] [api][pulsar-client] make AuthenticationToken serializable (#10574) ### Motivation When trying to authenticate Pulsar in distributed systems (a very good example of this is the Spark connector, see here: https://pulsar.apache.org/docs/en/adaptors-spark/), the following exception is thrown (this is taken from the log of one of the Spark executors): ``` 21/05/13 11:59:34 WARN PulsarClientImpl: [topic: persistent://tenant/namespace/topic] Could not get connection while getPartitionedTopicMetadata -- Will try again in 380 ms 21/05/13 11:59:34 INFO ConnectionPool: [[id: 0x4d13ed61, L:/1.2.3.4:43624 - R:broker.svc.cluster.local/1.2.3.4:6650]] Connected to server 21/05/13 11:59:34 WARN ClientCnx: [broker.svc.cluster.local/1.2.3.4:6650] Got exception java.lang.RuntimeException: failed to get client token at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:62) at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getCommandData(AuthenticationDataToken.java:55) at org.apache.pulsar.client.api.AuthenticationDataProvider.authenticate(AuthenticationDataProvider.java:133) at org.apache.pulsar.client.impl.ClientCnx.newConnectCommand(ClientCnx.java:218) at org.apache.pulsar.client.impl.ClientCnx.channelActive(ClientCnx.java:199) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.fulfillConnectPromise(AbstractEpollChannel.java:620) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:653) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:60) ... 20 more ``` This happens since the token supplier function become null when it gets transferred to eg. a Spark executor. Refer to this issue (which is roughly the same): https://github.com/streamnative/pulsar/issues/905 . ### Modifications Added a new token storage class, `SerializableAuthenticationToken`, which can be serialized properly (this is tested in its corresponding unit test as well). On the other hand, this class does not use supplier functions to fetch token data, it stores the token as a plain string instead. For use-cases requiring a token supplier, this cannot be used, but can load its configuration from file (using the `file://` URL prefix), like the original `AuthenticationToken` class. Corresponding unit test were added to the class. Will also add proper documentation prior to merging once the change can be accepted - if that is required. --- .../impl/auth/AuthenticationDataTls.java | 3 + .../client/impl/auth/AuthenticationToken.java | 58 ++++++++++++++----- .../impl/auth/AuthenticationTokenTest.java | 43 +++++++++++++- site2/docs/adaptors-spark.md | 10 +++- 4 files changed, 97 insertions(+), 17 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java index b7987eb88e2d15..28b520c52246f7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java @@ -32,6 +32,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + public class AuthenticationDataTls implements AuthenticationDataProvider { private static final long serialVersionUID = 1L; protected X509Certificate[] tlsCertificates; @@ -39,6 +41,7 @@ public class AuthenticationDataTls implements AuthenticationDataProvider { private transient FileModifiedTimeUpdater certFile, keyFile; // key and cert using stream private transient InputStream certStream, keyStream; + @SuppressFBWarnings(value="SE_TRANSIENT_FIELD_NOT_RESTORED", justification = "Using custom serializer which Findbugs can't detect") private transient Supplier certStreamProvider, keyStreamProvider, trustStoreStreamProvider; public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java index 77940480e7a17b..3365dbbf2e005c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java @@ -21,7 +21,9 @@ import com.google.common.base.Charsets; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.Serializable; import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; @@ -42,18 +44,16 @@ public class AuthenticationToken implements Authentication, EncodedAuthenticationParameterSupport { private static final long serialVersionUID = 1L; - private transient Supplier tokenSupplier = null; + private Supplier tokenSupplier = null; public AuthenticationToken() { } public AuthenticationToken(String token) { - this(() -> token); + this(new SerializableTokenSupplier(token)); } - public AuthenticationToken(Supplier tokenSupplier) { - this.tokenSupplier = tokenSupplier; - } + public AuthenticationToken(Supplier tokenSupplier) { this.tokenSupplier = tokenSupplier; } @Override public void close() throws IOException { @@ -75,24 +75,18 @@ public void configure(String encodedAuthParamString) { // Interpret the whole param string as the token. If the string contains the notation `token:xxxxx` then strip // the prefix if (encodedAuthParamString.startsWith("token:")) { - this.tokenSupplier = () -> encodedAuthParamString.substring("token:".length()); + this.tokenSupplier = new SerializableTokenSupplier(encodedAuthParamString.substring("token:".length())); } else if (encodedAuthParamString.startsWith("file:")) { // Read token from a file URI filePath = URI.create(encodedAuthParamString); - this.tokenSupplier = () -> { - try { - return new String(Files.readAllBytes(Paths.get(filePath)), Charsets.UTF_8).trim(); - } catch (IOException e) { - throw new RuntimeException("Failed to read token from file", e); - } - }; + this.tokenSupplier = new SerializableURITokenSupplier(filePath); } else { try { // Read token from json string JsonObject authParams = new Gson().fromJson(encodedAuthParamString, JsonObject.class); - this.tokenSupplier = () -> authParams.get("token").getAsString(); + this.tokenSupplier = new SerializableTokenSupplier(authParams.get("token").getAsString()); } catch (JsonSyntaxException e) { - this.tokenSupplier = () -> encodedAuthParamString; + this.tokenSupplier = new SerializableTokenSupplier(encodedAuthParamString); } } } @@ -107,4 +101,38 @@ public void start() throws PulsarClientException { // noop } + private static class SerializableURITokenSupplier implements Supplier, Serializable { + + private static final long serialVersionUID = 3160666668166028760L; + private final URI uri; + + public SerializableURITokenSupplier(final URI uri) { + super(); + this.uri = uri; + } + + @Override + public String get() { + try { + return new String(Files.readAllBytes(Paths.get(uri)), Charsets.UTF_8).trim(); + } catch (IOException e) { + throw new RuntimeException("Failed to read token from file", e); + } + } + } + + private static class SerializableTokenSupplier implements Supplier, Serializable { + + private static final long serialVersionUID = 5095234161799506913L; + private final String token; + + public SerializableTokenSupplier(final String token) { + super(); + this.token = token; + } + + @Override + public String get() { return token; } + + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java index e12ff71ea807d5..56483867704067 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java @@ -25,10 +25,12 @@ import com.google.common.base.Charsets; -import java.io.File; +import java.io.*; import java.util.Collections; +import java.util.function.Supplier; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.SerializationUtils; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -171,4 +173,43 @@ public void testAuthTokenConfigFromJson() throws Exception{ assertEquals(authData.getCommandData(), "my-test-token-string"); authToken.close(); } + + @Test + public void testSerializableAuthentication() throws Exception { + SerializableSupplier tokenSupplier = new SerializableSupplier("cert"); + AuthenticationToken token = new AuthenticationToken(tokenSupplier); + + // serialize + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(outStream); + out.writeObject(token); + out.flush(); + byte[] outputBytes = outStream.toByteArray(); + out.close(); + + // deserialize + ByteArrayInputStream bis = new ByteArrayInputStream(outputBytes); + ObjectInput in = new ObjectInputStream(bis); + AuthenticationToken ts = (AuthenticationToken) in.readObject(); + in.close(); + + // read the deserialized object + assertEquals(tokenSupplier.token, ts.getAuthData().getCommandData()); + } + + public static class SerializableSupplier implements Supplier, Serializable { + + private static final long serialVersionUID = 6259616338933150683L; + private String token; + + public SerializableSupplier(final String token) { + super(); + this.token = token; + } + + @Override + public String get() { + return token; + } + } } diff --git a/site2/docs/adaptors-spark.md b/site2/docs/adaptors-spark.md index 10877ceb71b62e..2e7928f171740e 100644 --- a/site2/docs/adaptors-spark.md +++ b/site2/docs/adaptors-spark.md @@ -72,4 +72,12 @@ Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` metho JavaReceiverInputDStream lineDStream = jsc.receiverStream(pulsarReceiver); ``` -For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java). In this example, the number of messages that contain the string "Pulsar" in received messages is counted. \ No newline at end of file +For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java). In this example, the number of messages that contain the string "Pulsar" in received messages is counted. + +Note that if needed, other Pulsar authentication classes can be used. For example, in order to use a token during authentication the following parameters for the `SparkStreamingPulsarReceiver` constructor can be set: +```java +SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver( + serviceUrl, + pulsarConf, + new AuthenticationToken("token:")); +``` \ No newline at end of file