### Install requirements

In [1]:
# !pip install textblob 
# !pip install pyspark

### Import libraries

In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.linalg import Vectors
from textblob import TextBlob
from sklearn.preprocessing import LabelEncoder
import json
import requests
import warnings
warnings.filterwarnings("ignore")


### Set up the environment

In [3]:
HOST = "localhost"
STREAM_PORT = 9999


### Spark session

In [4]:
sc = SparkContext.getOrCreate()
sc.setCheckpointDir("spark_checkpoint")
ssc = StreamingContext(sc, 10)
submissions = ssc.socketTextStream(HOST, STREAM_PORT)
sc.setLogLevel("ERROR")


your 131072x1 screen size is bogus. expect trouble
23/05/02 20:01:25 WARN Utils: Your hostname, DESKTOP-CBR75GN resolves to a loopback address: 127.0.1.1; using 172.21.102.147 instead (on interface eth0)
23/05/02 20:01:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/02 20:01:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Process data

In [5]:
def process_submission(message):

    submission = json.loads(message)

    title = submission['message']
    metadata = submission['metadata']
    author = metadata['author_name']
    date = metadata['date']
    score = metadata['score']
    num_comments = metadata['num_comments']
    upvote_ratio = metadata['upvote_ratio']
    text = metadata['text']
    subreddit_name = metadata['subreddit_name']

    title_polarity, title_subjectivity = TextBlob(title).sentiment
    text_polarity, text_subjectivity = TextBlob(text).sentiment

    return {
        'title': title,
        'text': text,
        'author': author,
        'date': date,
        'score': score,
        'num_comments': num_comments,
        'upvote_ratio': upvote_ratio,
        'text': text,
        'subreddit_name': subreddit_name,
        'subreddit_hash': hash(subreddit_name),
        'title_polarity': title_polarity,
        'title_subjectivity': title_subjectivity,
        'text_polarity': text_polarity,
        'text_subjectivity': text_subjectivity
    }


### Add processing to the pipeline

In [6]:
submissions = submissions.map(process_submission)
# submissions.pprint()
training_data = submissions.map(lambda x: Vectors.dense(
    [x['title_polarity'], x['title_subjectivity'], x['text_polarity'], x['text_subjectivity'], x["subreddit_hash"]]))


### Cluster the data

In [7]:
k = 4
model = StreamingKMeans(k, decayFactor=1.0).setRandomCenters(5, 1.0, 0)
model.trainOn(training_data)
result = model.predictOn(training_data)


### Analyzing sentiment by cluster

In [8]:
# create viz.json if doesn't exist and set every cluster size to 0
with open("sentiment.json", "w") as f:
    total_data = {}
    for i in range(k):
        total_data[str(i)] = (0,0,0,0,0)
    json.dump(total_data,f)

def update_cluster_sentiment(rdd):
    if not rdd.isEmpty():
        for x in rdd.collect(): 
            with open("sentiment.json", "r") as f:
                total_data = json.load(f)
            total_data[str(x[0])] = tuple(sum(t) for t in zip(total_data[str(x[0])], x[1]))
            with open("sentiment.json", "w") as f:
                json.dump(total_data,f)

# Calculate the average sentiment scores for each cluster
cluster_sentiment = submissions.map(lambda x: ((x['title_polarity'], x['title_subjectivity'], x['text_polarity'], x['text_subjectivity'],1),Vectors.dense([x['title_polarity'], x['title_subjectivity'], x['text_polarity'], x['text_subjectivity'], x['subreddit_hash']])))
cluster_sentiment = model.predictOnValues(cluster_sentiment)
cluster_sentiment = cluster_sentiment.map(lambda x: (x[1],x[0]))
cluster_sentiment = cluster_sentiment.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3], x[4] + y[4]))
cluster_sentiment.foreachRDD(lambda rdd: update_cluster_sentiment(rdd))
# cluster_sentiment = cluster_sentiment.mapValues(lambda x: (x[0] / x[4], x[1] / x[4], x[2] / x[4], x[3] / x[4]))
# cluster_sentiment.pprint()

### Analyzing location by cluster

In [9]:
# create viz.json if doesn't exist and set every cluster size to 0
with open("subreddit.json", "w") as f:
    total_data = {}
    for i in range(k):
        total_data[str(i)] = ["empty"]
    json.dump(total_data,f)

def update_cluster_subreddit_names(rdd):
    if not rdd.isEmpty():
        for x in rdd.collect(): 
            with open("subreddit.json", "r") as f:
                total_data = json.load(f)
            total_data[str(x[0])] = total_data[str(x[0])] + list(x[1])
            with open("subreddit.json", "w") as f:
                json.dump(total_data,f)

from collections import Counter

