Skip to content

Commit

Permalink
source: Added Pandas DataFrame
Browse files Browse the repository at this point in the history
Co-authored-by: John Andersen <johnandersenpdx@gmail.com>
Signed-off-by: John Andersen <johnandersenpdx@gmail.com>
  • Loading branch information
spur19 and pdxjohnny committed Jul 17, 2021
1 parent cab8349 commit 1ef5a16
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Usecase example notebook for "Evaluating Model Performance"
- Tests for all notebooks auto created and run via ``test_notebooks.py``
- Support for additional layers in pytorch pretrained models via Python API
- Pandas DataFrame can now be passed directly to high level APIs
### Changed
- Calls to hashlib now go through helper functions
- Build docs using `dffml service dev docs`
Expand Down
202 changes: 202 additions & 0 deletions dffml/source/dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
"""
Expose Pandas DataFrame as DFFML Source
"""
from typing import Dict, List, AsyncIterator


from ..record import Record
from ..base import config, field
from ..util.entrypoint import entrypoint
from ..util.net import DEFAULT_PROTOCOL_ALLOWLIST
from .source import BaseSourceContext, BaseSource


class DataFrameSourceContext(BaseSourceContext):
async def update(self, record: Record):
# Shorthand for DataFrame
df = self.parent.config.dataframe
# Store feature data
features = record.features()
for col in df.columns:
if col in features:
df.loc[record.key, col] = features[col]
# Store prediction
predictions = record.predictions()
for col in self.parent.config.predictions:
if col in predictions:
df.loc[record.key, col] = predictions[col]["value"]

async def records(self) -> AsyncIterator[Record]:
for row in self.parent.config.dataframe.itertuples():
features = dict(row._asdict())
predictions = {
key: {"value": features[key]}
for key in self.parent.config.predictions
}
del features["Index"]
for key in predictions.keys():
if key in features:
del features[key]
yield Record(
str(row.Index),
data={"features": features, "prediction": predictions},
)

async def record(self, key: str) -> Record:
data = self.parent.config.dataframe.iloc[int(key)]
predictions = {
key: data[key] for key in self.parent.config.predictions
}
features = {
key: value for key in data.items() if key not in predictions
}
return Record(
str(key), data={"features": features, "prediction": predictions},
)


@config
class DataFrameSourceConfig:
dataframe: "pandas.DataFrame" = field(
"The pandas DataFrame to proxy", default=None
)
predictions: List[str] = field(
"Prediction columns whose values we have to update",
default_factory=lambda: [],
)
# TODO Get rid of this basic appoach when we implement #1168
html: str = field(
"Construct a DataFrame using DataFrame.read_html(). Passing this as URL",
default=None,
)
html_table_index: int = field(
"If there are multiple html tables on a page, which one? Array indexed"
", so first table means 0, if you want the second table on the page"
", use 1 here.",
default=0,
)
protocol_allowlist: List[str] = field(
'List of protocols allowed for ``html`` URL. Example ``["http://"]``',
default_factory=lambda: DEFAULT_PROTOCOL_ALLOWLIST,
)


@entrypoint("dataframe")
class DataFrameSource(BaseSource):
r"""
Proxy for a pandas DataFrame
Examples
--------
You can pass a pandas DataFrame to this class directly via the Python API.
Or you can create DataFrames from other data sources via the Python API or
the command line.
**Example of creating a DataFrame from HTML via command line.**
Create an HTML table.
**index.html**
.. code-block:: html
:test:
:filepath: index.html
<table>
<tr>
<th>Years</th>
<th>Salary</th>
</tr>
<tr>
<td>0</td>
<td>10</td>
</tr>
<tr>
<td>1</td>
<td>20</td>
</tr>
<tr>
<td>2</td>
<td>30</td>
</tr>
</table>
Start the HTTP server to server the HTML page with the table
.. code-block:: console
:test:
:daemon: 8000
$ python -m http.server 8000
In another terminal. List all the records in the source.
.. code-block:: console
:test:
:replace: cmds[0][-3] = cmds[0][-3].replace("8000", str(ctx["HTTP_SERVER"]["8000"]))
$ dffml list records \
-sources table=dataframe \
-source-table-html http://127.0.0.1:8000/index.html \
-source-table-protocol_allowlist http://
[
{
"extra": {},
"features": {
"Salary": 10,
"Years": 0
},
"key": "0"
},
{
"extra": {},
"features": {
"Salary": 20,
"Years": 1
},
"key": "1"
},
{
"extra": {},
"features": {
"Salary": 30,
"Years": 2
},
"key": "2"
}
]
"""

