Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using Security feature in AoP - Part 2(Implementation based on PR#897) #898

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,10 @@ To use proxy, complete the following steps. If you do not know some detailed ste
$RABBITMQ_PERF_TOOL_HOME/bin/runjava com.rabbitmq.perf.PerfTest -e ex-perf -u qu-perf -r 1000 -h amqp://127.0.0.1:5682 -p
```

# Advanced features
You can configure and manage AoP based on your requirements. Check the following guides for more details.
- [Secure AoP](docs/security.md)

## Project Maintainer
- [@codelipenghui](https://github.com/codelipenghui)
- [@gaoran10](https://github.com/gaoran10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class AmqpServiceConfiguration extends ServiceConfiguration {
private static final String CATEGORY_AMQP = "AMQP on Pulsar";
@Category
private static final String CATEGORY_AMQP_PROXY = "AMQP Proxy";
@Category
private static final String CATEGORY_AMQP_SSL = "AMQP Proxy SSL";

//
// --- AMQP on Pulsar Broker configuration ---
Expand Down Expand Up @@ -137,4 +139,87 @@ public class AmqpServiceConfiguration extends ServiceConfiguration {
)
private int amqpPulsarConsumerQueueSize = 10000;

@FieldContext(
category = CATEGORY_AMQP,
doc = "is the amqp authentication open"
)
private boolean amqpAuthenticationEnabled = false;

@FieldContext(
category = CATEGORY_AMQP,
doc = "is the amqp authorization open"
)
private boolean amqpAuthorizationEnabled = false;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "is the amqp proxy ssl open"
)
private boolean amqpTlsEnabled = false;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslProtocol = "TLSv1.2";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslKeystoreType = "PKCS12";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslKeystoreLocation;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslKeystorePassword;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslTruststoreType = "JKS";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslTruststoreLocation;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslTruststorePassword;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslKeymanagerAlgorithm = "SunX509";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslTrustmanagerAlgorithm = "SunX509";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp TLS peer or not"
)
private boolean amqpSslClientAuth = false;

@FieldContext(
category = CATEGORY_AMQP,
doc = "Mechanisms supported"
)
private String amqpAllowedMechanisms = "PLAIN token";
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class ProxyConfiguration {
private static final String CATEGORY_AMQP_PROXY = "AMQP Proxy";
@Category
private static final String CATEGORY_BROKER_DISCOVERY = "Broker Discovery";
@Category
private static final String CATEGORY_AMQP_SSL = "AMQP Proxy SSL";

@FieldContext(
category = CATEGORY_AMQP,
Expand Down Expand Up @@ -85,4 +87,76 @@ public class ProxyConfiguration {
)
private int amqpExplicitFlushAfterFlushes = 1000;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "is the amqp proxy ssl open"
)
private boolean amqpTlsEnabled = false;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslProtocol = "TLSv1.2";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslKeystoreType = "PKCS12";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslKeystoreLocation;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslKeystorePassword;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslTruststoreType = "JKS";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslTruststoreLocation;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslTruststorePassword;

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslKeymanagerAlgorithm = "SunX509";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp ssl configuration"
)
private String amqpSslTrustmanagerAlgorithm = "SunX509";

@FieldContext(
category = CATEGORY_AMQP_SSL,
doc = "amqp TLS peer or not"
)
private boolean amqpSslClientAuth = false;


@FieldContext(
category = CATEGORY_AMQP,
doc = "Mechanisms supported"
)
private String amqpAllowedMechanisms = "PLAIN token";
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void receiveProtocolHeader(ProtocolInitiation protocolInitiation) {
}
}),
// TODO temporary modification
"PLAIN token".getBytes(US_ASCII),
this.proxyConfig.getAmqpAllowedMechanisms().getBytes(US_ASCII),
"en_US".getBytes(US_ASCII));
writeFrame(responseBody.generateFrame(0));
} catch (QpidException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,33 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslHandler;
import io.streamnative.pulsar.handlers.amqp.AmqpEncoder;
import io.streamnative.pulsar.handlers.amqp.utils.SSLUtils;

/**
* Proxy service channel initializer.
*/
public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel> {

public static final String TLS_HANDLER = "tls";

private ProxyService proxyService;

private final boolean isTlsEnabled;

public ServiceChannelInitializer(ProxyService proxyService) {
this.proxyService = proxyService;
this.isTlsEnabled = proxyService.getProxyConfig().isAmqpTlsEnabled();
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(
proxyService.getProxyConfig().getAmqpExplicitFlushAfterFlushes(), true));
if (isTlsEnabled) {
ch.pipeline().addLast(TLS_HANDLER, new SslHandler(SSLUtils.createSslEngineSun(proxyService.getProxyConfig())));
}
ch.pipeline().addLast("frameEncoder", new AmqpEncoder());
ch.pipeline().addLast("handler", new ProxyConnection(proxyService));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.streamnative.pulsar.handlers.amqp.utils;

import io.streamnative.pulsar.handlers.amqp.proxy.ProxyConfiguration;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;

/**
* Helper class for setting up SSL for KafkaChannelInitializer.
* RabbitMQ and Pulsar use different way to store SSL keys, this utils only work for RabbitMQ.
*/
public class SSLUtils {

/**
* Create SSL engine used in ServiceChannelInitializer.
*/
public static SSLEngine createSslEngineSun(ProxyConfiguration proxyConfig) throws Exception {
KeyManagerFactory kmf = createKeyManagerFactory(proxyConfig);
TrustManagerFactory tmf = createTrustManagerFactory(proxyConfig);

SSLContext sslContext = SSLContext.getInstance(proxyConfig.getAmqpSslProtocol());
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
SSLEngine sslEngine = sslContext.createSSLEngine();
// It means engine working for AMQP server not AMQP client
sslEngine.setUseClientMode(false);

setClientAuth(sslEngine, proxyConfig.isAmqpSslClientAuth());
return sslEngine;
}

/**
* Create a KeyManager to manage the KeyStore containing the server p12 file
* */
public static KeyManagerFactory createKeyManagerFactory(ProxyConfiguration proxyConfig) throws Exception {
char[] keyPassphrase = proxyConfig.getAmqpSslKeystorePassword().toCharArray();
KeyStore ks = KeyStore.getInstance(proxyConfig.getAmqpSslKeystoreType());
ks.load(new FileInputStream(proxyConfig.getAmqpSslKeystoreLocation()), keyPassphrase);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(proxyConfig.getAmqpSslKeymanagerAlgorithm());
kmf.init(ks, keyPassphrase);
return kmf;
}

/**
* Create a TrustManager to manage the TrustStore trusting the CA certificates
* */
public static TrustManagerFactory createTrustManagerFactory(ProxyConfiguration proxyConfig) throws Exception {
char[] trustPassphrase = proxyConfig.getAmqpSslTruststorePassword().toCharArray();
KeyStore tks = KeyStore.getInstance(proxyConfig.getAmqpSslTruststoreType());
tks.load(new FileInputStream(proxyConfig.getAmqpSslTruststoreLocation()), trustPassphrase);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(proxyConfig.getAmqpSslTrustmanagerAlgorithm());
tmf.init(tks);
return tmf;
}

/**
* If true, client wound be asked to auth to make TLS HAND_SHAKE.
* If false, client don't need to take relevant files to make TLS HAND_SHAKE
* */
public static void setClientAuth(SSLEngine sslEngine, boolean isPeer) {
sslEngine.setNeedClientAuth(isPeer);
}
}
Loading