### Run a local Kafka cluster

**Note:** This worked on Vijay Balasubramaniam's personal cluster, but it did not work on the Shared Autoscaling Americas cluster; the cause is unknown.

In [None]:
%scala
// Print the Scala version of this cluster (Databricks clusters usually run 2.12.x).
// The Kafka bundle downloaded below must be built for the same Scala version.
scala.util.Properties.versionNumberString

In [None]:
%%bash
# Download and unpack Kafka 3.3.1 (Scala 2.12 build) under /databricks/driver/kafka.
# Safe to re-run: the download and the extraction are each skipped when already done.
set -euo pipefail

KAFKA_DIR=/databricks/driver/kafka
FILE=$KAFKA_DIR/kafka_2.12-3.3.1.tgz
FOLDER=$KAFKA_DIR/kafka_2.12-3.3.1

mkdir -p "$KAFKA_DIR"
cd "$KAFKA_DIR"

if test -f "$FILE"; then
  echo "Kafka bundle already exists. Skipping the download"
else
  # archive.apache.org keeps every release; downloads.apache.org only hosts the
  # current ones, so a pinned older version eventually disappears from there.
  # Download under a temporary name and rename on success, so an interrupted
  # download is never mistaken for a complete bundle on the next run.
  # -nv keeps the cell output to a single summary line instead of a progress dump.
  wget -nv -O "$FILE.part" https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz
  mv "$FILE.part" "$FILE"
fi

if test -e "$FOLDER"; then
  echo "Kafka extracted folder already exists. Skipping the extraction"
else
  # Remove a half-extracted folder on failure; otherwise the existence check
  # above would skip the extraction on the next run.
  tar -xzf "$FILE" || { rm -rf "$FOLDER"; exit 1; }
fi

--2023-01-02 22:26:54--  https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 105092106 (100M) [application/x-gzip]
Saving to: ‘kafka_2.12-3.3.1.tgz’

     0K .......... .......... .......... .......... ..........  0%  148K 11m33s
    50K .......... .......... .......... .......... ..........  0%  295K 8m40s
   100K .......... .......... .......... .......... ..........  0%  296K 7m42s
   150K .......... .......... .......... .......... ..........  0% 51.4M 5m47s
   200K .......... .......... .......... .......... ..........  0%  585K 5m12s
   250K .......... .......... .......... .......... ..........  0%  593K 4m49s
   300K .......... .......... .......... .......... ..........  0% 97.7M 4m8s
   350K .......... ..........

In [None]:
%%bash
# Update config files to use a different port for Zookeeper [2181 --> 21081]
set -euo pipefail
cd /databricks/driver/kafka/kafka_2.12-3.3.1

for conf in config/zookeeper.properties config/server.properties; do
  # Keep the pristine file as <name>.OLD, created only once. `sed -i.OLD` would
  # overwrite that backup with the already-edited file whenever this cell is re-run.
  if [ ! -e "$conf.OLD" ]; then
    cp -p "$conf" "$conf.OLD"
  fi
  # \b (GNU sed) limits the match to the standalone number 2181, so a longer
  # number that merely contains those digits is left untouched.
  sed -i 's/\b2181\b/21081/g' "$conf"
done

In [None]:
%%bash
# Start Zookeeper in the background unless something already listens on port 21081.
# The server's output goes to a log file: a background process that keeps the
# cell's stdout/stderr open prevents the cell from ever finishing.
set -eu
cd /databricks/driver/kafka/kafka_2.12-3.3.1

PORT=21081
LOG=/databricks/driver/kafka/zookeeper.out
WAIT_SECONDS=30

# True when a TCP socket is listening on $PORT. The pattern is anchored to the
# ":<port>" suffix of the local address so a PID or another number containing
# the same digits cannot produce a false match.
is_listening() {
  netstat -tln 2>/dev/null | grep -E ":${PORT}[[:space:]].*LISTEN" >/dev/null
}

if is_listening; then
  echo "Zookeeper already running on port 21081"
else
  echo "Starting Zookeeper"
  nohup bin/zookeeper-server-start.sh config/zookeeper.properties >"$LOG" 2>&1 </dev/null &
  # Wait until the port is open so that the following cells can rely on it.
  waited=0
  until is_listening; do
    if [ "$waited" -ge "$WAIT_SECONDS" ]; then
      echo "Zookeeper did not open port ${PORT} within ${WAIT_SECONDS}s; see ${LOG}" >&2
      exit 1
    fi
    sleep 1
    waited=$((waited + 1))
  done
  echo "Zookeeper is listening on port ${PORT} (log: ${LOG})"
