In [0]:
from pyspark.sql import SparkSession

# first we enrich our Spark session with some knowledge about Kafka
spark = (
    SparkSession.builder
    # integration with Kafka is done as recommended by the official doc:
    # http://spark.apache.org/docs/3.2.1/streaming-kafka-0-10-integration.html
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
    # setting a checkpoint directory is mandatory
    # without it streaming jobs simply won't work
    .config("spark.sql.streaming.checkpointLocation", "checkpoints")
    .getOrCreate()
)

In [0]:
input_topic = (
    spark.readStream.format("kafka")
    .option("subscribe", "input_topic")
    .option(
        "kafka.bootstrap.servers",
        "put the Kafka cluster address here"
    )
    # these two lines are similar to configuring `kafka-python`
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism","PLAIN")
    # this mysterious line must include API key and secret from the cluster
    # see more in the documentation https://kafka.apache.org/documentation/#security_jaas_broker
    .option(
        "kafka.sasl.jaas.config",
        # remove kafkashaded prefix when not on Databricks
        """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required
        username="use API key as username"
        password="use API secret as username";
        """
    )
    .load()
)

In [0]:
# Spark has a feature called Structured Streaming
# the idea is that streams are DataFrames
print(type(input_topic))
input_topic

<class 'pyspark.sql.dataframe.DataFrame'>
Out[3]: DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [0]:
# unfortunately, it doesn't work with pandas API, only with Spark DataFrames
input_topic.to_pandas_on_spark()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAssertionError[0m                            Traceback (most recent call last)
[0;32m<command-849733004669718>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# unfortunately, it doesn't work with pandas API, only with Spark DataFrames[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0minput_topic[0m[0;34m.[0m[0mto_pandas_on_spark[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36mto_pandas_on_spark[0;34m(self, index_col)[0m
[1;32m   2955[0m [0;34m[0m[0m
[1;32m   2956[0m         [0mindex_spark_columns[0m[0;34m,[0m [0mindex_names[0m [0;34m=[0m [0m_get_index_map[0m[0;34m([0m[0mself[0m[0;34m,[0m [0mindex_col[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m-> 2957[0;31m         internal = InternalFrame(
[0m[1;32m   2958[0m             [0mspark_frame[0m[

The documentation mentions a [workaround](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/faq.html#does-pandas-api-on-spark-support-structured-streaming) to use pandas-on-Spark with Structured Streaming.

Unfortunately, it means you write your stream to nowhere and do something with batches of this stream as a side effect.

The drawbacks of it are:
* you have to care about writing you stream somewhere yourself
* you always work with data in mini-batches, no continuous streams
* you can't use advanced streaming features like [watermarks](http://spark.apache.org/docs/3.2.1/structured-streaming-programming-guide.html#handling-late-data-and-watermarking)

So, we are not going for it.

In [0]:
# in Spark, we use identical DataFrame API both for files on disks and for streams
transformed_dataframe = (
    input_topic
    .select(
        (
            # by default, the streaming DataFrame has `value` column
            # it contain bytes, so first it should be decoded as a string
            # then we transform it to float
            10 * input_topic["value"].astype("string").astype("float")
        # to save data, we encode it to string (Spark will make them bytes for us)
        # Kafka expects only key and value columns to be in the data at most
        ).astype("string").alias("value")
    )
)

In [0]:
# we don't need to add other columns like timestamp or partition
# Kafka sink will add them automatically
transformed_dataframe

Out[6]: DataFrame[value: string]

In [0]:
# write stream back to Kafka
output_topic = (
    transformed_dataframe
    .writeStream
    .format("kafka")
    .option("topic", "output_topic")
    # these line are the same as in the input topic
    .option(
        "kafka.bootstrap.servers",
        "put the Kafka cluster address here"
    )
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism","PLAIN")
    .option(
        "kafka.sasl.jaas.config",
        """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required
        username="use API key as username"
        password="use API secret as username";
        """
    )
)

In [0]:
# the most important part! It's an action. Everything else were transformations
job = output_topic.start()

In [0]:
# job runs asynchronously
# simply changing its definition and starting again won't kill it
# stop your jobs gracefully
job.stop()

# Do it Yourself

* compute average value and the number of values
* inside a 10 seconds window
* output the window description and the computation results to Kafka