This repository was archived by the owner on Jul 3, 2023. It is now read-only.
Merged
1 change: 0 additions & 1 deletion Cargo.lock

8 changes: 2 additions & 6 deletions crates/pipeline_manager/Cargo.toml
@@ -37,7 +37,6 @@ tokio-postgres = "0.7"
async-trait = "0.1"
# Waiting for https://github.com/faokunega/pg-embed/pull/26
pg-embed = { git = "https://github.com/gz/pg-embed.git", rev = "8906af8", optional = true }
once_cell = { version = "1.17.1", optional = true }

[target.'cfg(unix)'.dependencies]
daemonize = { version = "0.4.1" }
@@ -50,10 +49,7 @@ proptest = "1.0.0"
proptest-derive = "0.3.0"
pretty_assertions = "1.3.0"
# Workaround to enable dev feature during tests: https://github.com/rust-lang/cargo/issues/2911
dbsp_pipeline_manager = { path = ".", features = ["dev"]}
dbsp_pipeline_manager = { path = ".", features = ["pg-embed"]}

[package.metadata.cargo-udeps.ignore]
development = ["dbsp_pipeline_manager"] # false positive from cargo udeps

[features]
dev = ["dep:pg-embed", "dep:once_cell"]
development = ["dbsp_pipeline_manager"] # false positive from cargo udeps
2 changes: 1 addition & 1 deletion crates/pipeline_manager/src/config.rs
@@ -228,7 +228,7 @@ impl ManagerConfig {
}

/// Where Postgres embed stores the database.
#[cfg(feature = "dev")]
#[cfg(feature = "pg-embed")]
pub(crate) fn postgres_embed_data_dir(&self) -> PathBuf {
Path::new(&self.working_directory).join("data")
}
56 changes: 38 additions & 18 deletions crates/pipeline_manager/src/db/mod.rs
@@ -12,7 +12,7 @@ use utoipa::ToSchema;
#[cfg(test)]
mod test;

#[cfg(any(test, feature = "dev"))]
#[cfg(any(test, feature = "pg-embed"))]
mod pg_setup;
pub(crate) mod storage;

@@ -29,14 +29,13 @@ pub(crate) mod storage;
/// time, which determines the position of the project in the queue.
pub(crate) struct ProjectDB {
conn: Client,
// Used in dev mode for having an embedded Postgres DB live through the
// lifetime of the program.
#[cfg(feature = "pg-embed")]
#[allow(dead_code)] // It has to stay alive until ProjectDB is dropped.
pg_inst: Option<pg_embed::postgres::PgEmbed>,
Contributor

Just so I understand, how is this more robust to dropping during kill -9 than a shared reference with OnceCell?

Contributor Author

@gz gz May 2, 2023

If we press Ctrl+C on the manager in a console while running as dev, and pg_inst is in a OnceCell, then drop() is not called, so the Postgres process keeps running (pg-embed does not stop it). If you then restart the manager, it complains because it can't bring up the Postgres instance again (the stale instance is still running). You have to kill Postgres manually, which is a bit annoying when working on the manager. If PgEmbed is in the struct, drop() gets called and Postgres is stopped.

With kill -9 this is still a problem even now: the Postgres daemon is not cleaned up and we have to do it manually. But hopefully we won't need to kill -9 the process often anyway.

Contributor

I get that, but what I don't get is:

If OnceCell is dropped the PgEmbed instance should also be dropped right? Same as when we add the PgEmbed instance to the ProjectDB struct? What makes OnceCell not be dropped during sigterm?

Contributor Author

@gz gz May 3, 2023

Because it's a static:

Static items do not call drop at the end of the program.
https://doc.rust-lang.org/reference/items/static-items.html
https://users.rust-lang.org/t/about-drop-call-of-static-object/50544/4

It has been discussed as expected behavior for OnceCell: matklad/once_cell#98
e.g., the linked issue suggests using atexit functionality from other libraries.

Contributor

Ah, missed the part about it being static. Thanks.
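
The point about statics made in this thread can be illustrated with a minimal, self-contained sketch. It is not the manager's code: `FakePg`, `Db`, `GLOBAL_PG`, and `DROPS` are hypothetical names, and `std::sync::OnceLock` stands in for `once_cell::sync::OnceCell` so that the example needs no external crates. It only shows a normal exit path; it does not model signal handling.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

// Counts how many times `FakePg::drop` has run (hypothetical helper).
static DROPS: AtomicUsize = AtomicUsize::new(0);

// Hypothetical stand-in for `pg_embed::postgres::PgEmbed`.
struct FakePg(&'static str);

impl Drop for FakePg {
    fn drop(&mut self) {
        DROPS.fetch_add(1, Ordering::SeqCst);
        println!("stopping {}", self.0);
    }
}

// A value stored in a static is never dropped, even on a clean exit.
static GLOBAL_PG: OnceLock<FakePg> = OnceLock::new();

// A value owned by a struct is dropped together with the struct.
struct Db {
    _pg_inst: Option<FakePg>,
}

fn drops_after_owned_scope() -> usize {
    let _ = GLOBAL_PG.set(FakePg("static instance"));
    {
        let _db = Db {
            _pg_inst: Some(FakePg("owned instance")),
        };
    } // `_db` goes out of scope here, so its `FakePg` is dropped.
    DROPS.load(Ordering::SeqCst)
}

fn main() {
    // Prints "stopping owned instance", then "drops observed: 1".
    // "stopping static instance" is never printed.
    println!("drops observed: {}", drops_after_owned_scope());
}
```

Only the instance owned by `Db` is reported as stopped; the one parked in the static outlives `main` without its destructor running.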

}

// Used in dev mode for having an embedded Postgres DB live through the lifetime
// of the program.
#[cfg(feature = "dev")]
static PG: once_cell::sync::OnceCell<pg_embed::postgres::PgEmbed> =
once_cell::sync::OnceCell::new();

/// Unique project id.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, ToSchema)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
@@ -1021,18 +1020,22 @@ impl ProjectDB {
let connection_str = config.database_connection_string();
let initial_sql = &config.initial_sql;

#[cfg(feature = "dev")]
let connection_str = if connection_str.starts_with("postgres-embed") {
#[cfg(feature = "pg-embed")]
if connection_str.starts_with("postgres-embed") {
let database_dir = config.postgres_embed_data_dir();
let pg = pg_setup::install(database_dir, true, Some(8082)).await?;
let connection_string = pg.db_uri.to_string();
let _ = PG.set(pg);
connection_string
} else {
connection_str
let pg_inst = pg_setup::install(database_dir, true, Some(8082)).await?;
let connection_string = pg_inst.db_uri.to_string();
return Self::connect_inner(connection_string.as_str(), initial_sql, Some(pg_inst))
.await;
};

Self::connect_inner(connection_str.as_str(), initial_sql).await
Self::connect_inner(
connection_str.as_str(),
initial_sql,
#[cfg(feature = "pg-embed")]
None,
)
.await
}

/// Connect to the project database.
@@ -1046,7 +1049,11 @@ impl ProjectDB {
/// - `is_persistent`: Whether the embedded postgres database should be
/// persistent or removed on shutdown.
/// - `port`: The port to use for the embedded Postgres database to run on.
async fn connect_inner(connection_str: &str, initial_sql: &Option<String>) -> AnyResult<Self> {
async fn connect_inner(
connection_str: &str,
initial_sql: &Option<String>,
#[cfg(feature = "pg-embed")] pg_inst: Option<pg_embed::postgres::PgEmbed>,
) -> AnyResult<Self> {
if !connection_str.starts_with("postgres") {
panic!("Unsupported connection string {}", connection_str)
}
@@ -1112,6 +1119,13 @@ impl ProjectDB {
)
.await?;

client
.execute(
"ALTER TABLE pipeline DROP CONSTRAINT IF EXISTS pipeline_config_id_fkey CASCADE;
",
&[],
)
.await?;
client
.execute(
"
@@ -1162,7 +1176,13 @@ impl ProjectDB {
}
}

Ok(Self { conn: client })
#[cfg(feature = "pg-embed")]
return Ok(Self {
conn: client,
pg_inst,
});
#[cfg(not(feature = "pg-embed"))]
return Ok(Self { conn: client });
}
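
The hunks above gate a struct field, a function parameter, a call argument, and the final `return` on the same Cargo feature. A reduced sketch of that pattern, assuming hypothetical names (`Db`, `connect`, `connect_default`, and a `String` in place of the real `PgEmbed` handle), could look like this. Compiled without the feature, the gated pieces simply disappear.

```rust
// Hypothetical, reduced version of a struct with a feature-gated field.
struct Db {
    conn: String,
    // Only exists when the crate is built with `--features pg-embed`.
    #[cfg(feature = "pg-embed")]
    embedded: Option<String>,
}

// The extra parameter is part of the signature only when the feature is on.
fn connect(conn: &str, #[cfg(feature = "pg-embed")] embedded: Option<String>) -> Db {
    // Exactly one of the two returns is compiled in.
    #[cfg(feature = "pg-embed")]
    return Db {
        conn: conn.to_string(),
        embedded,
    };
    #[cfg(not(feature = "pg-embed"))]
    return Db {
        conn: conn.to_string(),
    };
}

// Callers gate the matching argument the same way.
fn connect_default() -> Db {
    connect(
        "postgres://localhost/db",
        #[cfg(feature = "pg-embed")]
        None,
    )
}

fn main() {
    let db = connect_default();
    println!("connected to {}", db.conn);
}
```

The cost of this approach is that every call site has to repeat the `#[cfg]` attribute; the benefit is that builds without the feature do not need the optional dependency's types at all.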

/// Attach connector to the config.
15 changes: 7 additions & 8 deletions crates/pipeline_manager/src/db/test.rs
@@ -7,7 +7,6 @@ use crate::db::{pg_setup, DBError};
use anyhow::Result as AnyResult;
use async_trait::async_trait;
use chrono::DateTime;
use pg_embed::postgres::PgEmbed;
use pretty_assertions::assert_eq;
use proptest::prelude::*;
use proptest::test_runner::{Config, TestRunner};
@@ -21,7 +20,6 @@ use tokio::sync::Mutex;

struct DbHandle {
db: ProjectDB,
pg: PgEmbed,
_temp_dir: TempDir,
}

@@ -31,9 +29,11 @@ impl Drop for DbHandle {
// shutdown postgres). Otherwise postgres log an error that the
// directory is already gone during shutdown which could be
// confusing for a developer.
let _r = async {
self.pg.stop_db().await.unwrap();
};
if let Some(pg) = self.db.pg_inst.as_mut() {
let _r = async {
pg.stop_db().await.unwrap();
};
}
}
}
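
One thing worth keeping in mind when reading this `drop` implementation: in Rust an `async` block is lazy, so binding it to a variable does not run its body; something has to poll the resulting future. The sketch below demonstrates that general behavior with hypothetical names (`stop_db` here is a stand-in, not pg-embed's method) and a tiny hand-written `block_on`, chosen only so the example has no dependency on tokio.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor (hypothetical helper): polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Stand-in for an async shutdown call.
async fn stop_db(stopped: &AtomicBool) {
    stopped.store(true, Ordering::SeqCst);
}

// Creates the future but never polls it: the body does not run.
fn stop_without_polling() -> bool {
    let stopped = AtomicBool::new(false);
    let _r = async {
        stop_db(&stopped).await;
    };
    stopped.load(Ordering::SeqCst)
}

// Drives the same future to completion: the body runs.
fn stop_with_block_on() -> bool {
    let stopped = AtomicBool::new(false);
    block_on(async {
        stop_db(&stopped).await;
    });
    stopped.load(Ordering::SeqCst)
}

fn main() {
    println!("without polling: {}", stop_without_polling()); // false
    println!("with block_on:   {}", stop_with_block_on()); // true
}
```

Inside a real async runtime, blocking in `drop` needs care (for example, a dedicated shutdown method that is awaited before the handle is dropped), so the `block_on` above is an illustration rather than a recommendation.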

@@ -58,14 +58,13 @@ async fn test_setup() -> DbHandle {
let pg = pg_setup::install(temp_path.into(), false, Some(port))
.await
.unwrap();

let conn = ProjectDB::connect_inner(&pg.db_uri, &Some("".to_string()))
let db_uri = pg.db_uri.clone();
let conn = ProjectDB::connect_inner(&db_uri, &Some("".to_string()), Some(pg))
.await
.unwrap();

DbHandle {
db: conn,
pg: pg,
_temp_dir,
}
}
4 changes: 2 additions & 2 deletions scripts/start_manager.sh
@@ -30,8 +30,8 @@ pkill -9 dbsp_pipeline_

set -e

cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo build --release
cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo run --release -- \
cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo build --release --features pg-embed
cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo run --release --features pg-embed -- \
--bind-address="${BIND_ADDRESS}" \
--working-directory="${WORKING_DIR_ABS}" \
--sql-compiler-home="${SQL_COMPILER_DIR}" \
47 changes: 28 additions & 19 deletions web-ui/src/connectors/dialogs/CsvFileConnector.tsx
@@ -22,10 +22,11 @@ import { Controller, useForm } from 'react-hook-form'
import Transition from './tabs/Transition'
import { ConnectorDescr, ConnectorType, NewConnectorRequest, UpdateConnectorRequest } from 'src/types/manager'
import { ConnectorFormNewRequest, ConnectorFormUpdateRequest } from './SubmitHandler'
import { connectorToFormSchema, connectorTypeToConfig } from 'src/types/connectors'
import { connectorTypeToConfig, parseCsvFileSchema } from 'src/types/connectors'
import { AddConnectorCard } from './AddConnectorCard'
import ConnectorDialogProps from './ConnectorDialogProps'
import { PLACEHOLDER_VALUES } from 'src/utils'
import { useEffect, useState } from 'react'

const schema = yup
.object({
@@ -39,34 +40,42 @@ const schema = yup
export type CsvFileSchema = yup.InferType<typeof schema>

export const CsvFileConnectorDialog = (props: ConnectorDialogProps) => {
const handleClose = () => {
props.setShow(false)
}
const onFormSubmitted = (connector: ConnectorDescr | undefined) => {
handleClose()
if (connector !== undefined && props.onSuccess !== undefined) {
props.onSuccess(connector)
}
}
const [curValues, setCurValues] = useState<CsvFileSchema | undefined>(undefined)

// Initialize the form either with default or values from the passed in connector
const defaultValues = props.connector
? connectorToFormSchema(props.connector)
: {
name: '',
description: '',
url: '',
has_headers: true
}
useEffect(() => {
if (props.connector) {
setCurValues(parseCsvFileSchema(props.connector))
}
}, [props.connector])

const {
control,
reset,
handleSubmit,
formState: { errors }
} = useForm<CsvFileSchema>({
resolver: yupResolver(schema),
defaultValues
defaultValues: {
name: '',
description: '',
url: '',
has_headers: true
},
values: curValues
})

const handleClose = () => {
reset()
props.setShow(false)
}
const onFormSubmitted = (connector: ConnectorDescr | undefined) => {
handleClose()
if (connector !== undefined && props.onSuccess !== undefined) {
props.onSuccess(connector)
}
}

// Define what should happen when the form is submitted
const genericRequest = (data: CsvFileSchema, connector_id?: number): NewConnectorRequest | UpdateConnectorRequest => {
return {
66 changes: 37 additions & 29 deletions web-ui/src/connectors/dialogs/GenericEditorConnector.tsx
@@ -2,6 +2,7 @@
//
// It just has an editor for the YAML config.

import { useEffect, useState } from 'react'
import Box from '@mui/material/Box'
import Dialog from '@mui/material/Dialog'
import Button from '@mui/material/Button'
@@ -24,10 +25,10 @@ import { Editor } from '@monaco-editor/react'
import Transition from './tabs/Transition'
import { ConnectorDescr, ConnectorType, NewConnectorRequest, UpdateConnectorRequest } from 'src/types/manager'
import { ConnectorFormNewRequest, ConnectorFormUpdateRequest } from './SubmitHandler'
import { connectorToFormSchema } from 'src/types/connectors'
import { AddConnectorCard } from './AddConnectorCard'
import ConnectorDialogProps from './ConnectorDialogProps'
import { PLACEHOLDER_VALUES } from 'src/utils'
import { parseEditorSchema } from 'src/types/connectors'

const schema = yup
.object({
@@ -40,46 +41,53 @@ const schema = yup
export type EditorSchema = yup.InferType<typeof schema>

export const ConfigEditorDialog = (props: ConnectorDialogProps) => {
const handleClose = () => {
props.setShow(false)
}
const onFormSubmitted = (connector: ConnectorDescr | undefined) => {
handleClose()
if (connector !== undefined && props.onSuccess !== undefined) {
props.onSuccess(connector)
}
}

const theme = useTheme()
const vscodeTheme = theme.palette.mode === 'dark' ? 'vs-dark' : 'vs'
const [curValues, setCurValues] = useState<EditorSchema | undefined>(undefined)

// Initialize the form either with default or values from the passed in connector
const defaultValues = props.connector
? connectorToFormSchema(props.connector)
: {
name: '',
description: '',
config: YAML.stringify({
transport: {
name: 'transport-name',
config: {
property: 'value'
}
},
format: {
name: 'csv'
}
})
}
useEffect(() => {
if (props.connector) {
setCurValues(parseEditorSchema(props.connector))
}
}, [props.connector])

const {
control,
reset,
handleSubmit,
formState: { errors }
} = useForm<EditorSchema>({
resolver: yupResolver(schema),
defaultValues
defaultValues: {
name: '',
description: '',
config: YAML.stringify({
transport: {
name: 'transport-name',
config: {
property: 'value'
}
},
format: {
name: 'csv'
}
})
},
values: curValues
})

const handleClose = () => {
reset()
props.setShow(false)
}
const onFormSubmitted = (connector: ConnectorDescr | undefined) => {
handleClose()
if (connector !== undefined && props.onSuccess !== undefined) {
props.onSuccess(connector)
}
}

// Define what should happen when the form is submitted
const genericRequest = (data: EditorSchema, connector_id?: number): NewConnectorRequest | UpdateConnectorRequest => {
return {