ENH: improve performance of df.to_csv GH3054 #3059

Merged: 27 commits, Mar 19, 2013
+620 −122
@@ -47,6 +47,7 @@ pandas 0.11.0
**Improvements to existing features**
+ - Improved performance of df.to_csv() by up to 10x in some cases. (GH3059_)
- added ``blocks`` attribute to DataFrames, to return a dict of dtypes to
homogeneously dtyped DataFrames
- added keyword ``convert_numeric`` to ``convert_objects()`` to try to
@@ -62,6 +63,8 @@ pandas 0.11.0
strings that can be parsed with datetime.strptime
- Add ``axes`` property to ``Series`` for compatibility
- Add ``xs`` function to ``Series`` for compatibility
+ - Add ``chunksize`` parameter to ``to_csv`` to allow writing the frame
+   in chunks, keeping memory usage constant
**API Changes**
@@ -183,6 +186,7 @@ pandas 0.11.0
.. _GH3012: https://github.com/pydata/pandas/issues/3012
.. _GH3029: https://github.com/pydata/pandas/issues/3029
.. _GH3041: https://github.com/pydata/pandas/issues/3041
+.. _GH3059: https://github.com/pydata/pandas/issues/3059
pandas 0.10.1
@@ -229,6 +229,8 @@ API changes
Enhancements
~~~~~~~~~~~~
+ - Improved performance of df.to_csv() by up to 10x in some cases. (GH3059_)
+
- Numexpr is now a :ref:`Recommended Dependencies <install.recommended_dependencies>`, to accelerate certain
types of numerical and boolean operations
@@ -331,3 +333,4 @@ on GitHub for a complete list.
.. _GH2806: https://github.com/pydata/pandas/issues/2806
.. _GH2807: https://github.com/pydata/pandas/issues/2807
.. _GH2918: https://github.com/pydata/pandas/issues/2918
+.. _GH3059: https://github.com/pydata/pandas/issues/3059
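
The two release notes above describe the same change: a faster default engine for df.to_csv plus a new chunksize keyword. A minimal usage sketch (the frame and file name are illustrative):

    import numpy as np
    import pandas as pd

    df = pd.DataFrame(np.random.randn(1000000, 10))
    # Write in 100,000-row chunks so peak memory stays roughly constant
    # no matter how large the frame is.
    df.to_csv('out.csv', chunksize=100000)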
@@ -101,7 +101,6 @@ def _isnull_old(obj):
_isnull = _isnull_new
-
def _use_inf_as_null(key):
'''Option change callback for null/inf behaviour
Choose which replacement for numpy.isnan / -numpy.isfinite is used.
@@ -1594,6 +1593,26 @@ def _check_as_is(x):
# empty queue
self.queue.truncate(0)
+ def writerows(self, rows):
+ def _check_as_is(x):
+ return (self.quoting == csv.QUOTE_NONNUMERIC and
+ is_number(x)) or isinstance(x, str)
+
+ for i, row in enumerate(rows):
+ rows[i] = [x if _check_as_is(x)
+ else pprint_thing(x).encode('utf-8') for x in row]
+
+        self.writer.writerows(rows)
+ # Fetch UTF-8 output from the queue ...
+ data = self.queue.getvalue()
+ data = data.decode("utf-8")
+ # ... and reencode it into the target encoding
+ data = self.encoder.encode(data)
+ # write to the target stream
+ self.stream.write(data)
+ # empty queue
+ self.queue.truncate(0)
+
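For context, the new writerows mirrors the queue round trip that writerow already performs: rows are rendered to UTF-8 through an in-memory queue, decoded, re-encoded into the target encoding, and flushed in one write. A standalone sketch of that pattern (Python 3 simplifies it, since csv.writer then emits text and only the final encode is needed; the helper name is illustrative):

    import csv
    import io

    def write_rows_encoded(stream, rows, encoding='utf-8'):
        # Render all rows to text through an in-memory queue ...
        queue = io.StringIO()
        csv.writer(queue).writerows(rows)
        # ... then encode into the target encoding and flush once.
        stream.write(queue.getvalue().encode(encoding))

    with open('out.csv', 'wb') as f:
        write_rows_encoded(f, [['a', 1], ['b', 2]])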
_NS_DTYPE = np.dtype('M8[ns]')
@@ -9,7 +9,7 @@
from io import StringIO
from pandas.core.common import adjoin, isnull, notnull
-from pandas.core.index import MultiIndex, _ensure_index
+from pandas.core.index import Index, MultiIndex, _ensure_index
from pandas.util import py3compat
from pandas.core.config import get_option, set_option, reset_option
import pandas.core.common as com
@@ -18,6 +18,7 @@
import numpy as np
import itertools
+import csv
from pandas.tseries.period import PeriodIndex
@@ -763,6 +764,260 @@ def grouper(x):
return result
+class CSVFormatter(object):
+
+ def __init__(self, obj, path_or_buf, sep=",", na_rep='', float_format=None,
+ cols=None, header=True, index=True, index_label=None,
+ mode='w', nanRep=None, encoding=None, quoting=None,
+                 line_terminator='\n', chunksize=None, legacy=False):
+        self.legacy = legacy  # remove for 0.12
+ self.obj = obj
+ self.path_or_buf = path_or_buf
+ self.sep = sep
+ self.na_rep = na_rep
+ self.float_format = float_format
+
+ self.header = header
+ self.index = index
+ self.index_label = index_label
+ self.mode = mode
+ self.encoding = encoding
+
+ if quoting is None:
+ quoting = csv.QUOTE_MINIMAL
+ self.quoting = quoting
+
+ self.line_terminator = line_terminator
+
+ if cols is None:
+ cols = obj.columns
+
+        if isinstance(cols, Index):
+            cols = cols.to_native_types(na_rep=na_rep, float_format=float_format)
+        else:
+            cols = list(cols)
+ self.cols = cols
+
+ # preallocate data 2d list
+ self.blocks = self.obj._data.blocks
+ ncols = sum(len(b.items) for b in self.blocks)
+        self.data = [None] * ncols
+
+ # fail early if we have duplicate columns
+ if len(set(self.cols)) != len(self.cols):
+ raise Exception("duplicate columns are not permitted in to_csv")
+
+        self.colname_map = dict((k, i) for i, k in enumerate(obj.columns))
+
+ if chunksize is None:
+            chunksize = (100000 / (len(self.cols) or 1)) or 1
+ self.chunksize = chunksize
+
+ self.data_index = obj.index
+ if isinstance(obj.index, PeriodIndex):
+ self.data_index = obj.index.to_timestamp()
+
+ self.nlevels = getattr(self.data_index, 'nlevels', 1)
+ if not index:
+ self.nlevels = 0
+
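The default chunksize targets roughly 100,000 cells per chunk: a 50-column frame gets 100000 / 50 = 2000 rows per chunk, and the two `or 1` guards keep the division and the result sane for empty or very wide frames. A quick check of that arithmetic (floor division stands in for Python 2's integer `/`):

    def default_chunksize(ncols):
        # ~100,000 cells per chunk, never fewer than one row
        return (100000 // (ncols or 1)) or 1

    assert default_chunksize(50) == 2000
    assert default_chunksize(0) == 100000   # no columns: avoid dividing by zero
    assert default_chunksize(500000) == 1   # very wide frame: at least one row
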
+ # legacy to be removed in 0.12
+ def _helper_csv(self, writer, na_rep=None, cols=None,
+ header=True, index=True,
+ index_label=None, float_format=None):
+ if cols is None:
+ cols = self.columns
+
+ series = {}
+ for k, v in self.obj._series.iteritems():
+ series[k] = v.values
+
+ has_aliases = isinstance(header, (tuple, list, np.ndarray))
+ if has_aliases or header:
+ if index:
+ # should write something for index label
+ if index_label is not False:
+ if index_label is None:
+ if isinstance(self.obj.index, MultiIndex):
+ index_label = []
+ for i, name in enumerate(self.obj.index.names):
+ if name is None:
+ name = ''
+ index_label.append(name)
+ else:
+ index_label = self.obj.index.name
+ if index_label is None:
+ index_label = ['']
+ else:
+ index_label = [index_label]
+ elif not isinstance(index_label, (list, tuple, np.ndarray)):
+ # given a string for a DF with Index
+ index_label = [index_label]
+
+ encoded_labels = list(index_label)
+ else:
+ encoded_labels = []
+
+ if has_aliases:
+ if len(header) != len(cols):
+ raise ValueError(('Writing %d cols but got %d aliases'
+ % (len(cols), len(header))))
+ else:
+ write_cols = header
+ else:
+ write_cols = cols
+ encoded_cols = list(write_cols)
+
+ writer.writerow(encoded_labels + encoded_cols)
+ else:
+ encoded_cols = list(cols)
+ writer.writerow(encoded_cols)
+
+ data_index = self.obj.index
+ if isinstance(self.obj.index, PeriodIndex):
+ data_index = self.obj.index.to_timestamp()
+
+ nlevels = getattr(data_index, 'nlevels', 1)
+ for j, idx in enumerate(data_index):
+ row_fields = []
+ if index:
+ if nlevels == 1:
+ row_fields = [idx]
+ else: # handle MultiIndex
+ row_fields = list(idx)
+ for i, col in enumerate(cols):
+ val = series[col][j]
+ if lib.checknull(val):
+ val = na_rep
+
+ if float_format is not None and com.is_float(val):
+ val = float_format % val
+ elif isinstance(val, np.datetime64):
+ val = lib.Timestamp(val)._repr_base
+
+ row_fields.append(val)
+
+ writer.writerow(row_fields)
+
+ def save(self):
+ # create the writer & save
+ if hasattr(self.path_or_buf, 'read'):
+ f = self.path_or_buf
+ close = False
+ else:
+ f = com._get_handle(self.path_or_buf, self.mode, encoding=self.encoding)
+ close = True
+
+ try:
+ if self.encoding is not None:
+ self.writer = com.UnicodeWriter(f, lineterminator=self.line_terminator,
+ delimiter=self.sep, encoding=self.encoding,
+ quoting=self.quoting)
+ else:
+ self.writer = csv.writer(f, lineterminator=self.line_terminator,
+ delimiter=self.sep, quoting=self.quoting)
+
+ if self.legacy:
+ # to be removed in 0.12
+ self._helper_csv(self.writer, na_rep=self.na_rep,
+ float_format=self.float_format, cols=self.cols,
+ header=self.header, index=self.index,
+ index_label=self.index_label)
+
+ else:
+ self._save()
+
+ finally:
+ if close:
+ f.close()
+
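DataFrame.to_csv is expected to construct one of these and call save(); a hedged sketch of that call path (the actual wiring in frame.py is in a hunk not shown here):

    # Hypothetical driver code, assuming `df` is a DataFrame.
    formatter = CSVFormatter(df, 'out.csv', sep=',', na_rep='',
                             index=True, chunksize=50000)
    formatter.save()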
+ def _save_header(self):
+
+ writer = self.writer
+ obj = self.obj
+ index_label = self.index_label
+ cols = self.cols
+ header = self.header
+
+ has_aliases = isinstance(header, (tuple, list, np.ndarray))
+ if has_aliases or self.header:
+ if self.index:
+ # should write something for index label
+ if index_label is not False:
+ if index_label is None:
+ if isinstance(obj.index, MultiIndex):
+ index_label = []
+ for i, name in enumerate(obj.index.names):
+ if name is None:
+ name = ''
+ index_label.append(name)
+ else:
+ index_label = obj.index.name
+ if index_label is None:
+ index_label = ['']
+ else:
+ index_label = [index_label]
+ elif not isinstance(index_label, (list, tuple, np.ndarray)):
+ # given a string for a DF with Index
+ index_label = [index_label]
+
+ encoded_labels = list(index_label)
+ else:
+ encoded_labels = []
+
+ if has_aliases:
+ if len(header) != len(cols):
+ raise ValueError(('Writing %d cols but got %d aliases'
+ % (len(cols), len(header))))
+ else:
+ write_cols = header
+ else:
+ write_cols = cols
+ encoded_cols = list(write_cols)
+
+ writer.writerow(encoded_labels + encoded_cols)
+ else:
+ encoded_cols = list(cols)
+ writer.writerow(encoded_cols)
+
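The index_label resolution above reduces to a simple rule: False suppresses the label, None falls back to the index's level name(s) with '' for unnamed levels, and a scalar is wrapped in a list. A standalone distillation (the helper name is illustrative):

    def resolve_index_label(index_label, index_names):
        # index_names: one entry per index level, None when unnamed
        if index_label is False:
            return []
        if index_label is None:
            return [name or '' for name in index_names] or ['']
        if not isinstance(index_label, (list, tuple)):
            return [index_label]   # scalar given for a single-level Index
        return list(index_label)

    assert resolve_index_label(None, [None]) == ['']
    assert resolve_index_label(None, ['date', None]) == ['date', '']
    assert resolve_index_label('idx', ['x']) == ['idx']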
+ def _save(self):
+
+ self._save_header()
+
+ nrows = len(self.data_index)
+
+ # write in chunksize bites
+ chunksize = self.chunksize
+        chunks = int(nrows / chunksize) + 1
+
+ for i in xrange(chunks):
+ start_i = i * chunksize
+ end_i = min((i + 1) * chunksize, nrows)
+ if start_i >= end_i:
+ break
+
+ self._save_chunk(start_i, end_i)
+
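The `+1` together with the start_i >= end_i guard handles both exact and inexact multiples: 200,000 rows at chunksize 100,000 gives chunks = 3, but the third iteration starts at 200,000 and breaks immediately, while 250,000 rows ends with a 50,000-row tail chunk. A standalone check of the bounds:

    def chunk_bounds(nrows, chunksize):
        chunks = nrows // chunksize + 1
        for i in range(chunks):
            start_i = i * chunksize
            end_i = min((i + 1) * chunksize, nrows)
            if start_i >= end_i:
                break
            yield start_i, end_i

    assert list(chunk_bounds(200000, 100000)) == [(0, 100000), (100000, 200000)]
    assert list(chunk_bounds(250000, 100000)) == [
        (0, 100000), (100000, 200000), (200000, 250000)]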
+ def _save_chunk(self, start_i, end_i):
+
+ colname_map = self.colname_map
+ data_index = self.data_index
+
+ # create the data for a chunk
+        slicer = slice(start_i, end_i)
+ for i in range(len(self.blocks)):
+ b = self.blocks[i]
+ d = b.to_native_types(slicer=slicer, na_rep=self.na_rep, float_format=self.float_format)
+ for j, k in enumerate(b.items):
+ # self.data is a preallocated list
+ self.data[colname_map[k]] = d[j]
+
+ ix = data_index.to_native_types(slicer=slicer, na_rep=self.na_rep, float_format=self.float_format)
+
+ lib.write_csv_rows(self.data, ix, self.nlevels, self.cols, self.writer)
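
Because a DataFrame stores columns in dtype-grouped blocks, block order generally differs from column order; colname_map puts each block's stringified columns back into their original positions before the row writer runs. A toy illustration of that remapping, independent of pandas internals:

    # Two 'blocks': one float column and two int columns, while the
    # frame's column order interleaves them as ['a', 'b', 'c'].
    blocks = [(['b'], [['1.5', '2.5']]),                   # float block
              (['a', 'c'], [['1', '2'], ['3', '4']])]      # int block
    columns = ['a', 'b', 'c']
    colname_map = dict((k, i) for i, k in enumerate(columns))

    data = [None] * len(columns)
    for items, native in blocks:
        for j, k in enumerate(items):
            data[colname_map[k]] = native[j]

    assert data == [['1', '2'], ['1.5', '2.5'], ['3', '4']]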
+
# from collections import namedtuple
# ExcelCell = namedtuple("ExcelCell",
# 'row, col, val, style, mergestart, mergeend')