diff --git a/appveyor.yml b/appveyor.yml
index a43cd9325da26..ed2bc28f29394 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -30,7 +30,8 @@ test_script:
   - "%CONDA% install conda=4.3.22 --channel conda-forge"
   - "%CONDA% create --name \"ibis_%PYTHON_VERSION%\" python=%PYTHON_VERSION% --channel conda-forge"
   - "%ACTIVATE% \"ibis_%PYTHON_VERSION%\""
-  - "pip install -e .\"[sqlite, postgres, visualization, pandas]\""
+  - "%CONDA% install pytables"
+  - "pip install -e .\"[sqlite, postgres, visualization, pandas, file]\""
   - "pip install flake8 mock pytest click \"pbs==0.110\""
   - "flake8"
   - "python ci\\datamgr.py download --directory \"%USERPROFILE%\""
diff --git a/ci/requirements-dev-2.7.yml b/ci/requirements-dev-2.7.yml
index afb6f280f6646..d6c14fd1d3788 100644
--- a/ci/requirements-dev-2.7.yml
+++ b/ci/requirements-dev-2.7.yml
@@ -15,6 +15,8 @@ dependencies:
   - pytest
   - python=2.7
   - python-graphviz
+  - pytables
+  - pathlib2
   - sh
   - six
   - sqlalchemy>=1.0.0
diff --git a/ci/requirements-dev-3.6.yml b/ci/requirements-dev-3.6.yml
index 308e1559f7673..d9fdcda7537f7 100644
--- a/ci/requirements-dev-3.6.yml
+++ b/ci/requirements-dev-3.6.yml
@@ -13,6 +13,7 @@ dependencies:
   - pytest
   - python=3.6
   - python-graphviz
+  - pytables
   - sh
   - six
   - sqlalchemy>=1.0.0
diff --git a/conda-recipes/ibis-framework/meta.yaml b/conda-recipes/ibis-framework/meta.yaml
index a129ebb754224..64ab121ec1f9d 100644
--- a/conda-recipes/ibis-framework/meta.yaml
+++ b/conda-recipes/ibis-framework/meta.yaml
@@ -13,6 +13,7 @@ source:
 requirements:
   build:
     - enum34 # [py27]
+    - pathlib2 # [py27]
     - numpy >=1.10.0
     - pandas >=0.18.1
     - python
@@ -21,6 +22,7 @@ requirements:
     - toolz
   run:
     - enum34 # [py27]
+    - pathlib2 # [py27]
     - numpy >=1.10.0
     - pandas >=0.18.1
     - python
@@ -56,6 +58,8 @@ test:
     - ibis.sql.tests
     - ibis.sql.vertica
     - ibis.sql.vertica.tests
+    - ibis.file
+    - ibis.file.tests
     - ibis.tests
   commands:
     - pytest --version
diff --git a/ibis/file/__init__.py b/ibis/file/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/ibis/file/api.py b/ibis/file/api.py
new file mode 100644
index 0000000000000..28b2a37291b44
--- /dev/null
+++ b/ibis/file/api.py
@@ -0,0 +1,42 @@
+from ibis.expr.types import TableExpr
+from ibis.expr.api import _add_methods
+from ibis.file import operations as _ops
+
+
+# ---------------------------------------------------------------------
+# IO API
+
+def to_csv(self, path):
+    """
+    Write as a CSV file to the indicated path
+
+    Parameters
+    ----------
+    path : pathlib.Path
+
+    """
+
+    return _ops.CSV(self, str(path)).to_expr()
+
+
+def to_hdf(self, path, key):
+    """
+    Write as an HDF5 file to the indicated path
+
+    Parameters
+    ----------
+    path : pathlib.Path
+    key : string
+
+    """
+
+    return _ops.HDF(self, str(path), key).to_expr()
+
+
+_io_ops = dict(
+    to_csv=to_csv,
+    to_hdf=to_hdf,
+)
+
+
+_add_methods(TableExpr, _io_ops)
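Usage sketch for the writer API above (illustrative, not part of the patch; the /data/csv layout is hypothetical, and `execute` is the dispatcher re-exported by ibis.file.client below). Note that to_csv/to_hdf only build a deferred write node; nothing touches disk until the expression is executed:

import pathlib
from ibis.file.client import CSVClient, execute

root = pathlib.Path('/data/csv')         # assumed: directory containing close.csv
db = CSVClient(root).database()
t = db.close                             # a CSVTable expression

expr = t.to_csv(root / 'copy.csv')       # deferred: builds a CSV write node
execute(expr)                            # the file is written only here

expr = t.to_hdf(root / 'prices.h5', 'close')  # an HDF5 write also needs a key
execute(expr)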
diff --git a/ibis/file/client.py b/ibis/file/client.py
new file mode 100644
index 0000000000000..4b84bd7a4268b
--- /dev/null
+++ b/ibis/file/client.py
@@ -0,0 +1,210 @@
+try:
+    import pathlib
+except ImportError:
+
+    # py2 compat
+    import pathlib2 as pathlib
+
+import ibis
+import ibis.expr.types as ir
+import ibis.expr.operations as ops
+from ibis.pandas.execution import execute
+from ibis.pandas.core import data_preload
+from ibis.pandas.client import pandas_dtypes_to_ibis_schema
+import ibis.file.execution  # noqa
+import ibis.file.api  # noqa
+import pandas as pd
+
+
+class HDFTable(ops.DatabaseTable):
+    pass
+
+
+class CSVTable(ops.DatabaseTable):
+    pass
+
+
+class Client(ibis.client.Client):
+
+    def __init__(self, root):
+        super(Client, self).__init__()
+        self.root = pathlib.Path(str(root))
+        self.dictionary = {}
+
+    def database(self, name=None, path=None):
+        if name is None:
+            return Database('root', self, path=path)
+
+        if name not in self.list_databases(path):
+            raise AttributeError(name)
+        if path is None:
+            path = self.root
+
+        new_name = "{}.{}".format(name, self.extension)
+        if (self.root / name).is_dir():
+            path = path / name
+        elif not str(path).endswith(new_name):
+            path = path / new_name
+
+        return Database(name, self, path=path)
+
+    def table(self, name, path):
+        raise NotImplementedError
+
+    def execute(self, expr, params=None, **kwargs):
+        assert isinstance(expr, ir.Expr)
+        return execute(expr, params=params)
+
+    def list_tables(self, path=None):
+        raise NotImplementedError
+
+    def list_databases(self, path=None):
+        raise NotImplementedError
+
+
+class HDFClient(Client):
+    extension = 'h5'
+
+    def table(self, name, path):
+        if name not in self.list_tables(path):
+            raise AttributeError(name)
+
+        # get the schema
+        with pd.HDFStore(str(path), mode='r') as store:
+            df = store.select(name, start=0, stop=0)
+            schema = pandas_dtypes_to_ibis_schema(df, {})
+
+        t = HDFTable(name, schema, self).to_expr()
+        self.dictionary[name] = path
+        return t
+
+    def list_tables(self, path=None):
+        # tables are individual tables within a file
+
+        if path is None:
+            path = self.root
+
+        if (path.is_file() and str(path).endswith(self.extension)):
+
+            with pd.HDFStore(str(path), mode='r') as store:
+                # strip leading /
+                return [k[1:] for k in store.keys()]
+
+        return []
+
+    def list_databases(self, path=None):
+        # databases are dir & a .h5 file
+        if path is None:
+            path = self.root
+
+        tables = []
+        if path.is_dir():
+            for d in path.iterdir():
+                if d.is_dir():
+                    tables.append(d.name)
+                elif d.is_file():
+                    if str(d).endswith(self.extension):
+                        tables.append(d.stem)
+        elif path.is_file():
+            # by definition we are at the db level at this point
+            pass
+
+        return tables
+
+
+class CSVClient(Client):
+    extension = 'csv'
+
+    def table(self, name, path=None):
+        if name not in self.list_tables(path):
+            raise AttributeError(name)
+
+        if path is None:
+            path = self.root
+
+        # get the schema
+        f = path / "{}.{}".format(name, self.extension)
+        df = pd.read_csv(str(f), header=0, nrows=10)
+        schema = pandas_dtypes_to_ibis_schema(df, {})
+
+        t = CSVTable(name, schema, self).to_expr()
+        self.dictionary[name] = f
+        return t
+
+    def list_tables(self, path=None):
+        # tables are files in a dir
+        if path is None:
+            path = self.root
+
+        tables = []
+        if path.is_dir():
+            for d in path.iterdir():
+                if d.is_file():
+                    if str(d).endswith(self.extension):
+                        tables.append(d.stem)
+        elif path.is_file():
+            if str(path).endswith(self.extension):
+                tables.append(path.stem)
+        return tables
+
+    def list_databases(self, path=None):
+        # databases are dir
+        if path is None:
+            path = self.root
+
+        tables = []
+        if path.is_dir():
+            for d in path.iterdir():
+                if d.is_dir():
+                    tables.append(d.name)
+        return tables
+
+
+class Database(ibis.client.Database):
+
+    def __init__(self, name, client, path=None):
+        super(Database, self).__init__(name, client)
+        self.path = path
+
+    def __str__(self):
+        return '{0.__class__.__name__}({0.name})'.format(self)
+
+    def __dir__(self):
+        dbs = self.list_databases(path=self.path)
+        tables = self.list_tables(path=self.path)
+        return sorted(list(set(dbs).union(set(tables))))
+
+    def __getattr__(self, name):
+        try:
+            return object.__getattribute__(self, name)
+        except AttributeError:
+            try:
+                return self.table(name, path=self.path)
+            except AttributeError:
+                return self.database(name, path=self.path)
+
+    def table(self, name, path):
+        return self.client.table(name, path=path)
+
+    def database(self, name=None, path=None):
+        return self.client.database(name=name, path=path)
+
+    def list_databases(self, path=None):
+        if path is None:
+            path = self.path
+        return sorted(self.client.list_databases(path=path))
+
+    def list_tables(self, path=None):
+        if path is None:
+            path = self.path
+        return sorted(self.client.list_tables(path=path))
+
+
+@data_preload.register(HDFTable, (pathlib.Path, str))
+def hdf_data_preload_uri_client(table, path, scope=None, **kwargs):
+    return pd.read_hdf(str(path), table.name, mode='r')
+
+
+@data_preload.register(CSVTable, (pathlib.Path, str))
+def csv_data_preload_uri_client(table, path, scope=None, **kwargs):
+    return pd.read_csv(str(path), header=0)
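Navigation sketch for the client above (illustrative, not part of the patch; the /data layout is hypothetical, mirroring the test fixtures below): directories, and for HDF also .h5 files, become Databases, while the leaves are table expressions.

from ibis.file.client import HDFClient

# assumed layout: /data/hdf/prices.h5 containing 'open' and 'close' tables
db = HDFClient('/data').database()   # the root Database
prices = db.hdf.prices               # __getattr__ walks dirs, then .h5 files
opens = prices.open                  # a leaf: an HDFTable expression
df = opens.execute()                 # materialized via pandas.read_hdf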
diff --git a/ibis/file/execution.py b/ibis/file/execution.py
new file mode 100644
index 0000000000000..b8066bfd70584
--- /dev/null
+++ b/ibis/file/execution.py
@@ -0,0 +1,16 @@
+import pandas as pd
+from ibis.file import operations as ops
+from ibis.pandas.dispatch import execute_node
+
+
+@execute_node.register(ops.CSV, pd.DataFrame)
+def execute_to_csv_dataframe(op, data, **kwargs):
+    path = op.path
+    data.to_csv(path, index=False)
+
+
+@execute_node.register(ops.HDF, pd.DataFrame)
+def execute_to_hdf_dataframe(op, data, **kwargs):
+    path = op.path
+    key = op.key
+    data.to_hdf(path, key, format='table', data_columns=True)
diff --git a/ibis/file/operations.py b/ibis/file/operations.py
new file mode 100644
index 0000000000000..9f12b02e7acb7
--- /dev/null
+++ b/ibis/file/operations.py
@@ -0,0 +1,30 @@
+from ibis.expr import rules
+from ibis.expr.operations import TableNode as _TableNode
+import ibis.expr.types as ir
+
+
+class TableNode(_TableNode):
+
+    def root_tables(self):
+        return ir.distinct_roots(self.table)
+
+
+class CSV(TableNode):
+
+    def __init__(self, table, path):
+        if not rules.is_table(table):
+            raise ValueError("must pass a table")
+        self.table = table
+        self.path = path
+        super(CSV, self).__init__([self.table, self.path])
+
+
+class HDF(TableNode):
+
+    def __init__(self, table, path, key):
+        if not rules.is_table(table):
+            raise ValueError("must pass a table")
+        self.table = table
+        self.path = path
+        self.key = key
+        super(HDF, self).__init__([self.table, self.path, self.key])
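The operations/execution pair above is the full contract for a writer: an expression node carrying the target, plus an execute_node rule that receives the already-materialized DataFrame. A hedged sketch of how a hypothetical JSON writer could follow the same pattern (JSON support is not part of this patch; everything here reuses the internals shown above):

import pandas as pd
from ibis.expr import rules
from ibis.file.operations import TableNode
from ibis.pandas.dispatch import execute_node


class JSON(TableNode):
    # hypothetical node, mirroring CSV/HDF above
    def __init__(self, table, path):
        if not rules.is_table(table):
            raise ValueError("must pass a table")
        self.table = table
        self.path = path
        super(JSON, self).__init__([self.table, self.path])


@execute_node.register(JSON, pd.DataFrame)
def execute_to_json_dataframe(op, data, **kwargs):
    # pandas does the actual serialization, as with to_csv/to_hdf
    data.to_json(op.path, orient='records')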
diff --git a/ibis/file/tests/__init__.py b/ibis/file/tests/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/ibis/file/tests/conftest.py b/ibis/file/tests/conftest.py
new file mode 100644
index 0000000000000..adfef6f16d5d5
--- /dev/null
+++ b/ibis/file/tests/conftest.py
@@ -0,0 +1,53 @@
+import pytest
+import pandas as pd
+import numpy as np
+from ibis.file.client import HDFClient, CSVClient
+
+
+@pytest.fixture
+def df():
+    # basic time/ticker frame
+    rng = pd.date_range('20170101', periods=10, freq='D')
+    tickers = ['GOOGL', 'FB', 'APPL', 'NFLX', 'AMZN']
+    return pd.DataFrame(
+        {'time': np.repeat(rng, len(tickers)),
+         'ticker': np.tile(tickers, len(rng))})
+
+
+@pytest.fixture
+def closes(df):
+    return df.assign(**{'close': np.random.randn(len(df))})
+
+
+@pytest.fixture
+def opens(df):
+    return df.assign(**{'open': np.random.randn(len(df))})
+
+
+@pytest.fixture
+def data(opens, closes):
+    return {'open': opens, 'close': closes}
+
+
+@pytest.fixture
+def hdf(tmpdir, data):
+
+    hdf = tmpdir.mkdir('hdf')
+    f = hdf / 'prices.h5'
+
+    for k, v in data.items():
+        v.to_hdf(str(f), k, format='table', data_columns=True)
+
+    return HDFClient(tmpdir).database()
+
+
+@pytest.fixture
+def csv(tmpdir, data):
+
+    csv = tmpdir.mkdir('csv')
+
+    for k, v in data.items():
+        f = csv / '{}.csv'.format(k)
+        v.to_csv(str(f), index=False)
+
+    return CSVClient(tmpdir).database()
diff --git a/ibis/file/tests/test_basic.py b/ibis/file/tests/test_basic.py
new file mode 100644
index 0000000000000..e59f0d102230f
--- /dev/null
+++ b/ibis/file/tests/test_basic.py
@@ -0,0 +1,200 @@
+import pytest
+import pandas as pd
+
+from pandas.util import testing as tm
+pytest.importorskip('tables')
+
+from ibis.file.client import (
+    CSVClient, HDFClient, Database, CSVTable,
+    HDFTable, execute)  # noqa: E402
+
+
+@pytest.fixture
+def transformed(hdf, csv):
+
+    # we need to cast to a timestamp type,
+    # as CSV reads times back in as strings
+    closes = csv.csv.close
+    closes = closes.mutate(time=closes.time.cast('timestamp'))
+
+    opens = hdf.hdf.prices.open
+
+    t = opens.inner_join(closes, ['time', 'ticker'])
+    t = t[opens, closes.close]
+    t = t.mutate(avg=(t.open + t.close) / 2)
+    t = t[['time', 'ticker', 'avg']]
+    return t
+
+
+def test_creation(hdf):
+    # we have existing files in our dir
+    d = hdf.client.root
+    assert len(list(d.iterdir())) == 1
+
+    hdf = d / 'hdf'
+    assert len(list(hdf.iterdir())) == 1
+
+    prices = str(hdf / 'prices.h5')
+    assert len(pd.read_hdf(prices, 'open')) == 50
+    assert len(pd.read_hdf(prices, 'close')) == 50
+
+
+def test_client_hdf(tmpdir, data):
+
+    # construct with a path to a file
+    hdf = tmpdir
+    f = hdf / 'prices.h5'
+
+    for k, v in data.items():
+        v.to_hdf(str(f), k, format='table', data_columns=True)
+
+    c = HDFClient(tmpdir)
+    assert c.list_databases() == ['prices']
+    assert c.database().prices.list_tables() == ['close', 'open']
+
+    c = HDFClient(tmpdir / 'prices.h5')
+    assert c.list_databases() == []
+    assert c.list_tables() == ['close', 'open']
+
+
+def test_navigation_hdf(hdf):
+
+    # directory navigation
+    assert isinstance(hdf, Database)
+    result = dir(hdf)
+    assert result == ['hdf']
+
+    hdf = hdf.hdf
+    assert isinstance(hdf, Database)
+    result = dir(hdf)
+    assert result == ['prices']
+
+    prices = hdf.prices
+    assert isinstance(prices, Database)
+
+    result = dir(prices)
+    assert result == ['close', 'open']
+    result = prices.list_tables()
+    assert result == ['close', 'open']
+
+    opens = prices.open
+    assert isinstance(opens.op(), HDFTable)
+
+    closes = prices.close
+    assert isinstance(closes.op(), HDFTable)
+
+
+def test_client_csv(tmpdir, data):
+
+    # construct with a path to a file
+    csv = tmpdir
+
+    for k, v in data.items():
+        f = csv / '{}.csv'.format(k)
+        v.to_csv(str(f), index=False)
+
+    c = CSVClient(csv / 'open.csv')
+    assert c.list_databases() == []
+    assert c.list_tables() == ['open']
+
+    c = CSVClient(csv / 'close.csv')
+    assert c.list_databases() == []
+    assert c.list_tables() == ['close']
+
+
+def test_navigation_csv(csv):
+
+    # directory navigation
+    assert isinstance(csv, Database)
+    result = dir(csv)
+    assert result == ['csv']
+
+    prices = csv.csv
+    assert isinstance(prices, Database)
+    result = dir(prices)
+    assert result == ['close', 'open']
+    result = prices.list_tables()
+    assert result == ['close', 'open']
+
+    opens = prices.open
+    assert isinstance(opens.op(), CSVTable)
+
+    closes = prices.close
+    assert isinstance(closes.op(), CSVTable)
+
+
+def test_read_hdf(hdf, data):
+
+    closes = hdf.hdf.prices.close
+    assert str(closes) is not None
+
+    result = closes.execute()
+    expected = data['close']
+    tm.assert_frame_equal(result, expected)
+
+    result = execute(closes)
+    tm.assert_frame_equal(result, expected)
+
+
+def test_read_csv(csv, data):
+
+    closes = csv.csv.close
+    assert str(closes) is not None
+
+    result = closes.execute()
+    expected = data['close']
+
+    # CSVs don't preserve dtypes
+    expected['time'] = expected['time'].astype(str)
+    tm.assert_frame_equal(result, expected)
+
+    result = execute(closes)
+    tm.assert_frame_equal(result, expected)
+
+
+def test_write_csv(transformed, tmpdir):
+    t = transformed
+
+    # CSVs don't preserve dtypes
+    expected = execute(t)
+    expected['time'] = expected['time'].astype(str)
+
+    tpath = tmpdir / 'new_csv'
+    tpath.mkdir()
+    path = tpath / 'foo.csv'
+
+    assert not path.exists()
+    t = t.to_csv(path)
+    execute(t)
+    assert path.exists()
+
+    # read back
+    t = CSVClient(str(tpath)).database()
+    result = t.list_tables()
+    assert result == ['foo']
+
+    result = t.foo.execute()
+    tm.assert_frame_equal(result, expected)
+
+
+def test_write_hdf(transformed, tmpdir):
+
+    t = transformed
+    expected = execute(t)
+
+    tpath = tmpdir / 'new_hdf'
+    tpath.mkdir()
+    path = tpath / 'foo.h5'
+
+    assert not path.exists()
+    t = t.to_hdf(path, 'bar')
+    execute(t)
+    assert path.exists()
+
+    # read back
+    t = HDFClient(str(tpath)).database()
+    result = t.list_databases()
+    assert result == ['foo']
+
+    result = t.foo.bar.execute()
+    tm.assert_frame_equal(result, expected)
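The one-line ibis/pandas/client.py change below ('empty' -> 'string') is apparently needed because the HDF client infers its schema from a zero-row select (store.select(name, start=0, stop=0) above), and pandas dtype inference labels a valueless object column 'empty', which previously had no ibis mapping. A quick illustration, assuming the mapping keys come from pandas' infer_dtype (pd.api.types.infer_dtype here):

import pandas as pd

# a zero-row object column has no values to inspect, so inference
# reports 'empty' rather than 'string'
s = pd.Series([], dtype=object)
print(pd.api.types.infer_dtype(s))  # -> 'empty'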
diff --git a/ibis/pandas/client.py b/ibis/pandas/client.py
index 7788430f54eef..7a1dc86a6b40b 100644
--- a/ibis/pandas/client.py
+++ b/ibis/pandas/client.py
@@ -29,6 +29,7 @@
     'string': 'string',
     'unicode': 'string',
     'bytes': 'string',
+    'empty': 'string',
 }
 
 
diff --git a/setup.py b/setup.py
index e839b84bbd0c3..315370913b145 100644
--- a/setup.py
+++ b/setup.py
@@ -40,6 +40,7 @@
 visualization_requires = ['graphviz']
 pandas_requires = ['multipledispatch']
 clickhouse_requires = ['clickhouse-driver']
+file_requires = pandas_requires + ['tables>=3.0.0']
 
 all_requires = (
     impala_requires +
@@ -48,6 +49,7 @@
     visualization_requires +
     pandas_requires +
-    clickhouse_requires
+    clickhouse_requires +
+    file_requires
 )
 
 develop_requires = all_requires + [
@@ -80,6 +82,8 @@
         'visualization': visualization_requires,
         'pandas': pandas_requires,
-        'clickhouse': clickhouse_requires
+        'clickhouse': clickhouse_requires,
+        'file': file_requires,
+        'file:python_version < "3"': file_requires + ['pathlib2'],
     },
     scripts=[
         os.path.relpath(