Poor Performance on TPC-H Queries #6948

Closed
hesamshahrokhi opened this issue Feb 19, 2024 · 17 comments · Fixed by #6951
Labels
bug 🦗 (Something isn't working) · External (Pull requests and issues from people who do not regularly contribute to modin) · P1 (Important tasks that we should complete soon) · Performance 🚀 (Performance related issues and pull requests)

Comments

@hesamshahrokhi

In a benchmark project (link), I compared the performance of Pandas and Modin (on Ray and Dask) on simple TPC-H queries (Q1 and Q6) written in Pandas. The results (link) show that Modin's read_table API on Ray is 2x faster than its Pandas equivalent. However, the same API on Dask, as well as the individual query run times on both backends, is much slower than Pandas. Moreover, the results of Q1 are not correct.

Could you please help me understand why this is the case? I used the suggested configurations and imported the data using read_table, which is officially supported by Modin.

@hesamshahrokhi added the question ❓ and Triage 🩹 labels on Feb 19, 2024
@anmyachev
Collaborator

Hi @hesamshahrokhi! Thanks for the question!

The method you use to initialize Dask is not intended for Modin:
https://github.com/hesamshahrokhi/modin_tpch_benchmark/blob/e7cf88cf5d6152c32593193a94c837f5fde4f298/tpch_dask.py#L12C17-L12C32. The processes parameter should be True. Could I also ask why you initialize Dask yourself rather than relying on the initialization that Modin does implicitly?
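For reference, a minimal sketch of an explicit Dask client that matches what Modin expects (process-based workers); the n_workers value here is only an illustration, and normally Modin starts Dask by itself:

from distributed import Client

# process-based workers, as Modin expects; Modin will reuse the default client
client = Client(processes=True, n_workers=4)

import modin.pandas as pd  # picks up the client created above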

Moreover, the results of Q1 are not correct.

Let's see what's wrong. Could you please provide more information?

@anmyachev added the External and P1 labels on Feb 19, 2024
@hesamshahrokhi
Author

Hi @anmyachev
Thanks for your quick response!

I explicitly initialized Dask and Ray because of a runtime warning that asked me to do so. Based on your suggestion, I have now removed the explicit initialization for both Ray and Dask, but after re-running the benchmark I still see poor performance from Modin on both backends. Here are the updated benchmark results and the mentioned warnings for Ray and Dask.

Regarding the correctness issue in Q1, which still exists, I would refer you to the correct answer from Pandas and the incorrect answers from Modin on Ray and Dask. In the Modin outputs, the titles of the columns sum_disc_price, sum_charge, avg_disc, and count_order do not match their values.

@anmyachev
Collaborator

of a runtime warning that asked me to do so.

That warning was not the most obvious; in fact it simply indicated that Modin initializes the engine itself. We have already removed it in Modin 0.27.0.

What version of Modin are you using? Can you use version 0.27.0? Perhaps the bug has already been fixed and the performance is better.

Regarding the correctness issue in Q1, which still exists, I would refer you to the correct answer from Pandas and the incorrect answers from Modin on Ray and Dask. In the Modin outputs, the titles of the columns sum_disc_price, sum_charge, avg_disc, and count_order do not match their values.

We'll look at this problem.

@anmyachev removed the Triage 🩹 label on Feb 20, 2024
@anmyachev
Collaborator

anmyachev commented Feb 20, 2024

I'm able to reproduce the problem with the following reproducer (for Modin 0.27.0):

import numpy as np
import modin.pandas as pd
from modin.pandas.test.utils import df_equals

np.random.seed(42)

columns = ['l_returnflag', 'l_linestatus', 'l_discount', 'l_extendedprice', 'l_quantity']
df = pd.DataFrame(np.random.randint(0, 100, size=(1000, len(columns))), columns=columns)
df["a"] = ((df.l_extendedprice) * (1 - (df.l_discount)))
df['b'] = ((df.l_extendedprice) * (1 - (df.l_discount))) * 1.94

agg_df = df \
    .groupby(['l_returnflag', 'l_linestatus']) \
    .agg(
        sum_qty=("l_quantity", "sum"),
        sum_base_price=("l_extendedprice", "sum"),
        sum_disc_price=("a", "sum"),
        sum_charge=("b", "sum"),
        avg_qty=("l_quantity", "mean"),
        avg_price=("l_extendedprice", "mean"),
        avg_disc=("l_discount", "mean"),
        count_order=("l_returnflag", "count"),
    ).reset_index()


agg_df_pd = df._to_pandas() \
    .groupby(['l_returnflag', 'l_linestatus']) \
    .agg(
        sum_qty=("l_quantity", "sum"),
        sum_base_price=("l_extendedprice", "sum"),
        sum_disc_price=("a", "sum"),
        sum_charge=("b", "sum"),
        avg_qty=("l_quantity", "mean"),
        avg_price=("l_extendedprice", "mean"),
        avg_disc=("l_discount", "mean"),
        count_order=("l_returnflag", "count"),
    ).reset_index()

df_equals(agg_df, agg_df_pd)

@anmyachev added the bug 🦗 label on Feb 20, 2024
@hesamshahrokhi
Author

of a runtime warning that asked me to do so.

That warning was not the most obvious; in fact it simply indicated that Modin initializes the engine itself. We have already removed it in Modin 0.27.0.

What version of Modin are you using? Can you use version 0.27.0? Perhaps the bug has already been fixed and the performance is better.

I changed my Modin version to 0.27.0. The warnings are gone, but the performance and correctness issues still exist.

Regarding the correctness issue in Q1, which still exists, I would refer you to the correct answer from Pandas and the incorrect answers from Modin on Ray and Dask. In the Modin outputs, the titles of the columns sum_disc_price, sum_charge, avg_disc, and count_order do not match their values.

We'll look at this problem.

Thanks :)

anmyachev added a commit to anmyachev/modin that referenced this issue Feb 20, 2024
…column partitions

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
@hesamshahrokhi
Author

It seems that you have fixed the correctness issue and made a pull request. This is great!

Do you have any idea how I can resolve the performance issue as well? The current performance for both workloads is much worse than Pandas.

Thanks.

@anmyachev
Collaborator

It seems that you have fixed the correctness issue and made a pull request. This is great!

Not completely yet; I've fixed it only for your case, and that approach does not always work.

Do you have any idea how I can resolve the performance issue as well? The current performance for both workloads is much worse than Pandas.

Could you tell me how many cores your system has? Judging by the time spent on the operations in your file, the data is not very large. It might make sense to reduce the number of cores used via the MODIN_CPUS environment variable.
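For example, a minimal sketch of limiting the core count (the setting should be applied before Modin initializes the engine):

import os
os.environ["MODIN_CPUS"] = "4"  # read by Modin's config on startup

# or, equivalently, through the config API:
import modin.config as cfg
cfg.CpuCount.put(4)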

I would also like to draw your attention to the fact that the first operation also includes the engine initialization time. If you are interested in the operation time without this overhead, you can perform some operation beforehand as a warm-up. For example:

import modin.config as cfg
import modin.pandas as pd

# init the workers
pd.DataFrame([cfg.MinPartitionSize.get() * cfg.NPartitions.get()]).to_numpy()

@hesamshahrokhi
Author

It seems that you have fixed the correctness issue and made a pull request. This is great!

Not completely yet; I've fixed it only for your case, and that approach does not always work.

Do you have any idea how I can resolve the performance issue as well? The current performance for both workloads is much worse than Pandas.

Could you tell me how many cores your system has? Judging by the time spent on the operations in your file, the data is not very large. It might make sense to reduce the number of cores used via the MODIN_CPUS environment variable.

Here is some contextual information:

By default, I let Modin use the 4 available cores. However, following your suggestion, I set MODIN_CPUS to 2, which made the results worse.

I would also like to draw your attention to the fact that the first operation also includes the engine initialization time. If you are interested in the operation time without this overhead, you can perform some operation beforehand as a warm-up. For example:

import modin.config as cfg
import modin.pandas as pd

# init the workers
pd.DataFrame([cfg.MinPartitionSize.get() * cfg.NPartitions.get()]).to_numpy()

I used your suggested code for a warm-up phase (repeating it 100 times), then executed each query twice after the warm-up. Unfortunately, I still see bad performance, and it seems to be the real operation time.

In summary, can I conclude that Modin is not a good match for my workload? Based on your experience, for what kinds of workloads do you suggest using Modin?

Thank you so much for your follow-ups.

@anmyachev
Collaborator

Thanks for trying!

By default, I let Modin use the 4 available cores. However, following your suggestion, I set MODIN_CPUS to 2, which made the results worse.

4 cores is the minimum; 2 almost always performs worse.

I used your suggested code for a warm-up phase (repeating it 100 times), then executed each query twice after the warm-up. Unfortunately, I still see bad performance, and it seems to be the real operation time.

Could you also try the export MODIN_RANGE_PARTITIONING_GROUPBY=True mode? In many cases it performs better.
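The same flag can also be set from Python; a minimal sketch, assuming it runs before the engine initializes:

import os
os.environ["MODIN_RANGE_PARTITIONING_GROUPBY"] = "True"

import modin.pandas as pd  # subsequent groupby().agg() calls use range partitioning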

In summary, can I conclude that Modin is not a good match for my workload?

I don’t see any fundamental reason why your workload is not suitable for Modin; rather, it’s insufficient optimization on our part. @YarShev @dchigarev thoughts?

@hesamshahrokhi
Author

Thanks for trying!

By default, I let Modin use the 4 available cores. However, following your suggestion, I set MODIN_CPUS to 2, which made the results worse.

4 cores is the minimum; 2 almost always performs worse.

I used your suggested code for a warm-up phase (repeating it 100 times), then executed each query twice after the warm-up. Unfortunately, I still see bad performance, and it seems to be the real operation time.

Could you also try the export MODIN_RANGE_PARTITIONING_GROUPBY=True mode? In many cases it performs better.

I tried it:

  • Good news: the correctness issue for Q1 is resolved by using this flag! (results)
  • Bad news: the performance is still bad.

In summary, can I conclude that Modin is not a good match for my workload?

I don’t see any fundamental reason why your workload is not suitable for Modin; rather, it’s insufficient optimization on our part. @YarShev @dchigarev thoughts?

I see.

@anmyachev added the Performance 🚀 label on Feb 22, 2024
@YarShev
Collaborator

YarShev commented Feb 22, 2024

@hesamshahrokhi,

  1. What is the data size used in the benchmark?
  2. What OS are you running on?

Responses to these questions may shed some light on the performance slowdown. Eventually, we would need the data you are using to reproduce the performance issue.

@anmyachev
Collaborator

What is the data size used in the benchmark?

@YarShev Dataset: TPC-H SF-1 (~1GB)

Eventually, we would need the data you are using to reproduce the performance issue.

I think it's quite easy to generate (with https://github.com/electrum/tpch-dbgen).
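Roughly, assuming the dbgen sources build cleanly on your machine:

git clone https://github.com/electrum/tpch-dbgen
cd tpch-dbgen && make
./dbgen -s 1   # writes the *.tbl files (~1GB total at scale factor 1), including lineitem.tbl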

@dchigarev
Collaborator

dchigarev commented Feb 22, 2024

I don’t see any fundamental reason why your workload is not suitable for Modin; rather, it’s insufficient optimization on our part. @YarShev @dchigarev thoughts?

After doing some measurements, I can rule out 'not having enough data to parallelize efficiently': I ran the queries with different data scale factors (up to 10x), and Modin performs poorly at all data sizes. I can also rule out @hesamshahrokhi having unsuitable hardware, as I did the measurements on a 16-core machine with more than enough memory. I also verified that the partitioning is fine and that the dataset doesn't have tricky dtypes.

[image: TPC-H measurements with different scale factors]

I also measured the individual methods in the queries and found that the problem seems to be in the row filtering, as it's the slowest part (yes, I did put execute() after every stage to trigger computations):

[image: per-stage timings of the queries]

It's unclear to me why the row filtering and column insertions are that slow; I need more time to profile.

@anmyachev removed the question ❓ label on Feb 22, 2024
YarShev pushed a commit that referenced this issue Feb 22, 2024
…ions (#6951)

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
@anmyachev reopened this on Feb 22, 2024
tochigiv pushed a commit to tochigiv/modin that referenced this issue Feb 22, 2024
…column partitions (modin-project#6951)

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
@dchigarev
Collaborator

dchigarev commented Feb 28, 2024

I took a deeper look and found several problems related to our lazy execution mechanism:

0. Execution should be triggered after reading for fair measurements
Because of lazy execution, the read slightly affects the measured execution time of the queries, so for fair measurements it's worth calling execute(df) right after the read.
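A minimal sketch of that measurement pattern, using the same execute helper as the reproducers below (the file path and the query body are illustrative):

import modin.pandas as pd
from modin.utils import execute
from timeit import default_timer as timer

df = pd.read_table("lineitem.tbl", sep="|", index_col=False)
execute(df)  # materialize the read so its cost isn't attributed to the query

t1 = timer()
res = df.sum(numeric_only=True)  # stand-in for the actual query
execute(res)
print(timer() - t1)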

1. We trigger the materialization of MetaList too often in some cases

When a queue of operations is hanging on a dataframe, the partitions' metadata (the length/width cache) is represented by an unmaterialized meta-list. Accessing a partition's metadata results in the materialization of this meta-list. Then, when computing row lengths for the whole dataframe, we request each partition's cache in a sequential for-loop, meaning we trigger the meta-list computation for each partition sequentially rather than in parallel (even though we pass materialize=False to part.length()).

As proof that this is indeed a bottleneck, I've prepared this patch, where instead of materializing the meta-list on every .__getitem__() call it returns a future (implemented in .lazy_get()):

git patch
diff --git a/modin/core/execution/ray/common/deferred_execution.py b/modin/core/execution/ray/common/deferred_execution.py
index 5198d835..6d4a2766 100644
--- a/modin/core/execution/ray/common/deferred_execution.py
+++ b/modin/core/execution/ray/common/deferred_execution.py
@@ -465,6 +465,9 @@ class DeferredExecution:
         """Not serializable."""
         raise NotImplementedError("DeferredExecution is not serializable!")
 
+@ray.remote
+def get_index(obj, index):
+    return obj[index]
 
 class MetaList:
     """
@@ -478,6 +481,12 @@ class MetaList:
     def __init__(self, obj: Union[ray.ObjectID, ClientObjectRef, List]):
         self._obj = obj
 
+    def lazy_get(self, index):
+        if isinstance(self._obj, list):
+            return self._obj[index]
+        
+        return get_index.remote(self._obj, index)
+
     def __getitem__(self, index):
         """
         Get item at the specified index.
diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
index 2e0ded45..12bd57d2 100644
--- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
+++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
@@ -338,7 +338,8 @@ class PandasOnRayDataframePartition(PandasDataframePartition):
 
     @property
     def _length_cache(self):  # noqa: GL08
-        return self._meta[self._meta_offset]
+        # return self._meta[self._meta_offset]
+        return self._meta.lazy_get(self._meta_offset)
 
     @_length_cache.setter
     def _length_cache(self, value):  # noqa: GL08
@@ -346,7 +347,8 @@ class PandasOnRayDataframePartition(PandasDataframePartition):
 
     @property
     def _width_cache(self):  # noqa: GL08
-        return self._meta[self._meta_offset + 1]
+        # return self._meta[self._meta_offset + 1]
+        return self._meta.lazy_get(self._meta_offset + 1)
 
     @_width_cache.setter
     def _width_cache(self, value):  # noqa: GL08
@@ -354,7 +356,8 @@ class PandasOnRayDataframePartition(PandasDataframePartition):
 
     @property
     def _ip_cache(self):  # noqa: GL08
-        return self._meta[-1]
+        # return self._meta[-1]
+        return self._meta.lazy_get(-1)
 
     @_ip_cache.setter
     def _ip_cache(self, value):  # noqa: GL08

After applying this patch, execution time for the following reproducer decreased from 4.23s to 0.47s.

reproducer, problem 1
import modin.pandas as pd
import modin.config as cfg
from modin.utils import execute

cfg.CpuCount.put(16)

from timeit import default_timer as timer

l_columnnames = [
    'l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber', 'l_quantity', 'l_extendedprice',
    'l_discount', 'l_tax', 'l_returnflag', 'l_linestatus', 'l_shipdate', 'l_commitdate',
    'l_receiptdate', 'l_shipinstruct', 'l_shipmode', 'l_comment'
]
l_parse_dates = ['l_shipdate', 'l_commitdate', 'l_receiptdate']

# see instructions on how to generate data at the bottom of the message
path = "../tpch-dbgen/lineitem_3.tbl"

df = pd.read_table(path, sep="|", names=l_columnnames, parse_dates=l_parse_dates, index_col=False)
print("data read")

def problem1(df):
    res = df["l_shipdate"] <= "1998-09-02"
    execute(res)

t1 = timer()
problem1(df)
print(timer() - t1)

I'm not sure, though, whether this is the optimal way to fix this; I would like to hear @AndreyPavlenko's opinion on that.

2. Unnecessary ._copartition() on binary operations (#6980)

This block of binary operations unnecessarily triggers lazy execution because of the ._copartition() call, which ensures that the indices/partitioning of both arguments are equal before performing a binary operation:

df['a'] = (df.l_extendedprice) * (1 - (df.l_discount))
df['b'] = (((df.l_extendedprice) * (1 - (df.l_discount))) * (1 + (df.l_tax)))

The current implementation of ._copartition() simply triggers the computation of the actual indices and row lengths of both dataframes and then compares them. But in this case we know that all the arguments are views of the same dataframe, so both the indexing and the partitioning are identical, and we can potentially skip this check. The mechanism for detecting sibling frames when comparing indices/partitioning was already implemented in #6491; what we can do here is simply enable it for the ._copartition() method as well.
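A conceptual sketch of the proposed short-circuit; _is_sibling_of() here is a hypothetical helper standing in for the sibling-frame detection from #6491:

def _copartition(self, axis, other, how, sort):
    # hypothetical fast path: views of the same frame already share both the
    # index and the partitioning, so materializing and comparing them is wasted work
    if self._is_sibling_of(other):
        return self._partitions, other._partitions
    # ... existing path: trigger computation of indices/row lengths and compare them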

3. Unnecessary row_lengths computation in ._propagate_index_objs(axis=1) (#6977)

The dataframes in this workflow often have the ._deferred_columns flag set to True, causing ._propagate_index_objs(axis=1) to be called before almost every operation. The column propagation itself can be performed lazily; however, the method explicitly filters out empty partitions at the beginning, which forcibly triggers the computation of row_lengths.

This ._filter_empties() call was introduced ~5 years ago by PR #721 and isn't really required for index propagation to work. I suggest replacing the call with ._filter_empties(compute_metadata=False) so that it doesn't trigger computations here.
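The suggested change, sketched as a diff (the exact call site is paraphrased):

-        self._filter_empties()
+        self._filter_empties(compute_metadata=False)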

TL;DR

There are several problems with Modin's current lazy execution that should be fixed soon. After applying all the fixes from the list, I was able to drop the execution time of the first query from 5.0s to 3.7s, and the whole query now runs completely lazily from the start up to the groupby call.

P.S.

Disabling lazy execution completely helps a bit, but not much:

Execution time of Q1 (seconds):

Lazy execution on master | Lazy execution with fixes from the list | cfg.LazyExecution("off") | pandas
5.0                      | 3.7                                     | 4.2                      | 4.85

@AndreyPavlenko
Collaborator

I'm not sure, though, whether this is the optimal way to fix this; I would like to hear @AndreyPavlenko's opinion on that.

I'll address this issue in #7001.

@dchigarev
Collaborator

dchigarev commented Mar 15, 2024

@hesamshahrokhi

We've merged all the performance fixes for the problems described in this comment and released version 0.28.0, which contains all of them.

Here are the performance measurements I got:

[image: performance measurements]

With 16 cores available to Modin, there's a noticeable performance gain (~40%) for query 1. With 4 cores, Modin doesn't perform that well (we generally don't recommend using Modin with such a small number of available cores; 8+ cores is the recommended minimum).

There's no performance gain for query 6, as the main bottleneck there is the columns containing long strings (l_comment, l_shipinstruct). Ray's object store is not very good at serializing/deserializing sequences of Python strings, which is why it struggles with this query. We could apply an optimization here and filter out the unnecessary columns first, so that the rest of the kernels operate on data that doesn't strain Ray's serialization mechanisms. This gives a decent speed-up for query 6:

[image: query 6 timings with the column projection applied first]

how to rewrite the 6th query
def q6(df):
+   df = df[['l_shipdate', 'l_discount', 'l_quantity', 'l_extendedprice']] 
    df = df[
        (df.l_shipdate>='1994-01-01') & 
        (df.l_shipdate<'1995-01-01') & 
        (df.l_discount>=0.050) & 
        (df.l_discount<=0.070) & 
        (df.l_quantity<24)
    ]
-   df = df[['l_shipdate', 'l_discount', 'l_quantity', 'l_extendedprice']] 
    
    df['l_extendedpricel_discount'] = ((df.l_extendedprice) * (df.l_discount))
    res = (df.l_extendedpricel_discount).sum()
    return res
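For clarity, the rewritten query with the projection applied first, as a plain function (a direct application of the diff above):

def q6(df):
    # project the needed columns first so later kernels never touch the
    # long string columns (l_comment, l_shipinstruct)
    df = df[['l_shipdate', 'l_discount', 'l_quantity', 'l_extendedprice']]
    df = df[
        (df.l_shipdate >= '1994-01-01') &
        (df.l_shipdate < '1995-01-01') &
        (df.l_discount >= 0.050) &
        (df.l_discount <= 0.070) &
        (df.l_quantity < 24)
    ]
    df['l_extendedpricel_discount'] = df.l_extendedprice * df.l_discount
    return df.l_extendedpricel_discount.sum()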

It still performs slower than Pandas, though. An explanation could be that Modin doesn't yet do well on tasks that take less than ~5 seconds in Pandas; the overheads of parallelization simply don't justify the gain.

@dchigarev
Collaborator

Feel free to reopen if you have any follow-up questions.
