# Parallelized FindOrb, Prototyping Notebook

In [1]:
import pandas as pd

## Utils to read/write PSV ADES

In [2]:
def read_psv_ades(fn):
    """Read PSV ADES file
    
    Args:
        fn (str): Filename
        
    Returns:
        tuple: loaded dataframe, header
    """
    header = []
    with open(fn) as fp:
        # consume and store the header
        while True:
            line = fp.readline()
            if line[0] in ['#', '!']:
                header.append(line.rstrip())
                continue
            # the line is the header
            names = [ s.strip() for s in line.split('|') ]
            break
        df = pd.read_csv(fp, sep='|', header=0, names=names)
    return df, header

## df, hdr = read_psv_ades(fn)
## del df["rmsMag"] # workaround for FindOrb bug
## hdr
## df

In [3]:
def write_psv_ades(fn, df, header):
    """Write PSV ADES file
    
    Args:
        fn (str): Output filename
        df (pd.DataFrame): Dataframe of observations
        header (list): ADES file header (list of lines)
        
    """
    with open(fn, "w") as fp:
        fp.write('\n'.join(header))
        fp.write('\n')
        df.to_csv(fp, sep='|', index=False)

## t1 = df[df["trkSub"] == 't0000000']
## write_psv_ades("foo.psv", t1, hdr)

In [4]:
df, hdr = read_psv_ades("2022-10-18T09:06:40.464Z_discoveries_01_LSST_TEST.psv")
del df["rmsMag"] # workaround for FindOrb bug
df

Unnamed: 0,trkSub,obsTime,ra,dec,rmsRA,rmsDec,mag,band,stn,mode,astCat
0,t0000000,2022-10-04T04:31:35.818Z,340.215384,-31.237961,0.024390,0.028525,21.200,y,I11,CCD,Gaia2
1,t0000000,2022-10-08T04:57:44.150Z,339.602568,-31.208910,0.030950,0.036187,21.247,y,I11,CCD,Gaia2
2,t0000000,2022-10-08T05:20:55.363Z,339.600181,-31.208704,0.031220,0.036502,21.253,z,I11,CCD,Gaia2
3,t0000000,2022-10-08T05:42:49.939Z,339.597960,-31.208498,0.039912,0.046665,21.253,z,I11,CCD,Gaia2
4,t0000000,2022-10-08T06:06:00.720Z,339.595550,-31.208269,0.049422,0.057784,21.248,y,I11,CCD,Gaia2
...,...,...,...,...,...,...,...,...,...,...,...
455044,t0030307,2022-10-17T01:27:39.254Z,341.016670,-13.647847,0.107355,0.110474,22.814,i,I11,CCD,Gaia2
455045,t0030307,2022-10-18T00:39:11.635Z,340.963762,-13.645669,0.030286,0.031165,22.952,r,I11,CCD,Gaia2
455046,t0030307,2022-10-18T00:39:50.256Z,340.963714,-13.645656,0.029608,0.030468,22.952,r,I11,CCD,Gaia2
455047,t0030307,2022-10-18T01:02:39.437Z,340.962809,-13.645616,0.050079,0.051533,23.427,g,I11,CCD,Gaia2


## Multiprocess-safe FindOrb Wrappers

