Skip to content

Commit

Permalink
Migrated Date64 to Timestamp() (#1474)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Oct 11, 2021
1 parent 4d5facf commit 69b3ac7
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 67 deletions.
4 changes: 2 additions & 2 deletions polars/polars-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ impl DataType {
Float64 => ArrowDataType::Float64,
Utf8 => ArrowDataType::LargeUtf8,
Date => ArrowDataType::Date32,
Datetime => ArrowDataType::Date64,
Datetime => ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
List(dt) => ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
"",
dt.to_arrow(),
Expand Down Expand Up @@ -703,7 +703,7 @@ impl From<&ArrowDataType> for DataType {
ArrowDataType::Float64 => DataType::Float64,
ArrowDataType::LargeList(f) => DataType::List(Box::new(f.data_type().into())),
ArrowDataType::Date32 => DataType::Date,
ArrowDataType::Date64 => DataType::Datetime,
ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => DataType::Datetime,
ArrowDataType::Utf8 => DataType::Utf8,
dt => panic!("Arrow datatype {:?} not supported by Polars", dt),
}
Expand Down
29 changes: 17 additions & 12 deletions polars/polars-core/src/series/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,23 @@ impl std::convert::TryFrom<(&str, Vec<ArrayRef>)> for Series {
.into_series())
}
#[cfg(feature = "dtype-datetime")]
ArrowDataType::Date64 => {
let chunks = cast_chunks(&chunks, &DataType::Int64).unwrap();
Ok(Int64Chunked::new_from_chunks(name, chunks)
.into_date()
.into_series())
ArrowDataType::Timestamp(tu, tz) => {
let s = if tz.is_none() || tz == &Some("".to_string()) {
let chunks = cast_chunks(&chunks, &DataType::Int64).unwrap();
Int64Chunked::new_from_chunks(name, chunks)
.into_date()
.into_series()
} else {
return Err(PolarsError::InvalidOperation(
"Cannot create polars series timestamp with timezone".into(),
));
};
Ok(match tu {
TimeUnit::Second => &s / 1000,
TimeUnit::Millisecond => s,
TimeUnit::Microsecond => &s * 1000,
TimeUnit::Nanosecond => &s * 1000000,
})
}
ArrowDataType::LargeList(_) => {
Ok(ListChunked::new_from_chunks(name, chunks).into_series())
Expand All @@ -163,13 +175,6 @@ impl std::convert::TryFrom<(&str, Vec<ArrayRef>)> for Series {
#[cfg(not(feature = "dtype-i8"))]
Ok(UInt32Chunked::full_null(name, len).into_series())
}
#[cfg(feature = "dtype-datetime")]
ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => {
let chunks = cast_chunks(&chunks, &DataType::Datetime).unwrap();
Ok(Int64Chunked::new_from_chunks(name, chunks)
.into_date()
.into_series())
}
#[cfg(feature = "dtype-categorical")]
ArrowDataType::Dictionary(key_type, value_type) => {
use crate::chunked_array::categorical::CategoricalChunkedBuilder;
Expand Down
7 changes: 4 additions & 3 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ where
W: Write,
{
fn new(buffer: W) -> Self {
// 6f because our precision is milliseconds
// no need for 3 trailing zeros
let options = write::SerializeOptions {
date64_format: "%FT%H:%M:%S.%6f".to_string(),
timestamp_format: "%FT%H:%M:%S.%6f".to_string(),
..Default::default()
};

Expand Down Expand Up @@ -121,8 +123,7 @@ where

/// Set the CSV file's timestamp format
pub fn with_timestamp_format(mut self, format: String) -> Self {
self.options.timestamp_format = format.clone();
self.options.date64_format = format;
self.options.timestamp_format = format;
self
}

Expand Down
47 changes: 2 additions & 45 deletions polars/polars-io/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use super::{finish_reader, ArrowReader, ArrowResult, RecordBatch};
use crate::prelude::*;
use crate::{PhysicalIoExpr, ScanAggregation};
use arrow::compute::cast;
use arrow::datatypes::PhysicalType;
use arrow::io::parquet::write::{array_to_pages, DynIter, Encoding};
use arrow::io::parquet::{read, write};
Expand Down Expand Up @@ -155,50 +154,8 @@ where

/// Write the given DataFrame to the writer `W`.
pub fn finish(mut self, df: &DataFrame) -> Result<()> {
let mut fields = df.schema().to_arrow().fields().clone();

// date64 is not supported by parquet and will be truncated to date32
// We coerce these to timestamp(ms)
let datetime_columns = df
.get_columns()
.iter()
.enumerate()
.filter_map(|(i, s)| match s.dtype() {
DataType::Datetime => {
fields[i] = ArrowField::new(
s.name(),
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
s.null_count() > 0,
);
Some(i)
}
_ => None,
})
.collect::<Vec<_>>();
let column_names = df
.get_columns()
.iter()
.map(|s| s.name().to_string())
.collect::<Vec<_>>();

let rb_iter = df.iter_record_batches().map(|rb| {
if !datetime_columns.is_empty() {
let mut columns = rb.columns().to_vec();
for i in &datetime_columns {
let array = cast::cast(columns[*i].as_ref(), &ArrowDataType::Int64).unwrap();
let array = cast::cast(
array.as_ref(),
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
)
.unwrap()
.into();
columns[*i] = array;
}
RecordBatch::try_from_iter(column_names.iter().zip(columns)).unwrap()
} else {
rb
}
});
let fields = df.schema().to_arrow().fields().clone();
let rb_iter = df.iter_record_batches();

let options = write::WriteOptions {
write_statistics: false,
Expand Down
3 changes: 0 additions & 3 deletions py-polars/polars/eager/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,6 @@ def to_parquet(
else:
name = column._name

# parquet casts date64 to date32
if column.type == pa.date64():
column = pa.compute.cast(column, pa.timestamp("ms", None))
data[name] = column
tbl = pa.table(data)

Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/internals/construction.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def _pandas_series_to_arrow(
np.array(values.values, dtype="datetime64[ms]"), from_pandas=nan_to_none
)
arr = pa.compute.cast(arr, pa.int64())
return pa.compute.cast(arr, pa.date64())
return pa.compute.cast(arr, pa.timestamp("ms"))
elif dtype == "object" and len(values) > 0 and isinstance(values.iloc[0], str):
return pa.array(values, pa.large_utf8(), from_pandas=nan_to_none)
else:
Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def coerce_arrow(array: "pa.Array") -> "pa.Array":
ts_ms = pa.compute.cast(array, pa.timestamp("ms"), safe=False)
ms = pa.compute.cast(ts_ms, pa.int64())
del ts_ms
array = pa.compute.cast(ms, pa.date64())
array = pa.compute.cast(ms, pa.timestamp("ms"))
del ms
# note: Decimal256 could not be cast to float
elif isinstance(array.type, pa.Decimal128Type):
Expand Down

0 comments on commit 69b3ac7

Please sign in to comment.