Skip to content

Commit

Permalink
switch to nanosecond precision (#2016)
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed May 1, 2023
1 parent cbf2bdc commit ba309e1
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 98 deletions.
4 changes: 2 additions & 2 deletions columnar/src/column_values/monotonic_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ impl MonotonicallyMappableToU64 for i64 {
impl MonotonicallyMappableToU64 for DateTime {
#[inline(always)]
fn to_u64(self) -> u64 {
common::i64_to_u64(self.into_timestamp_micros())
common::i64_to_u64(self.into_timestamp_nanos())
}

#[inline(always)]
fn from_u64(val: u64) -> Self {
DateTime::from_timestamp_micros(common::u64_to_i64(val))
DateTime::from_timestamp_nanos(common::u64_to_i64(val))
}
}

Expand Down
2 changes: 1 addition & 1 deletion columnar/src/column_values/u64_based/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct StatsCollector {
// This is the same as computing the difference between the values and the first value.
//
// This way, we can compress i64-converted-to-u64 (e.g. timestamp that were supplied in
// seconds, only to be converted in microseconds).
// seconds, only to be converted in nanoseconds).
increment_gcd_opt: Option<(NonZeroU64, DividerU64)>,
first_value_opt: Option<u64>,
}
Expand Down
2 changes: 1 addition & 1 deletion columnar/src/columnar/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl ColumnarWriter {
let mut column: ColumnWriter = column_opt.unwrap_or_default();
column.record(
doc,
NumericalValue::I64(datetime.into_timestamp_micros()),
NumericalValue::I64(datetime.into_timestamp_nanos()),
arena,
);
column
Expand Down
2 changes: 1 addition & 1 deletion columnar/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Coerce for f64 {
impl Coerce for DateTime {
fn coerce(value: NumericalValue) -> Self {
let timestamp_micros = i64::coerce(value);
DateTime::from_timestamp_micros(timestamp_micros)
DateTime::from_timestamp_nanos(timestamp_micros)
}
}

Expand Down
54 changes: 34 additions & 20 deletions common/src/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ pub enum DatePrecision {
Milliseconds,
/// Micro-seconds precision.
Microseconds,
/// Nano-seconds precision.
Nanoseconds,
}

/// A date/time value with microsecond precision.
/// A date/time value with nanosecond precision.
///
/// This timestamp does not carry any explicit time zone information.
/// Users are responsible for applying the provided conversion
Expand All @@ -31,49 +33,56 @@ pub enum DatePrecision {
/// to prevent unintended usage.
#[derive(Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DateTime {
// Timestamp in microseconds.
pub(crate) timestamp_micros: i64,
// Timestamp in nanoseconds.
pub(crate) timestamp_nanos: i64,
}

impl DateTime {
/// Minimum possible `DateTime` value.
pub const MIN: DateTime = DateTime {
timestamp_micros: i64::MIN,
timestamp_nanos: i64::MIN,
};

/// Maximum possible `DateTime` value.
pub const MAX: DateTime = DateTime {
timestamp_micros: i64::MAX,
timestamp_nanos: i64::MAX,
};

/// Create new from UNIX timestamp in seconds
pub const fn from_timestamp_secs(seconds: i64) -> Self {
Self {
timestamp_micros: seconds * 1_000_000,
timestamp_nanos: seconds * 1_000_000_000,
}
}

/// Create new from UNIX timestamp in milliseconds
pub const fn from_timestamp_millis(milliseconds: i64) -> Self {
Self {
timestamp_micros: milliseconds * 1_000,
timestamp_nanos: milliseconds * 1_000_000,
}
}

/// Create new from UNIX timestamp in microseconds.
pub const fn from_timestamp_micros(microseconds: i64) -> Self {
Self {
timestamp_micros: microseconds,
timestamp_nanos: microseconds * 1_000,
}
}

/// Create new from UNIX timestamp in nanoseconds.
pub const fn from_timestamp_nanos(nanoseconds: i64) -> Self {
Self {
timestamp_nanos: nanoseconds,
}
}

/// Create new from `OffsetDateTime`
///
/// The given date/time is converted to UTC and the actual
/// time zone is discarded.
pub const fn from_utc(dt: OffsetDateTime) -> Self {
let timestamp_micros = dt.unix_timestamp() * 1_000_000 + dt.microsecond() as i64;
Self { timestamp_micros }
pub fn from_utc(dt: OffsetDateTime) -> Self {
let timestamp_nanos = dt.unix_timestamp_nanos() as i64;
Self { timestamp_nanos }
}

/// Create new from `PrimitiveDateTime`
Expand All @@ -87,23 +96,27 @@ impl DateTime {

/// Convert to UNIX timestamp in seconds.
pub const fn into_timestamp_secs(self) -> i64 {
self.timestamp_micros / 1_000_000
self.timestamp_nanos / 1_000_000_000
}

/// Convert to UNIX timestamp in milliseconds.
pub const fn into_timestamp_millis(self) -> i64 {
self.timestamp_micros / 1_000
self.timestamp_nanos / 1_000_000
}

/// Convert to UNIX timestamp in microseconds.
pub const fn into_timestamp_micros(self) -> i64 {
self.timestamp_micros
self.timestamp_nanos / 1_000
}

/// Convert to UNIX timestamp in nanoseconds.
pub const fn into_timestamp_nanos(self) -> i64 {
self.timestamp_nanos
}

/// Convert to UTC `OffsetDateTime`
pub fn into_utc(self) -> OffsetDateTime {
let timestamp_nanos = self.timestamp_micros as i128 * 1000;
let utc_datetime = OffsetDateTime::from_unix_timestamp_nanos(timestamp_nanos)
let utc_datetime = OffsetDateTime::from_unix_timestamp_nanos(self.timestamp_nanos as i128)
.expect("valid UNIX timestamp");
debug_assert_eq!(UtcOffset::UTC, utc_datetime.offset());
utc_datetime
Expand All @@ -128,12 +141,13 @@ impl DateTime {
/// Truncates the nanosecond timestamp to the corresponding precision.
pub fn truncate(self, precision: DatePrecision) -> Self {
let truncated_timestamp_micros = match precision {
DatePrecision::Seconds => (self.timestamp_micros / 1_000_000) * 1_000_000,
DatePrecision::Milliseconds => (self.timestamp_micros / 1_000) * 1_000,
DatePrecision::Microseconds => self.timestamp_micros,
DatePrecision::Seconds => (self.timestamp_nanos / 1_000_000_000) * 1_000_000_000,
DatePrecision::Milliseconds => (self.timestamp_nanos / 1_000_000) * 1_000_000,
DatePrecision::Microseconds => (self.timestamp_nanos / 1_000) * 1_000,
DatePrecision::Nanoseconds => self.timestamp_nanos,
};
Self {
timestamp_micros: truncated_timestamp_micros,
timestamp_nanos: truncated_timestamp_micros,
}
}
}
Expand Down
81 changes: 44 additions & 37 deletions src/aggregation/bucket/histogram/date_histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct DateHistogramAggregationReq {
date_interval: Option<String>,
/// The field to aggregate on.
pub field: String,
/// The format to format dates.
/// The format to format dates. Unsupported currently.
pub format: Option<String>,
/// The interval to chunk your data range. Each bucket spans a value range of
/// [0..fixed_interval). Accepted values
Expand Down Expand Up @@ -77,7 +77,7 @@ pub struct DateHistogramAggregationReq {
/// hard_bounds only limits the buckets, to force a range set both extended_bounds and
/// hard_bounds to the same range.
///
/// Needs to be provided as timestamp in microseconds precision.
/// Needs to be provided as timestamp in nanosecond precision.
///
/// ## Example
/// ```json
Expand All @@ -88,7 +88,7 @@ pub struct DateHistogramAggregationReq {
/// "interval": "1d",
/// "hard_bounds": {
/// "min": 0,
/// "max": 1420502400000000
/// "max": 1420502400000000000
/// }
/// }
/// }
Expand All @@ -114,11 +114,11 @@ impl DateHistogramAggregationReq {
self.validate()?;
Ok(HistogramAggregation {
field: self.field.to_string(),
interval: parse_into_microseconds(self.fixed_interval.as_ref().unwrap())? as f64,
interval: parse_into_nanoseconds(self.fixed_interval.as_ref().unwrap())? as f64,
offset: self
.offset
.as_ref()
.map(|offset| parse_offset_into_microseconds(offset))
.map(|offset| parse_offset_into_nanosecs(offset))
.transpose()?
.map(|el| el as f64),
min_doc_count: self.min_doc_count,
Expand Down Expand Up @@ -155,7 +155,7 @@ impl DateHistogramAggregationReq {
));
}

parse_into_microseconds(self.fixed_interval.as_ref().unwrap())?;
parse_into_nanoseconds(self.fixed_interval.as_ref().unwrap())?;

Ok(())
}
Expand All @@ -176,9 +176,12 @@ pub enum DateHistogramParseError {
/// Offset invalid
#[error("passed offset is invalid {0:?}")]
InvalidOffset(String),
/// Value out of bounds
#[error("passed value is out of bounds: {0:?}")]
OutOfBounds(String),
}

fn parse_offset_into_microseconds(input: &str) -> Result<i64, AggregationError> {
fn parse_offset_into_nanosecs(input: &str) -> Result<i64, AggregationError> {
let is_sign = |byte| &[byte] == b"-" || &[byte] == b"+";
if input.is_empty() {
return Err(DateHistogramParseError::InvalidOffset(input.to_string()).into());
Expand All @@ -187,18 +190,18 @@ fn parse_offset_into_microseconds(input: &str) -> Result<i64, AggregationError>
let has_sign = is_sign(input.as_bytes()[0]);
if has_sign {
let (sign, input) = input.split_at(1);
let val = parse_into_microseconds(input)?;
let val = parse_into_nanoseconds(input)?;
if sign == "-" {
Ok(-val)
} else {
Ok(val)
}
} else {
parse_into_microseconds(input)
parse_into_nanoseconds(input)
}
}

fn parse_into_microseconds(input: &str) -> Result<i64, AggregationError> {
fn parse_into_nanoseconds(input: &str) -> Result<i64, AggregationError> {
let split_boundary = input
.as_bytes()
.iter()
Expand Down Expand Up @@ -226,7 +229,11 @@ fn parse_into_microseconds(input: &str) -> Result<i64, AggregationError> {
_ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string()).into()),
};

Ok(number * multiplier_from_unit * 1000)
let val = (number * multiplier_from_unit)
.checked_mul(1_000_000)
.ok_or_else(|| DateHistogramParseError::OutOfBounds(input.to_string()))?;

Ok(val)
}

#[cfg(test)]
Expand All @@ -241,49 +248,49 @@ mod tests {
use crate::Index;

#[test]
fn test_parse_into_microseconds() {
assert_eq!(parse_into_microseconds("1m").unwrap(), 60_000_000);
assert_eq!(parse_into_microseconds("2m").unwrap(), 120_000_000);
fn test_parse_into_nanosecs() {
assert_eq!(parse_into_nanoseconds("1m").unwrap(), 60_000_000_000);
assert_eq!(parse_into_nanoseconds("2m").unwrap(), 120_000_000_000);
assert_eq!(
parse_into_microseconds("2y").unwrap_err(),
parse_into_nanoseconds("2y").unwrap_err(),
DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
);
assert_eq!(
parse_into_microseconds("2000").unwrap_err(),
parse_into_nanoseconds("2000").unwrap_err(),
DateHistogramParseError::UnitMissing("2000".to_string()).into()
);
assert_eq!(
parse_into_microseconds("ms").unwrap_err(),
parse_into_nanoseconds("ms").unwrap_err(),
DateHistogramParseError::NumberMissing("ms".to_string()).into()
);
}

#[test]
fn test_parse_offset_into_microseconds() {
assert_eq!(parse_offset_into_microseconds("1m").unwrap(), 60_000_000);
assert_eq!(parse_offset_into_microseconds("+1m").unwrap(), 60_000_000);
assert_eq!(parse_offset_into_microseconds("-1m").unwrap(), -60_000_000);
assert_eq!(parse_offset_into_microseconds("2m").unwrap(), 120_000_000);
assert_eq!(parse_offset_into_microseconds("+2m").unwrap(), 120_000_000);
assert_eq!(parse_offset_into_microseconds("-2m").unwrap(), -120_000_000);
assert_eq!(parse_offset_into_microseconds("-2ms").unwrap(), -2_000);
fn test_parse_offset_into_nanosecs() {
assert_eq!(parse_offset_into_nanosecs("1m").unwrap(), 60_000_000_000);
assert_eq!(parse_offset_into_nanosecs("+1m").unwrap(), 60_000_000_000);
assert_eq!(parse_offset_into_nanosecs("-1m").unwrap(), -60_000_000_000);
assert_eq!(parse_offset_into_nanosecs("2m").unwrap(), 120_000_000_000);
assert_eq!(parse_offset_into_nanosecs("+2m").unwrap(), 120_000_000_000);
assert_eq!(parse_offset_into_nanosecs("-2m").unwrap(), -120_000_000_000);
assert_eq!(parse_offset_into_nanosecs("-2ms").unwrap(), -2_000_000);
assert_eq!(
parse_offset_into_microseconds("2y").unwrap_err(),
parse_offset_into_nanosecs("2y").unwrap_err(),
DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
);
assert_eq!(
parse_offset_into_microseconds("2000").unwrap_err(),
parse_offset_into_nanosecs("2000").unwrap_err(),
DateHistogramParseError::UnitMissing("2000".to_string()).into()
);
assert_eq!(
parse_offset_into_microseconds("ms").unwrap_err(),
parse_offset_into_nanosecs("ms").unwrap_err(),
DateHistogramParseError::NumberMissing("ms".to_string()).into()
);
}

#[test]
fn test_parse_into_milliseconds_do_not_accept_non_ascii() {
assert!(parse_into_microseconds("1m").is_err());
assert!(parse_into_nanoseconds("1m").is_err());
}

pub fn get_test_index_from_docs(
Expand Down Expand Up @@ -361,7 +368,7 @@ mod tests {
"buckets" : [
{
"key_as_string" : "2015-01-01T00:00:00Z",
"key" : 1420070400000000.0,
"key" : 1420070400000000000.0,
"doc_count" : 4
}
]
Expand Down Expand Up @@ -397,7 +404,7 @@ mod tests {
"buckets" : [
{
"key_as_string" : "2015-01-01T00:00:00Z",
"key" : 1420070400000000.0,
"key" : 1420070400000000000.0,
"doc_count" : 4,
"texts": {
"buckets": [
Expand Down Expand Up @@ -444,32 +451,32 @@ mod tests {
"buckets": [
{
"doc_count": 2,
"key": 1420070400000000.0,
"key": 1420070400000000000.0,
"key_as_string": "2015-01-01T00:00:00Z"
},
{
"doc_count": 1,
"key": 1420156800000000.0,
"key": 1420156800000000000.0,
"key_as_string": "2015-01-02T00:00:00Z"
},
{
"doc_count": 0,
"key": 1420243200000000.0,
"key": 1420243200000000000.0,
"key_as_string": "2015-01-03T00:00:00Z"
},
{
"doc_count": 0,
"key": 1420329600000000.0,
"key": 1420329600000000000.0,
"key_as_string": "2015-01-04T00:00:00Z"
},
{
"doc_count": 0,
"key": 1420416000000000.0,
"key": 1420416000000000000.0,
"key_as_string": "2015-01-05T00:00:00Z"
},
{
"doc_count": 1,
"key": 1420502400000000.0,
"key": 1420502400000000000.0,
"key_as_string": "2015-01-06T00:00:00Z"
}
]
Expand Down

0 comments on commit ba309e1

Please sign in to comment.