Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addressed feedback. #1

Merged
merged 3 commits into from
Apr 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
language: generic
sudo: required
os:
- linux
env:
matrix:
- CONDA_PY=35
global:
secure: OqHGuTssGxl/GeL+3NmZaFpgeqADlQgG+AdfxNY33tgcaGK/CGBeJRAxaM9IAxXgW8HQybsuZNRf3mtvXZPMjuN5noOl5TYSUf1pmODg46BurMQx1FzIN+f+qBCWAsnm96q1+/RCXDwPf/nXkFpdYDtlPgaXwFdisJ5bT4yv+DVszX6qd3CceKwcCD+StmWB08p4lwcqbAU8itEovQJmaaxzoWup8bzbGprzRGenGGtiLG9MbJ1azBp8qF10xnHRXJgt/8H3mPrT8V7o8P877AVcT1uubqLA7rjak43obcVv8Z8UCZZGdTM5PH05dQO02hKL1GzoI0qA+6K4xWSGTnwcnJnKVFzPlNmlJDG3rG5mROvWH06nGEMy4r0/6bevwz3xYE9LdB2JBDpMX+nIS26ysIpslmnzXsSYxjbE9aY3PfQpJMwM5JFXYXWMZNXGJrN5Bb2+6pFLuUcSterka9Ub5ccRRhOXacXHzZx3OOdSJbJ/IF1wlO7DfjN2JouVLqKm8KkysJmQ9GhY8fcKcuzdi8AGycjSEzVo7wrSRkNZNhitYULs/i7Tco0qppZR6mgqJgNzUPspuEylMZ1aDbT9iijR6lh1t78HvezldQLIJbd8tEyvvMFvz8mnZzQtpUXeHPqQknkmb2z381dJPJYlcuP+tbUK/HDwP2Ghzmc=
install:
- |
echo "Installing a fresh version of Miniconda."
MINICONDA_URL="https://repo.continuum.io/miniconda"
MINICONDA_FILE="Miniconda3-latest-$(case $TRAVIS_OS_NAME in (linux) echo Linux;; (osx) echo MacOSX;;esac)-x86_64.sh"
curl -L -O "${MINICONDA_URL}/${MINICONDA_FILE}"
bash $MINICONDA_FILE -b
- |
echo "Configuring conda."
source $HOME/miniconda3/bin/activate root
conda install -y conda-build anaconda-client
script:
- conda build -c intake -c defaults -c conda-forge ./conda
- if [[ -n "${TRAVIS_TAG}" ]]; then anaconda -t ${ANACONDA_TOKEN} upload `conda build
-c intake/label/main --output ./conda`; fi
notifications:
email: false
on_success: change
on_failure: always
37 changes: 22 additions & 15 deletions intake_netcdf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@ def __init__(self):
container='python',
partition_access=True)

def open(self, urlpath, **kwargs):
def open(self, urlpath, chunks, **kwargs):
"""
Create NetCDFSource instance

Parameters
----------
table, connection, qargs, partitions
See ``NetCDFSource``.
urlpath: str
Path to source file.
chunks: int or dict
Chunks is used to load the new dataset into dask
arrays. ``chunks={}`` loads the dataset with dask using a single
chunk for all arrays.
"""
base_kwargs, source_kwargs = self.separate_base_kwargs(kwargs)
qargs = source_kwargs.pop('qargs', {})
return NetCDFSource(urlpath=urlpath,
chunks=chunks,
xarray_kwargs = source_kwargs,
metadata=base_kwargs['metadata'])

Expand All @@ -33,18 +37,21 @@ class NetCDFSource(base.DataSource):

Parameters
----------
param: type
description
urlpath: str
Path to source file.
chunks: int or dict
Chunks is used to load the new dataset into dask
arrays. ``chunks={}`` loads the dataset with dask using a single
chunk for all arrays.
"""
container = 'python'

def __init__(self, urlpath, xarray_kwargs=None, metadata=None):
def __init__(self, urlpath, chunks, xarray_kwargs=None, metadata=None):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added chunks to signature and doc string.

self.urlpath = urlpath
self.chunks = chunks
self._kwargs = xarray_kwargs or {}
self.chunks = self._kwargs.get('chunks')
self._ds = None
super(NetCDFSource, self).__init__(
container=self.container,
container=None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not xarray?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The container is not used since we are overloading the read* methods (Intake plugins the "hard" way). I'd rather give no information than false information.

metadata=metadata)

def _open_dataset(self):
Expand All @@ -70,17 +77,17 @@ def _get_schema(self):

def read(self):
self._load_metadata()
return self._ds
return self._ds.load()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loads all the data.


def read_chunked(self):
raise Exception('read_chunked not supported for xarray containers.')
self._load_metadata()
return self._ds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so this is approximately correct, but it doesn't give something you can iterate over. I'm not sure whether this should be an exception just like get_partition.


def read_partition(self, i):
raise Exception('read_partition not supported for xarray containers.')
raise NotImplementedError

def to_dask(self):
self._load_metadata()
return self._ds.to_dask_dataframe()
return self.read_chunked()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read_chunked will return the dataset as a lazy container, which I believe is also what we want for the to_dask case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, this should just return the xarray directly (which points to dask arrays); this would be self._ds if you agree that read_chunked should be an exception.


def close(self):
self._ds.close()
3 changes: 2 additions & 1 deletion tests/data/catalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ sources:
description: example netcdf source plugin
driver: netcdf
args:
urlpath: !template '{{ CATALOG_DIR }}/example_1.nc'
urlpath: !template '{{ CATALOG_DIR }}/example_1.nc'
chunks: {}
16 changes: 11 additions & 5 deletions tests/test_intake_netcdf.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

from dask.dataframe.core import DataFrame
import numpy as np
import os
import pytest
Expand Down Expand Up @@ -31,12 +30,19 @@ def test_read(source, dataset):
assert np.all(ds.rh == dataset.rh)

def test_read_chunked():
source = NetCDFSource(TEST_URLPATH, xarray_kwargs={'chunks': {'lon': 2}})
ds = source.read()
source = NetCDFSource(TEST_URLPATH, chunks={'lon': 2})
ds = source.read_chunked()
dataset = xr.open_dataset(TEST_URLPATH, chunks={'lon': 2})

assert ds.temp.chunks == dataset.temp.chunks

def test_read_partition(source):
with pytest.raises(NotImplementedError):
ds = source.read_partition(None)

def test_to_dask(source, dataset):
df = source.to_dask()
assert type(df) == DataFrame
ds = source.to_dask()

assert ds.dims == dataset.dims
assert np.all(ds.temp == dataset.temp)
assert np.all(ds.rh == dataset.rh)
2 changes: 1 addition & 1 deletion tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

@pytest.fixture
def source():
return NetCDFSource(TEST_URLPATH)
return NetCDFSource(TEST_URLPATH, {})

@pytest.fixture
def dataset():
Expand Down