Skip to content

Commit

Permalink
ENH: initial support for a file-like backend with csv & hdf5 implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
jreback committed Oct 16, 2017
1 parent 34cc0c2 commit d8b7581
Show file tree
Hide file tree
Showing 14 changed files with 565 additions and 1 deletion.
3 changes: 2 additions & 1 deletion appveyor.yml
Expand Up @@ -30,7 +30,8 @@ test_script:
- "%CONDA% install conda=4.3.22 --channel conda-forge"
- "%CONDA% create --name \"ibis_%PYTHON_VERSION%\" python=%PYTHON_VERSION% --channel conda-forge"
- "%ACTIVATE% \"ibis_%PYTHON_VERSION%\""
- "pip install -e .\"[sqlite, postgres, visualization, pandas]\""
- "%CONDA% install pytables"
- "pip install -e .\"[sqlite, postgres, visualization, pandas, file]\""
- "pip install flake8 mock pytest click \"pbs==0.110\""
- "flake8"
- "python ci\\datamgr.py download --directory \"%USERPROFILE%\""
Expand Down
2 changes: 2 additions & 0 deletions ci/requirements-dev-2.7.yml
Expand Up @@ -15,6 +15,8 @@ dependencies:
- pytest
- python=2.7
- python-graphviz
- pytables
- pathlib2
- sh
- six
- sqlalchemy>=1.0.0
Expand Down
1 change: 1 addition & 0 deletions ci/requirements-dev-3.6.yml
Expand Up @@ -13,6 +13,7 @@ dependencies:
- pytest
- python=3.6
- python-graphviz
- pytables
- sh
- six
- sqlalchemy>=1.0.0
Expand Down
4 changes: 4 additions & 0 deletions conda-recipes/ibis-framework/meta.yaml
Expand Up @@ -13,6 +13,7 @@ source:
requirements:
build:
- enum34 # [py27]
- pathlib2 # [py27]
- numpy >=1.10.0
- pandas >=0.18.1
- python
Expand All @@ -21,6 +22,7 @@ requirements:
- toolz
run:
- enum34 # [py27]
- pathlib2 # [py27]
- numpy >=1.10.0
- pandas >=0.18.1
- python
Expand Down Expand Up @@ -56,6 +58,8 @@ test:
- ibis.sql.tests
- ibis.sql.vertica
- ibis.sql.vertica.tests
- ibis.file
- ibis.file.tests
- ibis.tests
commands:
- pytest --version
Expand Down
Empty file added ibis/file/__init__.py
Empty file.
42 changes: 42 additions & 0 deletions ibis/file/api.py
@@ -0,0 +1,42 @@
from ibis.expr.types import TableExpr
from ibis.expr.api import _add_methods
from ibis.file import operations as _ops


# ---------------------------------------------------------------------
# IO API

def to_csv(self, path):
    """
    Build the expression that writes this table out as a CSV file.

    Parameters
    ----------
    path : pathlib.Path
        Destination of the CSV file. It is converted with ``str`` before
        being handed to the ``CSV`` operation.

    Returns
    -------
    The result of ``to_expr()`` on the ``CSV`` operation wrapping this table.
    """
    destination = str(path)
    csv_op = _ops.CSV(self, destination)
    return csv_op.to_expr()


def to_hdf(self, path, key):
    """
    Build the expression that writes this table out to an HDF5 file.

    Parameters
    ----------
    path : pathlib.Path
        Destination of the HDF5 file. It is converted with ``str`` before
        being handed to the ``HDF`` operation.
    key : string
        Key passed through unchanged to the ``HDF`` operation.

    Returns
    -------
    The result of ``to_expr()`` on the ``HDF`` operation wrapping this table.
    """
    destination = str(path)
    hdf_op = _ops.HDF(self, destination, key)
    return hdf_op.to_expr()


# Name -> function table for the IO helpers defined in this module.
_io_ops = {
    'to_csv': to_csv,
    'to_hdf': to_hdf,
}


# Presumably attaches each entry to TableExpr under its key; see
# ibis.expr.api._add_methods for the exact mechanics.
_add_methods(TableExpr, _io_ops)
210 changes: 210 additions & 0 deletions ibis/file/client.py
@@ -0,0 +1,210 @@
try:
import pathlib
except ImportError:

# py2 compat
import pathlib2 as pathlib

