Refactor existing functionality to make intake-esm robust and extensible #77

Merged: 21 commits, Jun 14, 2019
2 changes: 1 addition & 1 deletion ci/environment-dev-3.6.yml
@@ -18,7 +18,7 @@ dependencies:
- pytest-cov
- pytest-icdiff
- python=3.6
- - pytoml
+ - pyyaml
- tqdm
- xarray
- docrep
2 changes: 1 addition & 1 deletion ci/environment-dev-3.7.yml
@@ -25,11 +25,11 @@ dependencies:
- pytest-cov
- pytest-icdiff
- python=3.7
- - pytoml
+ - pyyaml
- recommonmark
- sphinx_rtd_theme
- sphinx>=1.6
- tornado==5.1.1 # Pin version until latest release is stable
- tqdm
- xarray
- docrep
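
Both environment files swap pytoml for pyyaml, consistent with collection inputs being YAML documents. A minimal loading sketch; the file name is borrowed from the deleted example input purely for illustration:

    import yaml

    # Hypothetical spec path; any collection-input YAML loads the same way.
    with open('collection_input_cesm_dple.yml') as f:
        collection_spec = yaml.safe_load(f)  # nested dicts/lists

    # 'data_sources' is one of the keys build() iterates over below.
    print(list(collection_spec['data_sources']))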
27 changes: 0 additions & 27 deletions example_input/collection_input_cesm_dple.yml

This file was deleted.

303 changes: 87 additions & 216 deletions intake_esm/cesm.py
@@ -1,228 +1,62 @@
#!/usr/bin/env python
""" Implementation for NCAR's Community Earth System Model (CESM) data holdings """

import logging
import os
import re

import numpy as np
import pandas as pd
import xarray as xr
from tqdm.autonotebook import tqdm

-from . import aggregate, config
-from .common import BaseSource, Collection, StorageResource, get_subset
-
-logger = logging.getLogger(__name__)
-logger.setLevel(level=logging.WARNING)
+from .collection import Collection, docstrings, get_subset
+from .source import BaseSource


class CESMCollection(Collection):
""" Defines a CESM collection

Parameters
----------
collection_spec : dict


See Also
--------
intake_esm.core.ESMMetadataStoreCatalog
intake_esm.cmip.CMIP5Collection
intake_esm.cmip.CMIP6Collection
__doc__ = docstrings.with_indents(
""" Builds a collection for data produced by
NCAR's Community Earth System Model (CESM).
%(Collection.parameters)s
"""
)
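
Here `docstrings` is a docrep processor, and `%(Collection.parameters)s` is filled with the Parameters section recorded for the base `Collection` class, so subclasses stop duplicating it. The substitution itself is ordinary %-style interpolation; a self-contained sketch with an invented section text:

    # Invented stand-in for the section docrep captures from Collection.
    sections = {'Collection.parameters': 'collection_spec : dict\n    The collection specification.'}

    doc = """ Builds a collection for data produced by
    NCAR's Community Earth System Model (CESM).
    %(Collection.parameters)s
    """ % sections

    print(doc)  # the placeholder is replaced by the shared section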

def __init__(self, collection_spec):
super(CESMCollection, self).__init__(collection_spec)
self.component_streams = self.collection_definition.get(
config.normalize_key('component_streams'), None
)
self.replacements = self.collection_definition.get('replacements', {})
self.include_cache_dir = self.collection_spec.get('include_cache_dir', False)
self.df = pd.DataFrame(columns=self.columns)

def build(self):
self._validate()
# Loop over data sources/experiments
for experiment, experiment_attrs in self.collection_spec['data_sources'].items():
logger.warning(f'Working on experiment: {experiment}')

component_attrs = experiment_attrs['component_attrs']
ensembles = experiment_attrs['case_members']
self.assemble_file_list(experiment, experiment_attrs, component_attrs, ensembles)
logger.warning(self.df.info())
self.persist_db_file()
return self.df
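
For orientation, a hypothetical end-to-end call; every value below is invented, and the real schema is whatever the Collection base class validates:

    collection_spec = {
        'name': 'cesm_example',
        'collection_type': 'cesm',
        'data_sources': {
            'piControl': {  # hypothetical experiment name
                'locations': [{'name': 'glade', 'loc_type': 'posix',
                               'urlpath': '/glade/scratch/data', 'direct_access': True}],
                'component_attrs': {'ocn': {'grid': 'POP_gx1v6'}},
                'case_members': [{'case': 'b.e11.B1850C5CN.f09_g16.005'}],
            }
        },
    }
    col = CESMCollection(collection_spec)
    df = col.build()  # assembles and persists the file database, returns a DataFrame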

def assemble_file_list(self, experiment, experiment_attrs, component_attrs, ensembles):
df_files = {}
for location in experiment_attrs['locations']:
res_key = ':'.join([location['name'], location['loc_type'], location['urlpath']])
if res_key not in df_files:
logger.warning(f'Getting file listing : {res_key}')

if 'exclude_dirs' not in location:
location['exclude_dirs'] = []

resource = StorageResource(
urlpath=location['urlpath'],
loc_type=location['loc_type'],
exclude_dirs=location['exclude_dirs'],
)

df_files[res_key] = self._assemble_collection_df_files(
resource_key=res_key,
resource_type=location['loc_type'],
direct_access=location['direct_access'],
filelist=resource.filelist,
)

# Include user defined data cache directories
if self.include_cache_dir:
res_key = ':'.join(['CACHE', 'posix', self.data_cache_dir])
if res_key not in df_files:
logger.warning(f'Getting file listing : {res_key}')
resource = StorageResource(
urlpath=self.data_cache_dir, loc_type='posix', exclude_dirs=[]
)

df_files[res_key] = self._assemble_collection_df_files(
resource_key=res_key,
resource_type='posix',
direct_access=True,
filelist=resource.filelist,
)
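
`StorageResource` (from the pre-refactor `common` module) reduces each location entry to a flat file list. A rough posix stand-in, assuming exclusion works by plain directory name:

    import os

    def list_files(urlpath, exclude_dirs=()):
        """Walk urlpath, pruning excluded directory names, and return full paths."""
        filelist = []
        for root, dirs, files in os.walk(urlpath):
            dirs[:] = [d for d in dirs if d not in exclude_dirs]  # prune traversal in place
            filelist.extend(os.path.join(root, f) for f in files)
        return filelist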

# Loop over ensemble members
for ensemble, ensemble_attrs in enumerate(ensembles):
input_attrs_base = {'experiment': experiment}

# Get attributes from ensemble_attrs
case = ensemble_attrs['case']

if 'ensemble' not in ensemble_attrs:
input_attrs_base.update({'ensemble': ensemble})

if 'sequence_order' not in ensemble_attrs:
input_attrs_base.update({'sequence_order': 0})

if 'has_ocean_bgc' not in ensemble_attrs:
input_attrs_base.update({'has_ocean_bgc': False})

if 'ctrl_branch_year' not in ensemble_attrs:
input_attrs_base.update({'ctrl_branch_year': np.datetime64('NaT')})

for res_key, df_f in df_files.items():
# Find entries relevant to *this* ensemble:
# "case" matches
condition = df_f['case'] == case

# If there are any matching files, append to self.df
if any(condition):
input_attrs = dict(input_attrs_base)

input_attrs.update(
{
key: val
for key, val in ensemble_attrs.items()
if key in self.columns and key not in df_f.columns
}
)

# Relevant files
temp_df = pd.DataFrame(df_f.loc[condition])

# Append data coming from input file (input_attrs)
for col, val in input_attrs.items():
temp_df.insert(loc=0, column=col, value=val)

# Add data from "component_attrs" to appropriate column
for component in temp_df.component.unique():
if component not in component_attrs:
continue

for key, val in component_attrs[component].items():
if key in self.columns:
loc = temp_df['component'] == component
temp_df.loc[loc, key] = val

# Append
self.df = pd.concat([temp_df, self.df], ignore_index=True, sort=False)

# Make replacements
self.df.replace(self.replacements, inplace=True)

# Reorder columns
self.df = self.df[self.columns]

# Remove duplicates
self.df = self.df.drop_duplicates(
subset=['resource', 'file_fullpath'], keep='last'
).reset_index(drop=True)
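
A toy run of that final dedupe: since each new temp_df is concatenated in front of the accumulated frame, keep='last' retains whichever duplicate sits later, i.e. the previously appended row (values invented):

    import pandas as pd

    df = pd.DataFrame({
        'resource':      ['GLADE:posix:/data', 'GLADE:posix:/data'],
        'file_fullpath': ['/data/ocn/SST.nc', '/data/ocn/SST.nc'],
        'note':          ['appended second', 'appended first'],
    })
    out = df.drop_duplicates(subset=['resource', 'file_fullpath'], keep='last')
    print(out.reset_index(drop=True))  # one row; the 'appended first' entry survives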

def _assemble_collection_df_files(self, resource_key, resource_type, direct_access, filelist):
entries = {
key: []
for key in [
'resource',
'resource_type',
'direct_access',
'case',
'component',
'stream',
'variable',
'date_range',
'file_basename',
'file_dirname',
'file_fullpath',
]
}

# If there are no files, return empty dataframe
if not filelist:
return pd.DataFrame(entries)

logger.warning(f'Building file database : {resource_key}')
for f in filelist:
fileparts = self._get_filename_parts(os.path.basename(f), self.component_streams)

if fileparts is None or len(fileparts) == 0:
continue

entries['resource'].append(resource_key)
entries['resource_type'].append(resource_type)
entries['direct_access'].append(direct_access)

entries['case'].append(fileparts['case'])
entries['component'].append(fileparts['component'])
entries['stream'].append(fileparts['stream'])
entries['variable'].append(fileparts['variable'])
entries['date_range'].append(fileparts['datestr'])

entries['file_basename'].append(os.path.basename(f))
entries['file_dirname'].append(os.path.dirname(f) + '/')
entries['file_fullpath'].append(f)

return pd.DataFrame(entries)
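
The `entries` dict of lists maps one-to-one onto DataFrame columns; a toy version with an invented path:

    import pandas as pd

    entries = {'resource': ['GLADE:posix:/data'],
               'variable': ['SST'],
               'file_fullpath': ['/data/ocn/SST.040001-049912.nc']}
    print(pd.DataFrame(entries))  # one row per file, one column per key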

-    def _get_filename_parts(self, filename, component_streams):
+    def _get_file_attrs(self, filepath):
""" Extract each part of case.stream.variable.datestr.nc file pattern. """
file_basename = os.path.basename(filepath)
keys = list(set(self.columns) - set(['resource', 'resource_type', 'direct_access']))
fileparts = {key: None for key in keys}
fileparts['file_basename'] = file_basename
fileparts['file_dirname'] = os.path.dirname(filepath) + '/'
fileparts['file_fullpath'] = filepath

# Get Date string
-        datestr = CESMCollection._extract_date_str(filename)
+        date_str_regex = r'\d{4}\-\d{4}|\d{6}\-\d{6}|\d{8}\-\d{8}|\d{10}\-\d{10}|\d{12}\-\d{12}'
+        datestr = CESMCollection._extract_attr_with_regex(file_basename, regex=date_str_regex)

if datestr:
-            for component, streams in component_streams.items():
+            fileparts['date_range'] = datestr
+
+            for component, streams in self.component_streams.items():
# Loop over stream strings
# NOTE: The order matters here!
for stream in sorted(streams, key=lambda s: len(s), reverse=True):

# Search for case.stream part of filename
-                s = filename.find(stream)
+                s = file_basename.find(stream)

if s >= 0: # Got a match
# Get varname.datestr.nc part of filename
-                    case = filename[0 : s - 1]
+                    case = file_basename[0 : s - 1]
idx = len(stream)
-                    variable_datestr_nc = filename[s + idx + 1 :]
+                    variable_datestr_nc = file_basename[s + idx + 1 :]
variable = variable_datestr_nc[: variable_datestr_nc.find('.')]

# Assert expected pattern
@@ -232,33 +66,70 @@ def _get_filename_parts(self, filename, component_streams):

# Ensure that filename conforms to expected pattern
if datestr_nc != f'{datestr}.nc':
-                        logger.warning(
-                            f'Filename : {filename} does not conform to expected pattern'
-                        )
+                        print(
+                            f'Filename : {file_basename} does not conform to expected pattern'
+                        )
return

-                    return {
-                        'case': case,
-                        'component': component,
-                        'stream': stream,
-                        'variable': variable,
-                        'datestr': datestr,
-                    }
-
-            logger.warning(f'Could not identify CESM fileparts for : {filename}')
-            return
-        else:
-            return

-    @staticmethod
-    def _extract_date_str(filename):
-        """ Extract a date string from a file name"""
-        try:
-            b = filename.split('.')[-2]
-            return b
-        except Exception:
-            logger.warning(f'Could not extract date string from : {filename}')
-            return

+                    else:
+                        fileparts['case'] = case
+                        fileparts['component'] = component
+                        fileparts['stream'] = stream
+                        fileparts['variable'] = variable
+
+        return fileparts
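
The regex and the stream search can be exercised standalone; the filename below is invented but follows the `case.stream.variable.datestr.nc` pattern:

    import re

    date_str_regex = r'\d{4}\-\d{4}|\d{6}\-\d{6}|\d{8}\-\d{8}|\d{10}\-\d{10}|\d{12}\-\d{12}'
    fname = 'b.e11.B1850C5CN.f09_g16.005.pop.h.SST.040001-049912.nc'  # hypothetical file

    datestr = re.search(date_str_regex, fname).group()  # '040001-049912'
    s = fname.find('pop.h')              # stream hit; longer stream names are tried first above
    case = fname[: s - 1]                # 'b.e11.B1850C5CN.f09_g16.005'
    rest = fname[s + len('pop.h') + 1:]  # 'SST.040001-049912.nc'
    variable = rest[: rest.find('.')]    # 'SST'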

def _add_extra_attributes(self, data_source, df, extra_attrs):

res_df = pd.DataFrame(columns=self.columns)
ensembles = extra_attrs['case_members']
component_attrs = extra_attrs['component_attrs']

for ensemble, ensemble_attrs in enumerate(ensembles):
input_attrs_base = {'experiment': data_source}
case = ensemble_attrs['case']

if 'ensemble' not in ensemble_attrs:
input_attrs_base.update({'ensemble': ensemble})

if 'sequence_order' not in ensemble_attrs:
input_attrs_base.update({'sequence_order': 0})

if 'has_ocean_bgc' not in ensemble_attrs:
input_attrs_base.update({'has_ocean_bgc': False})

# Find entries relevant to *this* ensemble:
# "case" matches
condition = df['case'] == case

# If there are any matching files, append to res_df
if any(condition):
input_attrs = dict(input_attrs_base)

input_attrs.update(
{key: val for key, val in ensemble_attrs.items() if key in self.columns}
)

# Relevant files
temp_df = pd.DataFrame(df.loc[condition])

# Append data coming from input file (input_attrs)
for col, val in input_attrs.items():
temp_df[col] = val

# Add data from "component_attrs" to appropriate column
for component in temp_df['component'].unique():
if component not in component_attrs:
continue

for key, val in component_attrs[component].items():
if key in self.columns:
loc = temp_df['component'] == component
temp_df.loc[loc, key] = val

res_df = pd.concat([temp_df, res_df], ignore_index=True, sort=False)

res_df.replace(self.replacements, inplace=True)
return res_df
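
A toy run of the broadcasting above: ensemble-level attributes become constant columns on the rows whose case matches, and component_attrs fill per-component (all values invented):

    import pandas as pd

    df = pd.DataFrame({'case': ['caseA', 'caseA', 'caseB'],
                       'component': ['ocn', 'atm', 'ocn']})

    condition = df['case'] == 'caseA'    # hypothetical case_members entry
    temp_df = pd.DataFrame(df.loc[condition])
    temp_df['experiment'] = 'piControl'  # input_attrs broadcast as a constant column
    temp_df.loc[temp_df['component'] == 'ocn', 'grid'] = 'POP_gx1v6'  # component_attrs
    print(temp_df)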


class CESMSource(BaseSource):