
FIX-#6879: Convert the right DF to single partition before broadcasting in query_compiler.merge #6880

Merged
merged 16 commits into modin-project:master from dfToSinglePartition on Feb 13, 2024

Conversation

@arunjose696 (Collaborator) commented Jan 25, 2024

What do these changes do?

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves #6879: The query_compiler.merge reconstructs the right dataframe for every partition of the left dataframe
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

@arunjose696 (Collaborator, Author) commented Jan 25, 2024

A few issues I have with this approach:
1) Peak memory consumption in the worker that converts the dataframe to a single partition would still be as high as in the previous approach.
2) Since we broadcast the right df as a Modin DF, this is slightly heavier in memory than converting the dataframe to pandas. Comparing peak worker memory consumption for the snippet below, the observation was:
right.to_pandas approach < current PR < master

import modin.pandas as pd
import numpy as np

modin_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 1000)))
modin_df2 = pd.DataFrame(np.random.randint(0, 100, size=(10000, 1000)))
modin_result = pd.merge(modin_df, modin_df2, how="left")

Can I have some suggestions on how to improve this?

@arunjose696 changed the title dfToSinglePartition → FIX:6879 Convert the right DF to single partition before broadcasting in query_compiler.merge Jan 25, 2024
@arunjose696 changed the title FIX:6879 Convert the right DF to single partition before broadcasting in query_compiler.merge → FIX-#6879 Convert the right DF to single partition before broadcasting in query_compiler.merge Jan 25, 2024
@arunjose696 changed the title FIX-#6879 Convert the right DF to single partition before broadcasting in query_compiler.merge → FIX-#6879: Convert the right DF to single partition before broadcasting in query_compiler.merge Jan 25, 2024
@arunjose696 force-pushed the dfToSinglePartition branch 2 times, most recently from 5331555 to 0a872b3 on January 25, 2024 12:18
@anmyachev (Collaborator) commented:
A few issues I have with this approach: 1) Peak memory consumption in the worker that converts the dataframe to a single partition would still be as high as in the previous approach. 2) Since we broadcast the right df as a Modin DF, this is slightly heavier in memory than converting the dataframe to pandas. Comparing peak worker memory consumption for the snippet below, the observation was: right.to_pandas approach < current PR < master

import modin.pandas as pd
import numpy as np

modin_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 1000)))
modin_df2 = pd.DataFrame(np.random.randint(0, 100, size=(10000, 1000)))
modin_result = pd.merge(modin_df, modin_df2, how="left")

Can I have some suggestions on how to improve this?

Might it be possible to call right.to_pandas in the worker process instead of the main one? (with some changes)
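
For illustration, a minimal sketch of the idea with Ray (the helper name and the way the right frame's row partitions are passed in are assumptions, not existing Modin API):

import pandas
import ray

@ray.remote
def right_to_pandas(*row_parts):
    # Runs inside a single worker: stitch the right frame's row
    # partitions (plain pandas DataFrames) into one DataFrame, so the
    # driver process never holds the fully materialized frame.
    return pandas.concat(row_parts, axis=0)

The resulting object reference could then be broadcast to every partition of the left frame.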

@arunjose696 (Collaborator, Author) commented:
Might it be possible to call right.to_pandas in the worker process instead of the main one? (with some changes)

To call right.to_pandas on a worker, we would still need to send the right Modin dataframe to that worker. Wouldn't that increase memory consumption just as much as in this case? Why would it be better than the current approach?

@YarShev (Collaborator) commented Jan 27, 2024

@arunjose696, I think @anmyachev means calling to_pandas in a single worker process to get a single-partition Modin DataFrame out of all the partitions of the right Modin DataFrame. It seems we could do something like the following, though I am not sure whether it works as is; I guess some changes would be required for the implementation to work.

def force_materialization(self) -> "PandasDataframe":
    row_partitions = self._partition_mgr_cls.row_partitions(self._partitions)
    col_partition = self._partition_mgr_cls.column_partitions(row_partitions)
    new_frame = np.array([col_partition[0].apply(lambda df: df, num_splits=1)])
    # NOTE: new_frame is a raw partition array at this point; it would
    # still need to be wrapped back into a PandasDataframe to match
    # the annotation.
    return new_frame

@arunjose696 (Collaborator, Author) commented Jan 29, 2024

def force_materialization(self) -> "PandasDataframe":
    row_partitions = self._partition_mgr_cls.row_partitions(self._partitions)
    col_partition = self._partition_mgr_cls.column_partitions(row_partitions)
    new_frame = np.array([col_partition[0].apply(lambda df: df, num_splits=1)])
    return new_frame

I tried this approach; by making a small change, it converts the dataframe to a single-partition df. However, memory consumption increases in several workers during force_materialization, even though only one remote worker is utilized. I checked the workers' memory consumption with the script below, which logs memory use before and after the force_materialization call: the memory consumption of multiple workers went up.

force_materialization.py
import re
import platform
import warnings

import numpy as np
import psutil

import modin.pandas as pd
from modin.utils import execute

_VM_HWM_PATTERN = r"VmHWM:\s+(\d+)"

def get_max_memory_usage(proc=psutil.Process()):
    """Read the peak memory usage (VmHWM) of a process tree in MB.

    Returns 0 on non-Linux systems or if the process is not alive.
    """
    max_mem = 0
    try:
        with open(f"/proc/{proc.pid}/status", "r") as stat:
            match = re.search(_VM_HWM_PATTERN, stat.read())
            if match:
                # VmHWM is reported in kB; convert to MB.
                max_mem = int(float(match.group(1)) / 1024)
    except FileNotFoundError:
        if platform.system() == "Linux":
            warnings.warn(f"Couldn't open `/proc/{proc.pid}/status` file. Is the process alive?")
        else:
            warnings.warn("Couldn't get the max memory usage on a non-Linux platform.")
        return 0
    children = proc.children()
    max_mem_used = max_mem + sum(get_max_memory_usage(c) for c in children)
    print(f"for process with name {proc.name()} and {len(children)} children the mem used is {max_mem}")
    return max_mem_used

modin_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 1000)))

execute(modin_df)
print("before force_materialization")
print(f"total memory consumed = {get_max_memory_usage()}")

xf = modin_df._query_compiler._modin_frame.force_materialization()

print("\n\nafter force_materialization")
print(f"total memory consumed = {get_max_memory_usage()}")
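
The script prints a peak for each worker process before and after the call, so per-worker growth is visible. Note that VmHWM is only exposed via /proc on Linux; on other platforms the script reports 0.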

@anmyachev (Collaborator) commented:
def force_materialization(self) -> "PandasDataframe":
    row_partitions = self._partition_mgr_cls.row_partitions(self._partitions)
    col_partition = self._partition_mgr_cls.column_partitions(row_partitions)
    new_frame = np.array([col_partition[0].apply(lambda df: df, num_splits=1)])
    return new_frame

I tried this approach; by making a small change, it works. However, memory consumption increases in several workers during force_materialization, even though only one remote worker is utilized. I checked the workers' memory consumption with the script below, which logs memory use before and after the force_materialization call: the memory consumption of multiple workers went up.

As far as I remember, with this approach intermediate partitions can be created inside force_materialization, which may explain the increased memory consumption:

# If this axis partition is made of axis partitions
# for the other axes, squeeze such partitions into a single
# block so that this partition only holds a one-dimensional
# list of blocks. We could change this implementation to
# hold a 2-d list of blocks, but that would complicate the
# code quite a bit.
self._list_of_block_partitions.append(
    partition.force_materialization().list_of_block_partitions[0]
)

I would like to consider the possibility of creating a pandas dataframe in a worker process, without creating intermediate objects; roughly, something like the sketch below.
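
(A rough sketch with Ray; the task and the way the grid of block partitions is passed in are illustrative assumptions, not the final implementation. A single remote task rebuilds the full pandas frame directly from the blocks, with no intermediate virtual partitions.)

import pandas
import ray

@ray.remote
def blocks_to_pandas(n_cols, *blocks):
    # `blocks` is the flattened, row-major 2-D grid of block partitions
    # (plain pandas DataFrames). Glue columns first, then rows, entirely
    # inside this single worker.
    rows = [
        pandas.concat(blocks[i:i + n_cols], axis=1)
        for i in range(0, len(blocks), n_cols)
    ]
    return pandas.concat(rows, axis=0)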

@arunjose696 force-pushed the dfToSinglePartition branch 6 times, most recently from 3880889 to 07522fb on February 5, 2024 15:15
@arunjose696 (Collaborator, Author) commented Feb 5, 2024

I would like to consider the possibility of creating a pandas dataframe in a worker process, without creating intermediate objects.

I have done an implementation that makes use of to_pandas, calling it in a remote function. Could you check it once?
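
For reference, the broadcast semantics in plain pandas terms (a simplified illustration, not the actual diff): the right frame is materialized once, and every partition of the left frame merges against that single object:

import pandas

left = pandas.DataFrame({"key": range(10), "a": range(10)})
right = pandas.DataFrame({"key": range(0, 10, 2), "b": range(5)})

# Pretend these are the left frame's row partitions.
chunks = [left.iloc[i:i + 4] for i in range(0, len(left), 4)]

# One materialized right frame, reused by every chunk.
parts = [chunk.merge(right, on="key", how="left") for chunk in chunks]
result = pandas.concat(parts, ignore_index=True)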

@arunjose696 force-pushed the dfToSinglePartition branch 2 times, most recently from 7813297 to d6e62ba on February 7, 2024 12:31
@arunjose696 force-pushed the dfToSinglePartition branch 2 times, most recently from ff5639d to 942a2a9 on February 12, 2024 13:01
Signed-off-by: arunjose696 <arunjose696@gmail.com>
Co-authored-by: Anatoly Myachev <anatoliimyachev@mail.com>
@anmyachev (Collaborator) previously approved these changes Feb 12, 2024, leaving a comment:

LGTM!

@arunjose696 force-pushed the dfToSinglePartition branch 4 times, most recently from 09f5d6c to b6c2f3b on February 12, 2024 15:03
Signed-off-by: Igoshev, Iaroslav <iaroslav.igoshev@intel.com>
@YarShev (Collaborator) commented Feb 13, 2024

@anmyachev, any comments?

@anmyachev merged commit 9ff1c15 into modin-project:master Feb 13, 2024
37 checks passed