import ibis
import ibis.expr.types as ir
import ibis.expr.operations as ops
from ibis.pandas.execution import execute
from ibis.pandas.core import data_preload
from ibis.pandas.client import pandas_dtypes_to_ibis_schema
import ibis.file.execution # noqa
import ibis.file.api # noqa
import pandas as pd


class HDFTable(ops.DatabaseTable):
    """Table operation created by ``HDFClient.table``.

    Adds nothing to ``ops.DatabaseTable``; the distinct type is what the
    HDF ``data_preload`` implementation in this module is registered on.
    """


class CSVTable(ops.DatabaseTable):
    """Table operation created by ``CSVClient.table``.

    Adds nothing to ``ops.DatabaseTable``; the distinct type is what the
    CSV ``data_preload`` implementation in this module is registered on.
    """


class Client(ibis.client.Client):
    """Base client for data stored in files below a root path.

    Subclasses provide an ``extension`` class attribute (used by
    ``database`` to build file names) and implement ``table``,
    ``list_tables`` and ``list_databases``.
    """

    def __init__(self, root):
        """
        Parameters
        ----------
        root : pathlib.Path or str
            Top-level location served by this client.
        """
        super(Client, self).__init__()
        # Round-trip through ``str`` so any path-like value ends up as the
        # ``pathlib`` flavour imported by this module.
        self.root = pathlib.Path(str(root))
        # table name -> location on disk, recorded by ``table`` in subclasses
        self.dictionary = {}

    def database(self, name=None, path=None):
        """
        Return a ``Database`` for ``name`` located under ``path``.

        Parameters
        ----------
        name : string, optional
            Database to open. When omitted, a database named ``'root'`` is
            returned with ``path`` passed through untouched.
        path : pathlib.Path, optional
            Location to look in; defaults to the client root.

        Raises
        ------
        AttributeError
            If ``name`` is not reported by ``list_databases(path)``.
        """
        if name is None:
            return Database('root', self, path=path)

        if name not in self.list_databases(path):
            raise AttributeError(name)
        if path is None:
            path = self.root

        new_name = "{}.{}".format(name, self.extension)
        # Check for the directory relative to ``path`` -- the location that
        # was just listed -- rather than always relative to the root, so
        # that nested directories resolve to themselves and not to a
        # non-existent ``<path>/<name>.<extension>`` file.
        if (path / name).is_dir():
            path = path / name
        elif not str(path).endswith(new_name):
            path = path / new_name

        return Database(name, self, path=path)

    def table(self, name, path):
        """Return the table ``name`` found at ``path``; subclass hook."""
        raise NotImplementedError

    def execute(self, expr, params=None, **kwargs):
        """
        Execute ``expr`` with the pandas backend's ``execute``.

        Only ``params`` is forwarded; any other keyword arguments are
        accepted and ignored.
        """
        assert isinstance(expr, ir.Expr)
        return execute(expr, params=params)

    def list_tables(self, path=None):
        """Return the table names available at ``path``; subclass hook."""
        raise NotImplementedError

    def list_databases(self, path=None):
        """Return the database names available at ``path``; subclass hook."""
        raise NotImplementedError


class HDFClient(Client):
    """Client for HDF5 files.

    Directories and ``.h5`` files are listed as databases; the keys stored
    inside an ``.h5`` file are listed as its tables.
    """
    extension = 'h5'

    def table(self, name, path=None):
        """
        Return a table expression for key ``name`` in the HDF5 file ``path``.

        Parameters
        ----------
        name : string
            Key of the table inside the store.
        path : pathlib.Path, optional
            HDF5 file to read; defaults to the client root.

        Raises
        ------
        AttributeError
            If ``name`` is not reported by ``list_tables(path)``.
        """
        if name not in self.list_tables(path):
            raise AttributeError(name)

        # ``list_tables`` falls back to the root when no path is given; do
        # the same here so the store is opened on that file instead of on
        # the literal string "None".
        if path is None:
            path = self.root

        # Select zero rows: only the dtypes are needed to build the schema.
        with pd.HDFStore(str(path), mode='r') as store:
            df = store.select(name, start=0, stop=0)
            schema = pandas_dtypes_to_ibis_schema(df, {})

        t = HDFTable(name, schema, self).to_expr()
        # Remember which file this table name was read from.
        self.dictionary[name] = path
        return t

    def list_tables(self, path=None):
        """
        Return the keys stored in the HDF5 file at ``path``.

        An empty list is returned when ``path`` (default: the client root)
        is not a file carrying this client's extension.
        """
        if path is None:
            path = self.root

        # Compare the real suffix (with its dot) so that names which merely
        # end in the letters "h5" are not mistaken for HDF5 files.
        if path.is_file() and path.suffix == '.' + self.extension:
            with pd.HDFStore(str(path), mode='r') as store:
                # store keys start with '/'; strip it
                return [k[1:] for k in store.keys()]

        return []

    def list_databases(self, path=None):
        """
        Return the databases directly below ``path``.

        Sub-directories are reported by name and ``.h5`` files by stem.
        When ``path`` (default: the client root) is itself a file the
        result is empty.
        """
        if path is None:
            path = self.root

        databases = []
        if path.is_dir():
            wanted_suffix = '.' + self.extension
            for d in path.iterdir():
                if d.is_dir():
                    databases.append(d.name)
                elif d.is_file() and d.suffix == wanted_suffix:
                    # ``Client.database`` rebuilds the file name as
                    # "<stem>.<extension>", so only exact suffix matches
                    # can be opened again.
                    databases.append(d.stem)
        # A file path is already at the database level: nothing to list.

        return databases


