forked from ibis-project/ibis
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ENH: initial support for a file-like backend with csv & hdf5 implementations
closes ibis-project#1165
- Loading branch information
Showing
14 changed files
with
566 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ dependencies: | |
- pytest | ||
- python=3.6 | ||
- python-graphviz | ||
- pytables | ||
- sh | ||
- six | ||
- sqlalchemy>=1.0.0 | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from ibis.expr.types import TableExpr | ||
from ibis.expr.api import _add_methods | ||
from ibis.file import operations as _ops | ||
|
||
|
||
# --------------------------------------------------------------------- | ||
# IO API | ||
|
||
def to_csv(self, path):
    """Build the expression that writes this table out as a CSV file.

    Parameters
    ----------
    path : pathlib.Path
        Destination file; converted with ``str`` before being handed to
        the ``CSV`` operation.

    Returns
    -------
    The result of calling ``to_expr()`` on the ``CSV`` operation.
    """
    csv_op = _ops.CSV(self, str(path))
    return csv_op.to_expr()
|
||
|
||
def to_hdf(self, path, key):
    """Build the expression that writes this table out as an HDF5 file.

    Parameters
    ----------
    path : pathlib.Path
        Destination file; converted with ``str`` before being handed to
        the ``HDF`` operation.
    key : string
        Passed through unchanged to the ``HDF`` operation.

    Returns
    -------
    The result of calling ``to_expr()`` on the ``HDF`` operation.
    """
    hdf_op = _ops.HDF(self, str(path), key)
    return hdf_op.to_expr()
|
||
|
||
# Method name -> implementation; handed to ``_add_methods`` together with
# ``TableExpr`` below.
_io_ops = {
    'to_csv': to_csv,
    'to_hdf': to_hdf,
}


_add_methods(TableExpr, _io_ops)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,210 @@ | ||
try: | ||
import pathlib | ||
except ImportError: | ||
|
||
# py2 compat | ||
import pathlib2 as pathlib | ||
|
||
import ibis | ||
import ibis.expr.types as ir | ||
import ibis.expr.operations as ops | ||
from ibis.pandas.execution import execute | ||
from ibis.pandas.core import data_preload | ||
from ibis.pandas.client import pandas_dtypes_to_ibis_schema | ||
import ibis.file.execution # noqa | ||
import ibis.file.api # noqa | ||
import pandas as pd | ||
|
||
|
||
class HDFTable(ops.DatabaseTable):
    """Table operation created by ``HDFClient.table`` for HDF5 data."""
|
||
|
||
class CSVTable(ops.DatabaseTable):
    """Table operation created by ``CSVClient.table`` for CSV data."""
|
||
|
||
class Client(ibis.client.Client):
    """Base client for data stored in files underneath a root path.

    Concrete subclasses define ``extension`` (the file extension without
    the leading dot) and implement ``table``, ``list_tables`` and
    ``list_databases``.
    """

    def __init__(self, root):
        """Store ``root`` as a ``pathlib.Path`` and start an empty registry."""
        super(Client, self).__init__()
        self.root = pathlib.Path(root)
        # name -> path mapping; populated by the subclasses' ``table`` methods
        self.dictionary = {}

    def database(self, name=None, path=None):
        """Return a ``Database`` for ``name`` located under ``path``.

        Parameters
        ----------
        name : str, optional
            Database name. When omitted, a database called ``'root'`` is
            returned with ``path`` passed through unchanged.
        path : pathlib.Path, optional
            Location to look in; defaults to ``self.root``.

        Raises
        ------
        AttributeError
            If ``name`` is not reported by ``list_databases(path)``.
        """
        if name is None:
            return Database('root', self, path=path)

        if name not in self.list_databases(path):
            raise AttributeError(name)
        if path is None:
            path = self.root

        new_name = "{}.{}".format(name, self.extension)
        # Resolve the directory relative to ``path`` (not ``self.root``) so
        # that a database nested below the root is recognised as a directory
        # instead of being mapped to a non-existent ``<name>.<ext>`` file.
        if (path / name).is_dir():
            path = path / name
        elif not str(path).endswith(new_name):
            path = path / new_name

        return Database(name, self, path=path)

    def table(self, name, path):
        """Return a table expression; must be implemented by subclasses."""
        raise NotImplementedError

    def execute(self, expr, params=None, **kwargs):
        """Run ``expr`` through the pandas ``execute`` function.

        ``kwargs`` are accepted but not forwarded.
        """
        assert isinstance(expr, ir.Expr)
        # ``execute`` here is the module-level import, not this method.
        return execute(expr, params=params)

    def list_tables(self, path=None):
        """List table names under ``path``; implemented by subclasses."""
        raise NotImplementedError

    def list_databases(self, path=None):
        """List database names under ``path``; implemented by subclasses."""
        raise NotImplementedError
