
Commit

Merge branch 'main' into bz/scale-sim
mergify[bot] committed Oct 8, 2022
2 parents e275e05 + e14255a commit 1870972
Showing 93 changed files with 1,345 additions and 612 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions ci/scripts/build.sh
@@ -60,3 +60,6 @@ buildkite-agent artifact upload ./sqlsmith-"$profile"
echo "--- upload misc"
cp src/source/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc
buildkite-agent artifact upload ./avro-simple-schema.avsc

cp src/source/src/test_data/complex-schema.avsc ./avro-complex-schema.avsc
buildkite-agent artifact upload ./avro-complex-schema.avsc
8 changes: 2 additions & 6 deletions ci/scripts/deterministic-recovery-test.sh
@@ -11,15 +11,11 @@ export RUST_LOG=off

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming"
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/streaming/\*\*/\*.slt'
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-meta --kill-frontend --kill-compute './e2e_test/streaming/\*\*/\*.slt'

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, batch"
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/batch/\*\*/\*.slt'

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming"
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-meta './e2e_test/streaming/\*\*/\*.slt'
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-meta --kill-frontend --kill-compute './e2e_test/batch/\*\*/\*.slt'

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/5103
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming"
1 change: 1 addition & 0 deletions ci/scripts/e2e-source-test.sh
@@ -30,6 +30,7 @@ mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev

echo "--- Download mise"
buildkite-agent artifact download avro-simple-schema.avsc ./
buildkite-agent artifact download avro-complex-schema.avsc ./

echo "--- Adjust permission"
chmod +x ./target/debug/risingwave
13 changes: 13 additions & 0 deletions e2e_test/source/kafka.slt
@@ -129,6 +129,16 @@ create materialized source s9 with (
statement ok
select * from s9

statement ok
create materialized source s10 with (
connector = 'kafka', topic = 'avro_bin_c',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'earliest'
) row format avro message 'user' row schema location 'file:///risingwave/avro-complex-schema.avsc'

statement ok
select * from s10

statement ok
flush;

@@ -268,3 +278,6 @@ drop source s8

statement ok
drop source s9

statement ok
drop source s10
12 changes: 10 additions & 2 deletions proto/meta.proto
@@ -63,7 +63,7 @@ message TableFragments {
    // Note that this can be derived backwards from the upstream actors of the Actor held by the Fragment,
    // but in some scenarios (e.g. Scaling) it will lead to a lot of duplicate code,
    // so we pre-generate and store it here, this member will only be initialized when creating the Fragment
    // and modified when and modified when creating the mv-on-mv
    // and modified when creating the mv-on-mv
    repeated uint32 upstream_fragment_ids = 7;
  }
  uint32 table_id = 1;
@@ -158,9 +158,17 @@ service ClusterService {
rpc ListAllNodes(ListAllNodesRequest) returns (ListAllNodesResponse);
}

enum SubscribeType {
  UNSPECIFIED = 0;
  FRONTEND = 1;
  COMPUTE_NODE = 2;
  RISE_CTL = 3;
  COMPACTOR = 4;
}

// Below for notification service.
message SubscribeRequest {
  common.WorkerType worker_type = 1;
  SubscribeType subscribe_type = 1;
  common.HostAddress host = 2;
  uint32 worker_id = 3;
}
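The notification service now identifies subscribers with a dedicated SubscribeType rather than reusing common.WorkerType. Below is a minimal sketch of how a frontend-side client might fill in the new field; the prost-generated Rust paths (risingwave_pb::meta::{SubscribeRequest, SubscribeType}, risingwave_pb::common::HostAddress) and the exact field layout are assumptions inferred from the proto above, not part of this diff.

// Hypothetical usage; generated paths and field names are assumed from
// proto/meta.proto and proto/common.proto, not taken from this diff.
use risingwave_pb::common::HostAddress;
use risingwave_pb::meta::{SubscribeRequest, SubscribeType};

fn frontend_subscribe_request(worker_id: u32) -> SubscribeRequest {
    SubscribeRequest {
        // prost represents proto enum fields as i32 inside messages.
        subscribe_type: SubscribeType::Frontend as i32,
        host: Some(HostAddress {
            host: "127.0.0.1".to_string(),
            port: 4566,
        }),
        worker_id,
    }
}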
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
@@ -56,6 +56,8 @@ message UpdateMutation {
  map<uint32, common.Buffer> actor_vnode_bitmap_update = 3;
  // All actors to be dropped in this update.
  repeated uint32 dropped_actors = 4;
  // Source updates.
  map<uint32, source.ConnectorSplits> actor_splits = 5;
}

message SourceChangeSplitMutation {
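The new actor_splits map carries per-actor source split assignments inside a barrier's update mutation. A hedged sketch of populating it from the meta side follows; the prost-generated paths (risingwave_pb::stream_plan::UpdateMutation, risingwave_pb::source::ConnectorSplits) and the use of a HashMap for the proto map field are assumptions, not shown by this diff.

use std::collections::HashMap;

// Hypothetical usage; generated types and the HashMap representation of the
// proto map field are assumed, not taken from this diff.
use risingwave_pb::source::ConnectorSplits;
use risingwave_pb::stream_plan::UpdateMutation;

fn splits_update(assignment: HashMap<u32, ConnectorSplits>) -> UpdateMutation {
    UpdateMutation {
        // actor id -> the connector splits this actor should read after the update
        actor_splits: assignment,
        ..Default::default()
    }
}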
Binary file added scripts/source/test_data/avro_bin_c.1
5 changes: 5 additions & 0 deletions src/common/Cargo.toml
@@ -66,4 +66,9 @@ darwin-libproc = { git = "https://github.com/risingwavelabs/darwin-libproc.git",
libc = "0.2.72"

[dev-dependencies]
criterion = "0.4"
rand = "0.8"

[[bench]]
name = "bench_encoding"
harness = false
205 changes: 205 additions & 0 deletions src/common/benches/bench_encoding.rs
@@ -0,0 +1,205 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env;
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::types::struct_type::StructType;
use risingwave_common::types::{
    deserialize_datum_from, serialize_datum_into, DataType, Datum, IntervalUnit,
    NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, ScalarImpl,
};
use risingwave_common::util::value_encoding::{deserialize_datum, serialize_datum};

const ENV_BENCH_SER: &str = "BENCH_SER";
const ENV_BENCH_DE: &str = "BENCH_DE";
const ENV_CASE: &str = "CASE";

struct Case {
    name: String,
    ty: DataType,
    datum: Datum,
}

impl Case {
    pub fn new(name: &str, ty: DataType, scalar: ScalarImpl) -> Self {
        Self {
            name: name.to_string(),
            ty,
            datum: Some(scalar),
        }
    }
}

fn key_serialization(datum: &Datum) -> Vec<u8> {
    let mut serializer = memcomparable::Serializer::new(vec![]);
    serialize_datum_into(datum, &mut serializer).unwrap();
    serializer.into_inner()
}

fn value_serialization(datum: &Datum) -> Vec<u8> {
    let mut buf = vec![];
    serialize_datum(datum, &mut buf);
    buf
}

fn key_deserialization(ty: &DataType, datum: &[u8]) {
    let mut deserializer = memcomparable::Deserializer::new(datum);
    let _ = deserialize_datum_from(ty, &mut deserializer);
}

fn value_deserialization(ty: &DataType, datum: &[u8]) {
    let _ = deserialize_datum(datum, ty);
}

fn bench_encoding(c: &mut Criterion) {
    let cases = vec![
        Case::new("Int16", DataType::Int16, ScalarImpl::Int16(1)),
        Case::new("Int32", DataType::Int32, ScalarImpl::Int32(1)),
        Case::new("Int64", DataType::Int64, ScalarImpl::Int64(1)),
        Case::new(
            "Float32",
            DataType::Float32,
            ScalarImpl::Float32(1.0.into()),
        ),
        Case::new(
            "Float64",
            DataType::Float64,
            ScalarImpl::Float64(1.0.into()),
        ),
        Case::new("Bool", DataType::Boolean, ScalarImpl::Bool(true)),
        Case::new(
            "Decimal",
            DataType::Decimal,
            ScalarImpl::Decimal("12.13".parse().unwrap()),
        ),
        Case::new(
            "Interval",
            DataType::Interval,
            ScalarImpl::Interval(IntervalUnit::default()),
        ),
        Case::new(
            "NaiveDate",
            DataType::Date,
            ScalarImpl::NaiveDate(NaiveDateWrapper::default()),
        ),
        Case::new(
            "NaiveDateTime",
            DataType::Timestamp,
            ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::default()),
        ),
        Case::new(
            "NaiveTime",
            DataType::Time,
            ScalarImpl::NaiveTime(NaiveTimeWrapper::default()),
        ),
        Case::new(
            "Utf8 (len = 10)",
            DataType::Varchar,
            ScalarImpl::Utf8(String::from_iter(vec!['a'; 10])),
        ),
        Case::new(
            "Utf8 (len = 1000)",
            DataType::Varchar,
            ScalarImpl::Utf8(String::from_iter(vec!['a'; 1000])),
        ),
        Case::new(
            "Utf8 (len = 100000)",
            DataType::Varchar,
            ScalarImpl::Utf8(String::from_iter(vec!['a'; 100000])),
        ),
        // Use bool as the inner elem/field type to eliminate the performance gap in elem/field
        // encoding.
        Case::new(
            "Struct of Bool (len = 100)",
            DataType::Struct(Arc::new(StructType::new(vec![
                (
                    DataType::Boolean,
                    "".to_string()
                );
                100
            ]))),
            ScalarImpl::Struct(StructValue::new(vec![Some(ScalarImpl::Bool(true)); 100])),
        ),
        Case::new(
            "List of Bool (len = 100)",
            DataType::List {
                datatype: Box::new(DataType::Boolean),
            },
            ScalarImpl::List(ListValue::new(vec![Some(ScalarImpl::Bool(true)); 100])),
        ),
    ];

    let filter = env::var(ENV_CASE).unwrap_or_else(|_| "".to_string());
    let cases = cases
        .into_iter()
        .filter(|case| case.name.contains(&filter))
        .collect::<Vec<_>>();
    let bench_ser = !env::var(ENV_BENCH_SER)
        .unwrap_or_else(|_| "1".to_string())
        .eq("0");
    let bench_de = !env::var(ENV_BENCH_DE)
        .unwrap_or_else(|_| "1".to_string())
        .eq("0");

    if bench_ser {
        for case in &cases {
            // Bench key encoding.
            let encoded_len = key_serialization(&case.datum).len();
            println!("{} key encoded len: {}", case.name, encoded_len);
            c.bench_function(
                format!("bench {} (key encoding serialization)", case.name).as_str(),
                |bencher| bencher.iter(|| key_serialization(&case.datum)),
            );

            // Bench value encoding.
            let encoded_len = value_serialization(&case.datum).len();
            println!("{} value encoded len: {}", case.name, encoded_len);
            c.bench_function(
                format!("bench {} (value encoding serialization)", case.name).as_str(),
                |bencher| bencher.iter(|| value_serialization(&case.datum)),
            );
        }
    }

    if bench_de {
        for case in &cases {
            // Bench key encoding.
            let encode_result = key_serialization(&case.datum);
            c.bench_function(
                format!("bench {} (key encoding deserialization)", case.name).as_str(),
                |bencher| bencher.iter(|| key_deserialization(&case.ty, &encode_result)),
            );

            // Bench value encoding.
            let encode_result = value_serialization(&case.datum);
            c.bench_function(
                format!("bench {} (value encoding deserialization)", case.name).as_str(),
                |bencher| bencher.iter(|| value_deserialization(&case.ty, &encode_result)),
            );
        }
    }
}

// This benchmark compares encoding performance between key/value encoding.
//
// Some environment variables are available to select particular data types to ser/de.
// - `BENCH_SER`: Set to `0` to disable benchmarking serialization. Default to `1`.
// - `BENCH_DE`: Set to `0` to disable benchmarking deserialization. Default to `1`.
// - `CASE`: Filter the data types to bench. All the test cases whose name contains this variable
//   will be tested. It is case-sensitive.
criterion_group!(benches, bench_encoding);
criterion_main!(benches);
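As a quick orientation to what each benchmark case exercises, here is a minimal round-trip sketch for a single datum through both encodings, reusing the helpers imported above; it assumes the function signatures shown in this file and that memcomparable is available as a dependency.

use risingwave_common::types::{
    deserialize_datum_from, serialize_datum_into, DataType, Datum, ScalarImpl,
};
use risingwave_common::util::value_encoding::{deserialize_datum, serialize_datum};

fn main() {
    let datum: Datum = Some(ScalarImpl::Int64(42));

    // Key (memcomparable) encoding: order-preserving bytes, used for sort keys.
    let mut ser = memcomparable::Serializer::new(vec![]);
    serialize_datum_into(&datum, &mut ser).unwrap();
    let key_bytes = ser.into_inner();
    let mut de = memcomparable::Deserializer::new(key_bytes.as_slice());
    assert_eq!(deserialize_datum_from(&DataType::Int64, &mut de).unwrap(), datum);

    // Value encoding: compact representation for stored cell values.
    let mut value_bytes = vec![];
    serialize_datum(&datum, &mut value_bytes);
    assert_eq!(
        deserialize_datum(value_bytes.as_slice(), &DataType::Int64).unwrap(),
        datum
    );
}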
