Skip to content

Commit

Permalink
feat(risedev): support starting cluster with sqlite meta backend (#15693
Browse files Browse the repository at this point in the history
) (#16128)
  • Loading branch information
yezizp2012 committed Apr 3, 2024
1 parent dc0ea2d commit 96c76ca
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 19 deletions.
12 changes: 11 additions & 1 deletion risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ profile:
- use: frontend
- use: compactor

# `dev-compute-node` have the same settings as default except the the compute node will be started by user.
# `dev-compute-node` have the same settings as default except the compute node will be started by user.
dev-compute-node:
steps:
- use: meta-node
Expand Down Expand Up @@ -1043,6 +1043,13 @@ template:
# Other etcd nodes
provide-etcd: "etcd*"

sqlite:
# Id of this instance
id: sqlite

# File name of the sqlite database
file: metadata.db

compute-node:
# Compute-node advertise address
address: "127.0.0.1"
Expand Down Expand Up @@ -1117,6 +1124,9 @@ template:
# Etcd backend config
provide-etcd-backend: "etcd*"

# Sqlite backend config
provide-sqlite-backend: "sqlite*"

# Prometheus nodes used by dashboard service
provide-prometheus: "prometheus*"

Expand Down
1 change: 1 addition & 0 deletions src/risedevtool/src/bin/risedev-compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ fn main() -> Result<()> {
volumes.insert(c.id.clone(), ComposeVolume::default());
(c.address.clone(), c.compose(&compose_config)?)
}
ServiceConfig::Sqlite(_) => continue,
ServiceConfig::Prometheus(c) => {
volumes.insert(c.id.clone(), ComposeVolume::default());
(c.address.clone(), c.compose(&compose_config)?)
Expand Down
33 changes: 31 additions & 2 deletions src/risedevtool/src/bin/risedev-dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::env;
use std::fmt::Write;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand All @@ -27,7 +27,7 @@ use risedev::{
generate_risedev_env, preflight_check, AwsS3Config, CompactorService, ComputeNodeService,
ConfigExpander, ConfigureTmuxTask, EnsureStopService, ExecuteContext, FrontendService,
GrafanaService, KafkaService, MetaNodeService, MinioService, OpendalConfig, PrometheusService,
PubsubService, RedisService, ServiceConfig, Task, TempoService, ZooKeeperService,
PubsubService, RedisService, ServiceConfig, SqliteConfig, Task, TempoService, ZooKeeperService,
RISEDEV_SESSION_NAME,
};
use tempfile::tempdir;
Expand Down Expand Up @@ -101,6 +101,7 @@ fn task_main(
let listen_info = match service {
ServiceConfig::Minio(c) => Some((c.port, c.id.clone())),
ServiceConfig::Etcd(c) => Some((c.port, c.id.clone())),
ServiceConfig::Sqlite(_) => None,
ServiceConfig::Prometheus(c) => Some((c.port, c.id.clone())),
ServiceConfig::ComputeNode(c) => Some((c.port, c.id.clone())),
ServiceConfig::MetaNode(c) => Some((c.port, c.id.clone())),
Expand Down Expand Up @@ -158,6 +159,34 @@ fn task_main(
risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?;
task.execute(&mut ctx)?;
}
ServiceConfig::Sqlite(c) => {
let mut ctx =
ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());

struct SqliteService(SqliteConfig);
impl Task for SqliteService {
fn execute(
&mut self,
_ctx: &mut ExecuteContext<impl std::io::Write>,
) -> anyhow::Result<()> {
Ok(())
}

fn id(&self) -> String {
self.0.id.clone()
}
}

let prefix_data = env::var("PREFIX_DATA")?;
let file_dir = PathBuf::from(&prefix_data).join(&c.id);
std::fs::create_dir_all(&file_dir)?;
let file_path = file_dir.join(&c.file);

ctx.service(&SqliteService(c.clone()));
ctx.complete_spin();
ctx.pb
.set_message(format!("using local sqlite: {:?}", file_path));
}
ServiceConfig::Prometheus(c) => {
let mut ctx =
ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone());
Expand Down
1 change: 1 addition & 0 deletions src/risedevtool/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl ConfigExpander {
let result = match use_type.as_str() {
"minio" => ServiceConfig::Minio(serde_yaml::from_str(&out_str)?),
"etcd" => ServiceConfig::Etcd(serde_yaml::from_str(&out_str)?),
"sqlite" => ServiceConfig::Sqlite(serde_yaml::from_str(&out_str)?),
"frontend" => ServiceConfig::Frontend(serde_yaml::from_str(&out_str)?),
"compactor" => ServiceConfig::Compactor(serde_yaml::from_str(&out_str)?),
"compute-node" => ServiceConfig::ComputeNode(serde_yaml::from_str(&out_str)?),
Expand Down
14 changes: 14 additions & 0 deletions src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct MetaNodeConfig {
pub user_managed: bool,

pub provide_etcd_backend: Option<Vec<EtcdConfig>>,
pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
pub provide_prometheus: Option<Vec<PrometheusConfig>>,

pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
Expand Down Expand Up @@ -168,6 +169,17 @@ pub struct EtcdConfig {
pub provide_etcd: Option<Vec<EtcdConfig>>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SqliteConfig {
#[serde(rename = "use")]
phantom_use: Option<String>,
pub id: String,

pub file: String,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -331,6 +343,7 @@ pub enum ServiceConfig {
Compactor(CompactorConfig),
Minio(MinioConfig),
Etcd(EtcdConfig),
Sqlite(SqliteConfig),
Prometheus(PrometheusConfig),
Grafana(GrafanaConfig),
Tempo(TempoConfig),
Expand All @@ -352,6 +365,7 @@ impl ServiceConfig {
Self::Compactor(c) => &c.id,
Self::Minio(c) => &c.id,
Self::Etcd(c) => &c.id,
Self::Sqlite(c) => &c.id,
Self::Prometheus(c) => &c.id,
Self::Grafana(c) => &c.id,
Self::Tempo(c) => &c.id,
Expand Down
47 changes: 31 additions & 16 deletions src/risedevtool/src/task/meta_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::env;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::process::Command;

use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -76,21 +76,36 @@ impl MetaNodeService {
}

let mut is_persistent_meta_store = false;
match config.provide_etcd_backend.as_ref().unwrap().as_slice() {
[] => {
cmd.arg("--backend").arg("mem");
}
etcds => {
is_persistent_meta_store = true;
cmd.arg("--backend")
.arg("etcd")
.arg("--etcd-endpoints")
.arg(
etcds
.iter()
.map(|etcd| format!("{}:{}", etcd.address, etcd.port))
.join(","),
);

if let Some(sqlite_config) = &config.provide_sqlite_backend
&& !sqlite_config.is_empty()
{
is_persistent_meta_store = true;
let prefix_data = env::var("PREFIX_DATA")?;
let file_path = PathBuf::from(&prefix_data)
.join(&sqlite_config[0].id)
.join(&sqlite_config[0].file);
cmd.arg("--backend")
.arg("sql")
.arg("--sql-endpoint")
.arg(format!("sqlite://{}?mode=rwc", file_path.display()));
} else {
match config.provide_etcd_backend.as_ref().unwrap().as_slice() {
[] => {
cmd.arg("--backend").arg("mem");
}
etcds => {
is_persistent_meta_store = true;
cmd.arg("--backend")
.arg("etcd")
.arg("--etcd-endpoints")
.arg(
etcds
.iter()
.map(|etcd| format!("{}:{}", etcd.address, etcd.port))
.join(","),
);
}
}
}

Expand Down

0 comments on commit 96c76ca

Please sign in to comment.