# TAP Service Test Notebook

Automated tests for the Table Access Protocol (TAP) service.

## Imports

In [None]:
from lsst.rsp import get_tap_service
from astropy.table import Table

## Service Instantiation

In [None]:
service = get_tap_service("tap")
assert service is not None
print(f"TAP service URL: {service.baseurl}")

## Check VOSI Tables

In [None]:
tables = service.tables
print(f"Found {len(tables)} tables")

## Schema Discovery - List Schemas

In [None]:
results = service.search("SELECT schema_name, description FROM TAP_SCHEMA.schemas")
assert len(results) > 0
print(f"Found {len(results)} schemas")
results.to_table()

## Schema Discovery - List Tables

In [None]:
results = service.search("SELECT table_name, description FROM TAP_SCHEMA.tables WHERE schema_name = 'dp1'")
assert len(results) > 0
print(f"Found {len(results)} tables in dp02_dc2_catalogs")
results.to_table()

## Schema Discovery - List Columns

In [None]:
results = service.search(
    "SELECT column_name, datatype, unit FROM TAP_SCHEMA.columns "
    "WHERE table_name = 'dp1.Object' "
    "ORDER BY column_name"
)
assert len(results) > 0
print(f"Found {len(results)} columns in Object table")

## Schema Discovery - Run asynchronously

In [None]:
results = service.run_async("SELECT TOP 10 * FROM tap_schema.columns")
assert len(results) > 0
print(f"Found {len(results)} columns in Object table")

## Synchronous Query - Small Result Set (TOP 10)

In [None]:
query = """
SELECT TOP 10 objectId, coord_ra, coord_dec
FROM dp1.Object
"""
results = service.search(query)
assert len(results) <= 10
print(f"Retrieved {len(results)} objects")
results.to_table()

## Asynchronous Query - Small Result Set (TOP 10 with 30 second timeout

In [None]:
query = """
SELECT TOP 10 objectId, coord_ra, coord_dec
FROM dp1.Object
"""
job = service.submit_job(query)
job.run()
job.wait(phases=['COMPLETED', 'ERROR'], timeout=30)
if job.phase not in ('COMPLETED', 'ERROR'):
    raise TimeoutError(f"Job timed out after 2 minutes. Current phase: {job.phase}")
if job.phase == 'ERROR':
    job.raise_if_error()
results = job.fetch_result()

## Asynchronous Query - Medium Large Set (TOP 10000)

In [None]:
query = """
SELECT TOP 10000 * 
FROM dp1.Object
"""
results = service.run_async(query)
assert len(results) <= 10000
print(f"Retrieved {len(results)} objects")

## Asynchronous Query - Polygon Search

In [None]:
query = """
SELECT TOP 50 objectId, coord_ra, coord_dec
FROM dp02_dc2_catalogs.Object
WHERE CONTAINS(POINT('ICRS', coord_ra, coord_dec),
               POLYGON('ICRS', 62.0, -37.0, 62.05, -37.0, 62.05, -37.05, 62.0, -37.05)) = 1
"""
results = service.run_async(query)
print(f"Retrieved {len(results)} objects in polygon")
results.to_table()

## Async Query - Submit Job

In [None]:
query = """
SELECT objectId, coord_ra, coord_dec, g_cModelFlux, r_cModelFlux, i_cModelFlux
FROM dp02_dc2_catalogs.Object
WHERE CONTAINS(POINT('ICRS', coord_ra, coord_dec),
               CIRCLE('ICRS', 62.0, -37.0, 0.2)) = 1
"""
job = service.submit_job(query)
print(f"Job submitted with ID: {job.job_id}")
job.run()
print(f"Job started, phase: {job.phase}")

## Async Query - Wait for Completion

In [None]:
job.wait(phases=['COMPLETED', 'ERROR'])
print(f"Job phase: {job.phase}")
if job.phase == 'ERROR':
    job.raise_if_error()
assert job.phase == 'COMPLETED', f"Job failed with phase: {job.phase}"

## Async Query - Fetch Results

In [None]:
results = job.fetch_result()
table = results.to_table()
print(f"Retrieved {len(table)} objects")
assert len(table) > 0
table[:10]

## Async Query - Cleanup

In [None]:
job.delete()
print("Job deleted")

## Async Query (Larger) - Submit Job

