# Using Python's Multiprocessing Library, Part 1C

Frank Neugebauer
May 19, 2019

```python
import pandas as pd
import os
import logging
from datetime import datetime
from multiprocessing import Pool
```

## 1. Parallel Corpus Preprocessor

### Word Count

The task for this part is to create a function that counts the number of times each word appears across the entire collection of articles. The result is a Python dictionary with each word as a key and its count as the value.
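For reference, the dictionary described above can be sketched on a single machine with `collections.Counter`. This is not what this part actually does, because the counting here runs in Spark. `count_words` and `count_words_in_file` are hypothetical helper names. Splitting uses `str.split()` with no argument, which is the same whitespace rule as the Spark job's `line.split()`.

```python
from collections import Counter


def count_words(text):
    """Return a dict mapping each whitespace-separated word in text to its count."""
    # str.split() with no argument splits on runs of whitespace and drops empty strings.
    return dict(Counter(text.split()))


def count_words_in_file(path):
    """Count the words in a text file such as corpus.txt, reading it line by line."""
    counts = Counter()
    with open(path, encoding='utf-8') as f:
        for line in f:
            # Counter.update adds one to the count of each word in the iterable.
            counts.update(line.split())
    return dict(counts)


print(count_words('the cat saw the hat'))  # {'the': 2, 'cat': 1, 'saw': 1, 'hat': 1}
```

Because `corpus.txt` is written as one long line, reading it line by line does not reduce memory use for that particular file. This approach only helps for files with many shorter lines.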

This part is very different from the first two because, in this case, all I want from Python is the cleaned-up text. I ran the word count itself with Apache Spark on a Google Cloud Dataproc cluster, submitting the code as a remote job.

The process for creating this third part is as follows:

1. Modify the code from part 1b to produce only clean text. Spark doesn't need the vectors, so they are dropped. The output of the step #1 code below is a text file called `corpus.txt`.
2. Write a relatively simple program that runs the MapReduce word count in Spark. It does all of the word-count work, using `corpus.txt` as its input.
3. Upload `corpus.txt` to a Google Cloud Storage bucket.
4. Configure the local command line using the Google Cloud SDK Shell.
5. Submit the Python program from step #2 as a job to Google Cloud. Here is the command I ran. It shows the input and output locations (the program reads every file in the input location and writes multiple output files).

`gcloud dataproc jobs submit pyspark word-count.py --cluster=%CLUSTER% -- gs://%BUCKET_NAME%/input/ gs://%BUCKET_NAME%/output/`

Here is the code from step #1.

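```python
def start_logger():
    # Create the log directory if needed; logging raises FileNotFoundError if ./log is missing.
    os.makedirs('./log', exist_ok=True)
    logging.basicConfig(filename='./log/exercise_1_c_log_%s.log' %
                        datetime.strftime(datetime.now(), '%m%d%Y_%H%M%S'),
                        level=logging.DEBUG, format='%(asctime)s %(message)s',
                        datefmt='%m-%d %H:%M:%S')


def read_article_jsonl(file_paths):
    """Read each JSONL file and collect its 'section_texts' column (one Series per file)."""
    articles = []
    logging.debug('In read_article_jsonl...')
    for file_path in file_paths:
        logging.debug('Reading the ' + file_path + ' file...')
        wiki_file_full = pd.read_json(file_path, lines=True)
        articles.append(wiki_file_full['section_texts'])
    return articles


def read_json_directory():
    # os.path.join builds the path portably instead of hard-coding Windows separators.
    wiki_dir = os.path.join('.', 'wikipedia', 'featured-articles')
    logging.debug('In read_json_directory...')
    logging.debug('Building paths...')
    json_file_paths = [
        entry.path
        for entry in os.scandir(wiki_dir) if entry.name.endswith('.jsonl')
    ]
    logging.debug('Reading the files serially...')
    articles = read_article_jsonl(json_file_paths)
    logging.debug('There are ' + str(len(articles)) + ' entries (one per file) in the articles list.')
    logging.debug('Finished building the article list.')
    # Parallel version (not used in this run). As written it would also need
    # n_processes defined and a reader that accepts a single path, because
    # pool.map passes one element of json_file_paths to each call:
    # with Pool(processes=n_processes) as pool:
    #     articles = pool.map(read_article_jsonl, json_file_paths)
    return articles


def strip_extras(this_text):
    """Remove newlines and apostrophes from one section of text."""
    logging.debug('Inside strip_extras...')
    # Replacing '\n' with '' glues words across line breaks (this is where tokens
    # such as 'history.The' in the sample output come from); ' ' would keep them apart.
    return this_text.replace('\n', '').replace('\'', '')


def final_clean(doc):
    """Return the lowercased document as a list of words (not called in this script)."""
    logging.debug('Inside final_clean...')
    return doc.lower().split()


def create_clean_string(all_documents):
    """Write the cleaned text of every article to corpus.txt, separated by spaces."""
    logging.debug('Inside create_clean_string...')

    clean_string_list = []
    for text_list in all_documents:  # one Series per JSONL file
        for text in text_list:       # one list of section strings per article
            # text[0] keeps only the article's first section.
            clean_string_list.append(strip_extras(text[0]))

    # Newlines are stripped and only spaces are written, so corpus.txt is one long
    # line. Spark's textFile treats each line as a record, so writing '\n' here
    # instead of ' ' would give it one record per article.
    with open('corpus.txt', 'w', encoding='utf-8') as file:
        for clean_string in clean_string_list:
            file.write(clean_string + ' ')


if __name__ == '__main__':
    start_logger()
    logging.debug('Starting word count...')

    articles = read_json_directory()
    create_clean_string(articles)
```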
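Since this series is about `multiprocessing`, here is one way the unused `Pool` idea could be made to work. This is a sketch under assumptions. `read_sections` and `read_all_parallel` are hypothetical names, and each JSONL line is assumed to be an object with a `section_texts` list. The sketch also swaps `pd.read_json` for the standard-library `json` module so each worker returns plain lists. The default of 4 processes is arbitrary.

```python
import json
from multiprocessing import Pool


def read_sections(file_path):
    """Return one list of section strings per article in a single JSONL file."""
    # Assumes every non-blank line is a JSON object with a 'section_texts' list,
    # the same field the pandas version reads.
    with open(file_path, encoding='utf-8') as f:
        return [json.loads(line)['section_texts'] for line in f if line.strip()]


def read_all_parallel(file_paths, n_processes=4):
    """Read each JSONL file in a separate worker task.

    pool.map calls read_sections once per path and returns the results
    in the same order as file_paths.
    """
    with Pool(processes=n_processes) as pool:
        return pool.map(read_sections, file_paths)
```

Inside `read_json_directory`, `articles = read_all_parallel(json_file_paths)` could replace the serial call. The result is one list of articles per file, with each article a list of sections, so `create_clean_string` can iterate over it unchanged. The pool should only be created under the `if __name__ == '__main__':` guard the script already has. On platforms that spawn worker processes (Windows, and macOS by default), that guard stops the workers from re-running the script.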

Here is the code from step #2. Again, this code was run via the Google Cloud SDK Shell and executed within Google Cloud using Spark.

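```python
#!/usr/bin/env python
import sys

import pyspark

# Expect exactly two arguments after the script name: the input and output URIs.
if len(sys.argv) != 3:
    raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")

sc = pyspark.SparkContext()
# Each element of `lines` is one line from the files under the input URI.
lines = sc.textFile(sys.argv[1])
# Split on whitespace only, so punctuation stays attached ('Fame.' vs 'Fame,').
words = lines.flatMap(lambda line: line.split())
# Emit (word, 1) pairs, then sum the 1s for each distinct word.
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
# Writes one part-NNNNN text file per partition under the output URI.
wordCounts.saveAsTextFile(sys.argv[2])
```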
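To make the three Spark steps concrete, here is a plain-Python sketch of what `flatMap`, `map`, and `reduceByKey` compute on a small input. It runs on one machine with no Spark involved, and `simulate_word_count` is a hypothetical name. The real job spreads these steps across partitions, but it ends up with the same set of (word, count) pairs. Spark returns them in no particular order, so the sketch sorts them to get a stable result.

```python
from collections import defaultdict


def simulate_word_count(lines):
    """Mimic lines.flatMap(split).map((word, 1)).reduceByKey(add) on a list of strings."""
    # flatMap: every line becomes zero or more words, flattened into one sequence.
    words = [word for line in lines for word in line.split()]
    # map: pair each word with the count 1.
    pairs = [(word, 1) for word in words]
    # reduceByKey: combine the values of all pairs that share a key with +.
    totals = defaultdict(int)
    for word, one in pairs:
        totals[word] += one
    # Sorted only so the result is deterministic; Spark makes no ordering promise.
    return sorted(totals.items())


print(simulate_word_count(['May wood, wood.', 'May wooden']))
# [('May', 2), ('wood,', 1), ('wood.', 1), ('wooden', 1)]
```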

The output is extremely large, as expected. Here is a sample:
```
(u'Fame.', 27)
(u'history.The', 6)
(u'Fame,', 17)
(u'Bowl",', 3)
(u'1,800', 5)
(u'Fame"', 1)
(u'Record,', 2)
(u'1154,', 1)
(u'1744)', 1)
(u'Penn).', 1)
(u'Fame:', 2)
(u'Dartmouth,', 2)
(u'Waltheofs', 1)
(u'non-central', 3)
(u'comically', 2)
(u'localized', 7)
(u'hours.Around', 1)
(u'Septimus', 1)
(u'Proto-Mayan', 1)
(u'dome,', 2)
(u'originality', 2)
(u'mutinies', 1)
(u'L\u01d0', 1)
(u'restrengthening', 1)
(u'virtuoso"', 1)
(u'nonmetallic,', 1)
(u'Jornada', 1)
(u'Kirwan', 2)
(u'Tennessee,', 19)
(u'1154.', 2)
(u'Tennessee.', 10)
(u'wood,', 8)
(u'wood.', 7)
(u'closers', 1)
(u'conductivity,', 1)
(u'staffing.On', 1)
(u'revivals.In', 1)
(u'disagreed.', 1)
(u'progesterone,', 1)
(u'Kempf', 1)
(u'wooded', 6)
(u'grains', 6)
(u'wooden', 42)
(u'Loeb', 1)
(u'Ordained', 1)
(u'black-cloaked', 1)
(u'Sack', 1)
(u'May', 462)
```
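To work with these results locally, one option is to copy the output directory down from the bucket and parse it back into a dictionary. The following sketch assumes the files follow Spark's `part-*` naming and that each line is a tuple repr like the ones above. `load_spark_counts` and `merge_variants` are hypothetical helpers. `ast.literal_eval` accepts the `u'...'` prefix in Python 3, so a line such as `(u'L\u01d0', 1)` parses to `('Lǐ', 1)`. `merge_variants` folds case and strips leading and trailing punctuation, so `Fame.`, `Fame,` and `Fame:` combine. Tokens glued together mid-word, such as `history.The`, are not split.

```python
import ast
import glob
import os
import string


def load_spark_counts(output_dir):
    """Merge every part-* file in a local copy of the output directory into one dict."""
    counts = {}
    for path in sorted(glob.glob(os.path.join(output_dir, 'part-*'))):
        with open(path, encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                # Each line is a tuple repr such as "(u'Fame.', 27)";
                # literal_eval only evaluates literals, never arbitrary code.
                word, count = ast.literal_eval(line)
                # After reduceByKey each word should appear once; summing is defensive.
                counts[word] = counts.get(word, 0) + count
    return counts


def merge_variants(counts):
    """Fold case and strip leading/trailing punctuation so 'Fame.' and 'Fame,' combine."""
    merged = {}
    for word, count in counts.items():
        key = word.strip(string.punctuation).lower()
        if key:  # drop tokens that were nothing but punctuation
            merged[key] = merged.get(key, 0) + count
    return merged
```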
