## Lesson 12 - Advanced Spark

## Learning tasks
* Read through the online lesson notes and work through the Jupyter Notebook version.
* Complete the non-graded exercises at the end of the lesson.
* Contribute to the discussion for the lesson. Discussion forum 12 will be closed by 11:59 p.m. Day 7 of Week 12. For further details consult the course syllabus.
* Submit Assignment 3 no later than 11:59 p.m. Day 7 of Lesson 12. For further details consult the course syllabus.
* Work on your group Course project due at the end of the course.

## Lesson objectives

* Use topic modelling to generate topics for a set of documents, and then compute their contribution to each document
* Explain the difference between a Resilient Distributed Dataset (RDD) and a DataFrame
* Use Spark to process multiple binary files
* Apply Optical Character Recognition (OCR) software to large sets of document scans in Spark



## Introduction

The goal of this second Spark lesson is to present more advanced features of Spark, and to show how it can be used in a general way to process and analyze large numbers of files. Spark includes many built-in libraries, which are parallelized and easy to use. In particular, there is the machine learning (ML) library. We will use it to do topic modelling on the set of books that we prepared in the previous lesson. Topic modelling allows us to determine a set of topics which the documents are about, and then determine which topics are contained in each document.

## Topic modelling
Topic modelling is a purely statistical technique which treats documents as bags (multisets) of words, with each word given a unique identifying number. Only the frequency of each word in the document is considered, as the algorithm knows nothing about the language or the meaning of words.  Topic modelling then attempts to find sets of words, or topics, which tend to appear together more frequently in subsets of documents.  The power of this approach is that the topics do not have to be defined by a human a priori, but are instead determined by the algorithm itself.  In other words, a human does not need to spend time reading a large number of documents to determine what the topics are.  Once the topics are determined, the presence of each topic in each document can be calculated.  The topics themselves are also just sets of words to the algorithm, and it is up to the human to look at the most frequently appearing words to determine the meaning of each topic.

To make this idea even clearer, let's consider a simple example.  Imagine you take the set of all emails that you ever received, and want to divide them by topics that they discuss, to make it easier to organize them.  Some of these emails might be about your hobbies, some might be about work.  It is likely that in the hobby related emails a certain set of words will be regularly repeated, and in the emails about work a certain other set of words is repeated.  Topic modelling could discover these repeating sets of words and make a topic out of each set. However topic modelling might discover other topics which fit the data better, and which you would not even have considered if you had to pick a topic list yourself.  The end result is that you obtain a useful classification of your emails without having to read them all, and without having to decide which topics they should be classified by.
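As a minimal sketch of the bag-of-words idea, assuming two made-up documents (the texts and the names `docs`, `vocab` and `bags` are illustrative and not part of the lesson data), each distinct word gets an integer identifier and each document is reduced to a mapping from identifier to frequency:

```python
from collections import Counter

# two tiny made-up documents (illustrative only)
docs = {
    "email_1": "meeting budget report meeting",
    "email_2": "hiking trail hiking boots",
}

# assign each distinct word a unique integer identifier
words = sorted({w for text in docs.values() for w in text.split()})
vocab = {word: idx for idx, word in enumerate(words)}

# reduce each document to {word id: frequency}; word order is discarded
bags = {name: dict(Counter(vocab[w] for w in text.split()))
        for name, text in docs.items()}

print(vocab)
print(bags)
```

A topic model only ever sees structures like `bags`; the mapping back to words in `vocab` is needed only when a human wants to interpret the result.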

We are now going to do some topic modelling using the tools provided in Spark. First, to continue where we left off in the last lesson, we read the RDD with the set of words saved at the end of the last lesson.

In [5]:
# it is possible to control the configuration of spark at runtime
import pyspark
config = pyspark.SparkConf().setAll([('spark.executor.memory', '8g'), ('spark.executor.cores', '3'), ('spark.cores.max', '3'), ('spark.driver.memory','8g')])
# NOTE: this second assignment replaces the SparkConf built above, so only the
# option below is in effect; comment it out to use the settings above instead
config = pyspark.SparkConf().setAll([ ('spark.executor.extraJavaOptions', 'Xmx1024m') ,])

sc.stop()
sc = pyspark.SparkContext(conf=config)

# NOTE: this loads a restricted data set generated for files in range [A-F]
# this is needed to keep the workload reasonable (and to avoid spark crashes)
modernism_word_filt_string=sc.textFile("../week11/modernism_word_filt/part-*")

To remind ourselves what this RDD contains, we print the first 10 elements.  We see it contains strings, each string with Python syntax for a tuple containing the document identifier (URL) and then a word in that document.  

In [6]:
modernism_word_filt_string.take(10)

                                                                                

["('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'changed')",
 "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'prefatory')",
 "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'note')",
 "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'reprint')",
 "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'volume')",
 "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'worth')",
 "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'dozen')",
 "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'minor')",
 "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'novels')",
 "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'published')"]

We need to convert each string to a Python data structure. The json module is not suitable here, because the strings use Python tuple syntax and single-quoted strings, neither of which is valid JSON.  Instead, we will use the literal_eval function of the ast module.

In [7]:
import ast
modernism_word_filt=modernism_word_filt_string.map(lambda x:ast.literal_eval(x))

Now the data is in the form of a tuple containing two strings.

In [8]:
modernism_word_filt.first()

('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'changed')
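The difference between the two parsers can be checked on a single line of the saved data. This is a small sketch using one of the strings printed earlier; the variable names are illustrative.

```python
import ast
import json

line = "('https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'changed')"

# ast.literal_eval safely evaluates Python literals (tuples, strings, numbers, ...)
pair = ast.literal_eval(line)
print(type(pair), pair[1])

# the same text is not valid JSON: it uses parentheses and single quotes
try:
    json.loads(line)
    json_ok = True
except json.JSONDecodeError:
    json_ok = False
print("valid JSON:", json_ok)
```

Unlike the built-in `eval`, `ast.literal_eval` only accepts literals, so it will not execute arbitrary code found in a data file.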

Now we read in the second RDD prepared in the previous lesson, the one containing word counts.

In [10]:
modernism_word_count_string=sc.textFile("../week11/modernism_word_count/part-*")
modernism_word_count_string.first()

"('below', 3693)"

As before, we convert the string to a Python tuple.

In [11]:
modernism_word_count=modernism_word_count_string.map(lambda x:ast.literal_eval(x))
modernism_word_count.first()

('below', 3693)

We check how many words are in our RDD.

In [12]:
modernism_word_count.count()

                                                                                

141053

We next filter out words which occur too frequently, and also those which appear too infrequently.  Words which occur in all documents are not useful to us, since the whole goal is to find distinct words that can be used to describe a topic.  We have already filtered out the common stopwords in the previous lesson when preparing this RDD, so here we are just proceeding further with the same idea.  Similarly, very rare words are not going to be useful in determining topics, and they will also increase the time it takes to do the calculation, so removing them makes sense as well.

The filter thresholds are somewhat arbitrary, and they could be varied to see if a better topic set can be produced.

In [13]:
modernism_word_count_truncated=modernism_word_count.filter(lambda x: int(x[1])<10000 and int(x[1])>20)
# heavily restrict words for debugging
#modernism_word_count_truncated=modernism_word_count.filter(lambda x: int(x[1])<600 and int(x[1])>300)

modernism_word_count_truncated.count()

                                                                                

29656

After applying the filter we are left with a smaller number of words to consider.

Now we will read in the RDD with the text of the selected books, generated at the end of the previous lesson. Each element is a tuple containing the URL and another tuple.  That second tuple consists of a dictionary with metadata and then the text of the book.  We again use the ast module to convert a string into an actual Python data structure, then print out some of the data to see what it looks like.

In [15]:
modernism_meta_text_string=sc.textFile("../week11/modernism_meta_text/part-*")
import ast
modernism_meta_text=modernism_meta_text_string.map(lambda x:ast.literal_eval(x))
modernism_meta_text_first=modernism_meta_text.first()
print(modernism_meta_text_first[0])
print(modernism_meta_text_first[1][0])
print(modernism_meta_text_first[1][1][0:200])

https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/
{'publisher': 'The University of Adelaide Library', '@type': 'Book', '@context': 'http://schema.org', 'url': 'https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/', 'dateCreated': 1913, 'datePublished': '2006-10-15', 'author': 'Hardy, Thomas, 1840-1928', 'name': 'A Changed Man', 'dateModified': '2014-03-07', 'keywords': 'Literature', 'image': 'https://ebooks.adelaide.edu.au/h/hardy/thomas/changed/cover.jpg', 'description': 'A Changed Man / Thomas Hardy', 'inLanguage': 'en', 'author_lastname': 'Hardy', 'author_firstname': 'Thomas', 'author_birth': 1840, 'author_death': 1928}
A Changed Man











Prefatory Note

I reprint in this volume, for what they may be worth, a dozen minor novels that have been published in the
periodical press at various dates in the past, in or


## Topic modelling with LDA

The mathematics of topic modelling are quite challenging, even though the idea itself is fairly intuitive, so we will not cover them here.  Spark provides topic modelling techniques via machine learning (ML) libraries. The particular topic modelling technique we will use is based on Latent Dirichlet allocation or LDA.  We next proceed to convert our input RDDs into the mathematical form that the LDA routines will accept.

As a first step, we take our RDD of (word, frequency) tuples and extract only the words, using the keys() method. Then we apply zipWithIndex, which pairs each element of the RDD with its index.   A quick example to illustrate:

In [16]:
sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3)]

Finally, collectAsMap will return the key-value pairs in this RDD to the master as a dictionary.

In [18]:
modern_vocab = modernism_word_count_truncated.keys().zipWithIndex().collectAsMap()
print(modern_vocab)





                                                                                

We broadcast this structure from the master so it is available to all Spark processes.

In [19]:
br_modern_vocab = sc.broadcast(modern_vocab)

We now create an RDD containing tuples, each with the url identifier of a book, and a list containing all of its words.

In [20]:
modernism_doc_bag = modernism_meta_text.values().map(lambda x: (x[0]['url'], x[1].split()))

We count the number of books in our RDD.

In [21]:
modernism_doc_bag.count()

                                                                                

397

We now perform the mathematical transformation.  It is quite involved, so you are welcome to decompose it into stages and print out the results at each stage.  Here we describe the steps.

1. Apply filter to keep only the words present in our restricted vocabulary.
2. Convert each word to its index, using the dictionary we broadcast, with the word itself as the identifier
3. Counter is a tool which takes a list as input, and produces a dictionary containing list members as keys and the number of times each key occurs in the list as values.  Quick example to illustrate:

In [22]:
from collections import Counter
Counter(["a","a","b","b","c"])

Counter({'a': 2, 'b': 2, 'c': 1})

We apply Counter to the list of word indices of each document.

4. We sort the items in each dictionary by key and return an ordered dictionary object, which remembers the order in which keys were inserted.
5. We convert the data in the dictionary into a sparse vector, using one of the methods of the Vectors submodule.  To create a sparse vector, we provide the size of the vector, the indices of its nonzero entries, and their values.
6. We perform zipWithIndex, taking the RDD and producing a new one, with pairs consisting of the contents of the original RDD and the index.  Then we map to produce the final RDD by forming a pair containing the index and the sparse vector.
7. We cache the RDD as the final step.

We have just outlined a complex multistep process, shown in full below.  The best way to understand it is to run it multiple times, adding the successive stages, and looking at the first element of the resulting RDD each time, to see what each operation does.

In [32]:
from collections import Counter, OrderedDict
from pyspark.ml.linalg import Vectors

mdwc_idx = modernism_doc_bag.mapValues(lambda words: list(filter(lambda word: word in br_modern_vocab.value, words)))\
                  .mapValues(lambda words: list(map(lambda word: br_modern_vocab.value[word], words)))\
                  .mapValues(Counter)\
                  .mapValues(lambda d: OrderedDict(sorted(d.items())))\
                  .mapValues(lambda counter: Vectors.sparse(len(br_modern_vocab.value), list(counter.keys()), list(counter.values())))\
                  .zipWithIndex().map(lambda x: [x[1], x[0][1]])\
                  .cache()

# mdwc_idx.first()
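The stages of the transformation above can also be traced in plain Python for a single document, without Spark. This is only a sketch: the vocabulary and the word list are made up, and the final tuple simply mirrors the three arguments (size, indices, values) passed to Vectors.sparse in the code above.

```python
from collections import Counter

# hypothetical vocabulary (word -> index), standing in for br_modern_vocab.value
vocab = {"dream": 0, "theory": 1, "dark": 2, "walked": 3}

# hypothetical document as a list of words
words = ["dark", "the", "dream", "dark", "walked", "and", "dream", "dark"]

kept = [w for w in words if w in vocab]      # step 1: restrict to the vocabulary
ids = [vocab[w] for w in kept]               # step 2: words -> indices
counts = Counter(ids)                        # step 3: index -> frequency
ordered = sorted(counts.items())             # step 4: sort by index
indices = [i for i, _ in ordered]            # step 5: ingredients of a sparse vector
values = [float(c) for _, c in ordered]
sparse = (len(vocab), indices, values)

print(sparse)
```

Words outside the vocabulary ("the", "and") disappear in step 1, and "theory" never occurs in the document, so index 1 is absent from the sparse representation.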

                                                                                

There are two ML (Machine Learning) implementations in Spark:
- spark.mllib
- spark.ml

spark.mllib contains the original API built on top of RDDs. We will use spark.ml, which provides a higher-level API built on top of DataFrames.  The older spark.mllib is still available, but it lacks the latest features.  DataFrames are an extension of RDDs that store not only the data but also labels for the columns in the data.  We are not going to discuss DataFrames in detail in this course.  We are only going to create a DataFrame from an RDD using the createDataFrame routine, which takes as arguments an RDD and a list of labels for the columns in the RDD.  Once we are working with DataFrames, the results will also be DataFrames, and each element of a DataFrame will be a Row object.

In [33]:
from pyspark.ml.clustering import LDA

In [36]:
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

testdf=spark.createDataFrame(mdwc_idx, ["id","features"])

                                                                                

We can pass this DataFrame to the LDA model.  The optimizer parameter selects the inference algorithm, either "em" (expectation-maximization) or "online"; we set it explicitly to "em" below, since the default in spark.ml is "online".  We also specify how many topics we want generated.  After the LDA object has been initialized, we call its fit method with the DataFrame containing our data as the argument.  This may be a time-consuming step for a large dataset.  Please note that the technique uses random numbers in its algorithm, so the result will be somewhat different each time (and topics might be ordered differently), unless the same seed is specified explicitly.

In [37]:
numTopics = 10
lda = LDA(k=numTopics, seed=1, optimizer="em")
model = lda.fit(testdf)

                                                                                

The model object has various methods, which you can see by running the help routine on it.

In [38]:
#help(model)

Now we can extract the topics.  Coming straight out of the model, they are just sets of numbers, which we need to convert to the corresponding words to be useful.  We limit the output to just the top words in each topic for readability.

In [39]:
topicIndices = model.describeTopics(maxTermsPerTopic = 10)

We collect these topics to take a look at the raw values.

In [40]:
collected_topicIndices=topicIndices.collect()
#print(type(collected_topicIndices))
#print(collected_topicIndices[0])

To get this into a readable form, we first need to invert our (word, index) dictionary.

In [41]:
modern_top_vocab_inv = {v:k for k, v in modern_vocab.items()}
#type(modern_top_vocab_inv)

Using this inverted dictionary, we now go through the topic list and print out the words that each topic contains, as well as their weights.

In [42]:
for rowind,terms, termWeights in collected_topicIndices:
    print("TOPIC:")
    for term, weight in zip(terms, termWeights):
        print(modern_top_vocab_inv[term], weight)
    print()

TOPIC:
dream 0.005645333782752436
form 0.0033403970979731063
dreams 0.00302458849892818
relation 0.002642229435087516
means 0.0026357442500690555
character 0.0025786132634423816
theory 0.002488014165305282
individual 0.0023914589779171257
question 0.0022076995098922283
real 0.002152255463048928

TOPIC:
till 0.0029347560979657446
whom 0.0025228050704586675
thou 0.002504561094940546
forth 0.0022658240789955103
brought 0.002239667416899913
unto 0.0022290424310280107
fell 0.002188552730980231
held 0.002008146298420883
hath 0.0019025456816226468
sent 0.0018577157256367085

TOPIC:
sense 0.002756499374902485
human 0.0025681070464081096
nature 0.002205664977730307
whom 0.0021356486391704665
till 0.001912942062408199
brought 0.0017837346046840354
themselves 0.0017609311301697416
merely 0.0017366180216934611
true 0.001690998556203416
women 0.001684166912635403

TOPIC:
gone 0.0028558563340459306
hear 0.0028128223789083816
dark 0.0027680251992292524
black 0.0025981930943070174
walked 0.00250972931

These are machine generated topics and they will not necessarily be intuitive to a human.

Now that we know what the topics are, we want to see which topics are contained in each book.  To do this, we apply the transform method of the model to a DataFrame, in this case the DataFrame we used to fit the model.  We could also have applied it to different data, provided it was sufficiently similar to the data the model was fit to.  The transform operation generates a new DataFrame, which contains a topicDistribution column with one vector per book.  Each vector holds the contribution of each topic to the book, and its values add up to 1 (100%).  If the contribution of a given topic is high, the book contains the words from that topic to a significant degree, and hence is probably about that topic.

In [43]:
transformed=model.transform(testdf)
transformed.select("id","topicDistribution").show(truncate=False)

24/04/03 22:48:51 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/04/03 22:48:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/04/03 22:48:52 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/04/03 22:48:52 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB


+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |topicDistribution                                                                                                                                                                                                   |
+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0  |[0.04512287540895156,0.143493211610512,0.02583568020461748,0.011764379105668056,0.022774980128659156,0.3263737486394607,0.044159229393283365,0.14185484759585162,0.06470214125069434,0.17391890666230161]           |
|1  |[0.001936227627662102,0.7911558104903137,0.0050903813105973895,0.1527967125144267,0.0023268825792108604,0.0037611074054
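As a quick check of how to read one row of this table, the sketch below copies the topic distribution printed for document 0 into a plain Python list, confirms that the contributions add up to one, and picks out the topic with the largest contribution. The variable names are illustrative.

```python
# topic distribution for document 0, copied from the output above
dist = [0.04512287540895156, 0.143493211610512, 0.02583568020461748,
        0.011764379105668056, 0.022774980128659156, 0.3263737486394607,
        0.044159229393283365, 0.14185484759585162, 0.06470214125069434,
        0.17391890666230161]

# the contributions of all topics to one document add up to one
total = sum(dist)

# index of the topic with the largest contribution
dominant = max(range(len(dist)), key=lambda k: dist[k])

print(round(total, 6))
print(dominant, dist[dominant])
```

For this document no single topic dominates strongly: the largest contribution is about a third, with three other topics each above ten percent.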

At this point we could, for example, select a topic and then find the books in which that topic is prominently represented. This last step is left as an exercise for you.

## Multiple file RDDs applied to analysing newspaper content via OCR

The RDDs we have used so far took data from multiple files and gathered it into one set.  However, it is also possible to have an RDD holding separate text and binary files, where each file is distinct.  For text files, this might be a convenient way to keep data from different files separate.  For binary files, this approach is essential if we want to treat them separately, for example if the files are images and we want to apply some operations to them.

In this section we are going to use data from the Library of Congress, which has made available online the contents of hundreds of American newspapers published in the 19th and early 20th centuries.  Much of this data is old enough to be in the Public Domain, which makes it convenient to use.  The data contains scans of newspaper pages, and also the text of those pages obtained via OCR (optical character recognition) software.

The techniques of OCR are continually improving, with advances in Machine Learning algorithms, so it often makes sense to perform scans with more modern software to see if the quality of the OCR data can be improved.  However, doing OCR is time consuming, especially if it has to be done on a large data set.  Here is where the parallelization provided by Spark can be very useful, processing large amounts of data efficiently.

In the Python code below, we will read data from multiple editions of a newspaper, count the number of occurrences of selected search terms on the front page, and plot them as a function of time.  We will do this first with OCR text data provided online, and then redo the analysis with our own more modern OCR to compare results.

The data we will be working with is available at Chronicling America - Historic American Newspapers:  https://chroniclingamerica.loc.gov/lccn .  From the large number of newspapers, we will randomly select the "New York Tribune" (1866-1924) for further analysis.  As an example issue to work with, we can look at the January 7, 1918 issue at: https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-07/ed-1/.  Page 1 of that issue is at https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-07/ed-1/seq-1/ and that page has links to the OCR text of the page, as well as the PDF of the scan of the page, i.e. the data we want to scrape.

The web scraping program which obtains the data is shown below.  It is not multi-threaded, since we have to be careful not to send too many simultaneous requests to the Library of Congress website.  The program downloads the data for a selected range of dates. As the last step, it converts the scan image from PDF format to JPEG, as that is compatible with the tools we will be using below.

In [44]:
# web scraping program
# do not run it, data is already available in /cp631/lesson12/loc_papers

import datetime
import os
import urllib.request
import urllib.error

def getpdffile(link,datestring):
    # download the PDF scan, retrying up to 3 times on network errors
    attempts = 0

    while attempts < 3:
        try:
            print(link)
            response = urllib.request.urlopen(link, timeout = 5)
            content = response.read()
            # write in binary mode; "with" closes the file even on error
            with open( datestring+".pdf", 'wb' ) as f:
                f.write( content )
            # convert the PDF to JPEG with the ImageMagick "convert" command
            os.system('convert -density 300 '+datestring+'.pdf  '+datestring+'.jpg')

            break

        except urllib.error.URLError as e:
            attempts += 1
            print(type(e))


def gethtmlfile(link,datestring):
    # download the OCR text of the page, retrying up to 3 times on network errors
    attempts = 0

    while attempts < 3:
        try:
            print(link)
            response = urllib.request.urlopen(link, timeout = 5)
            content = response.read().decode()
            with open( datestring+".txt", 'w' ) as f:
                f.write( content )

            break

        except urllib.error.URLError as e:
            attempts += 1
            print(type(e))


link_base="https://chroniclingamerica.loc.gov/lccn/sn83030214/"


date_start=datetime.date(year=1918, month=1, day=1)
date_start_ord=date_start.toordinal()

date_end=datetime.date(year=1918, month=2, day=2)
date_end_ord=date_end.toordinal()


for i in range(date_start_ord,date_end_ord+1):
    dt=datetime.date.fromordinal(i)
    datestring=str(dt.year)+"-"+str(dt.month).zfill(2)+"-"+str(dt.day).zfill(2)
    link1=link_base+datestring+"/ed-1/seq-1/ocr.txt"
    link2=link_base+datestring+"/ed-1/seq-1.pdf"

    gethtmlfile(link1,datestring)
    getpdffile(link2,datestring)


https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-01/ed-1/seq-1/ocr.txt
https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-01/ed-1/seq-1.pdf
<class 'urllib.error.HTTPError'>
https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-01/ed-1/seq-1.pdf
<class 'urllib.error.HTTPError'>
https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-01/ed-1/seq-1.pdf
<class 'urllib.error.HTTPError'>
https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-02/ed-1/seq-1/ocr.txt
https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-02/ed-1/seq-1.pdf
<class 'urllib.error.HTTPError'>
https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-02/ed-1/seq-1.pdf
<class 'urllib.error.HTTPError'>
https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-02/ed-1/seq-1.pdf
<class 'urllib.error.HTTPError'>
https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-03/ed-1/seq-1/ocr.txt
https://chroniclingamerica.loc.gov/lccn/sn83030214/1918-01-03/ed-1/seq-1.pdf
<class 'urllib.error

Here is the full scan of the front page, displayed in the notebook using the tools available in the IPython module. 

In [45]:
from IPython.display import Image as Img
Img(filename="/cp631/lesson12/loc_papers/example/1918-01-07.jpg")



[Image: scan of the front page of the New York Tribune, January 7, 1918]

(The scan above is from the Library of Congress. It is in the Public Domain because it was published over 100 years ago, like the other figures in this lesson.)

That is quite a lot of text, and it has many more columns than modern newspapers.  To make the work for this example a little bit easier, let's look at a section of the above page, the one containing the first article.

In [None]:
from IPython.display import Image as Img
Img(filename="/cp631/lesson12/loc_papers/example/coal-1918-01-07.jpg")

We can see that the scan is not perfect. The text rows are not straight and the letters are blurred. Even though this text is still quite easy to read for a human being, we will see that OCR software will not be able to read this text perfectly. We could try to improve things by doing some preprocessing on the image, and for example increasing the contrast before feeding the image into OCR software. However, in this case we will just go straight to the text recognition (OCR) step.

Let us now look at the OCR text data of this section provided by the Library of Congress.

In [None]:
f=open('/cp631/lesson12/loc_papers/example/1918-01-07.txt')
text=f.readlines()
f.close()
for line in text[22:59]:
    print(line,end='')

The software we will be using for OCR is called Tesseract.  It is one of the most popular software packages in its class, and it is free and open source.  It is available as a standard package in most common Linux distributions.  However, the software bundled with Linux is typically older and thus does not incorporate the latest advances in OCR techniques.  To take advantage of those, we will build the latest available version of Tesseract from source and use that.  We will also download recent language training data.

Tesseract can be run from the command line, and we can run such commands inside the notebook.  In this case the command generates the file output.txt, which contains the results of the OCR.

In [None]:
! TESSDATA_PREFIX=/usr/share/tesseract/4/tessdata tesseract /cp631/lesson12/loc_papers/example/coal-1918-01-07.jpg output
! cat output.txt

The result of this command will be available in the file output.txt.  However, we want a way to call Tesseract directly inside a Python script and hence inside a Jupyter notebook.  We can do this via a module called pytesseract which provides a wrapper for this purpose.  We set up pytesseract to use the latest tesseract executable we compiled and we also point it to the location of our language data file.

In [None]:
# if import fails, run (after sourcing your virtual environment file)
# pip install pytesseract
import pytesseract
# it is possible to point pytesseract to a non-default version
#pytesseract.pytesseract.tesseract_cmd = '/home/ubuntu/local/bin/tesseract'
#tessdata_dir_config = '--tessdata-dir "/home/ubuntu"'

We use the PIL module to open an image inside a Python script and pass it to pytesseract, which performs the OCR and returns the result as a string.  Finally, we print the string.

In [None]:
import pytesseract
from PIL import Image
im=Image.open('/cp631/lesson12/loc_papers/example/coal-1918-01-07.jpg')
#resultOCR=pytesseract.image_to_string(im,config=tessdata_dir_config)

resultOCR=pytesseract.image_to_string(im)


print(resultOCR)




Compare this result to the OCR text obtained from the Library of Congress.  Modern Tesseract seems to be more accurate for some words, though it also makes some mistakes that the older OCR text does not.

Our goal is to process multiple pages using Spark. To do this, we want to gather multiple image files of the scan into an RDD. For simplicity the RDD will contain only one file for now, but we could have loaded more files by using a wildcard in the file name.

In [None]:
imgfiles=sc.binaryFiles('/cp631/lesson12/loc_papers/example/coal-1918-01-07.jpg')
# to load more files, we could do:
#imgfiles=sc.binaryFiles('/home/ubuntu/adelaide/loc_papers/*.jpg')

An RDD created in this way consists of tuples (pairs), each containing a string with the file name and then the actual binary content of the file.  We want to be able to run a map operation on this RDD, which will take the binary content of each file as input, pass it to pytesseract, and return the OCR text output for that file as a string.

The function which will do this is shown below.  This function takes a pair tuple, converts the second part of the pair containing binary data into image object data, runs pytesseract on that, and returns a tuple with filename and the string containing the OCR text.

In [None]:

from PIL import Image
import io

def process_whole_file_with_spark(pair_tuple):
    
    filename=pair_tuple[0]
    image_data=io.BytesIO(pair_tuple[1])
    im=Image.open(image_data)
#    textOCR = pytesseract.image_to_string(im,config=tessdata_dir_config)
    textOCR = pytesseract.image_to_string(im)
    return filename,textOCR



For this to work, the tesseract executable must be visible to all Spark processes, which requires the directory in which it is located to be in the path.  The contents of the path may be examined by running the command below.  A symbolic link to the tesseract executable may then be created with the ln command in one of those directories.

In [None]:
!echo $PATH
# must put tesseract executable in one of the directories listed in path
# ln -s /home/ubuntu/local/bin/tesseract tesseract

We now run the function on the binary files RDD using a map command, generating a new RDD with the string containing the OCR text.

In [None]:
tesseract_result = imgfiles.map(process_whole_file_with_spark)
result=tesseract_result.collect()
print(result)

At this point we have all the tools we need to analyze multiple binary files.  There is a corresponding command, wholeTextFiles, for building an RDD out of multiple text files; it produces an RDD of pairs, each containing a string with the filename and a string with the file content.

Let's now read in the OCR text files for all the newspaper editions we have in our dataset:

In [None]:
# now read more files
htmlfiles=sc.wholeTextFiles('/cp631/lesson12/loc_papers/*.txt',minPartitions=64)

We now wish to search for specific terms.

In [None]:
def search_function(s):
    searchterms=["Canada","Canadian"]
                 
    count=0
    for t in searchterms:
        count = count + s.count(t)
    return count
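To see how this function behaves, here is a small usage sketch on a made-up sentence (the sample text is an assumption for illustration; the function body repeats the one above so the example is self-contained).

```python
def search_function(s):
    # same logic as the function above
    searchterms = ["Canada", "Canadian"]
    count = 0
    for t in searchterms:
        count = count + s.count(t)
    return count

# made-up sample text, not taken from the newspaper data
sample = "Canada sends coal. Canadian troops arrive. canada in lower case is missed."
print(search_function(sample))
```

Note that `str.count` is case-sensitive and counts substrings rather than whole words, so "canada" is not counted, while a word such as "Canadians" would be counted as a match for "Canadian".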

We apply this function to our text files dataset, obtaining the count of the number of times our search terms occur.  We also convert the filename to just the date of the newspaper edition to which the file corresponds.

In [None]:
# should make list for search terms
#searchterm="Canadian"
# in the map below we extract just the date from the full file name
# 32-42 range will likely need changing on different file system
termcount=htmlfiles.map(lambda x:(x[0][32:42],search_function(x[1])))
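
Because the fixed 32-42 slice depends on the length of the directory path, a regular expression may be a more portable way to pull the date out of the file name.  The sketch below assumes that file names contain the date in YYYY-MM-DD form; the helper `extract_date` and the example path are hypothetical.

```python
import re

DATE_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}")

def extract_date(path):
    """Return the first YYYY-MM-DD substring found in path, or None if there is none."""
    match = DATE_PATTERN.search(path)
    return match.group(0) if match else None

# Hypothetical path, used only to illustrate the idea.
example_path = "file:/some/directory/loc_papers/1918-01-07.txt"
print(extract_date(example_path))
```

Under these assumptions, `extract_date(x[0])` could take the place of `x[0][32:42]` in the map above.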

Our new RDD now contains a date string and the number of occurrences of the search terms.  Next we want to plot this data.

In [None]:
termcount.take(10)

For working with dates Python has the datetime module.  This can take a date in string format and convert it to a datetime object, which we will use further.

In [None]:
import datetime
# need list of tuples
# each containing: python date object, counts
termcount_pythondates=termcount.map(lambda x: (datetime.date(int(x[0][0:4]),int(x[0][5:7]),int(x[0][8:10])),x[1]))
termcount_pythondates.take(10)
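
As an alternative to slicing the string by hand, the standard library can parse a YYYY-MM-DD string directly.  This is a minimal sketch assuming Python 3.7 or later, where `datetime.date.fromisoformat` is available; the helper name `parse_edition_date` is made up for this example.

```python
import datetime

def parse_edition_date(date_string):
    """Convert a 'YYYY-MM-DD' string into a datetime.date object."""
    return datetime.date.fromisoformat(date_string)

edition = parse_edition_date("1918-01-07")
# toordinal() gives a unique integer for each day
print(edition, edition.toordinal())
```

A malformed string raises a ValueError, which makes unexpected file names easier to notice than a silently wrong slice.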

To plot these dates we collect the RDD into a list on the Spark driver process.

In [None]:
termcount_pythondates_local=termcount_pythondates.collect()

In [None]:
print(termcount_pythondates_local)

Now we plot the data.  For this we use the numpy and matplotlib modules.  The datetime object for each date can be converted to an ordinal, which is a unique integer for each day.  We create a data array with one element per day in the plotting window, initialized to zero, and then set the elements for all the dates on which there were newspaper editions.  Finally, we plot the figure using the special functionality of matplotlib for plotting dates.  If you want a more detailed description of what we are doing here, please refer to the online documentation for matplotlib, numpy and datetime.  The plotting function takes as arguments the start and end date of the range of editions that we wish to plot.  All editions being processed must have dates that fall within this window.

In [None]:
import datetime
import matplotlib.pyplot as plt
from matplotlib.dates import drange
from numpy import zeros


def plot_date_count(ys,ms,ds,ye,me,de,termcount_pythondates_local):
    delta = datetime.timedelta(days=1)
    date1 = datetime.datetime(ys,ms,ds)
    # drange excludes its end point, so add one day to include the end date
    # (adding a timedelta also works when the end date is the last day of a month)
    date2 = datetime.datetime(ye,me,de) + delta
    dates = drange(date1,date2,delta)
    data=zeros(len(dates))

    # fill in the data, using the day offset from the start date as the array index
    for ele in termcount_pythondates_local:
        index = ele[0].toordinal()-date1.toordinal()
        assert index > -1
        assert index < len(data)
        data[index]=ele[1]

    fig, ax = plt.subplots()

    # dates holds matplotlib date numbers; xaxis_date() makes the x axis show them as dates
    ax.xaxis_date()
    ax.plot(dates,data,'o')

    fig.autofmt_xdate()

    plt.show()

plot_date_count(1918,1,1,1918,2,3,termcount_pythondates_local)

(Figure: search term count for each newspaper edition, plotted against the edition date, based on the Library of Congress OCR text.)
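
The indexing step at the heart of the plotting function can be tried in isolation, without numpy or matplotlib.  The sketch below builds one count per day by subtracting date ordinals; the function name `daily_counts` and the sample numbers are made up for illustration.

```python
import datetime

def daily_counts(start, end, dated_counts):
    """Return a list with one count per day from start to end inclusive.

    Days without an edition keep the initial value of zero.
    """
    n_days = end.toordinal() - start.toordinal() + 1
    data = [0] * n_days
    for day, count in dated_counts:
        # offset of this day from the start of the window
        index = day.toordinal() - start.toordinal()
        if not 0 <= index < n_days:
            raise ValueError(f"{day} is outside the plotting window")
        data[index] = count
    return data

# Made-up counts, for illustration only.
sample = [(datetime.date(1918, 1, 2), 5), (datetime.date(1918, 1, 4), 2)]
window = daily_counts(datetime.date(1918, 1, 1), datetime.date(1918, 1, 5), sample)
print(window)
```

Raising an error for dates outside the window plays the same role as the assert statements in the plotting function.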

Next we want to repeat this plot with text data obtained through our own run of tesseract.  To do this, we form an RDD from all available page scan files.

In [None]:
# running tesseract is time consuming, so select an appropriate number of files in this step if a reasonable runtime is desired
#imgfiles=sc.binaryFiles('/cp631/lesson12/loc_papers/*.jpg')
#imgfiles=sc.binaryFiles('/cp631/lesson12/loc_papers/1918-01-07.jpg')
imgfiles=sc.binaryFiles('/cp631/lesson12/loc_papers/1918-01-0*.jpg')

It is important to cache the resulting RDD, so that the time-consuming OCR step is performed only once.

In [None]:
tesseract_result = imgfiles.map(process_whole_file_with_spark).cache()

In [None]:
tesseract_result_first=tesseract_result.first()
print(tesseract_result_first[0])
print(tesseract_result_first[1][0:20])


We apply the search function defined above via a map.

In [None]:
termcount=tesseract_result.map(lambda x:(x[0][32:42],search_function(x[1]))).cache()

In [None]:
termcount.first()

We convert the dates to datetime objects via another map.

In [None]:
import datetime
# need list of tuples
# each containing: python date object, counts
termcount_pythondates=termcount.map(lambda x: (datetime.date(int(x[0][0:4]),int(x[0][5:7]),int(x[0][8:10])),x[1])).cache()
termcount_pythondates.take(1)

Finally, we collect the data and plot the result using the function defined previously.

In [None]:
termcount_pythondates_local=termcount_pythondates.collect()
plot_date_count(1918,1,1,1918,1,9,termcount_pythondates_local)

(Figure: search term count for each newspaper edition, plotted against the edition date, based on our own Tesseract OCR text.)

We can now compare this graph, generated from our own OCR run with Tesseract, to the one generated from the Library of Congress OCR data.  Our Tesseract run detects the search terms fewer times, which suggests that it performs worse, at least for the terms we searched for.
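
The comparison can also be made numerically once both results have been collected.  The following is a minimal sketch that lines up two lists of (date, count) pairs by date; the function name `compare_counts` and all the numbers are invented for illustration and are not results from the dataset.

```python
def compare_counts(reference, candidate):
    """Return {date: (reference count, candidate count)} for dates present in both inputs."""
    candidate_by_date = dict(candidate)
    return {
        day: (count, candidate_by_date[day])
        for day, count in reference
        if day in candidate_by_date
    }

# Made-up numbers, for illustration only.
loc = [("1918-01-01", 12), ("1918-01-02", 7), ("1918-01-03", 4)]
ours = [("1918-01-01", 9), ("1918-01-02", 7)]
paired = compare_counts(loc, ours)
for day, (ref, cand) in sorted(paired.items()):
    # a negative difference means the candidate found fewer occurrences
    print(day, ref, cand, cand - ref)
```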

You can see how the Spark code we have written enabled us to process multiple binary files with an external program, and then process the results within Spark.  This method is general and could be extended to any workflow in which multiple files are processed with various tools.  This is one of the reasons for the popularity of Spark when it comes to massively parallel workflows.  As long as the problem we are working on fits the constraints of the Spark paradigm, with the processing of each file being independent of the others, Spark is very well suited to the task.

## Summary

In this lesson we have learned some advanced features of Spark.

* Spark comes with a set of powerful mathematical tools built in, parallelized and ready to use.  It contains a Machine Learning suite, which provides topic modelling algorithms.
* Topic modelling can be used to classify a set of documents without topics defined a priori.  This is highly efficient for large sets of documents which could not be read in a reasonable time by a human.
* DataFrames are an extension of RDDs, storing data as well as labels for that data.
* Spark can create RDDs out of multiple files, keeping each file as a distinct object.  This is particularly useful for binary files, such as images or videos.
* Spark can run arbitrary programs on these binary files, making it very versatile in performing analysis on large datasets in parallel.

In the following lesson we will wrap up the whole course with a short lesson summarizing what we have learned.

## Discussion exercises
 
Try these exercises and post your efforts on the discussion board.  See the syllabus for more details.

1. In the topic modelling section of lesson 12, experiment by changing the number of topics and modifying the cutoffs for the most frequently and least frequently used words.  Try to obtain the most meaningful topics that you can.  Show the parameters that worked best and the top words of the resulting topics.

2. In the newspaper dataset text data we were working on, count how many times the ship Titanic is mentioned in each month between 1910 and 1920. Show the code necessary to do that.

3. When working on images, it can be useful to pre-process the images first so that the text recognition software does a better job, by either being more accurate or being faster.  Read the documentation of the image processing features of the Image module, and try to modify the image before doing text analysis.  You can for example decrease the resolution of the image, which should result in faster processing at the cost of decreased accuracy.  Measure the performance of the code and observe how the detection count of selected words changes.

To locate Discussion forum 12 click on “Discussions” at the top of your MyLS webpage and select “Discussion Forum 12”. For further details and grading rubric consult the course syllabus.
 