Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - feat: support custom configs in connector SDK #2910

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions connector/json-test-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures = { version = "0.3", default-features = false }
anyhow = { workspace = true}
async-std = { version = "1.12", default-features = false, features = ["attributes", "tokio1"]}
tokio = { version = "1.23", default-features = false, features = ["time"]}
serde = { version = "1.0", default-features = false, features = ["derive"]}

fluvio = { path = "../../crates/fluvio/", features = ["smartengine"]}
fluvio-connector-common = { path = "../../crates/fluvio-connector-common/", features = ["derive"] }
Expand Down
12 changes: 10 additions & 2 deletions connector/json-test-connector/Connector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,16 @@ source = true
image = "infinyon/fluvio-connect-json-test-source:0.1.0"


[custom]
required = ["template", "interval"]

sehz marked this conversation as resolved.
Show resolved Hide resolved
[[params]]
name = "template"
[custom.properties.template]
title = "template"
description = "JSON template"
type = "string"

[custom.properties.interval]
title = "Interval"
description = "Interval of producing value"
type = "integer"

5 changes: 2 additions & 3 deletions connector/json-test-connector/config-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ version: 0.1.0
name: my-json-test-connector
type: json-test-source
topic: test-topic
parameters:
interval: 10
template: '{"template":"test"}'
interval: 10
template: '{"template":"test"}'
transforms:
- uses: infinyon/jolt@0.1.0
with:
Expand Down
11 changes: 9 additions & 2 deletions connector/json-test-connector/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
mod source;

use fluvio::{TopicProducer, RecordKey};
use fluvio_connector_common::{Source, connector, ConnectorConfig, Result};
use fluvio_connector_common::{Source, connector, Result};
use futures::StreamExt;
use serde::Deserialize;

use crate::source::TestJsonSource;

