### Streaming Data from Kafka

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, LongType, IntegerType, BooleanType

In [2]:
spark = SparkSession.builder \
    .appName("KafkaSparkConsumer1") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/kafka_checkpoint") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/anaconda3/envs/data_eng/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/bharathvelamala/.ivy2/cache
The jars for the packages stored in: /Users/bharathvelamala/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f26c1409-941d-4eb1-98cd-3d844fe74cfc;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.4 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 297ms :: a

In [3]:
channel_schema = StructType() \
    .add("channel_id", StringType()) \
    .add("title", StringType()) \
    .add("description", StringType()) \
    .add("custom_url", StringType()) \
    .add("published_at", StringType()) \
    .add("country", StringType()) \
    .add("subscriber_count", IntegerType()) \
    .add("view_count", IntegerType()) \
    .add("video_count", IntegerType()) \
    .add("hidden_subscriber_count", BooleanType()) \
    .add("high_thumbnail", StringType())

In [4]:
video_schema = StructType() \
    .add("video_id", StringType()) \
    .add("title", StringType()) \
    .add("description", StringType()) \
    .add("description_summary", StringType()) \
    .add("channel_id", StringType()) \
    .add("channel_title", StringType()) \
    .add("published_at", StringType()) \
    .add("published_year", StringType()) \
    .add("view_count", IntegerType()) \
    .add("like_count", IntegerType()) \
    .add("comment_count", IntegerType()) \
    .add("favorite_count", IntegerType()) \
    .add("engagement_ratio", IntegerType()) \
    .add("likes_per_view", IntegerType()) \
    .add("comments_per_view", IntegerType()) \
    .add("thumbnail_url", StringType()) \
    .add("thumbnail_width", IntegerType()) \
    .add("thumbnail_height", IntegerType()) \
    .add("duration", StringType()) \
    .add("definition", StringType()) \
    .add("caption", BooleanType()) \
    .add("licensed_content", BooleanType()) \
    .add("tags", StringType()) \
    .add("tag_count", IntegerType()) \
    .add("category_id", StringType()) \
    .add("live_broadcast_content", StringType()) \
    .add("default_language", StringType()) \
    .add("default_audio_language", StringType()) \
    .add("privacy_status", StringType()) \
    .add("upload_status", StringType()) \
    .add("embeddable", BooleanType()) \
    .add("made_for_kids", BooleanType()) \
    .add("title_length", IntegerType()) \
    .add("description_length", IntegerType()) \
    .add("has_hashtags", BooleanType())

In [5]:
comment_schema = StructType() \
    .add("comment_id", StringType()) \
    .add("video_id", StringType()) \
    .add("channel_id", StringType()) \
    .add("text_display", StringType()) \
    .add("text_original", StringType()) \
    .add("author_display_name", StringType()) \
    .add("author_profile_image_url", StringType()) \
    .add("author_channel_url", StringType()) \
    .add("author_channel_id", StringType()) \
    .add("like_count", IntegerType()) \
    .add("published_at", StringType()) \
    .add("updated_at", StringType()) \
    .add("can_reply", BooleanType()) \
    .add("total_reply_count", IntegerType()) \
    .add("is_public", BooleanType())

In [6]:
transcripts_schema = StructType([
    StructField("videoId", StringType(), True),
    StructField("transcript", ArrayType(StructType([
        StructField("text", StringType(), True),
        StructField("start", FloatType(), True),
        StructField("duration", FloatType(), True),
        StructField("videoId", StringType(), True)
    ])), True),
    StructField("transcriptMetrics", StructType([
        StructField("segmentCount", IntegerType(), True),
        StructField("totalDuration", FloatType(), True),
        StructField("wordCount", LongType(), True)
    ]), True),
    StructField("processedDate", StringType(), True)
])

In [7]:
kafka_video_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "youtube_video_info") \
    .option("startingOffsets", "earliest") \
    .load()

kafka_channel_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "youtube_channel_info") \
    .option("startingOffsets", "earliest") \
    .load()

kafka_comment_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "youtube_video_comments") \
    .option("startingOffsets", "earliest") \
    .load()

kafka_transcripts_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "youtube_transcripts") \
    .option("startingOffsets", "earliest") \
    .load()

In [8]:
video_parsed_df = kafka_video_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), video_schema).alias("data")) \
    .select("data.*")

