
LazyFrame - Unnested columns are missing in Lazy Frame #16460

Closed

herrmann1981 opened this issue May 24, 2024 · 8 comments
Labels
bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars

Comments

@herrmann1981

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import random
import string
import polars as pl

# Generate test data (just some random data with an index column)
# We do this to try to closely match the actual data
LENGTH = 1000
data = pl.LazyFrame(
    {
        'index': [x for x in range(LENGTH)],
        'payload': [random.randint(0, x + 1) for x in range(LENGTH)],
        'category': [''.join(random.choices(string.ascii_lowercase, k=10)) for x in range(LENGTH)]
    }
)

# We apply a function to the Lazy Frame that returns a tuple of dicts
lf_temp = data.with_columns(
    pl.struct(['index', 'payload', 'category'])
    .map_elements(
        lambda row: (
            {'a': row['index'], 'b': row['payload'], 'c': row['category']},
            {'a': row['index'] + 1, 'b': row['payload'] + 1, 'c': row['category']},
        ),
        return_dtype=pl.List(pl.Struct),
    )
    .alias('struct_column')
)

# Now we Explode and unnest this newly generated struct column
lf_result = lf_temp.explode('struct_column').unnest('struct_column')

print('Schema of Lazy Frame')
print(lf_result.schema)
# Will give:
# OrderedDict([('index', Int64), ('payload', Int64), ('category', String)])
lf_result.sink_parquet(path='lazy_frame.parquet', compression='zstd', compression_level=16, statistics=True)

print('Schema of Data Frame')
print(lf_result.collect().schema)
# Will give:
# OrderedDict([('index', Int64), ('payload', Int64), ('category', String),  ('a', Int64), ('b', Int64), ('c', String)])
lf_result.collect().write_parquet(file='data_frame.parquet', compression='zstd', compression_level=16, statistics=True)

# The Data Frame shows the exploded / unnested columns, but the Lazy Frame not
# Unfortunately we can not collect the data for the real use case due to the amount of data

Log output

Schema of Lazy Frame
OrderedDict([('index', Int64), ('payload', Int64), ('category', String)])
RUN STREAMING PIPELINE
[df -> hstack -> function -> function -> parquet_sink]
Schema of Data Frame
OrderedDict([('index', Int64), ('payload', Int64), ('category', String), ('a', Int64), ('b', Int64), ('c', String)])

Process finished with exit code 0

Issue description

We are processing large amounts of data and need to use the streaming feature because the data does not fit into RAM. We are also running on a Kubernetes cluster, where a more constant RAM consumption is desirable.

We have a custom function that returns a List of Structs. This column needs to be exploded and unnested. Unfortunately, these unnested columns are then not part of the result: when we sink the results to an output Parquet file, the columns are missing.
When we collect the lazy frame into a DataFrame, the columns are present. Unfortunately, we cannot collect the results due to resource limitations.

This only happens when we use the "map_elements" function. When the List of Structs is already present in the source data, the explode and unnest work as expected. For example, this works as expected:

import random
import string
import polars as pl

# Generate test data (just some random data with an index column)
# We do this to try to closely match the actual data
LENGTH = 1000
lf_temp = pl.LazyFrame(
    {
        'index': [x for x in range(LENGTH)],
        'struct_column': [(
            {
                'a': random.randint(0, x + 1),
                'b': ''.join(random.choices(string.ascii_lowercase, k=10))
            },
            {
                'a': random.randint(0, x + 1),
                'b': ''.join(random.choices(string.ascii_lowercase, k=10))
            }
        ) for x in range(LENGTH)],
    }
)

# Now we Explode and unnest this newly generated struct column
lf_result = lf_temp.explode('struct_column').unnest('struct_column')

print('Schema of Lazy Frame')
print(lf_result.schema)
# Will give:
# OrderedDict([('index', Int64), ('a', Int64), ('b', String)])

print('Schema of Data Frame')
print(lf_result.collect().schema)
# Will give:
# OrderedDict([('index', Int64), ('a', Int64), ('b', String)])

Expected behavior

The expectation would be that the schema of the result Lazy Frame is the same as the Data Frame that is collected.

