Permalink
Browse files

Authentication is now truly pluggable

  • Loading branch information...
1 parent 1e24e9f commit c729f4580e262f8f4eff851cc8e46f5bae0f0929 @anfeng anfeng committed Feb 19, 2013
View
@@ -16,6 +16,7 @@ storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
+storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
### nimbus.* configs are for the master
nimbus.host: "localhost"
@@ -65,6 +65,11 @@
public static String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
/**
+ * The transport class for Thrift
+ */
+ public static String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
+
+ /**
* The serializer class for ListDelegate (tuple payload).
* The default serializer will be ListDelegateSerializer
*/
@@ -11,37 +11,37 @@
import com.google.common.annotations.VisibleForTesting;
public class AnonymousAuthenticationProvider extends java.security.Provider {
+ private static final long serialVersionUID = -738189377355473270L;
+
public AnonymousAuthenticationProvider() {
- super("ThriftSaslAnonymous", 1.0, "Thrift Anonymous SASL provider");
- put("SaslClientFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
- put("SaslServerFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
+ super("ThriftSaslAnonymous", 1.0, "Thrift Anonymous SASL provider");
+ put("SaslClientFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
+ put("SaslServerFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
}
public static class SaslAnonymousFactory implements SaslClientFactory, SaslServerFactory {
- @Override
- public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol,
- String serverName, Map<String,?> props, CallbackHandler cbh)
- {
- for (String mech : mechanisms) {
- if ("ANONYMOUS".equals(mech)) {
- return new AnonymousClient(authorizationId);
- }
- }
- return null;
- }
-
- @Override
- public SaslServer createSaslServer(String mechanism, String protocol,
- String serverName, Map<String,?> props, CallbackHandler cbh)
- {
- if ("ANONYMOUS".equals(mechanism)) {
- return new AnonymousServer();
- }
- return null;
- }
- public String[] getMechanismNames(Map<String, ?> props) {
- return new String[] { "ANONYMOUS" };
- }
+ public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol,
+ String serverName, Map<String,?> props, CallbackHandler cbh)
+ {
+ for (String mech : mechanisms) {
+ if ("ANONYMOUS".equals(mech)) {
+ return new AnonymousClient(authorizationId);
+ }
+ }
+ return null;
+ }
+
+ public SaslServer createSaslServer(String mechanism, String protocol,
+ String serverName, Map<String,?> props, CallbackHandler cbh)
+ {
+ if ("ANONYMOUS".equals(mechanism)) {
+ return new AnonymousServer();
+ }
+ return null;
+ }
+ public String[] getMechanismNames(Map<String, ?> props) {
+ return new String[] { "ANONYMOUS" };
+ }
}
}
@@ -52,48 +52,48 @@ public SaslServer createSaslServer(String mechanism, String protocol,
private boolean hasProvidedInitialResponse;
public AnonymousClient(String username) {
- if (username == null) {
- this.username = "anonymous";
- } else {
- this.username = username;
- }
+ if (username == null) {
+ this.username = "anonymous";
+ } else {
+ this.username = username;
+ }
}
public String getMechanismName() {
- return "ANONYMOUS";
+ return "ANONYMOUS";
}
public boolean hasInitialResponse() {
- return true;
+ return true;
}
public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
- if (hasProvidedInitialResponse) {
- throw new SaslException("Already complete!");
- }
+ if (hasProvidedInitialResponse) {
+ throw new SaslException("Already complete!");
+ }
- try {
- hasProvidedInitialResponse = true;
- return username.getBytes("UTF-8");
- } catch (IOException e) {
- throw new SaslException(e.toString());
- }
+ try {
+ hasProvidedInitialResponse = true;
+ return username.getBytes("UTF-8");
+ } catch (IOException e) {
+ throw new SaslException(e.toString());
+ }
}
public boolean isComplete() {
- return hasProvidedInitialResponse;
+ return hasProvidedInitialResponse;
}
public byte[] unwrap(byte[] incoming, int offset, int len) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException();
}
public byte[] wrap(byte[] outgoing, int offset, int len) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException();
}
public Object getNegotiatedProperty(String propName) {
- return null;
+ return null;
}
public void dispose() {}
@@ -103,36 +103,36 @@ public void dispose() {}
private String user;
public String getMechanismName() {
- return "ANONYMOUS";
+ return "ANONYMOUS";
}
public byte[] evaluateResponse(byte[] response) throws SaslException {
- try {
- this.user = new String(response, "UTF-8");
- } catch (IOException e) {
- throw new SaslException(e.toString());
- }
- return null;
+ try {
+ this.user = new String(response, "UTF-8");
+ } catch (IOException e) {
+ throw new SaslException(e.toString());
+ }
+ return null;
}
public boolean isComplete() {
- return user != null;
+ return user != null;
}
public String getAuthorizationID() {
- return user;
+ return user;
}
public byte[] unwrap(byte[] incoming, int offset, int len) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException();
}
public byte[] wrap(byte[] outgoing, int offset, int len) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException();
}
public Object getNegotiatedProperty(String propName) {
- return null;
+ return null;
}
public void dispose() {}
@@ -0,0 +1,46 @@
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import javax.security.auth.login.Configuration;
+import org.apache.thrift7.transport.TSaslClientTransport;
+import org.apache.thrift7.transport.TSaslServerTransport;
+import org.apache.thrift7.transport.TTransport;
+import org.apache.thrift7.transport.TTransportException;
+import org.apache.thrift7.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AnonymousSaslTransportPlugin extends SaslTransportPlugin {
+ static {
+ java.security.Security.addProvider(new AnonymousAuthenticationProvider());
+ }
+
+ public static final String ANONYMOUS = "ANONYMOUS";
+ private static final Logger LOG = LoggerFactory.getLogger(AnonymousSaslTransportPlugin.class);
+
+ public AnonymousSaslTransportPlugin(Configuration login_conf) {
+ super(login_conf);
+ }
+
+ public TTransportFactory getServerTransportFactory() throws IOException {
+ //create a transport factory that will invoke our auth callback for digest
+ TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
+ factory.addServerDefinition(ANONYMOUS, AuthUtils.SERVICE, "localhost", null, null);
+ LOG.info("SASL ANONYMOUS transport factory will be used");
+ return factory;
+ }
+
+ public TTransport connect(TTransport transport, String serverHost)
+ throws TTransportException, IOException {
+ TSaslClientTransport wrapper_transport = new TSaslClientTransport(ANONYMOUS,
+ null,
+ AuthUtils.SERVICE,
+ serverHost,
+ null,
+ null,
+ transport);
+ LOG.debug("SASL ANONYMOUS client transport is being established");
+ wrapper_transport.open();
+ return wrapper_transport;
+ }
+}
@@ -4,37 +4,72 @@
import javax.security.auth.login.AppConfigurationEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
import java.io.IOException;
+import java.util.Map;
public class AuthUtils {
- public static String LoginContextServer = "StormServer";
- public static String LoginContextClient = "StormClient";
- public static final String DIGEST = "DIGEST-MD5";
- public static final String ANONYMOUS = "ANONYMOUS";
- public static final String KERBEROS = "GSSAPI";
- public static final String SERVICE = "storm_thrift_server";
- private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
-
- public static synchronized Configuration getConfiguration(String loginConfigurationFile) {
- Configuration.setConfiguration(null);
- System.setProperty("java.security.auth.login.config", loginConfigurationFile);
- return Configuration.getConfiguration();
- }
-
- public static String get(Configuration configuration, String section, String key) throws IOException {
- AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
- if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
- LOG.error(errorMessage);
- throw new IOException(errorMessage);
- }
-
- for(AppConfigurationEntry entry: configurationEntries) {
- Object val = entry.getOptions().get(key);
- if (val != null)
- return (String)val;
- }
- return null;
- }
+ public static final String LOGIN_CONTEXT_SERVER = "StormServer";
+ public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
+ public static final String SERVICE = "storm_thrift_server";
+ private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
+
+ /**
+ * Construct a JAAS configuration object per the given file
+ * @param storm_conf Storm configuration
+ * @return
+ */
+ public static synchronized Configuration GetConfiguration(Map storm_conf) {
+ Configuration.setConfiguration(null);
+
+ //exam system property first
+ String loginConfigurationFile = System.getProperty("java.security.auth.login.config");
+
+ //if not defined, examine Storm configuration
+ if (loginConfigurationFile==null)
+ loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
+ else if (loginConfigurationFile.length()==0)
+ loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
+
+ if (loginConfigurationFile == null) return null;
+ System.setProperty("java.security.auth.login.config", loginConfigurationFile);
+ return Configuration.getConfiguration();
+ }
+
+ /**
+ * Construct a transport plugin per storm configuration
+ * @param conf storm configuration
+ * @return
+ */
+ public static ITransportPlugin GetTransportPlugin(Map storm_conf, Configuration login_conf) {
+ ITransportPlugin transportPlugin = null;
+ try {
+ String transport_klassName = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
+ Class klass = Class.forName(transport_klassName);
+ transportPlugin = (ITransportPlugin)klass.getConstructor(Configuration.class).newInstance(login_conf);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ return transportPlugin;
+ }
+
+ public static String get(Configuration configuration, String section, String key) throws IOException {
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ for(AppConfigurationEntry entry: configurationEntries) {
+ Object val = entry.getOptions().get(key);
+ if (val != null)
+ return (String)val;
+ }
+ return null;
+ }
}
@@ -19,14 +19,11 @@
* @return true if the request is authorized, false if reject
*/
public boolean permit(ReqContext context) {
- LOG.info("Access "
- + " from: " +
- (context.remoteAddress() == null
- ? "null" : context.remoteAddress().toString())
- + " principal:"+ (context.principal() == null
- ? "null" : context.principal())
- +" op:"+context.operation()
- + " topoology:"+context.topologyConf().get(Config.TOPOLOGY_NAME));
- return false;
+ LOG.info("Access "
+ + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
+ + " principal:"+ (context.principal() == null? "null" : context.principal())
+ +" op:"+context.operation()
+ + " topoology:"+context.topologyConf().get(Config.TOPOLOGY_NAME));
+ return false;
}
}
Oops, something went wrong.

0 comments on commit c729f45

Please sign in to comment.