In the first step, we create a `SparkSession` and get its `SparkContext`, which we will use in the following steps. `SparkConf` lets us define the configuration for the `SparkSession`. Because the Spark cluster and JupyterLab are deployed in the same Docker virtual network, we can use the internal hostname in the Spark master URI: `spark://spark-master:7077`.

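```python
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Connect to the standalone master through its hostname on the shared Docker network.
# Memory limits are Spark properties, so they are set with .set();
# .setExecutorEnv() would only export environment variables on the executors.
# spark.driver.memory only takes effect if it is set before the driver JVM starts,
# i.e. for the first session created in a fresh kernel.
conf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("ParallelWordCounting")
    .set("spark.executor.memory", "1024m")
    .set("spark.driver.memory", "1024m")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
```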

```python
sc
```

### Text processing

We are going to load a raw text file from Tesseract's `langdata_lstm` training data: https://github.com/tesseract-ocr/langdata_lstm/blob/master/vie/vie.training_text. This file contains 280,627 lines of Vietnamese text. Our goal is to find the most frequent words in this file.

```
!wget https://raw.githubusercontent.com/tesseract-ocr/langdata_lstm/master/vie/vie.training_text
```

```
--2021-09-09 15:55:20--  https://raw.githubusercontent.com/tesseract-ocr/langdata_lstm/master/vie/vie.training_text
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21889986 (21M) [text/plain]
Saving to: ‘vie.training_text’

2021-09-09 15:55:38 (1.35 MB/s) - ‘vie.training_text’ saved [21889986/21889986]
```
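
To check the line count quoted above once the download finishes, you can stream the file one line at a time. The sketch below does this without loading all 21 MB at once. `count_lines` is a hypothetical helper name, and it assumes the file is UTF-8 encoded.

```python
def count_lines(path: str) -> int:
    """Count the lines of a text file without reading it into memory at once."""
    # Iterating over a file object yields one line at a time.
    with open(path, "r", encoding="utf-8") as stream:
        return sum(1 for _ in stream)
```

Calling `count_lines("vie.training_text")` in the notebook should give the figure above. `wc -l` counts newline characters instead, so the two can differ by one if the last line has no trailing newline.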



Next, we declare a straightforward normalization step that converts all characters to lowercase and removes ASCII punctuation (the characters in `string.punctuation`).

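```python
import re
from string import punctuation

# string.punctuation holds the ASCII punctuation characters; re.escape() makes
# each of them a literal inside the character class (e.g. "]", "\" and "-").
PUNCTUATION_RE = re.compile("[" + re.escape(punctuation) + "]")

def normalize(x):
    """Lowercase the text and remove ASCII punctuation."""
    x = x.lower()
    return PUNCTUATION_RE.sub("", x)
```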
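
A few sample inputs show what this normalizer removes and what it keeps. The snippet repeats `normalize` so it runs on its own. It builds the same character class with `re.escape()`, and `PUNCTUATION_RE` is a name introduced here. The last two lines show one way an empty "word" arises. A punctuation-only token such as `...` is normalized to `''`, and splitting on a single space then keeps it as a separate item.

```python
import re
from string import punctuation

# Same set of characters as string.punctuation, each escaped so that
# characters such as ], \ and - are always treated literally.
PUNCTUATION_RE = re.compile("[" + re.escape(punctuation) + "]")

def normalize(x: str) -> str:
    """Lowercase the text and strip ASCII punctuation."""
    return PUNCTUATION_RE.sub("", x.lower())

# Lowercasing also works on Vietnamese letters with diacritics.
print(normalize("Xin chào, Thế Giới!"))    # xin chào thế giới
# Only string.punctuation is removed; typographic quotes survive.
print(normalize("“Hà Nội”"))               # “hà nội”
# A punctuation-only token becomes the empty string...
print(repr(normalize("...")))              # ''
# ...and splitting on a single space then yields an empty "word".
print(normalize("a ... b").split(" "))     # ['a', '', 'b']
print(normalize("a ... b").split())        # ['a', 'b']
```

Splitting with `split()` instead of `split(" ")`, or filtering out empty strings, would keep `''` out of the counts. Either choice slightly changes what gets counted.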

We now implement a naive algorithm that reads the whole file, splits it into whitespace-separated tokens, and counts them sequentially in a dictionary.

```python
def count_sequential():
    d = dict()
    # Read the whole file (about 21 MB) and split it on any whitespace, so each
    # token is a single word; UTF-8 keeps the Vietnamese diacritics intact.
    with open("vie.training_text", "r", encoding="utf-8") as stream:
        for token in stream.read().split():
            w = normalize(token)
            if w not in d:
                d[w] = 1
            else:
                d[w] += 1

    # Sort by frequency, highest first, and return the 20 most frequent words.
    top = sorted(d.items(), reverse=True, key=lambda item: item[1])
    return top[:20]
```
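
For comparison, the same sequential count can be written more compactly with `collections.Counter`. This version streams the input line by line instead of reading the whole file first. It splits each line on single spaces, as the Spark version below does. `count_top_words` is a hypothetical name, and `normalize` is repeated so the sketch runs on its own.

```python
import re
from collections import Counter
from string import punctuation
from typing import Iterable, List, Tuple

PUNCTUATION_RE = re.compile("[" + re.escape(punctuation) + "]")

def normalize(x: str) -> str:
    """Lowercase the text and strip ASCII punctuation (repeated so this runs standalone)."""
    return PUNCTUATION_RE.sub("", x.lower())

def count_top_words(lines: Iterable[str], top: int = 20) -> List[Tuple[str, int]]:
    """Count words line by line and return the `top` most frequent (word, count) pairs."""
    counts = Counter()
    for line in lines:
        # Normalize, then split on single spaces, one line at a time.
        counts.update(normalize(line.strip()).split(" "))
    # most_common() orders the pairs by count, highest first.
    return counts.most_common(top)

print(count_top_words(["Và các, và", "và các với"], top=2))  # [('và', 3), ('các', 2)]
```

A file object is an iterable of lines, so `with open("vie.training_text", encoding="utf-8") as stream: count_top_words(stream)` would run it on the dataset.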

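The function below implements the map-reduce algorithm with PySpark. First, we load `vie.training_text` into an RDD using `sc.textFile()`. Because this is a path on the local filesystem, the file must also be accessible at the same path on every worker node. Second, each text line is normalized and split into a list of words with `flatMap()`. Then, we create a `(word, 1)` pair for each word with `map()` and sum the pairs per word with `reduceByKey()`. The last step sorts the resulting RDD by count in descending order and collects it to the driver.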

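```python
def count_mapreduce():
    counts = (
        sc.textFile("vie.training_text")
        # Map: normalize each line and split it into words.
        .flatMap(lambda line: normalize(line.strip()).split(" "))
        # Emit a (word, 1) pair for every word.
        .map(lambda word: (word, 1))
        # Reduce: sum the 1s of each word.
        .reduceByKey(lambda a, b: a + b)
        # Sort by count, highest first, and bring all (word, count) pairs to the driver.
        .sortBy(lambda pair: pair[1], ascending=False)
        .collect()
    )
    return counts[:20]  # top-20 most frequent words
```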
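
To make the data flow of `count_mapreduce` concrete without a cluster, here is a plain-Python imitation of the same pipeline. It is a simplified sketch, not how Spark is implemented. The lines are cut into contiguous chunks that stand in for partitions, whereas Spark derives partitions from input splits. Each chunk sums its own `(word, 1)` pairs, mirroring how `reduceByKey()` merges values locally on each partition before the shuffle. The partial counts are then merged per key. The final `heapq.nlargest` selects the top words without sorting the whole vocabulary. In PySpark, `rdd.takeOrdered(20, key=lambda pair: -pair[1])` could play that role instead of `sortBy(...).collect()`, returning only 20 pairs to the driver. `simulate_word_count` is a hypothetical name.

```python
import heapq
import math
import re
from string import punctuation
from typing import Dict, List, Tuple

PUNCTUATION_RE = re.compile("[" + re.escape(punctuation) + "]")

def normalize(x: str) -> str:
    """Lowercase the text and strip ASCII punctuation."""
    return PUNCTUATION_RE.sub("", x.lower())

def simulate_word_count(lines: List[str], num_partitions: int = 2,
                        top: int = 20) -> List[Tuple[str, int]]:
    """Plain-Python imitation of textFile -> flatMap -> map -> reduceByKey -> top-k."""
    # "textFile": cut the input into contiguous chunks, one per partition.
    size = max(1, math.ceil(len(lines) / num_partitions))
    partitions = [lines[i:i + size] for i in range(0, len(lines), size)]

    # "flatMap" + "map" + map-side combine: each partition sums its own (word, 1) pairs.
    partials: List[Dict[str, int]] = []
    for part in partitions:
        local: Dict[str, int] = {}
        for line in part:
            for word in normalize(line.strip()).split(" "):
                local[word] = local.get(word, 0) + 1
        partials.append(local)

    # "shuffle" + "reduce": merge the partial counts that share a key.
    totals: Dict[str, int] = {}
    for local in partials:
        for word, n in local.items():
            totals[word] = totals.get(word, 0) + n

    # Top-k selection without sorting the whole vocabulary.
    return heapq.nlargest(top, totals.items(), key=lambda pair: pair[1])

print(simulate_word_count(["Và các, và", "và các với", "và"], top=2))  # [('và', 4), ('các', 2)]
```

Changing `num_partitions` changes how the work is split but not the result. That independence is what lets Spark spread the map and combine steps across workers.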

It is time to run the above functions and compare the results.

```python
import time
t0 = time.time()
res = count_sequential()
t1 = time.time()
print(f"Sequential: {t1 - t0}s\n{res}")
```

```
Sequential: 5.693771600723267s
[('và', 73333), ('', 37385), ('các', 35065), ('với', 31060), ('để', 21425), ('của', 20510), ('nhà', 20413), ('liên', 18289), ('số', 17943), ('haritası', 16596), ('thông', 16207), ('không', 16096), ('bảo', 14864), ('giá', 14613), ('ı', 14471), ('văn', 13621), ('thường', 13317), ('có', 12681), ('đã', 12602), ('bản', 12517)]
```


```python
import time
t0 = time.time()
res = count_mapreduce()
t1 = time.time()
print(f"MapReduce: {t1 - t0}s\n{res}")
```

```
MapReduce: 3.287429094314575s
[('và', 73333), ('', 37385), ('các', 35065), ('với', 31060), ('để', 21425), ('của', 20510), ('nhà', 20413), ('liên', 18289), ('số', 17943), ('haritası', 16596), ('thông', 16207), ('không', 16096), ('bảo', 14864), ('giá', 14613), ('ı', 14471), ('văn', 13621), ('thường', 13317), ('có', 12681), ('đã', 12602), ('bản', 12517)]
```
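
The two timing cells repeat the same bookkeeping and each measures a single run, which can be noisy. A small helper can wrap the timing instead. `timed` is a hypothetical name. It takes the fastest of several runs and uses `time.perf_counter()`, the standard-library clock intended for measuring intervals.

```python
import time
from typing import Any, Callable, Tuple

def timed(fn: Callable[[], Any], repeat: int = 3) -> Tuple[Any, float]:
    """Call fn() `repeat` times; return its last result and the fastest run in seconds."""
    best = float("inf")
    result = None
    for _ in range(repeat):
        start = time.perf_counter()  # monotonic, high-resolution clock for intervals
        result = fn()
        best = min(best, time.perf_counter() - start)
    return result, best
```

In the notebook this would look like `res_seq, t_seq = timed(count_sequential)` and `res_mr, t_mr = timed(count_mapreduce)`. Adding `assert res_seq == res_mr` confirms that both approaches return the same top-20 list, as the outputs above do. With `repeat=3` the sequential version takes roughly three times as long to measure.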


```python
sc.stop()
spark.stop()
```