Permalink
Browse files

enable transport plugin to be given via JAR file (storm.thrift.transp…

…ort.class+storm.thrift.transport.jar)
  • Loading branch information...
1 parent 30bd164 commit 2420876b665456502af8b3ceff8a8fb50a67838f afeng committed Feb 20, 2013
View
2 conf/defaults.yaml
@@ -16,7 +16,7 @@ storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
-storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
+storm.thrift.transport.class: "backtype.storm.security.auth.SimpleTransportPlugin"
### nimbus.* configs are for the master
nimbus.host: "localhost"
View
5 src/jvm/backtype/storm/Config.java
@@ -65,9 +65,10 @@
public static String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
/**
- * The transport class for Thrift
+ * The transport plug-in for Thrift client/server communication
*/
- public static String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
+ public static String STORM_THRIFT_TRANSPORT_PLUGIN_CLASS = "storm.thrift.transport.class";
+ public static String STORM_THRIFT_TRANSPORT_PLUGIN_JAR = "storm.thrift.transport.jar";
/**
* The serializer class for ListDelegate (tuple payload).
View
142 src/jvm/backtype/storm/security/auth/AnonymousAuthenticationProvider.java
@@ -1,142 +0,0 @@
-package backtype.storm.security.auth;
-
-import java.io.IOException;
-import java.util.Map;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslClientFactory;
-import javax.security.sasl.SaslServerFactory;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import com.google.common.annotations.VisibleForTesting;
-
-public class AnonymousAuthenticationProvider extends java.security.Provider {
- private static final long serialVersionUID = -738189377355473270L;
-
- public AnonymousAuthenticationProvider() {
- super("ThriftSaslAnonymous", 1.0, "Thrift Anonymous SASL provider");
- put("SaslClientFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
- put("SaslServerFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
- }
-
- public static class SaslAnonymousFactory implements SaslClientFactory, SaslServerFactory {
- public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol,
- String serverName, Map<String,?> props, CallbackHandler cbh)
- {
- for (String mech : mechanisms) {
- if ("ANONYMOUS".equals(mech)) {
- return new AnonymousClient(authorizationId);
- }
- }
- return null;
- }
-
- public SaslServer createSaslServer(String mechanism, String protocol,
- String serverName, Map<String,?> props, CallbackHandler cbh)
- {
- if ("ANONYMOUS".equals(mechanism)) {
- return new AnonymousServer();
- }
- return null;
- }
- public String[] getMechanismNames(Map<String, ?> props) {
- return new String[] { "ANONYMOUS" };
- }
- }
-}
-
-
-class AnonymousClient implements SaslClient {
- @VisibleForTesting
- final String username;
- private boolean hasProvidedInitialResponse;
-
- public AnonymousClient(String username) {
- if (username == null) {
- this.username = "anonymous";
- } else {
- this.username = username;
- }
- }
-
- public String getMechanismName() {
- return "ANONYMOUS";
- }
-
- public boolean hasInitialResponse() {
- return true;
- }
-
- public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
- if (hasProvidedInitialResponse) {
- throw new SaslException("Already complete!");
- }
-
- try {
- hasProvidedInitialResponse = true;
- return username.getBytes("UTF-8");
- } catch (IOException e) {
- throw new SaslException(e.toString());
- }
- }
-
- public boolean isComplete() {
- return hasProvidedInitialResponse;
- }
-
- public byte[] unwrap(byte[] incoming, int offset, int len) {
- throw new UnsupportedOperationException();
- }
-
- public byte[] wrap(byte[] outgoing, int offset, int len) {
- throw new UnsupportedOperationException();
- }
-
- public Object getNegotiatedProperty(String propName) {
- return null;
- }
-
- public void dispose() {}
-}
-
-class AnonymousServer implements SaslServer {
- private String user;
-
- public String getMechanismName() {
- return "ANONYMOUS";
- }
-
- public byte[] evaluateResponse(byte[] response) throws SaslException {
- try {
- this.user = new String(response, "UTF-8");
- } catch (IOException e) {
- throw new SaslException(e.toString());
- }
- return null;
- }
-
- public boolean isComplete() {
- return user != null;
- }
-
- public String getAuthorizationID() {
- return user;
- }
-
- public byte[] unwrap(byte[] incoming, int offset, int len) {
- throw new UnsupportedOperationException();
- }
-
- public byte[] wrap(byte[] outgoing, int offset, int len) {
- throw new UnsupportedOperationException();
- }
-
- public Object getNegotiatedProperty(String propName) {
- return null;
- }
-
- public void dispose() {}
-}
-
-
-
View
46 src/jvm/backtype/storm/security/auth/AnonymousSaslTransportPlugin.java
@@ -1,46 +0,0 @@
-package backtype.storm.security.auth;
-
-import java.io.IOException;
-import javax.security.auth.login.Configuration;
-import org.apache.thrift7.transport.TSaslClientTransport;
-import org.apache.thrift7.transport.TSaslServerTransport;
-import org.apache.thrift7.transport.TTransport;
-import org.apache.thrift7.transport.TTransportException;
-import org.apache.thrift7.transport.TTransportFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AnonymousSaslTransportPlugin extends SaslTransportPlugin {
- static {
- java.security.Security.addProvider(new AnonymousAuthenticationProvider());
- }
-
- public static final String ANONYMOUS = "ANONYMOUS";
- private static final Logger LOG = LoggerFactory.getLogger(AnonymousSaslTransportPlugin.class);
-
- public AnonymousSaslTransportPlugin(Configuration login_conf) {
- super(login_conf);
- }
-
- public TTransportFactory getServerTransportFactory() throws IOException {
- //create a transport factory that will invoke our auth callback for digest
- TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
- factory.addServerDefinition(ANONYMOUS, AuthUtils.SERVICE, "localhost", null, null);
- LOG.info("SASL ANONYMOUS transport factory will be used");
- return factory;
- }
-
- public TTransport connect(TTransport transport, String serverHost)
- throws TTransportException, IOException {
- TSaslClientTransport wrapper_transport = new TSaslClientTransport(ANONYMOUS,
- null,
- AuthUtils.SERVICE,
- serverHost,
- null,
- null,
- transport);
- LOG.debug("SASL ANONYMOUS client transport is being established");
- wrapper_transport.open();
- return wrapper_transport;
- }
-}
View
15 src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -9,6 +9,8 @@
import backtype.storm.utils.Utils;
import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.util.Map;
public class AuthUtils {
@@ -47,8 +49,17 @@ else if (loginConfigurationFile.length()==0)
public static ITransportPlugin GetTransportPlugin(Map storm_conf, Configuration login_conf) {
ITransportPlugin transportPlugin = null;
try {
- String transport_klassName = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
- Class klass = Class.forName(transport_klassName);
+ String transport_plugin_klassName = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN_CLASS);
+ String transport_plugin_jar = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN_JAR);
+ Class klass = null;
+ if (transport_plugin_jar==null) klass = Class.forName(transport_plugin_klassName);
+ else {
+ URL url = new URL("jar:file:" + transport_plugin_jar + "!/");
+ LOG.debug("Plugin URL:"+url);
+ URL[] urls = new URL[] { url };
+ ClassLoader loader = new URLClassLoader(urls);
+ klass = loader.loadClass(transport_plugin_klassName);
+ }
transportPlugin = (ITransportPlugin)klass.getConstructor(Configuration.class).newInstance(login_conf);
} catch(Exception e) {
throw new RuntimeException(e);
View
13 src/jvm/backtype/storm/security/auth/DigestSaslTransportPlugin.java
@@ -1,28 +1,17 @@
package backtype.storm.security.auth;
import java.io.IOException;
-import java.util.Map;
-
-import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.login.Configuration;
-import org.apache.thrift7.transport.TFramedTransport;
import org.apache.thrift7.transport.TSaslClientTransport;
import org.apache.thrift7.transport.TSaslServerTransport;
-import org.apache.thrift7.transport.TServerSocket;
import org.apache.thrift7.transport.TTransport;
import org.apache.thrift7.transport.TTransportException;
import org.apache.thrift7.transport.TTransportFactory;
-import org.apache.zookeeper.Login;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.esotericsoftware.minlog.Log;
-
-import backtype.storm.utils.Utils;
-
public class DigestSaslTransportPlugin extends SaslTransportPlugin {
public static final String DIGEST = "DIGEST-MD5";
private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
@@ -33,7 +22,7 @@
public DigestSaslTransportPlugin(Configuration login_conf) {
super(login_conf);
}
-
+
protected TTransportFactory getServerTransportFactory() throws IOException {
//create an authentication callback handler
CallbackHandler serer_callback_handler = new SaslServerCallbackHandler(login_conf);
View
5 src/jvm/backtype/storm/security/auth/KerberosSaslTransportPlugin.java
@@ -27,9 +27,6 @@
public static final String KERBEROS = "GSSAPI";
private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
- /**
- * constructor
- */
public KerberosSaslTransportPlugin(Configuration login_conf) {
super(login_conf);
}
@@ -147,7 +144,7 @@ private String getPrincipal(Subject subject) {
}
return ((Principal)(principals.toArray()[0])).getName();
}
-
+
/** A TransportFactory that wraps another one, but assumes a specified UGI
* before calling through.
*
View
14 src/jvm/backtype/storm/security/auth/NoopAuthorizer.java
@@ -19,13 +19,11 @@
* @return true if the request is authorized, false if reject
*/
public boolean permit(ReqContext context) {
- LOG.info("Access "
- + " from: " + context.remoteAddress() == null
- ? "null" : context.remoteAddress().toString()
- + " principal:"+context.principal() == null
- ? "null" : context.principal()
- +" op:"+context.operation()
- + " topoology:"+ context.topologyConf().get(Config.TOPOLOGY_NAME));
- return true;
+ LOG.info("Access "
+ + " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
+ + " principal:"+(context.principal() == null? "null" : context.principal())
+ +" op:"+context.operation()
+ + " topoology:"+ context.topologyConf().get(Config.TOPOLOGY_NAME));
+ return true;
}
}
View
166 src/jvm/backtype/storm/security/auth/SaslClientCallbackHandler.java
@@ -18,90 +18,90 @@
* SASL client side callback handler.
*/
public class SaslClientCallbackHandler implements CallbackHandler {
- private static final String USERNAME = "username";
- private static final String PASSWORD = "password";
- private static final Logger LOG = LoggerFactory.getLogger(SaslClientCallbackHandler.class);
- private String _username = null;
- private String _password = null;
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final Logger LOG = LoggerFactory.getLogger(SaslClientCallbackHandler.class);
+ private String _username = null;
+ private String _password = null;
- /**
- * Constructor based on a JAAS configuration
- *
- * For digest, you should have a pair of user name and password defined in this figgure.
- *
- * @param configuration
- * @throws IOException
- */
- public SaslClientCallbackHandler(Configuration configuration) throws IOException {
- if (configuration == null) return;
- AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
- if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
- + "' entry in this configuration: Client cannot start.";
- LOG.error(errorMessage);
- throw new IOException(errorMessage);
- }
+ /**
+ * Constructor based on a JAAS configuration
+ *
+ * For digest, you should have a pair of user name and password defined in this figgure.
+ *
+ * @param configuration
+ * @throws IOException
+ */
+ public SaslClientCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration == null) return;
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
+ + "' entry in this configuration: Client cannot start.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
- for(AppConfigurationEntry entry: configurationEntries) {
- if (entry.getOptions().get(USERNAME) != null) {
- _username = (String)entry.getOptions().get(USERNAME);
- }
- if (entry.getOptions().get(PASSWORD) != null) {
- _password = (String)entry.getOptions().get(PASSWORD);
- }
- }
- }
+ for(AppConfigurationEntry entry: configurationEntries) {
+ if (entry.getOptions().get(USERNAME) != null) {
+ _username = (String)entry.getOptions().get(USERNAME);
+ }
+ if (entry.getOptions().get(PASSWORD) != null) {
+ _password = (String)entry.getOptions().get(PASSWORD);
+ }
+ }
+ }
- /**
- * This method is invoked by SASL for authentication challenges
- * @param callbacks a collection of challenge callbacks
- */
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (Callback c : callbacks) {
- if (c instanceof NameCallback) {
- LOG.debug("name callback");
- NameCallback nc = (NameCallback) c;
- nc.setName(_username);
- } else if (c instanceof PasswordCallback) {
- LOG.debug("password callback");
- PasswordCallback pc = (PasswordCallback)c;
- if (_password != null) {
- pc.setPassword(_password.toCharArray());
- } else {
- LOG.warn("Could not login: the client is being asked for a password, but the " +
- " client code does not currently support obtaining a password from the user." +
- " Make sure that the client is configured to use a ticket cache (using" +
- " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
- " you still get this message after that, the TGT in the ticket cache has expired and must" +
- " be manually refreshed. To do so, first determine if you are using a password or a" +
- " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
- " is running this client using the command" +
- " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
- " If the latter, do" +
- " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
- " <keytab> is the location of the keytab file). After manually refreshing your cache," +
- " restart this client. If you continue to see this message after manually refreshing" +
- " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
- }
- } else if (c instanceof AuthorizeCallback) {
- LOG.debug("authorization callback");
- AuthorizeCallback ac = (AuthorizeCallback) c;
- String authid = ac.getAuthenticationID();
- String authzid = ac.getAuthorizationID();
- if (authid.equals(authzid)) {
- ac.setAuthorized(true);
- } else {
- ac.setAuthorized(false);
- }
- if (ac.isAuthorized()) {
- ac.setAuthorizedID(authzid);
- }
- } else if (c instanceof RealmCallback) {
- RealmCallback rc = (RealmCallback) c;
- ((RealmCallback) c).setText(rc.getDefaultText());
- } else {
- throw new UnsupportedCallbackException(c);
- }
- }
- }
+ /**
+ * This method is invoked by SASL for authentication challenges
+ * @param callbacks a collection of challenge callbacks
+ */
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (Callback c : callbacks) {
+ if (c instanceof NameCallback) {
+ LOG.debug("name callback");
+ NameCallback nc = (NameCallback) c;
+ nc.setName(_username);
+ } else if (c instanceof PasswordCallback) {
+ LOG.debug("password callback");
+ PasswordCallback pc = (PasswordCallback)c;
+ if (_password != null) {
+ pc.setPassword(_password.toCharArray());
+ } else {
+ LOG.warn("Could not login: the client is being asked for a password, but the " +
+ " client code does not currently support obtaining a password from the user." +
+ " Make sure that the client is configured to use a ticket cache (using" +
+ " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
+ " you still get this message after that, the TGT in the ticket cache has expired and must" +
+ " be manually refreshed. To do so, first determine if you are using a password or a" +
+ " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
+ " is running this client using the command" +
+ " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
+ " If the latter, do" +
+ " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
+ " <keytab> is the location of the keytab file). After manually refreshing your cache," +
+ " restart this client. If you continue to see this message after manually refreshing" +
+ " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
+ }
+ } else if (c instanceof AuthorizeCallback) {
+ LOG.debug("authorization callback");
+ AuthorizeCallback ac = (AuthorizeCallback) c;
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ ac.setAuthorizedID(authzid);
+ }
+ } else if (c instanceof RealmCallback) {
+ RealmCallback rc = (RealmCallback) c;
+ ((RealmCallback) c).setText(rc.getDefaultText());
+ } else {
+ throw new UnsupportedCallbackException(c);
+ }
+ }
+ }
}
View
182 src/jvm/backtype/storm/security/auth/SaslServerCallbackHandler.java
@@ -20,109 +20,109 @@
* SASL server side collback handler
*/
public class SaslServerCallbackHandler implements CallbackHandler {
- private static final String USER_PREFIX = "user_";
- private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
- private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
- private static final String SYSPROP_REMOVE_HOST = "storm.kerberos.removeHostFromPrincipal";
- private static final String SYSPROP_REMOVE_REALM = "storm.kerberos.removeRealmFromPrincipal";
+ private static final String USER_PREFIX = "user_";
+ private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
+ private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
+ private static final String SYSPROP_REMOVE_HOST = "storm.kerberos.removeHostFromPrincipal";
+ private static final String SYSPROP_REMOVE_REALM = "storm.kerberos.removeRealmFromPrincipal";
- private String userName;
- private final Map<String,String> credentials = new HashMap<String,String>();
+ private String userName;
+ private final Map<String,String> credentials = new HashMap<String,String>();
- public SaslServerCallbackHandler(Configuration configuration) throws IOException {
- if (configuration==null) return;
+ public SaslServerCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration==null) return;
- AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
- if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
- LOG.error(errorMessage);
- throw new IOException(errorMessage);
- }
- credentials.clear();
- for(AppConfigurationEntry entry: configurationEntries) {
- Map<String,?> options = entry.getOptions();
- // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
- // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
- for(Map.Entry<String, ?> pair : options.entrySet()) {
- String key = pair.getKey();
- if (key.startsWith(USER_PREFIX)) {
- String userName = key.substring(USER_PREFIX.length());
- credentials.put(userName,(String)pair.getValue());
- }
- }
- }
- }
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+ credentials.clear();
+ for(AppConfigurationEntry entry: configurationEntries) {
+ Map<String,?> options = entry.getOptions();
+ // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
+ // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+ for(Map.Entry<String, ?> pair : options.entrySet()) {
+ String key = pair.getKey();
+ if (key.startsWith(USER_PREFIX)) {
+ String userName = key.substring(USER_PREFIX.length());
+ credentials.put(userName,(String)pair.getValue());
+ }
+ }
+ }
+ }
- public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- handleNameCallback((NameCallback) callback);
- } else if (callback instanceof PasswordCallback) {
- handlePasswordCallback((PasswordCallback) callback);
- } else if (callback instanceof RealmCallback) {
- handleRealmCallback((RealmCallback) callback);
- } else if (callback instanceof AuthorizeCallback) {
- handleAuthorizeCallback((AuthorizeCallback) callback);
- }
- }
- }
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ handleNameCallback((NameCallback) callback);
+ } else if (callback instanceof PasswordCallback) {
+ handlePasswordCallback((PasswordCallback) callback);
+ } else if (callback instanceof RealmCallback) {
+ handleRealmCallback((RealmCallback) callback);
+ } else if (callback instanceof AuthorizeCallback) {
+ handleAuthorizeCallback((AuthorizeCallback) callback);
+ }
+ }
+ }
- private void handleNameCallback(NameCallback nc) {
+ private void handleNameCallback(NameCallback nc) {
LOG.debug("handleNameCallback");
- userName = nc.getDefaultName();
- nc.setName(nc.getDefaultName());
- }
+ userName = nc.getDefaultName();
+ nc.setName(nc.getDefaultName());
+ }
- private void handlePasswordCallback(PasswordCallback pc) {
+ private void handlePasswordCallback(PasswordCallback pc) {
LOG.debug("handlePasswordCallback");
- if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
- // superuser: use Java system property for password, if available.
- pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
- } else if (credentials.containsKey(userName) ) {
- pc.setPassword(credentials.get(userName).toCharArray());
- } else {
- LOG.warn("No password found for user: " + userName);
- }
- }
+ if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
+ // superuser: use Java system property for password, if available.
+ pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
+ } else if (credentials.containsKey(userName) ) {
+ pc.setPassword(credentials.get(userName).toCharArray());
+ } else {
+ LOG.warn("No password found for user: " + userName);
+ }
+ }
- private void handleRealmCallback(RealmCallback rc) {
- LOG.debug("handleRealmCallback: "+ rc.getDefaultText());
- rc.setText(rc.getDefaultText());
- }
+ private void handleRealmCallback(RealmCallback rc) {
+ LOG.debug("handleRealmCallback: "+ rc.getDefaultText());
+ rc.setText(rc.getDefaultText());
+ }
- private void handleAuthorizeCallback(AuthorizeCallback ac) {
- String authenticationID = ac.getAuthenticationID();
- LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
- ac.setAuthorized(true);
+ private void handleAuthorizeCallback(AuthorizeCallback ac) {
+ String authenticationID = ac.getAuthenticationID();
+ LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
+ ac.setAuthorized(true);
- // canonicalize authorization id according to system properties:
- // storm.kerberos.removeRealmFromPrincipal(={true,false})
- // storm.kerberos.removeHostFromPrincipal(={true,false})
- KerberosName kerberosName = new KerberosName(authenticationID);
- try {
- StringBuilder userNameBuilder = new StringBuilder(kerberosName.getShortName());
- if (shouldAppendHost(kerberosName)) {
- userNameBuilder.append("/").append(kerberosName.getHostName());
- }
- if (shouldAppendRealm(kerberosName)) {
- userNameBuilder.append("@").append(kerberosName.getRealm());
- }
- LOG.debug("Setting authorizedID: " + userNameBuilder);
- ac.setAuthorizedID(userNameBuilder.toString());
- } catch (IOException e) {
- LOG.error("Failed to set name based on Kerberos authentication rules.");
- }
- }
+ // canonicalize authorization id according to system properties:
+ // storm.kerberos.removeRealmFromPrincipal(={true,false})
+ // storm.kerberos.removeHostFromPrincipal(={true,false})
+ KerberosName kerberosName = new KerberosName(authenticationID);
+ try {
+ StringBuilder userNameBuilder = new StringBuilder(kerberosName.getShortName());
+ if (shouldAppendHost(kerberosName)) {
+ userNameBuilder.append("/").append(kerberosName.getHostName());
+ }
+ if (shouldAppendRealm(kerberosName)) {
+ userNameBuilder.append("@").append(kerberosName.getRealm());
+ }
+ LOG.debug("Setting authorizedID: " + userNameBuilder);
+ ac.setAuthorizedID(userNameBuilder.toString());
+ } catch (IOException e) {
+ LOG.error("Failed to set name based on Kerberos authentication rules.");
+ }
+ }
- private boolean shouldAppendRealm(KerberosName kerberosName) {
- return !isSystemPropertyTrue(SYSPROP_REMOVE_REALM) && kerberosName.getRealm() != null;
- }
+ private boolean shouldAppendRealm(KerberosName kerberosName) {
+ return !isSystemPropertyTrue(SYSPROP_REMOVE_REALM) && kerberosName.getRealm() != null;
+ }
- private boolean shouldAppendHost(KerberosName kerberosName) {
- return !isSystemPropertyTrue(SYSPROP_REMOVE_HOST) && kerberosName.getHostName() != null;
- }
+ private boolean shouldAppendHost(KerberosName kerberosName) {
+ return !isSystemPropertyTrue(SYSPROP_REMOVE_HOST) && kerberosName.getHostName() != null;
+ }
- private boolean isSystemPropertyTrue(String propertyName) {
- return "true".equals(System.getProperty(propertyName));
- }
+ private boolean isSystemPropertyTrue(String propertyName) {
+ return "true".equals(System.getProperty(propertyName));
+ }
}
View
4 src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
@@ -31,7 +31,7 @@
public SaslTransportPlugin(Configuration login_conf) {
this.login_conf = login_conf;
}
-
+
public TServer getServer(int port, TProcessor processor) throws IOException, TTransportException {
TTransportFactory serverTransportFactory = getServerTransportFactory();
@@ -46,7 +46,7 @@ public TServer getServer(int port, TProcessor processor) throws IOException, TTr
protocolFactory(new TBinaryProtocol.Factory());
if (serverTransportFactory != null)
server_args.transportFactory(serverTransportFactory);
-
+
//construct THsHaServer
return new TThreadPoolServer(server_args);
}
View
8 src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
@@ -24,7 +24,9 @@
import org.slf4j.LoggerFactory;
/**
- * Basic transport for Thrift plugin
+ * Simple transport for Thrift plugin.
+ *
+ * This plugin is designed to be backward compatible with existing Storm code.
*/
public class SimpleTransportPlugin implements ITransportPlugin {
protected Configuration login_conf;
@@ -46,7 +48,7 @@ public TServer getServer(int port, TProcessor processor) throws IOException, TTr
processor(new SimpleWrapProcessor(processor)).
workerThreads(64).
protocolFactory(new TBinaryProtocol.Factory());
-
+
//construct THsHaServer
return new THsHaServer(server_args);
}
@@ -96,7 +98,7 @@ public boolean process(final TProtocol inProt, final TProtocol outProt) throws T
//anonymous user
req_context.setSubject(null);
-
+
//invoke service handler
try {
return wrapped.process(inProt, outProt);
View
27 src/jvm/backtype/storm/security/auth/ThriftServer.java
@@ -1,38 +1,11 @@
package backtype.storm.security.auth;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslServer;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.login.Configuration;
-import javax.security.auth.Subject;
-import java.io.IOException;
-import java.net.Socket;
-import java.security.Principal;
-import java.security.PrivilegedExceptionAction;
-import java.security.PrivilegedActionException;
-import org.apache.zookeeper.Login;
-import org.apache.zookeeper.server.auth.KerberosName;
-import org.apache.thrift7.TException;
import org.apache.thrift7.TProcessor;
-import org.apache.thrift7.server.THsHaServer;
import org.apache.thrift7.server.TServer;
-import org.apache.thrift7.server.TThreadPoolServer;
-import org.apache.thrift7.protocol.TBinaryProtocol;
-import org.apache.thrift7.protocol.TProtocol;
-import org.apache.thrift7.transport.TNonblockingServerSocket;
-import org.apache.thrift7.transport.TSaslServerTransport;
-import org.apache.thrift7.transport.TServerSocket;
-import org.apache.thrift7.transport.TServerTransport;
-import org.apache.thrift7.transport.TSocket;
-import org.apache.thrift7.transport.TTransport;
-import org.apache.thrift7.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.security.auth.*;
import backtype.storm.utils.Utils;
public class ThriftServer {
View
135 test/clj/backtype/storm/security/auth/auth_test.clj
@@ -104,29 +104,30 @@
(^TopologyInfo getTopologyInfo [this ^String storm-id]))))
-(defn launch-test-server [server-port login-cfg aznClass transportPlugin]
+(defn launch-test-server [server-port login-cfg aznClass transportPluginClass transportPluginJAR]
(System/setProperty "java.security.auth.login.config" login-cfg)
(let [conf (merge (read-storm-config)
{NIMBUS-AUTHORIZATION-CLASSNAME aznClass
NIMBUS-HOST "localhost"
NIMBUS-THRIFT-PORT server-port
- STORM-THRIFT-TRANSPORT-PLUGIN transportPlugin})
+ STORM-THRIFT-TRANSPORT-PLUGIN-CLASS transportPluginClass
+ STORM-THRIFT-TRANSPORT-PLUGIN-JAR transportPluginJAR})
nimbus (nimbus/standalone-nimbus)
service-handler (dummy-service-handler conf nimbus)
server (ThriftServer. conf (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
(.serve server)))
-(defn launch-server-w-wait [server-port ms login-cfg aznClass transportPlugin]
- (.start (Thread. #(launch-test-server server-port login-cfg aznClass transportPlugin)))
+(defn launch-server-w-wait [server-port ms login-cfg aznClass transportPluginClass transportPluginJAR]
+ (.start (Thread. #(launch-test-server server-port login-cfg aznClass transportPluginClass transportPluginJAR)))
(Thread/sleep ms))
(deftest Simple-authentication-test
- (launch-server-w-wait 6627 1000 "" nil "backtype.storm.security.auth.SimpleTransportPlugin")
+ (launch-server-w-wait 6627 1000 "" nil "backtype.storm.security.auth.SimpleTransportPlugin" nil)
(log-message "(Positive authentication) Server and Client with simple transport, no authentication")
(let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
+ {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.SimpleTransportPlugin"})
client (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout)
nimbus_client (.getClient client)]
(.activate nimbus_client "security_auth_test_topology")
@@ -136,7 +137,7 @@
(System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf")
(log-message "java.security.auth.login.config: " (System/getProperty "java.security.auth.login.config"))
(let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.DigestSaslTransportPlugin"})]
+ {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.DigestSaslTransportPlugin"})]
(is (= "java.net.SocketTimeoutException: Read timed out"
(try (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout)
nil
@@ -145,9 +146,10 @@
(deftest positive-authorization-test
(launch-server-w-wait 6628 1000 ""
"backtype.storm.security.auth.NoopAuthorizer"
- "backtype.storm.security.auth.SimpleTransportPlugin")
+ "backtype.storm.security.auth.SimpleTransportPlugin"
+ nil)
(let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
+ {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.SimpleTransportPlugin"})
client (NimbusClient. storm-conf "localhost" 6628 nimbus-timeout)
nimbus_client (.getClient client)]
(log-message "(Positive authorization) Authorization plugin should accept client request")
@@ -157,9 +159,10 @@
(deftest deny-authorization-test
(launch-server-w-wait 6629 1000 ""
"backtype.storm.security.auth.DenyAuthorizer"
- "backtype.storm.security.auth.SimpleTransportPlugin")
+ "backtype.storm.security.auth.SimpleTransportPlugin"
+ nil)
(let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
+ {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.SimpleTransportPlugin"})
client (NimbusClient. storm-conf "localhost" 6629 nimbus-timeout)
nimbus_client (.getClient client)]
(log-message "(Negative authorization) Authorization plugin should reject client request")
@@ -171,12 +174,12 @@
(launch-server-w-wait 6630 2000
"test/clj/backtype/storm/security/auth/jaas_digest.conf"
nil
- "backtype.storm.security.auth.DigestSaslTransportPlugin")
-
+ "backtype.storm.security.auth.DigestSaslTransportPlugin"
+ nil)
(log-message "(Positive authentication) valid digest authentication")
(System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf")
(let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.DigestSaslTransportPlugin"})
+ {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.DigestSaslTransportPlugin"})
client (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
nimbus_client (.getClient client)]
(.activate nimbus_client "security_auth_test_topology")
@@ -185,7 +188,7 @@
(log-message "(Negative authentication) Server: Digest vs. Client: Simple")
(System/setProperty "java.security.auth.login.config" "")
(let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
+ {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.SimpleTransportPlugin"})
client (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
nimbus_client (.getClient client)]
(is (thrown? TTransportException
@@ -195,7 +198,7 @@
(log-message "(Negative authentication) Invalid password")
(System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf")
(let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.DigestSaslTransportPlugin"})]
+ {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.DigestSaslTransportPlugin"})]
(is (= "Peer indicated failure: DIGEST-MD5: digest response format violation. Mismatched response."
(try (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
nil
@@ -204,54 +207,62 @@
(log-message "(Negative authentication) Unknown user")
(System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf")
(let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.DigestSaslTransportPlugin"})]
+ {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.DigestSaslTransportPlugin"})]
(is (= "Peer indicated failure: DIGEST-MD5: cannot acquire password for unknown_user in realm : localhost"
(try (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
nil
(catch TTransportException ex (.getMessage ex)))))))
-(deftest anonymous-authentication-test
- (launch-server-w-wait 6625 1000 "" nil "backtype.storm.security.auth.AnonymousSaslTransportPlugin")
-
- (log-message "(Positive authentication) Server and Client with anonymous authentication")
- (let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.AnonymousSaslTransportPlugin"})
- client (NimbusClient. storm-conf "localhost" 6625 nimbus-timeout)
- nimbus_client (.getClient client)]
- (.activate nimbus_client "security_auth_test_topology")
- (.close client))
-
- (log-message "(Negative authentication) Server: anonymous vs. Client: Digest")
- (System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf")
- (log-message "java.security.auth.login.config: " (System/getProperty "java.security.auth.login.config"))
- (let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.DigestSaslTransportPlugin"})]
- (is (= "Peer indicated failure: Unsupported mechanism type DIGEST-MD5"
- (try (NimbusClient. storm-conf "localhost" 6625 nimbus-timeout)
- nil
- (catch TTransportException ex (.getMessage ex)))))))
-
-(deftest anonymous-positive-authorization-test
- (launch-server-w-wait 6623 1000 ""
- "backtype.storm.security.auth.NoopAuthorizer"
- "backtype.storm.security.auth.AnonymousSaslTransportPlugin")
- (let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.AnonymousSaslTransportPlugin"})
- client (NimbusClient. storm-conf "localhost" 6623 nimbus-timeout)
- nimbus_client (.getClient client)]
- (log-message "(Positive authorization) Authorization plugin should accept client request")
- (.activate nimbus_client "security_auth_test_topology")
- (.close client)))
-
-(deftest anonymous-deny-authorization-test
- (launch-server-w-wait 6624 1000 ""
- "backtype.storm.security.auth.DenyAuthorizer"
- "backtype.storm.security.auth.AnonymousSaslTransportPlugin")
- (let [storm-conf (merge (read-storm-config)
- {STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.AnonymousSaslTransportPlugin"})
- client (NimbusClient. storm-conf "localhost" 6624 nimbus-timeout)
- nimbus_client (.getClient client)]
- (log-message "(Negative authorization) Authorization plugin should reject client request")
- (is (thrown? TTransportException
- (.activate nimbus_client "security_auth_test_topology")))
- (.close client)))
+;
+;
+;(deftest anonymous-authentication-test
+; (launch-server-w-wait 6625 1000 "" nil "backtype.storm.security.auth.AnonymousSaslTransportPlugin"
+; "../storm-auth-plugin/target/storm-auth-plugin-sample-1.0-SNAPSHOT.jar")
+;
+; (log-message "(Positive authentication) Server and Client with anonymous authentication")
+; (let [storm-conf (merge (read-storm-config)
+; {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.AnonymousSaslTransportPlugin"
+; STORM-THRIFT-TRANSPORT-PLUGIN-JAR "../storm-auth-plugin/target/storm-auth-plugin-sample-1.0-SNAPSHOT.jar"})
+; client (NimbusClient. storm-conf "localhost" 6625 nimbus-timeout)
+; nimbus_client (.getClient client)]
+; (.activate nimbus_client "security_auth_test_topology")
+; (.close client))
+;
+; (log-message "(Negative authentication) Server: anonymous vs. Client: Digest")
+; (System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf")
+; (log-message "java.security.auth.login.config: " (System/getProperty "java.security.auth.login.config"))
+; (let [storm-conf (merge (read-storm-config)
+; {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.DigestSaslTransportPlugin"})]
+; (is (= "Peer indicated failure: Unsupported mechanism type DIGEST-MD5"
+; (try (NimbusClient. storm-conf "localhost" 6625 nimbus-timeout)
+; nil
+; (catch TTransportException ex (.getMessage ex)))))))
+;
+;(deftest anonymous-positive-authorization-test
+; (launch-server-w-wait 6623 1000 ""
+; "backtype.storm.security.auth.NoopAuthorizer"
+; "backtype.storm.security.auth.AnonymousSaslTransportPlugin"
+; "../storm-auth-plugin/target/storm-auth-plugin-sample-1.0-SNAPSHOT.jar")
+; (let [storm-conf (merge (read-storm-config)
+; {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.AnonymousSaslTransportPlugin"
+; STORM-THRIFT-TRANSPORT-PLUGIN-JAR "../storm-auth-plugin/target/storm-auth-plugin-sample-1.0-SNAPSHOT.jar"})
+; client (NimbusClient. storm-conf "localhost" 6623 nimbus-timeout)
+; nimbus_client (.getClient client)]
+; (log-message "(Positive authorization) Authorization plugin should accept client request")
+; (.activate nimbus_client "security_auth_test_topology")
+; (.close client)))
+;
+;(deftest anonymous-deny-authorization-test
+; (launch-server-w-wait 6624 1000 ""
+; "backtype.storm.security.auth.DenyAuthorizer"
+; "backtype.storm.security.auth.AnonymousSaslTransportPlugin"
+; "../storm-auth-plugin/target/storm-auth-plugin-sample-1.0-SNAPSHOT.jar")
+; (let [storm-conf (merge (read-storm-config)
+; {STORM-THRIFT-TRANSPORT-PLUGIN-CLASS "backtype.storm.security.auth.AnonymousSaslTransportPlugin"
+; STORM-THRIFT-TRANSPORT-PLUGIN-JAR "../storm-auth-plugin/target/storm-auth-plugin-sample-1.0-SNAPSHOT.jar"})
+; client (NimbusClient. storm-conf "localhost" 6624 nimbus-timeout)
+; nimbus_client (.getClient client)]
+; (log-message "(Negative authorization) Authorization plugin should reject client request")
+; (is (thrown? TTransportException
+; (.activate nimbus_client "security_auth_test_topology")))
+; (.close client)))

0 comments on commit 2420876

Please sign in to comment.