Permalink
Browse files

authentication/authorization framework restructured

  • Loading branch information...
anfeng committed Feb 20, 2013
1 parent 2420876 commit f4f2cdd2d6ab6f2bcd32a4cc32839677fb442999
@@ -1,31 +0,0 @@
-/*
-This is a sample JAAS configuration for Storm servers to handle Kerberos authentication
-*/
-
-/*
- StormServer section should contains the info about server keytab file and server principal.
- In Storm, we have 2 thrift servers: Nimbus and DRPC. These servers could be assigned with
- different principals.
-*/
-StormServer {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/etc/storm_server.keytab"
- storeKey=true
- useTicketCache=false
- principal="storm_service/carcloth.corp.acme.com@STORM.CORP.ACME.COM";
-};
-
-/*
-StormClient section should contains the info about client keytab file and client principal.
-For example, Supervisors are clients of Nimbus, and we should assign keytab/principal for supervisors.
-*/
-StormClient {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/etc/storm_client.keytab"
- storeKey=true
- useTicketCache=false
- serviceName="storm_service";
-};
-
@@ -1,12 +0,0 @@
-/*
- This is a sample JAAS configuration for Storm topology launcher/submitter.
- Since launcher machines are typically accessible by many folks, we
- encourage you to leverage "kinit", instead of keytab.
-*/
-StormClient {
- com.sun.security.auth.module.Krb5LoginModule required
- doNotPrompt=true
- useTicketCache=true
- serviceName="storm_service";
-};
-
View
@@ -39,7 +39,7 @@
<appender-ref ref="A1"/>
</root>
- <logger name="backtype.storm.security.auth.NoopAuthorizer" additivity="false">
+ <logger name="backtype.storm.security.auth.authorizer" additivity="false">
<level value="INFO" />
<appender-ref ref="ACCESS" />
</logger>
View
@@ -8,8 +8,8 @@
[storm/libthrift7 "0.7.0"
:exclusions [org.slf4j/slf4j-api]]
[clj-time "0.4.1"]
- [com.netflix.curator/curator-framework "1.2.6"
- :exclusions [log4j/log4j org.slf4j/slf4j-log4j12]]
+ [com.netflix.curator/curator-framework "1.0.1"
+ :exclusions [log4j/log4j]]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
[compojure "1.1.3"]
@@ -6,7 +6,7 @@
ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState
Watcher$Event$EventType KeeperException$NodeExistsException])
(:import [org.apache.zookeeper.data Stat])
- (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory])
+ (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory])
(:import [java.net InetSocketAddress BindException])
(:import [java.io File])
(:import [backtype.storm.utils Utils ZookeeperAuthInfo])
@@ -132,7 +132,7 @@
(let [localfile (File. localdir)
zk (ZooKeeperServer. localfile localfile 2000)
[retport factory] (loop [retport (if port port 2000)]
- (if-let [factory-tmp (try-cause (doto (NIOServerCnxnFactory.) (.configure (InetSocketAddress. retport) 100))
+ (if-let [factory-tmp (try-cause (NIOServerCnxn$Factory. (InetSocketAddress. retport))
(catch BindException e
(when (> (inc retport) (if port port 65535))
(throw (RuntimeException. "No port is available to launch an inprocess zookeeper.")))))]
@@ -67,8 +67,7 @@
/**
* The transport plug-in for Thrift client/server communication
*/
- public static String STORM_THRIFT_TRANSPORT_PLUGIN_CLASS = "storm.thrift.transport.class";
- public static String STORM_THRIFT_TRANSPORT_PLUGIN_JAR = "storm.thrift.transport.jar";
+ public static String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
/**
* The serializer class for ListDelegate (tuple payload).
@@ -215,7 +214,7 @@
/**
* Class name for authorization plugin for Nimbus
*/
- public static String NIMBUS_AUTHORIZATION_CLASSNAME = "nimbus.authorization.classname";
+ public static String NIMBUS_AUTHORIZER = "nimbus.authorizer";
/**
* Storm UI binds to this port.
@@ -49,17 +49,8 @@ else if (loginConfigurationFile.length()==0)
public static ITransportPlugin GetTransportPlugin(Map storm_conf, Configuration login_conf) {
ITransportPlugin transportPlugin = null;
try {
- String transport_plugin_klassName = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN_CLASS);
- String transport_plugin_jar = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN_JAR);
- Class klass = null;
- if (transport_plugin_jar==null) klass = Class.forName(transport_plugin_klassName);
- else {
- URL url = new URL("jar:file:" + transport_plugin_jar + "!/");
- LOG.debug("Plugin URL:"+url);
- URL[] urls = new URL[] { url };
- ClassLoader loader = new URLClassLoader(urls);
- klass = loader.loadClass(transport_plugin_klassName);
- }
+ String transport_plugin_klassName = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
+ Class klass = Class.forName(transport_plugin_klassName);
transportPlugin = (ITransportPlugin)klass.getConstructor(Configuration.class).newInstance(login_conf);
} catch(Exception e) {
throw new RuntimeException(e);
@@ -5,12 +5,12 @@
* If not specified, all requests are authorized.
*
* You could specify the authorization plugin via storm parameter. For example:
- * storm -c nimbus.authorization.classname=backtype.storm.security.auth.DefaultAuthorizer ...
+ * storm -c nimbus.authorization.class=backtype.storm.security.auth.NoopAuthorizer ...
*
* You could also specify it via storm.yaml:
- * nimbus.authorization.classname: backtype.storm.security.auth.DefaultAuthorizer
+ * nimbus.authorization.class: backtype.storm.security.auth.NoopAuthorizer
*/
-public interface IAuthorization {
+public interface IAuthorizer {
/**
* permit() method is invoked for each incoming Thrift request.
* @param contrext request context includes info about
@@ -1,188 +0,0 @@
-package backtype.storm.security.auth;
-
-import java.io.IOException;
-import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import javax.security.auth.Subject;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.kerberos.KerberosTicket;
-import javax.security.auth.login.Configuration;
-import javax.security.auth.login.LoginException;
-import javax.security.sasl.Sasl;
-import org.apache.thrift7.transport.TSaslClientTransport;
-import org.apache.thrift7.transport.TSaslServerTransport;
-import org.apache.thrift7.transport.TTransport;
-import org.apache.thrift7.transport.TTransportException;
-import org.apache.thrift7.transport.TTransportFactory;
-import org.apache.zookeeper.Login;
-import org.apache.zookeeper.server.auth.KerberosName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
- public static final String KERBEROS = "GSSAPI";
- private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
-
- public KerberosSaslTransportPlugin(Configuration login_conf) {
- super(login_conf);
- }
-
- public TTransportFactory getServerTransportFactory() throws IOException {
- //create an authentication callback handler
- CallbackHandler server_callback_handler = new SaslServerCallbackHandler(login_conf);
-
- //login our principal
- Subject subject = null;
- try {
- Login login = new Login(AuthUtils.LOGIN_CONTEXT_SERVER, server_callback_handler);
- subject = login.getSubject();
- } catch (LoginException ex) {
- LOG.error("Server failed to login in principal:" + ex, ex);
- throw new RuntimeException(ex);
- }
-
- //check the credential of our principal
- if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
- RuntimeException ex = new RuntimeException("Fail to verify user principal with section \""
- +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf);
- LOG.error(ex.getMessage(), ex);
- throw ex;
- }
-
- String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal");
- LOG.debug("principal:"+principal);
- KerberosName serviceKerberosName = new KerberosName(principal);
- String serviceName = serviceKerberosName.getServiceName();
- String hostName = serviceKerberosName.getHostName();
- Map<String, String> props = new TreeMap<String,String>();
- props.put(Sasl.QOP, "auth");
- props.put(Sasl.SERVER_AUTH, "false");
-
- //create a transport factory that will invoke our auth callback for digest
- TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
- factory.addServerDefinition(KERBEROS, serviceName, hostName, props, server_callback_handler);
-
- //create a wrap transport factory so that we could apply user credential during connections
- TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
-
- LOG.info("SASL GSSAPI transport factory will be used");
- return wrapFactory;
- }
-
- public TTransport connect(TTransport transport, String serverHost) throws TTransportException, IOException {
- //create an authentication callback handler
- SaslClientCallbackHandler client_callback_handler = new SaslClientCallbackHandler(login_conf);
-
- //login our user
- Login login = null;
- try {
- login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
- } catch (LoginException ex) {
- LOG.error("Server failed to login in principal:" + ex, ex);
- throw new RuntimeException(ex);
- }
-
- final Subject subject = login.getSubject();
- if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
- RuntimeException ex = new RuntimeException("Fail to verify user principal with section \""
- +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
- LOG.error(ex.getMessage(), ex);
- throw ex;
- }
-
- final String principal = getPrincipal(subject);
- String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
- if (serviceName == null) {
- serviceName = AuthUtils.SERVICE;
- }
- Map<String, String> props = new TreeMap<String,String>();
- props.put(Sasl.QOP, "auth");
- props.put(Sasl.SERVER_AUTH, "false");
-
- LOG.debug("SASL GSSAPI client transport is being established");
- final TTransport sasalTransport = new TSaslClientTransport(KERBEROS,
- principal,
- serviceName,
- serverHost,
- props,
- null,
- transport);
-
- //open Sasl transport with the login credential
- try {
- Subject.doAs(subject,
- new PrivilegedExceptionAction<Void>() {
- public Void run() {
- try {
- LOG.debug("do as:"+ principal);
- sasalTransport.open();
- }
- catch (Exception e) {
- LOG.error("Client failed to open SaslClientTransport to interact with a server during session initiation: " + e, e);
- e.printStackTrace();
- }
- return null;
- }
- });
- } catch (PrivilegedActionException e) {
- LOG.error("Client experienced a PrivilegedActionException exception while creating a TSaslClientTransport using a JAAS principal context:" + e, e);
- throw new RuntimeException(e);
- }
-
- return sasalTransport;
- }
-
- private String getPrincipal(Subject subject) {
- Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
- if (principals==null || principals.size()<1) {
- LOG.info("No principal found in login subject");
- return null;
- }
- return ((Principal)(principals.toArray()[0])).getName();
- }
-
- /** A TransportFactory that wraps another one, but assumes a specified UGI
- * before calling through.
- *
- * This is used on the server side to assume the server's Principal when accepting
- * clients.
- */
- static class TUGIAssumingTransportFactory extends TTransportFactory {
- private final Subject subject;
- private final TTransportFactory wrapped;
-
- public TUGIAssumingTransportFactory(TTransportFactory wrapped, Subject subject) {
- this.wrapped = wrapped;
- this.subject = subject;
-
- Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
- if (principals.size()>0)
- LOG.info("Service principal:"+ ((Principal)(principals.toArray()[0])).getName());
- }
-
- @Override
- public TTransport getTransport(final TTransport trans) {
- try {
- return Subject.doAs(subject,
- new PrivilegedExceptionAction<TTransport>() {
- public TTransport run() {
- try {
- return wrapped.getTransport(trans);
- }
- catch (Exception e) {
- LOG.error("Storm server failed to open transport to interact with a client during session initiation: " + e, e);
- return null;
- }
- }
- });
- } catch (PrivilegedActionException e) {
- LOG.error("Storm server experienced a PrivilegedActionException exception while creating a transport using a JAAS principal context:" + e, e);
- return null;
- }
- }
- }
-}
@@ -21,6 +21,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Base class for SASL authentication plugin.
+ */
public abstract class SaslTransportPlugin implements ITransportPlugin {
protected Configuration login_conf;
private static final Logger LOG = LoggerFactory.getLogger(SaslTransportPlugin.class);
@@ -1,13 +1,16 @@
-package backtype.storm.security.auth;
+package backtype.storm.security.auth.authorizer;
import backtype.storm.Config;
+import backtype.storm.security.auth.IAuthorizer;
+import backtype.storm.security.auth.ReqContext;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An authorization implementation that denies everything, for testing purposes
*/
-public class DenyAuthorizer implements IAuthorization {
+public class DenyAuthorizer implements IAuthorizer {
private static final Logger LOG = LoggerFactory.getLogger(DenyAuthorizer.class);
/**
@@ -1,13 +1,16 @@
-package backtype.storm.security.auth;
+package backtype.storm.security.auth.authorizer;
import backtype.storm.Config;
+import backtype.storm.security.auth.IAuthorizer;
+import backtype.storm.security.auth.ReqContext;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A no-op authorization implementation that illustrate info available for authorization decisions.
*/
-public class NoopAuthorizer implements IAuthorization {
+public class NoopAuthorizer implements IAuthorizer {
private static final Logger LOG = LoggerFactory.getLogger(NoopAuthorizer.class);
/**
Oops, something went wrong.

0 comments on commit f4f2cdd

Please sign in to comment.