Merge pull request #663 from legacysurvey/mpi4py
Add mpi4py support, and WISE checkpointing
dstndstn committed Nov 27, 2020
2 parents 588a64d + 60ce7b1 commit 39026c4
Showing 5 changed files with 471 additions and 24 deletions.
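For orientation: the new driver is launched with python -m mpi4py.futures, so MPI rank 0 runs the usual runbrick driver while the remaining ranks serve a worker pool. A minimal, self-contained sketch of that launch pattern (illustrative only; the demo.py file and the square function below are not part of this commit):

# demo.py -- launch with:  mpiexec -n 4 python -m mpi4py.futures demo.py
from mpi4py.futures import MPIPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    # Rank 0 executes this block; all other ranks act as pool workers.
    with MPIPoolExecutor() as pool:
        print(list(pool.map(square, range(8))))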
115 changes: 115 additions & 0 deletions bin/runbrick-mpi.sh
@@ -0,0 +1,115 @@
#! /bin/bash

# ####
# #SBATCH --qos=premium
# #SBATCH --nodes=3
# #SBATCH --ntasks-per-node=32
# #SBATCH --cpus-per-task=2
# #SBATCH --time=48:00:00
# #SBATCH --licenses=SCRATCH
# #SBATCH -C haswell
# nmpi=96

# Half-subscribed, 96 tasks
##SBATCH --qos=premium
##SBATCH --time=48:00:00
#SBATCH -p debug
#SBATCH --time=30:00
#SBATCH --nodes=3
#SBATCH --ntasks-per-node=16
#SBATCH --cpus-per-task=4
#SBATCH --licenses=SCRATCH
#SBATCH -C haswell
#SBATCH --job-name 0744m640
#SBATCH --image=docker:legacysurvey/legacypipe:mpi
#SBATCH --module=mpich-cle6
nmpi=48
brick=0744m640

#nmpi=4
#brick=0309p335

#brick=$1

# This seems to be the default at NERSC?
# module load cray-mpich

#export PYTHONPATH=$(pwd):${PYTHONPATH}

outdir=/global/cscratch1/sd/dstn/dr9m-mpi

BLOB_MASK_DIR=/global/cfs/cdirs/cosmo/work/legacysurvey/dr8/south

export LEGACY_SURVEY_DIR=/global/cfs/cdirs/cosmo/work/legacysurvey/dr9m

export DUST_DIR=/global/cfs/cdirs/cosmo/data/dust/v0_1
export UNWISE_COADDS_DIR=/global/cfs/cdirs/cosmo/work/wise/outputs/merge/neo6/fulldepth:/global/cfs/cdirs/cosmo/data/unwise/allwise/unwise-coadds/fulldepth
export UNWISE_COADDS_TIMERESOLVED_DIR=/global/cfs/cdirs/cosmo/work/wise/outputs/merge/neo6
export UNWISE_MODEL_SKY_DIR=/global/cfs/cdirs/cosmo/work/wise/unwise_catalog/dr3/mod
export GAIA_CAT_DIR=/global/cfs/cdirs/cosmo/work/gaia/chunks-gaia-dr2-astrom-2
export GAIA_CAT_VER=2
export TYCHO2_KD_DIR=/global/cfs/cdirs/cosmo/staging/tycho2
export LARGEGALAXIES_CAT=/global/cfs/cdirs/cosmo/staging/largegalaxies/v3.0/SGA-ellipse-v3.0.kd.fits
export PS1CAT_DIR=/global/cfs/cdirs/cosmo/work/ps1/cats/chunks-qz-star-v3
export SKY_TEMPLATE_DIR=/global/cfs/cdirs/cosmo/work/legacysurvey/sky-templates

# Don't add ~/.local/ to Python's sys.path
export PYTHONNOUSERSITE=1

# Force MKL single-threaded
# https://software.intel.com/en-us/articles/using-threaded-intel-mkl-in-multi-thread-application
export MKL_NUM_THREADS=1
export OMP_NUM_THREADS=1

# To avoid problems with MPI and Python multiprocessing
export MPICH_GNI_FORK_MODE=FULLCOPY
export KMP_AFFINITY=disabled

bri=$(echo $brick | head -c 3)
mkdir -p $outdir/logs/$bri
log="$outdir/logs/$bri/$brick.log"

mkdir -p $outdir/metrics/$bri

echo Logging to: $log
echo Running on $(hostname)

echo -e "\n\n\n" >> $log
echo "-----------------------------------------------------------------------------------------" >> $log
echo "PWD: $(pwd)" >> $log
echo >> $log
#echo "Environment:" >> $log
#set | grep -v PASS >> $log
#echo >> $log
ulimit -a >> $log
echo >> $log

echo -e "\nStarting on $(hostname)\n" >> $log
echo "-----------------------------------------------------------------------------------------" >> $log

# When I was trying mpi4py compiled with openmpi...
#mpirun -n $nmpi --map-by core --rank-by node \

# cray-mpich doesn't support this kind of --distribution
#srun -n $nmpi --distribution cyclic:cyclic

# Cray-mpich does round-robin placement of ranks on nodes with this setting -- good for memory load balancing.
export MPICH_RANK_REORDER_METHOD=0

srun -n $nmpi \
    shifter \
    python -u -O -m mpi4py.futures \
    /src/legacypipe/py/legacypipe/mpi-runbrick.py \
    --no-wise-ceres \
    --run south \
    --brick $brick \
    --skip \
    --skip-calibs \
    --blob-mask-dir ${BLOB_MASK_DIR} \
    --checkpoint ${outdir}/checkpoints/${bri}/checkpoint-${brick}.pickle \
    --wise-checkpoint ${outdir}/checkpoints/${bri}/wise-${brick}.pickle \
    --pickle "${outdir}/pickles/${bri}/runbrick-%(brick)s-%%(stage)s.pickle" \
    --outdir $outdir \
    >> $log 2>&1

# QDO_BATCH_PROFILE=cori-shifter qdo launch -v tst 1 --cores_per_worker 8 --walltime=30:00 --batchqueue=debug --keep_env --batchopts "--image=docker:dstndstn/legacypipe:intel" --script "/src/legacypipe/bin/runbrick-shifter.sh"
23 changes: 12 additions & 11 deletions docker-nersc/Dockerfile
@@ -71,12 +71,11 @@ RUN wget -nv https://www.mpich.org/static/downloads/3.3/mpich-3.3.tar.gz \
 
 # mpi4py / mpicc can't handle -ax=knl
 ENV CFLAGS -O3 -g -fPIC -std=gnu99 -pthread
-RUN wget -nv https://bitbucket.org/mpi4py/mpi4py/downloads/mpi4py-3.0.3.tar.gz \
-    && tar xvzf mpi4py-3.0.3.tar.gz \
-    && cd mpi4py-3.0.3 \
+RUN git clone https://github.com/mpi4py/mpi4py.git \
+    && (cd mpi4py \
     && python setup.py build \
-    && python setup.py install \
-    && cd .. && rm -Rf mpi4py-3.0.3 mpi4py-3.0.3.tar.gz
+    && python setup.py install) \
+    && rm -Rf mpi4py
 ENV CFLAGS -O3 -g -fPIC -std=gnu99 -pthread -x=haswell -ax=knl
 
 RUN git clone https://github.com/astromatic/sextractor.git sourceextractor \
@@ -214,7 +213,9 @@ RUN echo "export PS1='[container] \\u@\\h:\\w$ '" >> $HOME/.bashrc \
     && python -c "from astropy.coordinates import EarthLocation; EarthLocation._get_site_registry(force_download=True)" \
     && python -c "from astropy.coordinates import EarthLocation, SkyCoord, AltAz; from astropy.time import Time; print(EarthLocation.of_site('ctio')); print(SkyCoord(180.,-45.,unit='deg').transform_to(AltAz(obstime=Time(56806.0, format='mjd'), location=EarthLocation.of_site('ctio'))))" \
     # Download astropy IERS leap-second list
-    && python -c "from astropy.time import Time; Time.now()"
+    && python -c "from astropy.time import Time; Time.now()" \
+    # Make config files readable!?!!
+    && chmod -R a+rwX $HOME/.astropy
 
 # Curses upon you, astropy
 RUN wget -O /opt/conda/lib/python3.7/site-packages/astropy/utils/iers/data/Leap_Second.dat \
@@ -235,11 +236,11 @@ RUN python -O -m compileall \
     /src/legacypipe/py/{legacypipe,legacyzpts}
 
 # update legacypipe
-RUN cd /src/legacypipe && git pull && git checkout DR9.6.7 && git describe && echo 967 && \
+RUN cd /src/legacypipe && git pull && git checkout mpi4py && git describe && \
     python -m compileall /src/legacypipe/py/{legacypipe,legacyzpts} && \
     python -O -m compileall /src/legacypipe/py/{legacypipe,legacyzpts}
 
-# QDO
-RUN cd qdo && git remote add dstn https://bitbucket.org/dstn/qdo.git \
-    && git fetch dstn && git checkout handle-oom \
-    && pip install . && echo 3 #python3 setup.py install
+# # QDO
+# RUN cd qdo && git remote add dstn https://bitbucket.org/dstn/qdo.git \
+#    && git fetch dstn && git checkout handle-oom \
+#    && pip install . && echo 3 #python3 setup.py install
228 changes: 228 additions & 0 deletions py/legacypipe/mpi-runbrick.py
@@ -0,0 +1,228 @@
import sys

def hello(i):
    import socket
    import os
    import time
    print('Hello', i, 'from', socket.gethostname(), 'pid', os.getpid())
    time.sleep(2)
    return i

def global_init(loglevel):
    '''
    Global initialization routine called by mpi4py when each worker is started.
    '''
    import socket
    import os
    from mpi4py import MPI
    print('MPI4PY process starting on', socket.gethostname(), 'pid', os.getpid(),
          'MPI rank', MPI.COMM_WORLD.Get_rank())

    import logging
    logging.basicConfig(level=loglevel, format='%(message)s', stream=sys.stdout)
    # tractor logging is *soooo* chatty
    logging.getLogger('tractor.engine').setLevel(loglevel + 10)

    from astrometry.util.ttime import Time, MemMeas
    Time.add_measurement(MemMeas)

# mpi4py to multiprocessing.Pool adapters:

# This is an adapter class that provides an iterator over
# imap_unordered results with a next(timeout) function, required
# for checkpointing.
from concurrent.futures import wait as cfwait, FIRST_COMPLETED
class result_iter(object):
    def __init__(self, futures):
        self.futures = futures
    def next(self, timeout=None):
        if len(self.futures) == 0:
            raise StopIteration()
        # Have any of the elements completed?
        for f in self.futures:
            if f.done():
                self.futures.remove(f)
                return f.result()
        # Wait for the first one to complete, with timeout
        done,_ = cfwait(self.futures, timeout=timeout,
                        return_when=FIRST_COMPLETED)
        if len(done):
            f = done.pop()
            self.futures.remove(f)
            return f.result()
        raise TimeoutError()
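# Illustrative sketch (not part of this commit): how a checkpointing loop can
# consume results through the next(timeout) interface above.  The names
# process_blob, blobs, executor and write_checkpoint are hypothetical
# placeholders, not functions from this file.
#
#   results = result_iter([executor.submit(process_blob, b) for b in blobs])
#   finished = []
#   while True:
#       try:
#           finished.append(results.next(timeout=300))
#       except StopIteration:
#           break                          # all work items are done
#       except TimeoutError:
#           write_checkpoint(finished)     # periodic checkpoint while waiting
#   write_checkpoint(finished)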

# Wrapper over MPIPoolExecutor to make it look like a multiprocessing.Pool
# (actually, an astrometry.util.timingpool!)
class MyMPIPool(object):
    def __init__(self, **kwargs):
        from mpi4py.futures import MPIPoolExecutor
        self.real = MPIPoolExecutor(**kwargs)
    def map(self, func, args, chunksize=1):
        return list(self.real.map(func, args, chunksize=chunksize))
    def imap_unordered(self, func, args, chunksize=1):
        return result_iter([self.real.submit(func, a) for a in args])
    def bootup(self, **kwargs):
        return self.real.bootup(**kwargs)
    def shutdown(self, **kwargs):
        return self.real.shutdown(**kwargs)

    def close(self):
        self.shutdown()
    def join(self):
        pass

    def apply_async(self, *args, **kwargs):
        raise RuntimeError('APPLY_ASYNC NOT IMPLEMENTED IN MyMPIPool')
    def get_worker_cpu(self):
        return 0.
    def get_worker_wall(self):
        return 0.
    def get_pickle_traffic(self):
        return None
    def get_pickle_traffic_string(self):
        return 'nope'
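# Illustrative sketch (not part of this commit): MyMPIPool is intended as a
# drop-in stand-in for a multiprocessing.Pool in code that only needs
# map()/imap_unordered()/close()/join(), e.g.:
#
#   import logging
#   pool = MyMPIPool(initializer=global_init, initargs=(logging.INFO,))
#   pool.bootup()
#   print(pool.map(hello, range(16)))   # hello() is defined above
#   pool.shutdown()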

def main(args=None):
    import os
    import datetime
    import logging
    import numpy as np
    from legacypipe.survey import get_git_version
    from legacypipe.runbrick import (get_parser, get_runbrick_kwargs, run_brick,
                                     NothingToDoError, RunbrickError)

    print()
    print('mpi-runbrick.py starting at', datetime.datetime.now().isoformat())
    print('legacypipe git version:', get_git_version())
    if args is None:
        print('Command-line args:', sys.argv)
        cmd = 'python'
        for vv in sys.argv:
            cmd += ' {}'.format(vv)
        print(cmd)
    else:
        print('Args:', args)
    print()

    parser = get_parser()
    opt = parser.parse_args(args=args)

    if opt.brick is None and opt.radec is None:
        parser.print_help()
        return -1

    optdict = vars(opt)
    verbose = optdict.pop('verbose')

    survey, kwargs = get_runbrick_kwargs(**optdict)
    if kwargs in [-1, 0]:
        return kwargs
    kwargs.update(command_line=' '.join(sys.argv))

    if verbose == 0:
        lvl = logging.INFO
    else:
        lvl = logging.DEBUG
    global_init(lvl)

    # matplotlib config directory, when running in shifter...
    for tag in ['CACHE', 'CONFIG']:
        if not 'XDG_%s_HOME'%tag in os.environ:
            src = os.path.expanduser('~/.%s' % (tag.lower()))
            # Read-only?
            if os.path.isdir(src) and not(os.access(src, os.W_OK)):
                import shutil
                import tempfile
                tempdir = tempfile.mkdtemp(prefix='%s-' % tag.lower())
                os.rmdir(tempdir)
                shutil.copytree(src, tempdir, symlinks=True) #dirs_exist_ok=True)
                # astropy config file: if XDG_CONFIG_HOME is set,
                # expect to find $XDG_CONFIG_HOME/astropy, not
                # ~/.astropy.
                if tag == 'CONFIG':
                    shutil.copytree(os.path.expanduser('~/.astropy'),
                                    os.path.join(tempdir, 'astropy'),
                                    symlinks=True) #dirs_exist_ok=True)
                os.environ['XDG_%s_HOME' % tag] = tempdir

    if opt.plots:
        import matplotlib
        matplotlib.use('Agg')
        import pylab as plt
        plt.figure(figsize=(12,9))
        plt.subplots_adjust(left=0.07, right=0.99, bottom=0.07, top=0.93,
                            hspace=0.2, wspace=0.05)

    # The "initializer" arg is only available in mpi4py master
    pool = MyMPIPool(initializer=global_init, initargs=(lvl,))

    u = int(os.environ.get('OMPI_UNIVERSE_SIZE', '0'))
    if u == 0:
        u = int(os.environ.get('MPICH_UNIVERSE_SIZE', '0'))
    if u == 0:
        from mpi4py import MPI
        u = MPI.COMM_WORLD.Get_size()

    print('Booting up MPI pool with', u, 'workers...')
    pool.bootup()
    print('Booted up MPI pool.')
    pool._processes = u
    kwargs.update(pool=pool)

    #pool.map(hello, np.arange(128))

    rtn = -1
    try:
        run_brick(opt.brick, survey, **kwargs)
        rtn = 0
    except NothingToDoError as e:
        print()
        if hasattr(e, 'message'):
            print(e.message)
        else:
            print(e)
        print()
        rtn = 0
    except RunbrickError as e:
        print()
        if hasattr(e, 'message'):
            print(e.message)
        else:
            print(e)
        print()
        rtn = -1

    print('Shutting down MPI pool...')
    pool.shutdown()
    print('Shut down MPI pool')

    return rtn

if __name__ == '__main__':
    from astrometry.util.ttime import Time,MemMeas
    Time.add_measurement(MemMeas)
    sys.exit(main())


# salloc -N 2 -C haswell -q interactive -t 04:00:00 --ntasks-per-node=32 --cpus-per-task=2
# module unload cray-mpich
# module load openmpi
# mpirun -n 64 --map-by core --rank-by node python -m mpi4py.futures legacypipe/mpi-runbrick.py --no-wise-ceres --brick 0715m657 --zoom 100 300 100 300 --run south --outdir $CSCRATCH/mpi --stage wise_forced


# cray-mpich version:
# srun -n 64 --distribution cyclic:cyclic python -m mpi4py.futures legacypipe/mpi-runbrick.py --no-wise-ceres --brick 0715m657 --zoom 100 300 100 300 --run south --outdir $CSCRATCH/mpi --stage wise_forced
#
#
# mpi4py setup:
# cray-mpich module, PrgEnv-intel/6.0.5
#
# mpi.cfg:
# [mpi]
# # $CRAY_MPICH2_DIR
# mpi_dir = /opt/cray/pe/mpt/7.7.10/gni/mpich-intel/16.0
# include_dirs = %(mpi_dir)s/include
# libraries = mpich
# library_dirs = %(mpi_dir)s/lib
# runtime_library_dirs = %(mpi_dir)s/lib
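#
# A quick smoke test for an mpi4py build against the system MPI
# (illustrative; not part of this commit):
#
# srun -n 4 python -c "from mpi4py import MPI; c = MPI.COMM_WORLD; print(MPI.Get_library_version().strip(), c.Get_rank(), c.Get_size())"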