Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Apply Thrift SASL client/server framework for authentication/authoriz…

…ation/audit
  • Loading branch information...
commit 96c51312a9149a0c86509823f2494dd54f943c9a 1 parent 01c4147
afeng authored
View
10 conf/jaas_digest.conf
@@ -0,0 +1,10 @@
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+};
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="bob"
+ password="bobsecret";
+};
View
17 conf/jaas_kerberos_cluster.conf
@@ -0,0 +1,17 @@
+StormServer {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ keyTab="/etc/nimbus_server.keytab"
+ storeKey=true
+ useTicketCache=false
+ principal="storm_server/carcloth.corp.acme.com@STORM.CORP.ACME.COM";
+};
+StormClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ keyTab="/etc/nimbus_client.keytab"
+ storeKey=true
+ useTicketCache=false
+ serviceName="storm_server";
+};
+
View
7 conf/jaas_kerberos_launcher.conf
@@ -0,0 +1,7 @@
+StormClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ doNotPrompt=true
+ useTicketCache=true
+ serviceName="storm_server";
+};
+
View
22 logback/cluster.xml
@@ -16,9 +16,31 @@
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
</encoder>
+ </appender>
+
+ <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${storm.home}/logs/access.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>${storm.home}/logs/${logfile.name}.%i</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>9</maxIndex>
+ </rollingPolicy>
+
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+
+ <encoder>
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
+ </encoder>
</appender>
<root level="INFO">
<appender-ref ref="A1"/>
</root>
+
+ <logger name="backtype.storm.security.auth.NoopAuthorizer" additivity="false">
+ <level value="INFO" />
+ <appender-ref ref="ACCESS" />
+ </logger>
</configuration>
View
4 project.clj
@@ -8,8 +8,8 @@
[storm/libthrift7 "0.7.0"
:exclusions [org.slf4j/slf4j-api]]
[clj-time "0.4.1"]
- [com.netflix.curator/curator-framework "1.0.1"
- :exclusions [log4j/log4j]]
+ [com.netflix.curator/curator-framework "1.2.6"
+ :exclusions [log4j/log4j org.slf4j/slf4j-log4j12]]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
[compojure "1.1.3"]
View
4 src/clj/backtype/storm/zookeeper.clj
@@ -6,7 +6,7 @@
ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState
Watcher$Event$EventType KeeperException$NodeExistsException])
(:import [org.apache.zookeeper.data Stat])
- (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory])
+ (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory])
(:import [java.net InetSocketAddress BindException])
(:import [java.io File])
(:import [backtype.storm.utils Utils ZookeeperAuthInfo])
@@ -132,7 +132,7 @@
(let [localfile (File. localdir)
zk (ZooKeeperServer. localfile localfile 2000)
[retport factory] (loop [retport (if port port 2000)]
- (if-let [factory-tmp (try-cause (NIOServerCnxn$Factory. (InetSocketAddress. retport))
+ (if-let [factory-tmp (try-cause (doto (NIOServerCnxnFactory.) (.configure (InetSocketAddress. retport) 100))
(catch BindException e
(when (> (inc retport) (if port port 65535))
(throw (RuntimeException. "No port is available to launch an inprocess zookeeper.")))))]
View
116 src/jvm/backtype/storm/security/auth/AnonymousAuthenticationProvider.java
@@ -0,0 +1,116 @@
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslServerFactory;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class AnonymousAuthenticationProvider extends java.security.Provider {
+ public AnonymousAuthenticationProvider() {
+ super("ThriftSaslAnonymous", 1.0, "Thrift Anonymous SASL provider");
+ put("SaslClientFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
+ put("SaslServerFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
+ }
+
+ public static class SaslAnonymousFactory implements SaslClientFactory, SaslServerFactory {
+
+ @Override
+ public SaslClient createSaslClient(
+ String[] mechanisms, String authorizationId, String protocol,
+ String serverName, Map<String,?> props, CallbackHandler cbh)
+ {
+ for (String mech : mechanisms) {
+ if ("ANONYMOUS".equals(mech)) {
+ return new AnonymousClient(authorizationId);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public SaslServer createSaslServer(
+ String mechanism, String protocol, String serverName, Map<String,?> props, CallbackHandler cbh)
+ {
+ if ("ANONYMOUS".equals(mechanism)) {
+ return new AnonymousServer();
+ }
+ return null;
+ }
+ public String[] getMechanismNames(Map<String, ?> props) {
+ return new String[] { "ANONYMOUS" };
+ }
+ }
+}
+
+
+class AnonymousClient implements SaslClient {
+ @VisibleForTesting
+ final String username;
+ private boolean hasProvidedInitialResponse;
+
+ public AnonymousClient(String username) {
+ if (username == null) {
+ this.username = "anonymous";
+ } else {
+ this.username = username;
+ }
+ }
+
+ public String getMechanismName() { return "ANONYMOUS"; }
+ public boolean hasInitialResponse() { return true; }
+ public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
+ if (hasProvidedInitialResponse) {
+ throw new SaslException("Already complete!");
+ }
+
+ try {
+ hasProvidedInitialResponse = true;
+ return username.getBytes("UTF-8");
+ } catch (IOException e) {
+ throw new SaslException(e.toString());
+ }
+ }
+ public boolean isComplete() { return hasProvidedInitialResponse; }
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public Object getNegotiatedProperty(String propName) { return null; }
+ public void dispose() {}
+}
+
+class AnonymousServer implements SaslServer {
+ private String user;
+ public String getMechanismName() { return "ANONYMOUS"; }
+ public byte[] evaluateResponse(byte[] response) throws SaslException {
+ try {
+ this.user = new String(response, "UTF-8");
+ } catch (IOException e) {
+ throw new SaslException(e.toString());
+ }
+ return null;
+ }
+ public boolean isComplete() { return user != null; }
+ public String getAuthorizationID() { return user; }
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public Object getNegotiatedProperty(String propName) { return null; }
+ public void dispose() {}
+
+}
+
+
+
View
39 src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -0,0 +1,39 @@
+package backtype.storm.security.auth;
+
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+
+public class AuthUtils {
+ public static String LoginContextServer = "StormServer";
+ public static String LoginContextClient = "StormClient";
+
+ static public final String DIGEST = "DIGEST-MD5";
+ static public final String ANONYMOUS = "ANONYMOUS";
+ static public final String KERBEROS = "GSSAPI";
+ static public final String SERVICE = "storm_thrift_server";
+
+ private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
+
+ public static String get(Configuration configuration, String section, String key) throws IOException {
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ for(AppConfigurationEntry entry: configurationEntries) {
+ Object val = entry.getOptions().get(key);
+ if (val != null)
+ return (String)val;
+ }
+ return null;
+ }
+}
+
View
35 src/jvm/backtype/storm/security/auth/DenyAuthorizer.java
@@ -0,0 +1,35 @@
+package backtype.storm.security.auth;
+
+import backtype.storm.Config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An authorization implementation that denies everything, for testing purposes
+ */
+public class DenyAuthorizer implements IAuthorization {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DenyAuthorizer.class);
+
+ /**
+ * permit() method is invoked for each incoming Thrift request
+ * @param contrext request context includes info about
+ * (1) remote address/subject,
+ * (2) operation
+ * (3) configuration of targeted topology
+ * @return true if the request is authorized, false if reject
+ */
+ public boolean permit(ReqContext context) {
+ LOG.info("Access "
+ + " from: " +
+ (context.remoteAddress() == null
+ ? "null" : context.remoteAddress().toString())
+ + " principal:"+ (context.principal() == null
+ ? "null" : context.principal())
+ +" op:"+context.operation()
+ + " topoology:"+context.topologyConf().get(Config.TOPOLOGY_NAME)
+ );
+ return false;
+ }
+}
View
26 src/jvm/backtype/storm/security/auth/IAuthorization.java
@@ -0,0 +1,26 @@
+package backtype.storm.security.auth;
+
+/**
+ * Nimbus could be configured with an authorization plugin.
+ * If not specified, all requests are authorized.
+ *
+ * You could specify the authorization plugin via storm parameter. For example:
+ * storm -c nimbus.authorization.classname=backtype.storm.security.auth.DefaultAuthorizer ...
+ *
+ * You could also specify it via storm.yaml:
+ * nimbus.authorization.classname: backtype.storm.security.auth.DefaultAuthorizer
+ *
+ * @author afeng
+ *
+ */
+public interface IAuthorization {
+ /**
+ * permit() method is invoked for each incoming Thrift request.
+ * @param contrext request context includes info about
+ * (1) remote address/subject,
+ * (2) operation
+ * (3) configuration of targeted topology
+ * @return true if the request is authorized, false if reject
+ */
+ public boolean permit(ReqContext context);
+}
View
32 src/jvm/backtype/storm/security/auth/NoopAuthorizer.java
@@ -0,0 +1,32 @@
+package backtype.storm.security.auth;
+
+import backtype.storm.Config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A no-op authorization implementation that illustrate info available for authorization decisions.
+ */
+public class NoopAuthorizer implements IAuthorization {
+ private static final Logger LOG = LoggerFactory.getLogger(NoopAuthorizer.class);
+
+ /**
+ * permit() method is invoked for each incoming Thrift request
+ * @param contrext request context includes info about
+ * (1) remote address/subject,
+ * (2) operation
+ * (3) configuration of targeted topology
+ * @return true if the request is authorized, false if reject
+ */
+ public boolean permit(ReqContext context) {
+ LOG.info("Access "
+ + " from: " + context.remoteAddress() == null
+ ? "null" : context.remoteAddress().toString()
+ + " principal:"+context.principal() == null
+ ? "null" : context.principal()
+ +" op:"+context.operation()
+ + " topoology:"+context.topologyConf().get(Config.TOPOLOGY_NAME));
+ return true;
+ }
+}
View
114 src/jvm/backtype/storm/security/auth/ReqContext.java
@@ -0,0 +1,114 @@
+package backtype.storm.security.auth;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.Principal;
+import javax.security.auth.Subject;
+
+import backtype.storm.scheduler.TopologyDetails;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.net.InetAddress;
+
+/**
+ * context request context includes info about
+ * (1) remote address/subject,
+ * (2) operation
+ * (3) configuration of targeted topology
+ */
+public class ReqContext {
+ private static final AtomicInteger uniqueId = new AtomicInteger(0);
+
+ public enum OperationType { SUBMIT_TOPOLOGY, KILL_TOPOLOGY, REBALANCE_TOPOLOGY, ACTIVATE_TOPOLOGY, DEACTIVATE_TOPOLOGY };
+
+ private Subject _subject;
+ private InetAddress _remoteAddr;
+ private Integer _reqID;
+ private Map _storm_conf;
+ private OperationType _operation;
+
+ /**
+ * Get a request context associated with current thread
+ * @return
+ */
+ public static ReqContext context() {
+ return ctxt.get();
+ }
+
+ //each thread will have its own request context
+ private static final ThreadLocal < ReqContext > ctxt =
+ new ThreadLocal < ReqContext > () {
+ @Override protected ReqContext initialValue() {
+ return new ReqContext(AccessController.getContext());
+ }
+ };
+
+ //private constructor
+ @VisibleForTesting
+ ReqContext(AccessControlContext acl_ctxt) {
+ _subject = Subject.getSubject(acl_ctxt);
+ _reqID = uniqueId.incrementAndGet();
+ }
+
+ /**
+ * client address
+ */
+ public void setRemoteAddress(InetAddress addr) {
+ _remoteAddr = addr;
+ }
+
+ public InetAddress remoteAddress() {
+ return _remoteAddr;
+ }
+
+ /**
+ * Set remote subject explicitly
+ */
+ public void setSubject(Subject subject) {
+ _subject = subject;
+ }
+
+ /**
+ * Client subject associated with this request context
+ * @return
+ */
+ public Subject subject() {
+ return _subject;
+ }
+
+ /**
+ * The primary principal associated current subject
+ * @return
+ */
+ public Principal principal() {
+ if (_subject == null) return null;
+ Set<Principal> princs = _subject.getPrincipals();
+ if (princs.size()==0) return null;
+ return (Principal) (princs.toArray()[0]);
+ }
+
+ /**
+ * Topology that this request is against
+ */
+ public Map topologyConf() {
+ return _storm_conf;
+ }
+
+ public void setTopologyConf(Map conf) {
+ _storm_conf = conf;
+ }
+
+ /**
+ * Operation that this request is performing
+ */
+ public OperationType operation() {
+ return _operation;
+ }
+
+ public void setOperation(OperationType operation) {
+ _operation = operation;
+ }
+}
View
111 src/jvm/backtype/storm/security/auth/SaslClientCallbackHandler.java
@@ -0,0 +1,111 @@
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SASL client side callback handler.
+ * @author afeng
+ *
+ */
+public class SaslClientCallbackHandler implements CallbackHandler {
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final Logger LOG = LoggerFactory.getLogger(SaslClientCallbackHandler.class);
+ private String _username = null;
+ private String _password = null;
+
+ /**
+ * Constructor based on a JAAS configuration
+ *
+ * For digest, you should have a pair of user name and password defined in this figgure.
+ *
+ * @param configuration
+ * @throws IOException
+ */
+ public SaslClientCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration == null) return;
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LoginContextClient);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+AuthUtils.LoginContextClient
+ + "' entry in this configuration: Client cannot start.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ for(AppConfigurationEntry entry: configurationEntries) {
+ if (entry.getOptions().get(USERNAME) != null) {
+ _username = (String)entry.getOptions().get(USERNAME);
+ }
+ if (entry.getOptions().get(PASSWORD) != null) {
+ _password = (String)entry.getOptions().get(PASSWORD);
+ }
+ }
+ }
+
+ /**
+ * This method is invoked by SASL for authentication challenges
+ * @param callbacks a collection of challenge callbacks
+ */
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (Callback c : callbacks) {
+ if (c instanceof NameCallback) {
+ LOG.debug("name callback");
+ NameCallback nc = (NameCallback) c;
+ nc.setName(_username);
+ } else if (c instanceof PasswordCallback) {
+ LOG.debug("pwd callback");
+ PasswordCallback pc = (PasswordCallback)c;
+ if (_password != null) {
+ pc.setPassword(_password.toCharArray());
+ } else {
+ LOG.warn("Could not login: the client is being asked for a password, but the " +
+ " client code does not currently support obtaining a password from the user." +
+ " Make sure that the client is configured to use a ticket cache (using" +
+ " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
+ " you still get this message after that, the TGT in the ticket cache has expired and must" +
+ " be manually refreshed. To do so, first determine if you are using a password or a" +
+ " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
+ " is running this client using the command" +
+ " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
+ " If the latter, do" +
+ " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
+ " <keytab> is the location of the keytab file). After manually refreshing your cache," +
+ " restart this client. If you continue to see this message after manually refreshing" +
+ " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
+ }
+ } else if (c instanceof AuthorizeCallback) {
+ LOG.debug("authorization callback");
+ AuthorizeCallback ac = (AuthorizeCallback) c;
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ ac.setAuthorizedID(authzid);
+ }
+ } else if (c instanceof RealmCallback) {
+ RealmCallback rc = (RealmCallback) c;
+ ((RealmCallback) c).setText(rc.getDefaultText());
+ } else {
+ throw new UnsupportedCallbackException(c);
+ }
+ }
+ }
+}
View
131 src/jvm/backtype/storm/security/auth/SaslServerCallbackHandler.java
@@ -0,0 +1,131 @@
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.apache.zookeeper.server.auth.KerberosName;
+
+/**
+ * SASL server side collback handler
+ *
+ * @author afeng
+ *
+ */
+public class SaslServerCallbackHandler implements CallbackHandler {
+ private static final String USER_PREFIX = "user_";
+ private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
+ private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
+ private static final String SYSPROP_REMOVE_HOST = "storm.kerberos.removeHostFromPrincipal";
+ private static final String SYSPROP_REMOVE_REALM = "storm.kerberos.removeRealmFromPrincipal";
+
+ private String userName;
+ private final Map<String,String> credentials = new HashMap<String,String>();
+
+ public SaslServerCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration==null) return;
+
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LoginContextServer);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+AuthUtils.LoginContextServer+"' entry in this configuration: Server cannot start.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+ credentials.clear();
+ for(AppConfigurationEntry entry: configurationEntries) {
+ Map<String,?> options = entry.getOptions();
+ // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
+ // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+ for(Map.Entry<String, ?> pair : options.entrySet()) {
+ String key = pair.getKey();
+ if (key.startsWith(USER_PREFIX)) {
+ String userName = key.substring(USER_PREFIX.length());
+ credentials.put(userName,(String)pair.getValue());
+ }
+ }
+ }
+ }
+
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ handleNameCallback((NameCallback) callback);
+ } else if (callback instanceof PasswordCallback) {
+ handlePasswordCallback((PasswordCallback) callback);
+ } else if (callback instanceof RealmCallback) {
+ handleRealmCallback((RealmCallback) callback);
+ } else if (callback instanceof AuthorizeCallback) {
+ handleAuthorizeCallback((AuthorizeCallback) callback);
+ }
+ }
+ }
+
+ private void handleNameCallback(NameCallback nc) {
+ userName = nc.getDefaultName();
+ nc.setName(nc.getDefaultName());
+ }
+
+ private void handlePasswordCallback(PasswordCallback pc) {
+ if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
+ // superuser: use Java system property for password, if available.
+ pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
+ } else if (credentials.containsKey(userName) ) {
+ pc.setPassword(credentials.get(userName).toCharArray());
+ } else {
+ LOG.warn("No password found for user: " + userName);
+ }
+ }
+
+ private void handleRealmCallback(RealmCallback rc) {
+ LOG.debug("handleRealmCallback: "+ rc.getDefaultText());
+ rc.setText(rc.getDefaultText());
+ }
+
+ private void handleAuthorizeCallback(AuthorizeCallback ac) {
+ String authenticationID = ac.getAuthenticationID();
+ LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
+ ac.setAuthorized(true);
+
+ // canonicalize authorization id according to system properties:
+ // storm.kerberos.removeRealmFromPrincipal(={true,false})
+ // storm.kerberos.removeHostFromPrincipal(={true,false})
+ KerberosName kerberosName = new KerberosName(authenticationID);
+ try {
+ StringBuilder userNameBuilder = new StringBuilder(kerberosName.getShortName());
+ if (shouldAppendHost(kerberosName)) {
+ userNameBuilder.append("/").append(kerberosName.getHostName());
+ }
+ if (shouldAppendRealm(kerberosName)) {
+ userNameBuilder.append("@").append(kerberosName.getRealm());
+ }
+ LOG.debug("Setting authorizedID: " + userNameBuilder);
+ ac.setAuthorizedID(userNameBuilder.toString());
+ } catch (IOException e) {
+ LOG.error("Failed to set name based on Kerberos authentication rules.");
+ }
+ }
+
+ private boolean shouldAppendRealm(KerberosName kerberosName) {
+ return !isSystemPropertyTrue(SYSPROP_REMOVE_REALM) && kerberosName.getRealm() != null;
+ }
+
+ private boolean shouldAppendHost(KerberosName kerberosName) {
+ return !isSystemPropertyTrue(SYSPROP_REMOVE_HOST) && kerberosName.getHostName() != null;
+ }
+
+ private boolean isSystemPropertyTrue(String propertyName) {
+ return "true".equals(System.getProperty(propertyName));
+ }
+}
View
160 src/jvm/backtype/storm/security/auth/ThriftClient.java
@@ -0,0 +1,160 @@
+package backtype.storm.security.auth;
+
+import backtype.storm.utils.Utils;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.Sasl;
+
+import org.apache.thrift7.protocol.TBinaryProtocol;
+import org.apache.thrift7.protocol.TProtocol;
+import org.apache.thrift7.transport.TSocket;
+import org.apache.thrift7.transport.TTransport;
+import org.apache.thrift7.transport.TSaslClientTransport;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.KerberosName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ThriftClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
+ private TTransport _transport;
+ protected TProtocol _protocol;
+
+ static {
+ java.security.Security.addProvider(new AnonymousAuthenticationProvider());
+ }
+
+ public ThriftClient(String host, int port, String default_service_name) {
+ this(host, port, default_service_name, null);
+ }
+
+ public ThriftClient(String host, int port, String default_service_name, Integer timeout) {
+ try {
+ if(host==null) {
+ throw new IllegalArgumentException("host is not set");
+ }
+ if(port<=0) {
+ throw new IllegalArgumentException("invalid port: "+port);
+ }
+
+ TSocket socket = new TSocket(host, port);
+ if(timeout!=null) {
+ socket.setTimeout(timeout);
+ }
+ final TTransport underlyingTransport = socket;
+
+ String loginConfigurationFile = System.getProperty("java.security.auth.login.config");
+ if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) {
+ //apply Storm configuration for JAAS login
+ Map conf = Utils.readStormConfig();
+ loginConfigurationFile = (String)conf.get("java.security.auth.login.config");
+ }
+ if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) { //ANONYMOUS
+ LOG.debug("SASL ANONYMOUS client transport is being established");
+ _transport = new TSaslClientTransport(AuthUtils.ANONYMOUS,
+ null,
+ AuthUtils.SERVICE,
+ host,
+ null,
+ null,
+ underlyingTransport);
+ _transport.open();
+ } else {
+ LOG.debug("Use jaas login config:"+loginConfigurationFile);
+ System.setProperty("java.security.auth.login.config", loginConfigurationFile);
+ Configuration auth_conf = Configuration.getConfiguration();
+
+ //login our user
+ SaslClientCallbackHandler callback_handler = new SaslClientCallbackHandler(auth_conf);
+ Login login = new Login(AuthUtils.LoginContextClient, callback_handler);
+
+ final Subject subject = login.getSubject();
+ if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //DIGEST-MD5
+ LOG.debug("SASL DIGEST-MD5 client transport is being established");
+ _transport = new TSaslClientTransport(AuthUtils.DIGEST,
+ null,
+ AuthUtils.SERVICE,
+ host,
+ null,
+ callback_handler,
+ underlyingTransport);
+ _transport.open();
+ } else { //GSSAPI
+ final String principal = getPrincipal(subject);
+ String serviceName = AuthUtils.get(auth_conf, AuthUtils.LoginContextClient, "serviceName");
+ if (serviceName == null) {
+ serviceName = default_service_name;
+ }
+ Map<String, String> props = new TreeMap<String,String>();
+ props.put(Sasl.QOP, "auth");
+ props.put(Sasl.SERVER_AUTH, "false");
+ LOG.debug("SASL GSSAPI client transport is being established");
+ _transport = new TSaslClientTransport(AuthUtils.KERBEROS,
+ principal,
+ serviceName,
+ host,
+ props,
+ null,
+ underlyingTransport);
+
+ //open Sasl transport with the login credential
+ try {
+ Subject.doAs(subject,
+ new PrivilegedExceptionAction<Void>() {
+ public Void run() {
+ try {
+ LOG.debug("do as:"+ principal);
+ _transport.open();
+ }
+ catch (Exception e) {
+ LOG.error("Nimbus client failed to open SaslClientTransport to interact with a server during session initiation: " + e);
+ e.printStackTrace();
+ }
+ return null;
+ }
+ });
+ } catch (PrivilegedActionException e) {
+ LOG.error("Nimbus client experienced a PrivilegedActionException exception while creating a TSaslClientTransport using a JAAS principal context:" + e);
+ e.printStackTrace();
+ }
+ }
+
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ throw new RuntimeException(e);
+ }
+
+ _protocol = null;
+ if (_transport != null)
+ _protocol = new TBinaryProtocol(_transport);
+ }
+
+ private String getPrincipal(Subject subject) {
+ Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+ if (principals==null || principals.size()<1) {
+ LOG.info("No principal found in login subject");
+ return null;
+ }
+ return ((Principal)(principals.toArray()[0])).getName();
+ }
+
+ public TTransport transport() {
+ return _transport;
+ }
+
+ public void close() {
+ _transport.close();
+ }
+}
View
260 src/jvm/backtype/storm/security/auth/ThriftServer.java
@@ -0,0 +1,260 @@
+package backtype.storm.security.auth;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslServer;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.Subject;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.security.Principal;
+import java.security.PrivilegedExceptionAction;
+import java.security.PrivilegedActionException;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.KerberosName;
+
+import org.apache.thrift7.TException;
+import org.apache.thrift7.TProcessor;
+import org.apache.thrift7.server.TServer;
+import org.apache.thrift7.server.TThreadPoolServer;
+import org.apache.thrift7.protocol.TBinaryProtocol;
+import org.apache.thrift7.protocol.TProtocol;
+import org.apache.thrift7.transport.TSaslServerTransport;
+import org.apache.thrift7.transport.TServerSocket;
+import org.apache.thrift7.transport.TServerTransport;
+import org.apache.thrift7.transport.TSocket;
+import org.apache.thrift7.transport.TTransport;
+import org.apache.thrift7.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import backtype.storm.security.auth.*;
+import backtype.storm.utils.Utils;
+
+public class ThriftServer {
+ static {
+ java.security.Security.addProvider(new AnonymousAuthenticationProvider());
+ }
+
+ private TProcessor _processor = null;
+ private int _port = 0;
+ private TServer _server;
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
+ private String _loginConfigurationFile;
+
+ public ThriftServer(TProcessor processor, int port) {
+ try {
+ _processor = processor;
+ _port = port;
+
+ _loginConfigurationFile = System.getProperty("java.security.auth.login.config");
+ if ((_loginConfigurationFile==null) || (_loginConfigurationFile.length()==0)) {
+ //apply Storm configuration for JAAS login
+ Map conf = Utils.readStormConfig();
+ _loginConfigurationFile = (String)conf.get("java.security.auth.login.config");
+ if ((_loginConfigurationFile!=null) && (_loginConfigurationFile.length()>0)) {
+ System.setProperty("java.security.auth.login.config", _loginConfigurationFile);
+ }
+ }
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ }
+
+ public void stop() {
+ if (_server != null)
+ _server.stop();
+ }
+
+ public void serve() {
+ TServerTransport serverTransport = null;
+
+ try {
+ TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
+ serverTransport = new TServerSocket(_port);
+
+ if ((_loginConfigurationFile==null) || (_loginConfigurationFile.length()==0)) { //ANONYMOUS
+ factory.addServerDefinition(AuthUtils.ANONYMOUS, AuthUtils.SERVICE, "localhost", null, null);
+
+ LOG.info("Starting SASL ANONYMOUS server at port:" + _port);
+ _server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
+ processor(new SaslProcessor(_processor)).
+ transportFactory(factory).
+ minWorkerThreads(64).
+ maxWorkerThreads(64).
+ protocolFactory(new TBinaryProtocol.Factory()));
+ } else {
+ //retrieve authentication configuration from java.security.auth.login.config
+ Configuration auth_conf = Configuration.getConfiguration();
+
+ //login our user
+ CallbackHandler auth_callback_handler = new SaslServerCallbackHandler(auth_conf);
+ Login login = new Login(AuthUtils.LoginContextServer, auth_callback_handler);
+ Subject subject = login.getSubject();
+
+ if (!subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //KERBEROS
+ String principal = AuthUtils.get(auth_conf, AuthUtils.LoginContextServer, "principal");
+ LOG.debug("principal:"+principal);
+ KerberosName serviceKerberosName = new KerberosName(principal);
+ String serviceName = serviceKerberosName.getServiceName();
+ String hostName = serviceKerberosName.getHostName();
+ Map<String, String> props = new TreeMap<String,String>();
+ props.put(Sasl.QOP, "auth");
+ props.put(Sasl.SERVER_AUTH, "false");
+ factory.addServerDefinition(AuthUtils.KERBEROS, serviceName, hostName, props, auth_callback_handler);
+ LOG.info("Starting KERBEROS server at port:" + _port);
+ //create a wrap transport factory so that we could apply user credential during connections
+ TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
+ _server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
+ processor(new SaslProcessor(_processor)).
+ minWorkerThreads(64).
+ maxWorkerThreads(64).
+ transportFactory(wrapFactory).
+ protocolFactory(new TBinaryProtocol.Factory()));
+ } else { //DIGEST
+ factory.addServerDefinition(AuthUtils.DIGEST, AuthUtils.SERVICE, "localhost", null, auth_callback_handler);
+ LOG.info("Starting DIGEST server at port:" + _port);
+ _server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
+ processor(new SaslProcessor(_processor)).
+ minWorkerThreads(64).
+ maxWorkerThreads(64).
+ transportFactory(factory).
+ protocolFactory(new TBinaryProtocol.Factory()));
+ }
+
+ }
+
+ _server.serve();
+ } catch (Exception ex) {
+ LOG.error("ThriftServer is being stopped due to: " + ex, ex);
+ if (_server != null) _server.stop();
+ System.exit(1); //shutdown server process since we could not handle Thrift requests any more
+ }
+ }
+
+ /**
+ * Processor that pulls the SaslServer object out of the transport, and
+ * assumes the remote user's UGI before calling through to the original
+ * processor.
+ *
+ * This is used on the server side to set the UGI for each specific call.
+ */
+ private class SaslProcessor implements TProcessor {
+ final TProcessor wrapped;
+
+ SaslProcessor(TProcessor wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ TTransport trans = inProt.getTransport();
+ if (!(trans instanceof TSaslServerTransport)) {
+ throw new TException("Unexpected non-SASL transport " + trans.getClass());
+ }
+ TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+
+ //populating request context
+ ReqContext req_context = ReqContext.context();
+
+ //remote address
+ TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
+ Socket socket = tsocket.getSocket();
+ req_context.setRemoteAddress(socket.getInetAddress());
+
+ //remote subject
+ SaslServer saslServer = saslTrans.getSaslServer();
+ String authId = saslServer.getAuthorizationID();
+ LOG.debug("AUTH ID ======>" + authId);
+ Subject remoteUser = new Subject();
+ remoteUser.getPrincipals().add(new User(authId));
+ req_context.setSubject(remoteUser);
+
+ //invoke application logic
+ return wrapped.process(inProt, outProt);
+ }
+ }
+
+ static class User implements Principal {
+ private final String name;
+
+ public User(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get the full name of the user.
+ */
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ } else {
+ return (name.equals(((User) o).name));
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
+ /** A TransportFactory that wraps another one, but assumes a specified UGI
+ * before calling through.
+ *
+ * This is used on the server side to assume the server's Principal when accepting
+ * clients.
+ */
+ static class TUGIAssumingTransportFactory extends TTransportFactory {
+ private final Subject subject;
+ private final TTransportFactory wrapped;
+
+ public TUGIAssumingTransportFactory(TTransportFactory wrapped, Subject subject) {
+ this.wrapped = wrapped;
+ this.subject = subject;
+
+ Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+ if (principals.size()>0)
+ LOG.info("Service principal:"+ ((Principal)(principals.toArray()[0])).getName());
+ }
+
+ @Override
+ public TTransport getTransport(final TTransport trans) {
+ try {
+ return Subject.doAs(subject,
+ new PrivilegedExceptionAction<TTransport>() {
+ public TTransport run() {
+ try {
+ return wrapped.getTransport(trans);
+ }
+ catch (Exception e) {
+ LOG.error("Storm server failed to open transport to interact with a client during session initiation: " + e, e);
+ return null;
+ }
+ }
+ });
+ } catch (PrivilegedActionException e) {
+ LOG.error("Storm server experienced a PrivilegedActionException exception while creating a transport using a JAAS principal context:" + e, e);
+ return null;
+ }
+ }
+ }
+}
View
22 src/jvm/backtype/storm/utils/Utils.java
@@ -293,20 +293,16 @@ public static CuratorFramework newCurator(Map conf, List<String> servers, Object
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
String zkStr = StringUtils.join(serverPorts, ",") + root;
- try {
-
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- .connectString(zkStr)
- .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
- .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
- .retryPolicy(new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
- if(auth!=null && auth.scheme!=null) {
- builder = builder.authorization(auth.scheme, auth.payload);
- }
- return builder.build();
- } catch (IOException e) {
- throw new RuntimeException(e);
+
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+ .connectString(zkStr)
+ .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+ .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+ .retryPolicy(new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+ if(auth!=null && auth.scheme!=null) {
+ builder = builder.authorization(auth.scheme, auth.payload);
}
+ return builder.build();
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port) {
Please sign in to comment.
Something went wrong with that request. Please try again.