# GRT Parallel Processing

In this notebook we will test the parallel processing for GRT

The following two lines are very important for getting the code working as expected in a multicore machine.  Since numpy uses OpenMP to compute matricial operations, the program normally spawns child processes to perform those operations.  As a result if you use multiprocessing (mp), besides the processes retrieved by mp, it will start 4 NP processes.  To avoid this, you just need to set this variable.

In [1]:
import os
os.environ["OMP_NUM_THREADS"] = "1"

In [2]:
from gravray import *
from gravray.util import *
from gravray.sampling import *
from gravray.spice import *
from gravray.orbit import *
from gravray.stats import *

import gc
from tqdm import tqdm
import pandas as pd
import multiprocessing as mp
from itertools import product as cartesian

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


<IPython.core.display.Javascript object>

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


<IPython.core.display.Javascript object>

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


<IPython.core.display.Javascript object>

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Initialize

In [4]:
Spice.loadKernels()
NP=mp.cpu_count()-1
print("Available number of processors: ",NP)

Available number of processors:  3


## Common data

In [None]:
bodyid="EARTH"
body=Body(bodyid)

## Parallel function

In [None]:
NRAYS=0
FREQ=10000
def rayProcessing(initial,nw=0):
    global NRAYS,FREQ
    t=initial[0]
    site=initial[1]
    direction=initial[2]
    ray=GrtRay(site,direction[0][0],direction[0][1],direction[1])
    ray.updateRay(t)
    try:
        ray.propagateRay(t)
        detJ=ray.calcJacobianDeterminant()
    except AssertionError as e:
        detJ=0
    raydf=ray.packRay()
    raydf["detJ"]=detJ
    del ray
    NRAYS+=1
    if (NRAYS%FREQ)==1:
        unreachable=gc.collect()
        print(f"Completed rays by worker {nw}: {NRAYS} (unreachable {unreachable})")
        elTime(start=True)
    return raydf
    
def rayProcessingMulti(initials,nw=0):
    print(f"Processing {len(initials)} initial conditions in worker {nw}")
    raydfs=[rayProcessing(initial,nw=nw) for initial in initials]
    return raydfs

allrays=pd.DataFrame()
def joinResults(raydfs):
    global allrays
    allrays=pd.concat((allrays,)+tuple(raydfs))
    del raydfs
    gc.collect()

## Test data

In [None]:
ts=[Spice.str2t("02/15/2013 03:20:34 UTC")]
siteprops=[[61.1*Angle.Deg,54.8*Angle.Deg,23.3*Const.km]]
sites=[]
for siteprop in siteprops:
    sites+=[Location(body,siteprop[0],siteprop[1],siteprop[2])]
directions=[[[101.1*Angle.Deg,15.9*Angle.Deg],-18.6*Const.km/Const.s]]

#List of conditions
initials=list(cartesian(*[ts,sites,directions]))
r=rayProcessingMulti(initials)
r

In [None]:
print(r[0][['et','ximp', 'yimp', 'zimp','vximp', 'vyimp', 'vzimp']].values*np.array([1]+[1/1e3]*6))

In [None]:
print(r[0][['q','e', 'i', 'W','w', 'M']])

## Massive input data

In [None]:
#Numbers
Ntimes=Nsites=Npoints=Nvels=3
"""
Ntimes=365
Nsites=100
Npoints=100
Nvels=50
#"""

#Times
print("Preparing times...")
tini=Spice.str2t("02/15/2013 03:20:34")
tend=tini+Const.Year
ts=np.linspace(tini,tend,10)

#Sites
print("Preparing sites...")
elTime(0)
H=23.3*Const.km
points=Sample(Nsites)
points.genUnitSphere()
siteprops=np.zeros((Nsites,3))
siteprops[:,:2]=points.pp[:,1:]
siteprops[:,2]=H*np.ones(Nsites)
sites=[]
for siteprop in siteprops:
    sites+=[Location(body,siteprop[0],siteprop[1],siteprop[2])]
elTime()

#Directions
print("Preparing directions...")
elTime(0)
gpoints=Sample(Npoints)
gpoints.genUnitHemisphere()
speeds=-np.linspace(11.2,72.0,Nvels)*Const.km/Const.s
directions=list(cartesian(*[gpoints.pp[:,1:].tolist(),speeds]))
elTime()