# Calculate the most common locations for each cluster
cluster_locations = submissions.map(lambda x: (x['subreddit_name'],Vectors.dense([x['title_polarity'], x['title_subjectivity'], x['text_polarity'], x['text_subjectivity'], x['subreddit_hash']])))
cluster_locations = model.predictOnValues(cluster_locations)
cluster_locations = cluster_locations.map(lambda x: (x[1],x[0]))
cluster_locations = cluster_locations.groupByKey()
cluster_locations.foreachRDD(lambda rdd: update_cluster_subreddit_names(rdd))
# cluster_locations = cluster_locations.mapValues(lambda x: Counter(x).most_common(1)[0][0])
# cluster_locations.pprint()

### Window operation

In [10]:
pairs = result.map(lambda cluster: (cluster, 1))
# window of size 30s, and slides by 10s (very arbitrary)
cluseter_counts = pairs.reduceByKeyAndWindow(
    lambda x, y: x + y, lambda x, y: x - y, 30, 10)
cluseter_counts.pprint()


### Visualization

In [11]:
# create viz.json if doesn't exist and set every cluster size to 0
with open("viz.json", "w") as f:
    total_data = {}
    for i in range(k):
        total_data[str(i)] = 0
    json.dump(total_data, f)


In [12]:
def update_cluster_sizes(rdd):
    if not rdd.isEmpty():
        for x in rdd.collect():
            with open("viz.json", "r") as f:
                total_data = json.load(f)
            total_data[str(x[0])] += x[1]
            with open("viz.json", "w") as f:
                json.dump(total_data, f)


In [13]:
# Update the cluster sizes in sliding windows
cluseter_counts.foreachRDD(lambda rdd: update_cluster_sizes(rdd))


### Start the processing

In [14]:
ssc.start()
ssc.awaitTermination()


                                                                                

-------------------------------------------
Time: 2023-05-02 20:02:20
-------------------------------------------



                                                                                

-------------------------------------------
Time: 2023-05-02 20:02:30
-------------------------------------------
(0, 98)
(1, 2)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:02:40
-------------------------------------------
(0, 161)
(1, 37)
(2, 2)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:02:50
-------------------------------------------
(0, 214)
(1, 83)
(2, 3)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:03:00
-------------------------------------------
(0, 144)
(1, 118)
(2, 13)
(3, 15)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:03:10
-------------------------------------------
(0, 118)
(1, 114)
(2, 21)
(3, 24)

-------------------------------------------
Time: 2023-05-02 20:03:20
-------------------------------------------
(0, 65)
(1, 68)
(2, 20)
(3, 24)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:03:30
-------------------------------------------
(0, 70)
(1, 52)
(2, 22)
(3, 23)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:03:40
-------------------------------------------
(0, 68)
(1, 53)
(2, 25)
(3, 31)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:03:50
-------------------------------------------
(0, 91)
(1, 69)
(2, 39)
(3, 41)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:04:00
-------------------------------------------
(0, 88)
(1, 79)
(2, 50)
(3, 35)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:04:10
-------------------------------------------
(0, 88)
(1, 81)
(2, 45)
(3, 32)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:04:20
-------------------------------------------
(0, 92)
(1, 95)
(2, 54)
(3, 42)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:04:30
-------------------------------------------
(0, 62)
(1, 64)
(2, 31)
(3, 34)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:04:40
-------------------------------------------
(0, 63)
(1, 62)
(2, 40)
(3, 35)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:04:50
-------------------------------------------
(0, 72)
(1, 59)
(2, 39)
(3, 30)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:05:00
-------------------------------------------
(0, 107)
(1, 88)
(2, 63)
(3, 42)



                                                                                

-------------------------------------------
Time: 2023-05-02 20:05:10
-------------------------------------------
(0, 95)
(1, 81)
(2, 77)
(3, 40)



23/05/02 20:05:30 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
                                                                                

-------------------------------------------
Time: 2023-05-02 20:05:20
-------------------------------------------
(0, 99)
(1, 77)
(2, 75)
(3, 36)



23/05/02 20:05:32 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:607)
	at java.net.Socket.connect(Socket.java:556)
	at java.net.Socket.<init>(Socket.java:452)
	at java.net.Socket.<init>(Socket.java:229)
	at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
	at org.apache.spark.streaming.receiver

KeyboardInterrupt: 

23/05/02 20:05:36 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:607)
	at java.net.Socket.connect(Socket.java:556)
	at java.net.Socket.<init>(Socket.java:452)
	at java.net.Socket.<init>(Socket.java:229)
	at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
	at org.apache.spark.streaming.receiver

In [None]:
# ssc.stop(stopSparkContext=True, stopGraceFully=True)