<img align="left" src = https://project.lsst.org/sites/default/files/Rubin-O-Logo_0.png width=250, style="padding: 10px"> 
<b>Advanced TAP Queries for DP0 catalogs </b> <br>
Last verified to run on 2021-06-25 with LSST Science Pipelines release w_2021_25 <br>
Contact authors: Leanne Guy <br>
Target audience: All DP0 delegates. <br>
Container Size: medium <br>
Questions welcome at <a href="https://community.lsst.org/c/support/dp0">community.lsst.org/c/support/dp0</a> <br>
Find DP0 documentation and resources at <a href="https://dp0-1.lsst.io">dp0-1.lsst.io</a> <br>

**Credit:** Originally developed by Leanne Guy in the context of the Rubin DP0.1. Please consider acknowledging Leanne Guy if this notebook is used for the preparation of journal articles or software releases.

TODO: check https://confluence.lsstcorp.org/display/DM/058+Find+all+objects+that+are+varying+with+the+same+pattern+as+a+given+object%2C+possibly+at+different+time and https://github.com/lsst/qserv_testdata/blob/8c635e29b8b42087e18601537d0a588fcfb40485/datasets/case01/queries/0006_transientVarObjNearGalaxy.sql for some good examples.

### Learning Objectives

The Rubin Science Platform provides query access to the DP0.1 catalogs via TAP from Jupyter notebooks. TAP (Table Access Protocol) is a Virtual Observatory protocol for access to catalog data. In this tutorial, we will learn how to exploit some of the more advanced capabilities of ADQL and Qserv to query the DP0.1 archive via TAP. Full TAP documentation can be found [here](https://www.ivoa.net/documents/TAP/).

Prerequisites: Have completed and understood notebook `02_Intermediate_TAP_Query`.

This notebook demonstrates how to:
1. Use advanced ADQL queries and understand the science they can enable
2. Read SQL queries from an external file and execute them in Python
3. Query data hosted at other archives via TAP and join them with DP0.1 data

Resources: 
The following resources may be helpful:
1. [Qserv user Guide](https://github.com/lsst/qserv/blob/master/UserManual.md#sub-queries-are-not-supported)
2. [IVOA ADQL User Guide](https://www.ivoa.net/documents/ADQL/20180112/PR-ADQL-2.1-20180112.html)

### Set Up 

In [None]:
# Import general python packages
import os
import numpy as np
import pandas as pd
import pyvo
import re

from pandas.testing import assert_frame_equal

# Astropy
from astropy import units as u
from astropy.coordinates import SkyCoord

# HealPy 
import healpy as hp

# SQL parse tools 
import sqlparse

# Holoviews
import holoviews as hv
from holoviews import streams, opts
from holoviews.operation.datashader import datashade, dynspread, rasterize
from holoviews.plotting.util import process_cmap

# Bokeh for plotting
from bokeh.io import output_file, output_notebook,  show
from bokeh.plotting import figure
from bokeh.models import HoverTool

# Set the maximum number of rows to display from pandas
pd.set_option('display.max_rows', 100)
                  
# Configure bokeh to generate output in notebook cells when show() is called.
output_notebook()

# Set bokeh as the holoviews backend
hv.extension('bokeh')

In [None]:
# Ignore warnings
import warnings
warnings.filterwarnings('ignore')

# Reduce warnings from logger
import logging
logging.getLogger("flake8").setLevel(logging.FATAL)

### 1. Loading SQL queries from an external file

A common use case is to reuse data returned from a single query as part of more than one analysis. Hard coding queries in a single notebook makes query reuse difficult. A better strategy is to store ADQL queries in an external file and read them into any notebook that will use them.

In [None]:
# Import the Rubin TAP service utilities
from lsst.rsp import get_tap_service

# Get an instance of the Rubin TAP service
rb_service = get_tap_service()
assert rb_service is not None

In [None]:
# Path to DP0.1 queries
dp01QueryPath = './queries/dp0-1'

In [None]:
# Read the SQL file in
queryPath = os.path.join(dp01QueryPath, 'dp01Tables.sql')
with open(queryPath, 'r') as fd:
    sql = fd.read()
print(sql)

The entire file is read in, including the comments. This string is not executable as is. We will use the python [sqlparse](https://sqlparse.readthedocs.io/) library to parse our SQL statements. The `format` method allows us to strip the comments.

In [None]:
query = sqlparse.format(sql, strip_comments=True).strip()

In [None]:
results = rb_service.search(query).to_table()

In [None]:
results

In [None]:
# Function to load sql queries from a file
def loadQuery(path):
    """ Load a query from a file and strip out the comments """
    with open(path, 'r') as fd:
        sql = fd.read()
    return sqlparse.format(sql, strip_comments=True, reindent=True).strip()

### 2. Deep dive into advanced Qserv capabilities

LSST Query Services (Qserv) provides access to the LSST database catalogs in DP0.1 and will be the database for all LSST Data Previews and Data Releases. Qserv has been designed to handle petascale LSST catalogs.

Qserv supports the standard SQL query language with a few restrictions, as follows:
1. Subqueries are not supported.
2. ...

#### 2.1. Advanced ADQL queries with Qserv
In notebook `02_Intermediate_TAP_Query` we introduced basic ADQL, including queries of single tables, table joins and selection cuts. In this notebook we are going to take that one step further and look at how we can analyse the millions of entries in the LSST catalogs.

#### 2.1.1 Histograms

For petascale datasets, such as that of the LSST,  retrieving millions or billions of entries from Qserv over TAP and then binning or aggregating in a notebook is not efficient and will not scale. Instead, we can use Qserv to reduce and aggregate data via ADQL queries. This can be extremely useful when we want to compute summary statistics across large datasets. 

In `02_Intermediate_TAP_Query` we saw a simple example of how to bin categorical data using the 'GROUP BY' ADQL command to group the `Objects` in the `truth_match` catalog by type (1: galaxies, 2: stars, 3: SNe), and the 'COUNT' command to count the number of Objects in each category.

To recap, here is that query again:

In [None]:
query_truth_type = loadQuery(os.path.join(dp01QueryPath, 'truthTypeCount.sql'))

In [None]:
%%time
# We remove the index so that it doesn't show up in the hover tool on the plots.
object_types = rb_service.search(query_truth_type).to_table().to_pandas(index=False)

Note the time taken by this query is of the order of 10 secs. It would take considerably longer to retrieve the entire truth match catalog and bin it in the notebook. 
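The GROUP BY and COUNT pattern can also be tried locally without touching Qserv. The sketch below is a toy illustration using Python's built-in `sqlite3` module. The table name `truth_toy` and its rows are invented for the example, and the SQL only shows the general shape of such a query; it is not the content of `truthTypeCount.sql`.

```python
import sqlite3

# Toy stand-in for a truth catalog: one row per object with a truth_type code
# (1: galaxy, 2: star, 3: SNe). The table name and rows are made up.
rows = [(1,), (1,), (2,), (1,), (3,), (2,)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE truth_toy (truth_type INTEGER)")
conn.executemany("INSERT INTO truth_toy VALUES (?)", rows)

# The database groups and counts; only one row per category comes back.
toy_counts = conn.execute(
    "SELECT truth_type, COUNT(*) AS n "
    "FROM truth_toy GROUP BY truth_type ORDER BY truth_type"
).fetchall()
conn.close()

print(toy_counts)
```

However many rows the table holds, the result has one row per category, which is why aggregating on the server scales so much better than downloading the catalog.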

In [None]:
# Map the numerical values for each truth type to a more descriptive name
# Catalog schema for the truth table can be found at:
#  https://dp0-1.lsst.io/data-products-dp0-1/index.html#catalogs
object_map = {1: 'Galaxy', 2: 'Star', 3: 'SNe'}
object_types['truth_type'] = object_types['truth_type'].map(object_map)

In [None]:
object_types

Now let's look at computations on non-categorical data. We will run these as asynchronous queries as they will take a few minutes. We will query the forced photometry catalog, joining on the object catalog, to look at the distribution of elliptical Gaussian adaptive moments (pixels^2), given by the parameter `i_base_SdssShape_xy`, for objects detected in a single tract, tract 2723. The parameter `i_base_SdssShape_xy` is specified as a `double` in Qserv, so we will use the SQL `ROUND` function to round the values to 0 decimal places. Thanks to Douglas Tucker for inspiration on these queries.

In [None]:
query = "SELECT ROUND(fp.i_base_SdssShape_xy, 0) as bin, "\
        "COUNT(*) as count "\
        "FROM dp01_dc2_catalogs.forced_photometry as fp "\
        "JOIN dp01_dc2_catalogs.object as obj ON fp.objectId = obj.objectId "\
        "WHERE obj.tract=2723 "\
        "GROUP BY bin ORDER BY bin"
#query = loadQuery(os.path.join(dp01QueryPath, 'iBandShapeByTract.sql'))

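Let's break down this query. We join the `forced_photometry` catalog with the `object` catalog to select only those objects in tract 2723. We then round each value of `i_base_SdssShape_xy` to the nearest integer, group the rows by that rounded value, and count the rows in each group.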
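The cells that follow run the query as an asynchronous job in separate steps: submit, run, wait, fetch. As a minimal sketch, those steps could be wrapped in one helper. The name `run_async_query` is hypothetical, and the helper assumes `service` behaves like a `pyvo.dal.TAPService`, whose jobs offer `run`, `wait`, `raise_if_error` and `fetch_result`.

```python
def run_async_query(service, query, phases=("COMPLETED", "ERROR")):
    """Submit an ADQL query as an asynchronous job and return its result.

    Hypothetical helper: `service` is assumed to behave like
    pyvo.dal.TAPService (for example the object returned by get_tap_service()).
    """
    job = service.submit_job(query)  # create the job on the server
    job.run()                        # start execution
    job.wait(phases=list(phases))    # block until the job completes or fails
    job.raise_if_error()             # raise if the server reported an error
    return job.fetch_result()
```

With this helper, a call such as `run_async_query(rb_service, query).to_table().to_pandas()` would stand in for the four separate cells, at the cost of hiding the job phase from view.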

In [None]:
job = rb_service.submit_job(query)

In [None]:
job.run()

In [None]:
job.wait(phases=['COMPLETED', 'ERROR'])
print('Job phase is', job.phase)

In [None]:
results_tract_2723 = job.fetch_result().to_table().to_pandas()

In [None]:
results_tract_2723

The binning here is clearly not optimal. Let's define a more appropriate bin size so we can better understand the shape of the distribution. We specify the width of the bins directly: each value of `i_base_SdssShape_xy` is rounded down to the nearest multiple of 10 (rather than to the nearest integer, as before), and we then group by that rounded value.

We also notice that the results of the original query contained `NaN` values. Let's exclude them and also apply a quality cut on the i-band results using the `i_good` flag.
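To make the arithmetic concrete, here is a small pure-Python sketch of the same fixed-width binning: `floor(value / width) * width` gives the lower edge of the bin that contains `value`. The sample values are made up, and `bin_edge` is a hypothetical helper name.

```python
import math
from collections import Counter

def bin_edge(value, bin_width):
    """Lower edge of the bin containing value: floor(value / width) * width."""
    return math.floor(value / bin_width) * bin_width

# Made-up sample values; the NaN mimics a missing measurement.
values = [-12.5, -3.0, 0.0, 4.9, 9.99, 10.0, 27.3, float("nan")]

# Drop NaN first (the query does this in its WHERE clause), then count per bin.
clean = [v for v in values if not math.isnan(v)]
histogram = Counter(bin_edge(v, 10) for v in clean)

for edge in sorted(histogram):
    print(edge, histogram[edge])
```

Because `floor` rounds towards minus infinity, negative values land in the bin below them (for example, -3.0 goes to the bin starting at -10), so all bins have the same width on both sides of zero.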

In [None]:
bin_width = 10
query = f"SELECT floor(fp.i_base_SdssShape_xy/{bin_width})*{bin_width} as edge, "\
        "COUNT(*) as count " \
        "FROM dp01_dc2_catalogs.forced_photometry as fp " \
        "JOIN dp01_dc2_catalogs.object as obj ON fp.objectId = obj.objectId " \
        "WHERE obj.tract=2723 " \
        "AND i_good = 1 AND i_base_SdssShape_xy != 'NaN' " \
        "GROUP BY edge " \
        "ORDER BY edge ASC"
print(query)

In [None]:
job = rb_service.submit_job(query)

In [None]:
job.run()

In [None]:
job.wait(phases=['COMPLETED', 'ERROR'])
print('Job phase is', job.phase)

In [None]:
binned_tract_2723 = job.fetch_result().to_table()

In [None]:
binned_tract_2723

What changes now ..... 

In [None]:
query = "SELECT floor(fp.i_base_SdssShape_xy/10.0)*10 as edge, "\
        "COUNT(*) as count " \
        "FROM dp01_dc2_catalogs.forced_photometry as fp " \
        "JOIN dp01_dc2_catalogs.object as obj ON fp.objectId = obj.objectId " \
        "WHERE obj.tract=2723 " \
        "AND i_good = 1 AND i_base_SdssShape_xy != 'NaN' " \
        "GROUP BY edge " \
        "ORDER BY edge"
print(query)

Now let's look at the distribution:

In [None]:
# The WHERE clause filters on obj.tract, so the object table must be joined in
query = "SELECT ROUND(fp.i_base_SdssShape_xy, -2) AS bucket, "\
        "COUNT(*) AS count, "\
        "RPAD('', LN(COUNT(*)), '*') AS bar "\
        "FROM dp01_dc2_catalogs.forced_photometry as fp "\
        "JOIN dp01_dc2_catalogs.object as obj ON fp.objectId = obj.objectId "\
        "WHERE obj.tract=2723 "\
        "AND i_good = 1 AND i_base_SdssShape_xy != 'NaN' " \
        "GROUP BY bucket ORDER BY bucket"
print(query)

In [None]:
# Count objects per tract and per shape bin across all tracts
query = "SELECT obj.tract, "\
        "ROUND(fp.i_base_SdssShape_xy, 1) as bin, "\
        "COUNT(*) as count " \
        "FROM dp01_dc2_catalogs.forced_photometry as fp " \
        "JOIN dp01_dc2_catalogs.object as obj " \
        "ON fp.objectId = obj.objectId " \
        "GROUP BY obj.tract, bin " \
        "ORDER BY obj.tract ASC, bin ASC"
print(query)
results_all_tracts = rb_service.search(query).to_table().to_pandas()

In [None]:
# Convert the astropy table to a pandas DataFrame before passing it to holoviews
hv.Histogram(binned_tract_2723.to_pandas(),
             kdims=['edge'], vdims=['count'])

Comments on the timing for these two queries. Why does the second take the same amount of time as the first?

In [None]:
# Example of a query to do a logarithmic binning of some forced photometry fluxes in a cone of radius 1.0 degree. 
query = "SELECT COUNT(*), FLOOR(LOG10(i_modelfit_CModel_initial_instFlux)) as BIN "\
        "FROM dp01_dc2_catalogs.forced_photometry "\
        "WHERE CONTAINS(POINT('ICRS', coord_ra, coord_dec), CIRCLE('ICRS', 60.0, -35.0, 1.0))=1 "\
        "GROUP BY BIN ORDER BY BIN"
print(query)

#### 2.1.2 Construct a CMD using ADQL
Now we will look at a more complex use of the 'GROUP BY' functionality: constructing a color-magnitude diagram (CMD) with bin size = BIN_SIZE by grouping on two binned quantities at once.
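A CMD is a two-dimensional histogram, so the grouping key is a pair of bin indices rather than a single value. The sketch below shows that idea in plain Python with made-up (color, absolute magnitude) pairs and an illustrative bin size of 0.1 mag. The names `stars` and `bin_index` are hypothetical.

```python
import math
from collections import Counter

BINS_PER_MAG = 10  # 10 bins per magnitude, i.e. BIN_SIZE = 0.1 mag (illustrative)

# Made-up (color, absolute magnitude) pairs standing in for catalog rows.
stars = [(0.82, 4.83), (0.85, 4.86), (1.43, 7.21), (0.31, 2.05), (0.88, 4.84)]

def bin_index(value, bins_per_unit):
    """Integer index of the bin containing value: floor(value * bins_per_unit)."""
    return math.floor(value * bins_per_unit)

# Group by the pair of indices, which is what GROUP BY on two columns does.
cmd_counts = Counter(
    (bin_index(color, BINS_PER_MAG), bin_index(mag, BINS_PER_MAG))
    for color, mag in stars
)

# Convert the indices back to bin edges in magnitudes for display.
for (color_idx, mag_idx), n in sorted(cmd_counts.items()):
    print(color_idx / BINS_PER_MAG, mag_idx / BINS_PER_MAG, n)
```

Grouping on integer indices and dividing only at the end avoids using floating-point values as grouping keys.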

In [None]:
# Example CMD query, written for Gaia DR2 (gaiadr2.gaia_source). It uses a subquery,
# which Qserv does not support, so it is stored here as a template only.
# TODO for DC2: add in a cut on redshift and some cuts on the quality of the data
query_cmd = """
select
  bp_rp_index / 10 as bp_rp,
  g_mag_abs_index / 10 as g_mag_abs,
  count(*) as n
from (
  select top 1000000 source_id,
    floor((phot_g_mean_mag+5*log10(parallax)-10) * 10) as g_mag_abs_index,
    floor(bp_rp * 10) as bp_rp_index
  from gaiadr2.gaia_source
  where parallax_over_error >= 5 and
    phot_bp_mean_flux_over_error > 0 and
    phot_rp_mean_flux_over_error > 0 and
    sqrt(
      power(2.5/log(10) / phot_bp_mean_flux_over_error, 2)
      + power(2.5/log(10) / phot_rp_mean_flux_over_error, 2)
    ) <= 0.05
  order by random_index
) as subquery
group by bp_rp_index, g_mag_abs_index
"""
print(query_cmd)

#### 2.2 Aggregation methods in ADQL

### 3. Query archives at external data centres via TAP

#### 3.1 Create TAP Service clients to access data at Rubin and other external data centres.  

In notebook `02_Intermediate_TAP_Query` we saw how to use the Rubin-provided TAP service to access DP0.1 data. Similarly, most astronomical archives provide a TAP service to access the data they host. We are going to learn how to access the catalogs stored at other astronomical archives over TAP from the Rubin Science Platform.

**Hazard Warning:** Not all ADQL functionality is supported yet in the DP0 RSP.

#### 3.2 Create a TAP service to query external archives
For this example, we'll use 1) The Gaia Archive's TAP service at ESAC, 2) NOIRLab's AstroDataLab and 3) <X>

In [None]:
# Gaia Archive TAP service 
gea_tap_url = "https://gea.esac.esa.int/tap-server/tap"
gea_service = pyvo.dal.TAPService(gea_tap_url)
assert gea_service is not None
assert gea_service.baseurl == gea_tap_url

In [None]:
# Query the Gaia archive to see what data they expose
query = "select * from tap_schema.schemas"
gea_schemas = gea_service.search(query).to_table()
gea_schemas

The Gaia archive makes Gaia DR1, DR2 and Early DR3 available over TAP.

In [None]:
# Datalab TAP service
dl_tap_url = "https://datalab.noirlab.edu/tap"
dl_service = pyvo.dal.TAPService(dl_tap_url)
assert dl_service is not None
assert dl_service.baseurl == dl_tap_url

In [None]:
# Query the DataLab TAP schema to see what data they expose
query = "select * from tap_schema.schemas"
dl_schema = dl_service.search(query).to_table()
dl_schema

The DataLab TAP service makes available a large number of catalogs from many surveys that are hosted at the NOIRLab Community Science and Data Center (CSDC), including the Gaia DR2 and EDR3 catalogs that we saw above.

#### 3.3 Query and retrieve data from Gaia DR2 for variable stars and plot, or maybe show that the galaxy is rotating (RV/DR2)

1) Query for two given source IDs and get the URL to the epoch data. Then retrieve the VOTable from the link and plot the time series.

2) Show the density distribution of radial velocities in Gaia DR2 (Gaia Collaboration, Katz et al. 2019, A&A 622, A205; adapted from Fig. 7). We will reproduce some of the content of Gaia Data Release 2: Mapping the Milky Way disc kinematics. See also this [American Scientist article](https://www.americanscientist.org/article/gaia-reveals-the-milky-way).

In [None]:
# First explore the Gaia DR2 schema at DataLab to find details of the source tables
query = "SELECT * FROM tap_schema.tables "\
        "WHERE tap_schema.tables.schema_name = 'gaia_dr2' AND table_name like '%source%' "
gaia_dr2_tables = dl_service.search(query).to_table().to_pandas()
#print(gaia_dr2_tables)

In [None]:
# How many radial velocities (RVs) are there in the Gaia DR2 catalog?
query = "select count(*) as num_rv_stars from gaiadr2.gaia_source where radial_velocity IS NOT NULL AND ABS(radial_velocity) < 550"

In [None]:
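# The query returns a single row; extract the count from it
num_rv_stars = gea_service.search(query).to_table()['num_rv_stars'][0]
print(num_rv_stars)
# assert 7224631 == num_rv_stars, f"Expected 7224631, got {num_rv_stars}"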

In [None]:
# Query the source table to retrieve the average Gaia DR2 radial velocity (RV) per sky pixel, for plotting in Galactic coordinates.
# If we retrieved all 7.2 million RVs and then aggregated them in this notebook, we would risk killing the notebook.
# Instead, within the ADQL query we convert the Gaia source_id of each source that has a radial velocity
# to a HEALPix index using the built-in Gaia archive function GAIA_HEALPIX_INDEX,
# then group the Gaia sources by HEALPix index and compute the average radial velocity in each pixel.
# This demonstrates the power of ADQL's aggregation functionality.
# It also shows that functions built in to the remote archive are available over TAP.
# For reference: level 6 HEALPix gives sky bins of 0.84 sq deg; level 7, used here, gives bins of 0.21 sq deg.
# Always read the Gaia DR2 Primer before working with Gaia data. https://www.cosmos.esa.int/web/gaia/gaia-dr2-primer
query = "SELECT GAIA_HEALPIX_INDEX(7, source_id) AS healpix7, avg(radial_velocity) AS avg_radial_velocity " \
        "FROM gaiadr2.gaia_source "\
        "WHERE radial_velocity IS NOT NULL "\
        "AND ABS(radial_velocity) < 550 " \
        "GROUP BY healpix7"

In [None]:
# This could take a while so let's run it as an asynchronous query. Note that queries that take too long will timeout. 
job = gea_service.submit_job(query)
print(f"Job with URL {job.url} is in {job.phase} phase")

In [None]:
job.run()

In [None]:
# Pause notebook execution until the asynchronous job finishes
job.wait(phases=['COMPLETED', 'ERROR'])
print('Job phase is', job.phase)

In [None]:
# fetch results 
gaia_rv_density_distribution = job.fetch_result().to_table().to_pandas()

In [None]:
# Now we have a much more manageable dataset and a lot of the work of aggregation has been done within the Gaia archive centre. 
print(len(gaia_rv_density_distribution))

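##### 3.3.1 Plot
Plot the RVs. A similar figure was published for Gaia DR2 (Gaia Collaboration, Katz et al. 2019, A&A 622, A205; adapted from Fig. 7). The plot produced here is not exactly the same.

A reminder of some HEALPix basics. We will use the healpy package for HEALPix calculations. The Gaia archive provides a built-in function, `GAIA_HEALPIX_INDEX`, which the query above uses to compute the HEALPix index of each source.
- `npix = 12 * nside ** 2`
- `nside = 2 ** order`, where the order is also called the level. So for HEALPix level 7, `nside` is 128.

Useful healpy functions: `hp.nside2npix`, `hp.npix2order`, `hp.order2npix`.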
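The relations above can be checked with a few lines of plain Python, without healpy. This is only a sketch: `healpix_geometry` is a hypothetical helper, and the pixel area is the full-sky area in square degrees divided by the number of pixels.

```python
import math

def healpix_geometry(order):
    """Return (nside, npix, pixel area in square degrees) for a HEALPix order."""
    nside = 2 ** order
    npix = 12 * nside ** 2
    # Full sky: 4*pi steradians converted to square degrees (about 41253)
    full_sky_sq_deg = 4 * math.pi * (180 / math.pi) ** 2
    return nside, npix, full_sky_sq_deg / npix

for order in (6, 7):
    nside, npix, area = healpix_geometry(order)
    print(f"order {order}: nside={nside}, npix={npix}, pixel area={area:.4f} sq deg")
```

Level 6 gives pixels of about 0.84 sq deg and level 7 about 0.21 sq deg. The level 7 area is the number to divide by when converting counts per pixel into a density per square degree.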

In [None]:
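# HEALPix level 7 corresponds to nside = 128
nside = hp.order2nside(7)
# Empty map with one element per HEALPix pixel
m = np.zeros(hp.nside2npix(nside))
idx = np.array(gaia_rv_density_distribution['healpix7'])
avg_rv = np.array(gaia_rv_density_distribution['avg_radial_velocity'])
# Fill in the pixels that contain sources with a radial velocity
m[idx] = avg_rv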

In [None]:
import matplotlib as mpl
mpl.rcParams.update({'font.size': 10})

hp.mollview(m, nest=True, coord=['C','G'], 
            title="Gaia DR2 Average RV", 
            cmap=mpl.cm.jet,
            norm="hist")
hp.graticule()

#### 3.4 Gaia sky density distribution

In [None]:
# Source sky density distribution
# A level 7 HEALPix pixel covers about 0.2098 sq deg (41253 sq deg / 196608 pixels)
query = "SELECT gaia_healpix_index(7, source_id) AS healpix7, count(*)/0.2098 AS stars_per_sq_deg " \
        "FROM gaiaedr3.gaia_source " \
        "GROUP BY healpix7"

In [None]:
# This could take a while so let's run it as an asynchronous query. Note that queries that take too long will timeout. 
job = gea_service.submit_job(query)
print(f"Job with URL {job.url} is in {job.phase} phase")

In [None]:
job.run()

In [None]:
job.wait(phases = ['COMPLETED', 'ERROR'])

In [None]:
results = job.fetch_result().to_table().to_pandas()

In [None]:
gaia_sky = np.zeros(hp.nside2npix(hp.order2nside(7)))
gaia_sky[np.array(results['healpix7'])] = np.array(results['stars_per_sq_deg'])

In [None]:
hp.mollview(gaia_sky,
            nest=True, coord=['C','G'],
            norm='log', title='Gaia Sky Density')
# hp.graticule()

#### 3.5 Synergies between LSST and Gaia
We can clearly see the complementarity between LSST and Gaia by looking at the distribution of uncertainties for photometry, astrometry and proper motion as a function of magnitude. Here we use the DC2 data and data from Gaia DR2/EDR3 to plot the uncertainty as a function of magnitude for both (reference original plots).

https://github.com/agabrown/PyGaia/blob/master/pygaia/errors/astrometric.py

In [None]:
query = "SELECT objectid, ra, dec, mag_r, magerr_r, mag_r_cModel, magerr_r_cModel "\
        "FROM dp01_dc2_catalogs.object " \
        "WHERE mag_r_cModel != 'NaN' " \
        "AND mag_r_cModel < 29"
print(query)

In [None]:
results = rb_service.search(query, maxrec=20)
results = results.to_table().to_pandas()
#lsst_results

In [None]:
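# Gaia query: the identifier column is source_id, and the selection is made on the
# Gaia G-band magnitude (mag_r_cModel is an LSST column and is not in the Gaia table)
query = "SELECT source_id, ra, dec, parallax_error, phot_g_mean_mag, parallax_over_error, "\
        "pm, pmra, pmra_error, pmdec, pmdec_error " \
        "FROM gaiaedr3.gaia_source " \
        "WHERE phot_g_mean_mag IS NOT NULL"
print(query)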

In [None]:
query = "SELECT TOP 10 ASTROMETRIC_PARAMETER_ERROR(" \
        "ra_error, dec_error, parallax_error, pmra_error, pmdec_error, "\
        "ra_dec_corr, ra_parallax_corr, ra_pmra_corr, ra_pmdec_corr, dec_parallax_corr, "\
        "dec_pmra_corr, dec_pmdec_corr, parallax_pmra_corr, parallax_pmdec_corr, pmra_pmdec_corr, "\
        "parallax, radial_velocity, radial_velocity_error) " \
        "FROM gaiadr2.gaia_source"
print(query)

In [None]:
results_gaia = gea_service.search(query)