Commit

implements sample

sdpython committed Nov 1, 2017
1 parent 10df5b5 commit 87abc7e
Showing 4 changed files with 134 additions and 30 deletions.
35 changes: 35 additions & 0 deletions _doc/sphinxdoc/source/tutorial/index.rst
@@ -27,3 +27,38 @@ when it does not fit into memory.
sdf.train_test_split("dataset_split_{}.txt", sep="\t")

>>> ['dataset_split_train.txt', 'dataset_split_test.txt']

.. contents::
:local:

Check the schema consistency of a large file
++++++++++++++++++++++++++++++++++++++++++++

Large files usually come from a database export which,
for some reason, failed for a couple of lines:
an *end of line* character not removed from a comment,
a separator also present in the data. When that happens,
:epkg:`pandas` takes the least strict type as the column type.
Sometimes, we prefer to get an idea of where the error is.

.. runpython::
:showcode:

import pandas
df = pandas.DataFrame([dict(cf=0, cint=0, cstr="0"), dict(cf=1, cint=1, cstr="1"),
dict(cf=2, cint="s2", cstr="2"), dict(cf=3, cint=3, cstr="3")])
name = "temp_df.csv"
df.to_csv(name, index=False)

from pandas_streaming.df import StreamingDataFrame
try:
sdf = StreamingDataFrame.read_csv(name, chunksize=2)
for df in sdf:
print(df.dtypes)
except Exception as e:
print(e)

The method :py:meth:`__iter__ <pandas_streaming.df.dataframe.StreamingDataFrame.__iter__>`
checks that the schema does not change between two iterations.
The check can be disabled by passing *check_schema=False*
to the constructor, as in the sketch below.
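
A minimal sketch, reusing the same file: with *check_schema=False*,
the iteration goes through every chunk even though the types differ
between chunks.

.. runpython::
    :showcode:

    import pandas
    df = pandas.DataFrame([dict(cf=0, cint=0, cstr="0"), dict(cf=1, cint=1, cstr="1"),
                           dict(cf=2, cint="s2", cstr="2"), dict(cf=3, cint=3, cstr="3")])
    name = "temp_df.csv"
    df.to_csv(name, index=False)

    from pandas_streaming.df import StreamingDataFrame
    # check_schema=False disables the schema consistency check between chunks
    sdf = StreamingDataFrame.read_csv(name, chunksize=2, check_schema=False)
    for df in sdf:
        print(df.dtypes)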
38 changes: 31 additions & 7 deletions _unittests/ut_df/test_streaming_dataframe.py
@@ -45,6 +45,7 @@
from src.pandas_streaming.data import dummy_streaming_dataframe
from src.pandas_streaming.exc import StreamingInefficientException
from src.pandas_streaming.df import StreamingDataFrame
from src.pandas_streaming.df.dataframe import StreamingDataFrameSchemaError


class TestStreamingDataFrame(ExtTestCase):
@@ -196,9 +197,9 @@ def test_apply(self):
self.assertNotEmpty(list(sdf))
sdf = sdf.applymap(str)
self.assertNotEmpty(list(sdf))
sdf = sdf.apply(lambda row: row["cint"] + "r", axis=1)
sdf = sdf.apply(lambda row: row[["cint"]] + "r", axis=1)
self.assertNotEmpty(list(sdf))
text = sdf.to_csv()
text = sdf.to_csv(header=False)
self.assertStartsWith("0,0r\n1,1r\n2,2r\n3,3r", text)

def test_train_test_split(self):
@@ -257,19 +258,23 @@ def test_train_test_split_streaming_tiny(self):
sdftr, sdfte = sdf2.train_test_split(test_size=0.5)
df1 = sdfte.head()
df2 = sdfte.head()
self.assertEqualDataFrame(df1, df2)
if df1 is not None or df2 is not None:
self.assertEqualDataFrame(df1, df2)
df1 = sdftr.head()
df2 = sdftr.head()
self.assertEqualDataFrame(df1, df2)
if df1 is not None or df2 is not None:
self.assertEqualDataFrame(df1, df2)
sdf = StreamingDataFrame.read_df(df)
sdf2 = sdf.concat(sdf)
sdftr, sdfte = sdf2.train_test_split(test_size=0.5)
df1 = sdfte.head()
df2 = sdfte.head()
self.assertEqualDataFrame(df1, df2)
if df1 is not None or df2 is not None:
self.assertEqualDataFrame(df1, df2)
df1 = sdftr.head()
df2 = sdftr.head()
self.assertEqualDataFrame(df1, df2)
if df1 is not None or df2 is not None:
self.assertEqualDataFrame(df1, df2)

def test_train_test_split_streaming_strat(self):
fLOG(
@@ -411,7 +416,7 @@ def test_groupby(self):

df20 = dummy_streaming_dataframe(20).to_dataframe()
df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0)
sdf20 = StreamingDataFrame.read_df(df20, chunk_size=5)
sdf20 = StreamingDataFrame.read_df(df20, chunksize=5)
gr = sdf20.groupby("key", lambda gr: gr.sum())
gr2 = df20.groupby("key").sum()
self.assertEqualDataFrame(gr, gr2)
@@ -451,6 +456,25 @@ def test_merge_2(self):
self.assertEqualDataFrame(jm.sort_values(["X", "Y"]).reset_index(drop=True),
sjm.to_dataframe().sort_values(["X", "Y"]).reset_index(drop=True))

def test_schema_consistant(self):
fLOG(
__file__,
self._testMethodName,
OutputPrint=__name__ == "__main__")

df = pandas.DataFrame([dict(cf=0, cint=0, cstr="0"), dict(cf=1, cint=1, cstr="1"),
dict(cf=2, cint="s2", cstr="2"), dict(cf=3, cint=3, cstr="3")])
temp = get_temp_folder(__file__, "temp_schema_consistant")
name = os.path.join(temp, "df.csv")
df.to_csv(name, index=False)
self.assertEqual(df.shape, (4, 3))
sdf = StreamingDataFrame.read_csv(name, chunksize=2)
self.assertRaise(lambda: list(sdf), StreamingDataFrameSchemaError)
sdf = StreamingDataFrame.read_csv(
name, chunksize=2, check_schema=False)
pieces = list(sdf)
self.assertEqual(len(pieces), 2)


if __name__ == "__main__":
unittest.main()
6 changes: 3 additions & 3 deletions src/pandas_streaming/data/dummy.py
@@ -7,13 +7,13 @@
from ..df import StreamingDataFrame


def dummy_streaming_dataframe(n, chunk_size=10, asfloat=False, **cols):
def dummy_streaming_dataframe(n, chunksize=10, asfloat=False, **cols):
"""
Returns a dummy streaming dataframe
mostly for unit test purposes.
@param n number of rows
@param chunk_size chunk size
@param chunksize chunk size
@param asfloat use random float and not random int
@param cols additional columns
@return a @see cl StreamingDataFrame
@@ -26,4 +26,4 @@ def dummy_streaming_dataframe(n, chunk_size=10, asfloat=False, **cols):
"s{0}".format(i) for i in range(0, n)]))
for k, v in cols.items():
df[k] = v
return StreamingDataFrame.read_df(df, chunk_size=chunk_size)
return StreamingDataFrame.read_df(df, chunksize=chunksize)
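
A minimal usage sketch of the renamed parameter (the printed shapes
depend on the columns the helper generates, so they are not asserted here):

    from pandas_streaming.data import dummy_streaming_dataframe

    # 20 rows served in chunks of 5 rows each
    sdf = dummy_streaming_dataframe(20, chunksize=5)
    for chunk in sdf:
        print(chunk.shape)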
85 changes: 65 additions & 20 deletions src/pandas_streaming/df/dataframe.py
@@ -9,6 +9,13 @@
from ..exc import StreamingInefficientException


class StreamingDataFrameSchemaError(Exception):
"""
Reveals an issue with inconsistent schemas.
"""
pass


class StreamingDataFrame:
"""
Defines a streaming dataframe.
@@ -35,11 +42,19 @@ class StreamingDataFrame:
methods can be chained.
"""

def __init__(self, iter_creation):
def __init__(self, iter_creation, check_schema=True):
"""
@param iter_creation function which creates an iterator.
@param iter_creation function which creates an iterator
@param check_schema checks that the schema is the same for every dataframe
"""
self.iter_creation = iter_creation
self.check_schema = check_schema

def get_kwargs(self):
"""
Returns the parameters used to call the constructor.
"""
return dict(check_schema=self.check_schema)

def train_test_split(self, path_or_buf=None, export_method="to_csv",
names=None, streaming=True, partitions=None,
@@ -85,6 +100,18 @@ def train_test_split(self, path_or_buf=None, export_method="to_csv",
export_method=export_method,
names=names, **kwargs)

@staticmethod
def _process_kwargs(kwargs):
"""
Extracts the parameters meant for the constructor of this class
and removes them from *kwargs*.
"""
kw = {}
for k in {'check_schema'}:
if k in kwargs:
kw[k] = kwargs[k]
del kwargs[k]
return kw

@staticmethod
def read_csv(*args, **kwargs) -> 'StreamingDataFrame':
"""
@@ -95,8 +122,9 @@ def read_csv(*args, **kwargs) -> 'StreamingDataFrame':
"""
if not kwargs.get('iterator', True):
raise ValueError("If specified, iterator must be True.")
kwargs_create = StreamingDataFrame._process_kwargs(kwargs)
kwargs['iterator'] = True
return StreamingDataFrame(lambda: pandas.read_csv(*args, **kwargs))
return StreamingDataFrame(lambda: pandas.read_csv(*args, **kwargs), **kwargs_create)

@staticmethod
def read_str(text, **kwargs) -> 'StreamingDataFrame':
@@ -108,38 +136,55 @@ def read_str(text, **kwargs) -> 'StreamingDataFrame':
"""
if not kwargs.get('iterator', True):
raise ValueError("If specified, iterator must be True.")
kwargs_create = StreamingDataFrame._process_kwargs(kwargs)
kwargs['iterator'] = True
buffer = StringIO(text)
return StreamingDataFrame(lambda: pandas.read_csv(buffer, **kwargs))
return StreamingDataFrame(lambda: pandas.read_csv(buffer, **kwargs), **kwargs_create)

@staticmethod
def read_df(df, chunk_size=None) -> 'StreamingDataFrame':
def read_df(df, chunksize=None, check_schema=True) -> 'StreamingDataFrame':
"""
Splits a dataframe into small chunks mostly for
unit testing purposes.
@param df :epkg:`pandas:DataFrame`
@param chunk_size number rows per chunks (// 10 by default)
@return iterator on @see cl StreamingDataFrame
@param df :epkg:`pandas:DataFrame`
@param chunksize number of rows per chunk (all rows in one chunk by default)
@param check_schema check schema between two iterations
@return iterator on @see cl StreamingDataFrame
"""
if chunk_size is None:
chunk_size = df.shape[0]
if chunksize is None:
chunksize = df.shape[0]

def local_iterator():
for i in range(0, df.shape[0], chunk_size):
end = min(df.shape[0], i + chunk_size)
for i in range(0, df.shape[0], chunksize):
end = min(df.shape[0], i + chunksize)
yield df[i:end].copy()
return StreamingDataFrame(local_iterator)
return StreamingDataFrame(local_iterator, check_schema=check_schema)
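
A minimal sketch of the chunking behaviour described in the docstring
(import path taken from the unit tests):

    import pandas
    from pandas_streaming.df import StreamingDataFrame

    df = pandas.DataFrame(dict(a=list(range(10)), b=list(range(10))))
    sdf = StreamingDataFrame.read_df(df, chunksize=4)
    # three chunks: 4 rows, 4 rows, 2 rows
    print([chunk.shape[0] for chunk in sdf])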

def __iter__(self):
"""
Iterates over a large file with a sliding window.
Each window is a :epkg:`pandas:DataFrame`.
The method creates a new iterator every time it is called,
so the dataframe can be iterated over more than once.
If *check_schema* was enabled when the constructor was called,
the method checks that every dataframe follows the same schema
as the first chunk.
"""
iter = self.iter_creation()
sch = None
rows = 0
for it in iter:
if sch is None:
sch = (list(it.columns), list(it.dtypes))
elif self.check_schema:
if list(it.columns) != sch[0]:
raise StreamingDataFrameSchemaError(
'Column names are different after row {0}\nFirst chunk: {1}\nCurrent chunk: {2}'.format(rows, sch[0], list(it.columns)))
if list(it.dtypes) != sch[1]:
raise StreamingDataFrameSchemaError(
'Column types are different after row {0}\nFirst chunk: {1}\nCurrent chunk: {2}'.format(rows, sch[1], list(it.dtypes)))
rows += it.shape[0]
yield it

def sort_values(self, *args, **kwargs):
@@ -259,7 +304,7 @@ def where(self, *args, **kwargs) -> 'StreamingDataFrame':
This function returns a @see cl StreamingDataFrame.
"""
kwargs['inplace'] = False
return StreamingDataFrame(lambda: map(lambda df: df.where(*args, **kwargs), self))
return StreamingDataFrame(lambda: map(lambda df: df.where(*args, **kwargs), self), **self.get_kwargs())

def sample(self, **kwargs) -> 'StreamingDataFrame':
"""
@@ -270,21 +315,21 @@ def sample(self, **kwargs) -> 'StreamingDataFrame':
"""
if 'n' in kwargs:
raise ValueError('Only frac is implemented.')
return StreamingDataFrame(lambda: map(lambda df: df.sample(**kwargs), self))
return StreamingDataFrame(lambda: map(lambda df: df.sample(**kwargs), self), **self.get_kwargs())

def apply(self, *args, **kwargs) -> 'StreamingDataFrame':
"""
Applies :epkg:`pandas:DataFrame:apply`.
This function returns a @see cl StreamingDataFrame.
"""
return StreamingDataFrame(lambda: map(lambda df: df.apply(*args, **kwargs), self))
return StreamingDataFrame(lambda: map(lambda df: df.apply(*args, **kwargs), self), **self.get_kwargs())

def applymap(self, *args, **kwargs) -> 'StreamingDataFrame':
"""
Applies :epkg:`pandas:DataFrame:applymap`.
This function returns a @see cl StreamingDataFrame.
"""
return StreamingDataFrame(lambda: map(lambda df: df.applymap(*args, **kwargs), self))
return StreamingDataFrame(lambda: map(lambda df: df.applymap(*args, **kwargs), self), **self.get_kwargs())

def merge(self, right, **kwargs) -> 'StreamingDataFrame':
"""
@@ -294,15 +339,15 @@ def merge(self, right, **kwargs) -> 'StreamingDataFrame':
a double loop, loop on *self*, loop on *right*.
"""
if isinstance(right, pandas.DataFrame):
return self.merge(StreamingDataFrame.read_df(right, chunk_size=right.shape[0]), **kwargs)
return self.merge(StreamingDataFrame.read_df(right, chunksize=right.shape[0]), **kwargs)

def iterator_merge(sdf1, sdf2, **kw):
for df1 in sdf1:
for df2 in sdf2:
df = df1.merge(df2, **kw)
yield df

return StreamingDataFrame(lambda: iterator_merge(self, right, **kwargs))
return StreamingDataFrame(lambda: iterator_merge(self, right, **kwargs), **self.get_kwargs())
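
A minimal sketch of the double loop with a plain :epkg:`pandas:DataFrame`
on the right side (the column names are made up for the example):

    import pandas
    from pandas_streaming.df import StreamingDataFrame

    left = StreamingDataFrame.read_df(
        pandas.DataFrame(dict(X=[0, 1, 2], Y=["a", "b", "c"])), chunksize=2)
    right = pandas.DataFrame(dict(X=[1, 2, 3], Z=[10, 20, 30]))
    # the plain dataframe is wrapped into a StreamingDataFrame first,
    # then every chunk of *left* is merged with every chunk of *right*
    merged = left.merge(right, on="X", how="inner")
    print(merged.to_dataframe())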

def concat(self, others) -> 'StreamingDataFrame':
"""
@@ -348,7 +393,7 @@ def change_type(obj):
return obj

others = list(map(change_type, others))
return StreamingDataFrame(lambda: iterator_concat(self, others))
return StreamingDataFrame(lambda: iterator_concat(self, others), **self.get_kwargs())

def groupby(self, by=None, lambda_agg=None, in_memory=True, **kwargs) -> pandas.DataFrame:
"""