In [8]:
def fit_orbit(df, hdr):
    """Fit orbit with FindOrb to a single track in a multi-processing safe manner
    
    Args:
        df (pd.DataFrame): The observations to fit
        hdr (str): ADES file header for the observations in `df`
        
    Returns:
        dict: A dictionary with the result of the run, of the form::
        
            {
                'name': track name,
                'state_vec': state vector
                'epoch': state vector epoch
                'covar': covariance matrix
                'findorb': { # run details, useful for debugging
                    'args': how findorb was invoked
                    'returncode': UNIX return code (zero for success)
                    'stdout': the contents of Find_Orb standard output
                    'stderr': the contents of Find_Orb standard error
                }
            }
    """
    # make sure there's only a single tracklet
    assert len(df) >= 1
    assert len(df["trkSub"].unique()) == 1
    trkSub = df["trkSub"].iloc[0]

    # create a temporary directory
    import tempfile, subprocess, json, os
    with tempfile.TemporaryDirectory() as tmpdir:
        print(f"tmpdir: {tmpdir}")

        # prep the new "home" directory
        from shutil import copytree, ignore_patterns
        copytree(
            os.path.expanduser("~/.find_orb"), f"{tmpdir}/.find_orb",
            ignore=ignore_patterns('debug.txt', 'elements.json', 'elem_short.json', 'linux_p1550p2650.430t')
        )
        os.symlink(os.path.expanduser("~/.find_orb/linux_p1550p2650.430t"), f"{tmpdir}/.find_orb/linux_p1550p2650.430t")

        # dump to ades
        datafile = f"{tmpdir}/data.psv"
        resultdir = f"{tmpdir}/result"
        write_psv_ades(datafile, df, hdr)

        # call findorb
        cmd = f"fo {datafile} -O {resultdir} -D /Users/mjuric/projects/elasticsky/environ.dat"
        env = os.environ.copy()
        env["HOME"] = tmpdir
        ret = subprocess.run(cmd, shell=True, env=env, check=False, capture_output=True)

        # fetch/construct the result
        if ret.returncode == 0:
            # read the result
            with open(f"{resultdir}/covar.json") as fp:
                result = json.load(fp)
        else:
            result = {}

        result["name"] = trkSub
        result["findorb"] = {
            'args': ret.args,
            'returncode': ret.returncode,
            'stdout': ret.stdout.decode('utf-8'),
            'stderr': ret.stderr.decode('utf-8')
        }

        return result

## t1 = df[df["trkSub"] == 't0000000']
## orbit = fit_orbit(t1, hdr)
## orbit

In [13]:
%%time
def fit_orbit_batch(df, hdr, tracks=None):
    """Fit a batch of tracklets with FindOrb in a multi-processing safe manner

    These are all still processed within a single thread (process).

    Args:
        df (pd.DataFrame): The observations to fit
        hdr (str): ADES file header for the observations in `df`
        tracks (list): A list of trkSubs to fit; if None, process all from `df`
        
    Returns:
        dict: A dictionary of (trkSub: result), where result is the output of `fit_orbit`
    """
    if tracks is None:
        tracks = df['trkSub'].unique()

    results = {}
    for trkSub in tracks:
        trk = df[df["trkSub"] == trkSub]
        results[trkSub] = fit_orbit(trk, hdr)
    return results

## orbits = fit_orbit_batch(df, hdr, df["trkSub"].unique()[:10])

tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmplbb0p2el
tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp0iful63b
tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpqfhvp1ak
tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpguny3kdf
tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpxi6mluad
tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpy3pucq18
tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpwecyhfpr
tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpu83d497r
tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpaons6v7w
tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp0cyluxj0
CPU times: user 298 ms, sys: 201 ms, total: 499 ms
Wall time: 7.3 s


## Parallelization with Ray

In [16]:
import ray
ray.init()

2020-12-09 17:21:20,050	INFO services.py:1090 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '10.0.1.10',
 'raylet_ip_address': '10.0.1.10',
 'redis_address': '10.0.1.10:6379',
 'object_store_address': '/tmp/ray/session_2020-12-09_17-21-19_496480_27232/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-12-09_17-21-19_496480_27232/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2020-12-09_17-21-19_496480_27232',
 'metrics_export_port': 61492,
 'node_id': '62f2d5f7d1d1c844a1d787bbbe9a5cb609186dcf'}

Ray variant of the batched fitter:

In [21]:
@ray.remote
def dist_fit_orbit_batch(df, hdr, tracks):
    return fit_orbit_batch(df, hdr, tracks)

In [22]:
# A utility to divide up the tracklets into smaller chunks
def chunk(k, chunk_size):
     return[ k[i:i + chunk_size] for i in range(0, len(k), chunk_size) ]

#tracks = chunk(df['trkSub'].unique(), chunk_size=10)
#print(len(tracks))
#tracks[-1][-10:]

