From 910d36c02ccb2e2e897666171932fe2381a2f9de Mon Sep 17 00:00:00 2001 From: Stefan Ellmauthaler <71695780+ellmau@users.noreply.github.com> Date: Mon, 21 Nov 2022 15:38:22 +0100 Subject: [PATCH] Add String datatype to stage2 (#68) * Add String datatype to stage2 * Fix default method for the Dictionary trait * Update simple unit test to use String * Extend Documentation * implements #83 --- src/io/csv.rs | 52 ++++++++++++++++--- .../columns/adaptive_column_builder.rs | 11 ++++ src/physical/columns/column.rs | 2 + src/physical/columns/interval_column.rs | 2 + src/physical/columns/ranged_column_scan.rs | 21 +++++--- src/physical/datatypes/data_type_name.rs | 6 ++- src/physical/datatypes/data_value.rs | 44 +++++++++++++--- src/physical/dictionary.rs | 9 ++-- src/physical/tables/materialize.rs | 2 + src/physical/tables/trie.rs | 7 ++- src/physical/tables/trie_difference.rs | 2 + src/physical/tables/trie_join.rs | 8 +-- src/physical/tables/trie_project.rs | 3 ++ src/physical/tables/trie_select.rs | 4 ++ src/physical/tables/trie_union.rs | 1 + src/physical/util.rs | 3 +- 16 files changed, 149 insertions(+), 28 deletions(-) diff --git a/src/io/csv.rs b/src/io/csv.rs index de630cfdb..7d730d942 100644 --- a/src/io/csv.rs +++ b/src/io/csv.rs @@ -2,13 +2,20 @@ use crate::error::Error; use crate::physical::datatypes::{data_value::VecT, DataTypeName}; +use crate::physical::dictionary::Dictionary; use csv::Reader; /// Imports a csv file -/// Needs a list of Options of [DataTypeName] and a [csv::Reader] reference +/// Needs a list of Options of [DataTypeName] and a [csv::Reader] reference, as well as a [Dictionary][crate::physical::dictionary::Dictionary] +/// # Parameters +/// * `datatypes` this is a list of [`DataTypeName`] options, which needs to match the number of fields in the csv-file. +/// If the Option is [`None`] the field will be ignored. [`Some(DataTypeName)`] describes the datatype of the field in the csv-file. +/// # Behaviour +/// If a given datatype from `datatypes` is not matching the value in the field (i.e. it cannot be parsed into such a value), the whole line will be ignored and an error message is emitted to the log. pub fn read( datatypes: &[Option], csv_reader: &mut Reader, + dict: &mut dyn Dictionary, ) -> Result, Error> where T: std::io::Read, @@ -21,6 +28,7 @@ where DataTypeName::U64 => VecT::U64(Vec::new()), DataTypeName::Float => VecT::Float(Vec::new()), DataTypeName::Double => VecT::Double(Vec::new()), + DataTypeName::String => VecT::String(Vec::new()), }) })); }); @@ -30,7 +38,7 @@ where if let Err(Error::RollBack(rollback)) = row.iter().enumerate().try_for_each(|(idx, item)| { if let Some(datatype) = datatypes[idx] { - match datatype.parse(item) { + match datatype.parse(item, dict) { Ok(val) => { result[idx].as_mut().map(|vect| { vect.push(&val); @@ -62,6 +70,7 @@ where #[cfg(test)] mod test { use super::*; + use crate::physical::dictionary::PrefixedStringDictionary; use csv::ReaderBuilder; use quickcheck_macros::quickcheck; use test_log::test; @@ -76,7 +85,11 @@ Boston;United States;4628910 .delimiter(b';') .from_reader(data.as_bytes()); - let x = read(&[None, None, None], &mut rdr); + let x = read( + &[None, None, None], + &mut rdr, + &mut PrefixedStringDictionary::new(), + ); assert!(x.is_ok()); assert_eq!(x.unwrap().len(), 0); } @@ -87,7 +100,7 @@ Boston;United States;4628910 let data = "\ 10;20;30;40;20;valid asdf;12.2;413;22.3;23;invalid -node01;22;33.33;12.333332;10;valid +node01;22;33.33;12.333332;10;valid again node02;1312;12.33;313;1431;valid node03;123;123;13;55;123;invalid "; @@ -103,14 +116,40 @@ node03;123;123;13;55;123;invalid Some(DataTypeName::Double), Some(DataTypeName::Float), Some(DataTypeName::U64), - None, + Some(DataTypeName::String), ], &mut rdr, + &mut PrefixedStringDictionary::new(), ); assert!(imported.is_ok()); - assert_eq!(imported.as_ref().unwrap().len(), 4); + assert_eq!(imported.as_ref().unwrap().len(), 5); assert_eq!(imported.as_ref().unwrap()[0].len(), 3); + log::debug!("imported: {:?}", imported); + assert_eq!( + imported.as_ref().unwrap()[4] + .get(0) + .map(|v| v.as_string().unwrap()), + Some(0usize) + ); + assert_eq!( + imported.as_ref().unwrap()[4] + .get(1) + .map(|v| v.as_string().unwrap()), + Some(1usize) + ); + assert_eq!( + imported.as_ref().unwrap()[4] + .get(2) + .map(|v| v.as_string().unwrap()), + Some(0usize) + ); + assert_eq!( + imported.as_ref().unwrap()[4] + .get(3) + .map(|v| v.as_string().unwrap()), + None + ); } #[quickcheck] @@ -151,6 +190,7 @@ node03;123;123;13;55;123;invalid Some(DataTypeName::Float), ], &mut rdr, + &mut PrefixedStringDictionary::new(), ); assert!(imported.is_ok()); diff --git a/src/physical/columns/adaptive_column_builder.rs b/src/physical/columns/adaptive_column_builder.rs index 68079c4e1..85e2db500 100644 --- a/src/physical/columns/adaptive_column_builder.rs +++ b/src/physical/columns/adaptive_column_builder.rs @@ -132,6 +132,8 @@ pub enum AdaptiveColumnBuilderT { Float(AdaptiveColumnBuilder), /// Case Double Double(AdaptiveColumnBuilder), + /// Case String + String(AdaptiveColumnBuilder), } impl AdaptiveColumnBuilderT { @@ -141,6 +143,7 @@ impl AdaptiveColumnBuilderT { DataTypeName::U64 => Self::U64(AdaptiveColumnBuilder::new()), DataTypeName::Float => Self::Float(AdaptiveColumnBuilder::new()), DataTypeName::Double => Self::Double(AdaptiveColumnBuilder::new()), + DataTypeName::String => Self::String(AdaptiveColumnBuilder::new()), } } @@ -168,6 +171,13 @@ impl AdaptiveColumnBuilderT { panic!("value does not match AdaptiveColumn type"); } } + Self::String(cb) => { + cb.add( + value + .as_string() + .expect("Value does not match AdaptiveColumn type"), + ); + } } } @@ -177,6 +187,7 @@ impl AdaptiveColumnBuilderT { Self::U64(cb) => cb.count(), Self::Float(cb) => cb.count(), Self::Double(cb) => cb.count(), + Self::String(cb) => cb.count(), } } } diff --git a/src/physical/columns/column.rs b/src/physical/columns/column.rs index 87ca5c737..688652824 100644 --- a/src/physical/columns/column.rs +++ b/src/physical/columns/column.rs @@ -80,6 +80,8 @@ pub enum ColumnT { Float(ColumnEnum), /// Case ColumnEnum Double(ColumnEnum), + /// Case ColumnEnum + String(ColumnEnum), } generate_datatype_forwarder!(forward_to_column_enum); diff --git a/src/physical/columns/interval_column.rs b/src/physical/columns/interval_column.rs index ed061ad9c..a79129a2f 100644 --- a/src/physical/columns/interval_column.rs +++ b/src/physical/columns/interval_column.rs @@ -98,6 +98,8 @@ pub enum IntervalColumnT { Float(IntervalColumnEnum), /// Case Double Double(IntervalColumnEnum), + /// Case String + String(IntervalColumnEnum), } generate_datatype_forwarder!(forward_to_interval_column_enum); diff --git a/src/physical/columns/ranged_column_scan.rs b/src/physical/columns/ranged_column_scan.rs index 3f028220c..3c666f188 100644 --- a/src/physical/columns/ranged_column_scan.rs +++ b/src/physical/columns/ranged_column_scan.rs @@ -354,6 +354,8 @@ pub enum RangedColumnScanT<'a> { Float(RangedColumnScanCell<'a, Float>), /// Case Double Double(RangedColumnScanCell<'a, Double>), + /// Case String + String(RangedColumnScanCell<'a, usize>), } generate_datatype_forwarder!(forward_to_ranged_column_scan_cell); @@ -399,18 +401,25 @@ impl<'a> ColumnScan for RangedColumnScanT<'a> { match self { Self::U64(cs) => match value { Self::Item::U64(val) => cs.seek(val).map(DataValueT::U64), - Self::Item::Float(_val) => None, - Self::Item::Double(_val) => None, + Self::Item::Float(_) => None, + Self::Item::Double(_) => None, + Self::Item::String(_) => None, }, Self::Float(cs) => match value { - Self::Item::U64(_val) => None, + Self::Item::U64(_) => None, Self::Item::Float(val) => cs.seek(val).map(DataValueT::Float), - Self::Item::Double(_val) => None, + Self::Item::Double(_) => None, + Self::Item::String(_) => None, }, Self::Double(cs) => match value { - Self::Item::U64(_val) => None, - Self::Item::Float(_val) => None, + Self::Item::U64(_) => None, + Self::Item::Float(_) => None, Self::Item::Double(val) => cs.seek(val).map(DataValueT::Double), + Self::Item::String(_) => None, + }, + Self::String(cs) => match value { + Self::Item::String(val) => cs.seek(val).map(DataValueT::String), + _ => None, // no type mixing allowed, so in any other case it should be [None] }, } } diff --git a/src/physical/datatypes/data_type_name.rs b/src/physical/datatypes/data_type_name.rs index d1e45412c..3169ec15a 100644 --- a/src/physical/datatypes/data_type_name.rs +++ b/src/physical/datatypes/data_type_name.rs @@ -1,6 +1,7 @@ use crate::error::Error; use super::DataValueT; +use crate::physical::dictionary::Dictionary; /// Descriptors to refer to the possible data types at runtime. #[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)] @@ -11,15 +12,18 @@ pub enum DataTypeName { Float, /// Data type [`super::double::Double`] Double, + /// Data type `String`, uses [`usize`] and a [dictionary][crate::physical::dictionary::Dictionary] + String, } impl DataTypeName { /// Parses a string, based on the name of the Datatype - pub fn parse(&self, string: &str) -> Result { + pub fn parse(&self, string: &str, dict: &mut dyn Dictionary) -> Result { Ok(match self { DataTypeName::U64 => DataValueT::U64(string.parse::()?), DataTypeName::Float => DataValueT::Float(super::Float::new(string.parse::()?)?), DataTypeName::Double => DataValueT::Double(super::Double::new(string.parse::()?)?), + DataTypeName::String => DataValueT::String(dict.add(string.to_string())), }) } } diff --git a/src/physical/datatypes/data_value.rs b/src/physical/datatypes/data_value.rs index 8c9d424d8..2696d30f6 100644 --- a/src/physical/datatypes/data_value.rs +++ b/src/physical/datatypes/data_value.rs @@ -18,10 +18,12 @@ pub enum DataValueT { Float(Float), /// Case Double Double(Double), + /// Case String + String(usize), } impl DataValueT { - /// Returns either `Option` or `None` + /// Returns either [`Option`], answering whether the [`DataValueT`] is of this datatype pub fn as_u64(&self) -> Option { match *self { DataValueT::U64(val) => Some(val), @@ -29,7 +31,7 @@ impl DataValueT { } } - /// Returns either `Option` or `None` + /// Returns either [`Option`], answering whether [`DataValueT`] is of this datatype pub fn as_float(&self) -> Option { match *self { DataValueT::Float(val) => Some(val), @@ -37,7 +39,7 @@ impl DataValueT { } } - /// Returns either `Option` or `None` + /// Returns either [`Option`], answering whether the [`DataValueT`] is of this datatype pub fn as_double(&self) -> Option { match *self { DataValueT::Double(val) => Some(val), @@ -45,12 +47,21 @@ impl DataValueT { } } + /// Returns an [`Option`] , answering whether the [`DataValueT`] is of this datatype + pub fn as_string(&self) -> Option { + match *self { + DataValueT::String(val) => Some(val), + _ => None, + } + } + /// Compares its value with another given [`DataValueT`] pub fn compare(&self, other: &Self) -> Option { match self { DataValueT::U64(val) => other.as_u64().map(|otherval| val.cmp(&otherval)), DataValueT::Float(val) => other.as_float().map(|otherval| val.cmp(&otherval)), DataValueT::Double(val) => other.as_double().map(|otherval| val.cmp(&otherval)), + DataValueT::String(val) => other.as_string().map(|otherval| val.cmp(&otherval)), } } @@ -60,6 +71,7 @@ impl DataValueT { Self::U64(_) => DataTypeName::U64, Self::Float(_) => DataTypeName::Float, Self::Double(_) => DataTypeName::Double, + Self::String(_) => DataTypeName::String, } } } @@ -70,6 +82,7 @@ impl std::fmt::Display for DataValueT { Self::U64(val) => write!(f, "{}", val), Self::Float(val) => write!(f, "{}", val), Self::Double(val) => write!(f, "{}", val), + Self::String(val) => write!(f, "str{}", val), } } } @@ -83,6 +96,8 @@ pub enum VecT { Float(Vec), /// Case Vec Double(Vec), + /// Case Vec + String(Vec), } generate_datatype_forwarder!(forward_to_vec); @@ -94,6 +109,7 @@ impl VecT { DataTypeName::U64 => Self::U64(Vec::new()), DataTypeName::Float => Self::Float(Vec::new()), DataTypeName::Double => Self::Double(Vec::new()), + DataTypeName::String => Self::String(Vec::new()), } } @@ -103,6 +119,7 @@ impl VecT { Self::U64(_) => DataTypeName::U64, Self::Float(_) => DataTypeName::Float, Self::Double(_) => DataTypeName::Double, + Self::String(_) => DataTypeName::String, } } @@ -117,6 +134,7 @@ impl VecT { VecT::U64(vec) => vec.get(index).copied().map(DataValueT::U64), VecT::Float(vec) => vec.get(index).copied().map(DataValueT::Float), VecT::Double(vec) => vec.get(index).copied().map(DataValueT::Double), + VecT::String(vec) => vec.get(index).copied().map(DataValueT::String), } } @@ -124,9 +142,20 @@ impl VecT { /// Note that it is not checked if the [DataValueT] has the right enum-variant pub(crate) fn push(&mut self, value: &DataValueT) { match self { - VecT::U64(vec) => vec.push(value.as_u64().unwrap()), - VecT::Float(vec) => vec.push(value.as_float().unwrap()), - VecT::Double(vec) => vec.push(value.as_double().unwrap()), + VecT::U64(vec) => { + vec.push(value.as_u64().expect( + "expecting VecT::U64 and DataValueT::U64, but DataValueT does not match", + )) + } + VecT::Float(vec) => vec.push(value.as_float().expect( + "expecting VecT::Float and DataValueT::Float, but DataValueT does not match", + )), + VecT::Double(vec) => vec.push(value.as_double().expect( + "expecting VecT::Double and DataValueT::Double, but DataValueT does not match", + )), + VecT::String(vec) => vec.push(value.as_string().expect( + "expecting VecT::String and DataValueT::String, but DataValueT does not match", + )), }; } @@ -153,6 +182,9 @@ impl VecT { VecT::Double(vec) => vec .get(idx_a) .and_then(|&val_a| vec.get(idx_b).map(|val_b| val_a.cmp(val_b))), + VecT::String(vec) => vec + .get(idx_a) + .and_then(|&val_a| vec.get(idx_b).map(|val_b| val_a.cmp(val_b))), } } } diff --git a/src/physical/dictionary.rs b/src/physical/dictionary.rs index f2f329fa3..f1cbdc788 100644 --- a/src/physical/dictionary.rs +++ b/src/physical/dictionary.rs @@ -11,9 +11,12 @@ pub mod string_dictionary; pub use string_dictionary::StringDictionary; /// This Dictionary Trait defines dictionaries, which keep ownership of the inserted elements. -pub trait Dictionary: Default { - /// Initialize a new Dictionary - fn init() -> Self { +pub trait Dictionary { + /// Construct a new and empty [`Dictionary`] + fn new() -> Self + where + Self: Sized + Default, + { Self::default() } /// Add a new string to the dictionary diff --git a/src/physical/tables/materialize.rs b/src/physical/tables/materialize.rs index f9642d57e..e894453f8 100644 --- a/src/physical/tables/materialize.rs +++ b/src/physical/tables/materialize.rs @@ -34,6 +34,8 @@ pub fn materialize(trie_scan: &mut TrieScanEnum) -> Trie { .push(AdaptiveColumnBuilderT::Float(AdaptiveColumnBuilder::new())), DataTypeName::Double => data_column_builders .push(AdaptiveColumnBuilderT::Double(AdaptiveColumnBuilder::new())), + DataTypeName::String => data_column_builders + .push(AdaptiveColumnBuilderT::String(AdaptiveColumnBuilder::new())), } } diff --git a/src/physical/tables/trie.rs b/src/physical/tables/trie.rs index 2cc4a1834..e55fadff6 100644 --- a/src/physical/tables/trie.rs +++ b/src/physical/tables/trie.rs @@ -181,7 +181,7 @@ impl Table for Trie { .map(|_| { let empty_data_col = AdaptiveColumnBuilderT::new(DataTypeName::U64); let empty_interval_col = AdaptiveColumnBuilder::::new(); - build_interval_column!(empty_data_col, empty_interval_col; U64; Float; Double) + build_interval_column!(empty_data_col, empty_interval_col; U64; Float; Double; String) }) .collect(), ); @@ -201,6 +201,9 @@ impl Table for Trie { VecT::Double(vec) => VecT::Double(permutator.permutate(vec).expect( "length matches since permutator is constructed from these vectores", )), + VecT::String(vec) => VecT::String(permutator.permutate(vec).expect( + "length matches since permutator is constructed from these vectors", + )), }) .collect(); @@ -291,7 +294,7 @@ impl Table for Trie { condensed_data_builders .into_iter() .zip(condensed_interval_starts_builders) - .map(|(col, iv)| build_interval_column!(col, iv; U64; Float; Double)) + .map(|(col, iv)| build_interval_column!(col, iv; U64; Float; Double; String)) .collect(), ) } diff --git a/src/physical/tables/trie_difference.rs b/src/physical/tables/trie_difference.rs index ca99c5346..6457fa071 100644 --- a/src/physical/tables/trie_difference.rs +++ b/src/physical/tables/trie_difference.rs @@ -67,6 +67,7 @@ impl<'a> TrieDifference<'a> { DataTypeName::U64 => init_scans_for_datatype!(U64), DataTypeName::Float => init_scans_for_datatype!(Float), DataTypeName::Double => init_scans_for_datatype!(Double), + DataTypeName::String => init_scans_for_datatype!(String), }; } @@ -157,6 +158,7 @@ impl<'a> TrieScan<'a> for TrieDifference<'a> { DataTypeName::U64 => down_for_datatype!(U64), DataTypeName::Float => down_for_datatype!(Float), DataTypeName::Double => down_for_datatype!(Double), + DataTypeName::String => down_for_datatype!(String), } } else { self.difference_scans[next_layer].get_mut().reset(); diff --git a/src/physical/tables/trie_join.rs b/src/physical/tables/trie_join.rs index e277c0f0d..2e4167716 100644 --- a/src/physical/tables/trie_join.rs +++ b/src/physical/tables/trie_join.rs @@ -88,6 +88,7 @@ impl<'a> TrieJoin<'a> { DataTypeName::U64 => merge_join_for_datatype!(U64, u64), DataTypeName::Float => merge_join_for_datatype!(Float, Float), DataTypeName::Double => merge_join_for_datatype!(Double, Double), + DataTypeName::String => merge_join_for_datatype!(String, usize), } } @@ -137,9 +138,10 @@ impl<'a> TrieScan<'a> for TrieJoin<'a> { debug_assert!(self.current_variable.is_some()); match self.target_schema.get_type(self.current_variable?) { - DataTypeName::U64 | DataTypeName::Float | DataTypeName::Double => { - Some(&self.merge_joins[self.current_variable?]) - } + DataTypeName::U64 + | DataTypeName::Float + | DataTypeName::Double + | DataTypeName::String => Some(&self.merge_joins[self.current_variable?]), } } diff --git a/src/physical/tables/trie_project.rs b/src/physical/tables/trie_project.rs index 6507721d6..4c4947449 100644 --- a/src/physical/tables/trie_project.rs +++ b/src/physical/tables/trie_project.rs @@ -35,6 +35,7 @@ fn shrink_position(column: &IntervalColumnT, pos: usize) -> usize { IntervalColumnT::U64(col) => shrink_position_t(col, pos), IntervalColumnT::Float(col) => shrink_position_t(col, pos), IntervalColumnT::Double(col) => shrink_position_t(col, pos), + IntervalColumnT::String(col) => shrink_position_t(col, pos), } } @@ -87,6 +88,7 @@ impl<'a> TrieProject<'a> { DataTypeName::U64 => init_scans_for_datatype!(U64), DataTypeName::Float => init_scans_for_datatype!(Float), DataTypeName::Double => init_scans_for_datatype!(Double), + DataTypeName::String => init_scans_for_datatype!(String), } } let target_schema = TrieSchema::new(target_attributes); @@ -188,6 +190,7 @@ impl<'a> TrieScan<'a> for TrieProject<'a> { IntervalColumnT::U64(_) => down_for_datatype!(U64), IntervalColumnT::Float(_) => down_for_datatype!(Float), IntervalColumnT::Double(_) => down_for_datatype!(Double), + IntervalColumnT::String(_) => down_for_datatype!(String), } self.current_layer = Some(next_layer); diff --git a/src/physical/tables/trie_select.rs b/src/physical/tables/trie_select.rs index 97f0123aa..062ee7e6a 100644 --- a/src/physical/tables/trie_select.rs +++ b/src/physical/tables/trie_select.rs @@ -46,6 +46,7 @@ impl<'a> TrieSelectEqual<'a> { DataTypeName::U64 => init_scans_for_datatype!(U64), DataTypeName::Float => init_scans_for_datatype!(Float), DataTypeName::Double => init_scans_for_datatype!(Double), + DataTypeName::String => init_scans_for_datatype!(String), }; } @@ -87,6 +88,7 @@ impl<'a> TrieSelectEqual<'a> { DataTypeName::U64 => init_scans_for_datatype!(U64), DataTypeName::Float => init_scans_for_datatype!(Float), DataTypeName::Double => init_scans_for_datatype!(Double), + DataTypeName::String => init_scans_for_datatype!(String), } } } @@ -182,6 +184,7 @@ impl<'a> TrieSelectValue<'a> { DataTypeName::U64 => init_scans_for_datatype!(U64), DataTypeName::Float => init_scans_for_datatype!(Float), DataTypeName::Double => init_scans_for_datatype!(Double), + DataTypeName::String => init_scans_for_datatype!(String), } } @@ -217,6 +220,7 @@ impl<'a> TrieSelectValue<'a> { DataTypeName::U64 => init_scans_for_datatype!(U64), DataTypeName::Float => init_scans_for_datatype!(Float), DataTypeName::Double => init_scans_for_datatype!(Double), + DataTypeName::String => init_scans_for_datatype!(String), } } diff --git a/src/physical/tables/trie_union.rs b/src/physical/tables/trie_union.rs index 161ec551e..afc56df26 100644 --- a/src/physical/tables/trie_union.rs +++ b/src/physical/tables/trie_union.rs @@ -58,6 +58,7 @@ impl<'a> TrieUnion<'a> { DataTypeName::U64 => init_scans_for_datatype!(U64, u64), DataTypeName::Float => init_scans_for_datatype!(Float, Float), DataTypeName::Double => init_scans_for_datatype!(Double, Double), + DataTypeName::String => init_scans_for_datatype!(String, usize), }; } diff --git a/src/physical/util.rs b/src/physical/util.rs index 3b2d1a586..7ce3c2143 100644 --- a/src/physical/util.rs +++ b/src/physical/util.rs @@ -92,6 +92,7 @@ macro_rules! generate_datatype_forwarder { $crate::generate_forwarder!($name; U64, Float, - Double); + Double, + String); } }