forked from ibis-project/ibis
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ENH: initial support a file-like backend with csv & hdf5 implementations
closes ibis-project#1165
- Loading branch information
Showing
19 changed files
with
641 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ dependencies: | |
- pytest | ||
- python=3.6 | ||
- python-graphviz | ||
- pytables | ||
- sh | ||
- six | ||
- sqlalchemy>=1.0.0 | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ dependencies: | |
- pytest | ||
- python=3.6 | ||
- python-graphviz | ||
- pytables | ||
- sh | ||
- six | ||
- sphinx_rtd_theme | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
import ibis | ||
import ibis.expr.types as ir | ||
from ibis.file.utils import pathlib | ||
from ibis.pandas.core import execute, execute_with_scope # noqa | ||
|
||
|
||
class FileClient(ibis.client.Client): | ||
|
||
def __init__(self, root): | ||
super(FileClient, self).__init__() | ||
self.root = pathlib.Path(str(root)) | ||
self.dictionary = {} | ||
|
||
def insert(self, path, expr, **kwargs): | ||
raise NotImplementedError | ||
|
||
def table(self, name, path): | ||
raise NotImplementedError | ||
|
||
def database(self, name=None, path=None): | ||
if name is None: | ||
return FileDatabase('root', self, path=path) | ||
|
||
if name not in self.list_databases(path): | ||
raise AttributeError(name) | ||
if path is None: | ||
path = self.root | ||
|
||
new_name = "{}.{}".format(name, self.extension) | ||
if (self.root / name).is_dir(): | ||
path = path / name | ||
elif not str(path).endswith(new_name): | ||
path = path / new_name | ||
|
||
return FileDatabase(name, self, path=path) | ||
|
||
def execute(self, expr, params=None, **kwargs): # noqa | ||
assert isinstance(expr, ir.Expr) | ||
scope = kwargs.pop('scope', {}) | ||
return execute_with_scope( | ||
expr, scope=scope, | ||
params=params, **kwargs) | ||
|
||
def list_tables(self, path=None): | ||
raise NotImplementedError | ||
|
||
def list_databases(self, path=None): | ||
raise NotImplementedError | ||
|
||
|
||
class FileDatabase(ibis.client.Database): | ||
|
||
def __init__(self, name, client, path=None): | ||
super(FileDatabase, self).__init__(name, client) | ||
self.path = path | ||
|
||
def __str__(self): | ||
return '{0.__class__.__name__}({0.name})'.format(self) | ||
|
||
def __dir__(self): | ||
dbs = self.list_databases(path=self.path) | ||
tables = self.list_tables(path=self.path) | ||
return sorted(list(set(dbs).union(set(tables)))) | ||
|
||
def __getattr__(self, name): | ||
try: | ||
return object.__getattribute__(self, name) | ||
except AttributeError: | ||
try: | ||
return self.table(name, path=self.path) | ||
except AttributeError: | ||
return self.database(name, path=self.path) | ||
|
||
def table(self, name, path): | ||
return self.client.table(name, path=path) | ||
|
||
def database(self, name=None, path=None): | ||
return self.client.database(name=name, path=path) | ||
|
||
def list_databases(self, path=None): | ||
if path is None: | ||
path = self.path | ||
return sorted(self.client.list_databases(path=path)) | ||
|
||
def list_tables(self, path=None): | ||
if path is None: | ||
path = self.path | ||
return sorted(self.client.list_tables(path=path)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
import pandas as pd | ||
import ibis.expr.operations as ops | ||
from ibis.file.client import FileClient | ||
from ibis.pandas.core import pre_execute, execute # noqa | ||
from ibis.pandas.client import pandas_dtypes_to_ibis_schema | ||
from ibis.pandas.execution.selection import physical_tables | ||
|
||
|
||
def connect(path): | ||
"""Create a CSVClient for use with Ibis | ||
Parameters | ||
---------- | ||
path: str or pathlib.Path | ||
Returns | ||
------- | ||
CSVClient | ||
""" | ||
|
||
return CSVClient(path) | ||
|
||
|
||
class CSVTable(ops.DatabaseTable): | ||
pass | ||
|
||
|
||
class CSVClient(FileClient): | ||
extension = 'csv' | ||
|
||
def insert(self, path, t, index=False, **kwargs): | ||
path = self.root / path | ||
data = execute(t) | ||
data.to_csv(str(path), index=index, **kwargs) | ||
|
||
def table(self, name, path=None): | ||
if name not in self.list_tables(path): | ||
raise AttributeError(name) | ||
|
||
if path is None: | ||
path = self.root | ||
|
||
# get the schema | ||
f = path / "{}.{}".format(name, self.extension) | ||
df = pd.read_csv(str(f), header=0, nrows=10) | ||
schema = pandas_dtypes_to_ibis_schema(df, {}) | ||
|
||
t = CSVTable(name, schema, self).to_expr() | ||
self.dictionary[name] = f | ||
return t | ||
|
||
def list_tables(self, path=None): | ||
# tables are files in a dir | ||
if path is None: | ||
path = self.root | ||
|
||
tables = [] | ||
if path.is_dir(): | ||
for d in path.iterdir(): | ||
if d.is_file(): | ||
if str(d).endswith(self.extension): | ||
tables.append(d.stem) | ||
elif path.is_file(): | ||
if str(path).endswith(self.extension): | ||
tables.append(path.stem) | ||
return tables | ||
|
||
def list_databases(self, path=None): | ||
# databases are dir | ||
if path is None: | ||
path = self.root | ||
|
||
tables = [] | ||
if path.is_dir(): | ||
for d in path.iterdir(): | ||
if d.is_dir(): | ||
tables.append(d.name) | ||
return tables | ||
|
||
|
||
@pre_execute.register(CSVTable, CSVClient) | ||
def csv_pre_execute_table(op, client, scope=None, **kwargs): | ||
path = client.dictionary[op.name] | ||
df = pd.read_csv(str(path), header=0) | ||
return {op: df} | ||
|
||
|
||
@pre_execute.register(ops.Selection, CSVClient) | ||
def csv_pre_execute(op, client, scope=None, **kwargs): | ||
|
||
pt = physical_tables(op.table.op()) | ||
pt = pt[0] | ||
|
||
path = client.dictionary[pt.name] | ||
|
||
if op.selections: | ||
|
||
header = pd.read_csv(str(path), header=0, nrows=1) | ||
usecols = [getattr(s.op(), 'name', None) or s.get_name() | ||
for s in op.selections] | ||
|
||
# we cannot read all the columns taht we would like | ||
if len(pd.Index(usecols) & header.columns) != len(usecols): | ||
usecols = None | ||
|
||
else: | ||
|
||
usecols = None | ||
|
||
df = pd.read_csv(str(path), usecols=usecols, header=0) | ||
return {op: df} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
import pandas as pd | ||
import ibis.expr.operations as ops | ||
from ibis.file.client import FileClient | ||
from ibis.pandas.core import pre_execute, execute # noqa | ||
from ibis.pandas.client import pandas_dtypes_to_ibis_schema | ||
|
||
|
||
def connect(path): | ||
"""Create a HDF5Client for use with Ibis | ||
Parameters | ||
---------- | ||
path: str or pathlib.Path | ||
Returns | ||
------- | ||
HDF5Client | ||
""" | ||
return HDFClient(path) | ||
|
||
|
||
class HDFTable(ops.DatabaseTable): | ||
pass | ||
|
||
|
||
class HDFClient(FileClient): | ||
extension = 'h5' | ||
|
||
def insert(self, path, key, t, format='table', | ||
data_columns=True, **kwargs): | ||
|
||
path = self.root / path | ||
data = execute(t) | ||
data.to_hdf(str(path), key, format=format, | ||
data_columns=data_columns, **kwargs) | ||
|
||
def table(self, name, path): | ||
if name not in self.list_tables(path): | ||
raise AttributeError(name) | ||
|
||
# get the schema | ||
with pd.HDFStore(str(path), mode='r') as store: | ||
df = store.select(name, start=0, stop=0) | ||
schema = pandas_dtypes_to_ibis_schema(df, {}) | ||
|
||
t = HDFTable(name, schema, self).to_expr() | ||
self.dictionary[name] = path | ||
return t | ||
|
||
def list_tables(self, path=None): | ||
# tables are individual tables within a file | ||
|
||
if path is None: | ||
path = self.root | ||
|
||
if path.is_file() and str(path).endswith(self.extension): | ||
|
||
with pd.HDFStore(str(path), mode='r') as store: | ||
# strip leading / | ||
return [k[1:] for k in store.keys()] | ||
|
||
return [] | ||
|
||
def list_databases(self, path=None): | ||
# databases are dir & a .h5 file | ||
if path is None: | ||
path = self.root | ||
|
||
tables = [] | ||
if path.is_dir(): | ||
for d in path.iterdir(): | ||
if d.is_dir(): | ||
tables.append(d.name) | ||
elif d.is_file(): | ||
if str(d).endswith(self.extension): | ||
tables.append(d.stem) | ||
elif path.is_file(): | ||
# by definition we are at the db level at this point | ||
pass | ||
|
||
return tables | ||
|
||
|
||
@pre_execute.register(HDFTable, HDFClient) | ||
def hdf_pre_execute_table(op, client, scope=None, **kwargs): | ||
key = op.name | ||
path = client.dictionary[key] | ||
df = pd.read_hdf(str(path), key, mode='r') | ||
return {op: df} |
Empty file.
Oops, something went wrong.