Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: ${{ matrix.cargo_args }} --no-run --all-features --target ${{ matrix.target }}
args: ${{ matrix.cargo_args }} --no-run --all-features --target ${{ matrix.target }}

- name: Build tests with no features
uses: actions-rs/cargo@v1
with:
command: test
args: ${{ matrix.cargo_args }} --no-run --no-default-features --target ${{ matrix.target }}
args: ${{ matrix.cargo_args }} --no-run --no-default-features --target ${{ matrix.target }}

- name: Run tests
if: runner.os != 'Windows'
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion crates/pipeline_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ futures = "0.3"
tokio-postgres = "0.7"
async-trait = "0.1"
# Waiting for https://github.com/faokunega/pg-embed/pull/26
pg-embed = { git = "https://github.com/gz/pg-embed.git", rev = "8906af8" }
pg-embed = { git = "https://github.com/gz/pg-embed.git", rev = "8906af8", optional = true }
once_cell = { version = "1.17.1", optional = true }

[target.'cfg(unix)'.dependencies]
daemonize = { version = "0.4.1" }
Expand All @@ -48,3 +49,11 @@ static-files = "0.2.3"
proptest = "1.0.0"
proptest-derive = "0.3.0"
pretty_assertions = "1.3.0"
# Workaround to enable dev feature during tests: https://github.com/rust-lang/cargo/issues/2911
dbsp_pipeline_manager = { path = ".", features = ["dev"]}

[package.metadata.cargo-udeps.ignore]
development = ["dbsp_pipeline_manager"] # false positive from cargo udeps

[features]
dev = ["dep:pg-embed", "dep:once_cell"]
3 changes: 2 additions & 1 deletion crates/pipeline_manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn default_sql_compiler_home() -> String {
}

fn default_db_connection_string() -> String {
"postgres-embed".to_string()
"".to_string()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be postgres:// now or something similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio-postgres doesn't fill things in automatically, so "postgres://" does not work.

}

/// Pipeline manager configuration read from a YAML config file or from command
Expand Down Expand Up @@ -228,6 +228,7 @@ impl ManagerConfig {
}

/// Directory in which the embedded Postgres instance keeps its database:
/// the `data` subdirectory of the manager's working directory.
#[cfg(feature = "dev")]
pub(crate) fn postgres_embed_data_dir(&self) -> PathBuf {
    let working_dir = Path::new(&self.working_directory);
    working_dir.join("data")
}
Expand Down
59 changes: 25 additions & 34 deletions crates/pipeline_manager/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use log::{debug, error};
use serde::{Deserialize, Serialize};
use std::{error::Error as StdError, fmt, fmt::Display, path::PathBuf};
use std::{error::Error as StdError, fmt, fmt::Display};
use storage::Storage;
use tokio_postgres::{Client, NoTls};
use utoipa::ToSchema;

#[cfg(test)]
mod test;

#[cfg(any(test, feature = "dev"))]
mod pg_setup;
pub(crate) mod storage;

Expand All @@ -28,10 +29,14 @@ pub(crate) mod storage;
/// time, which determines the position of the project in the queue.
pub(crate) struct ProjectDB {
conn: Client,
#[allow(dead_code)] // We don't have to interact with it, but dropping it stops the DB server.
pub pg: Option<pg_embed::postgres::PgEmbed>,
}

// Used in dev mode for having an embedded Postgres DB live through the lifetime
// of the program.
#[cfg(feature = "dev")]
static PG: once_cell::sync::OnceCell<pg_embed::postgres::PgEmbed> =
once_cell::sync::OnceCell::new();

