Skip to content

Commit

Permalink
add upsert
Browse files Browse the repository at this point in the history
modify
  • Loading branch information
xxhZs committed Sep 21, 2023
1 parent 5a7e088 commit b571f08
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 34 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 26 additions & 7 deletions integration_tests/doris-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,27 @@

In this demo, we want to showcase how RisingWave is able to sink data to Doris.

1. Launch the cluster:
1. Modify max_map_count

```sh
# linux
sysctl -w vm.max_map_count=2000000
# Macos
docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh
```

If, after running these commands, Docker still encounters Doris startup errors, please refer to: https://doris.apache.org/docs/dev/install/construct-docker/run-docker-cluster


2. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a Doris fe and be for sink.


2. Create the Doris table via mysql:
3. Create the Doris table via mysql:

Login to mysql
```sh
Expand All @@ -35,12 +46,20 @@ CREATE USER 'users'@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* TO 'users'@'%';
```

3. Execute the SQL queries in sequence:
4. Execute the SQL queries in sequence:

- append-only sql:
- append-only/create_source.sql
- append-only/create_mv.sql
- append-only/create_sink.sql

- create_source.sql
- create_mv.sql
- create_sink.sql
- upsert sql:
- upsert/create_table.sql
- upsert/create_mv.sql
- upsert/create_sink.sql
- upsert/insert_update_delete.sql

We only support `upsert` with doris' `UNIQUE KEY`

Run the following query
```sql
Expand Down
File renamed without changes.
19 changes: 0 additions & 19 deletions integration_tests/doris-sink/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,6 @@ services:
networks:
mynetwork:
ipv4_address: 172.21.0.11
message_queue:
extends:
file: ../../docker/docker-compose.yml
service: message_queue
networks:
mynetwork:
ipv4_address: 172.21.0.12
datagen:
build: ../datagen
depends_on: [message_queue]
command:
- /bin/sh
- -c
- /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092
restart: always
container_name: datagen
networks:
mynetwork:
ipv4_address: 172.21.0.13
volumes:
compute-node-0:
external: false
Expand Down
7 changes: 7 additions & 0 deletions integration_tests/doris-sink/upsert/create_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp
FROM
user_behaviors;
12 changes: 12 additions & 0 deletions integration_tests/doris-sink/upsert/create_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE SINK bhv_doris_sink
FROM
bhv_mv WITH (
connector = 'doris',
type = 'upsert',
doris.url = 'http://fe:8030',
doris.user = 'users',
doris.password = '123456',
doris.database = 'demo',
doris.table='demo_bhv_table',
primary_key = 'user_id'
);
10 changes: 10 additions & 0 deletions integration_tests/doris-sink/upsert/create_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE table user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
);
8 changes: 8 additions & 0 deletions integration_tests/doris-sink/upsert/insert_update_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01 01:01:01','1','1','1'),
(2,'2','2','2020-01-01 01:01:02','2','2','2'),
(3,'3','3','2020-01-01 01:01:03','3','3','3'),
(4,'4','4','2020-01-01 01:01:04','4','4','4');

DELETE FROM user_behaviors WHERE user_id = 2;

UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3;
4 changes: 2 additions & 2 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ http = "0.2"
hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2", "stream"] }
hyper-tls = "0.5"
icelake = { workspace = true }
indexmap ={ version = "2.0.0", features = ["serde"] }
indexmap ={ version = "1.9.3", features = ["serde"] }
itertools = "0.11"
jni = { version = "0.21.1", features = ["invocation"] }
jsonschema-transpiler = { git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
Expand Down Expand Up @@ -99,7 +99,7 @@ risingwave_rpc_client = { workspace = true }
rust_decimal = "1"
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = { version = "1" }
serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = "0.10.6"
tempfile = "3"
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub fn doris_rows_to_json(
let encoder = JsonEncoder::new_with_doris(
schema,
None,
TimestampHandlingMode::Milli,
TimestampHandlingMode::String,
decimal_map.clone(),
);
let map = encoder.encode(row)?;
Expand Down
4 changes: 1 addition & 3 deletions src/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features
hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["nightly", "raw"] }
heck = { version = "0.4", features = ["unicode"] }
hyper = { version = "0.14", features = ["full"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
indexmap = { version = "1", default-features = false, features = ["serde", "std"] }
itertools = { version = "0.10" }
jni = { version = "0.21", features = ["invocation"] }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
Expand Down Expand Up @@ -155,7 +154,6 @@ futures-task = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] }
heck = { version = "0.4", features = ["unicode"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.10" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits"] }
Expand Down

0 comments on commit b571f08

Please sign in to comment.