Skip to content
Browse files

Merge remote-tracking branch 'anfeng/master-auth'

  • Loading branch information...
2 parents 7059efc + 4b66980 commit d6374fe57e75d35f1b5afa476e57e46e1980aa96 @nathanmarz nathanmarz committed Mar 11, 2013
Showing with 1,272 additions and 30 deletions.
  1. +1 −0 conf/defaults.yaml
  2. +21 −0 conf/jaas_digest.conf
  3. +22 −0 logback/cluster.xml
  4. +10 −0 src/jvm/backtype/storm/Config.java
  5. +81 −0 src/jvm/backtype/storm/security/auth/AuthUtils.java
  6. +30 −0 src/jvm/backtype/storm/security/auth/IAuthorizer.java
  7. +38 −0 src/jvm/backtype/storm/security/auth/ITransportPlugin.java
  8. +91 −0 src/jvm/backtype/storm/security/auth/ReqContext.java
  9. +143 −0 src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
  10. +107 −0 src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
  11. +62 −0 src/jvm/backtype/storm/security/auth/ThriftClient.java
  12. +62 −0 src/jvm/backtype/storm/security/auth/ThriftServer.java
  13. +40 −0 src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
  14. +40 −0 src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
  15. +93 −0 src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
  16. +52 −0 src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
  17. +97 −0 src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
  18. +21 −30 src/jvm/backtype/storm/utils/NimbusClient.java
  19. +218 −0 test/clj/backtype/storm/security/auth/auth_test.clj
  20. +12 −0 test/clj/backtype/storm/security/auth/jaas_digest.conf
  21. +13 −0 test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf
  22. +5 −0 test/clj/backtype/storm/security/auth/jaas_digest_missing_client.conf
  23. +13 −0 test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf
