Skip to content
Browse files

more test cases for authentication/authorization added

  • Loading branch information...
1 parent 2847f41 commit 1e24e9fdd97d8f05c54278ca23e4f3fbd1656135 afeng committed
View
45 src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -7,29 +7,34 @@
import java.io.IOException;
public class AuthUtils {
- public static String LoginContextServer = "StormServer";
- public static String LoginContextClient = "StormClient";
- public static final String DIGEST = "DIGEST-MD5";
- public static final String ANONYMOUS = "ANONYMOUS";
- public static final String KERBEROS = "GSSAPI";
- public static final String SERVICE = "storm_thrift_server";
- private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
+ public static String LoginContextServer = "StormServer";
+ public static String LoginContextClient = "StormClient";
+ public static final String DIGEST = "DIGEST-MD5";
+ public static final String ANONYMOUS = "ANONYMOUS";
+ public static final String KERBEROS = "GSSAPI";
+ public static final String SERVICE = "storm_thrift_server";
+ private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
-
- public static String get(Configuration configuration, String section, String key) throws IOException {
- AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
- if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
- LOG.error(errorMessage);
- throw new IOException(errorMessage);
+ public static synchronized Configuration getConfiguration(String loginConfigurationFile) {
+ Configuration.setConfiguration(null);
+ System.setProperty("java.security.auth.login.config", loginConfigurationFile);
+ return Configuration.getConfiguration();
}
- for(AppConfigurationEntry entry: configurationEntries) {
- Object val = entry.getOptions().get(key);
- if (val != null)
- return (String)val;
+ public static String get(Configuration configuration, String section, String key) throws IOException {
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ for(AppConfigurationEntry entry: configurationEntries) {
+ Object val = entry.getOptions().get(key);
+ if (val != null)
+ return (String)val;
+ }
+ return null;
}
- return null;
- }
}
View
160 src/jvm/backtype/storm/security/auth/SaslClientCallbackHandler.java
@@ -18,90 +18,90 @@
* SASL client side callback handler.
*/
public class SaslClientCallbackHandler implements CallbackHandler {
- private static final String USERNAME = "username";
- private static final String PASSWORD = "password";
- private static final Logger LOG = LoggerFactory.getLogger(SaslClientCallbackHandler.class);
- private String _username = null;
- private String _password = null;
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final Logger LOG = LoggerFactory.getLogger(SaslClientCallbackHandler.class);
+ private String _username = null;
+ private String _password = null;
- /**
- * Constructor based on a JAAS configuration
- *
- * For digest, you should have a pair of user name and password defined in this figgure.
- *
- * @param configuration
- * @throws IOException
- */
- public SaslClientCallbackHandler(Configuration configuration) throws IOException {
- if (configuration == null) return;
- AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LoginContextClient);
- if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LoginContextClient
- + "' entry in this configuration: Client cannot start.";
- LOG.error(errorMessage);
- throw new IOException(errorMessage);
- }
+ /**
+ * Constructor based on a JAAS configuration
+ *
+ * For digest, you should have a pair of user name and password defined in this figgure.
+ *
+ * @param configuration
+ * @throws IOException
+ */
+ public SaslClientCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration == null) return;
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LoginContextClient);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+AuthUtils.LoginContextClient
+ + "' entry in this configuration: Client cannot start.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
- for(AppConfigurationEntry entry: configurationEntries) {
- if (entry.getOptions().get(USERNAME) != null) {
- _username = (String)entry.getOptions().get(USERNAME);
- }
- if (entry.getOptions().get(PASSWORD) != null) {
- _password = (String)entry.getOptions().get(PASSWORD);
- }
+ for(AppConfigurationEntry entry: configurationEntries) {
+ if (entry.getOptions().get(USERNAME) != null) {
+ _username = (String)entry.getOptions().get(USERNAME);
+ }
+ if (entry.getOptions().get(PASSWORD) != null) {
+ _password = (String)entry.getOptions().get(PASSWORD);
+ }
+ }
}
- }
- /**
- * This method is invoked by SASL for authentication challenges
- * @param callbacks a collection of challenge callbacks
- */
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (Callback c : callbacks) {
- if (c instanceof NameCallback) {
- LOG.debug("name callback");
- NameCallback nc = (NameCallback) c;
- nc.setName(_username);
- } else if (c instanceof PasswordCallback) {
- LOG.debug("pwd callback");
- PasswordCallback pc = (PasswordCallback)c;
- if (_password != null) {
- pc.setPassword(_password.toCharArray());
- } else {
- LOG.warn("Could not login: the client is being asked for a password, but the " +
- " client code does not currently support obtaining a password from the user." +
- " Make sure that the client is configured to use a ticket cache (using" +
- " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
- " you still get this message after that, the TGT in the ticket cache has expired and must" +
- " be manually refreshed. To do so, first determine if you are using a password or a" +
- " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
- " is running this client using the command" +
- " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
- " If the latter, do" +
- " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
- " <keytab> is the location of the keytab file). After manually refreshing your cache," +
- " restart this client. If you continue to see this message after manually refreshing" +
- " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
- }
- } else if (c instanceof AuthorizeCallback) {
- LOG.debug("authorization callback");
- AuthorizeCallback ac = (AuthorizeCallback) c;
- String authid = ac.getAuthenticationID();
- String authzid = ac.getAuthorizationID();
- if (authid.equals(authzid)) {
- ac.setAuthorized(true);
- } else {
- ac.setAuthorized(false);
- }
- if (ac.isAuthorized()) {
- ac.setAuthorizedID(authzid);
+ /**
+ * This method is invoked by SASL for authentication challenges
+ * @param callbacks a collection of challenge callbacks
+ */
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (Callback c : callbacks) {
+ if (c instanceof NameCallback) {
+ LOG.debug("name callback");
+ NameCallback nc = (NameCallback) c;
+ nc.setName(_username);
+ } else if (c instanceof PasswordCallback) {
+ LOG.debug("password callback");
+ PasswordCallback pc = (PasswordCallback)c;
+ if (_password != null) {
+ pc.setPassword(_password.toCharArray());
+ } else {
+ LOG.warn("Could not login: the client is being asked for a password, but the " +
+ " client code does not currently support obtaining a password from the user." +
+ " Make sure that the client is configured to use a ticket cache (using" +
+ " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
+ " you still get this message after that, the TGT in the ticket cache has expired and must" +
+ " be manually refreshed. To do so, first determine if you are using a password or a" +
+ " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
+ " is running this client using the command" +
+ " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
+ " If the latter, do" +
+ " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
+ " <keytab> is the location of the keytab file). After manually refreshing your cache," +
+ " restart this client. If you continue to see this message after manually refreshing" +
+ " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
+ }
+ } else if (c instanceof AuthorizeCallback) {
+ LOG.debug("authorization callback");
+ AuthorizeCallback ac = (AuthorizeCallback) c;
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ ac.setAuthorizedID(authzid);
+ }
+ } else if (c instanceof RealmCallback) {
+ RealmCallback rc = (RealmCallback) c;
+ ((RealmCallback) c).setText(rc.getDefaultText());
+ } else {
+ throw new UnsupportedCallbackException(c);
+ }
}
- } else if (c instanceof RealmCallback) {
- RealmCallback rc = (RealmCallback) c;
- ((RealmCallback) c).setText(rc.getDefaultText());
- } else {
- throw new UnsupportedCallbackException(c);
- }
}
- }
}
View
172 src/jvm/backtype/storm/security/auth/SaslServerCallbackHandler.java
@@ -20,107 +20,107 @@
* SASL server side collback handler
*/
public class SaslServerCallbackHandler implements CallbackHandler {
- private static final String USER_PREFIX = "user_";
- private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
- private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
- private static final String SYSPROP_REMOVE_HOST = "storm.kerberos.removeHostFromPrincipal";
- private static final String SYSPROP_REMOVE_REALM = "storm.kerberos.removeRealmFromPrincipal";
+ private static final String USER_PREFIX = "user_";
+ private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
+ private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
+ private static final String SYSPROP_REMOVE_HOST = "storm.kerberos.removeHostFromPrincipal";
+ private static final String SYSPROP_REMOVE_REALM = "storm.kerberos.removeRealmFromPrincipal";
- private String userName;
- private final Map<String,String> credentials = new HashMap<String,String>();
+ private String userName;
+ private final Map<String,String> credentials = new HashMap<String,String>();
- public SaslServerCallbackHandler(Configuration configuration) throws IOException {
- if (configuration==null) return;
+ public SaslServerCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration==null) return;
- AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LoginContextServer);
- if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LoginContextServer+"' entry in this configuration: Server cannot start.";
- LOG.error(errorMessage);
- throw new IOException(errorMessage);
- }
- credentials.clear();
- for(AppConfigurationEntry entry: configurationEntries) {
- Map<String,?> options = entry.getOptions();
- // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
- // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
- for(Map.Entry<String, ?> pair : options.entrySet()) {
- String key = pair.getKey();
- if (key.startsWith(USER_PREFIX)) {
- String userName = key.substring(USER_PREFIX.length());
- credentials.put(userName,(String)pair.getValue());
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LoginContextServer);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+AuthUtils.LoginContextServer+"' entry in this configuration: Server cannot start.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+ credentials.clear();
+ for(AppConfigurationEntry entry: configurationEntries) {
+ Map<String,?> options = entry.getOptions();
+ // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
+ // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+ for(Map.Entry<String, ?> pair : options.entrySet()) {
+ String key = pair.getKey();
+ if (key.startsWith(USER_PREFIX)) {
+ String userName = key.substring(USER_PREFIX.length());
+ credentials.put(userName,(String)pair.getValue());
+ }
+ }
}
- }
}
- }
- public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- handleNameCallback((NameCallback) callback);
- } else if (callback instanceof PasswordCallback) {
- handlePasswordCallback((PasswordCallback) callback);
- } else if (callback instanceof RealmCallback) {
- handleRealmCallback((RealmCallback) callback);
- } else if (callback instanceof AuthorizeCallback) {
- handleAuthorizeCallback((AuthorizeCallback) callback);
- }
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ handleNameCallback((NameCallback) callback);
+ } else if (callback instanceof PasswordCallback) {
+ handlePasswordCallback((PasswordCallback) callback);
+ } else if (callback instanceof RealmCallback) {
+ handleRealmCallback((RealmCallback) callback);
+ } else if (callback instanceof AuthorizeCallback) {
+ handleAuthorizeCallback((AuthorizeCallback) callback);
+ }
+ }
}
- }
- private void handleNameCallback(NameCallback nc) {
- userName = nc.getDefaultName();
- nc.setName(nc.getDefaultName());
- }
+ private void handleNameCallback(NameCallback nc) {
+ userName = nc.getDefaultName();
+ nc.setName(nc.getDefaultName());
+ }
- private void handlePasswordCallback(PasswordCallback pc) {
- if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
- // superuser: use Java system property for password, if available.
- pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
- } else if (credentials.containsKey(userName) ) {
- pc.setPassword(credentials.get(userName).toCharArray());
- } else {
- LOG.warn("No password found for user: " + userName);
+ private void handlePasswordCallback(PasswordCallback pc) {
+ if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
+ // superuser: use Java system property for password, if available.
+ pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
+ } else if (credentials.containsKey(userName) ) {
+ pc.setPassword(credentials.get(userName).toCharArray());
+ } else {
+ LOG.warn("No password found for user: " + userName);
+ }
}
- }
- private void handleRealmCallback(RealmCallback rc) {
- LOG.debug("handleRealmCallback: "+ rc.getDefaultText());
- rc.setText(rc.getDefaultText());
- }
+ private void handleRealmCallback(RealmCallback rc) {
+ LOG.debug("handleRealmCallback: "+ rc.getDefaultText());
+ rc.setText(rc.getDefaultText());
+ }
- private void handleAuthorizeCallback(AuthorizeCallback ac) {
- String authenticationID = ac.getAuthenticationID();
- LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
- ac.setAuthorized(true);
+ private void handleAuthorizeCallback(AuthorizeCallback ac) {
+ String authenticationID = ac.getAuthenticationID();
+ LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
+ ac.setAuthorized(true);
- // canonicalize authorization id according to system properties:
- // storm.kerberos.removeRealmFromPrincipal(={true,false})
- // storm.kerberos.removeHostFromPrincipal(={true,false})
- KerberosName kerberosName = new KerberosName(authenticationID);
- try {
- StringBuilder userNameBuilder = new StringBuilder(kerberosName.getShortName());
- if (shouldAppendHost(kerberosName)) {
- userNameBuilder.append("/").append(kerberosName.getHostName());
- }
- if (shouldAppendRealm(kerberosName)) {
- userNameBuilder.append("@").append(kerberosName.getRealm());
- }
- LOG.debug("Setting authorizedID: " + userNameBuilder);
- ac.setAuthorizedID(userNameBuilder.toString());
- } catch (IOException e) {
- LOG.error("Failed to set name based on Kerberos authentication rules.");
+ // canonicalize authorization id according to system properties:
+ // storm.kerberos.removeRealmFromPrincipal(={true,false})
+ // storm.kerberos.removeHostFromPrincipal(={true,false})
+ KerberosName kerberosName = new KerberosName(authenticationID);
+ try {
+ StringBuilder userNameBuilder = new StringBuilder(kerberosName.getShortName());
+ if (shouldAppendHost(kerberosName)) {
+ userNameBuilder.append("/").append(kerberosName.getHostName());
+ }
+ if (shouldAppendRealm(kerberosName)) {
+ userNameBuilder.append("@").append(kerberosName.getRealm());
+ }
+ LOG.debug("Setting authorizedID: " + userNameBuilder);
+ ac.setAuthorizedID(userNameBuilder.toString());
+ } catch (IOException e) {
+ LOG.error("Failed to set name based on Kerberos authentication rules.");
+ }
}
- }
- private boolean shouldAppendRealm(KerberosName kerberosName) {
- return !isSystemPropertyTrue(SYSPROP_REMOVE_REALM) && kerberosName.getRealm() != null;
- }
+ private boolean shouldAppendRealm(KerberosName kerberosName) {
+ return !isSystemPropertyTrue(SYSPROP_REMOVE_REALM) && kerberosName.getRealm() != null;
+ }
- private boolean shouldAppendHost(KerberosName kerberosName) {
- return !isSystemPropertyTrue(SYSPROP_REMOVE_HOST) && kerberosName.getHostName() != null;
- }
+ private boolean shouldAppendHost(KerberosName kerberosName) {
+ return !isSystemPropertyTrue(SYSPROP_REMOVE_HOST) && kerberosName.getHostName() != null;
+ }
- private boolean isSystemPropertyTrue(String propertyName) {
- return "true".equals(System.getProperty(propertyName));
- }
+ private boolean isSystemPropertyTrue(String propertyName) {
+ return "true".equals(System.getProperty(propertyName));
+ }
}
View
232 src/jvm/backtype/storm/security/auth/ThriftClient.java
@@ -16,140 +16,138 @@
import org.apache.thrift7.transport.TTransport;
import org.apache.thrift7.transport.TSaslClientTransport;
import org.apache.zookeeper.Login;
-import org.apache.zookeeper.server.auth.KerberosName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.utils.Utils;
public class ThriftClient {
- private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
- private TTransport _transport;
- protected TProtocol _protocol;
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
+ private TTransport _transport;
+ protected TProtocol _protocol;
- static {
- java.security.Security.addProvider(new AnonymousAuthenticationProvider());
- }
+ static {
+ java.security.Security.addProvider(new AnonymousAuthenticationProvider());
+ }
- public ThriftClient(String host, int port, String default_service_name) {
- this(host, port, default_service_name, null);
- }
+ public ThriftClient(String host, int port, String default_service_name) {
+ this(host, port, default_service_name, null);
+ }
- public ThriftClient(String host, int port, String default_service_name, Integer timeout) {
- try {
- if(host==null) {
- throw new IllegalArgumentException("host is not set");
- }
- if(port<=0) {
- throw new IllegalArgumentException("invalid port: "+port);
- }
+ public ThriftClient(String host, int port, String default_service_name, Integer timeout) {
+ try {
+ if(host==null) {
+ throw new IllegalArgumentException("host is not set");
+ }
+ if(port<=0) {
+ throw new IllegalArgumentException("invalid port: "+port);
+ }
- TSocket socket = new TSocket(host, port);
- if(timeout!=null) {
- socket.setTimeout(timeout);
- }
- final TTransport underlyingTransport = socket;
+ TSocket socket = new TSocket(host, port);
+ if(timeout!=null) {
+ socket.setTimeout(timeout);
+ }
+ final TTransport underlyingTransport = socket;
- String loginConfigurationFile = System.getProperty("java.security.auth.login.config");
- if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) {
- //apply Storm configuration for JAAS login
- Map conf = Utils.readStormConfig();
- loginConfigurationFile = (String)conf.get("java.security.auth.login.config");
- }
- if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) { //ANONYMOUS
- LOG.info("SASL ANONYMOUS client transport is being established");
- _transport = new TSaslClientTransport(AuthUtils.ANONYMOUS,
- null,
- AuthUtils.SERVICE,
- host,
- null,
- null,
- underlyingTransport);
- _transport.open();
- } else {
- LOG.debug("Use jaas login config:"+loginConfigurationFile);
- System.setProperty("java.security.auth.login.config", loginConfigurationFile);
- Configuration auth_conf = Configuration.getConfiguration();
+ String loginConfigurationFile = System.getProperty("java.security.auth.login.config");
+ if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) {
+ //apply Storm configuration for JAAS login
+ Map conf = Utils.readStormConfig();
+ loginConfigurationFile = (String)conf.get("java.security.auth.login.config");
+ }
+ if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) { //ANONYMOUS
+ LOG.info("SASL ANONYMOUS client transport is being established");
+ _transport = new TSaslClientTransport(AuthUtils.ANONYMOUS,
+ null,
+ AuthUtils.SERVICE,
+ host,
+ null,
+ null,
+ underlyingTransport);
+ _transport.open();
+ } else {
+ //retrieve authentication configuration from java.security.auth.login.config
+ Configuration auth_conf = AuthUtils.getConfiguration(loginConfigurationFile);
- //login our user
- SaslClientCallbackHandler callback_handler = new SaslClientCallbackHandler(auth_conf);
- Login login = new Login(AuthUtils.LoginContextClient, callback_handler);
+ //login our user
+ SaslClientCallbackHandler callback_handler = new SaslClientCallbackHandler(auth_conf);
+ Login login = new Login(AuthUtils.LoginContextClient, callback_handler);
- final Subject subject = login.getSubject();
- if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //DIGEST-MD5
- LOG.debug("SASL DIGEST-MD5 client transport is being established");
- _transport = new TSaslClientTransport(AuthUtils.DIGEST,
- null,
- AuthUtils.SERVICE,
- host,
- null,
- callback_handler,
- underlyingTransport);
- _transport.open();
- } else { //GSSAPI
- final String principal = getPrincipal(subject);
- String serviceName = AuthUtils.get(auth_conf, AuthUtils.LoginContextClient, "serviceName");
- if (serviceName == null) {
- serviceName = default_service_name;
- }
- Map<String, String> props = new TreeMap<String,String>();
- props.put(Sasl.QOP, "auth");
- props.put(Sasl.SERVER_AUTH, "false");
- LOG.debug("SASL GSSAPI client transport is being established");
- _transport = new TSaslClientTransport(AuthUtils.KERBEROS,
- principal,
- serviceName,
- host,
- props,
- null,
- underlyingTransport);
+ final Subject subject = login.getSubject();
+ if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //DIGEST-MD5
+ LOG.debug("SASL DIGEST-MD5 client transport is being established");
+ _transport = new TSaslClientTransport(AuthUtils.DIGEST,
+ null,
+ AuthUtils.SERVICE,
+ host,
+ null,
+ callback_handler,
+ underlyingTransport);
+ _transport.open();
+ } else { //GSSAPI
+ final String principal = getPrincipal(subject);
+ String serviceName = AuthUtils.get(auth_conf, AuthUtils.LoginContextClient, "serviceName");
+ if (serviceName == null) {
+ serviceName = default_service_name;
+ }
+ Map<String, String> props = new TreeMap<String,String>();
+ props.put(Sasl.QOP, "auth");
+ props.put(Sasl.SERVER_AUTH, "false");
+ LOG.debug("SASL GSSAPI client transport is being established");
+ _transport = new TSaslClientTransport(AuthUtils.KERBEROS,
+ principal,
+ serviceName,
+ host,
+ props,
+ null,
+ underlyingTransport);
- //open Sasl transport with the login credential
- try {
- Subject.doAs(subject,
- new PrivilegedExceptionAction<Void>() {
- public Void run() {
- try {
- LOG.debug("do as:"+ principal);
- _transport.open();
- }
- catch (Exception e) {
- LOG.error("Nimbus client failed to open SaslClientTransport to interact with a server during session initiation: " + e, e);
- e.printStackTrace();
- }
- return null;
- }
- });
- } catch (PrivilegedActionException e) {
- LOG.error("Nimbus client experienced a PrivilegedActionException exception while creating a TSaslClientTransport using a JAAS principal context:" + e, e);
- e.printStackTrace();
- }
- }
+ //open Sasl transport with the login credential
+ try {
+ Subject.doAs(subject,
+ new PrivilegedExceptionAction<Void>() {
+ public Void run() {
+ try {
+ LOG.debug("do as:"+ principal);
+ _transport.open();
+ }
+ catch (Exception e) {
+ LOG.error("Nimbus client failed to open SaslClientTransport to interact with a server during session initiation: " + e, e);
+ e.printStackTrace();
+ }
+ return null;
+ }
+ });
+ } catch (PrivilegedActionException e) {
+ LOG.error("Nimbus client experienced a PrivilegedActionException exception while creating a TSaslClientTransport using a JAAS principal context:" + e, e);
+ e.printStackTrace();
+ }
+ }
- }
- } catch (Exception e) {
- LOG.error(e.getMessage());
- throw new RuntimeException(e);
- }
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ throw new RuntimeException(e);
+ }
- _protocol = null;
- if (_transport != null)
- _protocol = new TBinaryProtocol(_transport);
- }
+ _protocol = null;
+ if (_transport != null)
+ _protocol = new TBinaryProtocol(_transport);
+ }
- private String getPrincipal(Subject subject) {
- Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
- if (principals==null || principals.size()<1) {
- LOG.info("No principal found in login subject");
- return null;
+ private String getPrincipal(Subject subject) {
+ Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+ if (principals==null || principals.size()<1) {
+ LOG.info("No principal found in login subject");
+ return null;
+ }
+ return ((Principal)(principals.toArray()[0])).getName();
}
- return ((Principal)(principals.toArray()[0])).getName();
- }
- public TTransport transport() {
- return _transport;
- }
+ public TTransport transport() {
+ return _transport;
+ }
- public void close() {
- _transport.close();
- }
+ public void close() {
+ _transport.close();
+ }
}
View
381 src/jvm/backtype/storm/security/auth/ThriftServer.java
@@ -34,221 +34,218 @@
import backtype.storm.utils.Utils;
public class ThriftServer {
- static {
- java.security.Security.addProvider(new AnonymousAuthenticationProvider());
- }
-
- private TProcessor _processor = null;
- private int _port = 0;
- private TServer _server;
- private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
- private String _loginConfigurationFile;
-
- public ThriftServer(TProcessor processor, int port) {
- try {
- _processor = processor;
- _port = port;
-
- _loginConfigurationFile = System.getProperty("java.security.auth.login.config");
- if ((_loginConfigurationFile==null) || (_loginConfigurationFile.length()==0)) {
- //apply Storm configuration for JAAS login
- Map conf = Utils.readStormConfig();
- _loginConfigurationFile = (String)conf.get("java.security.auth.login.config");
- if ((_loginConfigurationFile!=null) && (_loginConfigurationFile.length()>0)) {
- System.setProperty("java.security.auth.login.config", _loginConfigurationFile);
+ static {
+ java.security.Security.addProvider(new AnonymousAuthenticationProvider());
+ }
+
+ private TProcessor _processor = null;
+ private int _port = 0;
+ private TServer _server;
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
+ private String _loginConfigurationFile;
+
+ public ThriftServer(TProcessor processor, int port) {
+ try {
+ _processor = processor;
+ _port = port;
+
+ _loginConfigurationFile = System.getProperty("java.security.auth.login.config");
+ if ((_loginConfigurationFile==null) || (_loginConfigurationFile.length()==0)) {
+ //apply Storm configuration for JAAS login
+ Map conf = Utils.readStormConfig();
+ _loginConfigurationFile = (String)conf.get("java.security.auth.login.config");
+ }
+ } catch (Exception x) {
+ x.printStackTrace();
}
- }
- } catch (Exception x) {
- x.printStackTrace();
}
- }
-
- public void stop() {
- if (_server != null)
- _server.stop();
- }
- public void serve() {
- TServerTransport serverTransport = null;
+ public void stop() {
+ if (_server != null)
+ _server.stop();
+ }
+
+ public void serve() {
+ TServerTransport serverTransport = null;
- try {
- TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
- serverTransport = new TServerSocket(_port);
+ try {
+ TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
+ serverTransport = new TServerSocket(_port);
- if ((_loginConfigurationFile==null) || (_loginConfigurationFile.length()==0)) { //ANONYMOUS
- factory.addServerDefinition(AuthUtils.ANONYMOUS, AuthUtils.SERVICE, "localhost", null, null);
+ if ((_loginConfigurationFile==null) || (_loginConfigurationFile.length()==0)) { //ANONYMOUS
+ factory.addServerDefinition(AuthUtils.ANONYMOUS, AuthUtils.SERVICE, "localhost", null, null);
- LOG.info("Starting SASL ANONYMOUS server at port:" + _port);
- _server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
+ LOG.info("Starting SASL ANONYMOUS server at port:" + _port);
+ _server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
processor(new SaslProcessor(_processor)).
transportFactory(factory).
minWorkerThreads(64).
maxWorkerThreads(64).
protocolFactory(new TBinaryProtocol.Factory()));
- } else {
- //retrieve authentication configuration from java.security.auth.login.config
- Configuration auth_conf = Configuration.getConfiguration();
-
- //login our user
- CallbackHandler auth_callback_handler = new SaslServerCallbackHandler(auth_conf);
- Login login = new Login(AuthUtils.LoginContextServer, auth_callback_handler);
- Subject subject = login.getSubject();
-
- if (!subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //KERBEROS
- String principal = AuthUtils.get(auth_conf, AuthUtils.LoginContextServer, "principal");
- LOG.debug("principal:"+principal);
- KerberosName serviceKerberosName = new KerberosName(principal);
- String serviceName = serviceKerberosName.getServiceName();
- String hostName = serviceKerberosName.getHostName();
- Map<String, String> props = new TreeMap<String,String>();
- props.put(Sasl.QOP, "auth");
- props.put(Sasl.SERVER_AUTH, "false");
- factory.addServerDefinition(AuthUtils.KERBEROS, serviceName, hostName, props, auth_callback_handler);
- LOG.info("Starting KERBEROS server at port:" + _port);
- //create a wrap transport factory so that we could apply user credential during connections
- TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
- _server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
- processor(new SaslProcessor(_processor)).
- minWorkerThreads(64).
- maxWorkerThreads(64).
- transportFactory(wrapFactory).
- protocolFactory(new TBinaryProtocol.Factory()));
- } else { //DIGEST
- factory.addServerDefinition(AuthUtils.DIGEST, AuthUtils.SERVICE, "localhost", null, auth_callback_handler);
- LOG.info("Starting DIGEST server at port:" + _port);
- _server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
- processor(new SaslProcessor(_processor)).
- minWorkerThreads(64).
- maxWorkerThreads(64).
- transportFactory(factory).
- protocolFactory(new TBinaryProtocol.Factory()));
- }
- }
-
- _server.serve();
- } catch (Exception ex) {
- LOG.error("ThriftServer is being stopped due to: " + ex, ex);
- if (_server != null) _server.stop();
- System.exit(1); //shutdown server process since we could not handle Thrift requests any more
- }
- }
-
- /**
- * Processor that pulls the SaslServer object out of the transport, and
- * assumes the remote user's UGI before calling through to the original
- * processor.
- *
- * This is used on the server side to set the UGI for each specific call.
- */
- private class SaslProcessor implements TProcessor {
- final TProcessor wrapped;
-
- SaslProcessor(TProcessor wrapped) {
- this.wrapped = wrapped;
+ } else {
+ //retrieve authentication configuration from java.security.auth.login.config
+ Configuration auth_conf = AuthUtils.getConfiguration(_loginConfigurationFile);
+
+ //login our user
+ CallbackHandler auth_callback_handler = new SaslServerCallbackHandler(auth_conf);
+ Login login = new Login(AuthUtils.LoginContextServer, auth_callback_handler);
+ Subject subject = login.getSubject();
+
+ if (!subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //KERBEROS
+ String principal = AuthUtils.get(auth_conf, AuthUtils.LoginContextServer, "principal");
+ LOG.debug("principal:"+principal);
+ KerberosName serviceKerberosName = new KerberosName(principal);
+ String serviceName = serviceKerberosName.getServiceName();
+ String hostName = serviceKerberosName.getHostName();
+ Map<String, String> props = new TreeMap<String,String>();
+ props.put(Sasl.QOP, "auth");
+ props.put(Sasl.SERVER_AUTH, "false");
+ factory.addServerDefinition(AuthUtils.KERBEROS, serviceName, hostName, props, auth_callback_handler);
+ LOG.info("Starting KERBEROS server at port:" + _port);
+ //create a wrap transport factory so that we could apply user credential during connections
+ TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
+ _server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
+ processor(new SaslProcessor(_processor)).
+ minWorkerThreads(64).
+ maxWorkerThreads(64).
+ transportFactory(wrapFactory).
+ protocolFactory(new TBinaryProtocol.Factory()));
+ } else { //DIGEST
+ factory.addServerDefinition(AuthUtils.DIGEST, AuthUtils.SERVICE, "localhost", null, auth_callback_handler);
+ LOG.info("Starting DIGEST server at port:" + _port);
+ _server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).
+ processor(new SaslProcessor(_processor)).
+ minWorkerThreads(64).
+ maxWorkerThreads(64).
+ transportFactory(factory).
+ protocolFactory(new TBinaryProtocol.Factory()));
+ }
+ }
+
+ _server.serve();
+ } catch (Exception ex) {
+ LOG.error("ThriftServer is being stopped due to: " + ex, ex);
+ if (_server != null) _server.stop();
+ System.exit(1); //shutdown server process since we could not handle Thrift requests any more
+ }
}
- public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
- TTransport trans = inProt.getTransport();
- if (!(trans instanceof TSaslServerTransport)) {
- throw new TException("Unexpected non-SASL transport " + trans.getClass());
- }
- TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
-
- //populating request context
- ReqContext req_context = ReqContext.context();
-
- //remote address
- TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
- Socket socket = tsocket.getSocket();
- req_context.setRemoteAddress(socket.getInetAddress());
-
- //remote subject
- SaslServer saslServer = saslTrans.getSaslServer();
- String authId = saslServer.getAuthorizationID();
- LOG.debug("AUTH ID ======>" + authId);
- Subject remoteUser = new Subject();
- remoteUser.getPrincipals().add(new User(authId));
- req_context.setSubject(remoteUser);
-
- //invoke application logic
- return wrapped.process(inProt, outProt);
- }
- }
+ /**
+ * Processor that pulls the SaslServer object out of the transport, and
+ * assumes the remote user's UGI before calling through to the original
+ * processor.
+ *
+ * This is used on the server side to set the UGI for each specific call.
+ */
+ private class SaslProcessor implements TProcessor {
+ final TProcessor wrapped;
- static class User implements Principal {
- private final String name;
+ SaslProcessor(TProcessor wrapped) {
+ this.wrapped = wrapped;
+ }
- public User(String name) {
- this.name = name;
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ TTransport trans = inProt.getTransport();
+ if (!(trans instanceof TSaslServerTransport)) {
+ throw new TException("Unexpected non-SASL transport " + trans.getClass());
+ }
+ TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+
+ //populating request context
+ ReqContext req_context = ReqContext.context();
+
+ //remote address
+ TSocket tsocket = (TSocket)saslTrans.getUnderlyingTransport();
+ Socket socket = tsocket.getSocket();
+ req_context.setRemoteAddress(socket.getInetAddress());
+
+ //remote subject
+ SaslServer saslServer = saslTrans.getSaslServer();
+ String authId = saslServer.getAuthorizationID();
+ LOG.debug("AUTH ID ======>" + authId);
+ Subject remoteUser = new Subject();
+ remoteUser.getPrincipals().add(new User(authId));
+ req_context.setSubject(remoteUser);
+
+ //invoke application logic
+ return wrapped.process(inProt, outProt);
+ }
}
- /**
- * Get the full name of the user.
- */
- public String getName() {
- return name;
- }
+ static class User implements Principal {
+ private final String name;
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- } else if (o == null || getClass() != o.getClass()) {
- return false;
- } else {
- return (name.equals(((User) o).name));
- }
- }
+ public User(String name) {
+ this.name = name;
+ }
- @Override
- public int hashCode() {
- return name.hashCode();
- }
+ /**
+ * Get the full name of the user.
+ */
+ public String getName() {
+ return name;
+ }
- @Override
- public String toString() {
- return name;
- }
- }
-
- /** A TransportFactory that wraps another one, but assumes a specified UGI
- * before calling through.
- *
- * This is used on the server side to assume the server's Principal when accepting
- * clients.
- */
- static class TUGIAssumingTransportFactory extends TTransportFactory {
- private final Subject subject;
- private final TTransportFactory wrapped;
-
- public TUGIAssumingTransportFactory(TTransportFactory wrapped, Subject subject) {
- this.wrapped = wrapped;
- this.subject = subject;
-
- Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
- if (principals.size()>0)
- LOG.info("Service principal:"+ ((Principal)(principals.toArray()[0])).getName());
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ } else {
+ return (name.equals(((User) o).name));
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
- @Override
- public TTransport getTransport(final TTransport trans) {
- try {
- return Subject.doAs(subject,
- new PrivilegedExceptionAction<TTransport>() {
+ /** A TransportFactory that wraps another one, but assumes a specified UGI
+ * before calling through.
+ *
+ * This is used on the server side to assume the server's Principal when accepting
+ * clients.
+ */
+ static class TUGIAssumingTransportFactory extends TTransportFactory {
+ private final Subject subject;
+ private final TTransportFactory wrapped;
+
+ public TUGIAssumingTransportFactory(TTransportFactory wrapped, Subject subject) {
+ this.wrapped = wrapped;
+ this.subject = subject;
+
+ Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+ if (principals.size()>0)
+ LOG.info("Service principal:"+ ((Principal)(principals.toArray()[0])).getName());
+ }
+
+ @Override
+ public TTransport getTransport(final TTransport trans) {
+ try {
+ return Subject.doAs(subject,
+ new PrivilegedExceptionAction<TTransport>() {
public TTransport run() {
- try {
- return wrapped.getTransport(trans);
- }
- catch (Exception e) {
- LOG.error("Storm server failed to open transport to interact with a client during session initiation: " + e, e);
- return null;
- }
+ try {
+ return wrapped.getTransport(trans);
+ }
+ catch (Exception e) {
+ LOG.error("Storm server failed to open transport to interact with a client during session initiation: " + e, e);
+ return null;
+ }
}
- });
- } catch (PrivilegedActionException e) {
- LOG.error("Storm server experienced a PrivilegedActionException exception while creating a transport using a JAAS principal context:" + e, e);
- return null;
- }
+ });
+ } catch (PrivilegedActionException e) {
+ LOG.error("Storm server experienced a PrivilegedActionException exception while creating a transport using a JAAS principal context:" + e, e);
+ return null;
+ }
+ }
}
- }
}
View
140 test/clj/backtype/storm/security/auth/auth_test.clj
@@ -19,10 +19,10 @@
aznClass (if klassname (Class/forName klassname))
aznHandler (if aznClass (.newInstance aznClass))]
(log-debug "authorization class name:" klassname
- " class:" aznClass
- " handler:" aznHandler)
+ " class:" aznClass
+ " handler:" aznHandler)
aznHandler
- ))
+ ))
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
@@ -50,35 +50,35 @@
req))
(defn check-authorization! [nimbus storm-name storm-conf operation]
- (let [aclHandler (:authorization-handler nimbus)]
- (log-debug "check-authorization with handler: " aclHandler)
- (if aclHandler
- (let [req (update-req-context! nimbus storm-name storm-conf operation)]
+ (let [aclHandler (:authorization-handler nimbus)]
+ (log-debug "check-authorization with handler: " aclHandler)
+ (if aclHandler
+ (let [req (update-req-context! nimbus storm-name storm-conf operation)]
(if-not (.permit aclHandler req)
(throw (RuntimeException. (str operation " on topology " storm-name " is not authorized")))
)))))
(defn dummy-service-handler [conf inimbus]
(let [nimbus (nimbus-data conf inimbus)]
- (reify Nimbus$Iface
+ (reify Nimbus$Iface
(^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
- ^SubmitOptions submitOptions]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/SUBMIT_TOPOLOGY)))
+ ^SubmitOptions submitOptions]
+ (check-authorization! nimbus storm-name nil (ReqContext$OperationType/SUBMIT_TOPOLOGY)))
(^void killTopology [this ^String storm-name]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/KILL_TOPOLOGY)))
-
+ (check-authorization! nimbus storm-name nil (ReqContext$OperationType/KILL_TOPOLOGY)))
+
(^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/KILL_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil (ReqContext$OperationType/KILL_TOPOLOGY)))
(^void rebalance [this ^String storm-name ^RebalanceOptions options]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/REBALANCE_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil (ReqContext$OperationType/REBALANCE_TOPOLOGY)))
(activate [this storm-name]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/ACTIVATE_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil (ReqContext$OperationType/ACTIVATE_TOPOLOGY)))
(deactivate [this storm-name]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/DEACTIVATE_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil (ReqContext$OperationType/DEACTIVATE_TOPOLOGY)))
(beginFileUpload [this])
@@ -103,46 +103,84 @@
(^TopologyInfo getTopologyInfo [this ^String storm-id]))))
(defn launch-test-server [server-port login-cfg aznClass]
- (System/setProperty "java.security.auth.login.config" login-cfg)
- (let [conf (merge (read-storm-config)
- {NIMBUS-AUTHORIZATION-CLASSNAME aznClass
- NIMBUS-HOST "localhost"
- NIMBUS-THRIFT-PORT server-port})
- nimbus (nimbus/standalone-nimbus)
- service-handler (dummy-service-handler conf nimbus)
- server (ThriftServer. (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))]
- (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
- (log-message "Starting Nimbus server...")
- (.serve server)))
+ (System/setProperty "java.security.auth.login.config" login-cfg)
+ (let [conf (merge (read-storm-config)
+ {NIMBUS-AUTHORIZATION-CLASSNAME aznClass
+ NIMBUS-HOST "localhost"
+ NIMBUS-THRIFT-PORT server-port})
+ nimbus (nimbus/standalone-nimbus)
+ service-handler (dummy-service-handler conf nimbus)
+ server (ThriftServer. (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))]
+ (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
+ (.serve server)))
(defn launch-server-w-wait [server-port ms login-cfg aznClass]
- (.start (Thread. #(launch-test-server server-port login-cfg aznClass)))
- (log-message "Waiting for Nimbus Server...")
- (Thread/sleep ms)
- (log-message "Continue..."))
-
-(deftest authorization-test
- (launch-server-w-wait 6627 2000 "" "backtype.storm.security.auth.DenyAuthorizer")
- (log-message "Starting Nimbus client w/ anonymous authentication")
+ (.start (Thread. #(launch-test-server server-port login-cfg aznClass)))
+ (Thread/sleep ms))
+
+(deftest anonymous-authentication-test
+ (launch-server-w-wait 6627 1000 "" nil)
+
+ (log-message "(Positive authentication) Server and Client with anonymous authentication")
(let [client (NimbusClient. "localhost" 6627)
nimbus_client (.getClient client)]
- (is (thrown? TTransportException
- (.activate nimbus_client "security_auth_test_topology")))
- (.close client)))
+ (.activate nimbus_client "security_auth_test_topology")
+ (.close client))
+
+ (log-message "(Negative authentication) Server: anonymous vs. Client: Digest")
+ (System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf")
+ (log-message "java.security.auth.login.config: " (System/getProperty "java.security.auth.login.config"))
+ (is (= "Peer indicated failure: Unsupported mechanism type DIGEST-MD5"
+ (try (NimbusClient. "localhost" 6627)
+ nil
+ (catch java.lang.RuntimeException ex (.getMessage (.getCause ex)))))))
+
+(deftest positive-authorization-test
+ (launch-server-w-wait 6628 1000 "" "backtype.storm.security.auth.NoopAuthorizer")
+ (let [client (NimbusClient. "localhost" 6628)
+ nimbus_client (.getClient client)]
+ (log-message "(Positive authorization) Authorization plugin should accept client request")
+ (.activate nimbus_client "security_auth_test_topology")
+ (.close client)))
-(deftest authentication-test
- (launch-server-w-wait 6628 2000 "./conf/jaas_digest.conf" "backtype.storm.security.auth.NoopAuthorizer")
+(deftest deny-authorization-test
+ (launch-server-w-wait 6629 1000 "" "backtype.storm.security.auth.DenyAuthorizer")
+ (let [client (NimbusClient. "localhost" 6629)
+ nimbus_client (.getClient client)]
+ (log-message "(Negative authorization) Authorization plugin should reject client request")
+ (is (thrown? TTransportException
+ (.activate nimbus_client "security_auth_test_topology")))
+ (.close client)))
+
+(deftest digest-authentication-test
+ (launch-server-w-wait 6630 2000 "test/clj/backtype/storm/security/auth/jaas_digest.conf" nil)
+
+ (log-message "(Positive authentication) valid digest authentication")
+ (System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf")
+ (let [client (NimbusClient. "localhost" 6630)
+ nimbus_client (.getClient client)]
+ (.activate nimbus_client "security_auth_test_topology")
+ (.close client))
+
+ (log-message "(Negative authentication) Server: Digest vs. Client: anonymous")
(System/setProperty "java.security.auth.login.config" "")
- (log-message "Starting Nimbus client w/ anonymous authentication (expect authentication failure")
(is (= "Peer indicated failure: Unsupported mechanism type ANONYMOUS"
- (try (NimbusClient. "localhost" 6628)
- nil
- (catch java.lang.RuntimeException ex
- (.getMessage (.getCause ex))))))
- (log-message "Starting Nimbus client w/ digest authentication (expect authentication success)")
- (System/setProperty "java.security.auth.login.config" "./conf/jaas_digest.conf")
- (let [client (NimbusClient. "localhost" 6628)
- nimbus_client (.getClient client)]
- (.activate nimbus_client "security_auth_test_topology")
- (.close client)))
+ (try (NimbusClient. "localhost" 6630)
+ nil
+ (catch java.lang.RuntimeException ex (.getMessage (.getCause ex))))))
+
+ (log-message "(Negative authentication) Invalid password")
+ (System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf")
+ (is (= "Peer indicated failure: DIGEST-MD5: digest response format violation. Mismatched response."
+ (try (NimbusClient. "localhost" 6630)
+ nil
+ (catch java.lang.RuntimeException ex (.getMessage (.getCause ex))))))
+
+ (log-message "(Negative authentication) Unknown user")
+ (System/setProperty "java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf")
+ (is (= "Peer indicated failure: DIGEST-MD5: cannot acquire password for unknown_user in realm : localhost"
+ (try (NimbusClient. "localhost" 6630)
+ nil
+ (catch java.lang.RuntimeException ex (.getMessage (.getCause ex)))))))
+
View
10 test/clj/backtype/storm/security/auth/jaas_digest.conf
@@ -0,0 +1,10 @@
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+};
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="bob"
+ password="bobsecret";
+};
View
10 test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf
@@ -0,0 +1,10 @@
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+};
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="bob"
+ password="bad_password";
+};
View
10 test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf
@@ -0,0 +1,10 @@
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+};
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="unknown_user"
+ password="some_password";
+};

0 comments on commit 1e24e9f

Please sign in to comment.
Something went wrong with that request. Please try again.