In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
import json
from collections import namedtuple
from datetime import datetime
from kafka import KafkaProducer
import time
import pandas

 
f = open('hashtags.json')
hashtags = json.load(f)
f.close()

hash_values = ["#"+i.lower() for i in list(hashtags.values())]
hash_values

['#coal', '#elonmusk', '#zomato', '#ntr31', '#powercrisis']

In [2]:
sc = SparkContext("local[2]", "dbt-assignment")

22/05/06 22:23:53 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 10.5.26.188 instead (on interface wlo1)
22/05/06 22:23:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/06 22:24:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
ssc = StreamingContext(sc, 1)
sqlContext = SQLContext(sc)



In [4]:
window_length = 10
buffer = 1
lines = ssc.socketTextStream("127.0.0.1", 6006).window(window_length, window_length)


In [5]:
fields = ("Hashtag", "Count")
Tweet = namedtuple( 'Tweet', fields )


In [6]:
def myRead(rdd):
    if not rdd.isEmpty():
        return rdd.toDF().sort("Count").createOrReplaceTempView("tweets")

processed = ( lines.flatMap( lambda text: text.split( " " ) )
  .filter( lambda word: word.lower().startswith("#") and word.lower() in hash_values)
  .map( lambda word: ( word.lower(), 1 ) )
  .reduceByKey( lambda a, b: a + b ) 
  .map( lambda rec: Tweet( rec[0], rec[1] ) )) 

processed.pprint()

processed.foreachRDD( lambda rdd: myRead(rdd) ) # Registers to a table.

In [7]:
def serializer(message):
    return json.dumps(message).encode('utf-8')


# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=serializer
)

def send_to_kafka(m):
    mydate = datetime.now()
    for i in m:
        print("producing: " + i + ":"+ str(m[i]))
        producer.send(i.split('#')[1], i+";"+str(m[i])+";"+str(mydate))


In [8]:
ssc.start()


In [9]:
count = 0
x = []
sleeptime = window_length
while count < 6:
    time.sleep(sleeptime)
    d = {}
    try:
        tweets = sqlContext.sql( 'select Hashtag, Count from tweets' )
    except Exception as e:
        sleeptime = buffer
        continue
    sleeptime = window_length
    
    df = tweets.toPandas()
    d1 = df.set_index('Hashtag').T.to_dict('list')
    for i in hash_values:
        if i not in d1:
            d[i] = 0
        else:
            d[i] = d1[i][0]
    send_to_kafka(d)
    count = count + 1


22/05/06 22:24:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:08 WARN BlockManager: Block input-0-1651856048600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:09 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:09 WARN BlockManager: Block input-0-1651856048800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:09 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:09 WARN BlockManager: Block input-0-1651856049000 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:09 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:09 WARN BlockManager: Block input-0-1651856049200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:09 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:09 WARN BlockManager: Block input-0-1651856049400 replicated to

22/05/06 22:24:18 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:18 WARN BlockManager: Block input-0-1651856058400 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:18 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:18 WARN BlockManager: Block input-0-1651856058600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:19 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:19 WARN BlockManager: Block input-0-1651856058800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:19 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:19 WARN BlockManager: Block input-0-1651856059000 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:19 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:19 WARN BlockManager: Block input-0-1651856059200 replicated to

-------------------------------------------
Time: 2022-05-06 22:24:16
-------------------------------------------
Tweet(Hashtag='#elonmusk', Count=1)
Tweet(Hashtag='#ntr31', Count=1)



22/05/06 22:24:20 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:20 WARN BlockManager: Block input-0-1651856060400 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:20 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:20 WARN BlockManager: Block input-0-1651856060600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:21 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:21 WARN BlockManager: Block input-0-1651856061200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:21 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:21 WARN BlockManager: Block input-0-1651856061400 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:21 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:21 WARN BlockManager: Block input-0-1651856061600 replicated to

producing: #coal:0
producing: #elonmusk:1
producing: #zomato:0
producing: #ntr31:1
producing: #powercrisis:0


22/05/06 22:24:24 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:24 WARN BlockManager: Block input-0-1651856064200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:24 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:24 WARN BlockManager: Block input-0-1651856064400 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:25 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:25 WARN BlockManager: Block input-0-1651856064800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:25 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:25 WARN BlockManager: Block input-0-1651856065200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:25 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:25 WARN BlockManager: Block input-0-1651856065400 replicated to

-------------------------------------------
Time: 2022-05-06 22:24:26
-------------------------------------------
Tweet(Hashtag='#elonmusk', Count=5)



22/05/06 22:24:29 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:29 WARN BlockManager: Block input-0-1651856068800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:29 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:29 WARN BlockManager: Block input-0-1651856069000 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:29 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:29 WARN BlockManager: Block input-0-1651856069200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:29 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:29 WARN BlockManager: Block input-0-1651856069600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:30 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:30 WARN BlockManager: Block input-0-1651856069800 replicated to

