# Spark streaming tutorial
In this tutorial, we will cover:
- Integration with a Kafka topic
- Processing Kafka stream data
- Finding the top trending hashtags over a sliding time window (5 minutes in the code below)

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
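
The package coordinates above encode a Scala version (2.12) and a Spark version (3.2.0), and these generally need to match the local Spark installation. As a minimal sketch, the same string could be assembled from those two values so they are easy to change in one place. The helper name build_submit_args is hypothetical and not part of PySpark.

```python
import os

def build_submit_args(packages, spark_version="3.2.0", scala_version="2.12"):
    """Return a PYSPARK_SUBMIT_ARGS value for the given Spark artifact names."""
    coords = [
        f"org.apache.spark:{name}_{scala_version}:{spark_version}"
        for name in packages
    ]
    # The value is expected to end with the "pyspark-shell" token.
    return f"--packages {','.join(coords)} pyspark-shell"

submit_args = build_submit_args(
    ["spark-streaming-kafka-0-10", "spark-sql-kafka-0-10"]
)
# Must be set before the SparkSession is created to take effect.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args
print(submit_args)
```

With the default arguments this produces the same string as the cell above.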

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('sparkstreamingapp').getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

24/02/15 18:44:46 WARN Utils: Your hostname, ndi-mb-275 resolves to a loopback address: 127.0.0.1; using 192.168.29.185 instead (on interface en0)
24/02/15 18:44:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/rajesh.kaushik/.ivy2/cache
The jars for the packages stored in: /Users/rajesh.kaushik/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency

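We need to create a Kafka topic for publishing the tweet data. Make sure the Kafka server is running, or start it and create the tweets topic with the commands below. ZooKeeper and the Kafka server both stay in the foreground, so run each in its own terminal.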
- cd /kafka/installation/path
- ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
- ./bin/kafka-server-start.sh ./config/server.properties
- ./bin/kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092
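
The tutorial does not show how tweets are published to the topic. As a rough sketch, each tweet could be serialized to JSON bytes before being handed to whichever Kafka producer client you use. The field names follow the schema defined later in this tutorial; the sample values and the helper name encode_tweet are made up for illustration, and only a few of the fields are shown.

```python
import json

# Hypothetical sample record; real values would come from the tweets dataset.
sample_tweet = {
    "user_name": "example_user",
    "user_location": "Chennai",
    "date": "2020-07-25 12:27:21",
    "text": "Stay safe everyone #COVID19 #WearAMask",
    # The hashtags field is a stringified list, not a JSON array.
    "hashtags": "['COVID19', 'WearAMask']",
    "is_retweet": "False",
}

def encode_tweet(tweet):
    """Serialize one tweet dict into bytes usable as a Kafka message value."""
    # json.dumps escapes non-ASCII characters by default, so the bytes
    # survive the iso-8859-1 decode applied when the stream is read.
    return json.dumps(tweet).encode("iso-8859-1")

message = encode_tweet(sample_tweet)
decoded = json.loads(message.decode("iso-8859-1"))
print(decoded["hashtags"])
```

Fields missing from a message simply come back as null when the value is parsed against the schema.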

In [3]:
tweets_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "tweets") \
  .option("startingOffsets", "earliest") \
  .load()

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
SCHEMA = StructType([
    StructField("user_name", StringType()),
    StructField("user_location", StringType()),
    StructField("user_description", StringType()),
    StructField("user_created", StringType()),
    StructField("user_followers", StringType()),
    StructField("user_friends", StringType()),
    StructField("user_favourites", StringType()),
    StructField("user_verified", StringType()),
    StructField("date", StringType()),
    StructField("text", StringType()),
    StructField("hashtags", StringType()),
    StructField("source", StringType()),
    StructField("is_retweet", StringType())
])
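
Every field in this schema is a string, including hashtags, which the next cell converts into a list. Assuming the field holds a stringified Python list such as "['COVID19', 'WearAMask']", one cautious way to parse it in plain Python is sketched below. The function name parse_hashtags is hypothetical, and returning an empty list for malformed input is a design choice of this sketch, not something the tutorial specifies.

```python
import ast

def parse_hashtags(raw):
    """Turn a stringified list such as "['a', 'b']" into a list of strings."""
    if not raw:
        return []
    try:
        # literal_eval accepts Python literals only and never runs code.
        parsed = ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        # Malformed input (for example a bare nan) yields no hashtags.
        return []
    if not isinstance(parsed, list):
        return []
    return [str(tag) for tag in parsed]

print(parse_hashtags("['COVID19', 'WearAMask']"))
print(parse_hashtags("nan"))
```

A function like this could be wrapped in F.udf with an ArrayType(StringType()) return type, in the same way as the UDF in the next cell.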

In [None]:
import ast

def get_hashtags_list(hashtags):
    # hashtags is expected to hold a stringified Python list; literal_eval
    # parses it without executing arbitrary code, which eval would do.
    return ast.literal_eval(hashtags) if hashtags else []

my_udf = F.udf(get_hashtags_list, ArrayType(StringType()))

query = tweets_stream\
    .select(
        # 'timestamp' is the Kafka record timestamp, not the tweet's date field
        'timestamp',
        F.from_json(
            # Kafka delivers the value as bytes; decode it as iso-8859-1 text
            F.decode(F.col("value"), "iso-8859-1"),
            SCHEMA
        ).alias("value")
    )\
    .withColumn("hashtag", F.explode(my_udf('value.hashtags')))\
    .groupBy(
        # 5-minute windows that slide forward every minute
        F.window("timestamp", "5 minutes", "1 minute"),
        F.col('hashtag'))\
    .count()\
    .orderBy(F.col('window').desc(), F.col('count').desc())\
    .writeStream\
    .outputMode("complete")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime='60 seconds')\
    .start()

# Block until the query is stopped or fails
query.awaitTermination()
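
Because the window is 5 minutes long and slides every minute, each record falls into five overlapping windows, so a hashtag is counted once per window it belongs to. The plain-Python sketch below illustrates that assignment. It assumes windows are aligned to the Unix epoch and that the slide divides the window length evenly; the function name sliding_windows is hypothetical and this is not Spark's implementation.

```python
from datetime import datetime, timedelta

def sliding_windows(ts, window=timedelta(minutes=5), slide=timedelta(minutes=1)):
    """Return every [start, end) window containing ts, earliest first."""
    epoch = datetime(1970, 1, 1)
    # Latest window start at or before ts: ts rounded down to the slide.
    last_start = ts - (ts - epoch) % slide
    count = window // slide  # number of overlapping windows per record
    starts = [last_start - i * slide for i in range(count - 1, -1, -1)]
    return [(start, start + window) for start in starts]

ts = datetime(2024, 2, 15, 18, 46, 30)
wins = sliding_windows(ts)
for start, end in wins:
    print(start.strftime("%H:%M"), "to", end.strftime("%H:%M"))
```

For a record stamped 18:46:30, the windows run from 18:42 to 18:47 through 18:46 to 18:51, which is why the same hashtag appears under several window values in the console output.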

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-------------------+-----+
|window                                    |hashtag            |count|
+------------------------------------------+-------------------+-----+
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|COVID19            |9    |
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|toughguyswearmasks |1    |
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|facecoveringmandate|1    |
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|realmenwearmasks   |1    |
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|Covid19            |1    |
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|alreadyvideo       |1    |
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|Ukraine            |1    |
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|WearAMask          |1    |
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|GoodJobCop         |1    |
|{2024-02-15 18:44:00, 2024-02-15 18:49:00}|Saturda

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----------------+-----+
|window                                    |hashtag         |count|
+------------------------------------------+----------------+-----+
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|COVID19         |39   |
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|Covid19         |6    |
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|covid19         |4    |
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|TamilNadu       |3    |
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|Coronavirus     |2    |
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|NDR             |1    |
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|Vishal          |1    |
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|BorisJohnson    |1    |
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|WASH            |1    |
|{2024-02-15 18:45:00, 2024-02-15 18:50:00}|political       |1    |
|{2024-02-15 18:45:

