## Test case LVV-T2724: 

### Specification: 
This test covers the following requirements relating to high-volume queries on Qserv.

#### Requirement ID: DMS-REQ-0354 (Priority: 1b) Result latency for high-volume complex queries
Specification: Complex high-volume queries (queries that involve full-sky spatial and temporal correlations) shall be answered in less than hvComplexQueryTime.
hvComplexQueryTime: Maximum time allowed for retrieving results of a high-volume complex query. == 12 hours
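
As a minimal sketch of how this latency limit could be checked, the helper below times an arbitrary callable with a monotonic clock and compares the elapsed time with the 12-hour limit. The names `timed_call` and `fake_query` are hypothetical and are not part of this test; `fake_query` only stands in for a real TAP query.

```python
import time

HV_COMPLEX_QUERY_TIME = 12 * 60 * 60  # 12 hours expressed in seconds


def timed_call(func, limit_seconds=HV_COMPLEX_QUERY_TIME):
    """Run func() and return (result, elapsed_seconds, within_limit)."""
    start = time.perf_counter()  # monotonic clock, unaffected by wall-clock changes
    result = func()
    elapsed = time.perf_counter() - start
    return result, elapsed, elapsed <= limit_seconds


def fake_query():
    """Hypothetical stand-in for submitting a query and fetching its result."""
    time.sleep(0.01)
    return "rows"


result, elapsed, within_limit = timed_call(fake_query)
print(f"Elapsed {elapsed:.3f} s, within limit: {within_limit}")
```

Returning the elapsed time alongside the pass/fail flag keeps the measurement available for reporting even when the limit is met comfortably.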

#### Requirement ID: DMS-REQ-0361 (Priority: 1b)
Specification: The system shall support hvQueryUsers simultaneous high-volume queries running at any given time.
hvQueryUsers: Minimum number of simultaneous users performing high-volume queries == 50
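
One way the simultaneous-user requirement could be exercised is to launch one query per simulated user from a thread pool and record how long each takes. This is only a sketch under stated assumptions: `run_concurrently` and `fake_query` are hypothetical names, and `fake_query` stands in for whatever callable would submit and wait on a real high-volume query.

```python
import time
from concurrent.futures import ThreadPoolExecutor

HV_QUERY_USERS = 50  # minimum number of simultaneous high-volume queries


def run_concurrently(run_query, n_users=HV_QUERY_USERS):
    """Call run_query(user_index) from n_users threads.

    Returns a list of (user_index, elapsed_seconds) tuples in user order.
    """
    def timed(user_index):
        start = time.perf_counter()
        run_query(user_index)
        return user_index, time.perf_counter() - start

    # One worker per simulated user, so no query waits for a free thread.
    with ThreadPoolExecutor(max_workers=n_users) as pool:
        return list(pool.map(timed, range(n_users)))


def fake_query(user_index):
    """Hypothetical stand-in for one user's high-volume query."""
    time.sleep(0.01)


timings = run_concurrently(fake_query)
slowest = max(elapsed for _, elapsed in timings)
print(f"{len(timings)} queries finished, slowest took {slowest:.3f} s")
```

Threads are a reasonable fit here because each simulated user spends its time waiting on the remote service rather than computing locally.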

### Discussion:
We will use the DP1 catalog for now. This is a query on Qserv, not the Butler.

### Data 
A full scan of DP1 is the best we can do as of 2025-07-22.
Eventually, verify latency against a full-scale catalog, e.g., a DR1-sized Object catalog.


In [None]:
from lsst.rsp import get_tap_service
import time

In [None]:
# Requirements specs

# Maximum time allowed for retrieving results of a high-volume complex query: 12 hours, expressed in seconds
hvComplexQueryTime = 12 * 60 * 60  # seconds

# Minimum number of simultaneous users performing high volume queries.
hvQueryUsers = 50 

In [None]:
service = get_tap_service("tap")
assert service is not None

In [None]:
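# Calculate the in-memory size of a DataFrame in gigabytes (GB)
def get_size_in_gb(df):
    """Return the deep memory usage of df in GB (1 GB = 1024**3 bytes here)."""
    size = df.memory_usage(deep=True).sum()
    size /= 1024 ** 3
    return size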

In [None]:
# High-volume complex query involving full-sky spatial and temporal correlations and the use of some internal functions
# Mag 17 cut: results in a 1.1 GB dataframe, greater than the 0.5 GB limit for "small queries"
query = """SELECT *, scisql_nanojanskyToAbMag(r_scienceFluxMean) AS rScienceMag
        FROM dp1.DiaObject AS diao
        JOIN dp1.ForcedSourceOnDiaObject AS fsodo ON fsodo.diaObjectId = diao.diaObjectId
        JOIN dp1.Visit AS vis ON vis.visit = fsodo.visit
        WHERE scisql_nanojanskyToAbMag(r_scienceFluxMean) < 17
"""
print(query)

In [None]:
job = service.submit_job(query)

In [None]:
start_time = time.time()
job.run()
job.wait(phases=['COMPLETED', 'ERROR'])
print('Job phase is', job.phase)
if job.phase == 'ERROR':
    job.raise_if_error()
assert job.phase == 'COMPLETED'
results = job.fetch_result()
end_time = time.time()

In [None]:
execution_time = end_time - start_time
print(f"Execution time: {execution_time:.1f} s")

In [None]:
df = results.to_table().to_pandas()
df.info(memory_usage='deep')

In [None]:
size = get_size_in_gb(df)
print(f"Size is {size:.1f} GB")

In [None]:
# Does the query meet the specification?
print(f"Query run time: {execution_time:.1f} s; specification limit: {hvComplexQueryTime} s")
assert execution_time <= hvComplexQueryTime

In [None]:
job.delete()
del query, results