-
Notifications
You must be signed in to change notification settings - Fork 2
/
dal_catalog.py
112 lines (95 loc) · 3.96 KB
/
dal_catalog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import functools
import pkg_resources
import yaml
from intake import Catalog
from intake.utils import yaml_load
from intake_nested_yaml_catalog.nested_yaml_catalog import (
NestedYAMLFileCatalog,
)
from intake_dal.dal_source import DalSource
class DalCatalog(NestedYAMLFileCatalog):
    """
    DalCatalog combines the functionality of a nested hierarchical catalog
    with the "dal" DataSource.
    """

    name = "dal_cat"
    version = pkg_resources.get_distribution("intake-dal").version

    def __init__(self, path=None, catalog_data=None, storage_mode=None, autoreload=True, **kwargs):
        """
        Parameters
        ----------
        path: str
            Location of the file to parse (can be remote).
        catalog_data: dict
            In-memory catalog contents. Used only when ``path`` is not given;
            if a local path or URL is given via ``path``, ``catalog_data`` is
            ignored.
        storage_mode: str
            The dal default storage mode override for this instantiation of
            the catalog. When set, every source with ``driver: dal`` has its
            ``args.default`` replaced by this value.
        autoreload: bool
            Whether to watch the source file for changes; make False if you
            want an editable Catalog. Passed to the parent catalog.
        **kwargs
            Passed unchanged to ``NestedYAMLFileCatalog.__init__``.

        Example catalog::

            sources:
              user_events:
                driver: dal
                args:
                  default: 'local'
                  storage:
                    local: 'csv://{{ CATALOG_DIR }}/data/user_events.csv'
                    serving: 'in-memory-kv://foo'
                    batch: 'parquet://{{ CATALOG_DIR }}/data/user_events.parquet'

        The following overrides the default from 'local' to 'serving':

        >>> cat = DalCatalog(path, storage_mode="serving")
        >>> df = cat.user_events.read()
        """
        # Set before calling the parent constructor: _load() and parse() read
        # these attributes, and the parent constructor presumably triggers a
        # load (TODO confirm against NestedYAMLFileCatalog).
        self.storage_mode = storage_mode
        if catalog_data and not path:
            # In-memory catalog data and no path/URL.
            self.path_or_catalog = catalog_data
            self.is_path = False
        else:
            # A path or URL was given; any catalog_data is ignored.
            self.path_or_catalog = path
            self.is_path = True
        super().__init__(self.path_or_catalog, autoreload, **kwargs)

    def __getitem__(self, key):
        """
        Look up an entry, resolving dotted string names such as ``"a.b.c"``.

        Dotted string keys are resolved segment by segment. Every other key,
        including non-string keys such as tuples, is delegated unchanged to
        the parent implementation instead of failing on ``str.split``.
        """
        # TODO(Taleb Zeghmi): Remove once https://github.com/zillow/intake-nested-yaml-catalog/issues/6 is resolved
        if isinstance(key, str) and "." in key:
            return self._construct_dataset(key, self)
        return super().__getitem__(key)

    def _load(self, reload=False):
        """
        Populate the catalog from either the path/URL or the in-memory data.

        Parameters
        ----------
        reload: bool
            Forwarded to the parent loader for path-backed catalogs, so an
            explicit reload request reaches it even when autoreload is off.
            In-memory data is re-parsed on every call regardless.
        """
        if self.is_path:
            # File path or URL: the parent loader handles reading the text.
            super()._load(reload=reload)
        else:
            # In-memory catalog data has no directory/URL information, so the
            # catalog directory is set to an empty value.
            self._dir = ""
            # Serialize to YAML so the data goes through the same parse()
            # path (including the storage_mode override) as file catalogs.
            self.parse(yaml.dump(self.path_or_catalog))

    def parse(self, text):
        """
        Apply the storage_mode override to YAML catalog text, then parse it.

        Parameters
        ----------
        text: str
            YAML catalog text.
        """
        data = yaml_load(text)
        # Rewrite the default storage mode of dal sources (in place).
        self._set_dal_default_storage_mode(data)
        transformed_text = yaml.dump(data, default_flow_style=False)
        # Reuse the default NestedYAMLFileCatalog YAML parser; it does the
        # heavy lifting of populating the catalog.
        super().parse(transformed_text)

    def _set_dal_default_storage_mode(self, data):
        """
        Recursively set ``args.default`` of every ``driver: dal`` source in
        ``data`` to ``self.storage_mode``, mutating ``data`` in place.

        Does nothing when ``storage_mode`` is falsy or ``data`` is not a dict
        (e.g. None from an empty YAML document). Dicts that are not dal
        sources are searched recursively; dal sources themselves are not.
        """
        if not self.storage_mode or not isinstance(data, dict):
            return
        for value in data.values():
            if not isinstance(value, dict):
                continue
            if value.get("driver", None) == "dal":
                # A dal source with a missing or empty ``args`` section gets
                # the override instead of aborting the whole catalog parse
                # with KeyError/TypeError; any remaining misconfiguration
                # then surfaces only for that one entry.
                if value.get("args") is None:
                    value["args"] = {}
                value["args"]["default"] = self.storage_mode
            else:
                self._set_dal_default_storage_mode(value)

    @staticmethod
    def _construct_dataset(canonical_name: str, catalog: Catalog) -> DalSource:
        """
        Resolve a dotted name by indexing ``catalog`` with each segment in
        turn, e.g. ``"a.b.c"`` -> ``catalog["a"]["b"]["c"]``.

        Returns whatever the final lookup yields; presumably this is a nested
        catalog rather than a DalSource if the name stops at a catalog level.
        """
        return functools.reduce(lambda node, part: node[part], canonical_name.split("."), catalog)