Skip to content

Commit

Permalink
fix(sink): fix starrocks doris and clickhouse decimal (#15664)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Apr 11, 2024
1 parent 331f079 commit 9dfbe22
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 110 deletions.
16 changes: 8 additions & 8 deletions ci/scripts/e2e-clickhouse-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ sleep 1
echo "--- create clickhouse table"
curl https://clickhouse.com/ | sh
sleep 2
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2), v5 decimal64(3))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"

echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt'
Expand All @@ -41,13 +41,13 @@ sleep 5

# check sink destination using shell
if cat ./query_result.csv | sort | awk -F "," '{
if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"") c1++;
if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"") c2++;
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"") c3++;
if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"") c4++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"") c5++;
if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"") c6++;
if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"") c7++; }
if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"" && $5 == 1.1) c1++;
if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"" && $5 == 0) c2++;
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"" && $5 == 2.2) c3++;
if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"" && $5 == 0) c4++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"" && $5 == 3.3) c5++;
if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"" && $5 == 4.4) c6++;
if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"" && $5 == 0) c7++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then
echo "Clickhouse sink check passed"
else
Expand Down
4 changes: 2 additions & 2 deletions ci/scripts/e2e-starrocks-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ echo "--- create starrocks table"
apt-get update -y && apt-get install -y mysql-client
sleep 2
mysql -uroot -P 9030 -h starrocks-fe-server -e "CREATE database demo;use demo;
CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 date,v8 datetime,v9 boolean,v10 json) ENGINE=OLAP
CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 date,v8 datetime,v9 boolean,v10 json,v11 decimal(10,5)) ENGINE=OLAP
PRIMARY KEY(\`v1\`)
DISTRIBUTED BY HASH(\`v1\`) properties(\"replication_num\" = \"1\");
CREATE USER 'users'@'%' IDENTIFIED BY '123456';
Expand All @@ -46,7 +46,7 @@ mysql -uroot -P 9030 -h starrocks-fe-server -e "select * from demo.demo_bhv_tabl


if cat ./query_result.csv | sed '1d; s/\t/,/g' | awk -F "," '{
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0 && $10 = "{"v101": 100}"); }'; then
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0 && $10 = "{"v101": 100}" && $11 == 1.12346); }'; then
echo "Starrocks sink check passed"
else
cat ./query_result.csv
Expand Down
6 changes: 3 additions & 3 deletions e2e_test/sink/clickhouse_sink.slt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint);
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint, v5 decimal);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4 from mv6 WITH (
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4, mv6.v5 as v5 from mv6 WITH (
connector = 'clickhouse',
type = 'append-only',
force_append_only='true',
Expand All @@ -17,7 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4
);

statement ok
INSERT INTO t6 VALUES (1, 50, '1-50', 1), (2, 2, '2-2', 2), (3, 2, '3-2', 1), (5, 2, '5-2', 2), (8, 2, '8-2', 1), (13, 2, '13-2', 2), (21, 2, '21-2', 1);
INSERT INTO t6 VALUES (1, 50, '1-50', 1, 1.1), (2, 2, '2-2', 2, 2.2), (3, 2, '3-2', 1, 3.3), (5, 2, '5-2', 2, 4.4), (8, 2, '8-2', 1, 'inf'), (13, 2, '13-2', 2, '-inf'), (21, 2, '21-2', 1, 'nan');

statement ok
FLUSH;
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/starrocks_sink.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb);
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb, v11 decimal);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
Expand All @@ -21,7 +21,7 @@ FROM
);

statement ok
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false, '{"v101":100}');
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false, '{"v101":100}',1.12345678910);

statement ok
FLUSH;
Expand Down
29 changes: 15 additions & 14 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use serde::Serialize;
use serde_derive::Deserialize;
use serde_with::serde_as;
use thiserror_ext::AsReport;
use tracing::warn;
use with_options::WithOptions;

use super::{DummySinkCommitCoordinator, SinkWriterParam};
Expand Down Expand Up @@ -747,27 +748,27 @@ impl ClickHouseFieldWithNull {
ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_string()),
ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
ScalarRefImpl::Decimal(d) => {
if let Decimal::Normalized(d) = d {
let d = if let Decimal::Normalized(d) = d {
let scale =
clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;

let scale = if scale < 0 {
if scale < 0 {
d.mantissa() / 10_i128.pow(scale.unsigned_abs())
} else {
d.mantissa() * 10_i128.pow(scale as u32)
};

if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(scale as i32))
} else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(scale as i64))
} else {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(scale))
}
} else if clickhouse_schema_feature.can_null {
warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
return Ok(vec![ClickHouseFieldWithNull::None]);
} else {
return Err(SinkError::ClickHouse(
"clickhouse can not support Decimal NAN,-INF and INF".to_string(),
));
warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
0_i128
};
if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
} else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
} else {
ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
}
}
ScalarRefImpl::Interval(_) => {
Expand Down
6 changes: 2 additions & 4 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,9 @@ pub struct DorisField {
aggregation_type: String,
}
impl DorisField {
pub fn get_decimal_pre_scale(&self) -> Option<(u8, u8)> {
pub fn get_decimal_pre_scale(&self) -> Option<u8> {
if self.r#type.contains("DECIMAL") {
let a = self.precision.clone().unwrap().parse::<u8>().unwrap();
let b = self.scale.clone().unwrap().parse::<u8>().unwrap();
Some((a, b))
Some(self.scale.clone().unwrap().parse::<u8>().unwrap())
} else {
None
}
Expand Down
45 changes: 17 additions & 28 deletions src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use itertools::Itertools;
use risingwave_common::array::{ArrayError, ArrayResult};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef, Decimal, JsonbVal, ScalarRefImpl, ToText};
use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
use risingwave_common::util::iter_util::ZipEqDebug;
use serde_json::{json, Map, Value};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -83,7 +83,7 @@ impl JsonEncoder {
pub fn new_with_doris(
schema: Schema,
col_indices: Option<Vec<usize>>,
map: HashMap<String, (u8, u8)>,
map: HashMap<String, u8>,
) -> Self {
Self {
schema,
Expand All @@ -97,19 +97,15 @@ impl JsonEncoder {
}
}

pub fn new_with_starrocks(
schema: Schema,
col_indices: Option<Vec<usize>>,
map: HashMap<String, (u8, u8)>,
) -> Self {
pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
Self {
schema,
col_indices,
time_handling_mode: TimeHandlingMode::Milli,
date_handling_mode: DateHandlingMode::String,
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::StarRocks(map),
custom_json_type: CustomJsonType::StarRocks,
kafka_connect: None,
}
}
Expand Down Expand Up @@ -242,24 +238,17 @@ fn datum_to_json_object(
(DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
json!(v)
}
// Doris/Starrocks will convert out-of-bounds decimal and -INF, INF, NAN to NULL
(DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match custom_json_type {
CustomJsonType::Doris(map) | CustomJsonType::StarRocks(map) => {
if !matches!(v, Decimal::Normalized(_)) {
return Err(ArrayError::internal(
"doris/starrocks can't support decimal Inf, -Inf, Nan".to_string(),
));
}
let (p, s) = map.get(&field.name).unwrap();
CustomJsonType::Doris(map) => {
let s = map.get(&field.name).unwrap();
v.rescale(*s as u32);
let v_string = v.to_text();
let len = v_string.clone().replace(['.', '-'], "").len();
if len > *p as usize {
return Err(ArrayError::internal(
format!("rw Decimal's precision is large than doris/starrocks max decimal len is {:?}, doris max is {:?}",v_string.len(),p)));
}
json!(v_string)
json!(v.to_text())
}
CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
CustomJsonType::Es
| CustomJsonType::None
| CustomJsonType::BigQuery
| CustomJsonType::StarRocks => {
json!(v.to_text())
}
},
Expand Down Expand Up @@ -310,7 +299,7 @@ fn datum_to_json_object(
json!(v.as_iso_8601())
}
(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Es | CustomJsonType::StarRocks => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => {
json!(jsonb_ref.to_string())
}
Expand Down Expand Up @@ -357,7 +346,7 @@ fn datum_to_json_object(
serde_json::to_string(&map).context("failed to serialize into JSON")?,
)
}
CustomJsonType::StarRocks(_) => {
CustomJsonType::StarRocks => {
return Err(ArrayError::internal(
"starrocks can't support struct".to_string(),
));
Expand Down Expand Up @@ -470,8 +459,8 @@ fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
mod tests {

use risingwave_common::types::{
DataType, Date, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
Timestamp,
DataType, Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue,
Time, Timestamp,
};

use super::*;
Expand Down Expand Up @@ -639,7 +628,7 @@ mod tests {
assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));

let mut map = HashMap::default();
map.insert("aaa".to_string(), (10_u8, 5_u8));
map.insert("aaa".to_string(), 5_u8);
let decimal = datum_to_json_object(
&Field {
data_type: DataType::Decimal,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ pub enum CustomJsonType {
// Doris's json need date is string.
// The internal order of the struct should follow the insertion order.
// The decimal needs verification and calibration.
Doris(HashMap<String, (u8, u8)>),
Doris(HashMap<String, u8>),
// Es's json need jsonb is struct
Es,
// starrocks' need jsonb is struct
StarRocks(HashMap<String, (u8, u8)>),
StarRocks,
// bigquery need null array -> []
BigQuery,
None,
Expand Down
50 changes: 3 additions & 47 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::Opts;
use risingwave_common::array::{Op, StreamChunk};
Expand Down Expand Up @@ -223,8 +222,7 @@ impl Sink for StarrocksSink {
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await?
)?
.into_log_sinker(writer_param.sink_metrics))
}

Expand Down Expand Up @@ -293,54 +291,12 @@ impl TryFrom<SinkParam> for StarrocksSink {
}

impl StarrocksSinkWriter {
pub async fn new(
pub fn new(
config: StarrocksConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
) -> Result<Self> {
let mut decimal_map = HashMap::default();
let starrocks_columns = StarrocksSchemaClient::new(
config.common.host.clone(),
config.common.mysql_port.clone(),
config.common.table.clone(),
config.common.database.clone(),
config.common.user.clone(),
config.common.password.clone(),
)
.await?
.get_columns_from_starrocks()
.await?;

for (name, column_type) in &starrocks_columns {
if column_type.contains("decimal") {
let decimal_all = column_type
.split("decimal(")
.last()
.ok_or_else(|| SinkError::Starrocks("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
.split(',')
.collect_vec();
let length = decimal_all
.first()
.ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| {
SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report()))
})?;

let scale = decimal_all
.last()
.ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| {
SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report()))
})?;
decimal_map.insert(name.to_string(), (length, scale));
}
}
let mut fields_name = schema.names_str();
if !is_append_only {
fields_name.push(STARROCKS_DELETE_SIGN);
Expand All @@ -367,7 +323,7 @@ impl StarrocksSinkWriter {
inserter_innet_builder: starrocks_insert_builder,
is_append_only,
client: None,
row_encoder: JsonEncoder::new_with_starrocks(schema, None, decimal_map),
row_encoder: JsonEncoder::new_with_starrocks(schema, None),
})
}

Expand Down

0 comments on commit 9dfbe22

Please sign in to comment.