In [None]:
%load_ext dockermagic

# Kafka
<img src="https://kafka.apache.org/images/apache-kafka.png" alt="Kafka" width="150"/>

- https://kafka.apache.org/

## Setup

- download from https://dlcdn.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
- version 3.6.0

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Install Apache Kafka under /opt and register it in /opt/envvars.sh.
# Every step is written so that the cell can be re-run without errors
# and without leaving duplicated state behind.
KAFKA_PKG=kafka_2.13-3.6.0

# Download package (-p: do not fail when the directory already exists)
mkdir -p /opt/pkgs
cd /opt/pkgs || exit 1
# The CDN only serves current releases; fall back to the permanent
# Apache archive when this version is no longer published there.
wget -q -c https://dlcdn.apache.org/kafka/3.6.0/${KAFKA_PKG}.tgz \
    || wget -q -c https://archive.apache.org/dist/kafka/3.6.0/${KAFKA_PKG}.tgz

# unpack file and create link
# (-f/-n: replace an existing /opt/kafka link instead of failing or
#  creating a nested link inside the directory it points to)
tar -zxf ${KAFKA_PKG}.tgz -C /opt
ln -sfn /opt/${KAFKA_PKG} /opt/kafka

# update envvars.sh only once, so re-running the cell does not append
# the same exports again
if ! grep -q 'KAFKA_HOME=' /opt/envvars.sh; then
cat >> /opt/envvars.sh << EOF
# Kafka
export KAFKA_HOME=/opt/kafka
export PATH=\${PATH}:\${KAFKA_HOME}/bin

EOF
fi

cat /opt/envvars.sh

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Kafka with ZooKeeper
# Both services are launched in the background; their combined
# stdout/stderr goes to a log under /tmp and their PID is saved to a
# pid file so that later cells can inspect or stop them.

# Start ZooKeeper service
zookeeper-server-start.sh "$KAFKA_HOME/config/zookeeper.properties" > /tmp/zookeeper.output 2>&1 &
ZOOKEEPER_PID=$!
echo "$ZOOKEEPER_PID" > /tmp/zookeeper.pid

ps -fp "$ZOOKEEPER_PID"

# Start Kafka broker service
kafka-server-start.sh "$KAFKA_HOME/config/server.properties" > /tmp/kafka-server.output 2>&1 &
BROKER_PID=$!
echo "$BROKER_PID" > /tmp/kafka-server.pid

ps -fp "$BROKER_PID"

## Basic usage

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Create a topic to store events, then list all topics and show the
# new topic's details.
# --if-not-exists makes the creation a no-op when the topic is already
# there, so the cell can be re-run without an error.

kafka-topics.sh --create --if-not-exists --topic mytopic --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic mytopic --bootstrap-server localhost:9092

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Write events to topic
# Each event is sent by its own console-producer invocation, which
# reads the message from standard input.
for EVENT_ID in 1 2 3; do
    echo "event $EVENT_ID" | kafka-console-producer.sh --topic mytopic --bootstrap-server localhost:9092
done

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Read events from topic
#   --from-beginning : start consuming from the beginning of the topic's
#                      partition; without it, only the most recent
#                      messages are consumed
#   --timeout-ms     : how long, in milliseconds, the consumer keeps
#                      waiting for events
kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic mytopic \
    --from-beginning \
    --timeout-ms 2000

## Using Kafka and HDFS

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Stop here if the Kafka libs directory is missing, so the archive is
# never downloaded and unpacked into some other directory.
cd "$KAFKA_HOME/libs" || exit 1

# Download and install Kafka Connect HDFS
# https://www.confluent.io/hub/#hdfs
# https://www.confluent.io/hub/confluentinc/kafka-connect-hdfs3
HDFS3_ZIP=confluentinc-kafka-connect-hdfs3-1.1.27.zip
wget -qq -c https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-hdfs3/versions/1.1.27/${HDFS3_ZIP}

# -o: overwrite files left by a previous run instead of prompting,
# which would otherwise block or fail in this non-interactive cell
unzip -qq -o ${HDFS3_ZIP}

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Create topic monitor and list all topics.
# --if-not-exists makes the creation a no-op when the topic is already
# there, so the cell can be re-run without an error.
kafka-topics.sh --create --if-not-exists --topic monitor --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Create configuration file for sink
# Reference: https://docs.confluent.io/kafka-connectors/hdfs3-sink/current/overview.html
# The connector reads topic "monitor" and writes Avro files to the HDFS
# instance at hdfs://hadoop:9000, with flush.size set to 3.
# The heredoc delimiter is quoted so the text is written verbatim.
SINK_CONFIG=$KAFKA_HOME/config/connect-hdfs.properties
cat > "$SINK_CONFIG" << 'EOF'
name=hdfs-sink
bootstrap.servers=localhost:9092
connector.class=io.confluent.connect.hdfs3.Hdfs3SinkConnector
tasks.max=1
topics=monitor
hdfs.url=hdfs://hadoop:9000
format.class=io.confluent.connect.hdfs3.avro.AvroFormat
flush.size=3
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
hadoop.conf.dir=/opt/hadoop/etc/hadoop
hadoop.home=/opt/hadoop
EOF

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Create configuration file for kafka-connect
# Keys and values are handled as plain strings, source offsets are kept
# in /tmp/connect.offsets, and plugins are looked up under /opt/kafka/libs.
# NOTE(review): the *.schemas.enable settings are presumably only
# meaningful for JsonConverter - confirm whether they can be dropped.
# The heredoc delimiter is quoted so the text is written verbatim.
WORKER_CONFIG=$KAFKA_HOME/config/myconnect.properties
cat > "$WORKER_CONFIG" << 'EOF'
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/libs
EOF

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Start Connector
# Runs Kafka Connect in standalone mode in the background with the
# worker configuration followed by the sink connector configuration.
# Output goes to /tmp/connect-standalone.output and the PID is saved so
# that a later cell can stop the process.
WORKER_CONFIG=$KAFKA_HOME/config/myconnect.properties
SINK_CONFIG=$KAFKA_HOME/config/connect-hdfs.properties

