Skip to content

Commit

Permalink
Merge remote-tracking branch 'anfeng/master-auth'
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanmarz committed Mar 11, 2013
2 parents 7059efc + 4b66980 commit d6374fe
Show file tree
Hide file tree
Showing 23 changed files with 1,272 additions and 30 deletions.
1 change: 1 addition & 0 deletions conf/defaults.yaml
Expand Up @@ -17,6 +17,7 @@ storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"

### nimbus.* configs are for the master
nimbus.host: "localhost"
Expand Down
21 changes: 21 additions & 0 deletions conf/jaas_digest.conf
@@ -0,0 +1,21 @@
/* This is example of JAAS Login configuration for digest authentication
*/

/*
StormServer section should contain a list of authorized users and their passwords.
*/
StormServer {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_super="adminsecret"
user_bob="bobsecret";
user_john="johnsecret";
};

/*
StormClient section contains one user name and his/her password.
*/
StormClient {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="bob"
password="bobsecret";
};
22 changes: 22 additions & 0 deletions logback/cluster.xml
Expand Up @@ -13,6 +13,23 @@
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>

<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
</encoder>
</appender>

<appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${storm.home}/logs/access.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${storm.home}/logs/${logfile.name}.%i</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>9</maxIndex>
</rollingPolicy>

<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>

<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
</encoder>
Expand All @@ -21,4 +38,9 @@
<root level="INFO">
<appender-ref ref="A1"/>
</root>

<logger name="backtype.storm.security.auth.authorizer" additivity="false">
<level value="INFO" />
<appender-ref ref="ACCESS" />
</logger>
</configuration>
10 changes: 10 additions & 0 deletions src/jvm/backtype/storm/Config.java
Expand Up @@ -64,6 +64,11 @@ public class Config extends HashMap<String, Object> {
*/
public static String STORM_LOCAL_HOSTNAME = "storm.local.hostname";

/**
* The transport plug-in for Thrift client/server communication
*/
public static String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";

/**
* The serializer class for ListDelegate (tuple payload).
* The default serializer will be ListDelegateSerializer
Expand Down Expand Up @@ -211,6 +216,11 @@ public class Config extends HashMap<String, Object> {
*/
public static String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";

/**
* Class name for authorization plugin for Nimbus
*/
public static String NIMBUS_AUTHORIZER = "nimbus.authorizer";

/**
* Storm UI binds to this port.
*/
Expand Down
81 changes: 81 additions & 0 deletions src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -0,0 +1,81 @@
package backtype.storm.security.auth;

import backtype.storm.Config;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.AppConfigurationEntry;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

public class AuthUtils {
private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
public static final String LOGIN_CONTEXT_SERVER = "StormServer";
public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
public static final String SERVICE = "storm_thrift_server";

/**
* Construct a JAAS configuration object per storm configuration file
* @param storm_conf Storm configuration
* @return JAAS configuration object
*/
public static Configuration GetConfiguration(Map storm_conf) {
Configuration login_conf = null;

//find login file configuration from Storm configuration
String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
try {
URI config_uri = new File(loginConfigurationFile).toURI();
login_conf = Configuration.getInstance("JavaLoginConfig", new URIParameter(config_uri));
} catch (NoSuchAlgorithmException ex1) {
if (ex1.getCause() instanceof FileNotFoundException)
throw new RuntimeException("configuration file "+loginConfigurationFile+" could not be found");
else throw new RuntimeException(ex1);
} catch (Exception ex2) {
throw new RuntimeException(ex2);
}
}

return login_conf;
}

/**
* Construct a transport plugin per storm configuration
* @param conf storm configuration
* @return
*/
public static ITransportPlugin GetTransportPlugin(Map storm_conf, Configuration login_conf) {
ITransportPlugin transportPlugin = null;
try {
String transport_plugin_klassName = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
Class klass = Class.forName(transport_plugin_klassName);
transportPlugin = (ITransportPlugin)klass.newInstance();
transportPlugin.prepare(storm_conf, login_conf);
} catch(Exception e) {
throw new RuntimeException(e);
}
return transportPlugin;
}

public static String get(Configuration configuration, String section, String key) throws IOException {
AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
if (configurationEntries == null) {
String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
throw new IOException(errorMessage);
}

for(AppConfigurationEntry entry: configurationEntries) {
Object val = entry.getOptions().get(key);
if (val != null)
return (String)val;
}
return null;
}
}

