In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import os
import time

# 1️⃣ Create Spark Context
sc = SparkContext("local[2]", "FileStreamWordCount")
ssc = StreamingContext(sc, 5)  # batch interval 5 sec

# 2️⃣ Create streaming folder
stream_dir = "/content/stream_folder"
os.makedirs(stream_dir, exist_ok=True)

# 3️⃣ Create a DStream from the folder
lines = ssc.textFileStream(stream_dir)

# 4️⃣ Split into words and count
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
word_counts.pprint()

# 5️⃣ Start streaming
ssc.start()

# 6️⃣ Simulate writing new files every few seconds
for i in range(1, 6):
    filename = f"{stream_dir}/file_{i}.txt"
    with open(filename, "w") as f:
        f.write(f"This is This a new line number {i}\n")
    print(f"Created {filename}")
    time.sleep(10)  # wait longer than batch interval

ssc.awaitTermination()




Created /content/stream_folder/file_1.txt
-------------------------------------------
Time: 2025-09-23 10:36:15
-------------------------------------------
('new', 1)
('line', 1)
('number', 1)
('This', 2)
('is', 1)
('a', 1)
('1', 1)

-------------------------------------------
Time: 2025-09-23 10:36:20
-------------------------------------------

Created /content/stream_folder/file_2.txt
-------------------------------------------
Time: 2025-09-23 10:36:25
-------------------------------------------
('new', 1)
('line', 1)
('number', 1)
('This', 2)
('is', 1)
('a', 1)
('2', 1)

-------------------------------------------
Time: 2025-09-23 10:36:30
-------------------------------------------

Created /content/stream_folder/file_3.txt
-------------------------------------------
Time: 2025-09-23 10:36:35
-------------------------------------------
('new', 1)
('line', 1)
('number', 1)
('3', 1)
('This', 2)
('is', 1)
('a', 1)

-------------------------------------------
Time: 2025-09-23 10:36:4

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: reentrant call inside <_io.BufferedReader name=40>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 511, in sen

Py4JError: An error occurred while calling o25.awaitTermination

In [None]:
ssc.stop()

NameError: name 'ssc' is not defined

In [None]:
# window operations

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import os
import time

# 1️⃣ Create Spark Context
sc = SparkContext("local[2]", "FileStreamWordCount")
ssc = StreamingContext(sc, 5)  # batch interval = 5 seconds

# 2️⃣ Checkpoint (required for window operations)
ssc.checkpoint("/content/checkpoint")

# 3️⃣ Create streaming folder
stream_dir = "/content/stream_folder"
os.makedirs(stream_dir, exist_ok=True)

# 4️⃣ Create a DStream from the folder
lines = ssc.textFileStream(stream_dir)

# 5️⃣ Split into words
words = lines.flatMap(lambda line: line.split(" "))

# 🔹 Normal word count (per batch)
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# 🔹 Windowed word count (e.g., 20 sec window, slides every 10 sec)
windowed_word_counts = words.map(lambda word: (word, 1)) \
    .reduceByKeyAndWindow(
        lambda a, b: a + b,   # adding function
        lambda a, b: a - b,   # subtracting function
        20,  # window duration
        10   # sliding interval
    )

print("\n===== Per Batch Word Count =====")
word_counts.pprint()

print("\n===== Windowed Word Count (20s window, 10s slide) =====")
windowed_word_counts.pprint()

# 6️⃣ Start streaming
ssc.start()

# 7️⃣ Simulate writing new files every few seconds
for i in range(1, 6):
    filename = f"{stream_dir}/file_{i}.txt"
    with open(filename, "w") as f:
        f.write(f"This is a new line number {i}\n")
    print(f"Created {filename}")
    time.sleep(10)  # wait longer than batch interval

ssc.awaitTermination()



===== Per Batch Word Count =====

===== Windowed Word Count (20s window, 10s slide) =====
Created /content/stream_folder/file_1.txt
-------------------------------------------
Time: 2025-09-21 10:13:35
-------------------------------------------
('new', 1)
('line', 1)
('number', 1)
('This', 1)
('is', 1)
('a', 1)
('1', 1)

-------------------------------------------
Time: 2025-09-21 10:13:40
-------------------------------------------

-------------------------------------------
Time: 2025-09-21 10:13:40
-------------------------------------------
('new', 1)
('line', 1)
('number', 1)
('This', 1)
('is', 1)
('a', 1)
('1', 1)

Created /content/stream_folder/file_2.txt
-------------------------------------------
Time: 2025-09-21 10:13:45
-------------------------------------------
('new', 1)
('line', 1)
('number', 1)
('This', 1)
('is', 1)
('a', 1)
('2', 1)

-------------------------------------------
Time: 2025-09-21 10:13:50
-------------------------------------------

Created /content/st

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: reentrant call inside <_io.BufferedReader name=40>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 511, in sen

Py4JError: An error occurred while calling o39448.awaitTermination