<a href="https://colab.research.google.com/github/shajarian/Realtime-wordcount-pyspark/blob/main/wordcount.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

***Shaghayegh (Shirley) Shajarian***

---






**PySpark Streaming Word Count Example**

Sentiment140 dataset with 1.6 million tweets
https://www.kaggle.com/datasets/kazanova/sentiment140

In [None]:
from google.colab import drive
drive.mount('/content/drive')

!ls "/content/drive/My Drive/Semester 3/Big Data/HW4"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
 archive.zip  'Assignment-4(1).pdf'


In [None]:
!unzip -o "/content/drive/My Drive/Semester 3/Big Data/HW4/archive.zip" -d "/content/dataset"

Archive:  /content/drive/My Drive/Semester 3/Big Data/HW4/archive.zip
  inflating: /content/dataset/training.1600000.processed.noemoticon.csv  


In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Initialize Spark Session
spark = SparkSession.builder.master("local[2]").appName("TwitterStreamWordCount").getOrCreate()

print(spark)
# # Initialize Streaming Context with a batch interval of 5 seconds
# ssc = StreamingContext(spark.sparkContext, 5)

<pyspark.sql.session.SparkSession object at 0x7a433ad49240>


Load the Dataset

In [None]:
import os

file_path = "/content/dataset/training.1600000.processed.noemoticon.csv"
print("File exists:", os.path.exists(file_path))  # This should return True if the file path is correct

File exists: True


In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("target", IntegerType(), True),
    StructField("ids", StringType(), True),
    StructField("date", StringType(), True),
    StructField("flag", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True)
])

df = spark.read.csv(file_path, schema=schema, header=False, encoding="ISO-8859-1")
df.show(5)

+------+----------+--------------------+--------+---------------+--------------------+
|target|       ids|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+------+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows



Remove URLs and special characters from the tweets

In [None]:
from pyspark.sql.functions import regexp_replace

# Remove URLs and special characters to clean the 'text' column.
# Raw strings keep the regex backslashes intact (no invalid-escape warnings).
df = df.withColumn("text", regexp_replace("text", r"https?://\S+", ""))  # Remove URLs
df = df.withColumn("text", regexp_replace("text", r"[^a-zA-Z0-9 ]", ""))  # Remove special chars
df.show(5)

+------+----------+--------------------+--------+---------------+--------------------+
|target|       ids|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|switchfoot   Awww...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|Kenichan I dived ...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|nationwideclass n...|
+------+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows
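
To make the effect of the two patterns easier to see, here is a minimal sketch that applies the same regular expressions with Python's built-in `re` module instead of Spark. The helper name `clean_tweet` and the sample string are made up for illustration; `regexp_replace` in Spark uses Java regular expressions, which should behave the same way for these two simple patterns.

```python
import re

def clean_tweet(text: str) -> str:
    """Apply the same two substitutions as the Spark cell above (hypothetical helper)."""
    text = re.sub(r"https?://\S+", "", text)    # drop URLs
    text = re.sub(r"[^a-zA-Z0-9 ]", "", text)   # drop everything except letters, digits, spaces
    return text

# Made-up sample string, not a row from the dataset
sample = "@user http://example.com/abc - Awww, that's a bummer."
cleaned = clean_tweet(sample)
print(repr(cleaned))
```

Note that removing the URL and the dash leaves a run of three spaces behind, which matches the extra spacing visible in the first row of the output above.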



For a simple analysis, let's perform a word count on the text column

In [None]:
from pyspark.sql.functions import explode, split

# Split text into words and then explode to have one word per row
# (raw string so that \s reaches the regex engine unchanged)
words = df.select(explode(split(df.text, r"\s+")).alias("word"))
word_counts = words.groupBy("word").count()

# Show the most frequent words
word_counts.orderBy("count", ascending=False).show()

+----+-------+
|word|  count|
+----+-------+
|    |1265051|
|  to| 557047|
|   I| 499345|
| the| 488381|
|   a| 366846|
|  my| 281036|
| and| 277434|
|   i| 251504|
| you| 244146|
|  is| 222421|
| for| 211386|
|  it| 210368|
|  in| 206395|
|  of| 180363|
|  on| 159611|
|  me| 152157|
|have| 133408|
|that| 130962|
|  so| 128844|
|with| 112197|
+----+-------+
only showing top 20 rows
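
The top row of this table is an empty string, and "I" and "i" are counted separately. A likely cause of the empty token is leading or trailing whitespace left behind by the cleaning step, since splitting on whitespace then yields empty strings at the edges. The sketch below reproduces this in plain Python with made-up sample texts and shows one way to drop empty tokens and fold case; `count_words` is a hypothetical helper, not part of the notebook.

```python
import re
from collections import Counter

def count_words(texts):
    """Count lowercase, non-empty tokens across an iterable of strings."""
    counts = Counter()
    for text in texts:
        tokens = re.split(r"\s+", text)
        # Skip empty tokens and fold case so "I" and "i" are counted together
        counts.update(t.lower() for t in tokens if t)
    return counts

# Made-up sample texts with leading/trailing and repeated spaces
sample = [" I like Spark ", "i like  streaming"]
raw_tokens = re.split(r"\s+", sample[0])   # shows the empty edge tokens
counts = count_words(sample)
print(raw_tokens)
print(counts.most_common())
```

In the notebook itself, a similar effect could probably be achieved by filtering out rows where `word` is empty and wrapping the column in `pyspark.sql.functions.lower` before grouping.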



So far, I have run a batch word count over the DataFrame with PySpark. Next, I will run a real-time word count with Spark Streaming.

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Stop any existing Spark context. getOrCreate() always returns a context,
# so no None check is needed. Note that this also invalidates `spark` above.
SparkContext.getOrCreate().stop()

# Now create a new Spark Context
sc = SparkContext("local[2]", "TwitterStream")

# Create a Streaming Context with a batch interval of 1 second
ssc = StreamingContext(sc, 1)

In [None]:
# Connect to a stream source, e.g., TCP socket
lines = ssc.socketTextStream("localhost", 9999)

# Define the computation logic
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
word_counts = pairs.reduceByKey(lambda num1, num2: num1 + num2)

# Print the results
word_counts.pprint()

# Start processing and await termination
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2024-05-09 02:42:40
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:42:41
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:42:42
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:42:43
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:42:44
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:42:45
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:42:46
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:42:47
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:42:48
----------

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


-------------------------------------------
Time: 2024-05-09 02:44:32
-------------------------------------------



KeyboardInterrupt: 

The output shows that the Spark Streaming job is running and producing a batch every second, but every batch is empty: nothing is sending data to localhost:9999, so the job has no words to count.
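
To illustrate what each batch would contain once data arrives, here is a plain-Python sketch of the same flatMap, map, and reduceByKey logic applied to one batch at a time. The function `count_batch` and the sample batches are made up for illustration; the point is that counts are computed per batch, not accumulated across batches, and that an empty batch yields an empty result.

```python
from collections import Counter

def count_batch(lines):
    """Word count for a single micro-batch of text lines (hypothetical helper)."""
    # flatMap: split every line into words
    words = [word for line in lines for word in line.split(" ")]
    # map + reduceByKey: pair each word with 1 and sum the ones per word
    counts = Counter()
    for word in words:
        counts[word] += 1
    return dict(counts)

# Made-up batches: the second one is empty, like the batches in the output above
batches = [
    ["hello spark", "hello streaming"],
    [],
    ["spark spark"],
]
results = [count_batch(batch) for batch in batches]
for i, result in enumerate(results):
    print(i, result)
```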

Set Up a Simple TCP Server
I create a simple TCP server in Python that runs on my local machine, listens on the port that Spark Streaming connects to, and sends data to Spark once it connects.

In [None]:
import socket
import time

# Function to simulate sending data
def send_data(host="localhost", port=9999):
    # Create a TCP socket; host and port must match socketTextStream("localhost", 9999)
    s = socket.socket()
    # Allow quick restarts of the cell without "address already in use" errors
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Bind to the port
    s.bind((host, port))
    # Now wait for client connection
    s.listen(5)
    print('Server started. Listening...')

    # Establish connection with client (blocks until Spark connects)
    c, addr = s.accept()
    print('Got connection from', addr)

    # Send data every second
    try:
        while True:
            # Send current time as data; the trailing newline ends the record,
            # because socketTextStream reads newline-delimited text
            message = time.strftime('%Y-%m-%d %H:%M:%S') + '\n'
            c.sendall(message.encode())
            time.sleep(1)  # sleep for 1 second
    except (KeyboardInterrupt, BrokenPipeError):
        pass
    finally:
        c.close()
        s.close()

# Start sending data
send_data()

Server started. Listening...
-------------------------------------------
Time: 2024-05-09 02:52:29
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:52:30
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:52:31
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:52:32
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:52:33
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:52:34
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:52:35
-------------------------------------------

-------------------------------------------
Time: 2024-05-09 02:52:36
-------------------------------------------

-------------------------------------------
Time: 2

KeyboardInterrupt: 

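In the captured output above, the batches are still empty and no "Got connection from" message was printed, so Spark never connected to the server in that run. For the simulated data to appear, the server must listen on the same host and port that socketTextStream connects to (localhost, 9999) and end each message with a newline. The timestamps (or any other data we choose to send) would then show up in the Spark Streaming output, processed batch by batch.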
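
As a way to check the server side without Spark, the following sketch runs a newline-delimited TCP server in a background thread and reads from it with a plain socket client that stands in for the Spark receiver. Everything here is an assumption for illustration: the helper `serve_lines`, the sample messages, and the use of port 0 (which lets the operating system pick a free port) are not part of the notebook, which would use the fixed port passed to socketTextStream.

```python
import socket
import threading

def serve_lines(server_sock, lines):
    """Accept one client and send each line terminated by a newline."""
    conn, _addr = server_sock.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))

# Port 0 asks the OS for a free port (demo assumption, see lead-in)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 0))
server.listen(1)
host, port = server.getsockname()

# Made-up messages; run the server in a thread so this cell is not blocked
messages = ["hello spark", "hello streaming", "hello spark"]
thread = threading.Thread(target=serve_lines, args=(server, messages), daemon=True)
thread.start()

# Stand-in client: reads newline-delimited text until the server closes the connection
with socket.create_connection((host, port), timeout=5) as client:
    with client.makefile("r", encoding="utf-8") as reader:
        received = [line.rstrip("\n") for line in reader]

thread.join(timeout=5)
server.close()
print(received)
```

Running the server in a thread (or a separate process) matters in a notebook because a cell that blocks, such as one calling ssc.awaitTermination(), prevents any later cell from starting until it is interrupted.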