fi

In [None]:
%%bash
# Start the Kafka broker in the background unless something already listens on port 9092.
# The broker's output goes to a log file: a background process that keeps the
# cell's stdout/stderr open prevents the cell from ever finishing.
set -eu
cd /databricks/driver/kafka/kafka_2.12-3.3.1

PORT=9092
LOG=/databricks/driver/kafka/kafka-server.out
WAIT_SECONDS=60

# True when a TCP socket is listening on $PORT. The pattern is anchored to the
# ":<port>" suffix of the local address so a PID or another number containing
# the same digits cannot produce a false match.
is_listening() {
  netstat -tln 2>/dev/null | grep -E ":${PORT}[[:space:]].*LISTEN" >/dev/null
}

if is_listening; then
  echo "Kafka already running on port 9092"
else
  echo "Starting Kafka"
  nohup bin/kafka-server-start.sh config/server.properties >"$LOG" 2>&1 </dev/null &
  # Wait until the broker accepts connections so the topic commands in the
  # next cell do not race against its startup.
  waited=0
  until is_listening; do
    if [ "$waited" -ge "$WAIT_SECONDS" ]; then
      echo "Kafka did not open port ${PORT} within ${WAIT_SECONDS}s; see ${LOG}" >&2
      exit 1
    fi
    sleep 1
    waited=$((waited + 1))
  done
  echo "Kafka is listening on port ${PORT} (log: ${LOG})"
fi

##### Create a Kafka topic and publish some messages to it

In [None]:
%%bash
# Create the quickstart-events topic if it is missing, describe it, and publish
# three sample messages to it.
set -eu
cd /databricks/driver/kafka/kafka_2.12-3.3.1

BROKER=localhost:9092
TOPIC=quickstart-events
# The CLI tools' stderr is kept out of the cell output but saved here, so a
# failure can still be diagnosed instead of vanishing into /dev/null.
ERR_LOG=/databricks/driver/kafka/kafka-cli.err
: >"$ERR_LOG"
trap 'echo "Kafka CLI command failed; details in $ERR_LOG" >&2' ERR

# Create the topic and describe it.
# --if-not-exists skips the create step when the topic is already there; without
# it, re-running this cell fails on --create.
bin/kafka-topics.sh --create --if-not-exists --topic "$TOPIC" --bootstrap-server "$BROKER" 2>>"$ERR_LOG"
bin/kafka-topics.sh --describe --topic "$TOPIC" --bootstrap-server "$BROKER" 2>>"$ERR_LOG"

# Publish some messages to this topic (they are appended again on every run of this cell)
printf "one\ntwo\nthree\n" >messages.txt
bin/kafka-console-producer.sh --topic "$TOPIC" --bootstrap-server "$BROKER" <messages.txt 2>>"$ERR_LOG"

Created topic quickstart-events.
Topic: quickstart-events	TopicId: U4O21y48R_-x8jtb6bj_gw	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: quickstart-events	Partition: 0	Leader: 0	Replicas: 0	Isr: 0


In [None]:
# Streaming reader on the local broker, subscribed to the quickstart topic and
# starting from the earliest available offset.
streamingInputDF = (
    spark.readStream.format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": "localhost:9092",
            "subscribe": "quickstart-events",
            "startingOffsets": "earliest",
        }
    )
    .load()
)

In [None]:
# Keep only the topic name and the message payload, both cast to strings, and
# register the result as the `messages` temp view for SQL cells.
df = streamingInputDF.selectExpr(
    *[f"CAST({column} AS STRING)" for column in ("topic", "value")]
)
df.createOrReplaceTempView("messages")

In [None]:
%sql
-- Show every row of the `messages` view (its two columns are topic and value).
SELECT
  topic,
  value
FROM messages;

topic,value
quickstart-events,one
quickstart-events,two
quickstart-events,three
quickstart-events,Mon Jan 2 22:42:52 UTC 2023
quickstart-events,Mon Jan 2 22:42:56 UTC 2023
quickstart-events,2023-01-02T22:43:44.057Z
quickstart-events,2023-01-02T22:43:45.246Z
quickstart-events,Mon Jan 2 22:42:52 UTC 2023
quickstart-events,Mon Jan 2 22:42:56 UTC 2023
quickstart-events,2023-01-02T22:43:44.057Z
