Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support schema registry in risedev #17001

Merged
merged 5 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions e2e_test/source/basic/schema_registry.slt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
control substitution on

# wrong strategy name
statement error
create source s1 () with (
connector = 'kafka',
topic = 'upsert_avro_json-record',
properties.bootstrap.server = 'message_queue:29092'
properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}'
) format plain encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
schema.registry.name.strategy = 'no sense',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE',
);
Expand All @@ -15,9 +17,9 @@ statement error
create source s1 () with (
connector = 'kafka',
topic = 'upsert_avro_json-record',
properties.bootstrap.server = 'message_queue:29092'
properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}'
) format plain encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
schema.registry.name.strategy = 'record_name_strategy',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE',
key.message = 'string'
Expand All @@ -27,9 +29,9 @@ statement ok
create source s1 () with (
connector = 'kafka',
topic = 'upsert_avro_json-record',
properties.bootstrap.server = 'message_queue:29092'
properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}'
) format plain encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
schema.registry.name.strategy = 'record_name_strategy',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE',
);
Expand All @@ -39,9 +41,9 @@ statement error
create table t1 () with (
connector = 'kafka',
topic = 'upsert_avro_json-topic-record',
properties.bootstrap.server = 'message_queue:29092'
properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}'
) format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
schema.registry.name.strategy = 'topic_record_name_strategy',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE'
);
Expand All @@ -52,9 +54,9 @@ INCLUDE KEY AS rw_key
with (
connector = 'kafka',
topic = 'upsert_avro_json-topic-record',
properties.bootstrap.server = 'message_queue:29092'
properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}'
) format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
schema.registry.name.strategy = 'topic_record_name_strategy',
message = 'CPLM.OBJ_ATTRIBUTE_VALUE',
key.message = 'string'
Expand Down
26 changes: 23 additions & 3 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,10 @@ profile:
user-managed: true
address: message_queue
port: 29092
- use: schema-registry
user-managed: true
address: schemaregistry
port: 8081

ci-inline-source-test:
config-path: src/config/ci-recovery.toml
Expand Down Expand Up @@ -1431,9 +1435,8 @@ template:

# Listen port of KRaft controller
controller-port: 29093

# Listen address
listen-address: ${address}
Comment on lines -1435 to -1436
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listen-address is unused

# Listen port for other services in docker (schema-registry)
docker-port: 29094

# The docker image. Can be overridden to use a different version.
image: "confluentinc/cp-kafka:7.6.1"
Expand All @@ -1446,6 +1449,23 @@ template:

user-managed: false

schema-registry:
# Id to be picked-up by services
id: schema-registry-${port}

# Advertise address
address: "127.0.0.1"

# Listen port of Schema Registry
port: 8081

# The docker image. Can be overridden to use a different version.
image: "confluentinc/cp-schema-registry:7.6.1"

user-managed: false

provide-kafka: "kafka*"

