This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

CSV to Parquet #183

Open

nevi-me opened this issue Nov 3, 2018 · 9 comments

Comments

@nevi-me

nevi-me commented Nov 3, 2018

Hi, I'm experimenting with creating a CSV to Parquet writer, and I have a few questions.

My end goal for the experiment is to create a crate that converts various file formats (csv, bson, json) to parquet. This would also lend itself to a possible CSV to Apache Arrow reader.

  1. How can I write strings? I used message schema {REQUIRED BYTE_ARRAY name;}, but when I read the resulting parquet file in Python, the strings are shown as bytes. I also came across message schema {REQUIRED BYTE_ARRAY name (UTF8);}

If I have a csv file that looks like:

Id,Name,Age
123-adf,John Doe,25
sfd-ge2,Jane Doe,26

I have written the below:

use std::error::Error;
use std::fs::File;
use std::rc::Rc;

use parquet::column::writer::ColumnWriter;
use parquet::data_type::ByteArray;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::{FileWriter, RowGroupWriter, SerializedFileWriter};
use parquet::schema::parser::parse_message_type;

fn write_parquet() -> Result<(), Box<Error>> {

    // TODO let message_type = build_parquet_schema();
    let message_type = "
        message schema {REQUIRED BYTE_ARRAY Id;REQUIRED BYTE_ARRAY Name;REQUIRED INT32 Age;}
    ";

    let schema = Rc::new(parse_message_type(message_type)?);
    let props = Rc::new(WriterProperties::builder().build());
    let file = File::create("./data/file1.parquet")?;
    let mut writer = SerializedFileWriter::new(file, schema, props)?;
    let mut row_group_writer = writer.next_row_group()?;
    let mut col_number = 0;

    // Columns are written one at a time, in schema order.
    while let Some(mut col_writer) = row_group_writer.next_column()? {
        col_number += 1;
        match col_writer {
            ColumnWriter::ByteArrayColumnWriter(ref mut typed_writer) => {
                println!("writing a byte array");
                // I can remove this if-else when I start taking fn parameters of my schema and columns
                if col_number == 1 {
                    typed_writer.write_batch(
                        &[ByteArray::from("123-adf"), ByteArray::from("sdf-ge2")], None, None,
                    )?;
                } else {
                    typed_writer.write_batch(
                        &[ByteArray::from("John Doe"), ByteArray::from("Jane Doe")], None, None,
                    )?;
                }
            },
            ColumnWriter::Int32ColumnWriter(ref mut typed_writer) => {
                println!("writing an integer");
                typed_writer.write_batch(&[25, 26], None, None)?;
            },
            _ => {}
        }
        row_group_writer.close_column(col_writer)?;
    }
    writer.close_row_group(row_group_writer)?;
    writer.close()?;
    Ok(())
}
  2. Similar to Q1, am I writing my strings properly, or is there a better way?

  3. From reading through the conversation on How to write a None value to a column #174, it looks like I have to specify an index of where my values are. So, does typed_writer.write_batch(&[24,25,24,26,27,28], None, None) produce a less compact file? Is it even valid?

  4. In general, does this library allow appending to existing parquet files? I haven't tried it yet.

I'm trying a naive approach of first reading csv files and generating the schema with a string builder. If that works, I can look at macros.

The other thing that I'll explore separately is how to convert the csv crate's StringRecord values into parquet Types. I might use regex to figure out if things are strings, i32, i64, timestamps, etc. If someone knows of an existing way, that'd also be welcome.
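For instance, instead of regex I could probably just attempt successively wider parses on a sample value and fall back to a string; a rough sketch (infer_physical_type is just a name I'm making up here, and it ignores timestamps and nulls):

use parquet::basic::Type as PhysicalType;

// Rough sketch: infer a Parquet physical type for one CSV field by attempting
// successively wider parses; anything non-numeric falls back to BYTE_ARRAY,
// which would then carry the UTF8 logical type. Timestamps and nulls are ignored.
fn infer_physical_type(value: &str) -> PhysicalType {
    if value.parse::<i32>().is_ok() {
        PhysicalType::INT32
    } else if value.parse::<i64>().is_ok() {
        PhysicalType::INT64
    } else if value.parse::<f64>().is_ok() {
        PhysicalType::DOUBLE
    } else {
        PhysicalType::BYTE_ARRAY
    }
}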

@sadikovi
Collaborator

sadikovi commented Nov 3, 2018

ping @xrl and @sunchao; you guys are more familiar with Arrow and converting CSV to Parquet.

  1. Yes, you are right, it is a byte array with a UTF8 logical type. Here is the doc that might help you: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md

  2. Looks right to me. You just need to specify UTF8 for that column in your schema (see the example below these answers).

  3. Don't quite understand what you mean. Definition and repetition levels are for the cases when you have nulls or repeated fields, like arrays, maps, etc. If you don't have either of those, that is the way to write values. It should be faster to write when you just have a list of non-null values, because we short-circuit a few things in the code. Not sure about a less compact file, because it depends on the values you write. Imagine if you write 100 identical non-null strings in one file, and 100 identical strings but 99 of them nulls in the other. Of course, the second file will be smaller, because we would write only 1 string value and store definition levels to mark the 99 nulls.

  4. I don't think you can append to parquet files in general, due to their structure. You would have to either create a new file, or read records from one, add new ones and write a new file. I know that you don't necessarily have to decompress values; you can just append row groups or something like that (parquet-mr has tooling for it), but please correct me if I am wrong. This library does not have anything for it yet; it might later.
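Regarding 2, for the CSV in the question the schema string would then look something like this (a sketch; only the UTF8 annotations on the two string columns change):

let message_type = "
    message schema {
        REQUIRED BYTE_ARRAY Id (UTF8);
        REQUIRED BYTE_ARRAY Name (UTF8);
        REQUIRED INT32 Age;
    }
";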

@sadikovi
Collaborator

sadikovi commented Nov 3, 2018

It looks like we do need a high-level API for writes. @xrl @sunchao let me know if this is required; I can create an SPIP issue and a small design doc for it.

@nevi-me
Author

nevi-me commented Nov 3, 2018

Thanks @sadikovi. For number 4, I forgot that when modifying a parquet file, you indeed rewrite a new file.

On number 3, a csv use-case is simpler because I won't have nested values. I might, however, have nulls. Your explanation addresses most of my question, so for nulls I might be able to do something like:

let all_fields: Vec<Type> = some_data;

let unique_vals: Vec<Type> = some_data.distinct();

let positions: Vec<usize> = fn_field_positions(&all_fields, &unique_vals);

typed_writer.write_batch(&all_fields, None, Some(positions)) // here I'm trying to write unique vals and their positions. This might obviously be incorrect

@sadikovi
Collaborator

sadikovi commented Nov 3, 2018 via email

@nevi-me
Author

nevi-me commented Nov 3, 2018

I've made some progress with generating a schema from inspecting a sample of csv values. An easier write API would be great, as right now I don't know how to deal with nulls.

To deal with nulls, would a function/interface that takes Option<Type> work?

@sadikovi
Collaborator

sadikovi commented Nov 3, 2018 via email

@xrl
Contributor

xrl commented Nov 3, 2018

Great question! I had a similar problem and this is what I came up with:

let mut column_writer = row_group.next_column().unwrap().unwrap();
write_vec_of_option_borrowed_string_single_pass(
    &quoted_wares,
    |qw| qw.as_ref().map(|x| Some(&x.name[..])).unwrap_or(None),
    &mut column_writer,
).unwrap();
row_group.close_column(column_writer).unwrap();

and the awkwardly named write_vec_of_option_borrowed_string_single_pass:

fn write_vec_of_option_borrowed_string_single_pass<T,F>(values: &[T], accessor: F, column_writer: &mut ColumnWriter) -> Result<(),()>
    where F: Fn(&T) -> Option<&str> {

    let mut col = Vec::with_capacity(values.len());
    // I think this is what you care about
    let mut def_levels = Vec::with_capacity(values.len());

    for value in values {
        let opt = accessor(&value);
        if let Some(ref inner) = opt {
            col.push(inner.as_bytes().to_owned().into());
            // there is a value present so put a 1 in the definition level (acts like a bitmask)
            def_levels.push(1);
        } else {
            // there is no value so put a 0 for the definition level
            def_levels.push(0);
        }
    }

    if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
        typed.write_batch(&col[..], Some(&def_levels[..]), None).unwrap();
    } else {
        return Err(())
    }

    Ok(())
}

It gets even more complicated if your schema is nested, so thankfully I've avoided that! I wouldn't know where to start with building out that code. You'll also see that my code isn't as generic as it could be; that's mostly just related to the lack of generality for writing strings (they have to be owned bytes).

@xrl
Contributor

xrl commented Nov 3, 2018

@sadikovi I do agree that a high-level write API is required. I've been hammering out my diesel-to-parquet code, writing many different flavors of Vec<Option> writers, and stumbling over keeping the column writers lined up with the schema. I have some observations that are probably best put in an issue. I'll wait for you to post your design and will definitely chime in!

@nevi-me
Author

nevi-me commented Nov 3, 2018

@xrl, I haven't gotten there yet with nulls. Here's my code to read a csv with strings and integers: https://gist.github.com/nevi-me/443025fe11038e2709083db2e24a5e64

I can do trial & error for other field types. Not the most efficient, but it's a start.

3 participants