Permalink
Browse files

Apply Thrift SASL client/server framework for authentication/authoriz…

…ation/audit
  • Loading branch information...
anfeng committed Feb 13, 2013
1 parent 01c4147 commit 96c51312a9149a0c86509823f2494dd54f943c9a
View
@@ -0,0 +1,10 @@
+StormServer {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_bob="bobsecret";
+};
+StormClient {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="bob"
+ password="bobsecret";
+};
@@ -0,0 +1,17 @@
+StormServer {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ keyTab="/etc/nimbus_server.keytab"
+ storeKey=true
+ useTicketCache=false
+ principal="storm_server/carcloth.corp.acme.com@STORM.CORP.ACME.COM";
+};
+StormClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ keyTab="/etc/nimbus_client.keytab"
+ storeKey=true
+ useTicketCache=false
+ serviceName="storm_server";
+};
+
@@ -0,0 +1,7 @@
+StormClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ doNotPrompt=true
+ useTicketCache=true
+ serviceName="storm_server";
+};
+
View
@@ -13,6 +13,23 @@
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
+ <encoder>
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${storm.home}/logs/access.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>${storm.home}/logs/${logfile.name}.%i</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>9</maxIndex>
+ </rollingPolicy>
+
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
</encoder>
@@ -21,4 +38,9 @@
<root level="INFO">
<appender-ref ref="A1"/>
</root>
+
+ <logger name="backtype.storm.security.auth.NoopAuthorizer" additivity="false">
+ <level value="INFO" />
+ <appender-ref ref="ACCESS" />
+ </logger>
</configuration>
View
@@ -8,8 +8,8 @@
[storm/libthrift7 "0.7.0"
:exclusions [org.slf4j/slf4j-api]]
[clj-time "0.4.1"]
- [com.netflix.curator/curator-framework "1.0.1"
- :exclusions [log4j/log4j]]
+ [com.netflix.curator/curator-framework "1.2.6"
+ :exclusions [log4j/log4j org.slf4j/slf4j-log4j12]]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
[compojure "1.1.3"]
@@ -6,7 +6,7 @@
ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState
Watcher$Event$EventType KeeperException$NodeExistsException])
(:import [org.apache.zookeeper.data Stat])
- (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory])
+ (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory])
(:import [java.net InetSocketAddress BindException])
(:import [java.io File])
(:import [backtype.storm.utils Utils ZookeeperAuthInfo])
@@ -132,7 +132,7 @@
(let [localfile (File. localdir)
zk (ZooKeeperServer. localfile localfile 2000)
[retport factory] (loop [retport (if port port 2000)]
- (if-let [factory-tmp (try-cause (NIOServerCnxn$Factory. (InetSocketAddress. retport))
+ (if-let [factory-tmp (try-cause (doto (NIOServerCnxnFactory.) (.configure (InetSocketAddress. retport) 100))
(catch BindException e
(when (> (inc retport) (if port port 65535))
(throw (RuntimeException. "No port is available to launch an inprocess zookeeper.")))))]
@@ -0,0 +1,116 @@
+package backtype.storm.security.auth;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslServerFactory;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class AnonymousAuthenticationProvider extends java.security.Provider {
+ public AnonymousAuthenticationProvider() {
+ super("ThriftSaslAnonymous", 1.0, "Thrift Anonymous SASL provider");
+ put("SaslClientFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
+ put("SaslServerFactory.ANONYMOUS", SaslAnonymousFactory.class.getName());
+ }
+
+ public static class SaslAnonymousFactory implements SaslClientFactory, SaslServerFactory {
+
+ @Override
+ public SaslClient createSaslClient(
+ String[] mechanisms, String authorizationId, String protocol,
+ String serverName, Map<String,?> props, CallbackHandler cbh)
+ {
+ for (String mech : mechanisms) {
+ if ("ANONYMOUS".equals(mech)) {
+ return new AnonymousClient(authorizationId);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public SaslServer createSaslServer(
+ String mechanism, String protocol, String serverName, Map<String,?> props, CallbackHandler cbh)
+ {
+ if ("ANONYMOUS".equals(mechanism)) {
+ return new AnonymousServer();
+ }
+ return null;
+ }
+ public String[] getMechanismNames(Map<String, ?> props) {
+ return new String[] { "ANONYMOUS" };
+ }
+ }
+}
+
+
+class AnonymousClient implements SaslClient {
+ @VisibleForTesting
+ final String username;
+ private boolean hasProvidedInitialResponse;
+
+ public AnonymousClient(String username) {
+ if (username == null) {
+ this.username = "anonymous";
+ } else {
+ this.username = username;
+ }
+ }
+
+ public String getMechanismName() { return "ANONYMOUS"; }
+ public boolean hasInitialResponse() { return true; }
+ public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
+ if (hasProvidedInitialResponse) {
+ throw new SaslException("Already complete!");
+ }
+
+ try {
+ hasProvidedInitialResponse = true;
+ return username.getBytes("UTF-8");
+ } catch (IOException e) {
+ throw new SaslException(e.toString());
+ }
+ }
+ public boolean isComplete() { return hasProvidedInitialResponse; }
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public Object getNegotiatedProperty(String propName) { return null; }
+ public void dispose() {}
+}
+
+class AnonymousServer implements SaslServer {
+ private String user;
+ public String getMechanismName() { return "ANONYMOUS"; }
+ public byte[] evaluateResponse(byte[] response) throws SaslException {
+ try {
+ this.user = new String(response, "UTF-8");
+ } catch (IOException e) {
+ throw new SaslException(e.toString());
+ }
+ return null;
+ }
+ public boolean isComplete() { return user != null; }
+ public String getAuthorizationID() { return user; }
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public Object getNegotiatedProperty(String propName) { return null; }
+ public void dispose() {}
+
+}
+
+
+
@@ -0,0 +1,39 @@
+package backtype.storm.security.auth;
+
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+
+public class AuthUtils {
+ public static String LoginContextServer = "StormServer";
+ public static String LoginContextClient = "StormClient";
+
+ static public final String DIGEST = "DIGEST-MD5";
+ static public final String ANONYMOUS = "ANONYMOUS";
+ static public final String KERBEROS = "GSSAPI";
+ static public final String SERVICE = "storm_thrift_server";
+
+ private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
+
+ public static String get(Configuration configuration, String section, String key) throws IOException {
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+
+ for(AppConfigurationEntry entry: configurationEntries) {
+ Object val = entry.getOptions().get(key);
+ if (val != null)
+ return (String)val;
+ }
+ return null;
+ }
+}
+
@@ -0,0 +1,35 @@
+package backtype.storm.security.auth;
+
+import backtype.storm.Config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An authorization implementation that denies everything, for testing purposes
+ */
+public class DenyAuthorizer implements IAuthorization {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DenyAuthorizer.class);
+
+ /**
+ * permit() method is invoked for each incoming Thrift request
+ * @param contrext request context includes info about
+ * (1) remote address/subject,
+ * (2) operation
+ * (3) configuration of targeted topology
+ * @return true if the request is authorized, false if reject
+ */
+ public boolean permit(ReqContext context) {
+ LOG.info("Access "
+ + " from: " +
+ (context.remoteAddress() == null
+ ? "null" : context.remoteAddress().toString())
+ + " principal:"+ (context.principal() == null
+ ? "null" : context.principal())
+ +" op:"+context.operation()
+ + " topoology:"+context.topologyConf().get(Config.TOPOLOGY_NAME)
+ );
+ return false;
+ }
+}
@@ -0,0 +1,26 @@
+package backtype.storm.security.auth;
+
+/**
+ * Nimbus could be configured with an authorization plugin.
+ * If not specified, all requests are authorized.
+ *
+ * You could specify the authorization plugin via storm parameter. For example:
+ * storm -c nimbus.authorization.classname=backtype.storm.security.auth.DefaultAuthorizer ...
+ *
+ * You could also specify it via storm.yaml:
+ * nimbus.authorization.classname: backtype.storm.security.auth.DefaultAuthorizer
+ *
+ * @author afeng
+ *
+ */
+public interface IAuthorization {
+ /**
+ * permit() method is invoked for each incoming Thrift request.
+ * @param contrext request context includes info about
+ * (1) remote address/subject,
+ * (2) operation
+ * (3) configuration of targeted topology
+ * @return true if the request is authorized, false if reject
+ */
+ public boolean permit(ReqContext context);
+}
@@ -0,0 +1,32 @@
+package backtype.storm.security.auth;
+
+import backtype.storm.Config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A no-op authorization implementation that illustrate info available for authorization decisions.
+ */
+public class NoopAuthorizer implements IAuthorization {
+ private static final Logger LOG = LoggerFactory.getLogger(NoopAuthorizer.class);
+
+ /**
+ * permit() method is invoked for each incoming Thrift request
+ * @param contrext request context includes info about
+ * (1) remote address/subject,
+ * (2) operation
+ * (3) configuration of targeted topology
+ * @return true if the request is authorized, false if reject
+ */
+ public boolean permit(ReqContext context) {
+ LOG.info("Access "
+ + " from: " + context.remoteAddress() == null
+ ? "null" : context.remoteAddress().toString()
+ + " principal:"+context.principal() == null
+ ? "null" : context.principal()
+ +" op:"+context.operation()
+ + " topoology:"+context.topologyConf().get(Config.TOPOLOGY_NAME));
+ return true;
+ }
+}
Oops, something went wrong.

0 comments on commit 96c5131

Please sign in to comment.