MR MPI
The emulator can be run using the MapReduce programming model. One alternative to Hadoop's MapReduce implementation is MapReduce-MPI (MR-MPI), created by Sandia National Laboratories.
In order to run MR-MPI there is a setup process you will have to go through to download and install MR-MPI, along with the Python bindings if needed. Please follow these MR-MPI Setup Instructions.
To use MR-MPI you must implement the map and reduce functions; I will be using the Python bindings.
To start MR-MPI you must also have an MPI Python interface installed; I will be using PyPar.
import pypar

nprocs = pypar.size()
me = pypar.rank()
In the case of the emulator code, the map function handles reading the downsampled input files and running the emulator on the information provided in one downsample file. Note that in order for the emulator to run in a reasonable amount of time, the number of downsample points in each file should be under 2500.
I found a copy of the downsample files that Rohit was using in his directory on Rush (I'm not sure where they came from). Note that Rohit had two directories inside his user directory on Rush that contained MapReduce Hadoop files: /ifs/user/shivaswa/my_hadoop and /ifs/user/shivaswa/Hadoop_Emulator. From what I understand, only the files in my_hadoop are needed, so those are the scripts I have been referencing. I copied all of the files in his my_hadoop directory to my panasas directory under /panasas/scratch/kmarcus2/emulator/rohit. I have a cron job set up to touch every file in /panasas/scratch/kmarcus2/emulator on a nightly basis so these files should not be deleted.
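The exact cron entry is not reproduced here; a minimal sketch of such a nightly touch job (the time of day and the `find` invocation are my assumptions, only the path comes from the text above) might look like:

```shell
# Hypothetical crontab entry: every night at 2am, touch every file under
# the emulator directory so the scratch scrubber does not delete them
0 2 * * * find /panasas/scratch/kmarcus2/emulator -type f -exec touch {} +
```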
The downsample files that I found in his my_hadoop directory had too many points in them for the emulator to run in a reasonable amount of time. To reduce the number of points in each file, I ran the following script to keep only every 3rd line of each downsample file.
#!/bin/bash
inDir=down_sample_files
outDir=down_sample_files_mod
mkdir -p "$outDir"
for f in "$inDir"/*
do
    name=$(basename "$f")
    awk 'NR % 3 == 0 {print}' "$f" > "$outDir/$name"
done
Note that if you want more or fewer points from each file, you just have to change the 3 to some other number that better fits your needs. However, if you have fewer than about 2500 points per downsample file, the hazard map may not turn out well.
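To sanity-check that the thinned files land near the 2500-point target, you can count lines before and after filtering. Here is a small self-contained sketch that uses throwaway generated data (rather than the real downsample files) to show the arithmetic:

```shell
#!/bin/bash
# Build a throwaway 7500-line file standing in for a real downsample file,
# keep every 3rd line with the same awk filter as above, then count lines.
inDir=$(mktemp -d)
outDir=$(mktemp -d)
seq 1 7500 > "$inDir/sample.txt"
awk 'NR % 3 == 0 {print}' "$inDir/sample.txt" > "$outDir/sample.txt"
wc -l < "$outDir/sample.txt"   # 7500 / 3 = 2500 points
```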
Once you have the correct downsample input files you can run the emulator inside the map function. In the Hadoop setup, only two map processes were allowed per node, so the emulator could not run in a reasonable amount of time and was instead run outside of MapReduce as a plain batch process on the cluster. Note that the emulator used in the MapReduce process is called newEmulator (a modified version of the emulator code used in the MPI process).
To run the emulator inside the map function in MR-MPI I used the map_file function, a special map function that is useful for distributing input files to map processes. Note that MR-MPI has a listing of Python calls you can make; however, the details of the calls are described better in the C++ interface documentation. I also had some issues with the Python documentation, and sometimes had to look at the ~/mrmpi-22Nov13/python/mrmpi.py code to figure out what parameters were needed when calling certain functions.
import sys
from mrmpi import mrmpi

files = sys.argv[1]
mr = mrmpi()
mr.map_file([files],0,0,0,mapEmulator)
The above code shows the map_file call I used to distribute the input files. I pass Python the path to the downsample input file directory and MR-MPI distributes those files to map processes. In the above code, mapEmulator is my map function, which I describe below.
import subprocess

def mapEmulator(itask,filename,mr):
    # run the newEmulator with a downsample file
    cmd = 'python emulator.py ' + filename + ' | ./newEmulator'
    output = subprocess.check_output(cmd, shell=True)
    # split output and add key/value pairs
    output = output.strip().split('\n')
    for line in output:
        key = line.split(' ')[0]
        mr.add(key,line)
For this map code I had to modify newEmulator.cpp to print its output only to stdout. Before, it would write its output to panasas, but that is not needed for this process. The script emulator.py takes one downsample file as input and parses it into a format that newEmulator understands. The output from emulator.py is piped into newEmulator, whose output is stored in the variable output. I then split this output string and store the key/value pairs.
Note that there is a load-balancing issue with this code and the emulator: some of the input files to the emulator run faster than others, so this still does not work very well in a MapReduce setting.
Also note that because the emulator takes so long to complete, I have stored the output of a previous emulator run so that it does not need to be run again and again; the output files only need to be read inside the map function.
def mapEmulatorOutputFiles(itask, filename, mr):
    # open emulator output file
    fp = open(filename, 'r')
    # add each line to the map
    for line in fp:
        key = line.strip().split(' ')[0]
        mr.add(key, line.strip())
    fp.close()
In the above code, I pass in the saved emulator output files /panasas/scratch/kmarcus2/emulator/my_hadoop/emulator_output and then those files are just read and stored as key/value pairs.
There are actually two reduce procedures in Rohit's code. First, you must collate the keys and then call the first reduce function.
mr.collate()
mr.reduce(myreduce3)
The first reduce is what Rohit calls reduce3. This takes the newEmulator output as input.
def myreduce3(key,mvalue,mr):
    # reduce3.main comes from Rohit's reduce3.py script
    out = reduce3.main(mvalue)
    if out != None:
        for line in out:
            key = '*' + line.split(' ')[0]
            mr.add(key,line)
Note that I was not able to find a way to remove key/value pairs, so to differentiate the key/value pairs from the previous insert I add the '*' character to the beginning of the key. This will help me find the correct keys in the next reduce function.
After this first reduce finishes, I collate the keys again and then call the next reduce function, which Rohit called reduce5.
mr.collate()
mr.reduce(myreduce5)
This final reduce will look for all keys beginning with '*', run the reduce operation on them, and finally output the phmid and mean.
def myreduce5(key,mvalue,mr):
    if key[0] == '*':
        fp = open('/scratch/'+str(key), 'w')
        # reduce5.main comes from Rohit's reduce5.py script
        out = reduce5.main(mvalue)
        if out != None:
            for value in out:
                fp.write(value + '\n')
        fp.close()
After this has finished, I shut down MR-MPI:
pypar.barrier()
mr.destroy()
pypar.finalize()
If you want to test MR-MPI you can run it on the Rush head node. The following commands run foo.py as an 8-core MPI process; I usually do this when testing MR-MPI.
$ module load intel-mpi
$ module load mrmpi-python
$ mpirun -np 8 python foo.py
However, when you want to run this code on all 2048 downsample files, you will need to run it on the cluster. To do this I have usually been requesting 512 cores and then running python through srun.
cd /scratch
srun python emulator-mrmpi.py /panasas/scratch/kmarcus2/emulator/my_hadoop/down_sample_files_mod
Notice how I first cd into /scratch. This is crucial because MR-MPI will create temporary key/value pair files when its internal buffer gets too large. Before, I was running MR-MPI with the cwd inside panasas, and all the temporary files were being created on panasas, which turned out to be a very large bottleneck. MR-MPI uses the cwd as the place where it creates the temporary files, so by changing into /scratch it will use the local scratch directory, which is much faster for reads and writes. Take a look at the MR-MPI settings page for more detail about this process and what MR-MPI is doing in the background.
Also note that I write the final reduce output to the local node's /scratch directory. When the MR-MPI process has finished, you must copy these files to panasas or they will be lost when your job finishes. This is done inside the slurm script.
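The slurm script itself is not reproduced here; a minimal sketch of what it might look like is below. The core count, the srun command, the cd into /scratch, and the input path come from the text above; the time limit, the output destination directory, and the final cp are my assumptions about the copy step.

```shell
#!/bin/bash
#SBATCH --ntasks=512
#SBATCH --time=24:00:00
# (partition, memory, and other #SBATCH options omitted -- site specific)

module load intel-mpi
module load mrmpi-python

# run with the cwd on node-local /scratch so MR-MPI's temporary
# key/value spill files are not created on panasas
cd /scratch
srun python emulator-mrmpi.py /panasas/scratch/kmarcus2/emulator/my_hadoop/down_sample_files_mod

# copy the final reduce output (files whose names begin with '*')
# off node-local /scratch before the job ends, or it will be lost;
# the destination directory here is a hypothetical example
cp /scratch/'*'* /panasas/scratch/kmarcus2/emulator/output/
```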