From 3fdc6d986a155953a7abb1548ff4d6389f9aadcb Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 15 May 2012 15:18:18 -0400 Subject: [PATCH] ENH: add group-wise merge capability to ordered_merge, unit tests, close #813 --- RELEASE.rst | 2 + pandas/src/join.pyx | 21 ++++++ pandas/tools/merge.py | 126 +++++++++++++++++++++++++------ pandas/tools/tests/test_merge.py | 23 +++--- vb_suite/join_merge.py | 17 +++++ 5 files changed, 155 insertions(+), 34 deletions(-) diff --git a/RELEASE.rst b/RELEASE.rst index 24ab824914b98..1c9b24815ad0d 100644 --- a/RELEASE.rst +++ b/RELEASE.rst @@ -48,6 +48,8 @@ pandas 0.8.0 a particular order (#610) - Can pass dicts with lists of functions or dicts to GroupBy aggregate to do much more flexible multiple function aggregation (#642) + - New ordered_merge functions for merging DataFrames with ordered + data. Also supports group-wise merging for panel data (#813) **Improvements to existing features** diff --git a/pandas/src/join.pyx b/pandas/src/join.pyx index a135a1c86126b..06d00fe2e16f7 100644 --- a/pandas/src/join.pyx +++ b/pandas/src/join.pyx @@ -191,6 +191,27 @@ def _get_result_indexer(sorter, indexer): return res + +def ffill_indexer(ndarray[int64_t] indexer): + cdef: + Py_ssize_t i, n = len(indexer) + ndarray[int64_t] result + int64_t val, last_obs + + result = np.empty(n, dtype=np.int64) + last_obs = -1 + + for i in range(n): + val = indexer[i] + if val == -1: + result[i] = last_obs + else: + result[i] = val + last_obs = val + + return result + + def ffill_by_group(ndarray[int64_t] indexer, ndarray[int64_t] group_ids, int64_t max_group): cdef: diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index 680864ee542c1..eaf833f47dd7b 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -33,17 +33,107 @@ def merge(left, right, how='inner', on=None, left_on=None, right_on=None, if __debug__: merge.__doc__ = _merge_doc % '\nleft : DataFrame' -def ordered_merge(left, right, on=None, by=None, left_on=None, 
right_on=None, - left_index=False, right_index=False, fill_method=None, - suffixes=('_x', '_y')): - """ +def ordered_merge(left, right, on=None, left_by=None, right_by=None, + left_on=None, right_on=None, + fill_method=None, suffixes=('_x', '_y')): + """Perform merge with optional filling/interpolation designed for ordered + data like time series data. Optionally perform group-wise merge (see + examples) + + Parameters + ---------- + left : DataFrame + right : DataFrame + fill_method : {'ffill', None}, default None + Interpolation method for data + on : label or list + Field names to join on. Must be found in both DataFrames. + left_on : label or list, or array-like + Field names to join on in left DataFrame. Can be a vector or list of + vectors of the length of the DataFrame to use a particular vector as + the join key instead of columns + right_on : label or list, or array-like + Field names to join on in right DataFrame or vector/list of vectors per + left_on docs + left_by : column name or list of column names + Group left DataFrame by group columns and merge piece by piece with + right DataFrame + right_by : column name or list of column names + Group right DataFrame by group columns and merge piece by piece with + left DataFrame + suffixes : 2-length sequence (tuple, list, ...) 
+ Suffix to apply to overlapping column names in the left and right + side, respectively + + Examples + -------- + >>> A >>> B + key lvalue group key rvalue + 0 a 1 a 0 b 1 + 1 c 2 a 1 c 2 + 2 e 3 a 2 d 3 + 3 a 1 b 3 f 4 + 4 c 2 b + 5 e 3 b + + >>> ordered_merge(A, B, fill_method='ffill', left_by='group') + key lvalue group rvalue + 0 a 1 a NaN + 1 b 1 a 1 + 2 c 2 a 2 + 3 d 2 a 3 + 4 e 3 a 3 + 5 f 3 a 4 + 6 a 1 b NaN + 7 b 1 b 1 + 8 c 2 b 2 + 9 d 2 b 3 + 10 e 3 b 3 + 11 f 3 b 4 + Returns + ------- + merged : DataFrame """ - op = _OrderedMerge(left, right, on=on, left_on=left_on, - right_on=right_on, left_index=left_index, - right_index=right_index, suffixes=suffixes, - fill_method=fill_method, by=by) - return op.get_result() + def _merger(x, y): + op = _OrderedMerge(x, y, on=on, left_on=left_on, right_on=right_on, + # left_index=left_index, right_index=right_index, + suffixes=suffixes, fill_method=fill_method) + return op.get_result() + + if left_by is not None and right_by is not None: + raise ValueError('Can only group either left or right frames') + elif left_by is not None: + if not isinstance(left_by, (list, tuple)): + left_by = [left_by] + pieces = [] + for key, xpiece in left.groupby(left_by): + merged = _merger(xpiece, right) + for k in left_by: + # May have passed ndarray + try: + if k in merged: + merged[k] = key + except: + pass + pieces.append(merged) + return concat(pieces, ignore_index=True) + elif right_by is not None: + if not isinstance(right_by, (list, tuple)): + right_by = [right_by] + pieces = [] + for key, ypiece in right.groupby(right_by): + merged = _merger(left, ypiece) + for k in right_by: + try: + if k in merged: + merged[k] = key + except: + pass + pieces.append(merged) + return concat(pieces, ignore_index=True) + else: + return _merger(left, right) @@ -158,9 +248,6 @@ def _get_join_info(self): # max groups = largest possible number of distinct groups left_key, right_key, max_groups = self._get_group_keys() - # left_key = 
com._ensure_int64(left_key) - # right_key = com._ensure_int64(right_key) - join_func = _join_functions[self.how] left_indexer, right_indexer = join_func(left_key, right_key, max_groups) @@ -346,7 +433,6 @@ def __init__(self, left, right, on=None, by=None, left_on=None, suffixes=('_x', '_y'), copy=True, fill_method=None): - self.by = by self.fill_method = fill_method _MergeOperation.__init__(self, left, right, on=on, left_on=left_on, @@ -365,15 +451,8 @@ def get_result(self): ldata, rdata = self._get_merge_data() if self.fill_method == 'ffill': - # group_index, max_group = self._get_group_index() - - group_index = np.repeat(0, len(left_indexer)) - max_group = 1 - - left_join_indexer = lib.ffill_by_group(left_indexer, group_index, - max_group) - right_join_indexer = lib.ffill_by_group(right_indexer, group_index, - max_group) + left_join_indexer = lib.ffill_indexer(left_indexer) + right_join_indexer = lib.ffill_indexer(right_indexer) else: left_join_indexer = left_indexer right_join_indexer = right_indexer @@ -389,9 +468,6 @@ def get_result(self): return result - def _get_group_index(self): - pass - def _get_multiindex_indexer(join_keys, index, sort=False): shape = [] labels = [] diff --git a/pandas/tools/tests/test_merge.py b/pandas/tools/tests/test_merge.py index 75d432af94e27..701acfddf5ea5 100644 --- a/pandas/tools/tests/test_merge.py +++ b/pandas/tools/tests/test_merge.py @@ -1277,22 +1277,27 @@ def test_ffill(self): assert_frame_equal(result, expected) def test_multigroup(self): - raise nose.SkipTest left = concat([self.left, self.left], ignore_index=True) - right = concat([self.right, self.right], ignore_index=True) + # right = concat([self.right, self.right], ignore_index=True) left['group'] = ['a'] * 3 + ['b'] * 3 - right['group'] = ['a'] * 4 + ['b'] * 4 + # right['group'] = ['a'] * 4 + ['b'] * 4 - result = ordered_merge(left, right, on='key', by='group', + result = ordered_merge(left, self.right, on='key', left_by='group', fill_method='ffill') - - expected = 
DataFrame({'key': ['a', 'b', 'c', 'd', 'e', 'f'], - 'lvalue': [1., 1, 2, 2, 3, 3.], - 'rvalue': [nan, 1, 2, 3, 3, 4]}) + expected = DataFrame({'key': ['a', 'b', 'c', 'd', 'e', 'f'] * 2, + 'lvalue': [1., 1, 2, 2, 3, 3.] * 2, + 'rvalue': [nan, 1, 2, 3, 3, 4] * 2}) expected['group'] = ['a'] * 6 + ['b'] * 6 - assert_frame_equal(result, expected) + assert_frame_equal(result, expected.ix[:, result.columns]) + + result2 = ordered_merge(self.right, left, on='key', right_by='group', + fill_method='ffill') + assert_frame_equal(result, result2.ix[:, result.columns]) + + result = ordered_merge(left, self.right, on='key', left_by='group') + self.assert_(result['group'].notnull().all()) if __name__ == '__main__': import nose diff --git a/vb_suite/join_merge.py b/vb_suite/join_merge.py index 657ca398f01bb..07fcfcb5ddc14 100644 --- a/vb_suite/join_merge.py +++ b/vb_suite/join_merge.py @@ -150,3 +150,20 @@ def sample(values, k): concat_series_axis1 = Benchmark('concat(pieces, axis=1)', setup, start_date=datetime(2012, 2, 27)) + +#---------------------------------------------------------------------- +# Ordered merge + +setup = common_setup + """ +groups = np.array([rands(10) for _ in xrange(10)], dtype='O') + +left = DataFrame({'group': groups.repeat(5000), + 'key' : np.tile(np.arange(0, 10000, 2), 10), + 'lvalue': np.random.randn(50000)}) + +right = DataFrame({'key' : np.arange(10000), + 'rvalue' : np.random.randn(10000)}) + +""" + +stmt = "ordered_merge(left, right, on='key', left_by='group')"