In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pickle
from extcats import CatalogPusher
import pymongo, os, subprocess

In [None]:
# Load pickle file we got from Raol
pfile = '/home/jnordin/data/catalogs/MPA_lensed/masterlens_match_ampel.pkl'
df = pd.read_pickle(pfile)

In [None]:
ar = df[ ['ra','dec'] ].hist()

In [None]:
list(df.columns)

In [None]:
# build the pusher object and point it to the raw files.
mqp = CatalogPusher.CatalogPusher(
    catalog_name = 'MPAlensing',             # short name of the catalog
    data_source = '/home/jnordin/data/catalogs/MPA_lensed', # where to find the data
    file_type = 'pkl'
    )

In [None]:
# See if we can use the information as above
mqp.assign_file_reader(
        reader_func = pd.read_pickle,         # callable to use to read the raw_files. 
        read_chunks = False)                  # weather or not the reader process each file into smaller chunks.

In [None]:
# now we have to define a modifier function that acts on the single documents
# (dictionaries) and format them in the way they have to appear in the database.
# in this case we format coordinates in the geoJSON type (this enables mongo to
# support queries in spherical cooridnates), and we assign to each source its
# healpix index on a grid of order 16, corresponding to ~3" resolution.
from healpy import ang2pix
def mqc_modifier(srcdict):
    
    # Make sure we have no duplicate columns
    srcdict = {'system_name':srcdict['system_name'],'ra':srcdict['ra'],'dec':srcdict['dec']}
    
    # format coordinates into geoJSON type (commented version uses 'legacy' pair):
    # unfortunately mongo needs the RA to be folded into -180, +180
    ra=srcdict['ra'] if srcdict['ra']<180. else srcdict['ra']-360.  
    srcdict['pos']={
            'type': 'Point', 
            'coordinates': [ra, srcdict['dec']]
                    }
    #srcdict['pos']=[srcdict['ra'], srcdict['dec']] # This is the legacy coordinate format
    
    # add healpix index
    srcdict['hpxid_16']=int(
        ang2pix(2**16, srcdict['ra'], srcdict['dec'], lonlat = True, nest = True))
    
    return srcdict

mqp.assign_dict_modifier(mqc_modifier)

In [None]:
# fill in the database, creting indexes on the position and healpix ID.
import pymongo
mqp.push_to_db(
    coll_name = 'srcs', 
    index_on = ['hpxid_16', [('pos', pymongo.GEOSPHERE)] ] ,
    index_args = [{}, {}], # specify arguments for the index creation
    overwrite_coll = False, 
    append_to_coll = False)

In [None]:
## Running the test scripts
# define the funtion to test coordinate based queries:
import numpy as np
from healpy import ang2pix, get_all_neighbours
from astropy.table import Table
from astropy.coordinates import SkyCoord
from math import radians
# define your search radius
rs_arcsec = 100.

def test_query_healpix(ra, dec, coll):
    """query collection for the closest point within 
    rs_arcsec of target ra, dec. It uses healpix ID
    to perform the search.
    
    The results as returned as an astropy Table. """
    
    # find the index of the target pixel and its neighbours 
    target_pix = int( ang2pix(2**16, ra, dec, nest = True, lonlat = True) )
    neighbs = get_all_neighbours(2*16, ra, dec, nest = True, lonlat = True)

    # remove non-existing neigbours (in case of E/W/N/S) and add center pixel
    pix_group = [int(pix_id) for pix_id in neighbs if pix_id != -1] + [target_pix]
    
    # query the database for sources in these pixels
    qfilter = { 'hpxid_16': { '$in': pix_group } }
    qresults = [o for o in coll.find(qfilter)]
    if len(qresults)==0:
        return None
    
    # then use astropy to find the closest match
    tab = Table(qresults)
    target = SkyCoord(ra, dec, unit = 'deg')
    matches_pos = SkyCoord(tab['ra'], tab['dec'], unit = 'deg')
    d2t = target.separation(matches_pos).arcsecond
    match_id = np.argmin(d2t)

    # if it's too far away don't use it
    if d2t[match_id]>rs_arcsec:
        return None
    return tab[match_id]


def test_query_2dsphere(ra, dec, coll):
    """query collection for the closest point within 
    rs_arcsec of target ra, dec. It uses mondod spherical
    geometry queries.
    
    The results as returned as an astropy Table. """
    
    
    # fold the RA between -180 and 180.
    if ra > 180:
        ra = ra - 360.
    
    # query and return
    geowithin={"$geoWithin": { "$centerSphere": [[ra, dec], radians(rs_arcsec/3600.)]}}
    qresults = [o for o in coll.find({"pos": geowithin})]
    if len(qresults)==0:
        return None
    
    # then use astropy to find the closest match
    tab = Table(qresults)
    target = SkyCoord(ra, dec, unit = 'deg')
    matches_pos = SkyCoord(tab['ra'], tab['dec'], unit = 'deg')
    d2t = target.separation(matches_pos).arcsecond
    match_id = np.argmin(d2t)

    # if it's too far away don't use it
    if d2t[match_id]>rs_arcsec:
        return None
    return tab[match_id]

# run the test. Here we compare queries based on the 
# healpix index with those based on the 2dsphere mongod support.
mqp.run_test(test_query_healpix, npoints = 10000)
mqp.run_test(test_query_2dsphere, npoints = 10000)

In [None]:
## Adding metadata
mqp.healpix_meta(healpix_id_key = 'hpxid_16', order = 16, is_indexed = True, nest = True)
mqp.sphere2d_meta(sphere2d_key = 'pos', is_indexed = True, pos_format = 'geoJSON')
mqp.science_meta(
    contact =  'R. Canameras', 
    email = 'rcanameras@MPA-Garching.MPG.DE', 
    description = 'Private list contributed by MPA',
    reference = 'File masterlens_match_ampel.pkl')