|
||
|
||
class HDFClient(Client):
    """Client for HDF5 files.

    A database is a directory or an ``.h5`` file; a table is a key stored
    inside an ``.h5`` file.
    """

    extension = 'h5'

    def table(self, name, path=None):
        """Return a table expression for key ``name`` in the file ``path``.

        Parameters
        ----------
        name : str
            Key inside the HDF5 store (without the leading ``/``).
        path : pathlib.Path, optional
            The HDF5 file; defaults to ``self.root``, matching
            ``list_tables``.

        Raises
        ------
        AttributeError
            If ``name`` is not reported by ``list_tables(path)``.
        """
        if path is None:
            path = self.root

        if name not in self.list_tables(path):
            raise AttributeError(name)

        # get the schema: select zero rows so only the dtypes are read
        with pd.HDFStore(str(path), mode='r') as store:
            df = store.select(name, start=0, stop=0)
        schema = pandas_dtypes_to_ibis_schema(df, {})

        t = HDFTable(name, schema, self).to_expr()
        self.dictionary[name] = path
        return t

    def list_tables(self, path=None):
        """Return the keys stored in the HDF5 file ``path``.

        An empty list is returned when ``path`` is not a file carrying the
        ``.h5`` suffix.
        """
        # tables are individual tables within a file
        if path is None:
            path = self.root

        # Compare the real suffix (with the dot) so that names which merely
        # end in the letters "h5" are not mistaken for HDF5 files.
        if path.is_file() and path.suffix == '.' + self.extension:
            with pd.HDFStore(str(path), mode='r') as store:
                # strip leading /
                return [k[1:] for k in store.keys()]

        return []

    def list_databases(self, path=None):
        """Return directory names and ``.h5`` file stems found in ``path``.

        When ``path`` is itself a file there is nothing nested below it, so
        the result is empty.
        """
        # databases are dir & a .h5 file
        if path is None:
            path = self.root

        if not path.is_dir():
            # by definition we are at the db level at this point
            return []

        suffix = '.' + self.extension
        databases = []
        for d in path.iterdir():
            if d.is_dir():
                databases.append(d.name)
            elif d.is_file() and d.suffix == suffix:
                databases.append(d.stem)
        return databases
|
||
|
||
class CSVClient(Client):
    """Client for CSV files.

    A database is a directory; a table is a ``.csv`` file inside it.
    """

    extension = 'csv'

    def table(self, name, path=None):
        """Return a table expression for the CSV file backing ``name``.

        Parameters
        ----------
        name : str
            File stem of the CSV file.
        path : pathlib.Path, optional
            Directory containing the file, or the CSV file itself; defaults
            to ``self.root``.

        Raises
        ------
        AttributeError
            If ``name`` is not reported by ``list_tables(path)``.
        """
        if name not in self.list_tables(path):
            raise AttributeError(name)

        if path is None:
            path = self.root

        # ``list_tables`` accepts a path that already is the CSV file, so do
        # not append the file name a second time in that case.
        if path.is_file():
            f = path
        else:
            f = path / "{}.{}".format(name, self.extension)

        # get the schema from a small sample of rows
        df = pd.read_csv(str(f), header=0, nrows=10)
        schema = pandas_dtypes_to_ibis_schema(df, {})

        t = CSVTable(name, schema, self).to_expr()
        self.dictionary[name] = f
        return t

    def list_tables(self, path=None):
        """Return the stems of the ``.csv`` files at ``path``.

        ``path`` may be a directory (every CSV file directly inside it is
        listed) or a single CSV file.
        """
        # tables are files in a dir
        if path is None:
            path = self.root

        # Compare the real suffix (with the dot) so that names which merely
        # end in the letters "csv" are not mistaken for CSV files.
        suffix = '.' + self.extension
        if path.is_dir():
            return [
                d.stem for d in path.iterdir()
                if d.is_file() and d.suffix == suffix
            ]
        if path.is_file() and path.suffix == suffix:
            return [path.stem]
        return []

    def list_databases(self, path=None):
        """Return the names of the directories directly inside ``path``."""
        # databases are dir
        if path is None:
            path = self.root

        if not path.is_dir():
            return []
        return [d.name for d in path.iterdir() if d.is_dir()]