CONFIG = DataFrameSourceConfig
CONTEXT = DataFrameSourceContext

def __init__(self, config):
super().__init__(config)
# Create DataFrame if not given
if self.config.dataframe is None:
try:
# Try import
import pandas
except (ModuleNotFoundError, ImportError) as error:
# If it fails say that pandas must be installed to create new
# DataFrames
raise PandasNotInstalled(
"Pandas is required to create new DataFrames. $ pip install pandas"
) from error
# TODO Modify this in line with changes for #1168
if self.config.html is not None:
dataframes = pandas.read_html(self.config.html)
if self.config.html_table_index >= len(dataframes):
raise DataFrameHTMLTableIndexNotFoundError(
f"Index {self.config.html_table_index} requested"
f" {len(dataframes)} table(s) found."
)
self.config.dataframe = dataframes[
self.config.html_table_index
]
else:
# Create empty DataFrame
self.config.dataframe = pandas.DataFrame()
5 changes: 4 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ httptest>=0.0.15
Pillow>=8.3.1
pre-commit
ipykernel
matplotlib
matplotlib
# Needed to run DataFrame test, not required for module
pandas>=1.0
lxml>=4.6.3
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class InstallException(Exception):
"df = dffml.source.df:DataFlowSource",
"op = dffml.source.op:OpSource",
"dir = dffml.source.dir:DirectorySource",
"dataframe = dffml.source.dataframe:DataFrameSource",
"iris.training = dffml.source.dataset.iris:iris_training.source",
],
"dffml.port": ["json = dffml.port.json:JSON"],
Expand Down
104 changes: 104 additions & 0 deletions tests/source/test_dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
import pandas as pd

from dffml import (
Record,
load,
save,
AsyncTestCase,
DataFrameSource,
DataFrameSourceConfig,
)


class TestDataFrameSource(AsyncTestCase):
async def test_dataframe(self):

mydict = [{"A": 1, "B": 2, "C": 3}]
df = pd.DataFrame(mydict)

source = DataFrameSource(
DataFrameSourceConfig(dataframe=df, predictions=["C"],)
)
# Save some data in the source
await save(
source,
Record(
"1",
data={
"features": {"A": 4, "B": 5},
"prediction": {"C": {"value": 6}},
},
),
Record(
"2",
data={
"features": {"A": 7, "B": 8},
"prediction": {"C": {"value": 9}},
},
),
)

# Load all the records
records = [record async for record in load(source)]

self.assertIsInstance(records, list)
self.assertEqual(len(records), 3)
self.assertDictEqual(records[0].features(), {"A": 1, "B": 2})
self.assertDictEqual(
records[0].predictions(), {"C": {"confidence": 0.0, "value": 3}}
)
self.assertDictEqual(records[1].features(), {"A": 4, "B": 5})
self.assertDictEqual(
records[1].predictions(), {"C": {"confidence": 0.0, "value": 6}}
)
self.assertDictEqual(records[2].features(), {"A": 7, "B": 8})
self.assertDictEqual(
records[2].predictions(), {"C": {"confidence": 0.0, "value": 9}}
)

async def test_update(self):

mydict = [{"A": 1, "B": 2, "C": 3}]
df = pd.DataFrame(mydict)

source = DataFrameSource(
DataFrameSourceConfig(dataframe=df, predictions=["C", "B"])
)
# Save some data in the source
await save(
source,
Record("1", data={"features": {"A": 4, "B": 5, "C": 6}}),
Record("2", data={"features": {"A": 7, "B": 8, "C": 9}}),
)

await save(
source, Record("2", data={"features": {"A": 15, "B": 16, "C": 14}})
)

records = [record async for record in load(source)]
self.assertEqual(len(records), 3)
self.assertDictEqual(records[0].features(), {"A": 1})
self.assertDictEqual(
records[0].predictions(),
{
"B": {"confidence": 0.0, "value": 2},
"C": {"confidence": 0.0, "value": 3},
},
)
self.assertDictEqual(records[1].features(), {"A": 4})
self.assertDictEqual(
records[1].predictions(),
{
"B": {"confidence": 0.0, "value": 5},
"C": {"confidence": 0.0, "value": 6},
},
)
self.assertDictEqual(records[2].features(), {"A": 15,})
self.assertDictEqual(
records[2].predictions(),
{
"B": {"confidence": 0.0, "value": 16},
"C": {"confidence": 0.0, "value": 14},
},
)

0 comments on commit 1ef5a16

Please sign in to comment.