channel_parsed_df = kafka_channel_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), channel_schema).alias("data")) \
    .select("data.*")

comment_parsed_df = kafka_comment_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), comment_schema).alias("data")) \
    .select("data.*")

transcripts_parsed_df = kafka_transcripts_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), transcripts_schema).alias("data")) \
    .select("data.*")

25/03/12 11:59:29 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [9]:
video_parsed_df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- description_summary: string (nullable = true)
 |-- channel_id: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- published_at: string (nullable = true)
 |-- published_year: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- engagement_ratio: integer (nullable = true)
 |-- likes_per_view: integer (nullable = true)
 |-- comments_per_view: integer (nullable = true)
 |-- thumbnail_url: string (nullable = true)
 |-- thumbnail_width: integer (nullable = true)
 |-- thumbnail_height: integer (nullable = true)
 |-- duration: string (nullable = true)
 |-- definition: string (nullable = true)
 |-- caption: boolean (nullable = true)
 |-- licensed_content: boolean (nullable = true)
 |-- tags

In [10]:
channel_parsed_df.printSchema()

root
 |-- channel_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- custom_url: string (nullable = true)
 |-- published_at: string (nullable = true)
 |-- country: string (nullable = true)
 |-- subscriber_count: integer (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- video_count: integer (nullable = true)
 |-- hidden_subscriber_count: boolean (nullable = true)
 |-- high_thumbnail: string (nullable = true)



In [11]:
comment_parsed_df.printSchema()

root
 |-- comment_id: string (nullable = true)
 |-- video_id: string (nullable = true)
 |-- channel_id: string (nullable = true)
 |-- text_display: string (nullable = true)
 |-- text_original: string (nullable = true)
 |-- author_display_name: string (nullable = true)
 |-- author_profile_image_url: string (nullable = true)
 |-- author_channel_url: string (nullable = true)
 |-- author_channel_id: string (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- published_at: string (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- can_reply: boolean (nullable = true)
 |-- total_reply_count: integer (nullable = true)
 |-- is_public: boolean (nullable = true)



In [12]:
transcripts_parsed_df.printSchema()

root
 |-- videoId: string (nullable = true)
 |-- transcript: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- start: float (nullable = true)
 |    |    |-- duration: float (nullable = true)
 |    |    |-- videoId: string (nullable = true)
 |-- transcriptMetrics: struct (nullable = true)
 |    |-- segmentCount: integer (nullable = true)
 |    |-- totalDuration: float (nullable = true)
 |    |-- wordCount: long (nullable = true)
 |-- processedDate: string (nullable = true)



In [13]:
video_query = video_parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Await termination for 10 seconds
video_query.awaitTermination(10)

25/03/12 11:59:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/12 11:59:30 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------+----------+----------+-------------+--------------+----------------+--------------+-----------------+--------------------+---------------+----------------+--------+----------+-------+----------------+--------------------+---------+-----------+----------------------+----------------+----------------------+--------------+-------------+----------+-------------+------------+------------------+------------+
|   video_id|               title|         description| description_summary|          channel_id|   channel_title|        published_at|published_year|view_count|like_count|comment_count|favorite_count|engagement_ratio|likes_per_view|comments_per_view|       thumbnail_url|thumbnail_width|thumbnail_height|duration|definition|caption|licensed_conte

False

In [14]:

channel_query = channel_parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

channel_query.awaitTermination(10)

25/03/12 11:59:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/12 11:59:40 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+----------------+--------------------+-------------+--------------------+-------+----------------+----------+-----------+-----------------------+--------------------+
|          channel_id|           title|         description|   custom_url|        published_at|country|subscriber_count|view_count|video_count|hidden_subscriber_count|      high_thumbnail|
+--------------------+----------------+--------------------+-------------+--------------------+-------+----------------+----------+-----------+-----------------------+--------------------+
|UC8butISFwT-Wl7EV...|freeCodeCamp.org|Learn to code for...|@freecodecamp|2014-12-16T21:18:48Z|     US|        10600000| 839961366|       1822|                  false|https://yt3.ggpht...|
|UCHnyfMqiRRG1u-2M...|      Veritasium|An element of tru...|  @veritasium|2010-07-21T07:18:02Z|     US|        17500000|      NULL|        431|    

False

In [15]:
comment_query = comment_parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

comment_query.awaitTermination(10)

25/03/12 11:59:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/12 11:59:50 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+----------+--------------------+--------------------+---------+-----------------+---------+
|          comment_id|   video_id|          channel_id|        text_display|       text_original| author_display_name|author_profile_image_url|  author_channel_url|   author_channel_id|like_count|        published_at|          updated_at|can_reply|total_reply_count|is_public|
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+----------+--------------------+--------------------+---------+-----------------+---------+
|Ugxqcek3RUzxWv-4V...|rfscVS0vtbw|UC8butISFwT-Wl7EV...|Want more from Mi

False

In [None]:
transcripts_query = transcripts_parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

transcripts_query.awaitTermination(10)

25/03/12 12:00:00 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/12 12:00:00 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+--------------------+--------------------+--------------------+
|    videoId|          transcript|   transcriptMetrics|       processedDate|
+-----------+--------------------+--------------------+--------------------+
|rfscVS0vtbw|[{In this course,...|{2935, 15928.48, ...|2025-03-12T18:27:...|
|HeQX2HjkcNo|[{There is a hole...|{317, 1935.706, 5...|2025-03-12T18:27:...|
|PmlRbfSavbI|[{(audience membe...|{365, 754.981, 2779}|2025-03-12T18:27:...|
|UT2noVDFoaA|[{NARRATOR: Austr...|    {19, 63.88, 149}|2025-03-12T18:27:...|
|UT2noVDFoaA|[{NARRATOR: Austr...|    {19, 63.88, 149}|2025-03-12T18:27:...|
+-----------+--------------------+--------------------+--------------------+



False

25/03/12 12:50:24 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 278591 ms exceeds timeout 120000 ms
25/03/12 12:50:24 WARN SparkContext: Killing executors is not supported by current scheduler.
25/03/12 12:50:24 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----------------+--------------------+----------------+--------------------+-------+----------------+----------+-----------+-----------------------+--------------------+
|          channel_id|            title|         description|      custom_url|        published_at|country|subscriber_count|view_count|video_count|hidden_subscriber_count|      high_thumbnail|
+--------------------+-----------------+--------------------+----------------+--------------------+-------+----------------+----------+-----------+-----------------------+--------------------+
|UCbfYPyITQ-7l4upo...|Two Minute Papers|What a time to be...|@twominutepapers|2006-08-18T00:05:41Z|     HU|         1620000| 149983304|        957|                  false|https://yt3.ggpht...|
|UCY1kMZp36IQSyNx_...|       Mark Rober|Former NASA engin...|      @markrober|2011-10-20T06:17:58Z|     US|        65200000|      N

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+----------+----------+-------------+--------------+----------------+--------------+-----------------+--------------------+---------------+----------------+--------+----------+-------+----------------+--------------------+---------+-----------+----------------------+----------------+----------------------+--------------+-------------+----------+-------------+------------+------------------+------------+
|   video_id|               title|         description| description_summary|          channel_id|       channel_title|        published_at|published_year|view_count|like_count|comment_count|favorite_count|engagement_ratio|likes_per_view|comments_per_view|       thumbnail_url|thumbnail_width|thumbnail_height|duration|definition|caption|licens

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+----------+--------------------+--------------------+---------+-----------------+---------+
|          comment_id|   video_id|          channel_id|        text_display|       text_original| author_display_name|author_profile_image_url|  author_channel_url|   author_channel_id|like_count|        published_at|          updated_at|can_reply|total_reply_count|is_public|
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+----------+--------------------+--------------------+---------+-----------------+---------+
|Ugxqcek3RUzxWv-4V...|rfscVS0vtbw|UC8butISFwT-Wl7EV...|Want more from Mi

25/03/12 12:59:54 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+--------------------+--------------------+--------------------+
|    videoId|          transcript|   transcriptMetrics|       processedDate|
+-----------+--------------------+--------------------+--------------------+
|rfscVS0vtbw|[{In this course,...|{2935, 15928.48, ...|2025-03-12T19:59:...|
|HeQX2HjkcNo|[{There is a hole...|{317, 1935.706, 5...|2025-03-12T19:59:...|
|PmlRbfSavbI|[{(audience membe...|{365, 754.981, 2779}|2025-03-12T19:59:...|
|Lu56xVlZ40M|[{While we look a...|   {85, 356.31, 840}|2025-03-12T19:59:...|
+-----------+--------------------+--------------------+--------------------+



25/03/12 13:00:04 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE