Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to write timestamps with timezones.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Nov 22, 2021
1 parent b3ed162 commit e91b0af
Show file tree
Hide file tree
Showing 3 changed files with 328 additions and 80 deletions.
167 changes: 141 additions & 26 deletions src/io/csv/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,19 @@ use crate::array::{DictionaryArray, DictionaryKey, Offset};
use std::any::Any;

/// Options to serialize logical types to CSV
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
/// The default is to format times and dates as `chrono` crate formats them.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Default)]
pub struct SerializeOptions {
/// used for [`DataType::Date32`]
pub date32_format: String,
pub date32_format: Option<String>,
/// used for [`DataType::Date64`]
pub date64_format: String,
pub date64_format: Option<String>,
/// used for [`DataType::Time32`]
pub time32_format: String,
pub time32_format: Option<String>,
/// used for [`DataType::Time64`]
pub time64_format: String,
pub time64_format: Option<String>,
/// used for [`DataType::Timestamp`]
pub timestamp_format: String,
}

impl Default for SerializeOptions {
fn default() -> Self {
Self {
date32_format: "%F".to_string(),
date64_format: "%F".to_string(),
time32_format: "%T".to_string(),
time64_format: "%T".to_string(),
timestamp_format: "%FT%H:%M:%S.%9f".to_string(),
}
}
pub timestamp_format: Option<String>,
}

fn primitive_write<'a, T: NativeType + ToLexical>(
Expand Down Expand Up @@ -68,16 +57,134 @@ macro_rules! dyn_date {
.as_any()
.downcast_ref::<PrimitiveArray<$ty>>()
.unwrap();
Box::new(BufStreamingIterator::new(
if let Some(format) = $format {
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
buf.extend_from_slice(($fn)(*x).format(format).to_string().as_bytes())
}
},
vec![],
))
} else {
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
buf.extend_from_slice(($fn)(*x).to_string().as_bytes())
}
},
vec![],
))
}
}};
}

fn timestamp_with_tz_default<'a>(
array: &'a PrimitiveArray<i64>,
time_unit: TimeUnit,
tz: &str,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
let timezone = temporal_conversions::parse_offset(tz);
Ok(match timezone {
Ok(timezone) => Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
buf.extend_from_slice(($fn)(*x).format($format).to_string().as_bytes())
let data =
temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone)
.to_string();
buf.extend_from_slice(data.as_bytes())
}
},
vec![],
))
}};
)),
#[cfg(feature = "chrono-tz")]
_ => {
let timezone = temporal_conversions::parse_offset_tz(tz)?;
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
let data =
temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone)
.to_string();
buf.extend_from_slice(data.as_bytes())
}
},
vec![],
))
}
#[cfg(not(feature = "chrono-tz"))]
_ => {
return Err(crate::error::ArrowError::InvalidArgumentError(
"Invalid Offset format (must be [-]00:00) or chrono-tz feature not active"
.to_string(),
))
}
})
}

fn timestamp_with_tz_with_format<'a>(
array: &'a PrimitiveArray<i64>,
time_unit: TimeUnit,
tz: &str,
format: &'a str,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
let timezone = temporal_conversions::parse_offset(tz);
Ok(match timezone {
Ok(timezone) => Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
let data =
temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone)
.format(format)
.to_string();
buf.extend_from_slice(data.as_bytes())
}
},
vec![],
)),
#[cfg(feature = "chrono-tz")]
_ => {
let timezone = temporal_conversions::parse_offset_tz(tz)?;
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
let data =
temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone)
.format(format)
.to_string();
buf.extend_from_slice(data.as_bytes())
}
},
vec![],
))
}
#[cfg(not(feature = "chrono-tz"))]
_ => {
return Err(crate::error::ArrowError::InvalidArgumentError(
"Invalid Offset format (must be [-]00:00) or chrono-tz feature not active"
.to_string(),
))
}
})
}

fn timestamp_with_tz<'a>(
array: &'a PrimitiveArray<i64>,
time_unit: TimeUnit,
tz: &str,
format: Option<&'a str>,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
if let Some(format) = format {
timestamp_with_tz_with_format(array, time_unit, tz, format)
} else {
timestamp_with_tz_default(array, time_unit, tz)
}
}

/// Returns a [`StreamingIterator`] that yields `&[u8]` serialized from `array` according to `options`.
Expand Down Expand Up @@ -136,23 +243,23 @@ pub fn new_serializer<'a>(
i32,
temporal_conversions::date32_to_datetime,
array,
&options.date32_format
options.date32_format.as_ref()
)
}
DataType::Time32(TimeUnit::Second) => {
dyn_date!(
i32,
temporal_conversions::time32s_to_time,
array,
&options.time32_format
options.time32_format.as_ref()
)
}
DataType::Time32(TimeUnit::Millisecond) => {
dyn_date!(
i32,
temporal_conversions::time32ms_to_time,
array,
&options.time32_format
options.time32_format.as_ref()
)
}
DataType::Int64 => {
Expand All @@ -163,7 +270,7 @@ pub fn new_serializer<'a>(
i64,
temporal_conversions::date64_to_datetime,
array,
&options.date64_format
options.date64_format.as_ref()
)
}
DataType::Time64(TimeUnit::Microsecond) => {
Expand Down Expand Up @@ -214,6 +321,14 @@ pub fn new_serializer<'a>(
&options.timestamp_format
)
}
DataType::Timestamp(time_unit, Some(tz)) => {
return timestamp_with_tz(
array.as_any().downcast_ref().unwrap(),
*time_unit,
tz.as_ref(),
options.timestamp_format.as_ref().map(|x| x.as_ref()),
)
}
DataType::Float32 => {
dyn_primitive!(f32, array)
}
Expand Down
14 changes: 6 additions & 8 deletions src/temporal_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,15 @@ pub fn time32s_to_time(v: i32) -> NaiveTime {
NaiveTime::from_num_seconds_from_midnight(v as u32, 0)
}

/// converts a `i32` representing a `time32(ms)` to [`NaiveDateTime`]
/// converts a `i32` representing a `time32(ms)` to [`NaiveTime`]
#[inline]
pub fn time32ms_to_time(v: i32) -> NaiveTime {
let v = v as i64;
NaiveTime::from_num_seconds_from_midnight(
// extract seconds from milliseconds
(v / MILLISECONDS) as u32,
// discard extracted seconds and convert milliseconds to
// nanoseconds
(v % MILLISECONDS * MICROSECONDS) as u32,
)
let seconds = v / MILLISECONDS;

let milli_to_nano = 1_000_000;
let nano = (v - seconds * MILLISECONDS) * milli_to_nano;
NaiveTime::from_num_seconds_from_midnight(seconds as u32, nano as u32)
}

/// converts a `i64` representing a `time64(us)` to [`NaiveDateTime`]
Expand Down
Loading

0 comments on commit e91b0af

Please sign in to comment.