# MERIT Query

## Initialization

In [None]:
import logging

%matplotlib inline

# Attach a timestamped console handler to the root logger.
# Notebook cells are routinely re-executed, and an unconditional addHandler()
# would stack one more handler per run (every record then prints N times).
# The handler is therefore tagged with a name and reused when already present.
_CONSOLE_HANDLER_NAME = 'merit-query-console'
_root_logger = logging.getLogger()
consoleHandler = next(
    (h for h in _root_logger.handlers if h.get_name() == _CONSOLE_HANDLER_NAME),
    None,
)
if consoleHandler is None:
    consoleHandler = logging.StreamHandler()
    consoleHandler.set_name(_CONSOLE_HANDLER_NAME)
    _root_logger.addHandler(consoleHandler)
# (Re)apply the format on every run so edits to it take effect on re-execution.
consoleHandler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

# %env

In [None]:
from typing import Optional
import matplotlib.pyplot as plt

import pandas
import dask.dataframe as dd

from jupiter_csg_query.core import URL, Selector, Result
from jupiter_csg_query.util import generator
from jupiter_csg_query.source import merit
from jupiter_csg_query.runtime.batch import BatchProvider

# Set the 'jupiter.csg.query' logger to DEBUG.
# NOTE(review): the package is imported as `jupiter_csg_query`, while the logger
# name here is dotted ('jupiter.csg.query') — confirm this matches the logger
# name the library actually uses, otherwise this line has no effect.
logging.getLogger('jupiter.csg.query').setLevel(logging.DEBUG)

In [None]:

def show_points(df:pandas.DataFrame):
    """Scatter-plot the rows of *df*, longitude on the x axis and latitude on y.

    *df* must provide 'longitude' and 'latitude' columns. The figure is
    displayed immediately via ``plt.show()``; nothing is returned.
    """
    plt.scatter(df['longitude'], df['latitude'])
    plt.show()

## Context

In [None]:
# Current user; the name is embedded in the work-area and output paths below.
user = 'test-user'

# S3 locations of the source data, the per-user work area and the output CSV.
# The source URL has no placeholders, so it is a plain string, not an f-string.
source_url = URL("s3://jupiter-reference-data/MERIT/hydro")
work_area_url = URL(f's3://jupiter-climatescoreglobal-eos/reports/query/{user}/')
output_url = URL(f's3://jupiter-climatescoreglobal-eos/reports/{user}/test_set_dem.csv')

## Open Query Batch Source

In [None]:
# Batch runtime that executes the queries; job queue, job definition and region
# identify where the batch jobs are submitted.
provider = BatchProvider(
    region='us-east-1',
    job_queue='csg-global-prod-validation',
    job_definition='csg-query-dev:11',
    parallelism=32,
    work_area_url=work_area_url,
)

# Query source bound to the MERIT data location and the batch runtime above.
merit_query = merit.QuerySource(provider=provider, source_url=source_url)

# Trailing bare expression: the notebook displays the object's repr.
merit_query

## Prepare input for the query

### Load input/points from a single file

In [None]:
# Alternative input file (disabled):
# input_url = 's3://jupiter-climatescoreglobal-eos/reports/test/hotels_70k_CSG_input.csv'
input_url = "s3://jupiter-climatescoreglobal-eos/reports/internal-projects/CSG_40K_Samples_Verification_w_tileid.csv"
# nrows=20 restricts the read to the first 20 data rows of the CSV.
df_input = pandas.read_csv(input_url, nrows=20)
# Print a summary of the loaded frame (columns, dtypes, non-null counts).
df_input.info()

In [None]:
# input_url = 's3://jupiter-climatescoreglobal-eos/reports/input/*.csv'
# df_input = dd.read_csv(input_url).compute()

### Visualize the points

In [None]:
# Scatter-plot the input locations (longitude on x, latitude on y).
show_points(df_input)

### Specify selection

In [None]:
# Elevation selector
'''
elevationSelector = {
    "variable": {"BiasCorrected"},
    "version": {8},
    "measure": {"elevation"},
}
'''