# Google pubsub emulator service
pubsub:
id: pubsub-${port}
Expand Down
7 changes: 4 additions & 3 deletions src/risedevtool/src/bin/risedev-compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,10 @@ fn main() -> Result<()> {
volumes.insert(c.id.clone(), ComposeVolume::default());
(c.address.clone(), c.compose(&compose_config)?)
}
ServiceConfig::Redis(_) | ServiceConfig::MySql(_) | ServiceConfig::Postgres(_) => {
return Err(anyhow!("not supported"))
}
ServiceConfig::Redis(_)
| ServiceConfig::MySql(_)
| ServiceConfig::Postgres(_)
| ServiceConfig::SchemaRegistry(_) => return Err(anyhow!("not supported")),
};
compose.container_name = service.id().to_string();
if opts.deploy {
Expand Down
16 changes: 14 additions & 2 deletions src/risedevtool/src/bin/risedev-dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use risedev::{
generate_risedev_env, preflight_check, CompactorService, ComputeNodeService, ConfigExpander,
ConfigureTmuxTask, DummyService, EnsureStopService, ExecuteContext, FrontendService,
GrafanaService, KafkaService, MetaNodeService, MinioService, MySqlService, PostgresService,
PrometheusService, PubsubService, RedisService, ServiceConfig, SqliteConfig, Task,
TempoService, RISEDEV_NAME,
PrometheusService, PubsubService, RedisService, SchemaRegistryService, ServiceConfig,
SqliteConfig, Task, TempoService, RISEDEV_NAME,
};
use tempfile::tempdir;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -279,6 +279,18 @@ fn task_main(
ctx.pb
.set_message(format!("kafka {}:{}", c.address, c.port));
}
ServiceConfig::SchemaRegistry(c) => {
let mut ctx =
ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
let mut service = SchemaRegistryService::new(c.clone());
service.execute(&mut ctx)?;
let mut task =
risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?;
Comment on lines +287 to +288
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you checked whether this is accurate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can I check?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe perform a GET request and check that the response status code equals 200. Here's an example:

curl -X GET http://localhost:8081/subjects

task.execute(&mut ctx)?;
ctx.pb
.set_message(format!("schema registry http://{}:{}", c.address, c.port));
}

ServiceConfig::Pubsub(c) => {
let mut ctx =
ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
Expand Down
1 change: 1 addition & 0 deletions src/risedevtool/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl ConfigExpander {
"redpanda" => ServiceConfig::RedPanda(serde_yaml::from_str(&out_str)?),
"mysql" => ServiceConfig::MySql(serde_yaml::from_str(&out_str)?),
"postgres" => ServiceConfig::Postgres(serde_yaml::from_str(&out_str)?),
"schema-registry" => ServiceConfig::SchemaRegistry(serde_yaml::from_str(&out_str)?),
other => return Err(anyhow!("unsupported use type: {}", other)),
};
Ok(result)
Expand Down
9 changes: 9 additions & 0 deletions src/risedevtool/src/risedev_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ pub fn generate_risedev_env(services: &Vec<ServiceConfig>) -> String {
writeln!(env, r#"RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka',properties.bootstrap.server='{brokers}'""#).unwrap();
writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap();
}
ServiceConfig::SchemaRegistry(c) => {
let address = &c.address;
let port = &c.port;
writeln!(
env,
r#"RISEDEV_SCHEMA_REGISTRY_URL="http://{address}:{port}""#,
)
.unwrap();
}
ServiceConfig::MySql(c) => {
let host = &c.address;
let port = &c.port;
Expand Down
37 changes: 34 additions & 3 deletions src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,19 +271,45 @@ pub struct KafkaConfig {
phantom_use: Option<String>,
pub id: String,

/// Advertise address
pub address: String,
#[serde(with = "string")]
pub port: u16,
/// Port for other services in docker. They need to connect to `host.docker.internal`, while the host
/// needs to connect to `localhost`.
pub docker_port: u16,

#[serde(with = "string")]
pub controller_port: u16,
pub listen_address: String,

pub image: String,
pub persist_data: bool,
pub node_id: u32,

pub user_managed: bool,
}

/// Configuration of a `schema-registry` service entry.
///
/// Keys are (de)serialized in kebab-case, and any key not listed here is rejected
/// during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SchemaRegistryConfig {
/// Captures the `use` key of the entry. The field is private.
#[serde(rename = "use")]
phantom_use: Option<String>,

/// Identifier of this service instance.
pub id: String,

/// Host part of the schema registry URL (`http://{address}:{port}`).
pub address: String,
/// Port part of the schema registry URL; (de)serialized through the `string` helper module.
#[serde(with = "string")]
pub port: u16,

/// Kafka service configs made available to this schema registry, if any.
/// NOTE(review): presumably filled in from the `provide-kafka` pattern in the risedev
/// config — confirm against the config expander.
pub provide_kafka: Option<Vec<KafkaConfig>>,

/// The docker image used to run the schema registry.
pub image: String,
/// Redpanda supports schema registry natively. You can configure a `user_managed` schema registry
/// to use with redpanda.
pub user_managed: bool,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -380,6 +406,7 @@ pub enum ServiceConfig {
Opendal(OpendalConfig),
AwsS3(AwsS3Config),
Kafka(KafkaConfig),
SchemaRegistry(SchemaRegistryConfig),
Pubsub(PubsubConfig),
Redis(RedisConfig),
RedPanda(RedPandaConfig),
Expand Down Expand Up @@ -407,10 +434,12 @@ impl ServiceConfig {
Self::RedPanda(c) => &c.id,
Self::Opendal(c) => &c.id,
Self::MySql(c) => &c.id,
ServiceConfig::Postgres(c) => &c.id,
Self::Postgres(c) => &c.id,
Self::SchemaRegistry(c) => &c.id,
}
}

/// Used to check whether the port is occupied before running the service.
pub fn port(&self) -> Option<u16> {
match self {
Self::ComputeNode(c) => Some(c.port),
Expand All @@ -430,7 +459,8 @@ impl ServiceConfig {
Self::RedPanda(_c) => None,
Self::Opendal(_) => None,
Self::MySql(c) => Some(c.port),
ServiceConfig::Postgres(c) => Some(c.port),
Self::Postgres(c) => Some(c.port),
Self::SchemaRegistry(c) => Some(c.port),
}
}

Expand All @@ -454,6 +484,7 @@ impl ServiceConfig {
Self::Opendal(_c) => false,
Self::MySql(c) => c.user_managed,
Self::Postgres(c) => c.user_managed,
Self::SchemaRegistry(c) => c.user_managed,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/risedevtool/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod postgres_service;
mod prometheus_service;
mod pubsub_service;
mod redis_service;
mod schema_registry_service;
mod task_configure_minio;
mod task_etcd_ready_check;
mod task_kafka_ready_check;
Expand Down Expand Up @@ -68,6 +69,7 @@ pub use self::postgres_service::*;
pub use self::prometheus_service::*;
pub use self::pubsub_service::*;
pub use self::redis_service::*;
pub use self::schema_registry_service::SchemaRegistryService;
pub use self::task_configure_minio::*;
pub use self::task_etcd_ready_check::*;
pub use self::task_kafka_ready_check::*;
Expand Down
4 changes: 3 additions & 1 deletion src/risedevtool/src/task/docker_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ where
cmd.arg("run")
.arg("--rm")
.arg("--name")
.arg(format!("risedev-{}", self.id()));
.arg(format!("risedev-{}", self.id()))
.arg("--add-host")
.arg("host.docker.internal:host-gateway");

for (k, v) in self.config.envs() {
cmd.arg("-e").arg(format!("{k}={v}"));
Expand Down
18 changes: 14 additions & 4 deletions src/risedevtool/src/task/kafka_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,18 @@ impl DockerServiceConfig for KafkaConfig {
),
(
"KAFKA_LISTENERS".to_owned(),
"PLAINTEXT://:9092,CONTROLLER://:9093".to_owned(),
"HOST://:9092,CONTROLLER://:9093,DOCKER://:9094".to_owned(),
),
(
"KAFKA_ADVERTISED_LISTENERS".to_owned(),
format!("PLAINTEXT://{}:{}", self.address, self.port),
format!(
"HOST://{}:{},DOCKER://host.docker.internal:{}",
self.address, self.port, self.docker_port
),
),
(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT".to_owned(),
"HOST:PLAINTEXT,CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT".to_owned(),
),
(
"KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(),
Expand All @@ -55,12 +58,19 @@ impl DockerServiceConfig for KafkaConfig {
"KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(),
"CONTROLLER".to_owned(),
),
(
"KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
"HOST".to_owned(),
),
("CLUSTER_ID".to_owned(), "RiseDevRiseDevRiseDev1".to_owned()),
]
}

fn ports(&self) -> Vec<(String, String)> {
vec![(self.port.to_string(), "9092".to_owned())]
vec![
(self.port.to_string(), "9092".to_owned()),
(self.docker_port.to_string(), "9094".to_owned()),
]
}

fn data_path(&self) -> Option<String> {
Expand Down
Loading
Loading