Add mpi4py support, and WISE checkpointing #663

Merged (18 commits) on Nov 27, 2020
115 changes: 115 additions & 0 deletions bin/runbrick-mpi.sh
@@ -0,0 +1,115 @@
#! /bin/bash

# ####
# #SBATCH --qos=premium
# #SBATCH --nodes=3
# #SBATCH --ntasks-per-node=32
# #SBATCH --cpus-per-task=2
# #SBATCH --time=48:00:00
# #SBATCH --licenses=SCRATCH
# #SBATCH -C haswell
# nmpi=96

# Half-subscribed, 48 tasks
##SBATCH --qos=premium
##SBATCH --time=48:00:00
#SBATCH -p debug
#SBATCH --time=30:00
#SBATCH --nodes=3
#SBATCH --ntasks-per-node=16
#SBATCH --cpus-per-task=4
#SBATCH --licenses=SCRATCH
#SBATCH -C haswell
#SBATCH --job-name 0744m640
#SBATCH --image=docker:legacysurvey/legacypipe:mpi
#SBATCH --module=mpich-cle6
nmpi=48
brick=0744m640

#nmpi=4
#brick=0309p335

#brick=$1

# This seems to be the default at NERSC?
# module load cray-mpich

#export PYTHONPATH=$(pwd):${PYTHONPATH}

outdir=/global/cscratch1/sd/dstn/dr9m-mpi

BLOB_MASK_DIR=/global/cfs/cdirs/cosmo/work/legacysurvey/dr8/south

export LEGACY_SURVEY_DIR=/global/cfs/cdirs/cosmo/work/legacysurvey/dr9m

export DUST_DIR=/global/cfs/cdirs/cosmo/data/dust/v0_1
export UNWISE_COADDS_DIR=/global/cfs/cdirs/cosmo/work/wise/outputs/merge/neo6/fulldepth:/global/cfs/cdirs/cosmo/data/unwise/allwise/unwise-coadds/fulldepth
export UNWISE_COADDS_TIMERESOLVED_DIR=/global/cfs/cdirs/cosmo/work/wise/outputs/merge/neo6
export UNWISE_MODEL_SKY_DIR=/global/cfs/cdirs/cosmo/work/wise/unwise_catalog/dr3/mod
export GAIA_CAT_DIR=/global/cfs/cdirs/cosmo/work/gaia/chunks-gaia-dr2-astrom-2
export GAIA_CAT_VER=2
export TYCHO2_KD_DIR=/global/cfs/cdirs/cosmo/staging/tycho2
export LARGEGALAXIES_CAT=/global/cfs/cdirs/cosmo/staging/largegalaxies/v3.0/SGA-ellipse-v3.0.kd.fits
export PS1CAT_DIR=/global/cfs/cdirs/cosmo/work/ps1/cats/chunks-qz-star-v3
export SKY_TEMPLATE_DIR=/global/cfs/cdirs/cosmo/work/legacysurvey/sky-templates

# Don't add ~/.local/ to Python's sys.path
export PYTHONNOUSERSITE=1

# Force MKL single-threaded
# https://software.intel.com/en-us/articles/using-threaded-intel-mkl-in-multi-thread-application
export MKL_NUM_THREADS=1
export OMP_NUM_THREADS=1

# To avoid problems with MPI and Python multiprocessing
export MPICH_GNI_FORK_MODE=FULLCOPY
export KMP_AFFINITY=disabled

bri=$(echo $brick | head -c 3)
mkdir -p $outdir/logs/$bri
log="$outdir/logs/$bri/$brick.log"

mkdir -p $outdir/metrics/$bri

echo Logging to: $log
echo Running on $(hostname)

echo -e "\n\n\n" >> $log
echo "-----------------------------------------------------------------------------------------" >> $log
echo "PWD: $(pwd)" >> $log
echo >> $log
#echo "Environment:" >> $log
#set | grep -v PASS >> $log
#echo >> $log
ulimit -a >> $log
echo >> $log

echo -e "\nStarting on $(hostname)\n" >> $log
echo "-----------------------------------------------------------------------------------------" >> $log

# When I was trying mpi4py compiled with openmpi...
#mpirun -n $nmpi --map-by core --rank-by node \

# cray-mpich doesn't support this kind of --distribution
#srun -n $nmpi --distribution cyclic:cyclic

# Cray-mpich does round-robin placement of ranks on nodes with this setting -- good for memory load balancing.
export MPICH_RANK_REORDER_METHOD=0

srun -n $nmpi \
shifter \
python -u -O -m mpi4py.futures \
/src/legacypipe/py/legacypipe/mpi-runbrick.py \
--no-wise-ceres \
--run south \
--brick $brick \
--skip \
--skip-calibs \
--blob-mask-dir ${BLOB_MASK_DIR} \
--checkpoint ${outdir}/checkpoints/${bri}/checkpoint-${brick}.pickle \
--wise-checkpoint ${outdir}/checkpoints/${bri}/wise-${brick}.pickle \
--pickle "${outdir}/pickles/${bri}/runbrick-%(brick)s-%%(stage)s.pickle" \
--outdir $outdir \
>> $log 2>&1

# QDO_BATCH_PROFILE=cori-shifter qdo launch -v tst 1 --cores_per_worker 8 --walltime=30:00 --batchqueue=debug --keep_env --batchopts "--image=docker:dstndstn/legacypipe:intel" --script "/src/legacypipe/bin/runbrick-shifter.sh"
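
The launch command above relies on python -m mpi4py.futures: under that runner, MPI rank 0 executes the given script while the remaining ranks serve as workers for any MPIPoolExecutor the script creates. A minimal, self-contained sketch of that execution model (illustrative only, not part of this PR; the file name and square() function are made up):

    # hello_mpi.py -- minimal mpi4py.futures sketch (hypothetical example).
    # Launch with e.g.:  srun -n 48 python -m mpi4py.futures hello_mpi.py
    from mpi4py.futures import MPIPoolExecutor

    def square(x):
        # Runs on one of the MPI worker ranks.
        return x * x

    if __name__ == '__main__':
        # Only rank 0 reaches this block; the other ranks serve the pool.
        with MPIPoolExecutor() as pool:
            print(list(pool.map(square, range(16))))
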
23 changes: 12 additions & 11 deletions docker-nersc/Dockerfile
@@ -71,12 +71,11 @@ RUN wget -nv https://www.mpich.org/static/downloads/3.3/mpich-3.3.tar.gz \
 
 # mpi4py / mpicc can't handle -ax=knl
 ENV CFLAGS -O3 -g -fPIC -std=gnu99 -pthread
-RUN wget -nv https://bitbucket.org/mpi4py/mpi4py/downloads/mpi4py-3.0.3.tar.gz \
-&& tar xvzf mpi4py-3.0.3.tar.gz \
-&& cd mpi4py-3.0.3 \
+RUN git clone https://github.com/mpi4py/mpi4py.git \
+&& (cd mpi4py \
 && python setup.py build \
-&& python setup.py install \
-&& cd .. && rm -Rf mpi4py-3.0.3 mpi4py-3.0.3.tar.gz
+&& python setup.py install) \
+&& rm -Rf mpi4py
 ENV CFLAGS -O3 -g -fPIC -std=gnu99 -pthread -x=haswell -ax=knl
 
 RUN git clone https://github.com/astromatic/sextractor.git sourceextractor \
@@ -214,7 +213,9 @@ RUN echo "export PS1='[container] \\u@\\h:\\w$ '" >> $HOME/.bashrc \
 && python -c "from astropy.coordinates import EarthLocation; EarthLocation._get_site_registry(force_download=True)" \
 && python -c "from astropy.coordinates import EarthLocation, SkyCoord, AltAz; from astropy.time import Time; print(EarthLocation.of_site('ctio')); print(SkyCoord(180.,-45.,unit='deg').transform_to(AltAz(obstime=Time(56806.0, format='mjd'), location=EarthLocation.of_site('ctio'))))" \
 # Download astropy IERS leap-second list
-&& python -c "from astropy.time import Time; Time.now()"
+&& python -c "from astropy.time import Time; Time.now()" \
+# Make config files readable!?!!
+&& chmod -R a+rwX $HOME/.astropy
 
 # Curses upon you, astropy
 RUN wget -O /opt/conda/lib/python3.7/site-packages/astropy/utils/iers/data/Leap_Second.dat \
@@ -235,11 +236,11 @@ RUN python -O -m compileall \
 /src/legacypipe/py/{legacypipe,legacyzpts}
 
 # update legacypipe
-RUN cd /src/legacypipe && git pull && git checkout DR9.6.7 && git describe && echo 967 && \
+RUN cd /src/legacypipe && git pull && git checkout mpi4py && git describe && \
 python -m compileall /src/legacypipe/py/{legacypipe,legacyzpts} && \
 python -O -m compileall /src/legacypipe/py/{legacypipe,legacyzpts}
 
-# QDO
-RUN cd qdo && git remote add dstn https://bitbucket.org/dstn/qdo.git \
-&& git fetch dstn && git checkout handle-oom \
-&& pip install . && echo 3 #python3 setup.py install
+# # QDO
+# RUN cd qdo && git remote add dstn https://bitbucket.org/dstn/qdo.git \
+# && git fetch dstn && git checkout handle-oom \
+# && pip install . && echo 3 #python3 setup.py install
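
The switch from the mpi4py 3.0.3 tarball to a git checkout matters because mpi-runbrick.py (below) passes initializer/initargs to MPIPoolExecutor, which, per the comment in that file, is only available in mpi4py master at this point. A quick sanity check inside the built container might look like the following (a sketch, not part of the PR):

    # check_mpi4py.py -- hypothetical check, run inside the container (e.g. via
    # shifter) to confirm which mpi4py and MPI library the image picked up.
    import mpi4py
    from mpi4py import MPI

    print('mpi4py version:', mpi4py.__version__)
    print('MPI library:', MPI.Get_library_version().splitlines()[0])
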
228 changes: 228 additions & 0 deletions py/legacypipe/mpi-runbrick.py
@@ -0,0 +1,228 @@
import sys

def hello(i):
    import socket
    import os
    import time
    print('Hello', i, 'from', socket.gethostname(), 'pid', os.getpid())
    time.sleep(2)
    return i

def global_init(loglevel):
    '''
    Global initialization routine called by mpi4py when each worker is started.
    '''
    import socket
    import os
    from mpi4py import MPI
    print('MPI4PY process starting on', socket.gethostname(), 'pid', os.getpid(),
          'MPI rank', MPI.COMM_WORLD.Get_rank())

    import logging
    logging.basicConfig(level=loglevel, format='%(message)s', stream=sys.stdout)
    # tractor logging is *soooo* chatty
    logging.getLogger('tractor.engine').setLevel(loglevel + 10)

    from astrometry.util.ttime import Time, MemMeas
    Time.add_measurement(MemMeas)

# mpi4py to multiprocessing.Pool adapters:

# This is an adapter class that provides an iterator over
# imap_unordered results with a next(timeout) function, required
# for checkpointing.
from concurrent.futures import wait as cfwait, FIRST_COMPLETED
class result_iter(object):
    def __init__(self, futures):
        self.futures = futures
    def next(self, timeout=None):
        if len(self.futures) == 0:
            raise StopIteration()
        # Have any of the elements completed?
        for f in self.futures:
            if f.done():
                self.futures.remove(f)
                return f.result()
        # Wait for the first one to complete, with timeout
        done,_ = cfwait(self.futures, timeout=timeout,
                        return_when=FIRST_COMPLETED)
        if len(done):
            f = done.pop()
            self.futures.remove(f)
            return f.result()
        raise TimeoutError()

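# Usage sketch (comment only; nothing below is called in this file): the
# checkpointing loop in runbrick consumes this iterator roughly as
#
#     results = pool.imap_unordered(stage_func, stage_args)
#     while True:
#         try:
#             res = results.next(timeout=checkpoint_period)
#         except StopIteration:
#             break
#         except TimeoutError:
#             write_checkpoint()   # hypothetical helper
#             continue
#         handle(res)              # hypothetical helper
#
# so a timeout means "nothing has finished yet; write a checkpoint and keep
# waiting", while StopIteration means all submitted work is done.
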
# Wrapper over MPIPoolExecutor to make it look like a multiprocessing.Pool
# (actually, an astrometry.util.timingpool!)
class MyMPIPool(object):
    def __init__(self, **kwargs):
        from mpi4py.futures import MPIPoolExecutor
        self.real = MPIPoolExecutor(**kwargs)
    def map(self, func, args, chunksize=1):
        return list(self.real.map(func, args, chunksize=chunksize))
    def imap_unordered(self, func, args, chunksize=1):
        return result_iter([self.real.submit(func, a) for a in args])
    def bootup(self, **kwargs):
        return self.real.bootup(**kwargs)
    def shutdown(self, **kwargs):
        return self.real.shutdown(**kwargs)

    def close(self):
        self.shutdown()
    def join(self):
        pass

    def apply_async(self, *args, **kwargs):
        raise RuntimeError('APPLY_ASYNC NOT IMPLEMENTED IN MyMPIPool')
    def get_worker_cpu(self):
        return 0.
    def get_worker_wall(self):
        return 0.
    def get_pickle_traffic(self):
        return None
    def get_pickle_traffic_string(self):
        return 'nope'

def main(args=None):
    import os
    import datetime
    import logging
    import numpy as np
    from legacypipe.survey import get_git_version
    from legacypipe.runbrick import (get_parser, get_runbrick_kwargs, run_brick,
                                     NothingToDoError, RunbrickError)

    print()
    print('mpi-runbrick.py starting at', datetime.datetime.now().isoformat())
    print('legacypipe git version:', get_git_version())
    if args is None:
        print('Command-line args:', sys.argv)
        cmd = 'python'
        for vv in sys.argv:
            cmd += ' {}'.format(vv)
        print(cmd)
    else:
        print('Args:', args)
    print()

    parser = get_parser()
    opt = parser.parse_args(args=args)

    if opt.brick is None and opt.radec is None:
        parser.print_help()
        return -1

    optdict = vars(opt)
    verbose = optdict.pop('verbose')

    survey, kwargs = get_runbrick_kwargs(**optdict)
    if kwargs in [-1, 0]:
        return kwargs
    kwargs.update(command_line=' '.join(sys.argv))

    if verbose == 0:
        lvl = logging.INFO
    else:
        lvl = logging.DEBUG
    global_init(lvl)

    # matplotlib config directory, when running in shifter...
    for tag in ['CACHE', 'CONFIG']:
        if not 'XDG_%s_HOME'%tag in os.environ:
            src = os.path.expanduser('~/.%s' % (tag.lower()))
            # Read-only?
            if os.path.isdir(src) and not(os.access(src, os.W_OK)):
                import shutil
                import tempfile
                tempdir = tempfile.mkdtemp(prefix='%s-' % tag.lower())
                os.rmdir(tempdir)
                shutil.copytree(src, tempdir, symlinks=True) #dirs_exist_ok=True)
                # astropy config file: if XDG_CONFIG_HOME is set,
                # expect to find $XDG_CONFIG_HOME/astropy, not
                # ~/.astropy.
                if tag == 'CONFIG':
                    shutil.copytree(os.path.expanduser('~/.astropy'),
                                    os.path.join(tempdir, 'astropy'),
                                    symlinks=True) #dirs_exist_ok=True)
                os.environ['XDG_%s_HOME' % tag] = tempdir

    if opt.plots:
        import matplotlib
        matplotlib.use('Agg')
        import pylab as plt
        plt.figure(figsize=(12,9))
        plt.subplots_adjust(left=0.07, right=0.99, bottom=0.07, top=0.93,
                            hspace=0.2, wspace=0.05)

    # The "initializer" arg is only available in mpi4py master
    pool = MyMPIPool(initializer=global_init, initargs=(lvl,))

    u = int(os.environ.get('OMPI_UNIVERSE_SIZE', '0'))
    if u == 0:
        u = int(os.environ.get('MPICH_UNIVERSE_SIZE', '0'))
    if u == 0:
        from mpi4py import MPI
        u = MPI.COMM_WORLD.Get_size()

    print('Booting up MPI pool with', u, 'workers...')
    pool.bootup()
    print('Booted up MPI pool.')
    pool._processes = u
    kwargs.update(pool=pool)

    #pool.map(hello, np.arange(128))

    rtn = -1
    try:
        run_brick(opt.brick, survey, **kwargs)
        rtn = 0
    except NothingToDoError as e:
        print()
        if hasattr(e, 'message'):
            print(e.message)
        else:
            print(e)
        print()
        rtn = 0
    except RunbrickError as e:
        print()
        if hasattr(e, 'message'):
            print(e.message)
        else:
            print(e)
        print()
        rtn = -1

    print('Shutting down MPI pool...')
    pool.shutdown()
    print('Shut down MPI pool')

    return rtn

if __name__ == '__main__':
    from astrometry.util.ttime import Time,MemMeas
    Time.add_measurement(MemMeas)
    sys.exit(main())


# salloc -N 2 -C haswell -q interactive -t 04:00:00 --ntasks-per-node=32 --cpus-per-task=2
# module unload cray-mpich
# module load openmpi
# mpirun -n 64 --map-by core --rank-by node python -m mpi4py.futures legacypipe/mpi-runbrick.py --no-wise-ceres --brick 0715m657 --zoom 100 300 100 300 --run south --outdir $CSCRATCH/mpi --stage wise_forced


# cray-mpich version:
# srun -n 64 --distribution cyclic:cyclic python -m mpi4py.futures legacypipe/mpi-runbrick.py --no-wise-ceres --brick 0715m657 --zoom 100 300 100 300 --run south --outdir $CSCRATCH/mpi --stage wise_forced
#
#
# mpi4py setup:
# cray-mpich module, PrgEnv-intel/6.0.5
#
# mpi.cfg:
# [mpi]
# # $CRAY_MPICH2_DIR
# mpi_dir = /opt/cray/pe/mpt/7.7.10/gni/mpich-intel/16.0
# include_dirs = %(mpi_dir)s/include
# libraries = mpich
# library_dirs = %(mpi_dir)s/lib
# runtime_library_dirs = %(mpi_dir)s/lib
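#
# Rough build recipe implied by the notes above (a sketch; the exact NERSC
# module setup is assumed, not spelled out here):
#   # with PrgEnv-intel/6.0.5 and the cray-mpich module loaded:
#   git clone https://github.com/mpi4py/mpi4py.git && cd mpi4py
#   # put the [mpi] section above into mpi.cfg, then:
#   python setup.py build
#   python setup.py install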