View
1 conf/defaults.yaml
@@ -17,6 +17,7 @@ storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
+storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
### nimbus.* configs are for the master
nimbus.host: "localhost"
View
21 conf/jaas_digest.conf
@@ -0,0 +1,21 @@
+/* This is example of JAAS Login configuration for digest authentication
+*/
+
+/*
+StormServer section should contain a list of authorized users and their passwords.
+*/
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+ user_john="johnsecret";
+};
+
+/*
+StormClient section contains one user name and his/her password.
+*/
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="bob"
+ password="bobsecret";
+};
View
22 logback/cluster.xml
@@ -16,9 +16,31 @@
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
</encoder>
+ </appender>
+
+ <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${storm.home}/logs/access.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>${storm.home}/logs/${logfile.name}.%i</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>9</maxIndex>
+ </rollingPolicy>
+
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+
+ <encoder>
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
+ </encoder>
</appender>
<root level="INFO">
<appender-ref ref="A1"/>
</root>
+
+ <logger name="backtype.storm.security.auth.authorizer" additivity="false">
+ <level value="INFO" />
+ <appender-ref ref="ACCESS" />
+ </logger>
</configuration>
View
10 src/jvm/backtype/storm/Config.java
@@ -65,6 +65,11 @@
public static String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
/**
+ * The transport plug-in for Thrift client/server communication
+ */
+ public static String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
+
+ /**
* The serializer class for ListDelegate (tuple payload).
* The default serializer will be ListDelegateSerializer
*/
@@ -212,6 +217,11 @@
public static String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";
/**
+ * Class name for authorization plugin for Nimbus
+ */
+ public static String NIMBUS_AUTHORIZER = "nimbus.authorizer";
+
+ /**
* Storm UI binds to this port.
*/
public static String UI_PORT = "ui.port";
View
81 src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -0,0 +1,81 @@
+package backtype.storm.security.auth;
+
+import backtype.storm.Config;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.security.NoSuchAlgorithmException;
+import java.security.URIParameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+public class AuthUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
+ public static final String LOGIN_CONTEXT_SERVER = "StormServer";
+ public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
+ public static final String SERVICE = "storm_thrift_server";
+
+ /**
+ * Construct a JAAS configuration object per storm configuration file
+ * @param storm_conf Storm configuration
+ * @return JAAS configuration object
+ */
+ public static Configuration GetConfiguration(Map storm_conf) {
+ Configuration login_conf = null;
+
+ //find login file configuration from Storm configuration
+ String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
+ if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
+ try {
+ URI config_uri = new File(loginConfigurationFile).toURI();
+ login_conf = Configuration.getInstance("JavaLoginConfig", new URIParameter(config_uri));
+ } catch (NoSuchAlgorithmException ex1) {
+ if (ex1.getCause() instanceof FileNotFoundException)
+ throw new RuntimeException("configuration file "+loginConfigurationFile+" could not be found");
+ else throw new RuntimeException(ex1);
+ } catch (Exception ex2) {
+ throw new RuntimeException(ex2);
+ }
+ }
+
+ return login_conf;
+ }
+
+ /**
+ * Construct a transport plugin per storm configuration
+ * @param conf storm configuration
+ * @return
+ */
+ public static ITransportPlugin GetTransportPlugin(Map storm_conf, Configuration login_conf) {
+ ITransportPlugin transportPlugin = null;
+ try {
+ String transport_plugin_klassName = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
+ Class klass = Class.forName(transport_plugin_klassName);
+ transportPlugin = (ITransportPlugin)klass.newInstance();
+ transportPlugin.prepare(storm_conf, login_conf);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ return transportPlugin;
+ }
+
+ public static String get(Configuration configuration, String section, String key) throws IOException {
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
+ throw new IOException(errorMessage);
+ }
+
+ for(AppConfigurationEntry entry: configurationEntries) {
+ Object val = entry.getOptions().get(key);
+ if (val != null)
+ return (String)val;
+ }
+ return null;
+ }
+}
+
View
30 src/jvm/backtype/storm/security/auth/IAuthorizer.java
@@ -0,0 +1,30 @@
+package backtype.storm.security.auth;
+
+import java.util.Map;
+
+/**
+ * Nimbus could be configured with an authorization plugin.
+ * If not specified, all requests are authorized.
+ *
+ * You could specify the authorization plugin via storm parameter. For example:
+ * storm -c nimbus.authorization.class=backtype.storm.security.auth.NoopAuthorizer ...
+ *
+ * You could also specify it via storm.yaml:
+ * nimbus.authorization.class: backtype.storm.security.auth.NoopAuthorizer
+ */
+public interface IAuthorizer {
+ /**
+ * Invoked once immediately after construction
+ * @param conf Storm configuration
+ */
+ void prepare(Map storm_conf);
+
+ /**
+ * permit() method is invoked for each incoming Thrift request.
+ * @param context request context includes info about
+ * @param operation operation name
+ * @param topology_storm configuration of targeted topology
+ * @return true if the request is authorized, false if reject
+ */
+ public boolean permit(ReqContext context, String operation, Map topology_conf);
+}
View
38 src/jvm/backtype/storm/security/auth/ITransportPlugin.java
@@ -0,0 +1,38 @@
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.security.auth.login.Configuration;
+
+import org.apache.thrift7.TProcessor;
+import org.apache.thrift7.server.TServer;
+import org.apache.thrift7.transport.TTransport;
+import org.apache.thrift7.transport.TTransportException;
+
+/**
+ * Interface for Thrift Transport plugin
+ */
+public interface ITransportPlugin {
+ /**
+ * Invoked once immediately after construction
+ * @param storm_conf Storm configuration
+ * @param login_conf login configuration
+ */
+ void prepare(Map storm_conf, Configuration login_conf);
+
+ /**
+ * Create a server associated with a given port and service handler
+ * @param port listening port
+ * @param processor service handler
+ * @return server to be binded
+ */
+ public TServer getServer(int port, TProcessor processor) throws IOException, TTransportException;
+
+ /**
+ * Connect to the specified server via framed transport
+ * @param transport The underlying Thrift transport.
+ * @param serverHost server host
+ */
+ public TTransport connect(TTransport transport, String serverHost) throws IOException, TTransportException;
+}
View
91 src/jvm/backtype/storm/security/auth/ReqContext.java
@@ -0,0 +1,91 @@
+package backtype.storm.security.auth;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.net.InetAddress;
+import com.google.common.annotations.VisibleForTesting;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.Principal;
+import javax.security.auth.Subject;
+
+/**
+ * context request context includes info about
+ * (1) remote address,
+ * (2) remote subject and primary principal
+ * (3) request ID
+ */
+public class ReqContext {
+ private static final AtomicInteger uniqueId = new AtomicInteger(0);
+ private Subject _subject;
+ private InetAddress _remoteAddr;
+ private Integer _reqID;
+ private Map _storm_conf;
+
+ /**
+ * Get a request context associated with current thread
+ * @return
+ */
+ public static ReqContext context() {
+ return ctxt.get();
+ }
+
+ //each thread will have its own request context
+ private static final ThreadLocal < ReqContext > ctxt =
+ new ThreadLocal < ReqContext > () {
+ @Override
+ protected ReqContext initialValue() {
+ return new ReqContext(AccessController.getContext());
+ }
+ };
+
+ //private constructor
+ @VisibleForTesting
+ ReqContext(AccessControlContext acl_ctxt) {
+ _subject = Subject.getSubject(acl_ctxt);
+ _reqID = uniqueId.incrementAndGet();
+ }
+
+ /**
+ * client address
+ */
+ public void setRemoteAddress(InetAddress addr) {
+ _remoteAddr = addr;
+ }
+
+ public InetAddress remoteAddress() {
+ return _remoteAddr;
+ }
+
+ /**
+ * Set remote subject explicitly
+ */
+ public void setSubject(Subject subject) {
+ _subject = subject;
+ }
+
+ /**
+ * Retrieve client subject associated with this request context
+ */
+ public Subject subject() {
+ return _subject;
+ }
+
+ /**
+ * The primary principal associated current subject
+ */
+ public Principal principal() {
+ if (_subject == null) return null;
+ Set<Principal> princs = _subject.getPrincipals();
+ if (princs.size()==0) return null;
+ return (Principal) (princs.toArray()[0]);
+ }
+
+ /**
+ * request ID of this request
+ */
+ public Integer requestID() {
+ return _reqID;
+ }
+}
View
143 src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
@@ -0,0 +1,143 @@
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.security.Principal;
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.SaslServer;
+import org.apache.thrift7.TException;
+import org.apache.thrift7.TProcessor;
+import org.apache.thrift7.protocol.TBinaryProtocol;
+import org.apache.thrift7.protocol.TProtocol;
+import org.apache.thrift7.server.TServer;
+import org.apache.thrift7.server.TThreadPoolServer;
+import org.apache.thrift7.transport.TSaslServerTransport;
+import org.apache.thrift7.transport.TServerSocket;
+import org.apache.thrift7.transport.TSocket;
+import org.apache.thrift7.transport.TTransport;
+import org.apache.thrift7.transport.TTransportException;
+import org.apache.thrift7.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for SASL authentication plugin.
+ */
+public abstract class SaslTransportPlugin implements ITransportPlugin {
+ protected Configuration login_conf;
+ private static final Logger LOG = LoggerFactory.getLogger(SaslTransportPlugin.class);
+
+ /**
+ * Invoked once immediately after construction
+ * @param conf Storm configuration
+ * @param login_conf login configuration
+ */
+ public void prepare(Map storm_conf, Configuration login_conf) {
+ this.login_conf = login_conf;
+ }
+
+ public TServer getServer(int port, TProcessor processor) throws IOException, TTransportException {
+ TTransportFactory serverTransportFactory = getServerTransportFactory();
+
+ //define THsHaServer args
+ //original: THsHaServer + TNonblockingServerSocket
+ //option: TThreadPoolServer + TServerSocket
+ TServerSocket serverTransport = new TServerSocket(port);
+ TThreadPoolServer.Args server_args = new TThreadPoolServer.Args(serverTransport).
+ processor(new TUGIWrapProcessor(processor)).
+ minWorkerThreads(64).
+ maxWorkerThreads(64).
+ protocolFactory(new TBinaryProtocol.Factory());
+ if (serverTransportFactory != null)
+ server_args.transportFactory(serverTransportFactory);
+
+ //construct THsHaServer
+ return new TThreadPoolServer(server_args);
+ }
+
+ /**
+ * All subclass must implement this method
+ * @return
+ * @throws IOException
+ */
+ protected abstract TTransportFactory getServerTransportFactory() throws IOException;
+
+
+ /**
+ * Processor that pulls the SaslServer object out of the transport, and
+ * assumes the remote user's UGI before calling through to the original
+ * processor.
+ *
+ * This is used on the server side to set the UGI for each specific call.
+ */
+ private class TUGIWrapProcessor implements TProcessor {
+ final TProcessor wrapped;
+
+ TUGIWrapProcessor(TProcessor wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ //populating request context
+ ReqContext req_context = ReqContext.context();
+
+ TTransport trans = inProt.getTransport();
+ //Sasl transport
+ TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+
+ //remote address
+ TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
+ Socket socket = tsocket.getSocket();
+ req_context.setRemoteAddress(socket.getInetAddress());
+
+ //remote subject
+ SaslServer saslServer = saslTrans.getSaslServer();
+ String authId = saslServer.getAuthorizationID();
+ Subject remoteUser = new Subject();
+ remoteUser.getPrincipals().add(new User(authId));
+ req_context.setSubject(remoteUser);
+
+ //invoke service handler
+ return wrapped.process(inProt, outProt);
+ }
+ }
+
+ public static class User implements Principal {
+ private final String name;
+
+ public User(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get the full name of the user.
+ */
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ } else {
+ return (name.equals(((User) o).name));
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+}
View
107 src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
@@ -0,0 +1,107 @@
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import javax.security.auth.login.Configuration;
+import org.apache.thrift7.TException;
+import org.apache.thrift7.TProcessor;
+import org.apache.thrift7.protocol.TBinaryProtocol;
+import org.apache.thrift7.protocol.TProtocol;
+import org.apache.thrift7.server.THsHaServer;
+import org.apache.thrift7.server.TServer;
+import org.apache.thrift7.transport.TFramedTransport;
+import org.apache.thrift7.transport.TMemoryInputTransport;
+import org.apache.thrift7.transport.TNonblockingServerSocket;
+import org.apache.thrift7.transport.TSocket;
+import org.apache.thrift7.transport.TTransport;
+import org.apache.thrift7.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple transport for Thrift plugin.
+ *
+ * This plugin is designed to be backward compatible with existing Storm code.
+ */
+public class SimpleTransportPlugin implements ITransportPlugin {
+ protected Configuration login_conf;
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleTransportPlugin.class);
+
+ /**
+ * Invoked once immediately after construction
+ * @param conf Storm configuration
+ * @param login_conf login configuration
+ */
+ public void prepare(Map storm_conf, Configuration login_conf) {
+ this.login_conf = login_conf;
+ }
+
+ /**
+ * We will let Thrift to apply default transport factory
+ */
+ public TServer getServer(int port, TProcessor processor) throws IOException, TTransportException {
+ TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
+ THsHaServer.Args server_args = new THsHaServer.Args(serverTransport).
+ processor(new SimpleWrapProcessor(processor)).
+ workerThreads(64).
+ protocolFactory(new TBinaryProtocol.Factory());
+
+ //construct THsHaServer
+ return new THsHaServer(server_args);
+ }
+
+ /**
+ * Connect to the specified server via framed transport
+ * @param transport The underlying Thrift transport.
+ */
+ public TTransport connect(TTransport transport, String serverHost) throws TTransportException {
+ //create a framed transport
+ TTransport conn = new TFramedTransport(transport);
+
+ //connect
+ conn.open();
+ LOG.debug("Simple client transport has been established");
+
+ return conn;
+ }
+
+ /**
+ * Processor that populate simple transport info into ReqContext, and then invoke a service handler
+ */
+ private class SimpleWrapProcessor implements TProcessor {
+ final TProcessor wrapped;
+
+ SimpleWrapProcessor(TProcessor wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ //populating request context
+ ReqContext req_context = ReqContext.context();
+
+ TTransport trans = inProt.getTransport();
+ if (trans instanceof TMemoryInputTransport) {
+ try {
+ req_context.setRemoteAddress(InetAddress.getLocalHost());
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ } else if (trans instanceof TSocket) {
+ TSocket tsocket = (TSocket)trans;
+ //remote address
+ Socket socket = tsocket.getSocket();
+ req_context.setRemoteAddress(socket.getInetAddress());
+ }
+
+ //anonymous user
+ req_context.setSubject(null);
+
+ //invoke service handler
+ return wrapped.process(inProt, outProt);
+ }
+ }
+}
View
62 src/jvm/backtype/storm/security/auth/ThriftClient.java
@@ -0,0 +1,62 @@
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.security.auth.login.Configuration;
+import org.apache.thrift7.protocol.TBinaryProtocol;
+import org.apache.thrift7.protocol.TProtocol;
+import org.apache.thrift7.transport.TSocket;
+import org.apache.thrift7.transport.TTransport;
+import org.apache.thrift7.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import backtype.storm.utils.Utils;
+
+public class ThriftClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
+ private TTransport _transport;
+ protected TProtocol _protocol;
+
+ public ThriftClient(Map storm_conf, String host, int port) throws TTransportException {
+ this(storm_conf, host, port, null);
+ }
+
+ public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
+ try {
+ //locate login configuration
+ Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);
+
+ //construct a transport plugin
+ ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);
+
+ //create a socket with server
+ if(host==null) {
+ throw new IllegalArgumentException("host is not set");
+ }
+ if(port<=0) {
+ throw new IllegalArgumentException("invalid port: "+port);
+ }
+ TSocket socket = new TSocket(host, port);
+ if(timeout!=null) {
+ socket.setTimeout(timeout);
+ }
+ final TTransport underlyingTransport = socket;
+
+ //establish client-server transport via plugin
+ _transport = transportPlugin.connect(underlyingTransport, host);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ _protocol = null;
+ if (_transport != null)
+ _protocol = new TBinaryProtocol(_transport);
+ }
+
+ public TTransport transport() {
+ return _transport;
+ }
+
+ public void close() {
+ _transport.close();
+ }
+}
View
62 src/jvm/backtype/storm/security/auth/ThriftServer.java
@@ -0,0 +1,62 @@
+package backtype.storm.security.auth;
+
+import java.util.Map;
+import javax.security.auth.login.Configuration;
+import org.apache.thrift7.TProcessor;
+import org.apache.thrift7.server.TServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import backtype.storm.utils.Utils;
+
+public class ThriftServer {
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
+ private Map _storm_conf; //storm configuration
+ protected TProcessor _processor = null;
+ private int _port = 0;
+ private TServer _server = null;
+ private Configuration _login_conf;
+
+ public ThriftServer(Map storm_conf, TProcessor processor, int port) {
+ try {
+ _storm_conf = storm_conf;
+ _processor = processor;
+ _port = port;
+
+ //retrieve authentication configuration
+ _login_conf = AuthUtils.GetConfiguration(_storm_conf);
+ } catch (Exception x) {
+ LOG.error(x.getMessage(), x);
+ }
+ }
+
+ public void stop() {
+ if (_server != null)
+ _server.stop();
+ }
+
+ /**
+ * Is ThriftServer listening to requests?
+ * @return
+ */
+ public boolean isServing() {
+ if (_server == null) return false;
+ return _server.isServing();
+ }
+
+ public void serve() {
+ try {
+ //locate our thrift transport plugin
+ ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_storm_conf, _login_conf);
+
+ //server
+ _server = transportPlugin.getServer(_port, _processor);
+
+ //start accepting requests
+ _server.serve();
+ } catch (Exception ex) {
+ LOG.error("ThriftServer is being stopped due to: " + ex, ex);
+ if (_server != null) _server.stop();
+ Runtime.getRuntime().halt(1); //shutdown server process since we could not handle Thrift requests any more
+ }
+ }
+}
View
40 src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
@@ -0,0 +1,40 @@
+package backtype.storm.security.auth.authorizer;
+
+import java.util.Map;
+
+import backtype.storm.Config;
+import backtype.storm.security.auth.IAuthorizer;
+import backtype.storm.security.auth.ReqContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An authorization implementation that denies everything, for testing purposes
+ */
+public class DenyAuthorizer implements IAuthorizer {
+ private static final Logger LOG = LoggerFactory.getLogger(DenyAuthorizer.class);
+
+ /**
+ * Invoked once immediately after construction
+ * @param conf Storm configuration
+ */
+ public void prepare(Map conf) {
+ }
+
+ /**
+ * permit() method is invoked for each incoming Thrift request
+ * @param contrext request context
+ * @param operation operation name
+ * @param topology_storm configuration of targeted topology
+ * @return true if the request is authorized, false if reject
+ */
+ public boolean permit(ReqContext context, String operation, Map topology_conf) {
+ LOG.info("[req "+ context.requestID()+ "] Access "
+ + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
+ + " principal:"+ (context.principal() == null? "null" : context.principal())
+ +" op:"+operation
+ + " topoology:"+topology_conf.get(Config.TOPOLOGY_NAME));
+ return false;
+ }
+}
View
40 src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
@@ -0,0 +1,40 @@
+package backtype.storm.security.auth.authorizer;
+
+import java.util.Map;
+
+import backtype.storm.Config;
+import backtype.storm.security.auth.IAuthorizer;
+import backtype.storm.security.auth.ReqContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A no-op authorization implementation that illustrate info available for authorization decisions.
+ */
+public class NoopAuthorizer implements IAuthorizer {
+ private static final Logger LOG = LoggerFactory.getLogger(NoopAuthorizer.class);
+
+ /**
+ * Invoked once immediately after construction
+ * @param conf Storm configuration
+ */
+ public void prepare(Map conf) {
+ }
+
+ /**
+ * permit() method is invoked for each incoming Thrift request
+ * @param context request context includes info about
+ * @param operation operation name
+ * @param topology_storm configuration of targeted topology
+ * @return true if the request is authorized, false if reject
+ */
+ public boolean permit(ReqContext context, String operation, Map topology_conf) {
+ LOG.info("[req "+ context.requestID()+ "] Access "
+ + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
+ + " principal:"+(context.principal() == null? "null" : context.principal())
+ +" op:"+ operation
+ + " topoology:"+ topology_conf.get(Config.TOPOLOGY_NAME));
+ return true;
+ }
+}
View
93 src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
@@ -0,0 +1,93 @@
+package backtype.storm.security.auth.digest;
+
+import java.io.IOException;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.security.auth.AuthUtils;
+
+/**
+ * client side callback handler.
+ */
+public class ClientCallbackHandler implements CallbackHandler {
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final Logger LOG = LoggerFactory.getLogger(ClientCallbackHandler.class);
+ private String _username = null;
+ private String _password = null;
+
+ /**
+ * Constructor based on a JAAS configuration
+ *
+ * For digest, you should have a pair of user name and password defined.
+ *
+ * @param configuration
+ * @throws IOException
+ */
+ public ClientCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration == null) return;
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
+ + "' entry in this configuration: Client cannot start.";
+ throw new IOException(errorMessage);
+ }
+
+ _password = "";
+ for(AppConfigurationEntry entry: configurationEntries) {
+ if (entry.getOptions().get(USERNAME) != null) {
+ _username = (String)entry.getOptions().get(USERNAME);
+ }
+ if (entry.getOptions().get(PASSWORD) != null) {
+ _password = (String)entry.getOptions().get(PASSWORD);
+ }
+ }
+ }
+
+ /**
+ * This method is invoked by SASL for authentication challenges
+ * @param callbacks a collection of challenge callbacks
+ */
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (Callback c : callbacks) {
+ if (c instanceof NameCallback) {
+ LOG.debug("name callback");
+ NameCallback nc = (NameCallback) c;
+ nc.setName(_username);
+ } else if (c instanceof PasswordCallback) {
+ LOG.debug("password callback");
+ PasswordCallback pc = (PasswordCallback)c;
+ if (_password != null) {
+ pc.setPassword(_password.toCharArray());
+ }
+ } else if (c instanceof AuthorizeCallback) {
+ LOG.debug("authorization callback");
+ AuthorizeCallback ac = (AuthorizeCallback) c;
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ ac.setAuthorizedID(authzid);
+ }
+ } else if (c instanceof RealmCallback) {
+ RealmCallback rc = (RealmCallback) c;
+ ((RealmCallback) c).setText(rc.getDefaultText());
+ } else {
+ throw new UnsupportedCallbackException(c);
+ }
+ }
+ }
+}
View
52 src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -0,0 +1,52 @@
+package backtype.storm.security.auth.digest;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.login.Configuration;
+
+import org.apache.thrift7.transport.TSaslClientTransport;
+import org.apache.thrift7.transport.TSaslServerTransport;
+import org.apache.thrift7.transport.TTransport;
+import org.apache.thrift7.transport.TTransportException;
+import org.apache.thrift7.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.SaslTransportPlugin;
+
+public class DigestSaslTransportPlugin extends SaslTransportPlugin {
+ public static final String DIGEST = "DIGEST-MD5";
+ private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
+
+ protected TTransportFactory getServerTransportFactory() throws IOException {
+ //create an authentication callback handler
+ CallbackHandler serer_callback_handler = new ServerCallbackHandler(login_conf);
+
+ //create a transport factory that will invoke our auth callback for digest
+ TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
+ factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null, serer_callback_handler);
+
+ LOG.info("SASL DIGEST-MD5 transport factory will be used");
+ return factory;
+ }
+
+ public TTransport connect(TTransport transport, String serverHost) throws TTransportException, IOException {
+ ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
+ TSaslClientTransport wrapper_transport = new TSaslClientTransport(DIGEST,
+ null,
+ AuthUtils.SERVICE,
+ serverHost,
+ null,
+ client_callback_handler,
+ transport);
+
+ wrapper_transport.open();
+ LOG.debug("SASL DIGEST-MD5 client transport has been established");
+
+ return wrapper_transport;
+ }
+
+}
View
97 src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
@@ -0,0 +1,97 @@
+package backtype.storm.security.auth.digest;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import backtype.storm.security.auth.AuthUtils;
+
+/**
+ * SASL server side collback handler
+ */
+public class ServerCallbackHandler implements CallbackHandler {
+ private static final String USER_PREFIX = "user_";
+ private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
+ private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
+
+ private String userName;
+ private final Map<String,String> credentials = new HashMap<String,String>();
+
+ public ServerCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration==null) return;
+
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+ throw new IOException(errorMessage);
+ }
+ credentials.clear();
+ for(AppConfigurationEntry entry: configurationEntries) {
+ Map<String,?> options = entry.getOptions();
+ // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
+ // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+ for(Map.Entry<String, ?> pair : options.entrySet()) {
+ String key = pair.getKey();
+ if (key.startsWith(USER_PREFIX)) {
+ String userName = key.substring(USER_PREFIX.length());
+ credentials.put(userName,(String)pair.getValue());
+ }
+ }
+ }
+ }
+
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ handleNameCallback((NameCallback) callback);
+ } else if (callback instanceof PasswordCallback) {
+ handlePasswordCallback((PasswordCallback) callback);
+ } else if (callback instanceof RealmCallback) {
+ handleRealmCallback((RealmCallback) callback);
+ } else if (callback instanceof AuthorizeCallback) {
+ handleAuthorizeCallback((AuthorizeCallback) callback);
+ }
+ }
+ }
+
+ private void handleNameCallback(NameCallback nc) {
+ LOG.debug("handleNameCallback");
+ userName = nc.getDefaultName();
+ nc.setName(nc.getDefaultName());
+ }
+
+ private void handlePasswordCallback(PasswordCallback pc) {
+ LOG.debug("handlePasswordCallback");
+ if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
+ // superuser: use Java system property for password, if available.
+ pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
+ } else if (credentials.containsKey(userName) ) {
+ pc.setPassword(credentials.get(userName).toCharArray());
+ } else {
+ LOG.warn("No password found for user: " + userName);
+ }
+ }
+
+ private void handleRealmCallback(RealmCallback rc) {
+ LOG.debug("handleRealmCallback: "+ rc.getDefaultText());
+ rc.setText(rc.getDefaultText());
+ }
+
+ private void handleAuthorizeCallback(AuthorizeCallback ac) {
+ String authenticationID = ac.getAuthenticationID();
+ LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
+ ac.setAuthorizedID(authenticationID);
+ ac.setAuthorized(true);
+ }
+}
View
51 src/jvm/backtype/storm/utils/NimbusClient.java
@@ -1,47 +1,38 @@
package backtype.storm.utils;
import backtype.storm.Config;
+import backtype.storm.security.auth.ThriftClient;
import backtype.storm.generated.Nimbus;
import java.util.Map;
-import org.apache.thrift7.TException;
-import org.apache.thrift7.protocol.TBinaryProtocol;
-import org.apache.thrift7.transport.TFramedTransport;
-import org.apache.thrift7.transport.TSocket;
-import org.apache.thrift7.transport.TTransport;
+import org.apache.thrift7.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class NimbusClient extends ThriftClient {
+ private Nimbus.Client _client;
+ private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
-public class NimbusClient {
public static NimbusClient getConfiguredClient(Map conf) {
- String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
- int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
- return new NimbusClient(nimbusHost, nimbusPort);
+ try {
+ String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
+ int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
+ Integer timeout = Utils.getInt(conf.get(Config.NIMBUS_TASK_TIMEOUT_SECS));
+ return new NimbusClient(conf, nimbusHost, nimbusPort, timeout);
+ } catch (TTransportException ex) {
+ throw new RuntimeException(ex);
+ }
}
- private TTransport conn;
- private Nimbus.Client client;
-
- public NimbusClient(String host) {
- this(host, 6627);
+ public NimbusClient(Map conf, String host, int port) throws TTransportException {
+ this(conf, host, port, null);
}
- public NimbusClient(String host, int port) {
- try {
- if(host==null) {
- throw new IllegalArgumentException("Nimbus host is not set");
- }
- conn = new TFramedTransport(new TSocket(host, port));
- client = new Nimbus.Client(new TBinaryProtocol(conn));
- conn.open();
- } catch(TException e) {
- throw new RuntimeException(e);
- }
+ public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
+ super(conf, host, port, timeout);
+ _client = new Nimbus.Client(_protocol);
}
public Nimbus.Client getClient() {
- return client;
- }
-
- public void close() {
- conn.close();
+ return _client;
}
}
View
218 test/clj/backtype/storm/security/auth/auth_test.clj
@@ -0,0 +1,218 @@
+(ns backtype.storm.security.auth.auth-test
+ (:use [clojure test])
+ (:require [backtype.storm.daemon [nimbus :as nimbus]])
+ (:import [org.apache.thrift7 TException])
+ (:import [org.apache.thrift7.transport TTransportException])
+ (:import [java.nio ByteBuffer])
+ (:import [backtype.storm Config])
+ (:import [backtype.storm.utils NimbusClient])
+ (:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient
+ ReqContext])
+ (:use [backtype.storm bootstrap util])
+ (:use [backtype.storm.daemon common])
+ (:use [backtype.storm bootstrap testing])
+ (:import [backtype.storm.generated Nimbus Nimbus$Client])
+ )
+
+(bootstrap)
+
+(def nimbus-timeout (Integer. 30))
+
+(defn mk-authorization-handler [storm-conf]
+ (let [klassname (storm-conf NIMBUS-AUTHORIZER)
+ aznClass (if klassname (Class/forName klassname))
+ aznHandler (if aznClass (.newInstance aznClass))]
+ (if aznHandler (.prepare aznHandler storm-conf))
+ (log-debug "authorization class name:" klassname
+ " class:" aznClass
+ " handler:" aznHandler)
+ aznHandler
+ ))
+
+(defn nimbus-data [storm-conf inimbus]
+ (let [forced-scheduler (.getForcedScheduler inimbus)]
+ {:conf storm-conf
+ :inimbus inimbus
+ :authorization-handler (mk-authorization-handler storm-conf)
+ :submitted-count (atom 0)
+ :storm-cluster-state nil
+ :submit-lock (Object.)
+ :heartbeats-cache (atom {})
+ :downloaders nil
+ :uploaders nil
+ :uptime (uptime-computer)
+ :validator nil
+ :timer nil
+ :scheduler nil
+ }))
+
+(defn check-authorization! [nimbus storm-name storm-conf operation]
+ (let [aclHandler (:authorization-handler nimbus)]
+ (log-debug "check-authorization with handler: " aclHandler)
+ (if aclHandler
+ (if-not (.permit aclHandler
+ (ReqContext/context)
+ operation
+ (if storm-conf storm-conf {TOPOLOGY-NAME storm-name}))
+ (throw (RuntimeException. (str operation " on topology " storm-name " is not authorized")))
+ ))))
+
+(defn dummy-service-handler [conf inimbus]
+ (let [nimbus (nimbus-data conf inimbus)]
+ (reify Nimbus$Iface
+ (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
+ ^SubmitOptions submitOptions]
+ (check-authorization! nimbus storm-name nil "submitTopology"))
+
+ (^void killTopology [this ^String storm-name]
+ (check-authorization! nimbus storm-name nil "killTopology"))
+
+ (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
+ (check-authorization! nimbus storm-name nil "killTopology"))
+
+ (^void rebalance [this ^String storm-name ^RebalanceOptions options]
+ (check-authorization! nimbus storm-name nil "rebalance"))
+
+ (activate [this storm-name]
+ (check-authorization! nimbus storm-name nil "activate"))
+
+ (deactivate [this storm-name]
+ (check-authorization! nimbus storm-name nil "deactivate"))
+
+ (beginFileUpload [this])
+
+ (^void uploadChunk [this ^String location ^ByteBuffer chunk])
+
+ (^void finishFileUpload [this ^String location])
+
+ (^String beginFileDownload [this ^String file])
+
+ (^ByteBuffer downloadChunk [this ^String id])
+
+ (^String getNimbusConf [this])
+
+ (^String getTopologyConf [this ^String id])
+
+ (^StormTopology getTopology [this ^String id])
+
+ (^StormTopology getUserTopology [this ^String id])
+
+ (^ClusterSummary getClusterInfo [this])
+
+ (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
+
+(defn launch-server [server-port login-cfg aznClass transportPluginClass]
+ (let [conf1 (merge (read-storm-config)
+ {NIMBUS-AUTHORIZER aznClass
+ NIMBUS-HOST "localhost"
+ NIMBUS-THRIFT-PORT server-port
+ STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
+ conf (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)
+ nimbus (nimbus/standalone-nimbus)
+ service-handler (dummy-service-handler conf nimbus)
+ server (ThriftServer. conf (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))]
+ (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
+ (.start (Thread. #(.serve server)))
+ (wait-for-condition #(.isServing server))))
+
+(deftest Simple-authentication-test
+ (launch-server 6627 nil nil "backtype.storm.security.auth.SimpleTransportPlugin")
+
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
+ client (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (.activate nimbus_client "security_auth_test_topology")
+ (.close client))
+
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"})]
+ (testing "(Negative authentication) Server: Simple vs. Client: Digest"
+ (is (thrown-cause? java.net.SocketTimeoutException
+ (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout))))))
+
+(deftest positive-authorization-test
+ (launch-server 6628 nil
+ "backtype.storm.security.auth.authorizer.NoopAuthorizer"
+ "backtype.storm.security.auth.SimpleTransportPlugin")
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
+ client (NimbusClient. storm-conf "localhost" 6628 nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Authorization plugin should accept client request"
+ (.activate nimbus_client "security_auth_test_topology"))
+ (.close client)))
+
+(deftest deny-authorization-test
+ (launch-server 6629 nil
+ "backtype.storm.security.auth.authorizer.DenyAuthorizer"
+ "backtype.storm.security.auth.SimpleTransportPlugin")
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"
+ Config/NIMBUS_HOST "localhost"
+ Config/NIMBUS_THRIFT_PORT 6629
+ Config/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
+ client (NimbusClient/getConfiguredClient storm-conf)
+ nimbus_client (.getClient client)]
+ (testing "(Negative authorization) Authorization plugin should reject client request"
+ (is (thrown? TTransportException
+ (.activate nimbus_client "security_auth_test_topology"))))
+ (.close client)))
+
+(deftest digest-authentication-test
+ (launch-server 6630
+ "test/clj/backtype/storm/security/auth/jaas_digest.conf"
+ nil
+ "backtype.storm.security.auth.digest.DigestSaslTransportPlugin")
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"})
+ client (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authentication) valid digest authentication"
+ (.activate nimbus_client "security_auth_test_topology"))
+ (.close client))
+
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
+ client (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Negative authentication) Server: Digest vs. Client: Simple"
+ (is (thrown-cause? java.net.SocketTimeoutException
+ (.activate nimbus_client "security_auth_test_topology"))))
+ (.close client))
+
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf"})]
+ (testing "(Negative authentication) Invalid password"
+ (is (thrown? TTransportException
+ (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)))))
+
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf"})]
+ (testing "(Negative authentication) Unknown user"
+ (is (thrown? TTransportException
+ (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)))))
+
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/nonexistent.conf"})]
+ (testing "(Negative authentication) nonexistent configuration file"
+ (is (thrown? RuntimeException
+ (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)))))
+
+ (let [storm-conf (merge (read-storm-config)
+ {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_missing_client.conf"})]
+ (testing "(Negative authentication) Missing client"
+ (is (thrown-cause? java.io.IOException
+ (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout))))))
+
+
+(deftest test-GetTransportPlugin-throws-RuntimeException
+ (let [conf (merge (read-storm-config)
+ {Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
+ (is (thrown? RuntimeException (AuthUtils/GetTransportPlugin conf nil)))))
View
12 test/clj/backtype/storm/security/auth/jaas_digest.conf
@@ -0,0 +1,12 @@
+/* This sample file illustrates how Digest authentication should be configured
+*/
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+};
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="bob"
+ password="bobsecret";
+};
View
13 test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf
@@ -0,0 +1,13 @@
+/* This sample file containes incorrect password of a user.
+ We use this file for negative test.
+*/
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+};
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="bob"
+ password="bad_password";
+};
View
5 test/clj/backtype/storm/security/auth/jaas_digest_missing_client.conf
@@ -0,0 +1,5 @@
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+};
View
13 test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf
@@ -0,0 +1,13 @@
+/* This sample file containes an unauthorized user.
+ We use this file for negative test.
+*/
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+};
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="unknown_user"
+ password="some_password";
+};

0 comments on commit d6374fe

Please sign in to comment.
Something went wrong with that request. Please try again.