This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Correctly coerce Parquet Int96 timestamps into requested TimeUnits #1532

Merged

77 changes: 67 additions & 10 deletions src/io/parquet/read/deserialize/simple.rs
@@ -391,6 +391,44 @@ fn unifiy_timestmap_unit(
}
}

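/// Converts an Int96 timestamp into microseconds since the Unix epoch.
/// `value[0]` and `value[1]` are the low and high 32-bit words of the nanoseconds elapsed
/// within the day; `value[2]` is the Julian day number.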
#[inline]
pub fn int96_to_i64_us(value: [u32; 3]) -> i64 {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const SECONDS_PER_DAY: i64 = 86_400;
const MICROS_PER_SECOND: i64 = 1_000_000;

let day = value[2] as i64;
let microseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000;
let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;

seconds * MICROS_PER_SECOND + microseconds
}

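/// Converts an Int96 timestamp into milliseconds since the Unix epoch.
/// The layout of `value` is the same as for `int96_to_i64_us`.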
#[inline]
pub fn int96_to_i64_ms(value: [u32; 3]) -> i64 {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const SECONDS_PER_DAY: i64 = 86_400;
const MILLIS_PER_SECOND: i64 = 1_000;

let day = value[2] as i64;
let milliseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000;
let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;

seconds * MILLIS_PER_SECOND + milliseconds
}

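/// Converts an Int96 timestamp into seconds since the Unix epoch.
/// The layout of `value` is the same as for `int96_to_i64_us`.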
#[inline]
pub fn int96_to_i64_s(value: [u32; 3]) -> i64 {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const SECONDS_PER_DAY: i64 = 86_400;

let day = value[2] as i64;
let seconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000_000;
let day_seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;

day_seconds + seconds
}

fn timestamp<'a, I: Pages + 'a>(
pages: I,
physical_type: &PhysicalType,
@@ -401,16 +439,35 @@ fn timestamp<'a, I: Pages + 'a>(
time_unit: TimeUnit,
) -> Result<ArrayIter<'a>> {
if physical_type == &PhysicalType::Int96 {
let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns);
let logical_type = PrimitiveLogicalType::Timestamp {
unit: ParquetTimeUnit::Nanoseconds,
is_adjusted_to_utc: false,
};
let (factor, is_multiplier) = unifiy_timestmap_unit(&Some(logical_type), time_unit);
return match (factor, is_multiplier) {
(1, _) => Ok(dyn_iter(iden(iter))),
(a, true) => Ok(dyn_iter(op(iter, move |x| x * a))),
(a, false) => Ok(dyn_iter(op(iter, move |x| x / a))),
return match time_unit {
TimeUnit::Nanosecond => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
num_rows,
chunk_size,
int96_to_i64_ns,
)))),
TimeUnit::Microsecond => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
num_rows,
chunk_size,
int96_to_i64_us,
)))),
TimeUnit::Millisecond => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
num_rows,
chunk_size,
int96_to_i64_ms,
)))),
TimeUnit::Second => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
num_rows,
chunk_size,
int96_to_i64_s,
)))),
};
};

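As a quick sanity check on the Julian-day arithmetic above, here is a minimal test sketch (not part of the patch; the module name is illustrative). Julian day 2_440_588 is 1970-01-01, so an Int96 value of `[0, 0, 2_440_588]` should map to zero in every unit, and one day later should map to exactly one day's worth of seconds, milliseconds, and microseconds.

#[cfg(test)]
mod int96_conversion_sanity {
    use super::{int96_to_i64_ms, int96_to_i64_s, int96_to_i64_us};

    #[test]
    fn epoch_and_one_day_later() {
        // Julian day 2_440_588 is 1970-01-01, with zero nanoseconds elapsed in the day.
        let epoch = [0u32, 0, 2_440_588];
        assert_eq!(int96_to_i64_us(epoch), 0);
        assert_eq!(int96_to_i64_ms(epoch), 0);
        assert_eq!(int96_to_i64_s(epoch), 0);

        // One Julian day later is 1970-01-02.
        let next_day = [0u32, 0, 2_440_589];
        assert_eq!(int96_to_i64_s(next_day), 86_400);
        assert_eq!(int96_to_i64_ms(next_day), 86_400_000);
        assert_eq!(int96_to_i64_us(next_day), 86_400_000_000);
    }
}
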
58 changes: 58 additions & 0 deletions tests/it/io/parquet/read.rs
@@ -770,3 +770,61 @@ fn invalid_utf8() -> Result<()> {
);
Ok(())
}

#[test]
fn read_int96_timestamps() -> Result<()> {
use std::collections::BTreeMap;

let timestamp_data = &[
0x50, 0x41, 0x52, 0x31, 0x15, 0x04, 0x15, 0x48, 0x15, 0x3c, 0x4c, 0x15, 0x06, 0x15, 0x00,
0x12, 0x00, 0x00, 0x24, 0x00, 0x00, 0x0d, 0x01, 0x08, 0x9f, 0xd5, 0x1f, 0x0d, 0x0a, 0x44,
0x00, 0x00, 0x59, 0x68, 0x25, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14,
0xfb, 0x2a, 0x00, 0x15, 0x00, 0x15, 0x14, 0x15, 0x18, 0x2c, 0x15, 0x06, 0x15, 0x10, 0x15,
0x06, 0x15, 0x06, 0x1c, 0x00, 0x00, 0x00, 0x0a, 0x24, 0x02, 0x00, 0x00, 0x00, 0x06, 0x01,
0x02, 0x03, 0x24, 0x00, 0x26, 0x9e, 0x01, 0x1c, 0x15, 0x06, 0x19, 0x35, 0x10, 0x00, 0x06,
0x19, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x73, 0x15, 0x02,
0x16, 0x06, 0x16, 0x9e, 0x01, 0x16, 0x96, 0x01, 0x26, 0x60, 0x26, 0x08, 0x29, 0x2c, 0x15,
0x04, 0x15, 0x00, 0x15, 0x02, 0x00, 0x15, 0x00, 0x15, 0x10, 0x15, 0x02, 0x00, 0x00, 0x00,
0x15, 0x04, 0x19, 0x2c, 0x35, 0x00, 0x18, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x15,
0x02, 0x00, 0x15, 0x06, 0x25, 0x02, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x73, 0x00, 0x16, 0x06, 0x19, 0x1c, 0x19, 0x1c, 0x26, 0x9e, 0x01, 0x1c, 0x15,
0x06, 0x19, 0x35, 0x10, 0x00, 0x06, 0x19, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74,
0x61, 0x6d, 0x70, 0x73, 0x15, 0x02, 0x16, 0x06, 0x16, 0x9e, 0x01, 0x16, 0x96, 0x01, 0x26,
0x60, 0x26, 0x08, 0x29, 0x2c, 0x15, 0x04, 0x15, 0x00, 0x15, 0x02, 0x00, 0x15, 0x00, 0x15,
0x10, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x9e, 0x01, 0x16, 0x06, 0x26, 0x08, 0x16, 0x96,
0x01, 0x14, 0x00, 0x00, 0x28, 0x20, 0x70, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, 0x2d, 0x63,
0x70, 0x70, 0x2d, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f,
0x6e, 0x20, 0x31, 0x32, 0x2e, 0x30, 0x2e, 0x30, 0x19, 0x1c, 0x1c, 0x00, 0x00, 0x00, 0x95,
0x00, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31,
];

let parse = |time_unit: TimeUnit| {
let mut reader = Cursor::new(timestamp_data);
let metadata = read_metadata(&mut reader)?;
let schema = arrow2::datatypes::Schema {
fields: vec![arrow2::datatypes::Field::new(
"timestamps",
arrow2::datatypes::DataType::Timestamp(time_unit, None),
false,
)],
metadata: BTreeMap::new(),
};
let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None, None);
reader.collect::<Result<Vec<_>>>()
};

// This data contains Int96 timestamps in the years 1000 and 3000. An i64 of nanoseconds since
// the Unix epoch only spans roughly ±292 years around 1970, so these values are out of range for
// Timestamp(TimeUnit::Nanosecond): the conversion panics in debug builds and wraps around in
// release builds. The Microsecond, Millisecond, and Second time units can represent them, however.
for time_unit in [
arrow2::datatypes::TimeUnit::Microsecond,
arrow2::datatypes::TimeUnit::Millisecond,
arrow2::datatypes::TimeUnit::Second,
] {
parse(time_unit).expect("Should not error");
}
std::panic::catch_unwind(|| parse(arrow2::datatypes::TimeUnit::Nanosecond))
.expect_err("Should be a panic error");

Ok(())
}
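
For context on the expected panic above: an i64 of nanoseconds since the Unix epoch covers only about 292 years on either side of 1970 (roughly the years 1678 to 2262), which is why timestamps in the years 1000 and 3000 cannot be represented as Timestamp(TimeUnit::Nanosecond). A standalone back-of-the-envelope check, not part of the test suite:

fn main() {
    // i64::MAX nanoseconds expressed in years, using the average Gregorian year length.
    let ns_per_year = 365.2425_f64 * 24.0 * 3600.0 * 1e9;
    let range_years = i64::MAX as f64 / ns_per_year;
    // Prints roughly 292, i.e. nanosecond timestamps only cover ~1678..2262.
    println!("±{range_years:.0} years around 1970");
}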