Description
Code Sample (microbenchmark)
from __future__ import division
import pandas as pd
import numpy as np
import functools
import operator as op
import time
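
# The following four functions perform the same join in different ways; each
# returns the join result together with the time spent in the core operation.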
def join_raw(df1, df2, *args, **kwargs):
    start_time = time.time()
    result = df1.join(df2, *args, **kwargs)
    end_time = time.time()
    return result, end_time - start_time
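
# Workaround for index combinations where the raw join fails: temporarily move
# the non-shared index levels into columns so that both frames have identical
# indices, join, and then restore the removed levels afterwards.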
def join_with_workaround(df1, df2, *args, **kwargs):
    index1 = list(df1.index.names)
    index2 = list(df2.index.names)
    index1remove = [level for level in index1 if level not in index2]
    index2remove = [level for level in index2 if level not in index1]
    if index1remove:
        df1 = df1.reset_index(index1remove)
    if index2remove:
        df2 = df2.reset_index(index2remove)
    # The indices should be the same now, except that levels might be in a
    # different order, which causes "join" to fail as well.
    if df1.index.names != df2.index.names:
        df2 = df2.reorder_levels(df1.index.names)
    start_time = time.time()
    result = df1.join(df2, *args, **kwargs)
    end_time = time.time()
    # Add all removed levels to the index of the result.
    if index1remove:
        result.set_index(index1remove, append=True, inplace=True)
    # Make sure the levels are ordered the same way as in df1.
    # Additional levels only appearing in df2 will be added at the
    # end, though.
    if result.index.names != index1:
        result = result.reorder_levels(index1)
    if index2remove:
        result.set_index(index2remove, append=True, inplace=True)
    return result, end_time - start_time
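
# Alternative: reset df1's index entirely and join on the common levels via
# the "on" argument, then rebuild the full index on the result.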
def join_with_reset_index(df1, df2, *args, **kwargs):
    index1 = list(df1.index.names)
    index2 = list(df2.index.names)
    index2common = [level for level in index2 if level in index1]
    index2remove = [level for level in index2 if level not in index1]
    df1 = df1.reset_index()
    if index2remove:
        df2 = df2.reset_index(index2remove)
    start_time = time.time()
    result = df1.join(df2, on=index2common, *args, **kwargs)
    end_time = time.time()
    result.set_index(index1 + index2remove, inplace=True)
    return result, end_time - start_time
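
# Alternative: emulate the join with pd.concat along axis=1; for left and
# right joins, join_axes pins the result to the corresponding frame's index.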
def join_via_concat(df1, df2, how='left', lsuffix='', rsuffix=''):
    index1 = list(df1.index.names)
    index2 = list(df2.index.names)
    index1remove = [level for level in index1 if level not in index2]
    index2remove = [level for level in index2 if level not in index1]
    # Remove levels from both indices as necessary.
    if index1remove:
        df1 = df1.reset_index(index1remove)
    if index2remove:
        df2 = df2.reset_index(index2remove)
    # The indices should be the same now, except that levels might be in a
    # different order.
    if df1.index.names != df2.index.names:
        df2 = df2.reorder_levels(df1.index.names)
    # Determine the overlap before renaming so that both suffixes are applied
    # to the same set of columns.
    common_columns = [c for c in df1.columns.values if c in df2.columns.values]
    if lsuffix:
        df1 = df1.rename(columns=lambda c: c + lsuffix if c in common_columns else c, copy=False)
    if rsuffix:
        df2 = df2.rename(columns=lambda c: c + rsuffix if c in common_columns else c, copy=False)
    overlapping_columns = [column for column in df1.columns.values if column in df2.columns.values]
    if overlapping_columns:
        raise Exception('Overlapping column names when joining data frames: %s (specify lsuffix or rsuffix)' % overlapping_columns)
    # Use concat to simulate join.
    start_time = time.time()
    if how == 'left':
        result = pd.concat([df1, df2], axis=1, join_axes=[df1.index])
    elif how == 'right':
        result = pd.concat([df1, df2], axis=1, join_axes=[df2.index])
    else:
        result = pd.concat([df1, df2], axis=1, join=how)
    end_time = time.time()
    # Add all removed levels to the index of the result.
    if index1remove:
        result.set_index(index1remove, append=True, inplace=True)
    # Make sure the levels are ordered the same way as in df1.
    # Additional levels only appearing in df2 will be added at the
    # end, though.
    if result.index.names != index1:
        result = result.reorder_levels(index1)
    if index2remove:
        result.set_index(index2remove, append=True, inplace=True)
    return result, end_time - start_time
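
# Benchmark setup: a single pool of random values, sliced as needed, so that
# every generated frame draws from the same data.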
result_row_count = 10000000
random_numbers = np.random.randn(result_row_count)
def create_dataframe(index_columns, index_sizes):
    index_ranges = [list(range(size)) for size in index_sizes]
    index = pd.MultiIndex.from_product(index_ranges, names=index_columns)
    row_count = functools.reduce(op.mul, index_sizes, 1)
    return pd.DataFrame(random_numbers[:row_count], columns=['V'], index=index)
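# Example: create_dataframe(['A', 'B'], [10, 5]) yields a 50-row frame with a
# two-level MultiIndex (10 x 5) and a single value column 'V'.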
def test_join_fn(fn, index1, index2, index_sizes):
    df1 = create_dataframe(index1, [index_sizes[c] for c in index1])
    df2 = create_dataframe(index2, [index_sizes[c] for c in index2])
    try:
        start_time = time.time()
        result, main_time_1 = fn(df1, df2, lsuffix='l', rsuffix='r')
        mid_time = time.time()
        result, main_time_2 = fn(df1, df2, lsuffix='l', rsuffix='r')
        end_time = time.time()
        print(" %.3f (%.3f), then %.3f (%.3f)" % (mid_time - start_time, main_time_1, end_time - mid_time, main_time_2))
        return True
    except Exception:
        print(" ERROR")
        return False
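
# Runs the raw join first; the workaround variant is only timed when the raw
# join raises, while the reset_index and concat variants are always timed.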
def test_joins(*args):
    print(" Raw join on index:")
    if not test_join_fn(join_raw, *args):
        print(" Join on index with workaround (partial reset_index):")
        test_join_fn(join_with_workaround, *args)
    print(" Join after reset_index:")
    test_join_fn(join_with_reset_index, *args)
    print(" Join via concat:")
    test_join_fn(join_via_concat, *args)
    print("")
def test_index_combination(index1, index2):
    result_index = index1 + [c for c in index2 if c not in index1]
    print("%s + %s -> %s:" % (index1, index2, result_index))
    remaining_row_count = result_row_count
    index_sizes = {}
    for c in result_index[:-1]:
        if remaining_row_count >= 10:
            index_sizes[c] = 10
            remaining_row_count = (remaining_row_count + 9) // 10
        else:
            index_sizes[c] = 1
    index_sizes[result_index[-1]] = remaining_row_count
    test_joins(index1, index2, index_sizes)
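
# Tested combinations: identical indices of increasing depth, sub-/supersets,
# levels unique to one side, and permuted level orders.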
test_index_combination(list('A'), list('A'))
test_index_combination(list('AB'), list('AB'))
test_index_combination(list('ABC'), list('ABC'))
test_index_combination(list('ABCDE'), list('ABCDE'))
test_index_combination(list('ABCDEFG'), list('ABCDEFG'))
test_index_combination(list('A'), list('AB'))
test_index_combination(list('B'), list('AB'))
test_index_combination(list('A'), list('ABC'))
test_index_combination(list('B'), list('ABC'))
test_index_combination(list('AB'), list('A'))
test_index_combination(list('AB'), list('B'))
test_index_combination(list('ABC'), list('A'))
test_index_combination(list('ABC'), list('B'))
test_index_combination(list('ABC'), list('AB'))
test_index_combination(list('ABCDEF'), list('ABCDE'))
test_index_combination(list('AB'), list('ABC'))
test_index_combination(list('ABCDE'), list('ABCDEF'))
test_index_combination(list('ABC'), list('ABD'))
test_index_combination(list('ABC'), list('BC'))
test_index_combination(list('BC'), list('ABC'))
test_index_combination(list('ABC'), list('AC'))
test_index_combination(list('AC'), list('ABC'))
test_index_combination(list('AB'), list('BA'))
test_index_combination(list('ABCEF'), list('ACDBF'))
Problem description
We are running into severe performance bottlenecks in our code due to join operations on multiple index levels.
To analyze and illustrate the problem, we have created a microbenchmark that executes joins on data frames with different indices and one data column. The benchmark is constructed so that the resulting data frame always has the same number of rows (10,000,000 in this case, though we also have to deal with larger data sets). We would therefore expect all join operations to take a similar amount of time, perhaps roughly linear in the number of index levels. Instead, we see execution times that are up to 200x longer in some cases (see output).
The benchmark works as follows:
- First, we create two data frames with unique, hierarchically sorted multi-indices. The data frames are built so that after they are joined, all index levels except the last have 10 values each, and the last level has however many values are necessary to reach 10,000,000 rows.
- Then these data frames are joined twice in a row, measuring the time each join operation takes.
- Moreover, we perform the same join in several different ways that (as far as we can tell) yield the same result but are sometimes faster, and/or work in cases where the native join operation fails with a "currently not implemented" exception; a minimal sketch of such a failing case follows this list. In particular, using concat instead of join sometimes helps a lot. Unfortunately, even the fastest method is often too slow for our purposes.
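For illustration, here is a minimal sketch of the kind of level combination that triggers the exception for us (the frames, names, and sizes here are made up for this example, and exactly which combinations fail may depend on the pandas version):

import pandas as pd

left = pd.DataFrame(
    {'x': range(8)},
    index=pd.MultiIndex.from_product(
        [[0, 1], [0, 1], [0, 1]], names=['A', 'B', 'C']))
right = pd.DataFrame(
    {'y': range(4)},
    index=pd.MultiIndex.from_product(
        [[0, 1], [0, 1]], names=['A', 'B']))

# Instead of joining on the common levels 'A' and 'B', this raises the
# "not implemented" exception on our version; join_with_workaround above
# avoids it by temporarily resetting level 'C'.
left.join(right)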
The numbers show the times of the two joins executed on the same data frames. Interestingly, the first join sometimes takes much longer than the second. For the alternative implementations, the numbers in parentheses show the seconds spent in the main inner operation (either join or concat).
We would be grateful for answers to any of the following questions:
- What is causing these large differences in performance?
- Is there anything we can do on our side to speed things up (other than selecting the fastest option in each case, which we already do)?
- Are there any plans to improve the performance of joins in Pandas, in general?
- Could we assist with this, e.g. with a bug bounty?
Output
Expected Output
We would expect all execution times to be less than a second, usually much less.
Output of pd.show_versions()
pandas: 0.19.1
nose: None
pip: 9.0.1
setuptools: 20.10.1
Cython: None
numpy: 1.11.2
scipy: None
statsmodels: None
xarray: None
IPython: None
sphinx: None
patsy: None
dateutil: 2.6.0
pytz: 2016.7
blosc: None
bottleneck: None
tables: None
numexpr: None
matplotlib: None
openpyxl: None
xlrd: None
xlwt: None
xlsxwriter: None
lxml: None
bs4: None
html5lib: None
httplib2: None
apiclient: None
sqlalchemy: None
pymysql: None
psycopg2: None
jinja2: None
boto: None
pandas_datareader: None