
Slicing nullable list arrays into multiple parquet pages doesn't work #1356

Closed
tjwilson90 opened this issue Jan 12, 2023 · 3 comments · Fixed by #1390
Labels: bug (Something isn't working)

Comments

@tjwilson90

arrow2 version 0.15

use arrow2::array::{MutableArray, MutableListArray, MutableUtf8Array, TryPush};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::io::parquet::read;
use arrow2::io::parquet::write::{
    transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
};
use std::fs::File;

fn main() {
    let mut array = MutableListArray::<i32, MutableUtf8Array<i32>>::new();
    array
        .try_push(Some([
            Some("This is some data"),
            Some(" to include"),
            Some(" in the "),
            Some("output of this array"),
        ]))
        .unwrap();
    array.push_null();
    array.push_null();
    array.push_null();
    array
        .try_push(Some([
            Some(" assuming"),
            Some(" this works properly."),
            Some(" Unfortunately, "),
        ]))
        .unwrap();
    array.push_null();
    array
        .try_push(Some([
            Some("it does not appear"),
            Some(" to work properly"),
        ]))
        .unwrap();
    array.push_null();
    array.push_null();
    array.push_null();
    array
        .try_push(Some([
            Some(" and some of these strings "),
            Some("will end up missing in the final"),
            Some(" output."),
        ]))
        .unwrap();
    array.push_null();
    array.push_null();
    array.push_null();
    array.push_null();
    let array = array.into_box();
    let schema = Schema::from(vec![Field::new(
        "lists",
        DataType::List(Box::new(Field::new("strings", DataType::Utf8, false))),
        true,
    )]);
    let chunk = Chunk::new(vec![array]);

    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
        // a tiny page size limit, to force the column to be split across
        // multiple data pages
        data_pagesize_limit: Some(64),
    };

    let iter = vec![Ok(chunk)];

    let encodings = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect();

    let row_groups =
        RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings).unwrap();

    let mut file = File::create("list.parquet").unwrap();
    let mut writer = FileWriter::try_new(&mut file, schema, options).unwrap();
    for group in row_groups {
        writer.write(group.unwrap()).unwrap();
    }
    writer.end(None).unwrap();

    let mut reader = File::open("list.parquet").unwrap();
    let metadata = read::read_metadata(&mut reader).unwrap();
    let schema = read::infer_schema(&metadata).unwrap();
    let row_groups = metadata.row_groups;
    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None, None);
    for chunk in chunks {
        let chunk = chunk.unwrap();
        dbg!(chunk);
    }
}

outputs

[src/main.rs:91] chunk = Chunk {
    arrays: [
        ListArray[[This is some data,  to include,  in the , output of this array], None, None, [], [], None, [it does not appear,  to work properly], None, None, [], [], None, [], None, None],
    ],
}

but should output

[src/main.rs:91] chunk = Chunk {
    arrays: [
        ListArray[[This is some data,  to include,  in the , output of this array], None, None, None, [ assuming,  this works properly.,  Unfortunately, ], None, [it does not appear,  to work properly], None, None, None, [ and some of these strings , will end up missing in the final,  output.], None, None, None, None],
    ],
}

While debugging this in the context where I originally found it, it appeared that the data page headers in the written parquet file were incorrect, so I'm pretty sure the problem is with writing, not reading. I'm also fairly confident it's caused by partitioning nullable list columns into multiple data pages, since increasing data_pagesize_limit to force only a single page to be created avoids the issue.
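
For reference, the workaround is just the repro's WriteOptions with the page-size cap removed; with no cap the whole column lands in a single data page and the round-trip is correct:

    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
        // no page size cap: the column stays in one data page, which
        // sidesteps the bug
        data_pagesize_limit: None,
    };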

@ritchie46

@tjwilson90
Author

It looks like the issue has to do with the definition level encoder.

When written as a single page the reps, defs, and values are as follows

reps = [0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0]
defs = [2, 2, 2, 2, 0, 0, 0, 2, 2, 2, 0, 2, 2, 0, 0, 0, 2, 2, 2, 0, 0, 0, 0]
vals = ["This is some data", " to include", " in the ", "output of this array", " assuming", " this works properly.", " Unfortunately, ", "it does not appear", " to work properly", " and some of these strings ", "will end up missing in the final", " output."]

but when written as multiple pages you get

reps = [0, 1, 1, 1, 0, 0]
defs = [2, 2, 2, 2, 0, 0]
vals = ["This is some data", " to include", " in the ", "output of this array"]

reps = [0, 0, 1, 1, 0]
defs = [1, 1, 1, 1, 0]
vals = [" assuming", " this works properly.", " Unfortunately, "]

reps = [0, 1, 0, 0]
defs = [2, 2, 0, 0]
vals = ["it does not appear", " to work properly"]

reps = [0, 0, 1, 1, 0]
defs = [1, 1, 1, 1, 0]
vals = [" and some of these strings ", "will end up missing in the final", " output."]

reps = [0, 0, 0]
defs = [1, 0, 0]
vals = []
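
Feeding page 2 through the decode sketch above yields [Some([]), Some([]), None] and never consumes vals, which is exactly the [], [], None visible in the bad output; the correct levels for that page would be defs = [0, 2, 2, 2, 0]. Pages 4 and 5 have the same defect: a def of 1 (empty list) is written where a 0 (null) or a 2 (value) belongs.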

@tjwilson90
Author

This commit, tjwilson90@f8b0cca, appears to partially address the issue, but it doesn't fix all of the problems I'm seeing in the non-minimized application I'm trying to upgrade, which writes lists of structs containing strings.

One part that looks quite suspicious to me, though I haven't figured out how to correct it, is how arrow2::io::parquet::write::nested::rep::num_values is supposed to work when the nested Vec has been sliced. Calling slice_parquet_array does nothing to change the length of a Nested::Struct, so num_values always bases its calculation on the total number of structs in the unsliced column, not on the number of structs in the sliced page.
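
To make the suspicion concrete, here is a deliberately simplified, hypothetical sketch of the shape of the problem; Nested and num_values below are stand-ins, not arrow2's actual internal definitions:

// Hypothetical, simplified stand-ins for arrow2's internals.
enum Nested {
    List { offsets: Vec<i32> }, // slicing narrows these offsets to the page
    Struct { len: usize },      // slicing leaves this length untouched
}

// If the struct level keeps the unsliced length, any count derived from it
// covers the whole column instead of just the sliced page.
fn num_values(nested: &[Nested]) -> usize {
    match nested.last() {
        Some(Nested::List { offsets }) => (offsets[offsets.len() - 1] - offsets[0]) as usize,
        Some(Nested::Struct { len }) => *len, // stale after slicing
        None => 0,
    }
}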

@tjwilson90
Author

A similar problem exists with ListArray::slice when a list contains structs, since the underlying StructArray isn't sliced. As an illustration, the following program writes a 25 MB parquet file even though the output should contain only two ints.

use arrow2::array::{Int32Array, ListArray, StructArray};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::io::parquet::write::{
    transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
};
use arrow2::offset::OffsetsBuffer;
use std::fs::File;

fn main() {
    let array = Int32Array::from_values(0..100_000_000);
    let dtype = DataType::Struct(vec![Field::new("ints", DataType::Int32, false)]);
    let array = StructArray::new(dtype.clone(), vec![array.boxed()], None);
    let offsets = OffsetsBuffer::try_from((0..100_000_001).collect::<Vec<_>>()).unwrap();
    let dtype = DataType::List(Box::new(Field::new("structs", dtype, false)));
    let array = ListArray::new(dtype.clone(), offsets, array.boxed(), None);
    // keep only the first two single-struct lists; the child StructArray
    // still holds all 100_000_000 structs
    let array = array.slice(0, 2);
    let schema = Schema::from(vec![Field::new("lists", dtype, false)]);
    let chunk = Chunk::new(vec![array.boxed()]);

    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
        data_pagesize_limit: None,
    };

    let iter = vec![Ok(chunk)];

    let encodings = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect();

    let row_groups =
        RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings).unwrap();

    let mut file = File::create("list.parquet").unwrap();
    let mut writer = FileWriter::try_new(&mut file, schema, options).unwrap();
    for group in row_groups {
        writer.write(group.unwrap()).unwrap();
    }
    writer.end(None).unwrap();
}
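
The symptom is easy to confirm with plain std (nothing arrow2-specific): the data is two ints, so the file should be tiny, yet the program above produces roughly 25 MB.

    let bytes = std::fs::metadata("list.parquet").unwrap().len();
    // observed: roughly 25 MB instead of a few hundred bytes
    println!("list.parquet is {bytes} bytes");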