30 changes: 30 additions & 0 deletions src/jvm/backtype/storm/security/auth/IAuthorizer.java
@@ -0,0 +1,30 @@
package backtype.storm.security.auth;

import java.util.Map;

/**
* Nimbus could be configured with an authorization plugin.
* If not specified, all requests are authorized.
*
* You could specify the authorization plugin via storm parameter. For example:
* storm -c nimbus.authorization.class=backtype.storm.security.auth.NoopAuthorizer ...
*
* You could also specify it via storm.yaml:
* nimbus.authorization.class: backtype.storm.security.auth.NoopAuthorizer
*/
public interface IAuthorizer {
/**
* Invoked once immediately after construction
* @param conf Storm configuration
*/
void prepare(Map storm_conf);

/**
* permit() method is invoked for each incoming Thrift request.
* @param context request context includes info about
* @param operation operation name
* @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
public boolean permit(ReqContext context, String operation, Map topology_conf);
}
38 changes: 38 additions & 0 deletions src/jvm/backtype/storm/security/auth/ITransportPlugin.java
@@ -0,0 +1,38 @@
package backtype.storm.security.auth;

import java.io.IOException;
import java.util.Map;

import javax.security.auth.login.Configuration;

import org.apache.thrift7.TProcessor;
import org.apache.thrift7.server.TServer;
import org.apache.thrift7.transport.TTransport;
import org.apache.thrift7.transport.TTransportException;

/**
* Interface for Thrift Transport plugin
*/
public interface ITransportPlugin {
/**
* Invoked once immediately after construction
* @param storm_conf Storm configuration
* @param login_conf login configuration
*/
void prepare(Map storm_conf, Configuration login_conf);

/**
* Create a server associated with a given port and service handler
* @param port listening port
* @param processor service handler
* @return server to be binded
*/
public TServer getServer(int port, TProcessor processor) throws IOException, TTransportException;

/**
* Connect to the specified server via framed transport
* @param transport The underlying Thrift transport.
* @param serverHost server host
*/
public TTransport connect(TTransport transport, String serverHost) throws IOException, TTransportException;
}
91 changes: 91 additions & 0 deletions src/jvm/backtype/storm/security/auth/ReqContext.java
@@ -0,0 +1,91 @@
package backtype.storm.security.auth;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.net.InetAddress;
import com.google.common.annotations.VisibleForTesting;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import javax.security.auth.Subject;

/**
* context request context includes info about
* (1) remote address,
* (2) remote subject and primary principal
* (3) request ID
*/
public class ReqContext {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
private Subject _subject;
private InetAddress _remoteAddr;
private Integer _reqID;
private Map _storm_conf;

/**
* Get a request context associated with current thread
* @return
*/
public static ReqContext context() {
return ctxt.get();
}

//each thread will have its own request context
private static final ThreadLocal < ReqContext > ctxt =
new ThreadLocal < ReqContext > () {
@Override
protected ReqContext initialValue() {
return new ReqContext(AccessController.getContext());
}
};

//private constructor
@VisibleForTesting
ReqContext(AccessControlContext acl_ctxt) {
_subject = Subject.getSubject(acl_ctxt);
_reqID = uniqueId.incrementAndGet();
}

/**
* client address
*/
public void setRemoteAddress(InetAddress addr) {
_remoteAddr = addr;
}

public InetAddress remoteAddress() {
return _remoteAddr;
}

/**
* Set remote subject explicitly
*/
public void setSubject(Subject subject) {
_subject = subject;
}

/**
* Retrieve client subject associated with this request context
*/
public Subject subject() {
return _subject;
}

/**
* The primary principal associated current subject
*/
public Principal principal() {
if (_subject == null) return null;
Set<Principal> princs = _subject.getPrincipals();
if (princs.size()==0) return null;
return (Principal) (princs.toArray()[0]);
}

/**
* request ID of this request
*/
public Integer requestID() {
return _reqID;
}
}

0 comments on commit d6374fe

Please sign in to comment.