# Multithreading

This notebook shows how to run the seq2seq model over a directory of text files using multiple threads, one thread per file.

In [1]:
import os
import threading
from threading import Thread

# these two lines make the seq2seqocr module in the parent directory importable from this notebook
import sys
sys.path.append('../')

from seq2seqocr import Seq2SeqOCR

model = Seq2SeqOCR()



2021-09-13 14:09:52.250393: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


We can use the os.walk() function to build a list of the .txt files to process.

In [2]:
paths_to_test = []

for root, dirs, files in os.walk('sample-files/sample-directory'):  # these files are from 10/4/1820 London Times
    for f in files:
        if f.endswith(".txt"):
            paths_to_test.append(os.path.join(root, f))

paths_to_test

['sample-files/sample-directory/parlimentary1.txt',
 'sample-files/sample-directory/parlimentary2.txt',
 'sample-files/sample-directory/parlimentary3.txt',
 'sample-files/sample-directory/bombay.txt']
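If you prefer pathlib, the same list can be built with Path.rglob(). This is a minimal sketch using only the standard library; find_txt_files is a hypothetical helper, not part of seq2seqocr. It returns the paths sorted, so the order may differ from the os.walk() listing above.

```python
from pathlib import Path


def find_txt_files(directory):
    # hypothetical helper: recursively collect every .txt file under `directory`
    # (skipping any directory whose name happens to end in .txt),
    # sorted so the order is stable from run to run
    return sorted(str(p) for p in Path(directory).rglob("*.txt") if p.is_file())
```

For example, find_txt_files('sample-files/sample-directory') should return the same four files listed above, in alphabetical order.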

Print the first 45 words of the first file.

In [3]:
sample_file = paths_to_test[0]

with open(sample_file, 'r') as f:
    print(' '.join(f.read().split()[:45]) + ' [...]')  # print first 45 words of one file

PARLIAMENTARY INTELLIGENCE HOUSE OF LORDS, TUESDAY, OCT. 3. This day tbe house re-assembled, pursuant to adjournment. The peers began to take their places soon after nine o'clock, and at about seven minutes before ten the Lord-Chancellor entered, and seated himself on the woolsack. Prayers were [...]


Now let's define a helper function that wraps a call to model.process_text() and prints the start of the result. We then start one thread per file in our list, each running this function.

NOTE: the code below takes a while to run (1-2 minutes on my local machine). Please be patient.

In [4]:
def threaded_process(input_text: str) -> None:
    processed_text = model.process_text(input_text)
    # we can do anything with the text here -- save to an array, process it further, etc.
    # I'm just going to print its contents with this gross-looking print statement
    # (len(input_text) is the number of characters in the file, not words)
    print("Size of file: ", len(input_text), ' characters.\n', ' '.join(processed_text.split()[:45]) + ' [...]\n')


def wait_all_threads():
    # wait for every non-daemon thread except the one calling this function
    # citation: https://stackoverflow.com/questions/11968689/python-multithreading-wait-till-all-threads-finished
    current = threading.current_thread()
    for t in threading.enumerate():
        if t.daemon or t is current:
            continue
        t.join()


for filepath in paths_to_test:
    with open(filepath, 'r') as f:
        file_text = f.read()
    # spawn a new thread to process each file
    t = Thread(target=threaded_process, args=(file_text,))
    t.start()

# only wait once every thread has been started; waiting inside the loop
# would run the files one at a time
wait_all_threads()

2021-09-13 14:09:55.758734: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)


Size of file:  640  characters.
 parliamentary intelligence house of lords tuesday oct this day tbe house reassembled pursuant to adjournment the peers began to take their places soon after nine oclock and at about seven minutes before ten the lord chancellor entered and seated himself on the wool sack prayers [...]

Size of file:  1939  characters.
 signor mariettithe earl of liverpool before their lord ships proceeded to thc business of thc day wished to say a few words on a transaction which had been madc the subject of conversation previously to the adjournment he alluded to what had passed respecting a [...]

Size of file:  1646  characters.
 expense of tlhe priscee dirgs against theiueenlhe earl of darnley prose to move thaan nccolnt of the money expendced in the proceedings against her mlajetay be laid before the noa se lhe had before urged tile prdoriety of thleir lord ships calling to a statdnient [...]

Size of file:  2092  characters.
 wuu bomlbay direct with leave to call at imatleiran loading

Threads given longer files have more text to evaluate, so they can take longer to finish. Even so, the threaded program still processes the whole directory in a reasonable amount of time.
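Because each thread prints as soon as it finishes, results can come back in a different order from the file list. If you want to keep the processed text instead of printing it, one option is to give each thread its own slot in a pre-sized list and join exactly the threads you started, rather than every thread in threading.enumerate(). This is a sketch under assumptions: process stands in for model.process_text, and process_all is a hypothetical name, not part of seq2seqocr.

```python
import threading


def process_all(texts, process):
    # `process` stands in for model.process_text (an assumption for this sketch)
    results = [None] * len(texts)

    def worker(i, text):
        # each thread writes only to its own index, so results stay in input order
        results[i] = process(text)

    threads = [threading.Thread(target=worker, args=(i, text)) for i, text in enumerate(texts)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for exactly the threads started above
    return results
```

For example, process_all(["a b", "c"], str.upper) returns ['A B', 'C'], regardless of which thread finishes first.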

Below is an example of a threaded program that takes every .txt file under ORIGINAL_DIR, processes it, and writes the result to a matching location under PREDICTED_DIR. The lines that create directories, run the model, and write the output files are commented out so that running this notebook does not create new files. To use this code on your own data, un-comment those lines.
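The path mapping this program relies on can be sketched as a small standalone function. mirror_path is a hypothetical helper; keeping the whole relative path (not only its last component) means nested subdirectories are mirrored under the output directory instead of being flattened.

```python
import os


def mirror_path(old_filepath, original_dir, predicted_dir):
    # hypothetical helper: map a file under original_dir to the same
    # relative location under predicted_dir
    relative = os.path.relpath(old_filepath, original_dir)
    return os.path.normpath(os.path.join(predicted_dir, relative))
```

For example, mirror_path('sample-files/sample-directory/bombay.txt', 'sample-files/', 'predicted-files/') gives 'predicted-files/sample-directory/bombay.txt' on a POSIX system.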

In [10]:
from threading import Lock

printLock = Lock()

# edit these to point at your data
ORIGINAL_DIR = 'sample-files/'
PREDICTED_DIR = 'predicted-files/'

def seq2seq_process_direc(original_dir, predicted_dir, replace=False):
    '''
    Process every .txt file under original_dir and write each result to the
    matching path under predicted_dir, using one thread per file.

    Un-comment the lines below if you are using this on your own files.
    '''
    model = Seq2SeqOCR()

    #os.makedirs(predicted_dir, exist_ok=True)

    paths_to_test = []

    for root, dirs, files in os.walk(original_dir):
        # mirror the directory tree: original_dir/a/b -> predicted_dir/a/b
        relative_dir = os.path.relpath(root, original_dir)
        processed_file_dir = os.path.normpath(os.path.join(predicted_dir, relative_dir))
        #os.makedirs(processed_file_dir, exist_ok=True)
        for f in files:
            if f.endswith('.txt'):
                paths_to_test.append((root, processed_file_dir, f))

    for root, processed_file_dir, filename in paths_to_test:
        old_filepath = os.path.join(root, filename)
        new_filepath = os.path.join(processed_file_dir, filename)

        # skip files that were already processed unless replace=True
        if (not replace) and os.path.exists(new_filepath):
            s_print("File already exists: " + new_filepath)
            continue

        t = Thread(target=threaded_newfile_process, args=(old_filepath, new_filepath, model))
        t.start()

    wait_all_threads()


def threaded_newfile_process(old_filepath, new_filepath, model):
    '''
    Runs inside a worker thread: processes a single file and writes the output to a new file.
    '''
    with open(old_filepath, 'r') as f:
        orig_text = f.read()

    #processed_text = model.process_text(orig_text)

    #with open(new_filepath, 'w') as f:
        #f.write(processed_text)

    s_print("Created new file: " + new_filepath)

def s_print(msg):
    '''
    thread-safe print: only one thread prints at a time
    '''
    with printLock:
        print(msg)


seq2seq_process_direc(ORIGINAL_DIR, PREDICTED_DIR)

Created new file: predicted-files/sample-directory/parlimentary2.txt
Created new file: predicted-files/sample-directory/parlimentary3.txt
Created new file: predicted-files/sample-directory/parlimentary1.txt
Created new file: predicted-files/sample-directory/bombay.txt
Created new file: predicted-files/floods.txt
Created new file: predicted-files/floods_correct.txt
Created new file: predicted-files/floods_paired.txt
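Starting one thread per file means a large directory can create hundreds of threads at once. A bounded alternative, swapped in here rather than taken from seq2seqocr, is the standard library's concurrent.futures.ThreadPoolExecutor, which runs at most max_workers files at a time. This is a sketch under assumptions: process_directory is a hypothetical name, process stands in for model.process_text, max_workers=4 is an arbitrary default, and sharing one model across worker threads is assumed to be safe, which this notebook has not verified.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def process_directory(original_dir, predicted_dir, process, max_workers=4, replace=False):
    # collect (input, output) path pairs, mirroring the directory tree
    jobs = []
    for root, _dirs, files in os.walk(original_dir):
        out_dir = os.path.normpath(os.path.join(predicted_dir, os.path.relpath(root, original_dir)))
        for name in files:
            if name.endswith(".txt"):
                new_path = os.path.join(out_dir, name)
                # skip files that were already processed unless replace=True
                if replace or not os.path.exists(new_path):
                    jobs.append((os.path.join(root, name), new_path))

    def work(job):
        old_path, new_path = job
        with open(old_path, "r") as f:
            text = f.read()
        os.makedirs(os.path.dirname(new_path), exist_ok=True)
        with open(new_path, "w") as f:
            f.write(process(text))  # `process` stands in for model.process_text
        return new_path

    # at most max_workers threads run at once; map yields results in job order
    # and re-raises any exception raised inside a worker
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(work, jobs))
```

Called as process_directory(ORIGINAL_DIR, PREDICTED_DIR, model.process_text), it would write real output files, so try it on a scratch directory first.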
