## Writing to Kafka Topic

Let us understand how to write messages to a Kafka topic using the Spark Structured Streaming APIs.
* We can specify `kafka` as the `format` to write data to a Kafka topic.
* It is mandatory to specify the checkpoint location, the bootstrap servers, and the Kafka topic name.

In [None]:
from pyspark.sql import SparkSession

import getpass
# OS user running the notebook; used to namespace the warehouse directory
# and the application name.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled session on YARN. The Kafka connector is
# requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1')
    .config('spark.ui.port', '0')
    .config('spark.sql.warehouse.dir', f'/user/{username}/warehouse')
    .enableHiveSupport()
    .appName(f'{username} | Python - Kafka and Spark Integration')
    .master('yarn')
    .getOrCreate()
)

In [None]:
# Use 2 shuffle partitions for this session (presumably because the demo
# data volume is small - confirm before reusing elsewhere).
spark.conf.set('spark.sql.shuffle.partitions', '2')

In [None]:
import socket
# Host name of the machine running this code; passed as the "host" option
# of the socket source.
hostname = socket.gethostname()

In [None]:
# Streaming DataFrame read from a socket on `hostname`, port 9000.
log_messages = (
    spark.readStream
    .format("socket")
    .option("host", hostname)
    .option("port", 9000)
    .load()
)

In [None]:
# Check whether the DataFrame is a streaming DataFrame.
log_messages.isStreaming

In [None]:
# Print the schema of the streaming DataFrame.
log_messages.printSchema()

In [None]:
from pyspark.sql.functions import regexp_extract

In [None]:
# Regular expression for one access-log line. Capture groups used by this
# notebook: 1 = first token (stored as ipaddress), 4 = bracketed timestamp
# with a +/-NNNN offset (stored as message_ts), 5 = first double-quoted
# field (stored as message_endpoint).
# Written as a raw string so the regex backslashes (\S, \[, \w, \s, \d) are
# not treated as invalid Python string escapes; the value is unchanged.
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.*?)" (\d+) (\d+) (\S+) "(.*?)"'

In [None]:
# Extract three fields from the raw `value` column using the access-log
# pattern (groups 1, 4 and 5), then drop the raw line.
log_messages_extracted = (
    log_messages
    .withColumn('ipaddress', regexp_extract('value', APACHE_ACCESS_LOG_PATTERN, 1))
    .withColumn('message_ts', regexp_extract('value', APACHE_ACCESS_LOG_PATTERN, 4))
    .withColumn('message_endpoint', regexp_extract('value', APACHE_ACCESS_LOG_PATTERN, 5))
    .drop('value')
)

In [None]:
from pyspark.sql.functions import to_json, struct

In [None]:
# Pack every extracted column into a struct and serialise it as a JSON
# string in a single column named "value".
log_messages_extracted_json = log_messages_extracted.select(
    to_json(
        struct(*[
            log_messages_extracted[column_name]
            for column_name in log_messages_extracted.columns
        ])
    ).alias("value")
)

In [None]:
# Start a streaming query that writes the JSON messages to the user's Kafka
# topic, triggering every 60 seconds and checkpointing under the user's
# home directory.
(
    log_messages_extracted_json.writeStream
    .format("kafka")
    .option('checkpointLocation', f'/user/{username}/kafka/retail_logs_json/gen_logs/checkpoint')
    .option("kafka.bootstrap.servers", "w01.itversity.com:9092,w02.itversity.com:9092,w03.itversity.com:9092")
    .option("topic", f"{username}_retail_json")
    .trigger(processingTime='60 seconds')
    .start()
)

In [None]:
# List the contents of the streaming query's checkpoint directory.
!hdfs dfs -ls /user/${USER}/kafka/retail_logs_json/gen_logs/checkpoint

In [None]:
# List the `offsets` subdirectory of the checkpoint directory.
!hdfs dfs -ls /user/${USER}/kafka/retail_logs_json/gen_logs/checkpoint/offsets

In [None]:
# Print the checkpoint's `metadata` file.
!hdfs dfs -cat /user/${USER}/kafka/retail_logs_json/gen_logs/checkpoint/metadata