This document walks through installing Kafka and then connecting to the ZTF alert stream via two different methods:
- Console Consumer: Command-line consumer (installed with Confluent Platform) that prints alert content to stdout; useful for testing the connection.
- Kafka Connectors: Plugins that listen to a stream and route the messages to another service. Our consumer is a Kafka -> Pub/Sub connector that simply passes the bytes through (no data decoding or conversions).
Contents:
- Pre-configured instance
- Install Kafka (Confluent Platform) manually
- Console Consumer (useful for testing the connection)
- Kafka Connectors (run a consumer and route the messages to another service)
The example code that follows creates a Compute Engine (CE) instance called kafka-consumer-test and then installs and configures both methods to listen to the ZTF stream. (ZTF auth files are required, but not provided here.) There is an existing CE instance, kafka-consumer, that has been set up following this example. You(*) can log into it and test or use the methods described here to connect to ZTF without having to install or configure anything (see the "Run" sections below; you could also take advantage of the installed software and auth files, but create/configure your own working directory). It is not a production instance. The command-line commands to access kafka-consumer are:
# first start the instance
gcloud compute instances start kafka-consumer --zone us-central1-a
# then log in
gcloud beta compute ssh kafka-consumer --zone us-central1-a
# make sure you STOP the instance when you are done
# so that we don't pay for it to run
gcloud compute instances stop kafka-consumer --zone us-central1-a
If you need permissions to access it or to use sudo, you should be able to grant them in the IAM section of the GCP Console. I can help if you get stuck.
(*) Assuming "you" are a PGB member with access to the GCP project.
Confluent Platform is a collection of tools (including Kafka) to run and manage data streams. In some sense, installing the full platform is overkill (listening to a stream requires fewer tools than producing a stream). However, it's worth it:
- This is a (the?) standard way to install Kafka, so it becomes easier to follow online examples/tutorials and to troubleshoot with ZTF folks;
- The tasks we need to accomplish (testing and running connections) run smoothly using Confluent Platform (the same cannot be said of other methods we have tried); and
- It's easy to imagine needing some of the other components in the package down the road.
Instruction Links:
- gcloud compute instances create
- How To Install Java with Apt on Debian 10
- Confluent Platform: Manual Install on Ubuntu and Debian
See the file at broker/consumer/vm_install.sh for a quick list of the commands required for steps 2 and 3. (This file is used to set up the production instance ztf-consumer.)
- (Optional) Create a Compute Engine VM instance (Debian 10):
# configs
instancename=kafka-consumer-test
machinetype=e2-standard-2
zone=us-central1-a
# create the instance
gcloud compute instances create ${instancename} \
--zone=${zone} \
--machine-type=${machinetype} \
--scopes=cloud-platform \
--metadata=google-logging-enabled=true \
--tags=ztfport # firewall rule, opens port used by Kafka/ZTF
# log in
gcloud compute ssh ${instancename} --zone=${zone}
- Install Java and the Java Development Kit (JDK).
- Debian 10 instructions are at the link above.
- From that page you can select different versions or distributions.
- I used the "Default" OpenJDK option.
- Be sure to set the JAVA_HOME environment variable; instructions are at the bottom of the page.
- Install the Confluent Platform. This installs Kafka + additional tools. (A condensed sketch of steps 2 and 3 follows this list.)
- Follow the instructions in the "Get the Software" section of the Confluent Platform link above.
- See the links on the left-hand side of that page for RHEL, CentOS, or Docker installs.
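The sketch below condenses steps 2 and 3 into shell commands. It assumes Debian 10, the Default OpenJDK option, and Confluent Platform 6.0 (the version number, the JAVA_HOME path, and the use of add-apt-repository are assumptions on my part); treat the linked instructions and broker/consumer/vm_install.sh as the authoritative versions.
# step 2: Java (Default OpenJDK)
sudo apt update
sudo apt install -y default-jdk
# set JAVA_HOME; the default-java symlink is assumed here, check with: update-alternatives --list java
echo 'JAVA_HOME="/usr/lib/jvm/default-java"' | sudo tee -a /etc/environment
source /etc/environment
# step 3: Confluent Platform (6.0 assumed; add-apt-repository may require software-properties-common)
wget -qO - https://packages.confluent.io/deb/6.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.0 stable main"
sudo apt update && sudo apt install -y confluent-platform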
kafka-console-consumer (called kafka-console-consumer.sh in a plain Apache Kafka install) is a command-line utility that creates a consumer and prints the messages to the terminal. It is useful for testing the connection.
The following instructions are pieced together from:
- Kafka Consumer Configs
- SASL configuration for Kafka Clients
- Confluent Kafka Consumer
- info I got from Christopher Phillips over phone/email.
Find out where Kafka is installed. On a VM created from the Marketplace Kafka image, it is in /opt/kafka. On the VM with the manual Confluent Platform install, components are scattered around a bit; look in:
- /etc/kafka (example properties and config files)
- /bin (e.g., for kafka-console-consumer and confluent-hub)
The following assumes we are on the VM with Confluent Platform.
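Before going further, a quick sanity check that the tools and sample configs are where we expect them (this just uses which and ls; nothing here is ZTF-specific):
# the CLI tools should be on the PATH
which kafka-console-consumer connect-standalone confluent-hub
# sample config files shipped with the install
ls /etc/kafka/*.properties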
- Create a working directory. In the following I use /home/ztf_consumer
- This requires two authorization files (not provided here):
- krb5.conf, which should be at /etc/krb5.conf
- pitt-reader.user.keytab. I store this in the directory /home/ztf_consumer; we need the path for config below.
- Create kafka_client_jaas.conf in your working directory containing the following (change the keyTab path if needed):
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKeyTab=true
debug=true
serviceName="kafka"
keyTab="/home/ztf_consumer/pitt-reader.user.keytab"
principal="pitt-reader@KAFKA.SECURE"
useTicketCache=false;
};
Make sure there are no extra spaces at the ends of the lines, else the connection will not succeed.
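A quick way to check: the following grep prints any line with trailing whitespace, so it should print nothing if the file is clean.
# flag lines in the JAAS file that end in whitespace (no output = clean)
grep -nE '[[:space:]]+$' /home/ztf_consumer/kafka_client_jaas.conf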
- Set an environment variable so Java can find the file we just created:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/ztf_consumer/kafka_client_jaas.conf"
- Set up the Kafka config file consumer.properties. Sample config files are provided with the installation in /opt/kafka/config/ (Marketplace VM) or /etc/kafka/ (manual install VM). Create a consumer.properties file in your working directory that contains the following:
bootstrap.servers=public2.alerts.ztf.uw.edu:9094
group.id=group
session.timeout.ms=6000
enable.auto.commit=False
sasl.kerberos.kinit.cmd='kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}'
sasl.kerberos.service.name=kafka
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
auto.offset.reset=earliest
Run
The following assumes we are using the manual install VM.
# make sure the KAFKA_OPTS env variable is set
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/ztf_consumer/kafka_client_jaas.conf"
# Set the topic and run the console consumer
topicday=20210105 # yyyymmdd, must be within 7 days of present
cd /bin
./kafka-console-consumer \
--bootstrap-server public2.alerts.ztf.uw.edu:9094 \
--topic ztf_${topicday}_programid1 \
--consumer.config /home/ztf_consumer/consumer.properties
# final argument should point to the consumer.properties file created above
After a few moments, if the connection is successful you will see encoded alerts printing to stdout. Use control-C to stop consuming.
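If you just want to confirm the connection works without a wall of binary output scrolling past, you can cap the number of alerts consumed with --max-messages (same configs and shell session as above):
# consume 5 alerts and then exit
./kafka-console-consumer \
    --bootstrap-server public2.alerts.ztf.uw.edu:9094 \
    --topic ztf_${topicday}_programid1 \
    --consumer.config /home/ztf_consumer/consumer.properties \
    --max-messages 5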
Kafka connectors run a Kafka consumer and route the messages to another service.
The following uses instructions at:
- Create a directory to store the connectors (plugins):
mkdir /usr/local/share/kafka/plugins
- To use connectors, the worker .properties file passed when running the consumer/connector must include the following:
plugin.path=/usr/local/share/kafka/plugins
- Create a working directory. In the following I use /home/ztf_consumer
- Two authorization files are required:
- krb5.conf, which should be at /etc/krb5.conf
- pitt-reader.user.keytab. I store this in the directory /home/ztf_consumer; we need the path for config below.
We use a Kafka-Pub/Sub connector (kafka-connector) that is maintained by Pub/Sub developers. There is another connector managed by Confluent (here), but it only supports a Pub/Sub source (i.e., Pub/Sub -> Kafka); we need a Pub/Sub sink.
We pass the alert bytes straight through to Pub/Sub without decoding or converting them.
The following instructions were pieced together from:
- Installation:
- Getting Started with Kafka Connect
- the copy_tool.py file provided with the connector (see the repo)
- Configuration:
- Worker configuration:
- Connector configuration:
- CloudPubSubConnector Sink Configuration Properties
- Example config files, which you can find at:
- /etc/kafka/connect-standalone.properties
- /etc/kafka/connect-distributed.properties
- cps-sink-connector.properties (link)
The connector can be configured to run in "standalone" or "distributed" mode. Distributed mode is recommended for production environments, partly due to its fault tolerance. I initially tried distributed mode, but: a) I got confused about where to put the connector configs, and b) I'm not totally clear on what the distributed-specific worker options are and what they do. The following uses standalone mode (we should probably switch at some point):
Install
# navigate to the directory created above to store connectors
cd /usr/local/share/kafka/plugins
# download the .jar file
CONNECTOR_RELEASE=v0.5-alpha
sudo wget https://github.com/GoogleCloudPlatform/pubsub/releases/download/${CONNECTOR_RELEASE}/pubsub-kafka-connector.jar
# now the plugin is installed
Configure
Worker configuration
# navigate to the working directory created when configuring Kafka for ZTF
cd /home/ztf_consumer
Create a file called psconnect-worker.properties containing the following:
plugin.path=/usr/local/share/kafka/plugins
# ByteArrayConverter provides a “pass-through” option that does no conversion
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
# offset.flush.interval.ms=10000
# workers need to use SASL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKeyTab=true \
serviceName="kafka" \
keyTab="/home/ztf_consumer/pitt-reader.user.keytab" \
principal="pitt-reader@KAFKA.SECURE" \
useTicketCache=false;
# connecting to ZTF
bootstrap.servers=public2.alerts.ztf.uw.edu:9094
# group.id=group
# session.timeout.ms=6000
# enable.auto.commit=False
# sasl.kerberos.kinit.cmd='kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}'
consumer.auto.offset.reset=earliest
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKeyTab=true \
serviceName="kafka" \
keyTab="/home/ztf_consumer/pitt-reader.user.keytab" \
principal="pitt-reader@KAFKA.SECURE" \
useTicketCache=false;
Connector configuration
Create a file in your working directory called ps-connector.properties containing the following:
name=ps-sink-connector-ztf
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
# set the ZTF Kafka topic
topics=ztf_20210107_programid1
# set our Pub/Sub topic and configs
cps.topic=ztf_alert_data-kafka_consumer
cps.project=ardent-cycling-243415
# include Kafka topic, partition, offset, timestamp as msg attributes
metadata.publish=true
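One thing to check before running: as far as I can tell, the sink connector will not create the Pub/Sub topic for you, so make sure ztf_alert_data-kafka_consumer exists in the project (creating it is a one-liner):
# create the Pub/Sub topic if it does not already exist
gcloud pubsub topics create ztf_alert_data-kafka_consumer --project=ardent-cycling-243415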
Run
cd /bin
# if you want to leave it running and disconnect your terminal from the VM:
screen
# if needed, change the Kafka/ZTF topic (must be within 7 days of present)
# or other configs in the .properties files called below
./connect-standalone \
/home/ztf_consumer/psconnect-worker.properties \
/home/ztf_consumer/ps-connector.properties
This will start up a Kafka consumer and route the messages to Pub/Sub. After a few minutes, if it is working correctly, you will see log messages similar to
INFO WorkerSinkTask{id=ps-sink-connector-ztf-0} Committing offsets asynchronously using sequence number 3
and messages streaming into the Pub/Sub topic ztf_alert_data-kafka_consumer.
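To spot-check that alerts are actually landing in Pub/Sub, you can attach a temporary subscription and pull a few messages. The subscription name below is a throwaway, just for illustration; note it only receives messages published after it is created.
# attach a temporary subscription to the topic
gcloud pubsub subscriptions create tmp-kafka-consumer-check \
    --topic=ztf_alert_data-kafka_consumer --project=ardent-cycling-243415
# pull a few messages (only those published after the subscription was created)
gcloud pubsub subscriptions pull tmp-kafka-consumer-check --limit=3 --auto-ack --project=ardent-cycling-243415
# clean up when done
gcloud pubsub subscriptions delete tmp-kafka-consumer-check --project=ardent-cycling-243415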
This exists and it is free (some connectors require a Confluent Enterprise License), but I haven't actually tried it.
One question that I haven't been able to find the answer to is this: If we run two Kafka connectors, does that create two separate connections to ZTF, or do both connectors use the same incoming stream? We could just install this and try it; I just haven't done it yet. I'm guessing it would be bad form (cost more money on both ends) to pull in two connections every night.