class CSVClient(Client):
    """Client for CSV files.

    Directories are listed as databases and the ``.csv`` files inside a
    directory are listed as its tables.
    """
    extension = 'csv'

    def table(self, name, path=None):
        """
        Return a table expression for the CSV file backing ``name``.

        Parameters
        ----------
        name : string
            Table name, i.e. the file name without its extension.
        path : pathlib.Path, optional
            Directory holding ``<name>.csv``, or the CSV file itself;
            defaults to the client root.

        Raises
        ------
        AttributeError
            If ``name`` is not reported by ``list_tables(path)``.
        """
        if name not in self.list_tables(path):
            raise AttributeError(name)

        if path is None:
            path = self.root

        # ``list_tables`` accepts a path that is itself a CSV file; in that
        # case the file is the table and must not be treated as a directory.
        if path.is_file():
            f = path
        else:
            f = path / "{}.{}".format(name, self.extension)

        # The schema is derived from the dtypes of the first 10 rows.
        df = pd.read_csv(str(f), header=0, nrows=10)
        schema = pandas_dtypes_to_ibis_schema(df, {})

        t = CSVTable(name, schema, self).to_expr()
        # Remember which file this table name was read from.
        self.dictionary[name] = f
        return t

    def list_tables(self, path=None):
        """
        Return the table names available at ``path``.

        For a directory these are the stems of the ``.csv`` files directly
        inside it; for a ``.csv`` file it is that file's stem. ``path``
        defaults to the client root.
        """
        if path is None:
            path = self.root

        # Compare the real suffix (with its dot): ``table`` rebuilds the
        # file name as "<stem>.csv", so only exact matches can be opened.
        wanted_suffix = '.' + self.extension

        tables = []
        if path.is_dir():
            for d in path.iterdir():
                if d.is_file() and d.suffix == wanted_suffix:
                    tables.append(d.stem)
        elif path.is_file():
            if path.suffix == wanted_suffix:
                tables.append(path.stem)
        return tables

    def list_databases(self, path=None):
        """
        Return the names of the sub-directories directly below ``path``.

        ``path`` defaults to the client root; anything that is not a
        directory yields an empty list.
        """
        if path is None:
            path = self.root

        if not path.is_dir():
            return []
        return [d.name for d in path.iterdir() if d.is_dir()]


class Database(ibis.client.Database):
    """A named location served by a file client.

    Listing and lookup are delegated to the owning client, using this
    database's ``path`` whenever the caller does not supply one.
    """

    def __init__(self, name, client, path=None):
        super(Database, self).__init__(name, client)
        # Location of this database on disk; may be None.
        self.path = path

    def __str__(self):
        return '{0.__class__.__name__}({0.name})'.format(self)

    def __dir__(self):
        # Advertise the child databases and tables, de-duplicated and sorted.
        names = set(self.list_databases(path=self.path))
        names.update(self.list_tables(path=self.path))
        return sorted(names)

    def __getattr__(self, name):
        # Lookup order: regular attribute, then table, then child database.
        try:
            found = object.__getattribute__(self, name)
        except AttributeError:
            try:
                found = self.table(name, path=self.path)
            except AttributeError:
                found = self.database(name, path=self.path)
        return found

    def table(self, name, path):
        """Return the client's table ``name`` at ``path``."""
        owner = self.client
        return owner.table(name, path=path)

    def database(self, name=None, path=None):
        """Return the client's database ``name`` at ``path``."""
        owner = self.client
        return owner.database(name=name, path=path)

    def list_databases(self, path=None):
        """Return the sorted database names at ``path`` (default: own path)."""
        location = self.path if path is None else path
        return sorted(self.client.list_databases(path=location))

    def list_tables(self, path=None):
        """Return the sorted table names at ``path`` (default: own path)."""
        location = self.path if path is None else path
        return sorted(self.client.list_tables(path=location))


