## Tesseract
Tesseract (https://github.com/tesseract-ocr/tesseract) is an open-source OCR engine, long sponsored by Google.  
We will use the pytesseract bindings to Tesseract from Spark to perform distributed OCR against documents in a dataframe.  

### First, 
We develop our algorithm locally.  
The file we are using is public-domain Shakespeare, about 100 pages long.  
For simplicity's sake, we can copy the file several times to load the system. My system has only 8 logical cores, so about 8 copies are enough to complete the tutorial. Adjust as you see fit.  
In reality, input files for a process like this can vary widely, from a single page to over a thousand. The last part of this tutorial deals with that skew.

In [1]:
import pytesseract
import pdf2image
import time
import os
import re
max_dop = 8

In [24]:
# ! curl -O https://shakespeare.folger.edu/downloads/pdf/julius-caesar_PDF_FolgerShakespeare.pdf 
# TODO Adjust to max_dop  with shell magic
! seq 8 | xargs -I XY cp ./by_the_sea_poe.pdf ./copy_by_the_sea_poeXY.pdf

In [25]:
# Absolute paths of every PDF in the top level of the working directory
fnames = [os.path.join(os.getcwd(), f) for f in next(os.walk(os.getcwd()))[2] if f.endswith('.pdf')]

In [26]:
fnames

['/home/bsavoy/pytesseract_on_pyspark/by_the_sea_poe.pdf',
 '/home/bsavoy/pytesseract_on_pyspark/copy_by_the_sea_poe6.pdf',
 '/home/bsavoy/pytesseract_on_pyspark/copy_by_the_sea_poe7.pdf',
 '/home/bsavoy/pytesseract_on_pyspark/copy_by_the_sea_poe1.pdf',
 '/home/bsavoy/pytesseract_on_pyspark/copy_by_the_sea_poe2.pdf',
 '/home/bsavoy/pytesseract_on_pyspark/copy_by_the_sea_poe8.pdf',
 '/home/bsavoy/pytesseract_on_pyspark/copy_by_the_sea_poe4.pdf',
 '/home/bsavoy/pytesseract_on_pyspark/copy_by_the_sea_poe5.pdf',
 '/home/bsavoy/pytesseract_on_pyspark/copy_by_the_sea_poe3.pdf']

In [27]:
content = pdf2image.convert_from_bytes(open(fnames[0],'rb').read())
doc_string = ''
start = time.time()
for page in content:
    doc_string += pytesseract.image_to_string(page)
    
elapsed = time.time() - start
print('took {}s'.format(elapsed))

took 3.9696993827819824s


## Let's see how that scales on our box
A sequential loop like this will not scale much beyond a handful of documents at a time, though it may be fine for an ad-hoc process.  
Performance may improve if we multiprocess it. pytesseract is a wrapper around the tesseract executable, after all, and sub-processes also sidestep the interpreter lock.  
We should be able to get some benefit by forking more external processes.

In [28]:
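def do_ocr(fname):
    # Rasterize each page of the PDF at fname
    with open(fname, 'rb') as f:
        content = pdf2image.convert_from_bytes(f.read())
    doc_string = ''
    for page in content:
        doc_string += pytesseract.image_to_string(page)
    # Return the text so callers such as Pool.map receive a result
    return doc_string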

In [34]:
# TODO convert to pandas and display over matplotlib
data

[[1, 9, 40.4643657207489],
 [2, 9, 23.20615577697754],
 [3, 9, 18.52650022506714],
 [4, 9, 21.658716678619385],
 [5, 9, 20.666146993637085],
 [6, 9, 23.207656621932983],
 [7, 9, 30.47342801094055],
 [8, 9, 287.7722792625427]]
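
Until the plot from the TODO exists, the raw timings are easier to read as speedup relative to a single process. The following is a minimal sketch: the numbers are copied from the output above, and the names `baseline`, `speedups`, and `best_dop` are introduced here purely for illustration.

```python
# Timings copied from the run above: [degree of parallelism, files, seconds]
data = [
    [1, 9, 40.4643657207489],
    [2, 9, 23.20615577697754],
    [3, 9, 18.52650022506714],
    [4, 9, 21.658716678619385],
    [5, 9, 20.666146993637085],
    [6, 9, 23.207656621932983],
    [7, 9, 30.47342801094055],
    [8, 9, 287.7722792625427],
]

baseline = data[0][2]  # seconds taken with a single process

# Speedup = single-process time / time at this degree of parallelism
speedups = {dop: baseline / seconds for dop, _, seconds in data}

# Degree of parallelism with the largest speedup
best_dop = max(speedups, key=speedups.get)

for dop, speedup in speedups.items():
    print('dop={} speedup={:.2f}x'.format(dop, speedup))
print('best dop: {}'.format(best_dop))
```

A speedup below 1x means that setting was slower than running one process at a time, which is what happens at the highest setting here.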

## Take away
The benefit of parallelism drops off before we saturate even the physical core count. Suggestions are welcome, but my first guess is context-switching cost, since the CPU is so busy between different tesseract threads.  
Anecdotally, then, we can run only up to 3 concurrent tesseract processes (each of which, in reality, uses multiple cores as it runs) before performance starts to drop off. That is fewer than the 4 physical cores of my CPU, let alone the 8 logical cores: roughly 1/3 of the cores available on the box.  

This is a small file, with only 2 pages. In other applications, you could run into files of varying length. A 100-page sample here takes approximately 1000s, which we can use to project that even for 50 documents on our box, with a parallelism of 3, a run could take up to 50,000/3, or about 16,667 seconds (roughly 278 minutes). Even on a larger box with 96 cores, if we can only use roughly 1/3 of the available cores, we end up with a degree of parallelism of 32: 50,000/32 is roughly 1,560s for a run, or about 26 minutes. Hardly adequate for streaming or any other time-sensitive workloads.  
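
The projection above is simple enough to check in a few lines. This is a back-of-the-envelope sketch, assuming work divides perfectly evenly across workers (which real runs will not achieve); `projected_runtime_s` is a hypothetical helper, not part of any library.

```python
def projected_runtime_s(n_docs, seconds_per_doc, parallelism):
    """Ideal wall-clock seconds if documents spread perfectly evenly over workers."""
    return n_docs * seconds_per_doc / parallelism

# 50 documents at ~1000s each, 3 concurrent tesseract processes on the local box
local_s = projected_runtime_s(50, 1000, 3)

# 96-core box, assuming only about 1/3 of the cores are usable (32 workers)
large_s = projected_runtime_s(50, 1000, 96 // 3)

print('local box:   {:.0f}s (~{:.0f} min)'.format(local_s, local_s / 60))
print('96-core box: {:.1f}s (~{:.0f} min)'.format(large_s, large_s / 60))
```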

If we're dividing the work by document, the best average performance we could get for a run would be about 1000s, the time to process a single document (foreshadowing the last example).

We also have a major disadvantage: this solution MUST run on a single node.  
We could split the workload across a number of hosts, but this brings orchestration and scheduling headaches.  
Also, what if you wanted to deal with this data coherently in a dataframe API after extracting the text?  
Spark helps with these problems:  
* Spark can handle arbitrarily large datasets by natively splitting data across a cluster   
* Spark can apply transformations in a distributed fashion and return a single coherent dataset back from our transformations  
* The workload can scale to N nodes: we tune executors for the individual hosts we plan to have in the cluster and let Spark loose to schedule the work

In [32]:
from multiprocessing import Pool
data = []
# In my VM, I only have 8 cpu cores to work with (really 4 physical and 8 logical) available on my host
for dop in range(1,9):
    with Pool(dop) as p:
        start = time.time()
        p.map(do_ocr, fnames)
        elapsed = time.time() - start
        print('dop={} files={} seconds={}'.format(dop,len(fnames),elapsed))
        data.append([dop,len(fnames),elapsed])


dop=1 files=9 seconds=40.4643657207489
dop=2 files=9 seconds=23.20615577697754
dop=3 files=9 seconds=18.52650022506714
dop=4 files=9 seconds=21.658716678619385
dop=5 files=9 seconds=20.666146993637085
dop=6 files=9 seconds=23.207656621932983
dop=7 files=9 seconds=30.47342801094055
dop=8 files=9 seconds=287.7722792625427


## Next,
* We will package our OCR logic into a function that PySpark will pickle and distribute to worker nodes, eerily similar to the do_ocr function above.    
* In our driver program, we will instruct Spark to read in the PDFs as binary data into an RDD  
* We will use the built-in RDD map function to apply our OCR function to the binary content of the PDF in each element  
* When execution is complete, we will have a new RDD consisting of the text parsed by tesseract, along with some telemetry and error handling (for troublesome PDFs, or PDFs that, well, aren't PDFs)
* Finally, we can convert the RDD to a dataframe and use Spark SQL API functions against our OCR'd text

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
an_rdd = spark.sparkContext.parallelize(range(10))

In [4]:
an_rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [7]:
def do_pytesseract(record):
    import socket
    import time
    
    file_name = record[0]
    binary_content = record[1]
    start_time = time.time()
    try: 
        ocr_text = do_ocr(binary_content)
    except Exception as e:
        ocr_text  = str(e)
    final_time = time.time() - start_time
    
    return [socket.gethostname(), file_name, ocr_text, str(final_time) + 's']

def do_ocr(binary_content):
    from pdf2image import convert_from_bytes
    import pytesseract
    
    content = convert_from_bytes(binary_content)
    doc_string = ''
    for page in content:
        doc_string += pytesseract.image_to_string(page)
        
    return doc_string
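
The driver steps listed earlier (read the PDFs as binary, map the OCR function over them, convert to a dataframe) could be wired together roughly as follows. This is a sketch under assumptions, not the tutorial's final driver: `build_ocr_dataframe`, its parameters, and the column names are hypothetical, chosen to match the four fields that `do_pytesseract` returns.

```python
def build_ocr_dataframe(spark, input_glob, ocr_record):
    """Read PDFs as binary, OCR each one, and return a four-column dataframe.

    spark       -- an existing SparkSession
    input_glob  -- path or glob for the PDFs (placeholder, supply your own)
    ocr_record  -- function taking a (path, bytes) pair, e.g. do_pytesseract
    """
    # sparkContext.binaryFiles yields one (path, bytes) pair per file
    pdf_rdd = spark.sparkContext.binaryFiles(input_glob)

    # Convert each returned list to a tuple so every row has a fixed shape
    rows = pdf_rdd.map(lambda record: tuple(ocr_record(record)))

    # Hypothetical column names mirroring do_pytesseract's return value
    return spark.createDataFrame(rows, ['host', 'file_name', 'ocr_text', 'elapsed'])
```

With a live session, a call such as `build_ocr_dataframe(spark, './*.pdf', do_pytesseract)` would then give a dataframe that can be queried like any other.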
    

## Last,
In the program's current state, if a worker encounters a large document, the rest of the cluster may finish its documents while that one remaining document keeps the stage open, bottlenecking work and wasting compute. Also, our best-case performance (in the case of my box) would be 1000s for an entire run (the average for a single document). How could we improve this?    

With some minor complexity, we can break documents into their component pages before processing. Instead of distributing whole documents, we distribute the pages of each document and recombine them by a document key at the end. This avoids saddling a node with a large document and reduces our best-case runtime to the time it takes to process a single page (if we had enough nodes to process every page of every document at once). For easy math, a 1000s minimum for a run reduces to 1000 s / 100 pages = 10 s/page, for a best-case runtime of 10s.    
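
The split-and-recombine idea can be shown without Spark at all. The sketch below uses plain Python and a stand-in for the OCR call so that it runs anywhere; `explode_pages`, `fake_ocr`, and `recombine` are hypothetical names. In Spark the same shape would presumably map onto a flatMap to pages, a map for OCR, and a grouping step by document key, but that mapping is an assumption rather than something shown in this tutorial.

```python
from collections import defaultdict

def explode_pages(doc_key, pages):
    """Turn one document into (doc_key, page_number, page) work items."""
    return [(doc_key, page_no, page) for page_no, page in enumerate(pages)]

def fake_ocr(page):
    """Stand-in for pytesseract.image_to_string so the sketch needs no OCR engine."""
    return page.upper()

def recombine(page_results):
    """Group per-page text by document key and join it in page order."""
    by_doc = defaultdict(list)
    for doc_key, page_no, text in page_results:
        by_doc[doc_key].append((page_no, text))
    return {doc: ''.join(text for _, text in sorted(parts))
            for doc, parts in by_doc.items()}

# Two toy "documents": strings stand in for page images
docs = {'a.pdf': ['page one ', 'page two'], 'b.pdf': ['only page']}

# Every page becomes an independent unit of work
work_items = [item for key, pages in docs.items()
              for item in explode_pages(key, pages)]

# Process pages in reverse to mimic workers finishing out of order
results = [(key, page_no, fake_ocr(page))
           for key, page_no, page in reversed(work_items)]

combined = recombine(results)
print(combined)
```

Carrying the page number alongside the document key is what lets the pages be reassembled correctly regardless of the order in which workers finish.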

In [None]:
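# DataFrameReader has no binaryFiles() method; Spark 3.0+ provides a "binaryFile" data source.
# The path is a placeholder: point it at your own PDFs.
raw_df = spark.read.format('binaryFile').load('./*.pdf')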