In [None]:
# Also test searching
from extcats import CatalogQuery

# initialize the CatalogQuery object pointing it to an existsing database
mqc_query = CatalogQuery.CatalogQuery(
        cat_name = 'MPAlensing',           # name of the database
        coll_name = 'srcs',               # name of the collection with the sources
        ra_key = 'ra', dec_key = 'dec',   # name of catalog fields for the coordinates
        dbclient = None)


# specify target position (same format as the 'ra_key' and 
# 'dec_key specified at initilization) and serach radius
target_ra, target_dec, rs = 157.365, 61.251 , 100.

In [None]:
# the 'raw' method does not require any pre-formatting of the catalog.
# It first selects points within a box of radius 'box_scale' times larger than the
# search radius using $gte and $lte operators, then uses the $where expression
# to compute the angular distance of the sources in the box from the target.
out_raw = mqc_query.findwithin(target_ra, target_dec, rs, method = 'raw', box_scale = 2.5)
if not out_raw is None:
    print ("%d sources found around target position using the 'raw' method."%len(out_raw))

# the '2dsphere' method uses instead the use mongodb searches in 
# spherical geometry using "$geoWithin" and "$centerSphere" operators.
# it requires the catalog documents to have been assigned a geoJSON 
# or 'legacy pair' field of type 'Point' (see insert_example notebook).
out_2dsphere = mqc_query.findwithin(target_ra, target_dec, rs, method = '2dsphere')
if not out_2dsphere is None:
    print ("%d sources found around target position using the '2dsphere' method."%len(out_2dsphere))
    print(out_2dsphere['system_name'])


# finally, the healpix method can be used to speed up queries using a 
# spatial prepartinioning of the data based on a HEALPix grid. In this 
# case, the sources in the catalog should be assigned a field containing
# the ID of the healpix that contains it.
out_healpix = mqc_query.findwithin(target_ra, target_dec, rs, method = 'healpix')
if not out_healpix is None:
    print ("%d sources found around target position using the 'healpix' method."%len(out_healpix))
    print(out_healpix['system_name'])
    
out_healpix_square = mqc_query.findwithin(target_ra, target_dec, rs, method = 'healpix', circular = False)
if not out_healpix_square is None:
    print ("%d sources found around target position using the 'healpix' (square) method."%len(out_healpix_square))

In [None]:
# ======================================== #
#    make a plot with the query results    #
# ======================================== #
%matplotlib notebook
import matplotlib.pyplot as plt

# get a random sample from the catalog
cat_pos=[[o['ra'], o['dec']] for o in 
         mqc_query.src_coll.aggregate([{ '$sample': { 'size': 5000 }}])]
cat_ra, cat_dec = zip(*cat_pos)


fig=plt.figure()
ax=fig.add_subplot(111)#, projection="aitoff")

ax.scatter(cat_ra, cat_dec, label="random sample", c="k", s=50, marker="o", zorder=1)
ax.scatter(out_raw['ra'], out_raw['dec'], label="matches (RAW)", c="r", s=100, marker="+")
ax.scatter(out_2dsphere['ra'], out_2dsphere['dec'], label="matches (2D sphere)", c="b", s=100, marker="x")
ax.scatter(out_healpix['ra'], out_healpix['dec'], label="matches (HEALPix)", c="m", s=100, marker="v")
ax.scatter(
out_healpix_square['ra'], out_healpix_square['dec'], label="matches (HEALPix square)", c="g", s=50, marker="v")

ax.scatter(target_ra, target_dec, label='target', s=200, c='y', marker='*', zorder=0)
ax.set_xlim(target_ra-2, target_ra+2)
ax.set_ylim(target_dec-3, target_dec+3)
ax.legend(loc='best')
fig.show()

Final step is to dump catalog into the prefered format for AMPEL ingestion

In [None]:
# Where to store, limit for saving only small subset, name of final catalog
dumpto = "/home/jnordin/data/catalogs/MPA_lensed/"
limit = None    # Small dataset for testing
cat2dump = ['MPAlensing']

In [None]:
# connect to database
c = pymongo.MongoClient()

In [None]:
def mongodump(dbname, where, limit=None):
    '''
    Dump extcat catalog from live mongod session
    '''
    
    cmd="mongodump --db %s --out %s"%(dbname, where)
    
    if limit is None:
        print (subprocess.check_output(cmd,stderr=subprocess.STDOUT,shell=True))
        pass
    else:
        print ("limiting out to the first %d sources"%limit)
        
        # figure out Id of limit + 1 source
        coll = c[dbname]['srcs']
        last_id = list(coll.find().sort('_id', pymongo.ASCENDING).limit(limit))[-1]['_id']
        print ("last ID:", last_id, type(last_id))
        
        # mongodump meta collection
        cmd="mongodump --db %s --collection meta --out %s"%(dbname, where)
        print (subprocess.check_output(cmd,stderr=subprocess.STDOUT,shell=True))
        
        # and the small source collection
        cmd=("mongodump --db %s --collection srcs --query '{_id:{$lte:ObjectId(%s)}}' --out %s"%
            (dbname, '"%s"'%str(last_id), where))
        print (subprocess.check_output(cmd,stderr=subprocess.STDOUT,shell=True))

In [None]:
# dump them all
for dcn in c.list_database_names():
    if dcn in cat2dump:
        print ("dumping catalog %s to dir %s"%(dcn, dumpto))
        mongodump(dcn, dumpto, limit)