In [1]:
import os
import json

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, sum, count, split
from pyspark.sql.types import IntegerType

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic

In [2]:
conf = SparkConf()
conf.setAll([
    ('spark.master', 'local'),
    ('spark.app.name', 'Kafka ETL'),
    ('spark.submit.deployment', 'client'),
    ('spark.ui.showConsoleProgress', 'true'),
    ('spark.eventLog.enabled', 'false'),
    ('spark.logConf', 'false'),
    ('spark.driver.host', 'localhost'),
    ('spark.driver.memory', '6g'),
])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

22/12/08 23:08:04 WARN Utils: Your hostname, hasirama resolves to a loopback address: 127.0.1.1; using 192.168.0.219 instead (on interface enp7s0)
22/12/08 23:08:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/08 23:08:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# os.system('spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 consume.py')

In [4]:
rdf = spark.read.options(inferSchema=True,header=True).json('News_Category_Dataset_v3.json')
rdf.show(5)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+--------------------+---------+----------+--------------------+--------------------+--------------------+
|             authors| category|      date|            headline|                link|   short_description|
+--------------------+---------+----------+--------------------+--------------------+--------------------+
|Carla K. Johnson, AP|U.S. NEWS|2022-09-23|Over 4 Million Am...|https://www.huffp...|Health experts sa...|
|      Mary Papenfuss|U.S. NEWS|2022-09-23|American Airlines...|https://www.huffp...|He was subdued by...|
|       Elyse Wanshel|   COMEDY|2022-09-23|23 Of The Funnies...|https://www.huffp...|"Until you have a...|
|    Caroline Bologna|PARENTING|2022-09-23|The Funniest Twee...|https://www.huffp...|"Accidentally put...|
|      Nina Golgowski|U.S. NEWS|2022-09-22|Woman Who Called ...|https://www.huffp...|Amy Cooper accuse...|
+--------------------+---------+----------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [5]:
def get_word_count(x):
    
    return len(x.split(' '))

def get_author_count(x):
    
    return len(x.split('and'))

rdf = rdf.drop('date', 'headline', 'link')
countUDF = udf(lambda x: get_word_count(x), IntegerType())
rdf = rdf.withColumn('word_count', countUDF(rdf.short_description))
authorCountUDF = udf(lambda x: get_author_count(x), IntegerType())
rdf = rdf.withColumn('num_authors', authorCountUDF(rdf.authors))
rdf.show(5)

+--------------------+---------+--------------------+----------+-----------+
|             authors| category|   short_description|word_count|num_authors|
+--------------------+---------+--------------------+----------+-----------+
|Carla K. Johnson, AP|U.S. NEWS|Health experts sa...|        29|          1|
|      Mary Papenfuss|U.S. NEWS|He was subdued by...|        28|          1|
|       Elyse Wanshel|   COMEDY|"Until you have a...|        12|          1|
|    Caroline Bologna|PARENTING|"Accidentally put...|        25|          1|
|      Nina Golgowski|U.S. NEWS|Amy Cooper accuse...|        25|          1|
+--------------------+---------+--------------------+----------+-----------+
only showing top 5 rows



In [6]:
group = rdf.groupBy('category').agg(
    sum('num_authors').alias('num_authors'),
    sum('word_count').alias('word_count'),
)
group.show(5)

+------------+-----------+----------+
|    category|num_authors|word_count|
+------------+-----------+----------+
|      SPORTS|       5540|     70136|
|       MEDIA|       3135|     42646|
|BLACK VOICES|       5102|     78712|
|    POLITICS|      41931|    573293|
|        ARTS|       1857|     33039|
+------------+-----------+----------+
only showing top 5 rows



[Stage 3:>                                                          (0 + 1) / 1]                                                                                

In [7]:
group.rdd.getNumPartitions()

1

In [8]:
client = KafkaAdminClient(
    bootstrap_servers=['127.0.0.1:9092'],
    client_id='kafka-spark-etl',
)

In [9]:
topic_list = []
topic = NewTopic(
    name='spark-kafka-etl',
    num_partitions=2,
    replication_factor=2,
)
topic_list.append(topic)

In [10]:
def serialize(data):
    return json.dumps(data).encode('utf-8')

In [11]:
def get_partition(key, all, available):
    return 0

In [12]:
def get_producer():
    producer = KafkaProducer(
        bootstrap_servers=['127.0.0.1:9092'],
        value_serializer=serialize,
        partitioner=get_partition,
    )
    
    return producer

In [13]:
producer = get_producer()

In [14]:
rows = group.collect()
print(len(rows))

42


In [15]:
for row in rows:
    data = {}
    data['category'] = row.category
    data['num_author'] = row.num_authors
    data['word_count'] = row.word_count
    producer.send(topic=topic.name, value=data)

In [16]:
topic.name

'spark-kafka-etl'

In [17]:
producer.flush()

In [18]:
producer.metrics()

{'kafka-metrics-count': {'count': 56.0},
 'producer-metrics': {'connection-close-rate': 0.03332335286982692,
  'connection-creation-rate': 0.0662928865693428,
  'select-rate': 0.26607828892431906,
  'io-wait-time-ns-avg': 7648676.633834839,
  'io-wait-ratio': 0.0020351463285516232,
  'io-time-ns-avg': 151664.01863098145,
  'io-ratio': 4.035481026369422e-05,
  'connection-count': 1.0,
  'network-io-rate': 0.26517179090705795,
  'outgoing-byte-rate': 112.46597216666004,
  'request-rate': 0.13258586541126582,
  'request-size-avg': 848.25,
  'request-size-max': 3274.0,
  'incoming-byte-rate': 108.45119065705963,
  'response-rate': 0.13302812645245782,
  'request-latency-avg': 26.04728937149048,
  'request-latency-max': 100.50106048583984,
  'bufferpool-wait-ratio': 0.0,
  'batch-size-avg': 3196.0,
  'batch-size-max': 3196.0,
  'compression-rate-avg': 1.0,
  'record-queue-time-avg': 0.001863718032836914,
  'record-queue-time-max': 0.001863718032836914,
  'produce-throttle-time-avg': 0.0,
  