**To run Kafka** (start ZooKeeper first, then the Kafka broker, each in its own terminal)\
bin/zookeeper-server-start.sh config/zookeeper.properties\
bin/kafka-server-start.sh config/server.properties

**Console consumer and producer** (run from the Kafka install directory, each in its own terminal)\
bin/kafka-console-consumer.sh --topic retail-events --from-beginning --bootstrap-server localhost:9092\
bin/kafka-console-producer.sh --topic retail-events --bootstrap-server localhost:9092

**pyspark with kafka support**\
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1


In [1]:
# Notebook bootstrap: make every top-level expression in a cell echo its value
# (not only the last one), then start a Spark session with the Kafka connector.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

import os

# PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so this must be
# set before the first SparkContext/SparkSession is created. The --packages
# coordinate pulls in the Kafka source/sink for Spark 3.2.1 (Scala 2.12).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 pyspark-shell'

import pyspark
from pyspark.sql import SparkSession

# Go through the builder (the documented entry point) rather than calling the
# SparkSession constructor directly; getOrCreate() reuses an existing session
# when the cell is re-run instead of wrapping the context a second time.
spark = SparkSession.builder.getOrCreate()
# Keep `sc` available for cells that use the low-level SparkContext API.
sc = spark.sparkContext

22/05/23 15:31:24 WARN Utils: Your hostname, CarlPC resolves to a loopback address: 127.0.1.1; using 172.25.228.240 instead (on interface eth0)
22/05/23 15:31:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/nolfonzo/spark-3.2.1-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/nolfonzo/.ivy2/cache
The jars for the packages stored in: /home/nolfonzo/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1a1c4fe4-0e7f-4514-b22d-0f2db2792ba8;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in cen

In [2]:
# Data ingestion
# Read the online-retail CSV into a Spark DataFrame. "header" takes the column
# names from the first row and "inferSchema" lets Spark derive each column's
# data type from the data, giving the eight typed columns shown below.
retail_df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/home/nolfonzo/src/pyspark/Essential-PySpark-for-Scalable-Data-Analytics/data/online_retail/online_retail_small.csv")
)

In [32]:
# Preview the ingested data: first 20 rows, long string values truncated.
retail_df.show(n=20, truncate=True)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/10 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/10 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/10 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/10 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/10 08:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|01/12/10 08:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|01/12/10 08:26|     4.

In [3]:
# Parquet is a structured file format and the optimal choice for storing data
# in a data lake that is processed with Apache Spark.
# Other structured sources/formats: relational databases, Avro, ORC files.
#
# Persist the raw retail transactions held in retail_df to the data lake as
# Parquet, replacing whatever an earlier run left at the same location.
retail_df.write.parquet(
    "/tmp/data-lake/online_retail_small.parquet",
    mode="overwrite",
)

                                                                                

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   489434|    85048|15CM CHRISTMAS GL...|      12|01/12/09 07:45|     6.95|     13085|United Kingdom|
|   489434|   79323P|  PINK CHERRY LIGHTS|      12|01/12/09 07:45|     6.75|     13085|United Kingdom|
|   489434|   79323W| WHITE CHERRY LIGHTS|      12|01/12/09 07:45|     6.75|     13085|United Kingdom|
|   489434|    22041|"RECORD FRAME 7""...|      48|01/12/09 07:45|      2.1|     13085|United Kingdom|
|   489434|    21232|STRAWBERRY CERAMI...|      24|01/12/09 07:45|     1.25|     13085|United Kingdom|
|   489434|    22064|PINK DOUGHNUT TRI...|      24|01/12/09 07:45|     1.65|     13085|United Kingdom|
|   489434|    21871| SAVE THE PLANET MUG|      24|01/12/09 07:45|     1.

In [12]:
# Structured Streaming has built-in mechanisms that maintain the state
# information required for an incremental load.
#
# Build the DataFrame that will be published to Kafka. The Kafka sink expects
# a "key" and a "value" column, so:
#   * key   - a unique (not necessarily consecutive) id per row, as a string
#   * value - all original retail_df columns serialised into one JSON string
#
from pyspark.sql.functions import to_json, struct, from_json, monotonically_increasing_id
#
kafka_broadcast_df = (
    retail_df
    .withColumn("key", monotonically_increasing_id().cast("STRING"))
    # to_json already yields a string column, so no extra cast is needed.
    .withColumn("value", to_json(struct(*[retail_df[name] for name in retail_df.columns])))
)
# Inspect a bounded sample only: collect() would pull the entire dataset onto
# the driver and flood the cell output.
kafka_broadcast_df.select("key", "value").take(5)
kafka_broadcast_df.select("key", "value").show()


[Row(key='0', value='{"InvoiceNo":536365,"StockCode":"85123A","Description":"WHITE HANGING HEART T-LIGHT HOLDER","Quantity":6,"InvoiceDate":"01/12/10 08:26","UnitPrice":2.55,"CustomerID":17850,"Country":"United Kingdom"}'),
 Row(key='1', value='{"InvoiceNo":536365,"StockCode":"71053","Description":"WHITE METAL LANTERN","Quantity":6,"InvoiceDate":"01/12/10 08:26","UnitPrice":3.39,"CustomerID":17850,"Country":"United Kingdom"}'),
 Row(key='2', value='{"InvoiceNo":536365,"StockCode":"84406B","Description":"CREAM CUPID HEARTS COAT HANGER","Quantity":8,"InvoiceDate":"01/12/10 08:26","UnitPrice":2.75,"CustomerID":17850,"Country":"United Kingdom"}'),
 Row(key='3', value='{"InvoiceNo":536365,"StockCode":"84029G","Description":"KNITTED UNION FLAG HOT WATER BOTTLE","Quantity":6,"InvoiceDate":"01/12/10 08:26","UnitPrice":3.39,"CustomerID":17850,"Country":"United Kingdom"}'),
 Row(key='4', value='{"InvoiceNo":536365,"StockCode":"84029E","Description":"RED WOOLLY HOTTIE WHITE HEART.","Quantity":6,"

+---+--------------------+
|key|               value|
+---+--------------------+
|  0|{"InvoiceNo":5363...|
|  1|{"InvoiceNo":5363...|
|  2|{"InvoiceNo":5363...|
|  3|{"InvoiceNo":5363...|
|  4|{"InvoiceNo":5363...|
|  5|{"InvoiceNo":5363...|
|  6|{"InvoiceNo":5363...|
|  7|{"InvoiceNo":5363...|
|  8|{"InvoiceNo":5363...|
+---+--------------------+



In [26]:
# Publish the key/value columns of kafka_broadcast_df to the "retail-events"
# topic on the local Kafka broker (batch write, one message per row).
(
    kafka_broadcast_df.select("key", "value")
    .write
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "topic": "retail-events",
    })
    .save()
)
