Permalink
Browse files

revison per Nathan's comments on ReqContext/testing, and use wait-for…

…-condition
  • Loading branch information...
anfeng committed Mar 8, 2013
1 parent 9d2356f commit 9def4042b3ce0fdca3d2d410384ff8dedc4d00a4
@@ -4,43 +4,54 @@
import javax.security.auth.login.AppConfigurationEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
import java.util.Map;
public class AuthUtils {
public static final String LOGIN_CONTEXT_SERVER = "StormServer";
public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
public static final String SERVICE = "storm_thrift_server";
- private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
/**
* Construct a JAAS configuration object per storm configuration file
* @param storm_conf Storm configuration
* @return
*/
public static synchronized Configuration GetConfiguration(Map storm_conf) {
- Configuration.setConfiguration(null);
-
- //exam system property first
+ //retrieve system property
String orig_loginConfigurationFile = System.getProperty("java.security.auth.login.config");
//try to find login file from Storm configuration
String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
- if (loginConfigurationFile==null)
+ if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0))
loginConfigurationFile = orig_loginConfigurationFile;
Configuration login_conf = null;
if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
+ //We don't allow system property and storm conf have conflicts
+ if (orig_loginConfigurationFile!=null &&
+ orig_loginConfigurationFile.length()>0 &&
+ !loginConfigurationFile.equals(orig_loginConfigurationFile)) {
+ throw new RuntimeException("System property java.security.auth.login.config ("
+ + orig_loginConfigurationFile
+ +") != storm configuration java.security.auth.login.config ("
+ + loginConfigurationFile + ")");
+ }
+
+ //reset login configuration so that javax.security.auth.login will not use cache
+ Configuration.setConfiguration(null);
+
+ //use javax.security.auth.login.Configuration to obtain login configuration object
+ //login.Configuration depends on system property "java.security.auth.login.config"
+ //(see http://docs.oracle.com/javase/6/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html)
System.setProperty("java.security.auth.login.config", loginConfigurationFile);
login_conf = Configuration.getConfiguration();
+ //we reset system property to previous value if any
if (orig_loginConfigurationFile!=null)
System.setProperty("java.security.auth.login.config", orig_loginConfigurationFile);
+ else
+ System.setProperty("java.security.auth.login.config", "");
}
return login_conf;
}
@@ -21,11 +21,10 @@
/**
* permit() method is invoked for each incoming Thrift request.
- * @param contrext request context includes info about
- * (1) remote address/subject,
- * (2) operation
- * (3) configuration of targeted topology
+ * @param context request context includes info about
+ * @param operation operation name
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
- public boolean permit(ReqContext context);
+ public boolean permit(ReqContext context, String operation, Map topology_conf);
}
@@ -12,18 +12,16 @@
/**
* context request context includes info about
- * (1) remote address/subject,
- * (2) operation
- * (3) configuration of targeted topology
+ * (1) remote address,
+ * (2) remote subject and primary principal
+ * (3) request ID
*/
public class ReqContext {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
- public enum OperationType { SUBMIT_TOPOLOGY, KILL_TOPOLOGY, REBALANCE_TOPOLOGY, ACTIVATE_TOPOLOGY, DEACTIVATE_TOPOLOGY };
private Subject _subject;
private InetAddress _remoteAddr;
private Integer _reqID;
private Map _storm_conf;
- private OperationType _operation;
/**
* Get a request context associated with current thread
@@ -83,26 +81,11 @@ public Principal principal() {
if (princs.size()==0) return null;
return (Principal) (princs.toArray()[0]);
}
-
+
/**
- * Topology that this request is against
+ * request ID of this request
*/
- public Map topologyConf() {
- return _storm_conf;
- }
-
- public void setTopologyConf(Map conf) {
- _storm_conf = conf;
- }
-
- /**
- * Operation that this request is performing
- */
- public OperationType operation() {
- return _operation;
- }
-
- public void setOperation(OperationType operation) {
- _operation = operation;
+ public Integer requestID() {
+ return _reqID;
}
}
@@ -99,13 +99,13 @@ public boolean process(final TProtocol inProt, final TProtocol outProt) throws T
Subject remoteUser = new Subject();
remoteUser.getPrincipals().add(new User(authId));
req_context.setSubject(remoteUser);
-
+
//invoke service handler
return wrapped.process(inProt, outProt);
}
}
- static class User implements Principal {
+ public static class User implements Principal {
private final String name;
public User(String name) {
@@ -11,7 +11,7 @@
public class ThriftServer {
private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
private Map _storm_conf; //storm configuration
- private TProcessor _processor = null;
+ protected TProcessor _processor = null;
private int _port = 0;
private TServer _server = null;
private Configuration _login_conf;
@@ -34,6 +34,15 @@ public void stop() {
_server.stop();
}
+ /**
+ * Is ThriftServer listening to requests?
+ * @return
+ */
+ public boolean isServing() {
+ if (_server == null) return false;
+ return _server.isServing();
+ }
+
public void serve() {
try {
//locate our thrift transport plugin
@@ -17,25 +17,24 @@
/**
* Invoked once immediately after construction
- * @param conf Stom configuration
+ * @param conf Storm configuration
*/
public void prepare(Map conf) {
}
/**
* permit() method is invoked for each incoming Thrift request
- * @param contrext request context includes info about
- * (1) remote address/subject,
- * (2) operation
- * (3) configuration of targeted topology
+ * @param contrext request context
+ * @param operation operation name
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
- public boolean permit(ReqContext context) {
- LOG.info("Access "
+ public boolean permit(ReqContext context, String operation, Map topology_conf) {
+ LOG.info("[req "+ context.requestID()+ "] Access "
+ " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
+ " principal:"+ (context.principal() == null? "null" : context.principal())
- +" op:"+context.operation()
- + " topoology:"+context.topologyConf().get(Config.TOPOLOGY_NAME));
+ +" op:"+operation
+ + " topoology:"+topology_conf.get(Config.TOPOLOGY_NAME));
return false;
}
}
@@ -17,25 +17,24 @@
/**
* Invoked once immediately after construction
- * @param conf Stom configuration
+ * @param conf Storm configuration
*/
public void prepare(Map conf) {
}
/**
* permit() method is invoked for each incoming Thrift request
- * @param contrext request context includes info about
- * (1) remote address/subject,
- * (2) operation
- * (3) configuration of targeted topology
+ * @param context request context includes info about
+ * @param operation operation name
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
- public boolean permit(ReqContext context) {
- LOG.info("Access "
+ public boolean permit(ReqContext context, String operation, Map topology_conf) {
+ LOG.info("[req "+ context.requestID()+ "] Access "
+ " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
+ " principal:"+(context.principal() == null? "null" : context.principal())
- +" op:"+context.operation()
- + " topoology:"+ context.topologyConf().get(Config.TOPOLOGY_NAME));
+ +" op:"+ operation
+ + " topoology:"+ topology_conf.get(Config.TOPOLOGY_NAME));
return true;
}
}
Oops, something went wrong.

0 comments on commit 9def404

Please sign in to comment.