|
||
|
||
class Database(ibis.client.Database):
    """A database of the file backend, bound to a filesystem ``path``.

    Attribute access falls back to looking up a table and then a nested
    database of that name through the client.
    """

    def __init__(self, name, client, path=None):
        super(Database, self).__init__(name, client)
        self.path = path

    def __str__(self):
        return '{0.__class__.__name__}({0.name})'.format(self)

    def __dir__(self):
        """Return the sorted union of database and table names at ``path``."""
        dbs = self.list_databases(path=self.path)
        tables = self.list_tables(path=self.path)
        return sorted(set(dbs) | set(tables))

    def __getattr__(self, name):
        """Resolve ``name`` as a table first, then as a nested database.

        Raises
        ------
        AttributeError
            If ``name`` is neither a regular attribute, a table, nor a
            database, or if it is a dunder name that is not defined.
        """
        try:
            return object.__getattribute__(self, name)
        except AttributeError:
            # Protocol probes such as ``__setstate__`` or ``__deepcopy__``
            # (copy/pickle) can arrive before ``path`` is set; treating
            # them as table names would recurse through ``self.path`` and
            # touch the filesystem, so fail fast instead.
            if name.startswith('__') and name.endswith('__'):
                raise
            try:
                return self.table(name, path=self.path)
            except AttributeError:
                return self.database(name, path=self.path)

    def table(self, name, path):
        """Delegate to ``self.client.table``."""
        return self.client.table(name, path=path)

    def database(self, name=None, path=None):
        """Delegate to ``self.client.database``."""
        return self.client.database(name=name, path=path)

    def list_databases(self, path=None):
        """Return the client's databases at ``path`` (default ``self.path``), sorted."""
        if path is None:
            path = self.path
        return sorted(self.client.list_databases(path=path))

    def list_tables(self, path=None):
        """Return the client's tables at ``path`` (default ``self.path``), sorted."""
        if path is None:
            path = self.path
        return sorted(self.client.list_tables(path=path))
|
||
|
||
@data_preload.register(HDFTable, pathlib.Path)
def hdf_data_preload_uri_client(table, path, scope=None, **kwargs):
    """Read the key ``table.name`` from the HDF5 file at ``path``.

    ``scope`` and ``kwargs`` are accepted but unused.
    """
    filename = str(path)
    return pd.read_hdf(filename, table.name, mode='r')
|
||
|
||
@data_preload.register(CSVTable, pathlib.Path)
def csv_data_preload_uri_client(table, path, scope=None, **kwargs):
    """Read the CSV file at ``path``, using its first row as the header.

    ``table``, ``scope`` and ``kwargs`` are accepted but unused.
    """
    filename = str(path)
    return pd.read_csv(filename, header=0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
import pandas as pd | ||
from ibis.file import operations as ops | ||
from ibis.pandas.dispatch import execute_node | ||
|
||
|
||
@execute_node.register(ops.CSV, pd.DataFrame)
def execute_to_csv_dataframe(op, data, **kwargs):
    """Write ``data`` to ``op.path`` as CSV, without the index column.

    Nothing is returned; ``kwargs`` are accepted but unused.
    """
    data.to_csv(op.path, index=False)
|
||
|
||
@execute_node.register(ops.HDF, pd.DataFrame)
def execute_to_hdf_dataframe(op, data, **kwargs):
    """Write ``data`` to the HDF5 file ``op.path`` under ``op.key``.

    The frame is stored in ``'table'`` format with ``data_columns=True``.
    Nothing is returned; ``kwargs`` are accepted but unused.
    """
    path = op.path
    key = op.key
    # ``key`` is passed by keyword: newer pandas releases deprecate (and
    # later reject) positional arguments after the path in ``to_hdf``.
    data.to_hdf(path, key=key, format='table', data_columns=True)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from ibis.expr import rules | ||
from ibis.expr.operations import TableNode as _TableNode | ||
import ibis.expr.types as ir | ||
|
||
|
||
class TableNode(_TableNode):
    """Table node whose roots are derived from its ``table`` attribute."""

    def root_tables(self):
        """Return ``ir.distinct_roots`` applied to ``self.table``."""
        source = self.table
        return ir.distinct_roots(source)
|
||
|
||
class CSV(TableNode):
    """Operation pairing a table with the CSV path it is written to."""

    def __init__(self, table, path):
        """Validate ``table`` and record ``table`` and ``path`` as arguments.

        Raises
        ------
        ValueError
            If ``rules.is_table(table)`` is false.
        """
        table_ok = rules.is_table(table)
        if not table_ok:
            raise ValueError("must pass a table")

        # attributes are set before the parent initialiser runs
        self.table = table
        self.path = path

        node_args = [self.table, self.path]
        super(CSV, self).__init__(node_args)
|
||
|
||
class HDF(TableNode):
    """Operation pairing a table with the HDF5 path and key it is written to."""

    def __init__(self, table, path, key):
        """Validate ``table`` and record ``table``, ``path`` and ``key``.

        Raises
        ------
        ValueError
            If ``rules.is_table(table)`` is false.
        """
        table_ok = rules.is_table(table)
        if not table_ok:
            raise ValueError("must pass a table")

        # attributes are set before the parent initialiser runs
        self.table = table
        self.path = path
        self.key = key

        node_args = [self.table, self.path, self.key]
        super(HDF, self).__init__(node_args)
Empty file.
Oops, something went wrong.