def _bias_corrected_slope(measure):
    """Build a version-8 BiasCorrected slope selector for the given measure."""
    return {
        "variable": {"BiasCorrected"},
        "version": {8},
        "measure": {measure},
        "suffix": {"_slope_riserun"},
    }


# Slope selectors: the two differ only in the measure they request.
slopeSelector = _bias_corrected_slope("slope")
slopeHifiSelector = _bias_corrected_slope("slope_hifi")

# Drainage area is selected by variable alone (no version, measure or suffix).
drainageAreaSelector = {"variable": {"DrainageArea"}}

# Scenario name -> selector; the scenario names are also the keys used to read
# the results back after the query.
selector = {
    # "elevation-scenario-1": elevationSelector,
    "slope": slopeSelector,
    "slope-hifi": slopeHifiSelector,
    "drainageArea": drainageAreaSelector,
}


## Launch query batch pipeline 

In [None]:
# Run the query; the call blocks until the batch completes.
#   input   - pandas.DataFrame; the 'latitude' and 'longitude' column names are reserved
#   dry_run - when `True` the batch is not executed, only the parameters are verified
# NOTE(review): a later cell guards against `result` being None — confirm whether
# query() can return None and, if so, widen the annotation below.

# Only the columns needed by the query are passed along.
query_columns = ['locationId', 'latitude', 'longitude']

result: Result = merit_query.query(
    input=df_input[query_columns],
    selector=selector,
    pipeline='gdal',
    # format='parquet',
    tag='descriptions',
    dry_run=False,
)

### Load the result into a DataFrame

_Note:_ `nodata` values are returned as `np.nan`.

In [None]:
# Read the result of every scenario in `selector` and print a summary of each.
# Skipped entirely when `result` is None.
# NOTE(review): the concrete frame type returned by result.read() (pandas vs.
# dask) is not visible here — confirm against the library.

if result is not None:
    for scenario in selector:
        # Scenario names are the keys of `selector`.
        df = result.read(scenario)
        df.info()

Unused for now:
```
elevation_scenario_1 = result.read('elevation-scenario-1')
elevation_scenario_1
```

In [None]:
# Slope result: index by "_id_", give the value column a descriptive name and
# drop the selector-echo columns that are not needed in the output.
slope = (
    result.read('slope', dtype={"locationId": int})
    .set_index("_id_")
    .rename(columns={"value-Slope - riserun": "center_slope"})
    .drop(columns=["variable", "version", "suffix", "measure"])
)

In [None]:
# High-fidelity slope result: same post-processing as the plain slope result,
# with the value column renamed to "center_slope_hifi".
slope_hifi = (
    result.read('slope-hifi', dtype={"locationId": int})
    .set_index("_id_")
    .rename(columns={"value-Slope - riserun": "center_slope_hifi"})
    .drop(columns=["variable", "version", "suffix", "measure"])
)

In [None]:
# Drainage-area result: index by "_id_", rename the value column and drop the
# selector-echo columns.
# NOTE(review): the drainage-area selector only sets "variable"; drop() raises
# KeyError for missing labels, so confirm the result really carries
# "version", "suffix" and "measure" columns.
drainageArea = (
    result.read('drainageArea', dtype={"locationId": int})
    .set_index("_id_")
    .rename(columns={"value-0": "center_drainageArea"})
    .drop(columns=["variable", "version", "suffix", "measure"])
)

In [None]:
# The merge keys below use "tile"; when the input has no such column, its
# "tileid" column is renamed to match.
if "tile" not in df_input.columns:
    df_input = df_input.rename(columns={"tileid": "tile"})

merge_cols = ["tile", "locationId", "latitude", "longitude"]

# Join each result frame onto the input in turn. merge() defaults to an inner
# join, so rows without a match on every key in merge_cols are dropped.
merged = df_input
for result_frame in (slope, slope_hifi, drainageArea):
    merged = merged.merge(result_frame, on=merge_cols)

# The output uses "tileId" as the tile column name.
df_out = merged.rename(columns={"tile": "tileId"})
df_out.head()

In [None]:
# Write the merged table to output_url as CSV, omitting the DataFrame index.
df_out.to_csv(str(output_url), index=False)