In [32]:
%%time

def processAdesFile(fn, chunk_size=10, max_tracklets=None):
    # load the file
    df, hdr = read_psv_ades(fn)
    del df["rmsMag"] # workaround for FindOrb bug

    # subdivide it into smaller chunks, with N tracklets each. These
    # chunks will be submitted to individual FindOrb threads to work on.
    tracks = chunk(df['trkSub'].unique(), chunk_size)

    # launch the parallel processing, and wait for the result
    df_id = ray.put(df)
    futures = [
        dist_fit_orbit_batch.remote(df_id, hdr, track_batch)
        for track_batch in tracks[:max_tracklets]
    ]
    chunked_results = ray.get(futures)
    del df_id

    # merge the result
    results = {}
    for d in chunked_results:
        results.update(d)

    return results

# grab this file from https://epyc.astro.washington.edu/~moeyensj/rubin_submissions/ver5/
results = processAdesFile("2022-10-18T09:06:40.464Z_discoveries_01_LSST_TEST.psv", max_tracklets=10)

[2m[36m(pid=27395)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpstq8t8ap
[2m[36m(pid=27398)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp3holsz2q
[2m[36m(pid=27399)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp7uogky20
[2m[36m(pid=27397)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp8w5cx82g
[2m[36m(pid=27396)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpwg0ii7kf
[2m[36m(pid=27394)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpdo7cjc1u
[2m[36m(pid=27392)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp_855_if6
[2m[36m(pid=27393)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp_w7p9pu9
[2m[36m(pid=27396)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpdgt21hi3
[2m[36m(pid=27397)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpytq00_j1
[2m[36m(pid=27399)[0m tmpdir: /var/folders/0n/3cchn_wj561

[2m[36m(pid=27394)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpfrtr73r0
[2m[36m(pid=27396)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp_f01jb52
[2m[36m(pid=27394)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpglj9e81d
[2m[36m(pid=27396)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp_xxt3lw1
[2m[36m(pid=27394)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmp0u776k1l
[2m[36m(pid=27396)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpv93jy47o
[2m[36m(pid=27394)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpy4i3r1p1
[2m[36m(pid=27396)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmppvsiwnua
[2m[36m(pid=27394)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpfroctton
[2m[36m(pid=27396)[0m tmpdir: /var/folders/0n/3cchn_wj5619fqz4dswygvj80000gn/T/tmpbzeik1he
[2m[36m(pid=27394)[0m tmpdir: /var/folders/0n/3cchn_wj561

## Q/A

Check for failiures:

In [33]:
failures = [ result for result in orbits.values() if result['findorb']['returncode'] != 0]
print(len(failures))
failures

0


[]

Construct a dataframe of states:

In [34]:
states = [ result['state_vect'] for result in orbits.values() ]
states = pd.DataFrame(states, index=orbits.keys())
states

Unnamed: 0,0,1,2,3,4,5
t0000000,3.7322,-1.427343,-1.232355,0.001064,0.005572,-9.2e-05
t0000001,4.013681,0.824397,-1.523476,-0.000825,0.00268,-0.000553
t0000002,2.375318,-0.443731,0.379375,0.001031,0.008871,0.0012
t0000003,1.25365,-0.359243,-0.265275,0.009259,0.013286,-0.002807
t0000004,1.620572,-0.258156,0.222494,0.014663,0.008725,-0.000178
t0000005,2.069839,-0.712923,0.317498,0.008977,0.006604,0.00101
t0000006,1.29633,0.044765,-0.765443,-0.004952,0.007241,0.010736
t0000007,3.493377,-0.502076,-0.488515,0.00331,0.003824,-5.3e-05
t0000008,1.837818,0.02008,-0.368643,-0.007236,0.010703,0.003001
t0000009,1.939703,-1.469046,-0.066237,0.008151,0.005376,0.000414


In [35]:
# states1 = states

In [36]:
(states1 == states).all()

0    True
1    True
2    True
3    True
4    True
5    True
dtype: bool