diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 38cc3e94..8f7c8469 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -114,13 +114,13 @@ jobs: uses: actions-rs/cargo@v1 with: command: test - args: ${{ matrix.cargo_args }} --no-run --all-features --target ${{ matrix.target }} + args: ${{ matrix.cargo_args }} --no-run --all-features --target ${{ matrix.target }} - name: Build tests with no features uses: actions-rs/cargo@v1 with: command: test - args: ${{ matrix.cargo_args }} --no-run --no-default-features --target ${{ matrix.target }} + args: ${{ matrix.cargo_args }} --no-run --no-default-features --target ${{ matrix.target }} - name: Run tests if: runner.os != 'Windows' diff --git a/Cargo.lock b/Cargo.lock index 201c1ed4..16b7f3a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1807,12 +1807,14 @@ dependencies = [ "clap 4.2.4", "daemonize", "dbsp_adapters", + "dbsp_pipeline_manager", "env_logger", "fs_extra", "futures", "futures-util", "log", "mime", + "once_cell", "pg-embed", "pretty_assertions", "proptest", diff --git a/crates/pipeline_manager/Cargo.toml b/crates/pipeline_manager/Cargo.toml index b8832350..a2f31ae7 100644 --- a/crates/pipeline_manager/Cargo.toml +++ b/crates/pipeline_manager/Cargo.toml @@ -36,7 +36,8 @@ futures = "0.3" tokio-postgres = "0.7" async-trait = "0.1" # Waiting for https://github.com/faokunega/pg-embed/pull/26 -pg-embed = { git = "https://github.com/gz/pg-embed.git", rev = "8906af8" } +pg-embed = { git = "https://github.com/gz/pg-embed.git", rev = "8906af8", optional = true } +once_cell = { version = "1.17.1", optional = true } [target.'cfg(unix)'.dependencies] daemonize = { version = "0.4.1" } @@ -48,3 +49,11 @@ static-files = "0.2.3" proptest = "1.0.0" proptest-derive = "0.3.0" pretty_assertions = "1.3.0" +# Workaround to enable dev feature during tests: https://github.com/rust-lang/cargo/issues/2911 +dbsp_pipeline_manager = { path = ".", features = ["dev"]} + +[package.metadata.cargo-udeps.ignore] +development = ["dbsp_pipeline_manager"] # false positive from cargo udeps + +[features] +dev = ["dep:pg-embed", "dep:once_cell"] \ No newline at end of file diff --git a/crates/pipeline_manager/src/config.rs b/crates/pipeline_manager/src/config.rs index 8a6cbd9b..f258a8d4 100644 --- a/crates/pipeline_manager/src/config.rs +++ b/crates/pipeline_manager/src/config.rs @@ -24,7 +24,7 @@ fn default_sql_compiler_home() -> String { } fn default_db_connection_string() -> String { - "postgres-embed".to_string() + "".to_string() } /// Pipeline manager configuration read from a YAML config file or from command @@ -228,6 +228,7 @@ impl ManagerConfig { } /// Where Postgres embed stores the database. + #[cfg(feature = "dev")] pub(crate) fn postgres_embed_data_dir(&self) -> PathBuf { Path::new(&self.working_directory).join("data") } diff --git a/crates/pipeline_manager/src/db/mod.rs b/crates/pipeline_manager/src/db/mod.rs index e939bf30..d2f62fa3 100644 --- a/crates/pipeline_manager/src/db/mod.rs +++ b/crates/pipeline_manager/src/db/mod.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use chrono::{DateTime, NaiveDateTime, Utc}; use log::{debug, error}; use serde::{Deserialize, Serialize}; -use std::{error::Error as StdError, fmt, fmt::Display, path::PathBuf}; +use std::{error::Error as StdError, fmt, fmt::Display}; use storage::Storage; use tokio_postgres::{Client, NoTls}; use utoipa::ToSchema; @@ -12,6 +12,7 @@ use utoipa::ToSchema; #[cfg(test)] mod test; +#[cfg(any(test, feature = "dev"))] mod pg_setup; pub(crate) mod storage; @@ -28,10 +29,14 @@ pub(crate) mod storage; /// time, which determines the position of the project in the queue. pub(crate) struct ProjectDB { conn: Client, - #[allow(dead_code)] // We don't have to interact with it, but dropping it stops the DB server. - pub pg: Option, } +// Used in dev mode for having an embedded Postgres DB live through the lifetime +// of the program. +#[cfg(feature = "dev")] +static PG: once_cell::sync::OnceCell = + once_cell::sync::OnceCell::new(); + /// Unique project id. #[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, ToSchema)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] @@ -1013,21 +1018,21 @@ impl Storage for ProjectDB { impl ProjectDB { pub(crate) async fn connect(config: &ManagerConfig) -> AnyResult { - let connection_str = &config.database_connection_string(); + let connection_str = config.database_connection_string(); let initial_sql = &config.initial_sql; - let database_dir = config.postgres_embed_data_dir(); - - Self::connect_inner( - connection_str, - initial_sql, - // Remaining settings for postgres-embed only: - database_dir, - true, - // Use a non-standard port for postgres-embed so we don't interfere - // with an existing postgres installation - Some(8082), - ) - .await + + #[cfg(feature = "dev")] + let connection_str = if connection_str.starts_with("postgres-embed") { + let database_dir = config.postgres_embed_data_dir(); + let pg = pg_setup::install(database_dir, true, Some(8082)).await?; + let connection_string = pg.db_uri.to_string(); + let _ = PG.set(pg); + connection_string + } else { + connection_str + }; + + Self::connect_inner(connection_str.as_str(), initial_sql).await } /// Connect to the project database. @@ -1041,27 +1046,13 @@ impl ProjectDB { /// - `is_persistent`: Whether the embedded postgres database should be /// persistent or removed on shutdown. /// - `port`: The port to use for the embedded Postgres database to run on. - async fn connect_inner( - connection_str: &str, - initial_sql: &Option, - database_dir: PathBuf, - is_persistent: bool, - port: Option, - ) -> AnyResult { + async fn connect_inner(connection_str: &str, initial_sql: &Option) -> AnyResult { if !connection_str.starts_with("postgres") { panic!("Unsupported connection string {}", connection_str) } - let (pg, connection_str) = if connection_str.starts_with("postgres-embed") { - let pg = pg_setup::install(database_dir, is_persistent, port).await?; - let connection_string = pg.db_uri.to_string(); - (Some(pg), connection_string) - } else { - (None, connection_str.to_string()) - }; - debug!("Opening connection to {:?}", connection_str); - let (client, conn) = tokio_postgres::connect(&connection_str, NoTls).await?; + let (client, conn) = tokio_postgres::connect(connection_str, NoTls).await?; // The `tokio_postgres` API requires allocating a thread to `connection`, // which will handle datbase I/O and should automatically terminate once @@ -1171,7 +1162,7 @@ impl ProjectDB { } } - Ok(Self { conn: client, pg }) + Ok(Self { conn: client }) } /// Attach connector to the config. diff --git a/crates/pipeline_manager/src/db/pg_setup.rs b/crates/pipeline_manager/src/db/pg_setup.rs index 7a629b6b..6c912941 100644 --- a/crates/pipeline_manager/src/db/pg_setup.rs +++ b/crates/pipeline_manager/src/db/pg_setup.rs @@ -9,8 +9,8 @@ use pg_embed::pg_fetch::{PgFetchSettings, PG_V15}; use pg_embed::postgres::{PgEmbed, PgSettings}; use std::path::PathBuf; -/// Install and start an embedded postgres DB instance. This only runs if the -/// manager is started with postgres-embedded. +/// Install and start an embedded postgres DB instance. This only runs if +/// the manager is started with postgres-embedded. /// /// # Arguments /// - `database_dir` - Path to the directory where the database files will be diff --git a/crates/pipeline_manager/src/db/test.rs b/crates/pipeline_manager/src/db/test.rs index 90e62eb8..95cb0a5f 100644 --- a/crates/pipeline_manager/src/db/test.rs +++ b/crates/pipeline_manager/src/db/test.rs @@ -3,10 +3,11 @@ use super::{ storage::Storage, AttachedConnector, ConfigDescr, ConfigId, ConnectorDescr, ConnectorId, ConnectorType, PipelineId, ProjectDB, ProjectDescr, ProjectId, ProjectStatus, Version, }; -use crate::db::DBError; +use crate::db::{pg_setup, DBError}; use anyhow::Result as AnyResult; use async_trait::async_trait; use chrono::DateTime; +use pg_embed::postgres::PgEmbed; use pretty_assertions::assert_eq; use proptest::prelude::*; use proptest::test_runner::{Config, TestRunner}; @@ -20,6 +21,7 @@ use tokio::sync::Mutex; struct DbHandle { db: ProjectDB, + pg: PgEmbed, _temp_dir: TempDir, } @@ -29,11 +31,9 @@ impl Drop for DbHandle { // shutdown postgres). Otherwise postgres log an error that the // directory is already gone during shutdown which could be // confusing for a developer. - if let Some(pg) = self.db.pg.as_mut() { - let _r = async { - pg.stop_db().await.unwrap(); - }; - } + let _r = async { + self.pg.stop_db().await.unwrap(); + }; } } @@ -55,19 +55,17 @@ async fn test_setup() -> DbHandle { .map(|l| l.port()) .unwrap_or(DB_PORT_COUNTER.fetch_add(1, Ordering::Relaxed)) }; + let pg = pg_setup::install(temp_path.into(), false, Some(port)) + .await + .unwrap(); - let conn = ProjectDB::connect_inner( - "postgres-embed", - &Some("".to_string()), - temp_path.into(), - false, - Some(port), - ) - .await - .unwrap(); + let conn = ProjectDB::connect_inner(&pg.db_uri, &Some("".to_string())) + .await + .unwrap(); DbHandle { db: conn, + pg: pg, _temp_dir, } } diff --git a/deploy/Dockerfile b/deploy/Dockerfile index ad8191c1..4f47a2e2 100644 --- a/deploy/Dockerfile +++ b/deploy/Dockerfile @@ -26,13 +26,13 @@ RUN /root/.cargo/bin/cargo chef prepare --recipe-path recipe.json # layer for faster incremental builds of source-code only changes FROM chef AS builder COPY --from=planner /app/recipe.json recipe.json -RUN /root/.cargo/bin/cargo chef cook --release --recipe-path recipe.json --bin=dbsp_pipeline_manager +RUN /root/.cargo/bin/cargo chef cook --release --recipe-path recipe.json --bin=dbsp_pipeline_manager --no-default-features COPY . . RUN rm /app/crates/dbsp/benches/ldbc-graphalytics.rs \ && rm /app/crates/dbsp/benches/gdelt.rs \ && rm /app/crates/nexmark/benches/nexmark.rs \ && rm /app/crates/nexmark/benches/nexmark-gen.rs -RUN /root/.cargo/bin/cargo build --release --bin=dbsp_pipeline_manager +RUN /root/.cargo/bin/cargo build --release --bin=dbsp_pipeline_manager --no-default-features # Java build can be performed in parallel FROM base as javabuild diff --git a/scripts/start_manager.sh b/scripts/start_manager.sh index 2191338e..c00f8959 100755 --- a/scripts/start_manager.sh +++ b/scripts/start_manager.sh @@ -37,4 +37,5 @@ cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo run --release -- \ --sql-compiler-home="${SQL_COMPILER_DIR}" \ --dbsp-override-path="${ROOT_DIR}" \ --static-html=static \ - --unix-daemon + --unix-daemon \ + --db-connection-string="postgres-embed"