BUG: pandas.DataFrame.to_parquet() causing memory leak #55296

Closed
2 of 3 tasks
RizzoV opened this issue Sep 26, 2023 · 7 comments
Labels
Arrow (pyarrow functionality) · Bug · Performance (Memory or execution speed performance) · Upstream issue (Issue related to pandas dependency)

Comments

@RizzoV commented Sep 26, 2023

Pandas version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest version of pandas.

  • I have confirmed this bug exists on the main branch of pandas.

Reproducible Example

import os
import string
import sys
from random import choice, randint
from uuid import uuid4

import pandas as pd
import pyarrow as pa
from memory_profiler import profile

sample_schema = pa.struct(
    [
        ("a", pa.string()),
        (
            "b",
            pa.struct(
                [
                    ("ba", pa.list_(pa.string())),
                    ("bc", pa.string()),
                    ("bd", pa.string()),
                    ("be", pa.list_(pa.string())),
                    (
                        "bf",
                        pa.list_(
                            pa.struct(
                                [
                                    (
                                        "bfa",
                                        pa.struct(
                                            [
                                                ("bfaa", pa.string()),
                                                ("bfab", pa.string()),
                                                ("bfac", pa.string()),
                                                ("bfad", pa.float64()),
                                                ("bfae", pa.string()),
                                            ]
                                        ),
                                    )
                                ]
                            )
                        ),
                    ),
                ]
            ),
        ),
        ("c", pa.int64()),
        ("d", pa.int64()),
        ("e", pa.string()),
        (
            "f",
            pa.struct(
                [
                    ("fa", pa.string()),
                    ("fb", pa.string()),
                    ("fc", pa.string()),
                    ("fd", pa.string()),
                    ("fe", pa.string()),
                    ("ff", pa.string()),
                    ("fg", pa.string()),
                ]
            ),
        ),
        ("g", pa.int64()),
    ]
)


def generate_random_string(str_length: int) -> str:
    return "".join(
        [choice(string.ascii_lowercase + string.digits) for n in range(str_length)]
    )


@profile
def write_to_parquet(df, output_dir):
    output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    df.to_parquet(output_path, schema=pa.schema(sample_schema))
    return output_path


@profile
def write_to_json(df, output_dir):
    output_path = os.path.join(output_dir, f"{uuid4()}.json")
    df.to_json(output_path)
    return output_path


def main():
    output_dir = os.path.abspath(
        sys.argv[1]
    )  # destination for temporary files when outputting Parquets/JSONs

    if not os.path.isdir(output_dir):
        print("Creating output dir", output_dir)
        os.makedirs(output_dir)

    for i in range(10000):
        df = pd.DataFrame.from_dict(generate_random_data())
        # pa.jemalloc_set_decay_ms(0)
        output_path = write_to_parquet(df, output_dir)  # memory leak
        # output_path = write_to_json(df, output_dir)  # stable memory usage
        if output_path is not None:
            os.remove(output_path)


def generate_random_data():
    return {
        "a": [generate_random_string(128)],
        "b": [
            {
                "ba": [generate_random_string(128) for i in range(50)],
                "bc": generate_random_string(128),
                "bd": generate_random_string(128),
                "be": [generate_random_string(128) for i in range(50)],
                "bf": [
                    {
                        "bfa": {
                            "bfaa": generate_random_string(128),
                            "bfab": generate_random_string(128),
                            "bfac": generate_random_string(128),
                            "bfad": randint(0, 2**32),
                            "bfae": generate_random_string(128),
                        }
                    }
                ],
            }
        ],
        "c": [randint(0, 2**32)],
        "d": [randint(0, 2**32)],
        "e": [generate_random_string(128)],
        "f": [
            {
                "fa": generate_random_string(128),
                "fb": generate_random_string(128),
                "fc": generate_random_string(128),
                "fd": generate_random_string(128),
                "fe": generate_random_string(128),
                "ff": generate_random_string(128),
                "fg": generate_random_string(128),
            }
        ],
        "g": [randint(0, 2**32)],
    }


if __name__ == "__main__":
    main()
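
For reference, assuming the script above is saved as repro.py (a hypothetical name), it can be run with:

python repro.py /tmp/parquet_leak

The positional argument is the directory for the temporary Parquet/JSON files, and the @profile decorators from memory_profiler print the per-call memory profiles shown below.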

Issue Description

pandas.DataFrame.to_parquet() causes a memory leak when engine='pyarrow' (the default).

Using another engine (e.g. engine='fastparquet') or writing the same data in another format (e.g. pandas.DataFrame.to_json(), see write_to_json() in the Reproducible Example) avoids the leak.

The problem seems to be more pronounced on DataFrames containing nested structs. A sample problematic data schema and a matching data generator are included in the Reproducible Example.
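
For illustration, a minimal side-by-side of the affected and unaffected calls (a sketch; output_path stands for any writable file path):

# leaks: default engine='pyarrow'
df.to_parquet(output_path, schema=pa.schema(sample_schema))

# stable memory usage: a different engine or output format
df.to_parquet(output_path, engine="fastparquet")
df.to_json(output_path)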

From the Reproducible Example above:

  • 1st pd.DataFrame.to_parquet() call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74     91.9 MiB     91.9 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76     91.9 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77     92.1 MiB      0.1 MiB           1       df.to_parquet(output_path, schema=pa.schema(sample_schema))
    78     92.1 MiB      0.0 MiB           1       return output_path
  • 2000th call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74    140.1 MiB    140.1 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76    140.1 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77    140.1 MiB      0.0 MiB           1       df.to_parquet(output_path, schema=pa.schema(sample_schema))
    78    140.1 MiB      0.0 MiB           1       return output_path
  • 10000th call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74    330.5 MiB    330.5 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76    330.5 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77    330.5 MiB      0.0 MiB           1       df.to_parquet(output_path, schema=pa.schema(sample_schema))
    78    330.5 MiB      0.0 MiB           1       return output_path

Versus the same code with engine='fastparquet' passed to pd.DataFrame.to_parquet():

  • 1st call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74     91.6 MiB     91.7 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76     91.6 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77     91.6 MiB      0.0 MiB           1       df.to_parquet(output_path, engine='fastparquet')
    78     91.6 MiB      0.0 MiB           1       return output_path
  • 10000th call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74     91.9 MiB     91.7 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76     91.9 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77     91.9 MiB      0.0 MiB           1       df.to_parquet(output_path, engine='fastparquet')
    78     91.9 MiB      0.0 MiB           1       return output_path

Expected Behavior

No memory leak: memory usage should remain stable across repeated pd.DataFrame.to_parquet() calls, as it does with engine='fastparquet'.

Installed Versions

INSTALLED VERSIONS
------------------
commit : e86ed37
python : 3.10.9.final.0
python-bits : 64
OS : Darwin
OS-release : 22.6.0
Version : Darwin Kernel Version 22.6.0: Fri Sep 15 13:39:52 PDT 2023; root:xnu-8796.141.3.700.8~1/RELEASE_X86_64
machine : x86_64
processor : i386
byteorder : little
LC_ALL : None
LANG : it_IT.UTF-8
LOCALE : it_IT.UTF-8

pandas : 2.1.1
numpy : 1.26.0
pytz : 2021.3
dateutil : 2.8.2
setuptools : 65.6.3
pip : 23.2.1
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : 4.9.1
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.2
IPython : 8.11.0
pandas_datareader : None
bs4 : 4.11.1
bottleneck : None
dataframe-api-compat: None
fastparquet : 0.8.3
fsspec : 2022.10.0
gcsfs : None
matplotlib : 3.7.0
numba : None
numexpr : None
odfpy : None
openpyxl : 3.1.2
pandas_gbq : None
pyarrow : 13.0.0
pyreadstat : None
pyxlsb : None
s3fs : 2022.10.0
scipy : 1.10.1
sqlalchemy : None
tables : None
tabulate : None
xarray : None
xlrd : None
zstandard : None
tzdata : 2023.3
qtpy : None
pyqt5 : None

@RizzoV added the Bug and Needs Triage labels on Sep 26, 2023
@mroeschke (Member)

Thanks. Do you get the same memory leak when you use pure pyarrow code (to_parquet is more-or-less a wrapper around the pyarrow code)?

@mroeschke added the Needs Info label and removed the Needs Triage label on Sep 26, 2023
@RizzoV (Author) commented Sep 28, 2023

I didn't try converting the to_parquet() call into equivalent pure PyArrow code. Do you have a code snippet I can use to run that test? Thank you.

@mroeschke (Member)

The to_parquet implementation is around here:

table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
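
A minimal pure-PyArrow equivalent of that write path might look like the sketch below (write_with_pure_pyarrow is a hypothetical name; sample_schema is the one from the Reproducible Example):

import pyarrow as pa
import pyarrow.parquet as pq

def write_with_pure_pyarrow(df, output_path):
    # the same two steps pandas performs: convert, then write with pyarrow directly
    table = pa.Table.from_pandas(df, schema=pa.schema(sample_schema))
    pq.write_table(table, output_path)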

@RizzoV (Author) commented Oct 2, 2023

Played around with the code you linked for a bit, and it looks like the leak is caused exactly by line 190, i.e. the pa.Table.from_pandas() call:

table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

Even without writing the resulting table to a file, the leak occurs once the pandas DataFrame is converted into a PyArrow Table. Here's the memory usage of the Reproducible Example above at the 10000th iteration, with write_to_parquet() replaced by a function that only performs the conversion:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    84    329.4 MiB    329.4 MiB           1   @profile
    85                                         def convert_df_to_table(df, output_dir):                                  
    86    329.5 MiB      0.0 MiB           1       table = pa.Table.from_pandas(df, schema=pa.schema(sample_schema))

The amount of memory leaked is essentially the same as with the pd.DataFrame.to_parquet() call.
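
To help tell allocator retention apart from a true leak, one possible check is to watch Arrow's memory pool directly (a sketch; pa.jemalloc_set_decay_ms() is only available on jemalloc builds of PyArrow, which the commented-out call in the Reproducible Example hints at):

import pandas as pd
import pyarrow as pa

pa.jemalloc_set_decay_ms(0)  # jemalloc builds only: return freed pages to the OS immediately

for i in range(10_000):
    df = pd.DataFrame.from_dict(generate_random_data())
    table = pa.Table.from_pandas(df, schema=pa.schema(sample_schema))
    del table  # drop the only reference so Arrow can free the buffers
    if i % 1_000 == 0:
        # bytes still held by Arrow's default memory pool after the free
        print(i, pa.default_memory_pool().bytes_allocated())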

@mroeschke (Member)

Thanks for looking into it. If that's the case, it might be worth opening an issue in the Arrow repository about it.

@rhshadrach added the Upstream issue, Arrow, and Performance labels and removed the Needs Info label on Oct 2, 2023
@RizzoV (Author) commented Oct 3, 2023

Just did. Thank you for the support, @mroeschke!

@lithomas1 (Member)

Going to close this now, since it looks like it's an Arrow bug.

Please feel free to ping if there's anything else we need to do on our end.