@data_preload.register(HDFTable, (pathlib.Path, str))
def hdf_data_preload_uri_client(table, path, scope=None, **kwargs):
    """Read the key ``table.name`` from the HDF5 file at ``path``.

    The file is opened read-only; ``scope`` and any extra keyword
    arguments are not used.
    """
    location = str(path)
    return pd.read_hdf(location, table.name, mode='r')


@data_preload.register(CSVTable, (pathlib.Path, str))
def csv_data_preload_uri_client(table, path, scope=None, **kwargs):
    """Read the CSV file at ``path``, taking its first line as the header.

    ``table``, ``scope`` and any extra keyword arguments are not used.
    """
    location = str(path)
    return pd.read_csv(location, header=0)
16 changes: 16 additions & 0 deletions ibis/file/execution.py
@@ -0,0 +1,16 @@
import pandas as pd
from ibis.file import operations as ops
from ibis.pandas.dispatch import execute_node


@execute_node.register(ops.CSV, pd.DataFrame)
def execute_to_csv_dataframe(op, data, **kwargs):
    """Write ``data`` to ``op.path`` as CSV, leaving out the index.

    Nothing is returned; extra keyword arguments are not used.
    """
    data.to_csv(op.path, index=False)


@execute_node.register(ops.HDF, pd.DataFrame)
def execute_to_hdf_dataframe(op, data, **kwargs):
    """Write ``data`` to the HDF5 file ``op.path`` under ``op.key``.

    The frame is stored in ``'table'`` format with ``data_columns=True``.
    Nothing is returned; extra keyword arguments are not used.
    """
    path = op.path
    key = op.key
    # Pass ``key`` by keyword: newer pandas releases deprecate positional
    # arguments to ``to_hdf`` other than the path, and the keyword form is
    # accepted by older releases as well.
    data.to_hdf(path, key=key, format='table', data_columns=True)
30 changes: 30 additions & 0 deletions ibis/file/operations.py
@@ -0,0 +1,30 @@
from ibis.expr import rules
from ibis.expr.operations import TableNode as _TableNode
import ibis.expr.types as ir


class TableNode(_TableNode):
    """Base class for the file operations, which wrap a ``table`` attribute."""

    def root_tables(self):
        """Return ``ir.distinct_roots`` of the wrapped table."""
        wrapped = self.table
        return ir.distinct_roots(wrapped)


class CSV(TableNode):
    """Operation pairing a table with the CSV path it is written to."""

    def __init__(self, table, path):
        """
        Parameters
        ----------
        table : table expression
            Must satisfy ``rules.is_table``.
        path : string
            Destination of the CSV file.

        Raises
        ------
        ValueError
            If ``table`` is not a table.
        """
        is_table = rules.is_table(table)
        if not is_table:
            raise ValueError("must pass a table")
        self.table, self.path = table, path
        node_args = [self.table, self.path]
        super(CSV, self).__init__(node_args)


class HDF(TableNode):
    """Operation pairing a table with the HDF5 path and key it is written to."""

    def __init__(self, table, path, key):
        """
        Parameters
        ----------
        table : table expression
            Must satisfy ``rules.is_table``.
        path : string
            Destination of the HDF5 file.
        key : string
            Key under which the table is stored in the file.

        Raises
        ------
        ValueError
            If ``table`` is not a table.
        """
        is_table = rules.is_table(table)
        if not is_table:
            raise ValueError("must pass a table")
        self.table, self.path, self.key = table, path, key
        node_args = [self.table, self.path, self.key]
        super(HDF, self).__init__(node_args)
Empty file added ibis/file/tests/__init__.py
Empty file.

0 comments on commit d8b7581

Please sign in to comment.