# Parallel Word Count: Overall Word Count

When people hear Map Reduce, they usually think of Hadoop, Java, Big Data, etc. Map Reduce is actually a parallel programming model that has been around for a long time. It consists of two steps: a map step, where an operation is executed on a number of files in parallel (like the throughput example we just talked about), and a reduce step, where the outputs of the map step are combined into a single output.

In our word count example, finding the top 5 words overall is a good fit for the Map Reduce paradigm. Getting the word counts for each book is the Map step, and combining those word counts to get an overall count across the corpus is the Reduce step.

<img src="images/MapReduce.png" alt="Map Reduce" style="height: 200px;"/>
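
The two steps can be sketched in a few lines of plain Python. This is only an illustration of the model on in-memory strings, not the code used in this example: `map_count` and `reduce_counts` are hypothetical names, and the naive whitespace split stands in for the `cleantext` and `countwords` helpers.

```python
from collections import Counter
from functools import reduce

def map_count(text):
    """Map step: count the words in one document (naive whitespace split)."""
    return Counter(text.lower().split())

def reduce_counts(total, counts):
    """Reduce step: fold one document's counts into the running total."""
    total.update(counts)
    return total

# Toy corpus standing in for the books
books = ["the cat sat", "the dog sat", "the end"]

# Each call to map_count is independent of the others, so this is the part
# that can run in parallel
per_book = [map_count(b) for b in books]

# The reduce step combines the per-book results into one overall count
overall = reduce(reduce_counts, per_book, Counter())

print(overall.most_common(2))
```

Because each map call touches only its own document, the map step needs no communication between processes; only the reduce step sees all of the results.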

## MapReduce with LLMapReduce

You can run a MapReduce job with very few changes to your serial code. This approach is language-agnostic, although the specific implementation may change with language.

LLMapReduce is a command that is available on LLSC Systems and the MIT Supercloud. It is a bit like running a Job Array for the map step, followed by a serial job for the reduce step.

### Starting Point: Serial Code

For reference, here is the starting serial Python code and its Slurm submission script. Because we would submit this with LLsub, the submission script doesn't contain any Slurm options.

#### Serial Code

```python
import os
import sys
sys.path.append('../')
from helpers import *

# Load the file names
dataDir = "../books/"
fnames = os.listdir(dataDir)

allcounts = []
for fname in fnames:
    f = open(dataDir+fname, 'r', encoding='utf-8')
    text = cleantext(f.readlines())
    allcounts.append(countwords(text))

globalcounts = dict()
for counts in allcounts:
    globalcounts = { k: counts.get(k, 0) + globalcounts.get(k, 0) for k in set(counts) | set(globalcounts) }
    
top5 = sorted(globalcounts, key=globalcounts.get, reverse=True)[:5]
for k in top5:
    print("{}: {}".format(k, globalcounts[k]))
```

#### Submission Script

```bash
#!/bin/bash


# Initialize the Module command
source /etc/profile

# Load the anaconda module
module load anaconda/2020a


# Run your script as you would from the command line
python top5overall.py
```

### Identify Map and Reduce Portions of your Code and Separate

Create two separate files to hold the Map and Reduce portions of your code. The Map step can usually be identified by some sort of for loop, something you could implement as a Job Array. The Reduce step should take the results of the Map step and produce a single result.

In our example, the Map step is counting the number of times each word appears in each book. The reduce step is summing the counts for each word to get an overall count.

We create two files: `top5overall_map.py` and `top5overall_reduce.py`.

The contents of `top5overall_map.py`:
```python
import os
import sys
sys.path.append('../')
from helpers import *

# Load the file names
dataDir = "../books/"
fnames = os.listdir(dataDir)

allcounts = []
for fname in fnames:
    f = open(dataDir+fname, 'r', encoding='utf-8')
    text = cleantext(f.readlines())
    allcounts.append(countwords(text))
```

The contents of `top5overall_reduce.py`:
```python
globalcounts = dict()
for counts in allcounts:
    globalcounts = { k: counts.get(k, 0) + globalcounts.get(k, 0) for k in set(counts) | set(globalcounts) }
    
top5 = sorted(globalcounts, key=globalcounts.get, reverse=True)[:5]
for k in top5:
    print("{}: {}".format(k, globalcounts[k]))
```

### Edit the Map Script: Reading the Input File

LLMapReduce will pass in the name of a file containing the list of files an individual process should iterate over. You'll have to read in this file and edit your code to iterate over its lines.

To read it in:

```python
inOutFileList = open(sys.argv[1],"r")
```

Then change the start of your for loop:

```python
for line in inOutFileList.readlines():

    # Get input and output file names for this iteration
    (inFile,outFile) = line.split()
    print("Reading from " + inFile + " and writing to " + outFile)

    # Read in file and clean the text
    infid = open(inFile,"r", encoding='utf-8')
    text = cleantext(infid.readlines())
    ...
```
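
To try this loop without a cluster, you can build a list file by hand. The sketch below assumes, as the loop above does, that each line holds an input path and an output path separated by whitespace. The file names and the `parse_pairs` helper are hypothetical, and paths containing spaces would not survive `split()`.

```python
import os
import tempfile

def parse_pairs(list_path):
    """Return (input, output) path pairs from a whitespace-separated list file."""
    pairs = []
    with open(list_path, "r") as fid:
        for line in fid:
            if not line.strip():
                continue  # skip blank lines
            in_file, out_file = line.split()
            pairs.append((in_file, out_file))
    return pairs

# Write a small, made-up list file and parse it back
tmpdir = tempfile.mkdtemp()
list_path = os.path.join(tmpdir, "input_list.txt")
with open(list_path, "w") as fid:
    fid.write("books/a.txt out/a.txt.out\n")
    fid.write("books/b.txt out/b.txt.out\n")

pairs = parse_pairs(list_path)
print(pairs)
```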



### Edit the Map Script: Writing the Result

You'll need to write the result out into a file at the end of each iteration. Since I'm writing a dictionary, I'm using a JSON file, but you could use any other file type that you can easily read and write:

```python
    ...
    # Save the word counts to the passed in output file name 
    with open(outFile+'.json', 'w') as fp:
        json.dump(wordCounts, fp)
```
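
As a quick check that JSON suits this data, the sketch below writes a small dictionary and reads it back. The dictionary and file name are made up for illustration. Note that JSON object keys are always strings, which is fine here because the keys are words.

```python
import json
import os
import tempfile

# Stand-in for the dictionary returned by countwords
word_counts = {"the": 3, "cat": 1}

# Write the counts the same way the Map script does
out_path = os.path.join(tempfile.mkdtemp(), "book.out.json")
with open(out_path, "w") as fp:
    json.dump(word_counts, fp)

# Read them back the way the Reduce script will
with open(out_path, "r") as fp:
    restored = json.load(fp)

print(restored == word_counts)
```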

### Edit the Reduce Script: Grab the Inputs

LLMapReduce passes two arguments into the Reduce script:
- The directory where the output of the Map step is written
- The name of the file where the output of the Reduce step should be written

By now you are probably familiar with grabbing arguments. You'll grab them and then read in all the input files:

```python
# Grab the two arguments that are passed in
# The first is the directory containing the output of the mapper
# The second is the name of the file where the final word counts should be saved
inputdir = sys.argv[1]
outputfile = sys.argv[2]

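# List the word count files written by the Map step
fnames = os.listdir(inputdir)

# Read in each word count file and merge its counts into the overall counts
globalcounts = dict()
for fname in fnames:
    with open(inputdir+"/"+fname, 'r') as fp:
        counts = json.load(fp)
    globalcounts = { k: counts.get(k, 0) + globalcounts.get(k, 0) for k in set(counts) | set(globalcounts) }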
```
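
The dictionary comprehension used to merge counts in `top5overall_reduce.py` rebuilds the whole dictionary for every file. One possible alternative, sketched below, is `collections.Counter`, which adds counts in place. The `merge_count_files` helper is hypothetical, and the sketch assumes every `.json` file in the directory maps words to integer counts.

```python
import json
import os
import tempfile
from collections import Counter

def merge_count_files(input_dir):
    """Sum the word counts stored in every .json file in input_dir."""
    total = Counter()
    for fname in sorted(os.listdir(input_dir)):
        if not fname.endswith(".json"):
            continue  # ignore anything that is not a word count file
        with open(os.path.join(input_dir, fname), "r") as fp:
            total.update(json.load(fp))  # adds counts key by key
    return total

# Build two small, made-up mapper outputs to merge
demo_dir = tempfile.mkdtemp()
samples = [("a.json", {"the": 2, "cat": 1}), ("b.json", {"the": 1, "dog": 4})]
for name, counts in samples:
    with open(os.path.join(demo_dir, name), "w") as fp:
        json.dump(counts, fp)

merged = merge_count_files(demo_dir)
print(merged.most_common(2))
```

Filtering on the `.json` extension is a defensive choice in case other files end up in the same directory.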

### Edit the Reduce Script: Writing the Final Output

Finally, we can write out the final output. I'm writing to a CSV file because it is both human readable and easily parsed in case it needs to be read by another program.

```python
# Save to the output file
with open(outputfile+'.csv', 'w') as output_file:
    csv.writer(output_file).writerows({k: globalcounts[k] for k in top5}.items())
```
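
To illustrate the "easily parsed" point, here is a sketch that writes a top 5 list to CSV and reads it back. The counts are made up. Opening the file with `newline=''` is what the `csv` module documentation recommends for files passed to `csv.writer` and `csv.reader`.

```python
import csv
import os
import tempfile

# Made-up overall counts standing in for the merged result
globalcounts = {"the": 10, "and": 7, "of": 5, "to": 4, "a": 3, "cat": 1}
top5 = sorted(globalcounts, key=globalcounts.get, reverse=True)[:5]

# Write one "word,count" row per word
csv_path = os.path.join(tempfile.mkdtemp(), "counts.csv")
with open(csv_path, "w", newline="") as output_file:
    csv.writer(output_file).writerows((k, globalcounts[k]) for k in top5)

# Read the rows back; csv.reader yields strings, so convert the counts
with open(csv_path, "r", newline="") as input_file:
    rows = [(word, int(count)) for word, count in csv.reader(input_file)]

print(rows)
```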

### Create Map and Reduce Wrapper Bash Scripts

The next step is to create wrapper bash scripts for the mapper and reducer. These are fairly standard and there isn't much to them, but this is where you can load any modules or set any environment variables you might need.

For `mapper.sh`:
```bash
#!/bin/bash


# Initialize Modules
source /etc/profile

# Load the anaconda module
module load anaconda/2020a

# Call your script as you would from the command line, passing in $1 and $2 as arguments
# Note that $1 and $2 are the arguments passed into this script
python top5overall_map.py $1 $2
```

For `reducer.sh`:
```bash
#!/bin/bash

# Initialize Modules
source /etc/profile

# Load the anaconda module
module load anaconda/2020a

# Call your script as you would from the command line, passing in $1 and $2 as arguments
# Note that $1 and $2 are the arguments passed into this script
python top5overall_reduce.py $1 $2
```

### Call LLMapReduce

The final step is to call `LLMapReduce`:


```bash
LLMapReduce --mapper mapper.sh --reducer reducer.sh --input path/to/data/ --output counts --apptype=mimo --np=4
```

If you would like to keep the output and intermediate files, say for debugging, you can add the option `--keep=true`. This will keep the log files that are written out.

There are many options to `LLMapReduce`. You can see them with a short description by running `LLMapReduce -h` at the command line.

### Final Scripts

#### Final Python Scripts

`top5overall_map.py`:
```python
import os, sys, json
sys.path.append('../')
from helpers import *

inOutFileList = open(sys.argv[1],"r")

for line in inOutFileList.readlines():

    # Get input and output file names for this iteration
    (inFile,outFile) = line.split()
    print("Reading from " + inFile + " and writing to " + outFile)

    # Read in file and clean the text
    infid = open(inFile,"r", encoding='utf-8')
    text = cleantext(infid.readlines())

    # Count number of times each word appears
    wordCounts = countwords(text)

    # Save the word counts to the passed in output file name 
    with open(outFile+'.json', 'w') as fp:
        json.dump(wordCounts, fp)
```

`top5overall_reduce.py`:
```python
import os, sys, json, csv
sys.path.append('../')
from helpers import *

# Grab the two arguments that are passed in
# The first is the directory containing the output of the mapper
# The second is the name of the file where the final word counts should be saved
inputdir = sys.argv[1]
outputfile = sys.argv[2]

# List the word count files written by the Map step
fnames = os.listdir(inputdir)

# Merge the counts from all files
globalcounts = dict()
for fname in fnames:
    with open(inputdir+"/"+fname, 'r') as fp:
        counts = json.load(fp)
    globalcounts = { k: counts.get(k, 0) + globalcounts.get(k, 0) for k in set(counts) | set(globalcounts) }
    
# Sort and print the top 5 words with their counts
top5 = sorted(globalcounts, key=globalcounts.get, reverse=True)[:5]
for k in top5:
    print("{}: {}".format(k, globalcounts[k]))

# Save to the output file
with open(outputfile+'.csv', 'w') as output_file:
    csv.writer(output_file).writerows({k: globalcounts[k] for k in top5}.items())
```

#### Final Wrapper Bash Scripts

`mapper.sh`:
```bash
#!/bin/bash

# Initialize Modules
source /etc/profile

# Load the anaconda module
module load anaconda/2020a

# Call your script as you would from the command line, passing in $1 and $2 as arguments
# Note that $1 and $2 are the arguments passed into this script
python top5overall_map.py $1 $2
```

`reducer.sh`:
```bash
#!/bin/bash

# Initialize Modules
source /etc/profile

# Load the anaconda module
module load anaconda/2020a

# Call your script as you would from the command line, passing in $1 and $2 as arguments
# Note that $1 and $2 are the arguments passed into this script
python top5overall_reduce.py $1 $2
```