#[connector(source)]
async fn start(config: ConnectorConfig, producer: TopicProducer) -> Result<()> {
async fn start(config: CustomConfig, producer: TopicProducer) -> Result<()> {
sehz marked this conversation as resolved.
Show resolved Hide resolved
let source = TestJsonSource::new(&config)?;
let mut stream = source.connect(None).await?;
while let Some(item) = stream.next().await {
Expand All @@ -16,3 +17,9 @@ async fn start(config: ConnectorConfig, producer: TopicProducer) -> Result<()> {
}
Ok(())
}

#[derive(Deserialize)]
pub(crate) struct CustomConfig {
pub interval: u64,
pub template: String,
}
19 changes: 7 additions & 12 deletions connector/json-test-connector/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,25 @@ use anyhow::Result;

use async_trait::async_trait;
use fluvio::Offset;
use fluvio_connector_common::{Source, ConnectorConfig};
use fluvio_connector_common::Source;
use futures::{stream::LocalBoxStream, Stream, StreamExt};

use tokio::time::Interval;

use crate::CustomConfig;

#[derive(Debug)]
pub(crate) struct TestJsonSource {
interval: Interval,
template: String,
}

impl TestJsonSource {
pub(crate) fn new(config: &ConnectorConfig) -> Result<Self> {
let interval = match config.parameters.get("interval") {
Some(value) => value.as_u32()?,
None => anyhow::bail!("interval not found"),
};
let template = match config.parameters.get("template") {
Some(value) => value.as_string()?,
None => anyhow::bail!("template not found"),
};
pub(crate) fn new(config: &CustomConfig) -> Result<Self> {
let CustomConfig { interval, template } = config;
Ok(Self {
interval: tokio::time::interval(Duration::from_secs(interval as u64)),
template,
interval: tokio::time::interval(Duration::from_secs(*interval)),
template: template.clone(),
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion connector/sink-test-connector/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod sink;

use fluvio_connector_common::{connector, ConnectorConfig, Result, consumer::ConsumerStream, Sink};
use fluvio_connector_common::{
connector, config::ConnectorConfig, Result, consumer::ConsumerStream, Sink,
};
use futures::SinkExt;
use sink::TestSink;

Expand Down
2 changes: 1 addition & 1 deletion connector/sink-test-connector/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Result;
use async_trait::async_trait;

use fluvio::Offset;
use fluvio_connector_common::{ConnectorConfig, Sink, LocalBoxSink};
use fluvio_connector_common::{config::ConnectorConfig, Sink, LocalBoxSink};

#[derive(Debug)]
pub(crate) struct TestSink {}
Expand Down
13 changes: 5 additions & 8 deletions crates/cdk/src/deploy.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::{
fmt::Debug,
path::{PathBuf, Path},
};
use std::{fmt::Debug, path::PathBuf};

use anyhow::Result;
use clap::{Parser, Subcommand};

use cargo_builder::package::PackageInfo;
use fluvio_connector_deployer::{Deployment, DeploymentType};
use fluvio_connector_package::{config::ConnectorConfig, metadata::ConnectorMetadata};
use fluvio_connector_package::metadata::ConnectorMetadata;

use crate::cmd::PackageCmd;

Expand Down Expand Up @@ -47,17 +44,17 @@ impl DeployCmd {
let mut builder = Deployment::builder();
builder
.executable(p.target_bin_path()?)
.config(ConnectorConfig::from_file(self.config())?)
.config(self.config())
.pkg(connector_metadata)
.deployment_type(self.deployment_type.into());
builder.deploy()?;

Ok(())
}

fn config(&self) -> &Path {
fn config(&self) -> PathBuf {
match &self.deployment_type {
DeploymentTypeCmd::Local { config } => config.as_path(),
DeploymentTypeCmd::Local { config } => config.clone(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-connector-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-net = { version = "1.7", default-features = false }
futures-util = { version = "0.3", features = ["sink"], default-features = false }
serde = { version = "1", features = ["derive", "rc"], default-features = false }
serde_json = { version = "1" }
serde_yaml = { version = "0.8" }

fluvio = { path = "../fluvio/", features = ["smartengine"]}
fluvio-connector-package = { path = "../fluvio-connector-package/" }
Expand Down
16 changes: 16 additions & 0 deletions crates/fluvio-connector-common/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pub use fluvio_connector_package::config::ConnectorConfig;

use std::{path::PathBuf, fs::File};

use serde::de::DeserializeOwned;
use anyhow::{Result, Context};
use serde_yaml::Value;

pub fn value_from_file<P: Into<PathBuf>>(path: P) -> Result<Value> {
let file = File::open(path.into())?;
serde_yaml::from_reader(file).context("unable to parse config file into YAML")
}

pub fn from_value<T: DeserializeOwned>(value: Value) -> Result<T> {
serde_yaml::from_value(value).context("unable to parse custom config type from YAML")
}
2 changes: 1 addition & 1 deletion crates/fluvio-connector-common/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use fluvio::{FluvioConfig, Fluvio};
use fluvio::dataplane::record::ConsumerRecord;
use fluvio_sc_schema::errors::ErrorCode;
use futures::StreamExt;
use crate::{ConnectorConfig, Result};
use crate::{config::ConnectorConfig, Result};
use crate::ensure_topic_exists;
use crate::smartmodule::smartmodule_vec_from_config;

Expand Down
5 changes: 2 additions & 3 deletions crates/fluvio-connector-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ pub mod producer;
pub mod smartmodule;
pub mod monitoring;
pub mod consumer;
pub mod config;

#[cfg(feature = "derive")]
pub use fluvio_connector_derive::connector;

pub use fluvio_connector_package::config::ConnectorConfig;

use fluvio::{Offset, metadata::topic::TopicSpec};
use futures::stream::LocalBoxStream;
use async_trait::async_trait;
Expand Down Expand Up @@ -36,7 +35,7 @@ pub trait Sink<I> {
async fn connect(self, offset: Option<Offset>) -> Result<LocalBoxSink<I>>;
}

pub async fn ensure_topic_exists(config: &ConnectorConfig) -> Result<()> {
pub async fn ensure_topic_exists(config: &config::ConnectorConfig) -> Result<()> {
let admin = fluvio::FluvioAdmin::connect().await?;
let topics = admin
.list::<TopicSpec, String>(vec![config.topic.clone()])
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-connector-common/src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fluvio::{FluvioConfig, Fluvio, TopicProducer, TopicProducerConfigBuilder};
use crate::{ConnectorConfig, Result};
use crate::{config::ConnectorConfig, Result};

use crate::{ensure_topic_exists, smartmodule::smartmodule_chain_from_config};

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-connector-common/src/smartmodule.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fluvio::{FluvioConfig, SmartModuleInvocation, SmartModuleKind};
use crate::{ConnectorConfig, Result};
use crate::{config::ConnectorConfig, Result};
use fluvio_smartengine::transformation::TransformationConfig;

pub async fn smartmodule_chain_from_config(
Expand Down
2 changes: 0 additions & 2 deletions crates/fluvio-connector-deployer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,4 @@ tracing = { workspace = true }
anyhow = { workspace = true }
derive_builder = { workspace = true }

tempfile = {version = "3.3", default-features = false}

fluvio-connector-package = { path = "../fluvio-connector-package" }
6 changes: 3 additions & 3 deletions crates/fluvio-connector-deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::path::PathBuf;
use anyhow::Result;
use derive_builder::Builder;

use fluvio_connector_package::config::ConnectorConfig;
use fluvio_connector_package::metadata::ConnectorMetadata;

#[derive(Clone)]
Expand All @@ -23,7 +22,7 @@ pub struct Deployment {
pub executable: PathBuf, // path to executable
#[builder(default)]
pub secrets: Vec<Secret>, // List of Secrets
pub config: ConnectorConfig, // Configuration to pass along,
pub config: PathBuf, // Configuration to pass along,
pub pkg: ConnectorMetadata, // Connector pkg definition
pub deployment_type: DeploymentType, // deployment type
}
Expand All @@ -37,7 +36,8 @@ impl Deployment {
impl DeploymentBuilder {
pub fn deploy(self) -> Result<()> {
let deployment = self.build()?;
deployment.pkg.validate_config(&deployment.config)?;
let config_file = std::fs::File::open(&deployment.config)?;
deployment.pkg.validate_config(config_file)?;
match deployment.deployment_type {
DeploymentType::Local => local::deploy_local(&deployment)?,
DeploymentType::K8 => {
Expand Down
9 changes: 3 additions & 6 deletions crates/fluvio-connector-deployer/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::process::{Command, Stdio};

use anyhow::{Result, anyhow};
use tempfile::NamedTempFile;
use crate::Deployment;

pub(crate) fn deploy_local(deployment: &Deployment) -> Result<()> {
let (_, config_path) = NamedTempFile::new()?.keep()?;
deployment.config.write_to_file(&config_path)?;

let mut log_path = std::env::current_dir()?;
log_path.push(&deployment.config.name);
log_path.push(&deployment.pkg.package.name);
log_path.set_extension("log");
let log_file = std::fs::File::create(log_path.as_path())?;

Expand All @@ -19,7 +15,8 @@ pub(crate) fn deploy_local(deployment: &Deployment) -> Result<()> {
cmd.stderr(log_file);
cmd.arg("--config");
cmd.arg(
config_path
deployment
.config
.to_str()
.ok_or_else(|| anyhow!("illegal path of temp config file"))?,
);
Expand Down
29 changes: 27 additions & 2 deletions crates/fluvio-connector-derive/src/ast.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use syn::{AttributeArgs, Result, Error, NestedMeta, Meta, spanned::Spanned, ItemFn, Ident};
use syn::{
AttributeArgs, Result, Error, NestedMeta, Meta, spanned::Spanned, ItemFn, Ident, FnArg, Path,
Type,
};

pub(crate) enum ConnectorDirection {
Source,
Expand All @@ -25,6 +28,7 @@ impl ConnectorDirection {
pub struct ConnectorFn<'a> {
pub name: &'a Ident,
pub func: &'a ItemFn,
pub config_type_path: &'a Path,
}

impl<'a> ConnectorFn<'a> {
Expand All @@ -39,7 +43,28 @@ impl<'a> ConnectorFn<'a> {
"Connector function must have two input arguments",
));
};
let config_type_path = config_type_path(&func.sig.inputs[0])?;
let name = &func.sig.ident;
Ok(Self { name, func })
Ok(Self {
name,
func,
config_type_path,
})
}
}

fn config_type_path(arg: &FnArg) -> Result<&Path> {
match arg {
FnArg::Receiver(_) => Err(Error::new(
arg.span(),
"config input argument must not be self",
)),
FnArg::Typed(pat_type) => match pat_type.ty.as_ref() {
Type::Path(type_path) => Ok(&type_path.path),
_ => Err(Error::new(
arg.span(),
"config type must valid path of owned type",
)),
},
}
}
Loading