In [2]:
database_name = 'ismn.duckdb'
force_rebuild_duckdb = True

In [3]:
import glob
from pathlib import Path
from metacatalog_api import core
from metacatalog_api import db

import duckdb

# build the Duckdb path
db_path = Path('/out') / database_name
db_path

postgresql://postgres:postgres@db:5432/metacatalog


PosixPath('/out/ismn.duckdb')

In [4]:
# check if this MetaCatalog instance is a new one
with core.connect() as con:
    if not db.check_installed(con):
        db.install(con)

In [8]:
all_files = glob.glob('/data/ismn/**/**/*.stm')
print(len(all_files))

25349


In [9]:
from tqdm import tqdm
import pandas as pd
import io

buffer = io.StringIO()
buffer.write("network,station,station_name,lat,lon,elevation,depth_from,depth_to,variable,device,filename\n")
def parse_file(path):
    with open(path, 'r') as f:
        raw_header = f.readline()
        header = [c.strip() for c in raw_header.split(' ') if c != '']

        # we overwrite the network and station information from the file, as these differ
        path_chunks = Path(path).name.split('_')
        
        # quote the network and station name
        header[0] = f'"{path_chunks[0]}"'
        header[1] = f'"{path_chunks[1]}"'
        header[2] = f'"{path_chunks[2]}"'
        static = ','.join(header[:8])
        device = ' '.join(header[8:]).replace(',', '_')
        line = f"{static},{path_chunks[3]},{device},\"{Path(path).name}\"\n"

        return line
        # return [*header[:8], ' '.join(header[8:]), Path(path).name]


for file_name in tqdm(all_files):
    header_line = parse_file(file_name)
    buffer.write(header_line)

buffer.seek(0)
raw_header = pd.read_csv(buffer, quotechar='"')

raw_header.insert(0, 'id', range(1, len(raw_header) + 1))
raw_header

100%|██████████| 25349/25349 [00:07<00:00, 3513.15it/s]


Unnamed: 0,id,network,station,station_name,lat,lon,elevation,depth_from,depth_to,variable,device,filename
0,1,AWDN,AWDN,NorthPlatte,41.05000,-100.46000,861.0,0.10,0.10,sm,ThetaProbe ML2X,AWDN_AWDN_NorthPlatte_sm_0.100000_0.100000_The...
1,2,AWDN,AWDN,NorthPlatte,41.05000,-100.46000,861.0,0.50,0.50,sm,ThetaProbe ML2X,AWDN_AWDN_NorthPlatte_sm_0.500000_0.500000_The...
2,3,AWDN,AWDN,NorthPlatte,41.05000,-100.46000,861.0,1.00,1.00,sm,ThetaProbe ML2X,AWDN_AWDN_NorthPlatte_sm_1.000000_1.000000_The...
3,4,AWDN,AWDN,NorthPlatte,41.05000,-100.46000,861.0,0.25,0.25,sm,ThetaProbe ML2X,AWDN_AWDN_NorthPlatte_sm_0.250000_0.250000_The...
4,5,AWDN,AWDN,Smithfield,40.35000,-99.40000,768.0,1.00,1.00,sm,ThetaProbe ML2X,AWDN_AWDN_Smithfield_sm_1.000000_1.000000_Thet...
...,...,...,...,...,...,...,...,...,...,...,...,...
25344,25345,IMA-CAN1,IMA-CAN1,station3,44.68241,8.62657,272.7,0.10,0.10,sm,5TM,IMA-CAN1_IMA-CAN1_station3_sm_0.100000_0.10000...
25345,25346,IMA-CAN1,IMA-CAN1,station10,44.68275,8.62636,278.5,0.10,0.10,sm,5TM,IMA-CAN1_IMA-CAN1_station10_sm_0.100000_0.1000...
25346,25347,IMA-CAN1,IMA-CAN1,station10,44.68275,8.62636,278.5,0.10,0.10,ts,5TM,IMA-CAN1_IMA-CAN1_station10_ts_0.100000_0.1000...
25347,25348,IMA-CAN1,IMA-CAN1,station11,44.68253,8.62671,272.6,0.10,0.10,sm,5TM,IMA-CAN1_IMA-CAN1_station11_sm_0.100000_0.1000...


