Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector): SinkExecutor init + MySQLSink mvp #2969

Merged
merged 65 commits into from
Jun 13, 2022

Conversation

nanderstabel
Copy link
Contributor

@nanderstabel nanderstabel commented Jun 2, 2022

What's changed and what's your intention?

Intention is to create a simple start of the implementation of a the SinkExecutor MVP.

Please explain IN DETAIL what the changes are in this PR and why they are needed:

  • Sink trait is added:
pub trait Sink {
    fn write_batch(&mut self);
}
  • As well as a simple SinkExecutor struct:
pub struct SinkExecutor<S: Sink> {
    child: BoxedExecutor,
    external_sink: S,
    identity: String
}
  • As well as an initial MySQLSink struct:
pub struct MySQLSink {
    endpoint: String,
    table: String,
    database: Option<String>,
    user: Option<String>,
    password: Option<String>,
}

For MySQLSink is a mvp version of Fn write_batch implemented.

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

@nanderstabel nanderstabel changed the title SinkExecutor init feat(stream): SinkExecutor + Trait init Jun 2, 2022
@neverchanje
Copy link
Contributor

I suggest you should create a new crate for the sink code. Because we may not want the MySQL and redis libs bundled in the streaming crate.

@tabVersion
Copy link
Contributor

I suggest you should create a new crate for the sink code.

is sink on the same level of source? we may reuse the connector's code later?

@nanderstabel
Copy link
Contributor Author

I suggest you should create a new crate for the sink code.

is sink on the same level of source? we may reuse the connector's code later?

I was thinking it would make sense to move sink into the connector crate and have a separate sink and source directory there. What do you think @neverchanje ?

@neverchanje
Copy link
Contributor

I was thinking it would make sense to move sink into the connector crate and have a separate sink and source directory there. What do you think @neverchanje ?

I partly agree, we can go iteratively and leave the sink code in connector crate in the first version.

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has totally checked 832 files.

Valid Invalid Ignored Fixed
830 1 1 0
Click to see the invalid file list
  • src/frontend/test_runner/tests/gen/testcases.rs

src/frontend/test_runner/tests/gen/testcases.rs Outdated Show resolved Hide resolved


#[tokio::test]
async fn test_expr() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have this file now. It was removed by some pr long ago.

pub struct RedisSink;

#[async_trait]
impl Sink for RedisSink {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/sink/mysql.rs
/sink/redis.rs
/sink/mod.rs

Personally recommend this way to organize the files.

Comment on lines 31 to 35
fn endpoint(&self) -> String;
fn table(&self) -> String;
fn database(&self) -> Option<String>;
fn user(&self) -> Option<String>;
fn password(&self) -> Option<String>; // TODO(nanderstabel): auth?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sink-specific. Not all sinks have the concepts of "table", "database", or "user". You need to generalize them into a "SinkConfig", which could be an enum.

pub enum SinkConfig {
  Mysql(MysqlConfig),
  Redis(RedisConfig)
}

impl Sink {
  fn new(cfg: SinkConfig) {
    match cfg {
      SinkConfig::Mysql(cfg) => ...
    }
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% what you mean by impl Sink since Sink is a trait?

I solved it now like this:

pub enum SinkImpl {
    MySQL(MySQLSink),
    Redis(RedisSink),
}

impl SinkImpl {
    fn new(cfg: SinkConfig) -> Self {
        match cfg {
            SinkConfig::Mysql(cfg) => SinkImpl::MySQL(MySQLSink::new(cfg)),
            SinkConfig::Redis(cfg) => SinkImpl::Redis(RedisSink::new(cfg)),
        }
    }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for not giving the right syntax. I'm indicating https://doc.rust-lang.org/book/ch10-02-traits.html. Anyway, SinkImpl also looks good to me.

@neverchanje
Copy link
Contributor

The overall design looks nice to me, but I think the e2e testing would be a hard problem to solve.

@@ -23,6 +23,7 @@ lru = { git = "https://github.com/singularity-data/lru-rs.git", rev = "e0e9ddaf8
madsim = "=0.2.0-alpha.3"
memcomparable = { path = "../utils/memcomparable" }
more-asserts = "0.3"
mysql_async = "0.30"
Copy link
Contributor

@skyzh skyzh Jun 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't add more dependencies to common crate 🤣

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@@ -138,6 +139,8 @@ pub enum ErrorCode {
},
#[error("Invalid Parameter Value: {0}")]
InvalidParameterValue(String),
#[error("MySQL error: {0}")]
MySQLError(MySQLError),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use BoxedError here so that we don't need to import mysql_async to common crate

@codecov
Copy link

codecov bot commented Jun 13, 2022

Codecov Report

Merging #2969 (ca2a6e6) into main (1b749b3) will decrease coverage by 0.05%.
The diff coverage is 43.21%.

@@            Coverage Diff             @@
##             main    #2969      +/-   ##
==========================================
- Coverage   73.65%   73.59%   -0.06%     
==========================================
  Files         740      744       +4     
  Lines      101830   102029     +199     
==========================================
+ Hits        75001    75089      +88     
- Misses      26829    26940     +111     
Flag Coverage Δ
rust 73.59% <43.21%> (-0.06%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
src/common/src/error.rs 66.17% <0.00%> (-0.25%) ⬇️
src/common/src/types/mod.rs 67.91% <ø> (ø)
src/connector/src/lib.rs 100.00% <ø> (ø)
src/connector/src/sink/mod.rs 0.00% <0.00%> (ø)
src/connector/src/sink/redis.rs 0.00% <0.00%> (ø)
src/stream/src/executor/mod.rs 48.63% <ø> (ø)
src/connector/src/sink/mysql.rs 43.97% <43.97%> (ø)
src/stream/src/executor/sink.rs 64.86% <64.86%> (ø)
src/common/src/types/ordered_float.rs 24.70% <0.00%> (-0.20%) ⬇️
src/common/src/types/chrono_wrapper.rs 83.07% <0.00%> (+2.30%) ⬆️

📣 Codecov can now indicate which changes are the most critical in Pull Requests. Learn more

@nanderstabel nanderstabel marked this pull request as ready for review June 13, 2022 11:56
@nanderstabel nanderstabel enabled auto-merge (squash) June 13, 2022 11:56
Copy link
Contributor

@neverchanje neverchanje left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm, great work!

@nanderstabel nanderstabel merged commit b7537bb into main Jun 13, 2022
@nanderstabel nanderstabel deleted the ns-sink-write-batch branch June 13, 2022 12:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants