# 1. Setup
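Check whether the Confluent Kafka Python client (`confluent-kafka`) is installed. `pip show` only reports the package; it does not install it. The cells below read and write Kafka through Spark's built-in `kafka` source and sink rather than through this client.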

In [2]:
# Report whether the confluent-kafka package is installed (`pip show` only prints package
# metadata; it does not install anything). The visible cells use Spark's `kafka`
# source/sink and never import this client.
! pip show confluent-kafka

# 2. Batch Processing

## 2.1. Load topic data from Confluent in batch mode

In [5]:
from pyspark.sql import functions
# Confluent Cloud connection settings read by the later Kafka cells.
# `bootstrap` must exist before `bootstrap.servers` can be assigned (a bare
# `bootstrap.servers = ...` raises NameError), so it is a SimpleNamespace. This keeps the
# dotted `bootstrap.servers` spelling, mirroring the Kafka property name, that the
# later cells use.
from types import SimpleNamespace

topic_name = "databricks_test"
bootstrap = SimpleNamespace(servers="provide your confluent endpoint")

# Batch read of `topic_name` over SASL_SSL with the PLAIN mechanism.
# NOTE(review): the API key/secret below are placeholders; load them from a secret
# store instead of hardcoding credentials in the notebook.
df_kafka = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap.servers)
    .option("subscribe", topic_name)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"Replace with your api key\" password=\"replace with your api secret\";")
    .load()
)

display(df_kafka)

key,value,topic,partition,offset,timestamp,timestampType
Mw==,eyJuYW1lIjogIkpvaG4gRG9lIiwgImFnZSI6IDM1LCAiZW1haWwiOiAiam9obi5kb2VAZ21haWwuY29tIn0=,databricks_test,0,0,2020-01-20T18:56:05.097+0000,0
MQ==,eyJuYW1lIjogIkphbmUgRG9lIiwgImFnZSI6IDI5LCAiZW1haWwiOiAiamFuZS5kb2VAZ21haWwuY29tIn0=,databricks_test,0,1,2020-01-20T18:56:05.097+0000,0
Mg==,eyJuYW1lIjogIkZhdGloIiwgImFnZSI6IDI0LCAiZW1haWwiOiAiZi5uYXllYmlAZ21haWwuY29tIn0=,databricks_test,0,2,2020-01-20T18:56:05.204+0000,0
Mg==,eyJuYW1lIjogIkZhdGloIiwgImFnZSI6IDI0LCAiZW1haWwiOiAiZi5uYXllYmlAZ21haWwuY29tIn0=,databricks_test,0,3,2020-01-20T18:57:15.946+0000,0
MQ==,eyJuYW1lIjogIkphbmUgRG9lIiwgImFnZSI6IDI5LCAiZW1haWwiOiAiamFuZS5kb2VAZ21haWwuY29tIn0=,databricks_test,0,4,2020-01-20T18:57:15.946+0000,0
Mw==,eyJuYW1lIjogIkpvaG4gRG9lIiwgImFnZSI6IDM1LCAiZW1haWwiOiAiam9obi5kb2VAZ21haWwuY29tIn0=,databricks_test,0,5,2020-01-20T18:57:16.004+0000,0


## 2.2. Write a Kafka sink for batch queries

### 2.2.1. Create sample data

In [8]:
# import pyspark class Row from module sql
from pyspark.sql import *

In [9]:
# Three sample records sharing one Row schema (key, value, topic).
# Each value is a JSON document stored as a plain string; every row names "test-topic".
sample_data = Row("key", "value", "topic")
sample1, sample2, sample3 = [
    sample_data(record_key, payload, "test-topic")
    for record_key, payload in (
        ('1', '{"name": "Jane Doe", "age": 29, "email": "jane.doe@gmail.com"}'),
        ('2', '{"name": "Fatih", "age": 24, "email": "f.nayebi@gmail.com"}'),
        ('3', '{"name": "John Doe", "age": 35, "email": "john.doe@gmail.com"}'),
    )
]

print(sample1)

### 2.2.2. Create a dataframe from sample data

In [11]:
# Build a Spark DataFrame from the three sample Rows; column names come from the Row fields.
df = spark.createDataFrame(data=[sample1, sample2, sample3])

In [12]:
# Plain-text preview, spelling out PySpark's defaults: first 20 rows, long cells truncated.
df.show(n=20, truncate=True)

In [13]:
# Render the sample DataFrame as a table via the notebook's `display` helper.
display(df)

key,value,topic
1,"{""name"": ""Jane Doe"", ""age"": 29, ""email"": ""jane.doe@gmail.com""}",test-topic
2,"{""name"": ""Fatih"", ""age"": 24, ""email"": ""f.nayebi@gmail.com""}",test-topic
3,"{""name"": ""John Doe"", ""age"": 35, ""email"": ""john.doe@gmail.com""}",test-topic


### 2.2.3. Write data from a DataFrame to a Confluent Kafka topic

In [15]:
# Write the sample rows' key/value pairs to `topic_name` as a one-off batch job.
# The destination comes from the "topic" option (the Row's own `topic` column is not
# selected). "subscribe" is a source-only option, so it is not passed to the writer.
# `.save()` returns None, so its result is not bound to a name.
# NOTE(review): the API key/secret below are placeholders; load them from a secret
# store instead of hardcoding credentials in the notebook.
(
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .write.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap.servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"Replace with your api key\" password=\"replace with your api secret\";")
    .option("topic", topic_name)
    .save()
)

# 3. Stream Processing

## 3.1. Read a stream from Kafka

In [18]:
# Streaming read of `topic_name`, with the same SASL_SSL/PLAIN settings as the batch read.
# The chain is wrapped in parentheses so no line depends on a trailing backslash
# continuation (a dangling `\` before a blank line is easy to break when editing).
# NOTE(review): the API key/secret below are placeholders; load them from a secret
# store instead of hardcoding credentials in the notebook.
df_stream = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap.servers)
    .option("subscribe", topic_name)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"Replace with your api key\" password=\"replace with your api secret\";")
    .load()
)

display(df_stream)

key,value,topic,partition,offset,timestamp,timestampType


## 3.2. Write a Kafka sink for streaming queries

In [20]:
# Continuously copy key/value pairs from the input stream into a *separate* Kafka topic.
# `df_stream` subscribes to `topic_name`; writing back into that same topic would make
# the query re-read its own output and re-publish it indefinitely.
# NOTE(review): create `sink_topic` in Confluent first (or point it at an existing topic).
sink_topic = f"{topic_name}_sink"
assert sink_topic != topic_name, "sink topic must differ from the source topic to avoid a feedback loop"

# "subscribe" is a source-only option, so it is not passed to the writer.
# NOTE(review): on Databricks a scheme-less path like "/dbfs/dir" is presumably resolved
# against DBFS (i.e. dbfs:/dbfs/dir); consider a query-specific "dbfs:/..." checkpoint path.
# NOTE(review): the API key/secret below are placeholders; load them from a secret
# store instead of hardcoding credentials in the notebook.
ds = (
    df_stream
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap.servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"Replace with your api key\" password=\"replace with your api secret\";")
    .option("topic", sink_topic)
    .option("checkpointLocation", "/dbfs/dir")
    .start()
)