#Initial conditions
print("Preparing initial conditions...")
elTime(0)
initials=list(cartesian(*[ts,sites,directions]))
elTime()

Ninitials=len(initials)
print(f"Number of initial conditions: {Ninitials} = {len(ts)}(ts)*{len(sites)}(sites)*{len(directions)}(dirs)")

## Chunking and computing time estimations

In [None]:
#Computing time estimations
print(f"Sequential processing of {Ninitials} rays:")
dt,dtu=elTime(0)
tinitials=initials[:10]
rays=rayProcessingMulti(tinitials)
dt,dtu=elTime()
tpray=dt/len(tinitials)
tupray=tUnit(tpray)
totrays=tpray*Ninitials
toturays=tUnit(totrays)
print(f"Total duration: {dtu[0]} {dtu[1]}, Duration per ray: {tupray[0]} {tupray[1]}")

#Chunks
npchunk=np.int(np.ceil(Ninitials/NP))
cinitials=[initial for initial in Util.chunkList(initials,npchunk)]
Nchunks=len(cinitials)
FREQ=np.int(npchunk/10)
print(f"{Nchunks} chunks containing {npchunk} initial conditions (showed with frequency {FREQ})")
tchunk=tpray*npchunk
tchunku=tUnit(tchunk)
print()
print(f"Estimated total: {toturays[0]} {toturays[1]}")

print(f"Estimated time per chunk (estimated parallel): {tchunku[0]} {tchunku[1]}")

## Parallel processing

In [None]:
allrays=pd.DataFrame()
pool=mp.Pool(processes=NP)
elTime(0)
[pool.apply_async(rayProcessingMulti,args=(inis,nw),callback=joinResults) for nw,inis in enumerate(cinitials)]
pool.close()
pool.join()
elTime()

In [None]:
print("Number of results:",len(allrays))

In [None]:
allrays.to_csv("rays_parallel.csv",index=False,float_format='%.17e')

--End--

## Postprocessing

In [5]:
data=pd.read_csv("../tmp/rays_parallel.csv.gz",compression='gzip')
del data['Unnamed: 0']
print(f"Read {len(data)} conditions...")

Read 730000 conditions...


In [6]:
#data.to_csv("../tmp/rays_parallel.csv.gz",compression='gzip',index=False,float_format='%.17e')
#np.savetxt("../tmp/rays_parallel.dat",data.values,fmt="%.17e")

In [7]:
test=data.head(1)

In [8]:
print(test[['et','ximp', 'yimp', 'zimp','vximp', 'vyimp', 'vzimp']].values*np.array([1]+[1/1e3]*6))

[[ 4.24660448e+08 -1.23164974e+07 -1.51860159e+08 -9.61439193e+02
   2.64288343e+01  6.65402372e+00 -5.82682421e+01]]


In [9]:
print(test[['q','e', 'i', 'W','w', 'M']])

          q         e         i          W           w          M
0  1.007315  3.561901  65.96306  85.363151  189.602713 -19.765144


In [12]:
dt=elTime(0)
f=open("../tmp/gravray-include.sh","w")
empty=""
for i in range(10):empty+=f"0\t"
nrays=len(data)
for i,index in enumerate(tqdm(data.index)):
    ray=data.loc[index]
    et=ray["et"]
    ximp=""
    for x in ray[['ximp','yimp','zimp','vximp','vyimp','vzimp']].values*np.array([1/1e3]*6):
        ximp+=f"{x:.17e} "
    f.write(f"echo 'ray {i}/{nrays}...';")
    f.write(f"rm ray$1.dat &> /dev/null;")
    f.write(f"./wherewillitbe.exe {ray['et']:.17e} {ximp} -1.0 2 \"$1\" &> /dev/null;")
    f.write(f'if [ $? -eq 0 -a "x$(cat ray$1.dat)" != "x" ];then tail -n 1 ray$1.dat >> orbits$1.dat;')
    f.write(f"else echo '{et:+.17e} {ximp} {empty}' >> orbits$1.dat;")
    f.write(f"fi"+"\n")
f.close()
dt=elTime(1)

100%|██████████| 730000/730000 [15:29<00:00, 785.52it/s]  


Elapsed time since last call: 15.4892 min
