refactor: remove create_source/sync_source rpc in CN (#5269) (#5654)
* refactor: wrap source creation into SourceDescBuilder to create stream sources.

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* remove StreamSource-related part of create_source rpc in CN

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* fix typo

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* fix unit test

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* remove SourceType field in SourceNode

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* remap row_id_index in SourceExecutor

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* build table source in SourceDescBuilder, remove create_source/sync_source rpc

- move create_table_source logic into SourceDescBuilder
- remove create_source rpc in CN
- remove sync_source rpc in CN
- fix all unit tests' complaints

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* fix misc check error

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* move create_table_info into source::table::test_utils

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* remove row_id_index field from source_executor

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* remove SourceCatalogType in frontend

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

* Update src/stream/src/executor/source/source_executor.rs

Co-authored-by: Bugen Zhao <i@bugenzhao.com>
Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>

Signed-off-by: Qinglin Pan <qinglin@risingwave-labs.com>
Co-authored-by: Bugen Zhao <i@bugenzhao.com>
PanQL and BugenZhao committed Oct 6, 2022
1 parent a9f1408 commit bd3bd59
Showing 25 changed files with 425 additions and 592 deletions.
12 changes: 5 additions & 7 deletions proto/stream_plan.proto
@@ -107,15 +107,13 @@ message ActorMapping {

// todo: StreamSourceNode or TableSourceNode
message SourceNode {
enum SourceType {
UNSPECIFIED = 0;
TABLE = 1;
SOURCE = 2;
}
uint32 source_id = 1; // use source_id to fetch SourceDesc from local source manager
repeated int32 column_ids = 2;
SourceType source_type = 3;
catalog.Table state_table = 4;
catalog.Table state_table = 3;
oneof info {
catalog.StreamSourceInfo stream_source = 4;
catalog.TableSourceInfo table_source = 5;
}
}

message SinkNode {
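With the SourceType enum removed, consumers now distinguish stream and table sources by matching on the prost-generated info oneof. A minimal illustrative sketch of such a match (the function itself is hypothetical; the variant names follow the helper added to src/common/src/util/prost.rs later in this diff):

use risingwave_pb::stream_plan::source_node::Info as SourceInfo;
use risingwave_pb::stream_plan::SourceNode;

// Illustrative only: branch on the new `info` oneof instead of the
// removed `SourceType` enum.
fn source_kind(node: &SourceNode) -> &'static str {
    match node.info.as_ref() {
        Some(SourceInfo::StreamSource(_)) => "stream source",
        Some(SourceInfo::TableSource(_)) => "table source",
        None => "unspecified",
    }
}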
19 changes: 0 additions & 19 deletions proto/stream_service.proto
@@ -2,7 +2,6 @@ syntax = "proto3";

package stream_service;

import "catalog.proto";
import "common.proto";
import "hummock.proto";
import "stream_plan.proto";
@@ -97,14 +96,6 @@ message BroadcastActorInfoTableResponse {
common.Status status = 1;
}

message CreateSourceRequest {
catalog.Source source = 1;
}

message CreateSourceResponse {
common.Status status = 1;
}

message DropSourceRequest {
uint32 source_id = 1;
}
@@ -113,14 +104,6 @@ message DropSourceResponse {
common.Status status = 1;
}

message SyncSourcesRequest {
repeated catalog.Source sources = 1;
}

message SyncSourcesResponse {
common.Status status = 1;
}

message WaitEpochCommitRequest {
uint64 epoch = 1;
}
@@ -136,8 +119,6 @@ service StreamService {
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc CreateSource(CreateSourceRequest) returns (CreateSourceResponse);
rpc SyncSources(SyncSourcesRequest) returns (SyncSourcesResponse);
rpc DropSource(DropSourceRequest) returns (DropSourceResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
33 changes: 8 additions & 25 deletions src/batch/src/executor/delete.rs
@@ -145,17 +145,18 @@ mod tests {

use futures::StreamExt;
use risingwave_common::array::Array;
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::catalog::schema_test_utils;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_delete_executor() -> Result<()> {
let source_manager = Arc::new(MemSourceManager::default());
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
@@ -164,19 +165,6 @@ mod tests {
// Schema of the table
let schema = schema_test_utils::ii();

let table_columns: Vec<_> = schema
.fields
.iter()
.enumerate()
.map(|(i, f)| ColumnDesc {
data_type: f.data_type.clone(),
column_id: ColumnId::from(i as i32), // use column index as column id
name: f.name.clone(),
field_descs: vec![],
type_name: "".to_string(),
})
.collect();

mock_executor.add(DataChunk::from_pretty(
"i i
1 2
@@ -185,20 +173,15 @@ mod tests {
7 8
9 10",
));
let row_id_index = None;
let pk_column_ids = vec![1];

let info = create_table_info(&schema, None, vec![1]);

// Create the table.
let table_id = TableId::new(0);
source_manager.create_table_source(
&table_id,
table_columns.to_vec(),
row_id_index,
pk_column_ids,
)?;
let source_builder = SourceDescBuilder::new(table_id, &info, &source_manager);

// Create reader
let source_desc = source_manager.get_source(&table_id)?;
let source_desc = source_builder.build().await?;
let source = source_desc.source.as_table().unwrap();
let mut reader = source.stream_reader(vec![0.into(), 1.into()]).await?;

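The same setup change recurs in insert.rs and update.rs below: the hand-rolled ColumnDesc vector and create_table_source call give way to create_table_info plus SourceDescBuilder. Condensed into one sketch assembled from the call sites in this diff (the create_table_info arguments — schema, row_id_index, pk_column_ids — are inferred from usage):

use std::sync::Arc;

use risingwave_common::catalog::{schema_test_utils, TableId};
use risingwave_common::error::Result;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};

// Sketch of the new table-source bootstrap used by these tests.
async fn build_test_source() -> Result<()> {
    let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
    let schema = schema_test_utils::ii();
    // (schema, row_id_index, pk_column_ids)
    let info = create_table_info(&schema, None, vec![1]);
    let table_id = TableId::new(0);
    let source_desc = SourceDescBuilder::new(table_id, &info, &source_manager)
        .build()
        .await?;
    let source = source_desc.source.as_table().unwrap();
    let _reader = source.stream_reader(vec![0.into(), 1.into()]).await?;
    Ok(())
}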
31 changes: 7 additions & 24 deletions src/batch/src/executor/insert.rs
@@ -158,10 +158,11 @@ mod tests {

use futures::StreamExt;
use risingwave_common::array::{Array, ArrayImpl, I32Array, StructArray};
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::catalog::schema_test_utils;
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::store::ReadOptions;
use risingwave_storage::*;
@@ -172,7 +173,7 @@ mod tests {

#[tokio::test]
async fn test_insert_executor() -> Result<()> {
let source_manager = Arc::new(MemSourceManager::default());
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let store = MemoryStateStore::new();

// Make struct field
@@ -190,18 +191,6 @@ mod tests {
let mut schema = schema_test_utils::ii();
schema.fields.push(struct_field);
schema.fields.push(Field::unnamed(DataType::Int64)); // row_id column
let table_columns: Vec<_> = schema
.fields
.iter()
.enumerate()
.map(|(i, f)| ColumnDesc {
data_type: f.data_type.clone(),
column_id: ColumnId::from(i as i32), // use column index as column id
name: f.name.clone(),
field_descs: vec![],
type_name: "".to_string(),
})
.collect();

let col1 = column_nonnull! { I32Array, [1, 3, 5, 7, 9] };
let col2 = column_nonnull! { I32Array, [2, 4, 6, 8, 10] };
@@ -219,20 +208,14 @@ mod tests {
mock_executor.add(data_chunk.clone());

// To match the row_id column in the schema
let row_id_index = Some(3);
let pk_column_ids = vec![3];
let info = create_table_info(&schema, Some(3), vec![3]);

// Create the table.
let table_id = TableId::new(0);
source_manager.create_table_source(
&table_id,
table_columns.to_vec(),
row_id_index,
pk_column_ids,
)?;
let source_builder = SourceDescBuilder::new(table_id, &info, &source_manager);

// Create reader
let source_desc = source_manager.get_source(&table_id)?;
let source_desc = source_builder.build().await?;
let source = source_desc.source.as_table().unwrap();
let mut reader = source
.stream_reader(vec![0.into(), 1.into(), 2.into()])
32 changes: 7 additions & 25 deletions src/batch/src/executor/update.rs
@@ -196,18 +196,19 @@ mod tests {

use futures::StreamExt;
use risingwave_common::array::Array;
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::catalog::schema_test_utils;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_expr::expr::InputRefExpression;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_update_executor() -> Result<()> {
let source_manager = Arc::new(MemSourceManager::default());
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
@@ -216,19 +217,6 @@ mod tests {
// Schema of the table
let schema = schema_test_utils::ii();

let table_columns: Vec<_> = schema
.fields
.iter()
.enumerate()
.map(|(i, f)| ColumnDesc {
data_type: f.data_type.clone(),
column_id: ColumnId::from(i as i32), // use column index as column id
name: f.name.clone(),
field_descs: vec![],
type_name: "".to_string(),
})
.collect();

mock_executor.add(DataChunk::from_pretty(
"i i
1 2
@@ -245,18 +233,12 @@ mod tests {
];

// Create the table.
let info = create_table_info(&schema, None, vec![1]);
let table_id = TableId::new(0);
let row_id_index = None;
let pk_column_ids = vec![1];
source_manager.create_table_source(
&table_id,
table_columns.to_vec(),
row_id_index,
pk_column_ids,
)?;
let source_builder = SourceDescBuilder::new(table_id, &info, &source_manager);

// Create reader
let source_desc = source_manager.get_source(&table_id)?;
let source_desc = source_builder.build().await?;
let source = source_desc.source.as_table().unwrap();
let mut reader = source.stream_reader(vec![0.into(), 1.into()]).await?;

7 changes: 7 additions & 0 deletions src/common/src/util/prost.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::stream_plan::source_node::Info as SourceInfo;
use risingwave_pb::stream_plan::SourceNode;
use risingwave_pb::{batch_plan, data};

pub trait TypeUrl {
@@ -29,3 +31,8 @@ impl TypeUrl for data::Column {
"type.googleapis.com/data.Column"
}
}

#[inline(always)]
pub fn is_stream_source(source_node: &SourceNode) -> bool {
matches!(source_node.info.as_ref(), Some(SourceInfo::StreamSource(_)))
}
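A usage sketch for the new helper (illustrative; the SourceNode literal below is constructed ad hoc, assuming prost's derived Default for the info payload):

use risingwave_common::util::prost::is_stream_source;
use risingwave_pb::stream_plan::source_node::Info as SourceInfo;
use risingwave_pb::stream_plan::SourceNode;

fn main() {
    // A node carrying StreamSource info is detected; field values are placeholders.
    let node = SourceNode {
        source_id: 1,
        column_ids: vec![0],
        state_table: None,
        info: Some(SourceInfo::StreamSource(Default::default())),
    };
    assert!(is_stream_source(&node));
}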
66 changes: 1 addition & 65 deletions src/compute/src/rpc/service/stream_service.rs
@@ -17,8 +17,7 @@ use std::sync::Arc;
use async_stack_trace::StackTrace;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{tonic_err, Result as RwResult};
use risingwave_pb::catalog::Source;
use risingwave_common::error::tonic_err;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
@@ -199,35 +198,6 @@ impl StreamService for StreamServiceImpl {
Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

#[cfg_attr(coverage, no_coverage)]
async fn create_source(
&self,
request: Request<CreateSourceRequest>,
) -> Result<Response<CreateSourceResponse>, Status> {
let source = request.into_inner().source.unwrap();
self.create_source_inner(&source).await.map_err(tonic_err)?;
tracing::debug!(id = %source.id, "create table source");

Ok(Response::new(CreateSourceResponse { status: None }))
}

#[cfg_attr(coverage, no_coverage)]
async fn sync_sources(
&self,
request: Request<SyncSourcesRequest>,
) -> Result<Response<SyncSourcesResponse>, Status> {
let sources = request.into_inner().sources;
self.env
.source_manager()
.clear_sources()
.map_err(tonic_err)?;
for source in sources {
self.create_source_inner(&source).await.map_err(tonic_err)?
}

Ok(Response::new(SyncSourcesResponse { status: None }))
}

#[cfg_attr(coverage, no_coverage)]
async fn drop_source(
&self,
@@ -246,37 +216,3 @@ impl StreamServiceImpl {
Ok(Response::new(DropSourceResponse { status: None }))
}
}

impl StreamServiceImpl {
async fn create_source_inner(&self, source: &Source) -> RwResult<()> {
use risingwave_pb::catalog::source::Info;

let id = TableId::new(source.id); // TODO: use SourceId instead

match source.get_info()? {
Info::StreamSource(info) => {
self.env
.source_manager()
.create_source(&id, info.to_owned())
.await?;
}
Info::TableSource(info) => {
let columns = info
.columns
.iter()
.cloned()
.map(|c| c.column_desc.unwrap().into())
.collect_vec();

self.env.source_manager().create_table_source(
&id,
columns,
info.row_id_index.as_ref().map(|index| index.index as _),
info.pk_column_ids.clone(),
)?;
}
};

Ok(())
}
}
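Per the commit message, the TableSource branch that create_source_inner handled now lives behind SourceDescBuilder::build, whose implementation is not shown in this diff. For reference, a sketch of the column extraction that branch performed, preserved from the removed code above:

use itertools::Itertools;
use risingwave_common::catalog::ColumnDesc;
use risingwave_pb::catalog::TableSourceInfo;

// Same mapping the removed create_source_inner applied before calling
// create_table_source; the equivalent now happens inside SourceDescBuilder.
fn table_source_columns(info: &TableSourceInfo) -> Vec<ColumnDesc> {
    info.columns
        .iter()
        .cloned()
        .map(|c| c.column_desc.unwrap().into())
        .collect_vec()
}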
