Merge pull request #148 from roocs/intake
using intake catalog
agstephens committed Mar 23, 2021
2 parents c154aa4 + 3d05a9f commit 185303e
Showing 17 changed files with 421 additions and 277 deletions.
4 changes: 4 additions & 0 deletions environment.yml
@@ -24,5 +24,9 @@ dependencies:
   - prov>=2.0.0
   - pydot
   - graphviz
+  # catalog
+  - intake
+  - pandas
+  - aiohttp
   # tests
   - pytest
4 changes: 4 additions & 0 deletions requirements.txt
@@ -14,3 +14,7 @@ networkx
 # provenance
 prov>=2.0.0
 pydot
+# catalog
+intake
+pandas
+aiohttp
19 changes: 19 additions & 0 deletions rook/catalog/__init__.py
@@ -0,0 +1,19 @@
from rook.exceptions import InvalidCollection

from .intake import IntakeCatalog
from .db import DBCatalog


def get_catalog(project):
    if project == "c3s-cmip6":
        catalog = DBCatalog(project)
    else:
        raise InvalidCollection()
    return catalog


__all__ = [
    "get_catalog",
    "IntakeCatalog",
    "DBCatalog",
]
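
For orientation, a minimal usage sketch of the new module (the dataset id and time interval below are hypothetical, and the time syntax assumes roocs_utils' "start/end" convention):

# Minimal usage sketch; ds_id and time values are made up.
from rook.catalog import get_catalog

catalog = get_catalog("c3s-cmip6")  # currently the only catalog-backed project
result = catalog.search(
    collection="c3s-cmip6.ScenarioMIP.INM.INM-CM5-0.ssp245.r1i1p1f1.day.tas.gr1.v20190619",
    time="2015-01-01/2020-12-31",
)
print(result.matches)          # number of matched dataset ids
print(result.files())          # {ds_id: [file paths under base_dir]}
print(result.download_urls())  # {ds_id: [URLs under data_node_root]}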
56 changes: 56 additions & 0 deletions rook/catalog/base.py
@@ -0,0 +1,56 @@
import os

from rook import CONFIG


def make_list(value):
    if isinstance(value, list):
        val = value
    else:
        val = [value]
    return val


class Catalog:
    def __init__(self, project):
        self.project = project

    def _query(self, collection, time=None):
        raise NotImplementedError

    def search(self, collection, time=None):
        cols = make_list(collection)
        records = self._query(cols, time)
        result = Result(self.project, records)
        return result


class Result:
    def __init__(self, project, records):
        """records are an OrderedDict of dataset ids with a list of files:
        {'ds_id': [files]}
        """
        self.base_dir = CONFIG.get(f"project:{project}", {}).get("base_dir")
        self.base_url = CONFIG.get(f"project:{project}", {}).get("data_node_root")
        self.records = records

    @property
    def matches(self):
        """Return number of matched records."""
        return len(self.records)

    def __len__(self):
        return self.matches

    def _records(self, prefix):
        new_records = {}
        for ds_id, fpaths in self.records.items():
            new_records[ds_id] = [os.path.join(prefix, fpath) for fpath in fpaths]
        return new_records

    def files(self):
        """Return matched records with file path."""
        return self._records(prefix=self.base_dir)

    def download_urls(self):
        """Return matched records with download URL."""
        return self._records(prefix=self.base_url)
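
The prefix handling in Result is the key idea: the catalog stores paths relative to the project root, so the same records dict can be materialised either as local paths or as download URLs. A self-contained illustration of that join (all values made up):

# Illustration only: how Result._records joins a configured prefix onto
# the relative paths in the records dict. All values here are made up.
import os

records = {"c3s-cmip6.example.ds": ["c3s-cmip6/example/tas_day_2015.nc"]}

base_dir = "/mnt/data"                         # stand-in for CONFIG base_dir
base_url = "https://data.example.org/thredds"  # stand-in for data_node_root

files = {ds: [os.path.join(base_dir, f) for f in fps] for ds, fps in records.items()}
urls = {ds: [os.path.join(base_url, f) for f in fps] for ds, fps in records.items()}

print(files)  # {'c3s-cmip6.example.ds': ['/mnt/data/c3s-cmip6/example/tas_day_2015.nc']}
print(urls)   # {'c3s-cmip6.example.ds': ['https://data.example.org/thredds/c3s-cmip6/example/tas_day_2015.nc']}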
81 changes: 81 additions & 0 deletions rook/catalog/db.py
@@ -0,0 +1,81 @@
import sqlalchemy

from pywps.dblog import get_session

from .base import Catalog
from .intake import IntakeCatalog
from .util import parse_time, MIN_DATETIME, MAX_DATETIME


class DBCatalog(Catalog):
    def __init__(self, project, url=None):
        super(DBCatalog, self).__init__(project)
        self.table_name = f"rook_catalog_{self.project}".replace("-", "_")
        self.intake_catalog = IntakeCatalog(project, url)

    def exists(self):
        """Check whether the catalog table exists in the database."""
        session = get_session()
        engine = session.get_bind()
        try:
            ins = sqlalchemy.inspect(engine)
            exists_ = ins.dialect.has_table(engine.connect(), self.table_name)
        except Exception:
            exists_ = False
        finally:
            session.close()
        return exists_

    def update(self):
        if not self.exists():
            self.to_db()

    def to_db(self):
        """Load the intake catalog into a database table."""
        df = self.intake_catalog.load()
        # workaround for NaN values when no time axis (fx datasets)
        sdf = df.fillna({"start_time": MIN_DATETIME, "end_time": MAX_DATETIME})
        sdf = sdf.set_index("ds_id")
        # db connection
        session = get_session()
        try:
            sdf.to_sql(
                self.table_name,
                session.connection(),
                if_exists="replace",
                index=True,
                chunksize=500,
            )
            session.commit()
        finally:
            session.close()

    def _query(self, collection, time=None):
        """
        https://stackoverflow.com/questions/8603088/sqlalchemy-in-clause
        """
        self.update()
        start, end = parse_time(time)
        session = get_session()
        try:
            # a one-element Python tuple renders as ('x',), which is invalid
            # SQL, so the single-dataset case is handled separately
            if len(collection) > 1:
                query_ = (
                    f"SELECT * FROM {self.table_name} WHERE ds_id IN {tuple(collection)} "
                    f"and end_time>='{start}' and start_time<='{end}'"
                )
            else:
                query_ = (
                    f"SELECT * FROM {self.table_name} WHERE ds_id='{collection[0]}' "
                    f"and end_time>='{start}' and start_time<='{end}'"
                )
            result = session.execute(query_).fetchall()
        except Exception:
            result = []
        finally:
            session.close()
        records = {}
        for row in result:
            if row.ds_id not in records:
                records[row.ds_id] = []
            records[row.ds_id].append(row.path)
        return records
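
The IN clause is rendered straight from Python's tuple repr, which is why the branches above exist. For reference, what the multi-dataset branch produces (table name, ids, and times are example values):

# What the len(collection) > 1 branch renders; all values are examples.
collection = ["c3s-cmip6.ds.one", "c3s-cmip6.ds.two"]
table_name = "rook_catalog_c3s_cmip6"
start, end = "2015-01-01T00:00:00", "2020-12-31T00:00:00"
query_ = (
    f"SELECT * FROM {table_name} WHERE ds_id IN {tuple(collection)} "
    f"and end_time>='{start}' and start_time<='{end}'"
)
print(query_)
# SELECT * FROM rook_catalog_c3s_cmip6 WHERE ds_id IN ('c3s-cmip6.ds.one', 'c3s-cmip6.ds.two')
# and end_time>='2015-01-01T00:00:00' and start_time<='2020-12-31T00:00:00'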
46 changes: 46 additions & 0 deletions rook/catalog/intake.py
@@ -0,0 +1,46 @@
import intake

# from intake.config import conf as intake_config

from rook import CONFIG

from .base import Catalog
from .util import parse_time, MIN_DATETIME, MAX_DATETIME


class IntakeCatalog(Catalog):
    def __init__(self, project, url=None):
        super(IntakeCatalog, self).__init__(project)
        self.url = url or CONFIG.get("catalog", {}).get("intake_catalog_url")
        self._cat = None
        self._store = {}
        # intake_config["cache_dir"] = "/tmp/inventory_cache"

    @property
    def catalog(self):
        if not self._cat:
            self._cat = intake.open_catalog(self.url)
        return self._cat

    def load(self):
        if self.project not in self._store:
            self._store[self.project] = self.catalog[self.project].read()
        return self._store[self.project]

    def _query(self, collection, time=None):
        df = self.load()
        start, end = parse_time(time)
        # workaround for NaN values when no time axis (fx datasets)
        sdf = df.fillna({"start_time": MIN_DATETIME, "end_time": MAX_DATETIME})
        # search
        result = sdf.loc[
            (sdf.ds_id.isin(collection))
            & (sdf.end_time >= start)
            & (sdf.start_time <= end)
        ]
        records = {}
        for _, row in result.iterrows():
            if row.ds_id not in records:
                records[row.ds_id] = []
            records[row.ds_id].append(row.path)
        return records
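
The pandas filter is a standard interval-overlap test: a dataset matches when its end_time is at or after the requested start and its start_time is at or before the requested end. Because the times are ISO-formatted strings with the same layout, plain string comparison sorts chronologically. A toy, self-contained version of the same logic:

# Self-contained illustration of the interval-overlap filter in _query;
# ds_ids, paths, and times are made up.
import pandas as pd

sdf = pd.DataFrame(
    {
        "ds_id": ["ds.a", "ds.a", "ds.b"],
        "path": ["a_2000.nc", "a_2010.nc", "b_2000.nc"],
        "start_time": ["2000-01-01", "2010-01-01", "2000-01-01"],
        "end_time": ["2009-12-31", "2019-12-31", "2019-12-31"],
    }
)
start, end = "2005-01-01", "2012-12-31"
result = sdf.loc[
    (sdf.ds_id.isin(["ds.a"])) & (sdf.end_time >= start) & (sdf.start_time <= end)
]
print(result.path.tolist())  # ['a_2000.nc', 'a_2010.nc'] -- both overlap the window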
16 changes: 16 additions & 0 deletions rook/catalog/util.py
@@ -0,0 +1,16 @@
import datetime

from roocs_utils.parameter import time_parameter

MIN_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1).isoformat()
MAX_DATETIME = datetime.datetime(datetime.MAXYEAR, 12, 30).isoformat()


def parse_time(time):
    # TODO: refactor code ... maybe we need this only in the catalog.
    if time:
        start, end = time_parameter.TimeParameter(time).tuple
    else:
        start = MIN_DATETIME
        end = MAX_DATETIME
    return start, end
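
A behaviour sketch: the no-time case is exact; the parsed case assumes roocs_utils' "start/end" interval syntax and depends on what TimeParameter returns:

# Behaviour sketch; the "start/end" syntax is an assumption about roocs_utils.
from rook.catalog.util import parse_time, MIN_DATETIME, MAX_DATETIME

# No time constraint: fall back to the full datetime range.
assert parse_time(None) == (MIN_DATETIME, MAX_DATETIME)
# i.e. ('0001-01-01T00:00:00', '9999-12-30T00:00:00')

# With a constraint, TimeParameter does the parsing, e.g.
# parse_time("2015-01-01/2020-12-31") -> (start, end) strings.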
27 changes: 18 additions & 9 deletions rook/director/director.py
@@ -7,10 +7,10 @@
 from rook import CONFIG
 from rook.exceptions import InvalidCollection
+from rook.catalog import get_catalog

 from ..utils.input_utils import clean_inputs
 from .alignment import SubsetAlignmentChecker
-from .inventory import Inventory


 def wrap_director(collection, inputs, runner):
@@ -34,10 +34,11 @@ def __init__(self, coll, inputs):
         self.use_original_files = False
         self.original_file_urls = None
         self.output_uris = None
+        self.search_result = None

         if CONFIG[f"project:{self.project}"].get("use_inventory"):
             try:
-                self.inv = Inventory(self.project)
+                self.catalog = get_catalog(self.project)
             except Exception:
                 raise InvalidCollection()
@@ -62,14 +63,18 @@ def _resolve(self):
            ProcessError: [description]
            ProcessError: [description]
         """
-        # Raise exception if any of the data is not in the inventory
-        if not self.inv.contains(self.coll):
+        # search
+        self.search_result = self.catalog.search(
+            collection=self.coll, time=self.inputs.get("time")
+        )
+        # Raise exception if any of the dataset ids is not in the inventory
+        if len(self.search_result) != len(self.coll):
             raise InvalidCollection()

         # If original files are requested then go straight there
         if self.inputs.get("original_files"):
+            self.original_file_urls = self.search_result.download_urls()
             self.use_original_files = True
-            self.original_file_urls = self.inv.get_file_urls(self.coll)
             return

         # Raise exception if "pre_checked" selected but data has not been characterised by dachar
@@ -94,7 +99,7 @@ def _resolve(self):
         # If we got here: then WPS will be used, because `self.use_original_files == False`

     def requires_fixes(self):
-        for ds_id in self.inv.get_matches(self.coll):
+        for ds_id in self.search_result.files():
             fix = fixer.Fixer(ds_id)

             if fix.pre_processor or fix.post_processors:
@@ -118,11 +123,10 @@ def request_aligns_with_files(self):
         return: boolean
         """
-        files = self.inv.get_file_urls(self.coll)
         required_files = OrderedDict()

-        for ds_id, fpaths in files.items():
-            sac = SubsetAlignmentChecker(fpaths, self.inputs)
+        for ds_id, urls in self.search_result.download_urls().items():
+            sac = SubsetAlignmentChecker(urls, self.inputs)

             if not sac.is_aligned:
                 self.use_original_files = False
@@ -152,6 +156,11 @@ def process(self, runner):
         # else: generate the new subset of files
         else:
             clean_inputs(self.inputs)
+            # use search result if available
+            if self.search_result:
+                self.inputs["collection"] = []
+                for ds_id, file_uris in self.search_result.files().items():
+                    self.inputs["collection"].extend(file_uris)
             try:
                 file_uris = runner(self.inputs)
             except Exception as e:
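
Taken together, the director's resolve-and-process path now reduces to roughly this control flow (a sketch, not the actual method bodies):

# Sketch of the director's new flow; not the actual method bodies.
search_result = catalog.search(collection=coll, time=inputs.get("time"))
if len(search_result) != len(coll):
    raise InvalidCollection()  # some requested ds_id is unknown

if inputs.get("original_files"):
    # bypass WPS processing and hand back the original download URLs
    original_file_urls = search_result.download_urls()
else:
    # give the runner resolved local file paths instead of ds_ids
    inputs["collection"] = [
        uri for uris in search_result.files().values() for uri in uris
    ]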
55 changes: 0 additions & 55 deletions rook/director/inv_cache.py

This file was deleted.