In [None]:
query = """
SELECT objectId, coord_ra, coord_dec, 
       g_cModelFlux, r_cModelFlux, i_cModelFlux, z_cModelFlux, y_cModelFlux,
       g_extendedness, r_extendedness
FROM dp02_dc2_catalogs.Object
WHERE CONTAINS(POINT('ICRS', coord_ra, coord_dec),
               CIRCLE('ICRS', 62.0, -37.0, 0.5)) = 1
"""
job_large = service.submit_job(query)
print(f"Job submitted with ID: {job_large.job_id}")
job_large.run()
print(f"Job started, phase: {job_large.phase}")

## Async Query (Larger) - Wait for Completion

In [None]:
job_large.wait(phases=['COMPLETED', 'ERROR'])
print(f"Job phase: {job_large.phase}")
if job_large.phase == 'ERROR':
    job_large.raise_if_error()
assert job_large.phase == 'COMPLETED', f"Job failed with phase: {job_large.phase}"

## Async Query (Larger) - Fetch Results

In [None]:
results_large = job_large.fetch_result()
table_large = results_large.to_table()
print(f"Retrieved {len(table_large)} objects")
assert len(table_large) > 0

## Async Query (Larger) - Cleanup

In [None]:
job_large.delete()
print("Job deleted")

## Table Join Query

In [None]:
query = """
SELECT TOP 50 o.objectId, o.coord_ra, o.coord_dec,
       COUNT(fs.forcedSourceId) AS n_forced_sources
FROM dp02_dc2_catalogs.Object AS o
JOIN dp02_dc2_catalogs.ForcedSource AS fs ON o.objectId = fs.objectId
WHERE CONTAINS(POINT('ICRS', o.coord_ra, o.coord_dec),
               CIRCLE('ICRS', 62.0, -37.0, 0.05)) = 1
GROUP BY o.objectId, o.coord_ra, o.coord_dec
"""
results = service.run_async(query)
print(f"Retrieved {len(results)} objects with forced source counts")
results.to_table()

## ObsCore Query

In [None]:
query = """
SELECT TOP 10 * 
FROM ivoa.ObsCore
"""
results = service.run_async(query)
print(f"Retrieved {len(results)} ObsCore records")
results.to_table()

## Test Table Uploads

In [None]:
query = """
        SELECT TOP 10 * 
        FROM dp1.DiaSource 
        WHERE ssObjectID>0
        """

In [None]:
results = service.run_async(query)

In [None]:
# coords
coords_string = "ssObjectId\tcoord_ra\tdec\tmidpointMjdTai\n" + "\n".join(
    f"{row['ssObjectId']}\t{row['coord_ra']}\t{row['dec']}\t{row['midpointMjdTai']}" 
    for _, row in enumerate(results)
)

In [None]:
ssObjectId_string = "ssObjectId\n" + "\n".join(str(row['ssObjectId']) for row in results)

In [None]:
ut1 = Table.read(coords_string, format='ascii.basic')

In [None]:
query = """
        SELECT * FROM TAP_UPLOAD.ut1 AS ut1
        WHERE ut1.coord_ra > 10
        """

In [None]:
job = service.submit_job(query, uploads={"ut1": ut1})
job.run()

In [None]:
job.wait(phases=['COMPLETED', 'ERROR'])
print('Job phase is', job.phase)
print('Job ID is', job.job.jobid)

In [None]:
results = job.fetch_result()
assert len(results) > 0

In [None]:
job.delete()

In [None]:
ut1 = Table.read(coords_string, format='ascii.basic')

In [None]:
query = """
        SELECT dias.ra, dias.dec, dias.midPointMjdTai, dias.ssObjectId,
        ut1.coord_ra AS ut1_ra, ut1.dec AS ut1_dec, ut1.midPointMjdTai AS ut1_mjd, ut1.ssObjectId AS ut1_id
        FROM dp1.DiaSource AS dias, TAP_UPLOAD.ut1 AS ut1
        WHERE CONTAINS(POINT('ICRS', dias.ra, dias.dec),
        CIRCLE('ICRS', ut1.coord_ra, ut1.dec, 0.00278))=1
        AND ABS(dias.midPointMjdTai - ut1.midPointMjdTai) < 0.5
        ORDER BY dias.ssObjectId
        """

In [None]:
job = service.submit_job(query, uploads={"ut1": ut1})
job.run()

In [None]:
job.wait(phases=['COMPLETED', 'ERROR'])
print('Job phase is', job.phase)
print('Job ID is', job.job.jobid)

In [None]:
results = job.fetch_result()
results.to_table()

In [None]:
assert len(results) > 0

In [None]:
job.delete()

## Test Complete

In [None]:
print("All TAP service tests completed successfully.")