Skip to content

Commit

Permalink
[api][pulsar-client] make AuthenticationToken serializable (apache#10574
Browse files Browse the repository at this point in the history
)

### Motivation

When trying to authenticate Pulsar in distributed systems (a very good example of this is the Spark connector, see here: https://pulsar.apache.org/docs/en/adaptors-spark/), the following exception is thrown (this is taken from the log of one of the Spark executors):
```
21/05/13 11:59:34 WARN PulsarClientImpl: [topic: persistent://tenant/namespace/topic] Could not get connection while getPartitionedTopicMetadata -- Will try again in 380 ms
21/05/13 11:59:34 INFO ConnectionPool: [[id: 0x4d13ed61, L:/1.2.3.4:43624 - R:broker.svc.cluster.local/1.2.3.4:6650]] Connected to server
21/05/13 11:59:34 WARN ClientCnx: [broker.svc.cluster.local/1.2.3.4:6650] Got exception java.lang.RuntimeException: failed to get client token
	at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:62)
	at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getCommandData(AuthenticationDataToken.java:55)
	at org.apache.pulsar.client.api.AuthenticationDataProvider.authenticate(AuthenticationDataProvider.java:133)
	at org.apache.pulsar.client.impl.ClientCnx.newConnectCommand(ClientCnx.java:218)
	at org.apache.pulsar.client.impl.ClientCnx.channelActive(ClientCnx.java:199)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895)
	at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.fulfillConnectPromise(AbstractEpollChannel.java:620)
	at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:653)
	at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529)
	at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465)
	at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:60)
	... 20 more
```
This happens since the token supplier function become null when it gets transferred to eg. a Spark executor. Refer to this issue (which is roughly the same): https://github.com/streamnative/pulsar/issues/905 .

### Modifications
Added a new token storage class, `SerializableAuthenticationToken`, which can be serialized properly (this is tested in its corresponding unit test as well). On the other hand, this class does not use supplier functions to fetch token data, it stores the token as a plain string instead. For use-cases requiring a token supplier, this cannot be used, but can load its configuration from file (using the `file://` URL prefix), like the original `AuthenticationToken` class.

Corresponding unit test were added to the class. Will also add proper documentation prior to merging once the change can be accepted - if that is required.
  • Loading branch information
atezs82 authored and yangl committed Jun 23, 2021
1 parent 8335224 commit dba0479
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

public class AuthenticationDataTls implements AuthenticationDataProvider {
private static final long serialVersionUID = 1L;
protected X509Certificate[] tlsCertificates;
protected PrivateKey tlsPrivateKey;
private transient FileModifiedTimeUpdater certFile, keyFile;
// key and cert using stream
private transient InputStream certStream, keyStream;
@SuppressFBWarnings(value="SE_TRANSIENT_FIELD_NOT_RESTORED", justification = "Using custom serializer which Findbugs can't detect")
private transient Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;

public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.google.common.base.Charsets;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand All @@ -42,18 +44,16 @@
public class AuthenticationToken implements Authentication, EncodedAuthenticationParameterSupport {

private static final long serialVersionUID = 1L;
private transient Supplier<String> tokenSupplier = null;
private Supplier<String> tokenSupplier = null;

public AuthenticationToken() {
}

public AuthenticationToken(String token) {
this(() -> token);
this(new SerializableTokenSupplier(token));
}

public AuthenticationToken(Supplier<String> tokenSupplier) {
this.tokenSupplier = tokenSupplier;
}
public AuthenticationToken(Supplier<String> tokenSupplier) { this.tokenSupplier = tokenSupplier; }

@Override
public void close() throws IOException {
Expand All @@ -75,24 +75,18 @@ public void configure(String encodedAuthParamString) {
// Interpret the whole param string as the token. If the string contains the notation `token:xxxxx` then strip
// the prefix
if (encodedAuthParamString.startsWith("token:")) {
this.tokenSupplier = () -> encodedAuthParamString.substring("token:".length());
this.tokenSupplier = new SerializableTokenSupplier(encodedAuthParamString.substring("token:".length()));
} else if (encodedAuthParamString.startsWith("file:")) {
// Read token from a file
URI filePath = URI.create(encodedAuthParamString);
this.tokenSupplier = () -> {
try {
return new String(Files.readAllBytes(Paths.get(filePath)), Charsets.UTF_8).trim();
} catch (IOException e) {
throw new RuntimeException("Failed to read token from file", e);
}
};
this.tokenSupplier = new SerializableURITokenSupplier(filePath);
} else {
try {
// Read token from json string
JsonObject authParams = new Gson().fromJson(encodedAuthParamString, JsonObject.class);
this.tokenSupplier = () -> authParams.get("token").getAsString();
this.tokenSupplier = new SerializableTokenSupplier(authParams.get("token").getAsString());
} catch (JsonSyntaxException e) {
this.tokenSupplier = () -> encodedAuthParamString;
this.tokenSupplier = new SerializableTokenSupplier(encodedAuthParamString);
}
}
}
Expand All @@ -107,4 +101,38 @@ public void start() throws PulsarClientException {
// noop
}

private static class SerializableURITokenSupplier implements Supplier<String>, Serializable {

private static final long serialVersionUID = 3160666668166028760L;
private final URI uri;

public SerializableURITokenSupplier(final URI uri) {
super();
this.uri = uri;
}

@Override
public String get() {
try {
return new String(Files.readAllBytes(Paths.get(uri)), Charsets.UTF_8).trim();
} catch (IOException e) {
throw new RuntimeException("Failed to read token from file", e);
}
}
}

private static class SerializableTokenSupplier implements Supplier<String>, Serializable {

private static final long serialVersionUID = 5095234161799506913L;
private final String token;

public SerializableTokenSupplier(final String token) {
super();
this.token = token;
}

@Override
public String get() { return token; }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@

import com.google.common.base.Charsets;

import java.io.File;
import java.io.*;
import java.util.Collections;
import java.util.function.Supplier;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.SerializationUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand Down Expand Up @@ -171,4 +173,43 @@ public void testAuthTokenConfigFromJson() throws Exception{
assertEquals(authData.getCommandData(), "my-test-token-string");
authToken.close();
}

@Test
public void testSerializableAuthentication() throws Exception {
SerializableSupplier tokenSupplier = new SerializableSupplier("cert");
AuthenticationToken token = new AuthenticationToken(tokenSupplier);

// serialize
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(outStream);
out.writeObject(token);
out.flush();
byte[] outputBytes = outStream.toByteArray();
out.close();

// deserialize
ByteArrayInputStream bis = new ByteArrayInputStream(outputBytes);
ObjectInput in = new ObjectInputStream(bis);
AuthenticationToken ts = (AuthenticationToken) in.readObject();
in.close();

// read the deserialized object
assertEquals(tokenSupplier.token, ts.getAuthData().getCommandData());
}

public static class SerializableSupplier implements Supplier<String>, Serializable {

private static final long serialVersionUID = 6259616338933150683L;
private String token;

public SerializableSupplier(final String token) {
super();
this.token = token;
}

@Override
public String get() {
return token;
}
}
}
10 changes: 9 additions & 1 deletion site2/docs/adaptors-spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,12 @@ Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` metho
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
```

For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java). In this example, the number of messages that contain the string "Pulsar" in received messages is counted.
For a complete example, click [here](https://github.com/apache/pulsar-adapters/blob/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java). In this example, the number of messages that contain the string "Pulsar" in received messages is counted.

Note that if needed, other Pulsar authentication classes can be used. For example, in order to use a token during authentication the following parameters for the `SparkStreamingPulsarReceiver` constructor can be set:
```java
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationToken("token:<secret-JWT-token>"));
```

0 comments on commit dba0479

Please sign in to comment.