Skip to content

Commit

Permalink
add decimal support
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Jul 16, 2019
1 parent cb74aab commit 69d4f99
Show file tree
Hide file tree
Showing 14 changed files with 794 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "clickhouse-rs"
version = "0.1.12"
version = "0.1.13"
authors = ["Mikhail Sukharev <suharev7@gmail.com>"]
license = "MIT"
homepage = "https://github.com/suharev7/clickhouse-rs"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ clickhouse-rs = "*"

* Date
* DateTime
* Decimal(P, S)
* Float32, Float64
* String, FixedString(N)
* UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//!
//! * Date
//! * DateTime
//! * Decimal(P, S)
//! * Float32, Float64
//! * String, FixedString(N)
//! * UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
Expand Down
278 changes: 278 additions & 0 deletions src/types/column/decimal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
use crate::{
binary::{Encoder, ReadEx},
errors::Error,
types::{
column::{
list::List, nullable::NullableColumnData, BoxColumnWrapper, ColumnFrom, ColumnWrapper,
Either, VectorColumnData,
},
decimal::{Decimal, NoBits},
from_sql::FromSql,
Column, SqlType, Value, ValueRef,
},
};
use chrono_tz::Tz;

use super::column_data::ColumnData;

pub(crate) struct DecimalColumnData {
inner: Box<dyn ColumnData + Send + Sync>,
precision: u8,
scale: u8,
nobits: NoBits,
}

pub(crate) struct DecimalAdapter {
pub(crate) column: Column,
pub(crate) precision: u8,
pub(crate) scale: u8,
pub(crate) nobits: NoBits,
}

pub(crate) struct NullableDecimalAdapter {
pub(crate) column: Column,
pub(crate) precision: u8,
pub(crate) scale: u8,
pub(crate) nobits: NoBits,
}

impl DecimalColumnData {
pub(crate) fn load<T: ReadEx>(
reader: &mut T,
precision: u8,
scale: u8,
nobits: NoBits,
size: usize,
tz: Tz,
) -> Result<Self, Error> {
let type_name = match nobits {
NoBits::N32 => "Int32",
NoBits::N64 => "Int64",
};
let inner = ColumnData::load_data::<BoxColumnWrapper, _>(reader, type_name, size, tz)?;

Ok(DecimalColumnData {
inner,
precision,
scale,
nobits,
})
}
}

impl ColumnFrom for Vec<Decimal> {
fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
let mut data = List::<i64>::with_capacity(source.len());
let mut precision = 18;
let mut opt_scale = None;
for s in source {
precision = std::cmp::max(precision, s.precision);
if let Some(old_scale) = opt_scale {
if old_scale != s.scale {
panic!("scale != scale")
}
} else {
opt_scale = Some(s.scale);
}

data.push(s.internal());
}
let scale = opt_scale.unwrap_or(4);
let inner = Box::new(VectorColumnData { data });

let column = DecimalColumnData {
inner,
precision,
scale,
nobits: NoBits::N64,
};

W::wrap(column)
}
}

impl ColumnFrom for Vec<Option<Decimal>> {
fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
let mut nulls: Vec<u8> = Vec::with_capacity(source.len());
let mut data = List::<i64>::with_capacity(source.len());
let mut precision = 18;
let mut opt_scale = None;
for os in source {
if let Some(s) = os {
precision = std::cmp::max(precision, s.precision);
if let Some(old_scale) = opt_scale {
if old_scale != s.scale {
panic!("scale != scale")
}
} else {
opt_scale = Some(s.scale);
}

data.push(s.internal());
nulls.push(0);
} else {
data.push(0);
nulls.push(1);
}
}
let scale = opt_scale.unwrap_or(4);
let inner = Box::new(VectorColumnData { data });

let inner = DecimalColumnData {
inner,
precision,
scale,
nobits: NoBits::N64,
};

W::wrap(NullableColumnData {
nulls,
inner: Box::new(inner),
})
}
}

impl ColumnData for DecimalColumnData {
fn sql_type(&self) -> SqlType {
SqlType::Decimal(self.precision, self.scale)
}

fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
self.inner.save(encoder, start, end)
}

fn len(&self) -> usize {
self.inner.len()
}

fn push(&mut self, value: Value) {
if let Value::Decimal(decimal) = value {
match self.nobits {
NoBits::N32 => {
let internal: i32 = decimal.internal();
self.inner.push(internal.into())
}
NoBits::N64 => {
let internal: i64 = decimal.internal();
self.inner.push(internal.into())
}
}
} else {
panic!("value should be decimal ({:?})", value);
}
}

fn at(&self, index: usize) -> ValueRef {
let underlying: i64 = match self.nobits {
NoBits::N32 => i64::from(i32::from(self.inner.at(index))),
NoBits::N64 => i64::from(self.inner.at(index)),
};

ValueRef::Decimal(Decimal {
underlying,
precision: self.precision,
scale: self.scale,
nobits: self.nobits,
})
}
}

