In [1]:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests

In [2]:
def aggregate_tags_count(new_values, total_sum):
    """Fold one batch of counts for a key into that key's running total.

    Args:
        new_values: iterable of counts seen for the key in the current batch
            (may be empty).
        total_sum: total accumulated so far; any falsy value (``None`` or
            ``0``) is treated as zero.

    Returns:
        The sum of ``new_values`` added to the previous total.
    """
    carried_over = total_sum if total_sum else 0
    return sum(new_values) + carried_over

In [3]:
def get_sql_context_instance(spark_context):
    """Return the shared SQLContext, creating it on the first call.

    The instance is cached in this module's globals under the key
    ``'sqlContextSingletonInstance'``; ``spark_context`` is only used when
    no cached instance exists yet.

    Args:
        spark_context: context handed to ``SQLContext`` on first use.

    Returns:
        The cached ``SQLContext`` instance.
    """
    module_globals = globals()
    singleton_key = 'sqlContextSingletonInstance'
    if singleton_key not in module_globals:
        module_globals[singleton_key] = SQLContext(spark_context)
    return module_globals[singleton_key]

In [4]:
def _show_and_save_top(sql_context, query, output_path):
    """Run ``query``, print its result, and overwrite ``output_path`` with it as CSV."""
    result_df = sql_context.sql(query)
    result_df.show()
    # coalesce(1) collapses the result to one partition before writing.
    # DataFrameWriter.csv() selects the CSV format itself, so no explicit
    # .format(...) call is needed.
    result_df.coalesce(1).write.mode('overwrite').option("header", "true").csv(output_path)


def process_rdd(time, rdd):
    """Print and persist the top-10 hashtags, country codes and devices for one batch.

    Used as the ``foreachRDD`` callback. The RDD is expected to hold
    ``(word, count)`` pairs; they are registered as the temporary view
    ``hashtags`` and queried three times by word prefix (``#``, ``CC``, ``TS``).

    Args:
        time: batch time, used only for log output.
        rdd: RDD of ``(word, count)`` pairs for this batch.

    Returns:
        None. Errors are reported on stdout and the batch is skipped, so one
        failing batch does not stop the stream. ``KeyboardInterrupt`` and
        other non-``Exception`` signals are no longer swallowed.
    """
    print("----------- %s -----------" % str(time))
    try:
        # Get spark sql singleton context from the current context
        sql_context = get_sql_context_instance(rdd.context)
        print("Get spark sql singleton context from the current context ----------- %s -----------" % str(time))

        # createDataFrame cannot infer a schema from an empty RDD, so skip
        # batches that arrive before any word has been counted.
        if rdd.isEmpty():
            return

        # convert the RDD to Row RDD
        row_rdd = rdd.map(lambda w: Row(word=w[0], word_count=w[1]))

        # create a DF from the Row RDD and expose it to SQL as "hashtags"
        hashtags_df = sql_context.createDataFrame(row_rdd)
        hashtags_df.createOrReplaceTempView("hashtags")

        # NOTE(review): the output locations below are absolute,
        # machine-specific paths; confirm they exist where this runs.
        _show_and_save_top(
            sql_context,
            "select word , word_count from hashtags where word like '#%'order by word_count desc limit 10",
            "/Users/girishdurgaiah/hashtag_file.csv",
        )
        _show_and_save_top(
            sql_context,
            "select word as country_code, word_count as tweet_count from hashtags where word like 'CC%'order by word_count desc limit 10",
            "/Users/girishdurgaiah/country_file.csv",
        )
        _show_and_save_top(
            sql_context,
            "select word as device, word_count as device_count from hashtags where word like 'TS%'order by word_count desc limit 10",
            "/Users/girishdurgaiah/device_file.csv",
        )
    except Exception as error:
        # Keep the stream alive, but make the failure visible instead of
        # silently discarding it.
        print("Error while processing batch %s: %r" % (str(time), error))

In [5]:
# Spark configuration for this application; only the application name is set.
conf = SparkConf().setAppName("TwitterStreamApp")

<pyspark.conf.SparkConf at 0x7fb60c868d90>

In [6]:
# Create the Spark context from the configuration above, or reuse the one that
# is already running, so this cell can be re-run without failing with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
# Restrict Spark's log output to ERROR level.
sc.setLogLevel("ERROR")

22/04/11 20:44:47 WARN Utils: Your hostname, localhp14 resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface eno1)
22/04/11 20:44:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


In [7]:
# Streaming context built on top of `sc`, with a batch interval of 2 seconds.
ssc = StreamingContext(sparkContext=sc, batchDuration=2)

In [8]:
# Checkpoint directory for the streaming context, so RDDs can be recovered.
ssc.checkpoint(directory="checkpoint_TwitterApp")

In [9]:
# Text stream of lines received from the socket at localhost:9009.
dataStream = ssc.socketTextStream(hostname="localhost", port=9009)

In [10]:
# Break every incoming line into its space-separated tokens, one element per token.
words = dataStream.flatMap(lambda tweet: tweet.split(" "))

In [11]:
# Pair every word with a count of 1 -> (word, 1). No filtering happens here:
# the '#%', 'CC%' and 'TS%' prefixes are selected later by the SQL queries in
# process_rdd.
hashtags = words.map(lambda word: (word, 1))

In [12]:
# Keep a running total per word across batches: aggregate_tags_count adds each
# batch's counts to the total carried over from earlier batches.
tags_totals = hashtags.updateStateByKey(updateFunc=aggregate_tags_count)

In [13]:
# Run process_rdd on the RDD produced for every batch interval.
tags_totals.foreachRDD(func=process_rdd)

In [14]:
# start the streaming computation
ssc.start()

try:
    # block until the stream terminates or the cell is interrupted
    ssc.awaitTermination()
finally:
    # Always shut the stream down (e.g. after a KeyboardInterrupt from
    # "interrupt kernel") so it does not keep running in the background.
    # The SparkContext is left alive so `sc` stays usable afterwards.
    ssc.stop(stopSparkContext=False, stopGraceFully=False)



----------- 2022-04-11 20:46:50 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:46:50 -----------


                                                                                

----------- 2022-04-11 20:46:52 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:46:52 -----------


                                                                                

----------- 2022-04-11 20:46:54 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:46:54 -----------


[Stage 2:>                                                          (0 + 1) / 1]                                                                                

----------- 2022-04-11 20:46:56 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:46:56 -----------
----------- 2022-04-11 20:46:58 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:46:58 -----------


                                                                                

----------- 2022-04-11 20:47:00 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:00 -----------


[Stage 2:>                                                          (0 + 1) / 1]                                                                                

----------- 2022-04-11 20:47:02 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:02 -----------
----------- 2022-04-11 20:47:04 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:04 -----------
----------- 2022-04-11 20:47:06 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:06 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:08 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:08 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:10 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:10 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:12 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:12 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:14 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:14 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:16 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:16 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:18 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:18 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:20 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:20 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:22 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:22 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:24 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:24 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:26 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:26 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:28 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:28 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:30 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:30 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:32 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:32 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:34 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:34 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:36 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:36 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:38 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:38 -----------


[Stage 2:>                                                          (0 + 1) / 1]

----------- 2022-04-11 20:47:40 -----------
Get spark sql singleton context from the current context ----------- 2022-04-11 20:47:40 -----------


ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/home/mrzdtydlntm/CodeMirza/hadoopProject/env/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/mrzdtydlntm/CodeMirza/hadoopProject/env/lib/python3.8/site-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
[Stage 2:>                                                          (0 + 1) / 1]

KeyboardInterrupt: 