connect-standalone.sh "$WORKER_CONFIG" "$SINK_CONFIG" > /tmp/connect-standalone.output 2>&1 &
CONNECT_PID=$!
echo "$CONNECT_PID" > /tmp/connect-standalone.pid

ps -fp "$CONNECT_PID"

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Create system monitor script
# Every 3 seconds the script publishes one line with the current date,
# CPU utilization and memory utilization to the "monitor" topic.
# The heredoc delimiter is quoted so the script is written verbatim and
# no "$" needs escaping. The shebang makes the file directly executable
# without depending on the invoking shell's fallback for scripts that
# have no interpreter line.
cat > /tmp/monitor.sh << 'EOF'
#!/bin/bash
while true
do
    DATE=$(date)
    CPU=$(top -bn1 | grep 'Cpu(s)' | awk '{print $2 + $4}')
    MEM=$(free -m | awk '/Mem:/ {print $3/$2 * 100.0}')
    echo -n "Date: $DATE - CPU Utilization: $CPU% - Memory Utilization: $MEM%" | \
        kafka-console-producer.sh --topic monitor --bootstrap-server localhost:9092
    sleep 3
done
EOF

# Run the monitor in the background and remember its PID so that a
# later cell can stop it. The script inherits PATH from this cell, so
# kafka-console-producer.sh is found without sourcing envvars.sh again.
chmod +x /tmp/monitor.sh
/tmp/monitor.sh > /dev/null 2>&1 &
echo $! > /tmp/monitor.pid

ps -fp $(cat /tmp/monitor.pid)

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# List created files in HDFS
# Recursively lists everything under /topics.
TOPICS_DIR=/topics
hdfs dfs -ls -R "$TOPICS_DIR"

In [None]:
%%dockerexec hadoop

# Download Apache Avro
# https://avro.apache.org/
# (-p: create the package directory if an earlier cell has not done so)
mkdir -p /opt/pkgs
cd /opt/pkgs || exit 1
# The "stable" path follows the latest release, so a pinned file name
# stops resolving once a newer version becomes stable; fall back to the
# version-specific location in the permanent Apache archive.
wget -qq -c https://dlcdn.apache.org/avro/stable/java/avro-tools-1.11.3.jar \
    || wget -qq -c https://archive.apache.org/dist/avro/avro-1.11.3/java/avro-tools-1.11.3.jar

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Copy the Avro files written by the sink connector out of HDFS and
# print their records as JSON.

# -p: do not fail when the directory is left over from a previous run
mkdir -p /tmp/monitor
cd /tmp/monitor || exit 1

# Get files from HDFS
# -f overwrites local copies from a previous run. The pattern is quoted
# so that it is expanded by HDFS, never by the local shell.
hdfs dfs -get -f '/topics/monitor/partition=0/*' .
ls *.avro

# Print files content
# Iterate over the glob directly instead of parsing "ls" output, and
# quote the name so unusual file names are passed through intact.
for FILE in *.avro; do
    # With no matching files the pattern stays literal; skip it.
    [ -e "$FILE" ] || continue
    echo "$FILE"
    java -jar /opt/pkgs/avro-tools-1.11.3.jar tojson "$FILE"
done

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# Kill services
# Stop the producers/consumers first, then the broker, then ZooKeeper.
kill $(cat /tmp/monitor.pid)
kill $(cat /tmp/connect-standalone.pid)

kafka-server-stop.sh

# Give the broker up to 30 seconds to exit before ZooKeeper is stopped,
# so that it is not still shutting down when ZooKeeper disappears.
# NOTE(review): assumes the PID saved in /tmp/kafka-server.pid is the
# broker process itself - confirm against kafka-server-start.sh.
KAFKA_PID=$(cat /tmp/kafka-server.pid)
for _ in $(seq 1 30); do
    kill -0 "$KAFKA_PID" 2> /dev/null || break
    sleep 1
done

zookeeper-server-stop.sh

# Show whatever is still running; a header-only listing means the
# process is gone.
ps -fp $(cat /tmp/monitor.pid)
ps -fp $(cat /tmp/connect-standalone.pid)
ps -fp $(cat /tmp/kafka-server.pid)
ps -fp $(cat /tmp/zookeeper.pid)