Installed versions

--------Version info---------
Polars:               0.20.29
Index type:           UInt32
Platform:             Windows-10-10.0.19045-SP0
Python:               3.11.3 (tags/v3.11.3:f3909b8, Apr  4 2023, 23:49:59) [MSC v.1934 64 bit (AMD64)]
----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               <not installed>
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
nest_asyncio:         <not installed>
numpy:                1.26.4
openpyxl:             <not installed>
pandas:               <not installed>
pyarrow:              <not installed>
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           <not installed>
torch:                <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>
@herrmann1981 herrmann1981 added bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars labels May 24, 2024
@coastalwhite

This comment was marked as resolved.

@cmdlineluser
Contributor

I'm not sure if it is the underlying cause, but whilst trying to reduce down your example, the sink_parquet panics:

import polars as pl

lf = pl.LazyFrame({
    "index": [0, 1, 2],
    "payload": [1, 1, 1],
    "category": ["a", "b", "c"]
})

lf_temp = (
    lf.with_columns(
        pl.struct("index", "payload", "category").map_elements(lambda row: 
            [
                {"a": row["index"], "b": row["payload"], "c": row["category"]},
                {"a": row["index"] + 1, "b": row["payload"] + 1, "c": row["category"]}
            ],
            return_dtype=pl.List(pl.Struct)
        )
        .alias("struct_column")
    )
)

lf_result = lf_temp.explode("struct_column").unnest("struct_column")

lf_result.sink_parquet("lazy_frame.parquet")
# pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: 
# ComputeError(ErrString("a StructArray must contain at least one field"))

The .collect() output looks OK:

shape: (6, 6)
┌───────┬─────────┬──────────┬─────┬─────┬─────┐
│ index ┆ payload ┆ category ┆ a   ┆ b   ┆ c   │
│ ---   ┆ ---     ┆ ---      ┆ --- ┆ --- ┆ --- │
│ i64   ┆ i64     ┆ str      ┆ i64 ┆ i64 ┆ str │
╞═══════╪═════════╪══════════╪═════╪═════╪═════╡
│ 0     ┆ 1       ┆ a        ┆ 0   ┆ 1   ┆ a   │
│ 0     ┆ 1       ┆ a        ┆ 1   ┆ 2   ┆ a   │
│ 1     ┆ 1       ┆ b        ┆ 1   ┆ 1   ┆ b   │
│ 1     ┆ 1       ┆ b        ┆ 2   ┆ 2   ┆ b   │
│ 2     ┆ 1       ┆ c        ┆ 2   ┆ 1   ┆ c   │
│ 2     ┆ 1       ┆ c        ┆ 3   ┆ 2   ┆ c   │
└───────┴─────────┴──────────┴─────┴─────┴─────┘

@herrmann1981
Author

Interestingly, with your example above, this fails:

lf_result.sink_parquet("lazy_frame.parquet")

But this succeeds:

lf_result.collect().lazy().sink_parquet("lazy_frame.parquet")

@cmdlineluser
Contributor

Yeah, the collect also panics with streaming enabled:

lf_result.collect(streaming=True)

# PanicException: called `Result::unwrap()` on an `Err` value: 
# ComputeError(ErrString("a StructArray must contain at least one field"))

@cmdlineluser
Contributor

I just tried this again on the latest main and it still fails.

Specifying the full schema allows it to run as expected.

return_dtype=pl.List(pl.Struct({'a': pl.Int64, 'b': pl.Int64, 'c': pl.String}))

It seems allowing an empty pl.Struct leads to the problem here?

@herrmann1981
Author

@cmdlineluser Thanks for your feedback. When I specify the full schema, streaming works for me, including for my complete use case and not only the example above.
The problem is solved for me.

@jpfeuffer

Sorry, but this cannot be the solution. What if you have an unknown number of fields that are inferred by Polars in the unnest function?
You cannot specify a schema in that case without replicating Polars' inference process.
This is an obvious bug to me, and IMHO the issue should be reopened.

@jpfeuffer

jpfeuffer commented Jun 25, 2024

@cmdlineluser Thank you for reopening 🙏🏻 or linking
