Skip to content

Commit

Permalink
feat: support custom configs in connector SDK (#2910)
Browse files Browse the repository at this point in the history
`connector` macro will support any custom configs for connectors.

It is done in the following way:
1. Config file is read and parsed to `serde_yaml::Value`.
2. `ConnectorConfig` is parsed from the yaml value and used to initialize Producer/Consumer.
3. User-defined config is parsed from the yaml value and passed to the user-defined function.

Validation on the deployer side is done only for `ConnectorConfig` type.

Each connector will be responsible for the validation of its own configs.
  • Loading branch information
Alexander Galibey committed Jan 5, 2023
1 parent b95ec07 commit 13bb3d2
Show file tree
Hide file tree
Showing 32 changed files with 591 additions and 173 deletions.
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions connector/json-test-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures = { version = "0.3", default-features = false }
anyhow = { workspace = true}
async-std = { version = "1.12", default-features = false, features = ["attributes", "tokio1"]}
tokio = { version = "1.23", default-features = false, features = ["time"]}
serde = { version = "1.0", default-features = false, features = ["derive"]}

fluvio = { path = "../../crates/fluvio/", features = ["smartengine"]}
fluvio-connector-common = { path = "../../crates/fluvio-connector-common/", features = ["derive"] }
Expand Down
12 changes: 10 additions & 2 deletions connector/json-test-connector/Connector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,16 @@ source = true
image = "infinyon/fluvio-connect-json-test-source:0.1.0"


[custom]
required = ["template", "interval"]

[[params]]
name = "template"
[custom.properties.template]
title = "template"
description = "JSON template"
type = "string"

[custom.properties.interval]
title = "Interval"
description = "Interval of producing value"
type = "integer"

5 changes: 2 additions & 3 deletions connector/json-test-connector/config-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ version: 0.1.0
name: my-json-test-connector
type: json-test-source
topic: test-topic
parameters:
interval: 10
template: '{"template":"test"}'
interval: 10
template: '{"template":"test"}'
transforms:
- uses: infinyon/jolt@0.1.0
with:
Expand Down
11 changes: 9 additions & 2 deletions connector/json-test-connector/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
mod source;

use fluvio::{TopicProducer, RecordKey};
use fluvio_connector_common::{Source, connector, ConnectorConfig, Result};
use fluvio_connector_common::{Source, connector, Result};
use futures::StreamExt;
use serde::Deserialize;

use crate::source::TestJsonSource;

#[connector(source)]
async fn start(config: ConnectorConfig, producer: TopicProducer) -> Result<()> {
async fn start(config: CustomConfig, producer: TopicProducer) -> Result<()> {
let source = TestJsonSource::new(&config)?;
let mut stream = source.connect(None).await?;
while let Some(item) = stream.next().await {
Expand All @@ -16,3 +17,9 @@ async fn start(config: ConnectorConfig, producer: TopicProducer) -> Result<()> {
}
Ok(())
}

#[derive(Deserialize)]
pub(crate) struct CustomConfig {
pub interval: u64,
pub template: String,
}
19 changes: 7 additions & 12 deletions connector/json-test-connector/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,25 @@ use anyhow::Result;

use async_trait::async_trait;
use fluvio::Offset;
use fluvio_connector_common::{Source, ConnectorConfig};
use fluvio_connector_common::Source;
use futures::{stream::LocalBoxStream, Stream, StreamExt};

use tokio::time::Interval;

use crate::CustomConfig;

#[derive(Debug)]
pub(crate) struct TestJsonSource {
interval: Interval,
template: String,
}

impl TestJsonSource {
pub(crate) fn new(config: &ConnectorConfig) -> Result<Self> {
let interval = match config.parameters.get("interval") {
Some(value) => value.as_u32()?,
None => anyhow::bail!("interval not found"),
};
let template = match config.parameters.get("template") {
Some(value) => value.as_string()?,
None => anyhow::bail!("template not found"),
};
pub(crate) fn new(config: &CustomConfig) -> Result<Self> {
let CustomConfig { interval, template } = config;
Ok(Self {
interval: tokio::time::interval(Duration::from_secs(interval as u64)),
template,
interval: tokio::time::interval(Duration::from_secs(*interval)),
template: template.clone(),
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion connector/sink-test-connector/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod sink;

use fluvio_connector_common::{connector, ConnectorConfig, Result, consumer::ConsumerStream, Sink};
use fluvio_connector_common::{
connector, config::ConnectorConfig, Result, consumer::ConsumerStream, Sink,
};
use futures::SinkExt;
use sink::TestSink;

Expand Down
2 changes: 1 addition & 1 deletion connector/sink-test-connector/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Result;
use async_trait::async_trait;

use fluvio::Offset;
use fluvio_connector_common::{ConnectorConfig, Sink, LocalBoxSink};
use fluvio_connector_common::{config::ConnectorConfig, Sink, LocalBoxSink};

#[derive(Debug)]
pub(crate) struct TestSink {}
Expand Down
13 changes: 5 additions & 8 deletions crates/cdk/src/deploy.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::{
fmt::Debug,
path::{PathBuf, Path},
};
use std::{fmt::Debug, path::PathBuf};

use anyhow::Result;
use clap::{Parser, Subcommand};

use cargo_builder::package::PackageInfo;
use fluvio_connector_deployer::{Deployment, DeploymentType};
use fluvio_connector_package::{config::ConnectorConfig, metadata::ConnectorMetadata};
use fluvio_connector_package::metadata::ConnectorMetadata;

use crate::cmd::PackageCmd;

Expand Down Expand Up @@ -47,17 +44,17 @@ impl DeployCmd {
let mut builder = Deployment::builder();
builder
.executable(p.target_bin_path()?)
.config(ConnectorConfig::from_file(self.config())?)
.config(self.config())
.pkg(connector_metadata)
.deployment_type(self.deployment_type.into());
builder.deploy()?;

Ok(())
}

fn config(&self) -> &Path {
fn config(&self) -> PathBuf {
match &self.deployment_type {
DeploymentTypeCmd::Local { config } => config.as_path(),
DeploymentTypeCmd::Local { config } => config.clone(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-connector-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-net = { version = "1.7", default-features = false }
futures-util = { version = "0.3", features = ["sink"], default-features = false }
serde = { version = "1", features = ["derive", "rc"], default-features = false }
serde_json = { version = "1" }
serde_yaml = { version = "0.8" }

fluvio = { path = "../fluvio/", features = ["smartengine"]}
fluvio-connector-package = { path = "../fluvio-connector-package/" }
Expand Down
16 changes: 16 additions & 0 deletions crates/fluvio-connector-common/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pub use fluvio_connector_package::config::ConnectorConfig;

use std::{path::PathBuf, fs::File};

use serde::de::DeserializeOwned;
use anyhow::{Result, Context};
use serde_yaml::Value;

pub fn value_from_file<P: Into<PathBuf>>(path: P) -> Result<Value> {
let file = File::open(path.into())?;
serde_yaml::from_reader(file).context("unable to parse config file into YAML")
}

pub fn from_value<T: DeserializeOwned>(value: Value) -> Result<T> {
serde_yaml::from_value(value).context("unable to parse custom config type from YAML")
}
2 changes: 1 addition & 1 deletion crates/fluvio-connector-common/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use fluvio::{FluvioConfig, Fluvio};
use fluvio::dataplane::record::ConsumerRecord;
use fluvio_sc_schema::errors::ErrorCode;
use futures::StreamExt;
use crate::{ConnectorConfig, Result};
use crate::{config::ConnectorConfig, Result};
use crate::ensure_topic_exists;
use crate::smartmodule::smartmodule_vec_from_config;

Expand Down
5 changes: 2 additions & 3 deletions crates/fluvio-connector-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ pub mod producer;
pub mod smartmodule;
pub mod monitoring;
pub mod consumer;
pub mod config;

#[cfg(feature = "derive")]
pub use fluvio_connector_derive::connector;

pub use fluvio_connector_package::config::ConnectorConfig;

use fluvio::{Offset, metadata::topic::TopicSpec};
use futures::stream::LocalBoxStream;
use async_trait::async_trait;
Expand Down Expand Up @@ -36,7 +35,7 @@ pub trait Sink<I> {
async fn connect(self, offset: Option<Offset>) -> Result<LocalBoxSink<I>>;
}

pub async fn ensure_topic_exists(config: &ConnectorConfig) -> Result<()> {
pub async fn ensure_topic_exists(config: &config::ConnectorConfig) -> Result<()> {
let admin = fluvio::FluvioAdmin::connect().await?;
let topics = admin
.list::<TopicSpec, String>(vec![config.topic.clone()])
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-connector-common/src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fluvio::{FluvioConfig, Fluvio, TopicProducer, TopicProducerConfigBuilder};
use crate::{ConnectorConfig, Result};
use crate::{config::ConnectorConfig, Result};

use crate::{ensure_topic_exists, smartmodule::smartmodule_chain_from_config};

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-connector-common/src/smartmodule.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fluvio::{FluvioConfig, SmartModuleInvocation, SmartModuleKind};
use crate::{ConnectorConfig, Result};
use crate::{config::ConnectorConfig, Result};
use fluvio_smartengine::transformation::TransformationConfig;

pub async fn smartmodule_chain_from_config(
Expand Down
2 changes: 0 additions & 2 deletions crates/fluvio-connector-deployer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,4 @@ tracing = { workspace = true }
anyhow = { workspace = true }
derive_builder = { workspace = true }

tempfile = {version = "3.3", default-features = false}

fluvio-connector-package = { path = "../fluvio-connector-package" }
6 changes: 3 additions & 3 deletions crates/fluvio-connector-deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::path::PathBuf;
use anyhow::Result;
use derive_builder::Builder;

use fluvio_connector_package::config::ConnectorConfig;
use fluvio_connector_package::metadata::ConnectorMetadata;

#[derive(Clone)]
Expand All @@ -23,7 +22,7 @@ pub struct Deployment {
pub executable: PathBuf, // path to executable
#[builder(default)]
pub secrets: Vec<Secret>, // List of Secrets
pub config: ConnectorConfig, // Configuration to pass along,
pub config: PathBuf, // Configuration to pass along,
pub pkg: ConnectorMetadata, // Connector pkg definition
pub deployment_type: DeploymentType, // deployment type
}
Expand All @@ -37,7 +36,8 @@ impl Deployment {
impl DeploymentBuilder {
pub fn deploy(self) -> Result<()> {
let deployment = self.build()?;
deployment.pkg.validate_config(&deployment.config)?;
let config_file = std::fs::File::open(&deployment.config)?;
deployment.pkg.validate_config(config_file)?;
match deployment.deployment_type {
DeploymentType::Local => local::deploy_local(&deployment)?,
DeploymentType::K8 => {
Expand Down
9 changes: 3 additions & 6 deletions crates/fluvio-connector-deployer/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::process::{Command, Stdio};

use anyhow::{Result, anyhow};
use tempfile::NamedTempFile;
use crate::Deployment;

pub(crate) fn deploy_local(deployment: &Deployment) -> Result<()> {
let (_, config_path) = NamedTempFile::new()?.keep()?;
deployment.config.write_to_file(&config_path)?;

let mut log_path = std::env::current_dir()?;
log_path.push(&deployment.config.name);
log_path.push(&deployment.pkg.package.name);
log_path.set_extension("log");
let log_file = std::fs::File::create(log_path.as_path())?;

Expand All @@ -19,7 +15,8 @@ pub(crate) fn deploy_local(deployment: &Deployment) -> Result<()> {
cmd.stderr(log_file);
cmd.arg("--config");
cmd.arg(
config_path
deployment
.config
.to_str()
.ok_or_else(|| anyhow!("illegal path of temp config file"))?,
);
Expand Down
29 changes: 27 additions & 2 deletions crates/fluvio-connector-derive/src/ast.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use syn::{AttributeArgs, Result, Error, NestedMeta, Meta, spanned::Spanned, ItemFn, Ident};
use syn::{
AttributeArgs, Result, Error, NestedMeta, Meta, spanned::Spanned, ItemFn, Ident, FnArg, Path,
Type,
};

pub(crate) enum ConnectorDirection {
Source,
Expand All @@ -25,6 +28,7 @@ impl ConnectorDirection {
pub struct ConnectorFn<'a> {
pub name: &'a Ident,
pub func: &'a ItemFn,
pub config_type_path: &'a Path,
}

impl<'a> ConnectorFn<'a> {
Expand All @@ -39,7 +43,28 @@ impl<'a> ConnectorFn<'a> {
"Connector function must have two input arguments",
));
};
let config_type_path = config_type_path(&func.sig.inputs[0])?;
let name = &func.sig.ident;
Ok(Self { name, func })
Ok(Self {
name,
func,
config_type_path,
})
}
}

fn config_type_path(arg: &FnArg) -> Result<&Path> {
match arg {
FnArg::Receiver(_) => Err(Error::new(
arg.span(),
"config input argument must not be self",
)),
FnArg::Typed(pat_type) => match pat_type.ty.as_ref() {
Type::Path(type_path) => Ok(&type_path.path),
_ => Err(Error::new(
arg.span(),
"config type must valid path of owned type",
)),
},
}
}
Loading

0 comments on commit 13bb3d2

Please sign in to comment.