Storm topology to consume from a Kafka topic and publish to another Kafka topic (with Kerberos)

Storm topology - Consume from Kafka and publish to Kafka - with Kerberos

Environment with Hortonworks distribution:

  • Cluster deployed with Ambari, HDP 2.6.1 and HDF 3.0.0
  • Cluster is Kerberized against an Active Directory
  • Storm is deployed on 3 nodes, Kafka is deployed on 3 nodes, and there is a 3-node ZooKeeper quorum
  • Authorizations for Kafka are managed by Ranger

Versions are Storm 1.1.0 and Kafka 0.10.1

The objective is to have a Storm topology that consumes from a Kafka topic ("inputTopicStorm") using KafkaSpout, sends the data through a bolt that does nothing (IdentityBolt), and publishes the data to another Kafka topic ("outputTopicStorm") using KafkaBolt.

Starting with Kafka client 0.10.2 (KAFKA-4259), the JAAS configuration used to connect to Kafka can be defined dynamically through the "sasl.jaas.config" client property. In theory, this means one user could consume the data from the input topic while another user publishes it to the output topic. It also means that it is no longer necessary to rely on a JAAS configuration file.
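
As a minimal sketch of the "one user per client" idea, the helper below builds two sets of client properties, each with its own inline JAAS entry. The class name, method names, broker address, keytab paths and principals are all hypothetical placeholders, and the sketch assumes the standard JDK Kerberos login module (com.sun.security.auth.module.Krb5LoginModule). It only builds the properties; it does not connect to Kafka.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original topology code.
class JaasConfigSketch {

    // Builds an inline JAAS entry for a keytab-based Kerberos login.
    static String kerberosJaasConfig(String keytabPath, String principal) {
        return "com.sun.security.auth.module.Krb5LoginModule required "
                + "useTicketCache=false "
                + "useKeyTab=true "
                + "serviceName=\"kafka\" "
                + "keyTab=\"" + keytabPath + "\" "
                + "principal=\"" + principal + "\";";
    }

    // Returns Kafka client properties that authenticate as the given principal.
    static Properties secureClientProps(String bootstrapServers, String keytabPath, String principal) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.jaas.config", kerberosJaasConfig(keytabPath, principal));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker, keytabs and principals: replace with real values.
        Properties consumerProps = secureClientProps(
                "broker-1:6667", "/path/to/reader.keytab", "reader@EXAMPLE.COM");
        Properties producerProps = secureClientProps(
                "broker-1:6667", "/path/to/writer.keytab", "writer@EXAMPLE.COM");

        System.out.println(consumerProps.getProperty("sasl.jaas.config"));
        System.out.println(producerProps.getProperty("sasl.jaas.config"));
    }
}
```

The consumer properties would be handed to the spout and the producer properties to the bolt. Both keytabs would need to be readable on every Storm node.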

To build:

mvn clean package

To submit the topology:

[pvillard@storm-1 tmp]$ storm jar kafka-storm-kafka-0.0.1-SNAPSHOT.jar example.KafkaStormKafkaTopology

Code explanations

I define the properties I want to use in my Storm topology (note that the keytab needs to be available on all the Storm nodes):

Properties props = new Properties();
props.put("bootstrap.servers", ",,");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
            + "useTicketCache=false "
            + "renewTicket=true "
            + "serviceName=\"kafka\" "
            + "useKeyTab=true "
            + "keyTab=\"/home/pvillard/pvillard.keytab\" "
            + "principal=\"pvillard@EXAMPLE.COM\";");

Kafka Spout

// Kafka spout getting data from "inputTopicStorm"
KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
        .builder(props.getProperty("bootstrap.servers"), "inputTopicStorm")
        .setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()), new Fields("topic", "key", "message"))
        .build();
KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

Kafka Bolt

// Kafka bolt to send data into "outputTopicStorm"
KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
        .withTopicSelector(new DefaultTopicSelector("outputTopicStorm"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());

Ranger authorization rules

The Storm user will need the "describe" authorization on the Kafka topics, but the user "pvillard" will be the one needing the "consume" and "publish" authorizations.

Note that the conversion from the Kerberos principal to the local username is managed by:

However, if your cluster is named "hdphdf", then Ambari will create the following principal:


It means that the username used by Storm to check Ranger authorizations will be "storm-hdphdf". Consequently, you will need to create an internal user with that name in the Ranger UI so that you can create the appropriate rules for it.
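
To make the mapping concrete, here is a simplified illustration of how a Kerberos principal reduces to a local username. It is not Storm's actual implementation, only a sketch of the rule "keep what comes before the first '/' or '@'"; the class and method names are hypothetical, and the realm is assumed to be EXAMPLE.COM as in the properties above.

```java
// Hypothetical illustration, not Storm's own principal-to-local class.
class PrincipalToLocalSketch {

    // Keeps the part of the principal before the first '/' (host) or '@' (realm).
    static String toLocal(String principal) {
        if (principal == null) {
            return null;
        }
        return principal.split("[/@]")[0];
    }

    public static void main(String[] args) {
        System.out.println(toLocal("storm-hdphdf@EXAMPLE.COM")); // prints "storm-hdphdf"
        System.out.println(toLocal("pvillard@EXAMPLE.COM"));     // prints "pvillard"
    }
}
```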


If you have a blank page when accessing Storm UI from Ambari, you'll need to add the following in "Custom storm-site":


I also added:

To be able to use the Storm CLI with Kerberos, I did:

[pvillard@storm-1 ~]# cd
[pvillard@storm-1 ~]# mkdir .storm
[pvillard@storm-1 ~]# vi .storm/storm.yaml

And added the following:

nimbus.seeds: ['','','']
nimbus.thrift.port: 6627
java.security.auth.login.config: "/etc/storm/conf/client_jaas.conf"
storm.thrift.transport: ""

You can find the values for nimbus.seeds and nimbus.thrift.port in /etc/storm/conf/storm.yaml

You can then check that the Storm CLI works by running:

[pvillard@storm-1 ~]# storm list