producing: #coal:0
producing: #elonmusk:5
producing: #zomato:0
producing: #ntr31:0
producing: #powercrisis:0


22/05/06 22:24:35 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:35 WARN BlockManager: Block input-0-1651856074800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:35 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:35 WARN BlockManager: Block input-0-1651856075200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:35 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:35 WARN BlockManager: Block input-0-1651856075400 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:35 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:35 WARN BlockManager: Block input-0-1651856075600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:36 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:36 WARN BlockManager: Block input-0-1651856075800 replicated to

-------------------------------------------
Time: 2022-05-06 22:24:36
-------------------------------------------
Tweet(Hashtag='#elonmusk', Count=6)



22/05/06 22:24:38 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:38 WARN BlockManager: Block input-0-1651856078600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:39 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:39 WARN BlockManager: Block input-0-1651856078800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:39 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:39 WARN BlockManager: Block input-0-1651856079000 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:39 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:39 WARN BlockManager: Block input-0-1651856079200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:40 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:40 WARN BlockManager: Block input-0-1651856080000 replicated to

producing: #coal:0
producing: #elonmusk:6
producing: #zomato:0
producing: #ntr31:0
producing: #powercrisis:0


22/05/06 22:24:45 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:45 WARN BlockManager: Block input-0-1651856085400 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:45 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:45 WARN BlockManager: Block input-0-1651856085600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:46 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:46 WARN BlockManager: Block input-0-1651856086000 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:46 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:46 WARN BlockManager: Block input-0-1651856086600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:47 WARN BlockManager: Block input-0-1651856086800 replicated to

-------------------------------------------
Time: 2022-05-06 22:24:46
-------------------------------------------
Tweet(Hashtag='#ntr31', Count=2)
Tweet(Hashtag='#elonmusk', Count=1)



22/05/06 22:24:48 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:48 WARN BlockManager: Block input-0-1651856088600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:49 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:49 WARN BlockManager: Block input-0-1651856088800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:49 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:49 WARN BlockManager: Block input-0-1651856089600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:50 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:50 WARN BlockManager: Block input-0-1651856090200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:51 WARN BlockManager: Block input-0-1651856090800 replicated to

producing: #coal:0
producing: #elonmusk:1
producing: #zomato:0
producing: #ntr31:2
producing: #powercrisis:0


22/05/06 22:24:56 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:56 WARN BlockManager: Block input-0-1651856095800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:56 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:56 WARN BlockManager: Block input-0-1651856096000 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:56 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:56 WARN BlockManager: Block input-0-1651856096200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:56 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:56 WARN BlockManager: Block input-0-1651856096400 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:56 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:56 WARN BlockManager: Block input-0-1651856096600 replicated to

-------------------------------------------
Time: 2022-05-06 22:24:56
-------------------------------------------



22/05/06 22:24:58 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:58 WARN BlockManager: Block input-0-1651856098000 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:58 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:58 WARN BlockManager: Block input-0-1651856098200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:58 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:58 WARN BlockManager: Block input-0-1651856098400 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:59 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:59 WARN BlockManager: Block input-0-1651856098800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:24:59 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:24:59 WARN BlockManager: Block input-0-1651856099000 replicated to

-------------------------------------------
Time: 2022-05-06 22:25:06
-------------------------------------------
Tweet(Hashtag='#elonmusk', Count=4)
Tweet(Hashtag='#ntr31', Count=1)

producing: #coal:0
producing: #elonmusk:1
producing: #zomato:0
producing: #ntr31:2
producing: #powercrisis:0


22/05/06 22:25:08 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:25:08 WARN BlockManager: Block input-0-1651856108600 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:25:09 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:25:09 WARN BlockManager: Block input-0-1651856108800 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:25:09 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:25:09 WARN BlockManager: Block input-0-1651856109000 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:25:09 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:25:09 WARN BlockManager: Block input-0-1651856109200 replicated to only 0 peer(s) instead of 1 peers
22/05/06 22:25:09 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:25:09 WARN BlockManager: Block input-0-1651856109400 replicated to

-------------------------------------------
Time: 2022-05-06 22:25:16
-------------------------------------------
Tweet(Hashtag='#elonmusk', Count=2)
Tweet(Hashtag='#ntr31', Count=1)



22/05/06 22:25:19 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/05/06 22:25:19 WARN BlockManager: Block input-0-1651856119600 replicated to only 0 peer(s) instead of 1 peers


producing: #coal:0
producing: #elonmusk:4
producing: #zomato:0
producing: #ntr31:1
producing: #powercrisis:0


In [10]:
ssc.stop()

22/05/06 22:25:20 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:121)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:119)
	at org.apache.spar