<a href="https://colab.research.google.com/github/thinkdeepai/spark-training/blob/main/Spark_Training_Spark_Streaming_RDD_WordCount.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# download and install dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
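# note: downloads.apache.org only hosts current releases; if this download fails,
# older releases are kept under https://archive.apache.org/dist/spark/
!wget -q https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop2.7.tgz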
!tar xf spark-3.2.4-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install -q pyngrok
!hostname

In [None]:
import os

# set the environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

from operator import add, sub
from time import sleep
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# configure Spark: serve the web UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")

# create the Spark context, then a session that reuses it
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [None]:
# Download the input text (Caesar's Gallic Wars) and load it as an RDD of lines
# Source: https://raw.githubusercontent.com/deanwampler/spark-scala-tutorial/master/data/gallic.mb.txt
!wget https://raw.githubusercontent.com/deanwampler/spark-scala-tutorial/master/data/gallic.mb.txt
text_file = sc.textFile("gallic.mb.txt")

--2023-07-07 14:49:42--  https://raw.githubusercontent.com/deanwampler/spark-scala-tutorial/master/data/gallic.mb.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 496659 (485K) [text/plain]
Saving to: ‘gallic.mb.txt’


2023-07-07 14:49:43 (12.6 MB/s) - ‘gallic.mb.txt’ saved [496659/496659]



In [None]:
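# streaming context with a 1-second batch interval
ssc = StreamingContext(sc, 1)

# Input data: a queue holding a single RDD (the whole text file)
rddQueue = [text_file]

# DStream that takes its batches from the queue
inputStream = ssc.queueStream(rddQueue)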
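
The queue above holds a single RDD, which is why only the first batch in the output further down contains data. The following is a minimal plain-Python sketch of that behaviour, not Spark code: `simulate_queue_stream` is a hypothetical helper, and it assumes the default `oneAtATime=True` semantics of `queueStream`, where each batch interval consumes at most one queued RDD.

```python
from collections import deque

def simulate_queue_stream(rdd_queue, num_batches, one_at_a_time=True):
    """Hypothetical model of queueStream: returns the contents of each batch."""
    queue = deque(rdd_queue)
    batches = []
    for _ in range(num_batches):
        if not queue:
            # nothing left in the queue: the batch is empty
            batches.append([])
        elif one_at_a_time:
            # consume exactly one queued "RDD" per batch interval
            batches.append(list(queue.popleft()))
        else:
            # consume everything that is queued in a single batch
            merged = []
            while queue:
                merged.extend(queue.popleft())
            batches.append(merged)
    return batches

# one queued "RDD" (a list of lines), observed over three batch intervals
batches = simulate_queue_stream([["BOOK 1", "Chapter 1"]], num_batches=3)
for i, batch in enumerate(batches):
    print(f"batch {i}: {batch}")
```

To see data in more than one batch, more RDDs would have to be placed in the queue before the stream starts.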

In [None]:
text_file.take(10)

['',
 'BOOK 1',
 '',
 'Chapter 1',
 '',
 'All Gaul is divided into three parts, one of which the Belgae inhabit,',
 'the Aquitani another, those who in their own language are called Celts,',
 'in our Gauls, the third. All these differ from each other in language,',
 'customs and laws. The river Garonne separates the Gauls from the Aquitani;',
 'the Marne and the Seine separate them from the Belgae. Of all these,']

In [None]:
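# Ex1: Implement the word count
# split lines into words, drop empty strings, pair each word with 1, sum per word
# (reduceByKey counts within each batch; reduceByKeyAndWindow would also
#  require an inverse function, or None, and a window duration)
wordCount = inputStream.flatMap(lambda x: x.split(" ")) \
  .filter(lambda x: x != '') \
  .map(lambda x: (x, 1)) \
  .reduceByKey(lambda x, y: x + y)
# print the first 10 (word, count) pairs of every batch
wordCount.pprint(10)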
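
The word-count steps in this cell (split into words, drop empties, pair each word with 1, sum per word) can be checked without a cluster. Below is a minimal plain-Python sketch of the same logic on a few lines taken from the `take(10)` output above; `word_count` is a hypothetical helper, not part of PySpark.

```python
from collections import defaultdict

def word_count(lines):
    """Plain-Python mirror of flatMap -> filter -> map -> reduce by key."""
    # flatMap: split every line on single spaces and flatten the result
    words = [w for line in lines for w in line.split(" ")]
    # filter: drop the empty strings produced by blank lines
    words = [w for w in words if w != ""]
    # map: pair each word with the count 1
    pairs = [(w, 1) for w in words]
    # reduce by key: sum the counts of identical words
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)

sample = ["", "BOOK 1", "", "Chapter 1", "All Gaul is divided"]
counts = word_count(sample)
print(counts)
```

Splitting on a single space keeps punctuation attached to words and is case-sensitive, so "Gauls," and "Gauls" would be counted separately, exactly as in the streaming version.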

In [None]:
# start the streaming computation and let it run for 20 seconds
ssc.start()
sleep(20)

-------------------------------------------
Time: 2023-07-07 14:50:04
-------------------------------------------

BOOK
1

Chapter
1

All
Gaul
is
...

-------------------------------------------
Time: 2023-07-07 14:50:05
-------------------------------------------

-------------------------------------------
Time: 2023-07-07 14:50:06
-------------------------------------------

-------------------------------------------
Time: 2023-07-07 14:50:07
-------------------------------------------

-------------------------------------------
Time: 2023-07-07 14:50:08
-------------------------------------------

-------------------------------------------
Time: 2023-07-07 14:50:09
-------------------------------------------

-------------------------------------------
Time: 2023-07-07 14:50:10
-------------------------------------------

-------------------------------------------
Time: 2023-07-07 14:50:11
-------------------------------------------

-------------------------------------------


KeyboardInterrupt: ignored

In [None]:
ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2023-07-07 14:46:22
-------------------------------------------

-------------------------------------------
Time: 2023-07-07 14:46:23
-------------------------------------------