/// Unique project id.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, ToSchema)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
Expand Down Expand Up @@ -1013,21 +1018,21 @@ impl Storage for ProjectDB {

impl ProjectDB {
pub(crate) async fn connect(config: &ManagerConfig) -> AnyResult<Self> {
let connection_str = &config.database_connection_string();
let connection_str = config.database_connection_string();
let initial_sql = &config.initial_sql;
let database_dir = config.postgres_embed_data_dir();

Self::connect_inner(
connection_str,
initial_sql,
// Remaining settings for postgres-embed only:
database_dir,
true,
// Use a non-standard port for postgres-embed so we don't interfere
// with an existing postgres installation
Some(8082),
)
.await

#[cfg(feature = "dev")]
let connection_str = if connection_str.starts_with("postgres-embed") {
let database_dir = config.postgres_embed_data_dir();
let pg = pg_setup::install(database_dir, true, Some(8082)).await?;
let connection_string = pg.db_uri.to_string();
let _ = PG.set(pg);
connection_string
} else {
connection_str
};

Self::connect_inner(connection_str.as_str(), initial_sql).await
}

/// Connect to the project database.
Expand All @@ -1041,27 +1046,13 @@ impl ProjectDB {
/// - `is_persistent`: Whether the embedded postgres database should be
/// persistent or removed on shutdown.
/// - `port`: The port to use for the embedded Postgres database to run on.
async fn connect_inner(
connection_str: &str,
initial_sql: &Option<String>,
database_dir: PathBuf,
is_persistent: bool,
port: Option<u16>,
) -> AnyResult<Self> {
async fn connect_inner(connection_str: &str, initial_sql: &Option<String>) -> AnyResult<Self> {
if !connection_str.starts_with("postgres") {
panic!("Unsupported connection string {}", connection_str)
}

let (pg, connection_str) = if connection_str.starts_with("postgres-embed") {
let pg = pg_setup::install(database_dir, is_persistent, port).await?;
let connection_string = pg.db_uri.to_string();
(Some(pg), connection_string)
} else {
(None, connection_str.to_string())
};

debug!("Opening connection to {:?}", connection_str);
let (client, conn) = tokio_postgres::connect(&connection_str, NoTls).await?;
let (client, conn) = tokio_postgres::connect(connection_str, NoTls).await?;

// The `tokio_postgres` API requires allocating a thread to `connection`,
// which will handle database I/O and should automatically terminate once
Expand Down Expand Up @@ -1171,7 +1162,7 @@ impl ProjectDB {
}
}

Ok(Self { conn: client, pg })
Ok(Self { conn: client })
}

/// Attach connector to the config.
Expand Down
4 changes: 2 additions & 2 deletions crates/pipeline_manager/src/db/pg_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use pg_embed::pg_fetch::{PgFetchSettings, PG_V15};
use pg_embed::postgres::{PgEmbed, PgSettings};
use std::path::PathBuf;

/// Install and start an embedded postgres DB instance. This only runs if the
/// manager is started with postgres-embedded.
/// Install and start an embedded postgres DB instance. This only runs if
/// the manager is started with the `postgres-embed` connection string.
///
/// # Arguments
/// - `database_dir` - Path to the directory where the database files will be
Expand Down
28 changes: 13 additions & 15 deletions crates/pipeline_manager/src/db/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use super::{
storage::Storage, AttachedConnector, ConfigDescr, ConfigId, ConnectorDescr, ConnectorId,
ConnectorType, PipelineId, ProjectDB, ProjectDescr, ProjectId, ProjectStatus, Version,
};
use crate::db::DBError;
use crate::db::{pg_setup, DBError};
Copy link
Contributor

@gz gz Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so mod test in mod.rs maybe also needs to be conditionally included with cfg(feature = dev)?

or cargo test --no-default-features might fail?

Copy link
Contributor Author

@lalithsuresh lalithsuresh Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the CI workflows to have feature=dev wherever --no-default-features is called. It's better to fail the run than to silently not run the tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it will result in a compilation error if you do cargo test --no-default-features which is a bit awkward. You could update to include the crate::db::pg_setup cfg(feature = "dev") and in addition also for cfg(test)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g., cfg(any(test, feature = "dev")) might do the trick?

use anyhow::Result as AnyResult;
use async_trait::async_trait;
use chrono::DateTime;
use pg_embed::postgres::PgEmbed;
use pretty_assertions::assert_eq;
use proptest::prelude::*;
use proptest::test_runner::{Config, TestRunner};
Expand All @@ -20,6 +21,7 @@ use tokio::sync::Mutex;

/// Test fixture that bundles a database connection with the resources
/// backing it.
struct DbHandle {
// Connection to the project database that the tests operate on.
db: ProjectDB,
// Embedded Postgres instance that `db` is connected to.
pg: PgEmbed,
// Temporary directory handle, held only to keep it alive for as long as
// this fixture lives (presumably it contains the database files; confirm
// against `test_setup`).
_temp_dir: TempDir,
}

Expand All @@ -29,11 +31,9 @@ impl Drop for DbHandle {
// shutdown postgres). Otherwise postgres logs an error that the
// directory is already gone during shutdown which could be
// confusing for a developer.
if let Some(pg) = self.db.pg.as_mut() {
let _r = async {
pg.stop_db().await.unwrap();
};
}
let _r = async {
self.pg.stop_db().await.unwrap();
};
}
}

Expand All @@ -55,19 +55,17 @@ async fn test_setup() -> DbHandle {
.map(|l| l.port())
.unwrap_or(DB_PORT_COUNTER.fetch_add(1, Ordering::Relaxed))
};
let pg = pg_setup::install(temp_path.into(), false, Some(port))
.await
.unwrap();

let conn = ProjectDB::connect_inner(
"postgres-embed",
&Some("".to_string()),
temp_path.into(),
false,
Some(port),
)
.await
.unwrap();
let conn = ProjectDB::connect_inner(&pg.db_uri, &Some("".to_string()))
.await
.unwrap();

DbHandle {
db: conn,
pg: pg,
_temp_dir,
}
}
Expand Down
4 changes: 2 additions & 2 deletions deploy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ RUN /root/.cargo/bin/cargo chef prepare --recipe-path recipe.json
# layer for faster incremental builds of source-code only changes
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
RUN /root/.cargo/bin/cargo chef cook --release --recipe-path recipe.json --bin=dbsp_pipeline_manager
RUN /root/.cargo/bin/cargo chef cook --release --recipe-path recipe.json --bin=dbsp_pipeline_manager --no-default-features
COPY . .
RUN rm /app/crates/dbsp/benches/ldbc-graphalytics.rs \
&& rm /app/crates/dbsp/benches/gdelt.rs \
&& rm /app/crates/nexmark/benches/nexmark.rs \
&& rm /app/crates/nexmark/benches/nexmark-gen.rs
RUN /root/.cargo/bin/cargo build --release --bin=dbsp_pipeline_manager
RUN /root/.cargo/bin/cargo build --release --bin=dbsp_pipeline_manager --no-default-features

# Java build can be performed in parallel
FROM base as javabuild
Expand Down
3 changes: 2 additions & 1 deletion scripts/start_manager.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo run --release -- \
--sql-compiler-home="${SQL_COMPILER_DIR}" \
--dbsp-override-path="${ROOT_DIR}" \
--static-html=static \
--unix-daemon
--unix-daemon \
--db-connection-string="postgres-embed"