Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add CSV export capability to DataFrameClient #45

Merged
merged 20 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/api_reference/core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ nisystemlink.clients.core
:members:
:inherited-members:
:imported-members:

.. automodule:: nisystemlink.clients.core.helpers
:members:
:imported-members:
1 change: 1 addition & 0 deletions docs/api_reference/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ nisystemlink.clients.dataframe
.. automethod:: get_table_data
.. automethod:: append_table_data
.. automethod:: query_table_data
.. automethod:: export_table_data
kjohn1922 marked this conversation as resolved.
Show resolved Hide resolved
.. automethod:: query_decimated_data

.. automodule:: nisystemlink.clients.dataframe.models
Expand Down
17 changes: 13 additions & 4 deletions docs/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ Subscribe to tag changes
:language: python
:linenos:

Data Frame API
DataFrame API
-------

Overview
~~~~~~~~

The :class:`.DataFrameClient` class is the primary entry point of the Data Frame API.
The :class:`.DataFrameClient` class is the primary entry point of the DataFrame API.

When constructing a :class:`.DataFrameClient`, you can pass an
:class:`.HttpConfiguration` (like one retrieved from the
Expand All @@ -91,11 +91,14 @@ default connection. The default connection depends on your environment.

With a :class:`.DataFrameClient` object, you can:

* Create and delete Data Frame Tables.
* Create and delete data tables.

* Modify table metadata and query for tables by their metadata.

* Append rows of data to a table, query for rows of data from a table, and decimate table data.
* Append rows of data to a table, query for rows of data from a table, and
decimate table data.

* Export table data in a comma-separated values (CSV) format.

Examples
~~~~~~~~
Expand All @@ -111,3 +114,9 @@ Query and read data from a table
.. literalinclude:: ../examples/dataframe/query_read_data.py
:language: python
:linenos:

Export data from a table

.. literalinclude:: ../examples/dataframe/export_data.py
kjohn1922 marked this conversation as resolved.
Show resolved Hide resolved
:language: python
:linenos:
36 changes: 36 additions & 0 deletions examples/dataframe/export_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from shutil import copyfileobj

from nisystemlink.clients.dataframe import DataFrameClient
from nisystemlink.clients.dataframe.models import (
ColumnFilter,
ColumnOrderBy,
ExportFormat,
ExportTableDataRequest,
FilterOperation,
)

client = DataFrameClient()

# List a table
response = client.list_tables(take=1)
table = response.tables[0]

# Export table data with query options
request = ExportTableDataRequest(
columns=["col1"],
order_by=[ColumnOrderBy(column="col2", descending=True)],
filters=[
ColumnFilter(column="col1", operation=FilterOperation.NotEquals, value="0")
],
response_format=ExportFormat.CSV,
)

data = client.export_table_data(id=table.id, query=request)

# Write the export data to a file
with open(f"{table.name}.csv", "wb") as f:
copyfileobj(data, f)

# Alternatively, load the export data into a pandas dataframe
# import pandas as pd
# df = pd.read_csv(data)
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ disallow_untyped_decorators=True

strict_equality=True

[mypy-pandas.*]
ignore_missing_imports=True

kjohn1922 marked this conversation as resolved.
Show resolved Hide resolved
[mypy-tests.*]
disallow_untyped_calls=True
disallow_untyped_decorators=True
Expand Down
19 changes: 18 additions & 1 deletion nisystemlink/clients/core/_uplink/_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

from typing import Any, Callable, Optional, Sequence, Tuple, TypeVar, Union

from uplink import Body, commands, json, returns
from uplink import (
Body,
commands,
json,
response_handler as uplink_response_handler,
returns,
)

F = TypeVar("F", bound=Callable[..., Any])

Expand Down Expand Up @@ -50,3 +56,14 @@ def decorator(func: F) -> F:
return commands.delete(path, args=args)(func) # type: ignore

return decorator


def response_handler(
handler: Any, requires_consumer: Optional[bool] = False
) -> Callable[[F], F]:
"""Annotation for creating custom response handlers."""

def decorator(func: F) -> F:
return uplink_response_handler(handler, requires_consumer)(func) # type: ignore

return decorator
3 changes: 3 additions & 0 deletions nisystemlink/clients/core/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from ._iterator_file_like import IteratorFileLike

# flake8: noqa
30 changes: 30 additions & 0 deletions nisystemlink/clients/core/helpers/_iterator_file_like.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Any, Iterator


class IteratorFileLike:
"""A file-like object adapter that wraps a python iterator, providing a way to
read from the iterator as if it was a file.
"""

def __init__(self, iterator: Iterator[Any]):
self._iterator = iterator
self._buffer = b""

def read(self, size: int = -1) -> bytes:
"""Read at most `size` bytes from the file-like object. If `size` is not
specified or is negative, read until the iterator is exhausted and
returns all bytes or characters read.
"""
while size < 0 or len(self._buffer) < size:
try:
chunk = next(self._iterator)
self._buffer += chunk
except StopIteration:
break
if size < 0:
data = self._buffer
self._buffer = b""
else:
data = self._buffer[:size]
self._buffer = self._buffer[size:]
return data
77 changes: 54 additions & 23 deletions nisystemlink/clients/dataframe/_data_frame_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@

from nisystemlink.clients import core
from nisystemlink.clients.core._uplink._base_client import BaseClient
from nisystemlink.clients.core._uplink._methods import delete, get, patch, post
from nisystemlink.clients.core._uplink._methods import (
delete,
get,
patch,
post,
response_handler,
)
from nisystemlink.clients.core.helpers import IteratorFileLike
from requests.models import Response
from uplink import Body, Field, Path, Query

from . import models
Expand All @@ -21,7 +29,7 @@ def __init__(self, configuration: Optional[core.HttpConfiguration] = None):
is used.

Raises:
ApiException: if unable to communicate with the Data Frame service.
ApiException: if unable to communicate with the DataFrame Service.
kjohn1922 marked this conversation as resolved.
Show resolved Hide resolved
"""
if configuration is None:
configuration = core.JupyterHttpConfiguration()
Expand All @@ -36,7 +44,7 @@ def api_info(self) -> models.ApiInfo:
Information about available API operations.

Raises:
ApiException: if unable to communicate with the Data Frame service.
ApiException: if unable to communicate with the DataFrame Service.
"""
...

Expand Down Expand Up @@ -74,7 +82,7 @@ def list_tables(
The list of tables with a continuation token.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -90,7 +98,7 @@ def create_table(self, table: models.CreateTableRequest) -> str:
The ID of the newly created table.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -106,7 +114,7 @@ def query_tables(self, query: models.QueryTablesRequest) -> models.PagedTables:
The list of tables with a continuation token.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -116,13 +124,13 @@ def get_table_metadata(self, id: str) -> models.TableMetadata:
"""Retrieves the metadata and column information for a single table identified by its ID.

Args:
id (str): Unique ID of a DataFrame table.
id (str): Unique ID of a data table.

Returns:
The metadata for the table.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -132,11 +140,11 @@ def modify_table(self, id: str, update: models.ModifyTableRequest) -> None:
"""Modify properties of a table or its columns.

Args:
id: Unique ID of a DataFrame table.
id: Unique ID of a data table.
update: The metadata to update.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -146,10 +154,10 @@ def delete_table(self, id: str) -> None:
"""Deletes a table.

Args:
id (str): Unique ID of a DataFrame table.
id (str): Unique ID of a data table.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -161,14 +169,14 @@ def delete_tables(
"""Deletes multiple tables.

Args:
ids (List[str]): List of unique IDs of DataFrame tables.
ids (List[str]): List of unique IDs of data tables.

Returns:
A partial success if any tables failed to delete, or None if all
tables were deleted successfully.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -187,7 +195,7 @@ def modify_tables(
tables were modified successfully.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand Down Expand Up @@ -215,7 +223,7 @@ def get_table_data(
"""Reads raw data from the table identified by its ID.

Args:
id: Unique ID of a DataFrame table.
id: Unique ID of a data table.
columns: Columns to include in the response. Data will be returned in the same order as
the columns. If not specified, all columns are returned.
order_by: List of columns to sort by. Multiple columns may be specified to order rows
Expand All @@ -230,7 +238,7 @@ def get_table_data(
The table data and total number of rows with a continuation token.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -240,11 +248,11 @@ def append_table_data(self, id: str, data: models.AppendTableDataRequest) -> Non
"""Appends one or more rows of data to the table identified by its ID.

Args:
id: Unique ID of a DataFrame table.
id: Unique ID of a data table.
data: The rows of data to append and any additional options.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -256,14 +264,14 @@ def query_table_data(
"""Reads rows of data that match a filter from the table identified by its ID.

Args:
id: Unique ID of a DataFrame table.
id: Unique ID of a data table.
query: The filtering and sorting to apply when reading data.

Returns:
The table data and total number of rows with a continuation token.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Expand All @@ -275,14 +283,37 @@ def query_decimated_data(
"""Reads decimated rows of data from the table identified by its ID.

Args:
id: Unique ID of a DataFrame table.
id: Unique ID of a data table.
query: The filtering and decimation options to apply when reading data.

Returns:
The decimated table data.

Raises:
ApiException: if unable to communicate with the Data Frame service
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...

def _iter_content_filelike_wrapper(response: Response) -> IteratorFileLike:
return IteratorFileLike(response.iter_content(chunk_size=4096))

@response_handler(_iter_content_filelike_wrapper)
@post("tables/{id}/export-data", args=[Path, Body])
def export_table_data(
self, id: str, query: models.ExportTableDataRequest
) -> IteratorFileLike:
kjohn1922 marked this conversation as resolved.
Show resolved Hide resolved
"""Exports rows of data that match a filter from the table identified by its ID.

Args:
id: Unique ID of a data table.
query: The filtering, sorting, and export format to apply when exporting data.

Returns:
A file-like object for reading the exported data.

Raises:
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
"""
...
Loading