In [6]:
# drop the database if it exists and the user wants to force rebuild it

if db_path.exists() and force_rebuild_duckdb:
    print(f"The database {db_path} already exists, but is forced to be dropped...")
    db_path.unlink()

The database /out/ismn.duckdb already exists, but is forced to be dropped...


In [None]:
# build the DuckDB database
with duckdb.connect(str(db_path), read_only=False) as db:
    db.sql("CREATE TABLE IF NOT EXISTS raw_metadata AS SELECT * FROM raw_header;")


In [23]:
from metacatalog_api import core
print(raw_header.variable.unique())

# match these names to something we can use
# there is some info here: https://ismn.earth/media/filer_public/1f/4f/1f4f1b03-550b-4b63-b680-fc9695d6feec/data_template_description_28082023.pdf
# sm = soil moisture
# ts = soil temperature
# ta = air temperature
# p = precipitation
# sd = snow-depth
# sweq = snow water equivalent
lookup = {
    'sm': 12,
    'ts': 2,
    'ta': 1,
    'p': 8,
    'su': 15
}

variables = {short: core.variables(id=id)[0] for short, id in lookup.items()}
variables

['sm' 'ts' 'ta' 'p' 'sd' 'tsf' 'sweq' 'su']


{'sm': Variable(id=12, name='volumetric water content', symbol='theta', unit=Unit(id=113, name='cm3/cm3', symbol='cm3/cm3'), column_names=['volumetric_water_content'], keyword=Keyword(id=5727, uuid='bbe2ea34-8842-4a9f-9b0b-95dd3c71857f', value='SOIL MOISTURE/WATER CONTENT', path='EARTH SCIENCE > LAND SURFACE > SOILS > SOIL MOISTURE/WATER CONTENT', thesaurusName=Thesaurus(id=1, uuid='2e54668d-8fae-429f-a511-efe529420b12', name='GCMD', title='NASA/GCMD Earth Science Keywords', organisation='NASA', url='https://gcmdservices.gsfc.nasa.gov/kms/concepts/concept_scheme/sciencekeywords/?format=xml', description='NASA Global Clime change Master Dictionary Science Keywords'))),
 'ts': Variable(id=2, name='soil temperature', symbol='Ts', unit=Unit(id=101, name='degree Celsius', symbol='C'), column_names=['soil_temperature'], keyword=Keyword(id=5736, uuid='0546b91a-294d-45d9-8b45-76aaad0cc024', value='SOIL TEMPERATURE', path='EARTH SCIENCE > LAND SURFACE > SOILS > SOIL TEMPERATURE', thesaurusName=

## Load static properties

In [11]:
all_meta = glob.glob('/data/ismn/**/**/*.csv')
print(f"Found {len(all_meta)} files")

Found 3210 files


In [12]:
def get_static_attributes(path):
    columns = ['quantity_name', 'unit', 'depth_from', 'depth_to','value','description','quantity_source_name', 'quantity_source_description', 'quantity_source_provider', 'quantity_source_version', 'quantity_source_resolution', 'quantity_source_timerange', 'quantity_source_url']

    # get the network and station name from the file name
    network, station, station_name, *_ = Path(path).name.split('_')

    # check if the file has a valid header
    with open(path, 'r') as f:
        if not f.readline().startswith(columns[0]):
            attrs = pd.read_csv(path, sep=';', header=None, names=columns)
        else:
            attrs = pd.read_csv(path, sep=';')

    # add the network and station identifiers to the attributes
    attrs.insert(0, 'network', network)
    attrs.insert(1, 'station', station)
    attrs.insert(2, 'station_name', station_name)

    # rename depth_from and depth_to columns
    attrs.rename(columns={'depth_from[m]': 'depth_from', 'depth_to[m]': 'depth_to'}, inplace=True)
    
    # return
    return attrs.dropna(axis='columns', how='all')
    #return attrs

statics = get_static_attributes(all_meta[0]) 
statics

Unnamed: 0,network,station,station_name,quantity_name,unit,depth_from,depth_to,value,description,quantity_source_name,quantity_source_description,quantity_source_provider,quantity_source_version,quantity_source_resolution,quantity_source_timerange,quantity_source_url
0,AWDN,AWDN,NorthPlatte,saturation,m^3*m^-3,0.0,0.3,0.46,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
1,AWDN,AWDN,NorthPlatte,clay fraction,% weight,0.0,0.3,23.00,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
2,AWDN,AWDN,NorthPlatte,organic carbon,% weight,0.0,0.3,1.27,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
3,AWDN,AWDN,NorthPlatte,sand fraction,% weight,0.0,0.3,36.00,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
4,AWDN,AWDN,NorthPlatte,silt fraction,% weight,0.0,0.3,41.00,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
5,AWDN,AWDN,NorthPlatte,saturation,m^3*m^-3,0.3,1.0,0.44,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
6,AWDN,AWDN,NorthPlatte,clay fraction,% weight,0.3,1.0,29.00,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
7,AWDN,AWDN,NorthPlatte,organic carbon,% weight,0.3,1.0,0.57,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
8,AWDN,AWDN,NorthPlatte,sand fraction,% weight,0.3,1.0,29.00,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
9,AWDN,AWDN,NorthPlatte,silt fraction,% weight,0.3,1.0,42.00,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...


In [10]:
from tqdm import tqdm

def check_static_table_exists(db_path) -> bool:
    with duckdb.connect(str(db_path), read_only=True) as db:
        n = db.sql("SELECT COUNT(*) as n FROM information_schema.tables WHERE table_name = 'static';").fetchone()[0]
    return n > 0

table_exists = check_static_table_exists(db_path)

if table_exists:
    with duckdb.connect(str(db_path), read_only=False) as db:
        db.sql("DROP TABLE static;")
        table_exists = False
n_errors = 0
error_msgs = []
for meta_file in tqdm(all_meta):
    # load the static properties
    statics = get_static_attributes(meta_file)

    # load to the database
    with duckdb.connect(str(db_path), read_only=False) as db:
        if not table_exists:
            db.sql("CREATE TABLE raw_statics AS SELECT * FROM statics;")
            table_exists = True
        else:
            try:
                db.sql("INSERT INTO raw_statics BY NAME SELECT * FROM statics;")
            except Exception as e:
                error_msgs.append(f"{meta_file}: Error inserting raw_statics: {e}")
                n_errors += 1
                continue

print(f"Ran into {n_errors} errors.")

100%|██████████| 3210/3210 [02:11<00:00, 24.35it/s]

Ran into 55 errors.





In [11]:
try:
    from json2args.logger import logger
    for msg in error_msgs:
        logger.error(msg)
except ImportError:
    print("It seems like you are running this notebook outside of a tool-spec container. Logging is not supported here.\nThis is what I wanted to log:\n\n")
    print('\n'.join(error_msgs))

[ERROR]: /data/RUSWET-AGRO/KrasnojarskiKrai#2/RUSWET-AGRO_RUSWET-AGRO_KrasnojarskiKrai#2_static_variables.csv: Error inserting raw_statics: Conversion Error: Could not convert string 'ET' to DOUBLE


[ERROR]: /data/RUSWET-AGRO/Gurjvskaya/RUSWET-AGRO_RUSWET-AGRO_Gurjvskaya_static_variables.csv: Error inserting raw_statics: Conversion Error: Could not convert string 'BSk' to DOUBLE
[ERROR]: /data/RUSWET-AGRO/KrasnojarskiKrai#1/RUSWET-AGRO_RUSWET-AGRO_KrasnojarskiKrai#1_static_variables.csv: Error inserting raw_statics: Conversion Error: Could not convert string 'ET' to DOUBLE
[ERROR]: /data/TERENO/Merzenhausen/TERENO_TERENO_Merzenhausen_static_variables.csv: Error inserting raw_statics: Conversion Error: Could not convert string 'depth_from[m]' to DOUBLE
[ERROR]: /data/PTSMN/Site-2/PTSMN_PTSMN_Site-2_static_variables.csv: Error inserting raw_statics: Conversion Error: Could not convert string 'depth_from[m]' to DOUBLE
[ERROR]: /data/COSMOS-UK/Writtle/COSMOS-UK_COSMOS-UK_Writtle_static_variables.csv: Error inserting raw_statics: Conversion Error: Could not convert string 'Cfb' to DOUBLE
[ERROR]: /data/COSMOS-UK/Cochno/COSMOS-UK_COSMOS-UK_Cochno_static_variables.csv: Error inserting ra

In [13]:
with duckdb.connect(str(db_path), read_only=False) as db:
    raw_statics = db.sql("FROM raw_statics").df()
raw_statics

Unnamed: 0,network,station,station_name,quantity_name,unit,depth_from,depth_to,value,description,quantity_source_name,quantity_source_description,quantity_source_provider,quantity_source_version,quantity_source_resolution,quantity_source_timerange,quantity_source_url
0,AWDN,AWDN,NorthPlatte,saturation,m^3*m^-3,0.0,0.3,0.46,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
1,AWDN,AWDN,NorthPlatte,clay fraction,% weight,0.0,0.3,23.00,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
2,AWDN,AWDN,NorthPlatte,organic carbon,% weight,0.0,0.3,1.27,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
3,AWDN,AWDN,NorthPlatte,sand fraction,% weight,0.0,0.3,36.00,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
4,AWDN,AWDN,NorthPlatte,silt fraction,% weight,0.0,0.3,41.00,,HWSD,Harmonized World Soil Database v1.1 by IIASA,IIASA,v1.1,"30""",,http://webarchive.iiasa.ac.at/Research/LUC/Ext...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
56364,RUSWET-GRASS,RUSWET-GRASS,"KUSTANAI,AGRO",land cover classification,,,,190,Urban areas,CCI_landcover_2000,ESA CCI Land Cover,ESA,2000-v1.6.1,300m,1998-2002,http://www.esa-landcover-cci.org/
56365,RUSWET-GRASS,RUSWET-GRASS,"KUSTANAI,AGRO",land cover classification,,,,190,Urban areas,CCI_landcover_2005,ESA CCI Land Cover,ESA,2005-v1.6.1,300m,2003-2007,http://www.esa-landcover-cci.org/
56366,RUSWET-GRASS,RUSWET-GRASS,"KUSTANAI,AGRO",land cover classification,,,,190,Urban areas,CCI_landcover_2010,ESA CCI Land Cover,ESA,2010-v1.6.1,300m,2008-2012,http://www.esa-landcover-cci.org/
56367,RUSWET-GRASS,RUSWET-GRASS,"KUSTANAI,AGRO",climate classification,,,,Dfb,Cold - Without dry season - Warm Summer,koeppen_geiger_2007,Koeppen-Geiger Climate Classification,,Peel2007,0.1°,,http://www.hydrol-earth-syst-sci.net/11/1633/2...


In [30]:
from metacatalog_api.models import MetadataPayload, Author, Detail, DataSource, TemporalScale
import warnings

def get_static(row: pd.Series) -> pd.DataFrame:
    sql = f"""
    FROM raw_statics WHERE 
    network='{row.network}' and station='{row.station}' and station_name='{row.station_name}'
    and ((depth_from <= {row.depth_from} and depth_to >= {row.depth_to})  or depth_from is null)
    """
    with duckdb.connect(str(db_path), read_only=True) as db:
        df = db.sql(sql).df()
    return df

def get_details(static_attributes: pd.DataFrame) -> list[Detail]:
    details = []
    for i, d in static_attributes.iterrows(): 
        k = d.quantity_source_name if 'classification' in d.quantity_name else d.quantity_name
        v = d.description if 'classification' in d.quantity_name else float(d.value)
        details.append(Detail(key=k, value=v))
    return details

def get_datasource(row: pd.Series) -> DataSource:
    # build the path
    p = Path('/data/ismn/') / row.station / row.station_name / row.filename
    
    var = variables[row.variable]
    # build the args
    args = dict(
        skiprows=1,
        sep='\s+',
        header=None,
        parse_dates={'tstamp': [0, 1]},
        names=['date', 'time', var.name.replace(' ', '_'), 'g', 'q']
    )
    # read the data in
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        _d = pd.read_csv(p, **args)
    # build the temporal scale
    t_scale = TemporalScale(
        dimension_names=['tstamp'],
        extent=[
            _d.tstamp.min().to_pydatetime(),
            _d.tstamp.max().to_pydatetime()
        ],
        resolution=pd.to_timedelta(_d.tstamp.diff().mode().values[0]),
        support=1.0
    )

    # build the datasource
    datasource = DataSource(
        path=str(p),
        type=core.datatypes(id=3)[0],
        variable_name=[var.name.replace(' ', '_')],
        temporal_scale=t_scale,
        args=args
    )

    return datasource



# builld ismn as first author for all datasets
ismn = Author(is_organisation=True, organisation_name="International Soil Moisture Network", organisation_abbrev="ISMN") 

row = raw_header.iloc[8,:]

datasource=get_datasource(row)

statics = get_static(row)
details = [
    Detail(key="depth_from", value=row.depth_from),
    Detail(key="depth_to", value=row.depth_to),
    *get_details(statics)
]

variable = variables[row.variable]
license = core.licenses(id=9)

MetadataPayload(
    title=f"{row.station} {row.station_name} {variable.name} ({row.depth_from}-{row.depth_to}) [{row.id}]",
    abstract=f"This is an auto-generated abstract.\nStatic attributes:\n\n{statics.to_markdown()}",
    license=license,
    details=details,
    author=ismn,
    variable=variable,
    location=f"SRID=4326;POINT({row.lon} {row.lat})",
    datasource=datasource
)

  sep='\s+',


MetadataPayload(id=None, uuid=None, title='AWDN Scottsbluff volumetric water content (0.1-0.1) [9]', abstract='This is an auto-generated abstract.\nStatic attributes:\n\n|    | network   | station   | station_name   | quantity_name             | unit     |   depth_from |   depth_to | value   | description          | quantity_source_name   | quantity_source_description                        | quantity_source_provider   | quantity_source_version   | quantity_source_resolution   | quantity_source_timerange   | quantity_source_url                                                           |\n|---:|:----------|:----------|:---------------|:--------------------------|:---------|-------------:|-----------:|:--------|:---------------------|:-----------------------|:---------------------------------------------------|:---------------------------|:--------------------------|:-----------------------------|:----------------------------|:-------------------------------------------------------------

In [31]:
core.add_entry?

[0;31mSignature:[0m [0mcore[0m[0;34m.[0m[0madd_entry[0m[0;34m([0m[0mpayload[0m[0;34m:[0m [0mmetacatalog_api[0m[0;34m.[0m[0mmodels[0m[0;34m.[0m[0mMetadataPayload[0m[0;34m)[0m [0;34m->[0m [0mmetacatalog_api[0m[0;34m.[0m[0mmodels[0m[0;34m.[0m[0mMetadata[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m <no docstring>
[0;31mFile:[0m      /usr/local/lib/python3.12/site-packages/metacatalog_api/core.py
[0;31mType:[0m      function

In [None]:
HIER WEITER
sql = """
from raw_metadata
where depth_from >= 0 and depth_to <=0.31 and station='AWDN' and station_name='NorthPlatte'
and (
    select count(*) 
    from raw_statics where 
    station='AWDN' and station_name='NorthPlatte' and
    ((depth_from >= 0 and depth_to <=0.31) or depth_from is null )
    and quantity_name='clay fraction' and value::float > 20.0
) > 0
"""
with duckdb.connect(str(db_path), read_only=False) as db:
    df = db.sql(sql).df()
df

Unnamed: 0,id,network,station,station_name,lat,lon,elevation,depth_from,depth_to,variable,device,filename
0,1,AWDN,AWDN,NorthPlatte,41.05,-100.46,861.0,0.1,0.1,sm,ThetaProbe ML2X,AWDN_AWDN_NorthPlatte_sm_0.100000_0.100000_The...
1,4,AWDN,AWDN,NorthPlatte,41.05,-100.46,861.0,0.25,0.25,sm,ThetaProbe ML2X,AWDN_AWDN_NorthPlatte_sm_0.250000_0.250000_The...


In [7]:
# finally pivot the data to have the attributes as columns
sql = """
DROP TABLE IF EXISTS static_attributes;
CREATE TABLE IF NOT EXISTS static_attributes AS 
with stats as (
    SELECT * Exclude (quantity_name), 
    replace(quantity_name, ' ', '_') as quantity_name,
    FROM raw_statics where value is NOT NULL
)
pivot stats 
on quantity_name
using mode(value)
group by network, station, station_name, depth_from, depth_to;
"""
with duckdb.connect(str(db_path), read_only=False) as db:
    db.sql(sql)
with duckdb.connect(str(db_path), read_only=False) as db:
    df = db.sql("FROM static_attributes").df()
df

Unnamed: 0,network,station,station_name,depth_from,depth_to,bulk_density,clay_fraction,climate_classification,field_capacity,land_cover_classification,organic_carbon,permanent_wilting_point,potential_plant_available_water,sand_fraction,saturation,silt_fraction
0,AWDN,AWDN,Smithfield,0.0,0.3,,23.00,,,,1.27,,,36.00,0.46,41.00
1,AWDN,AWDN,Scottsbluff,0.0,0.3,,23.00,,,,1.27,,,36.00,0.46,41.00
2,AWDN,AWDN,Brunswick,0.3,1.0,,13.00,,,,0.50,,,72.00,0.40,15.00
3,AWDN,AWDN,MitchellFarms,0.0,0.3,,21.00,,,,1.28,,,34.00,0.46,45.00
4,AWDN,AWDN,MitchellFarms,,,,,BSk,,130,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
12132,IMA-CAN1,IMA-CAN1,station6,0.1,0.2,1.26,32.80,,,,1.01,,,22.78,0.47,44.40
12133,IMA-CAN1,IMA-CAN1,station9,0.0,0.1,1.40,21.60,,,,1.86,,,35.10,0.48,43.40
12134,IMA-CAN1,IMA-CAN1,station2,0.1,0.2,1.29,16.60,,,,1.97,,,30.30,0.50,53.10
12135,IMA-CAN1,IMA-CAN1,station10,0.1,0.2,1.29,22.10,,,,0.84,,,38.90,0.43,39.10


In [4]:
# union both tables together
sql = """
DROP VIEW IF EXISTS metadata;
CREATE VIEW metadata AS
FROM raw_metadata m JOIN static_attributes s 
on s.network=m.network and s.station=m.station and s.station_name=m.station_name
and (m.depth_from >= s.depth_from and m.depth_to <= s.depth_to) or s.depth_from is null; 
FROM metadata;
"""
with duckdb.connect(str(db_path), read_only=False) as db:
    df = db.sql(sql).df()
df.climate_classification.unique()

: 

: 

: 

## Quacking DuckDB

In [93]:
from contextlib import contextmanager
from pydantic import BaseModel
from functools import cached_property

class Quack(BaseModel):
    db_path: Path
    # maybe we need to separate the filters per table here
    filters: list[str] = []
    selected_columns: list[str] | str = '*'

    @contextmanager
    def db(self, read_only: bool = False):
        with duckdb.connect(str(self.db_path), read_only=read_only) as con:
            yield con
    
    @cached_property
    def columns(self) -> list[str]:
        with self.db() as db:
            tup = db.sql("select column_name FROM information_schema.columns where table_name = 'metadata'").fetchall()
        return [t[0] for t in tup]
    
    @cached_property
    def col_types(self) -> dict[str, str]:
        with self.db() as db:
            tup = db.sql("select column_name, data_type FROM information_schema.columns where table_name = 'metadata'").fetchall()
        return {t[0]: t[1] for t in tup}
    
    def __build_sql_list(self, expression: str | list, q: callable = lambda x: x):
        if isinstance(expression, str):
            expression = expression.strip('! ')
        if isinstance(expression, str) and ',' in expression:
            expression = expression.split(',')
        else:
            expression = [expression]
        
        return ', '.join([q(e) for e in expression])
    
    def __build_filter(self, column_name: str, expression: str):
        # check if we need to quote the values in the expression
        if self.col_types[column_name] == 'VARCHAR':
            q = lambda x: f"'{x}'"
        else:
            q = lambda x: x

        # switch the different operators (as strings)
        if isinstance(expression, (list, tuple)):
            expression = f" IN ({self.__build_sql_list(expression, q)})"
        elif isinstance(expression, str) and '!' in expression and ',' in expression:
            expression = f" NOT IN ({self.__build_sql_list(expression, q)})"
        elif isinstance(expression, str) and ',' in expression:
            expression = f" IN ({self.__build_sql_list(expression, q)})"
        elif isinstance(expression, str) and '!' in expression:
            expression = f" != {q(expression.strip('! '))}"
        elif not isinstance(expression, str):
            expression = f"f{q(expression)}"
        elif '<' not in expression and '>' not in expression and '=' not in expression:
                expression = f"={q(expression)}"
        else:
            expression = q(expression)

        return f"m.{column_name}{expression}"

    def filter(self, filters: dict[str, str | float | list[str] | list[float]] = {}, **kwargs):
        filters.update(kwargs)
        new_filters = []
        
        for key, value in kwargs.items():
            if value is None:
                continue
            # TODO here we can handle specific cases, like between (ie. expressed as depth='0.3:0.5')
            new_filters.append(self.__build_filter(key, value))
        return Quack(db_path=self.db_path, selected_columns=self.selected_columns, filters=[*self.filters, *new_filters])

    def col(self, *column_names: str):
        # format the columns
        new_columns = []
        for column in column_names:
            if '.' not in column:
                new_columns.append(f"m.{column}")
            else:
                new_columns.append(column)
        
        # get the existing columns
        columns = self.selected_columns
        if columns == '*':
            columns = new_columns
        else:
            columns = [*columns, *new_columns]
        return Quack(db_path=self.db_path, selected_columns=columns, filters=self.filters)

    def unique(self):
        cols = ','.join(self.selected_columns) if isinstance(self.selected_columns, list) else self.selected_columns
        sql = f"SELECT DISTINCT {cols} FROM metadata m"
        if len(self.filters) > 0:
            sql += f" WHERE {' AND '.join(self.filters)}"
        
        # query the unique values
        with self.db() as duck:
            df = duck.sql(sql).df()
        
        if len(df.columns) == 1:
            return df.iloc[0, :].values.tolist()
        else:
            return df

    def df(self) -> pd.DataFrame:
        if isinstance(self.selected_columns, list):
            cols = ', '.join(self.selected_columns)
        else:
            cols = self.selected_columns
        sql = f"SELECT {cols} FROM metadata m"
        if len(self.filters) > 0:
            sql += f" WHERE {' AND '.join(self.filters)}"
        print(sql)
        with self.db() as duck:
            return duck.sql(sql).df()

duck = Quack(db_path=db_path)

df = duck.filter(depth_to='<=0.3', variable="!sm").col('station_name', 'lon', 'lat', 'variable', 'depth_to').df()
df

SELECT m.station_name, m.lon, m.lat, m.variable, m.depth_to FROM metadata m WHERE m.depth_to<=0.3 AND m.variable != 'sm'


Unnamed: 0,station_name,lon,lat,variable,depth_to
0,NQ01,91.82861,31.32611,ts,0.2000
1,NQNorth,91.86848,31.37878,ts,0.1000
2,NQ03,91.78889,31.27750,ts,0.2000
3,Schoeneseiffen,6.37559,50.51490,ts,0.2000
4,Wildenrath,6.16918,51.13274,ts,0.2000
...,...,...,...,...,...
9122,JohnsonFarm,-101.71700,40.36700,ts,0.2032
9123,JohnsonFarm,-101.71700,40.36700,ts,0.2032
9124,JohnsonFarm,-101.71700,40.36700,ts,0.0508
9125,JohnsonFarm,-101.71700,40.36700,ts,0.2032


In [97]:
duck.col('land_cover_classification').unique()

[None]

In [103]:
with duck.db() as db:
    s = db.sql("SELECT DISTINCT climate_classification from static_attributes").df() 
s

Unnamed: 0,climate_classification
0,Dfc
1,BSh
2,
3,Cfa
4,Aw
5,ET
6,Dfb
7,Dwa
8,Dwc
9,Csb