impl ColumnData for DecimalAdapter {
fn sql_type(&self) -> SqlType {
SqlType::Decimal(self.precision, self.scale)
}

fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
for i in start..end {
if let ValueRef::Decimal(decimal) = self.at(i) {
match self.nobits {
NoBits::N32 => {
let internal: i32 = decimal.internal();
encoder.write(internal);
}
NoBits::N64 => {
let internal: i64 = decimal.internal();
encoder.write(internal);
}
}
} else {
panic!("should be decimal");
}
}
}

fn len(&self) -> usize {
self.column.len()
}

fn push(&mut self, _: Value) {
unimplemented!()
}

fn at(&self, index: usize) -> ValueRef {
if let ValueRef::Decimal(decimal) = self.column.at(index) {
let mut d = decimal.set_scale(self.scale);
d.precision = self.precision;
d.nobits = self.nobits;
ValueRef::Decimal(d)
} else {
panic!("should be decimal");
}
}
}

impl ColumnData for NullableDecimalAdapter {
fn sql_type(&self) -> SqlType {
SqlType::Nullable(SqlType::Decimal(self.precision, self.scale).into())
}

fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
let size = end - start;
let mut nulls = vec![0; size];
let mut values: Vec<Option<Decimal>> = vec![None; size];

for (i, index) in (start..end).enumerate() {
values[i] = Option::from_sql(self.at(index)).unwrap();
if values[i].is_none() {
nulls[i] = 1;
}
}

encoder.write_bytes(nulls.as_ref());

for value in values {
let underlying = if let Some(v) = value { v.underlying } else { 0 };

match self.nobits {
NoBits::N32 => {
encoder.write(underlying as i32);
}
NoBits::N64 => {
encoder.write(underlying);
}
}
}
}

fn len(&self) -> usize {
self.column.len()
}

fn push(&mut self, _: Value) {
unimplemented!()
}

fn at(&self, index: usize) -> ValueRef {
let value: Option<Decimal> = Option::from_sql(self.column.at(index)).unwrap();
match value {
None => ValueRef::Nullable(Either::Left(self.sql_type())),
Some(mut v) => {
v = v.set_scale(self.scale);
v.precision = self.precision;
v.nobits = self.nobits;
let inner = ValueRef::Decimal(v);
ValueRef::Nullable(Either::Right(Box::new(inner)))
}
}
}
}
67 changes: 66 additions & 1 deletion src/types/column/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ use super::{
column_data::ColumnData, date::DateColumnData, nullable::NullableColumnData,
numeric::VectorColumnData, string::StringColumnData, ColumnWrapper,
};
use crate::types::column::{array::ArrayColumnData, fixed_string::FixedStringColumnData};
use crate::types::{
column::{
array::ArrayColumnData, decimal::DecimalColumnData, fixed_string::FixedStringColumnData,
},
decimal::NoBits,
};

impl dyn ColumnData {
pub(crate) fn load_data<W: ColumnWrapper, T: ReadEx>(
Expand Down Expand Up @@ -36,6 +41,10 @@ impl dyn ColumnData {
W::wrap(FixedStringColumnData::load(reader, size, str_len)?)
} else if let Some(inner_type) = parse_array_type(type_name) {
W::wrap(ArrayColumnData::load(reader, inner_type, size, tz)?)
} else if let Some((precision, scale, nobits)) = parse_decimal(type_name) {
W::wrap(DecimalColumnData::load(
reader, precision, scale, nobits, size, tz,
)?)
} else {
let message = format!("Unsupported column type \"{}\".", type_name);
return Err(message.into());
Expand Down Expand Up @@ -80,10 +89,66 @@ fn parse_array_type(source: &str) -> Option<&str> {
Some(inner_type)
}

fn parse_decimal(source: &str) -> Option<(u8, u8, NoBits)> {
if source.len() < 12 {
return None;
}

if !source.starts_with("Decimal") {
return None;
}

if source.chars().nth(7) != Some('(') {
return None;
}

if !source.ends_with(')') {
return None;
}

let mut params: Vec<u8> = Vec::with_capacity(2);
for cell in source[8..source.len() - 1].split(',').map(|s| s.trim()) {
if let Ok(value) = cell.parse() {
params.push(value)
} else {
return None;
}
}

if params.len() != 2 {
return None;
}

let precision = params[0];
let scale = params[1];

if scale > precision {
return None;
}

if let Some(nobits) = NoBits::from_precision(precision) {
return Some((precision, scale, nobits));
}

None
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_parse_decimal() {
assert_eq!(parse_decimal("Decimal(9, 4)"), Some((9, 4, NoBits::N32)));
assert_eq!(parse_decimal("Decimal(10, 4)"), Some((10, 4, NoBits::N64)));
assert_eq!(parse_decimal("Decimal(20, 4)"), None);
assert_eq!(parse_decimal("Decimal(2000, 4)"), None);
assert_eq!(parse_decimal("Decimal(3, 4)"), None);
assert_eq!(parse_decimal("Decimal(20, -4)"), None);
assert_eq!(parse_decimal("Decimal(0)"), None);
assert_eq!(parse_decimal("Decimal(1, 2, 3)"), None);
}

#[test]
fn test_parse_array_type() {
assert_eq!(parse_array_type("Array(UInt8)"), Some("UInt8"));
Expand Down
Loading

0 comments on commit 69d4f99

Please sign in to comment.