From 52d959f69d44d5b9969ac0d2e0fa26b545129b28 Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Mon, 1 May 2023 13:46:07 -0700 Subject: [PATCH 1/8] Rework pg-embed feature. Turns out if it's in a once-cell drop doesn't get called if the developer hits ctrl+c which means next time the manager is restarted it won't start again. So we put it back in ProjectDb. Also removed oncecell dependency which means we can simplify feature to just 'pg-embed'. --- Cargo.lock | 1 - crates/pipeline_manager/Cargo.toml | 8 ++---- crates/pipeline_manager/src/config.rs | 2 +- crates/pipeline_manager/src/db/mod.rs | 35 +++++++++++++------------- crates/pipeline_manager/src/db/test.rs | 15 ++++++----- scripts/start_manager.sh | 4 +-- 6 files changed, 30 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 82cc247f..f4444e8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1836,7 +1836,6 @@ dependencies = [ "futures-util", "log", "mime", - "once_cell", "pg-embed", "pretty_assertions", "proptest", diff --git a/crates/pipeline_manager/Cargo.toml b/crates/pipeline_manager/Cargo.toml index a2f31ae7..0ac7095e 100644 --- a/crates/pipeline_manager/Cargo.toml +++ b/crates/pipeline_manager/Cargo.toml @@ -37,7 +37,6 @@ tokio-postgres = "0.7" async-trait = "0.1" # Waiting for https://github.com/faokunega/pg-embed/pull/26 pg-embed = { git = "https://github.com/gz/pg-embed.git", rev = "8906af8", optional = true } -once_cell = { version = "1.17.1", optional = true } [target.'cfg(unix)'.dependencies] daemonize = { version = "0.4.1" } @@ -50,10 +49,7 @@ proptest = "1.0.0" proptest-derive = "0.3.0" pretty_assertions = "1.3.0" # Workaround to enable dev feature during tests: https://github.com/rust-lang/cargo/issues/2911 -dbsp_pipeline_manager = { path = ".", features = ["dev"]} +dbsp_pipeline_manager = { path = ".", features = ["pg-embed"]} [package.metadata.cargo-udeps.ignore] -development = ["dbsp_pipeline_manager"] # false positive from cargo udeps - -[features] -dev = ["dep:pg-embed", "dep:once_cell"] \ No newline at end of file +development = ["dbsp_pipeline_manager"] # false positive from cargo udeps \ No newline at end of file diff --git a/crates/pipeline_manager/src/config.rs b/crates/pipeline_manager/src/config.rs index f258a8d4..a180e776 100644 --- a/crates/pipeline_manager/src/config.rs +++ b/crates/pipeline_manager/src/config.rs @@ -228,7 +228,7 @@ impl ManagerConfig { } /// Where Postgres embed stores the database. - #[cfg(feature = "dev")] + #[cfg(feature = "pg-embed")] pub(crate) fn postgres_embed_data_dir(&self) -> PathBuf { Path::new(&self.working_directory).join("data") } diff --git a/crates/pipeline_manager/src/db/mod.rs b/crates/pipeline_manager/src/db/mod.rs index d2f62fa3..34879aad 100644 --- a/crates/pipeline_manager/src/db/mod.rs +++ b/crates/pipeline_manager/src/db/mod.rs @@ -12,7 +12,7 @@ use utoipa::ToSchema; #[cfg(test)] mod test; -#[cfg(any(test, feature = "dev"))] +#[cfg(any(test, feature = "pg-embed"))] mod pg_setup; pub(crate) mod storage; @@ -29,13 +29,13 @@ pub(crate) mod storage; /// time, which determines the position of the project in the queue. pub(crate) struct ProjectDB { conn: Client, + // Used in dev mode for having an embedded Postgres DB live through the + // lifetime of the program. + #[cfg(feature = "pg-embed")] + #[allow(dead_code)] // It has to stay alive until ProjectDB is dropped. + pg_inst: Option, } -// Used in dev mode for having an embedded Postgres DB live through the lifetime -// of the program. -#[cfg(feature = "dev")] -static PG: once_cell::sync::OnceCell = - once_cell::sync::OnceCell::new(); /// Unique project id. #[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, ToSchema)] @@ -1021,18 +1021,16 @@ impl ProjectDB { let connection_str = config.database_connection_string(); let initial_sql = &config.initial_sql; - #[cfg(feature = "dev")] - let connection_str = if connection_str.starts_with("postgres-embed") { + #[cfg(feature = "pg-embed")] + if connection_str.starts_with("postgres-embed") { let database_dir = config.postgres_embed_data_dir(); - let pg = pg_setup::install(database_dir, true, Some(8082)).await?; - let connection_string = pg.db_uri.to_string(); - let _ = PG.set(pg); - connection_string - } else { - connection_str + let pg_inst = pg_setup::install(database_dir, true, Some(8082)).await?; + let connection_string = pg_inst.db_uri.to_string(); + return Self::connect_inner(connection_string.as_str(), initial_sql, Some(pg_inst)).await }; - Self::connect_inner(connection_str.as_str(), initial_sql).await + Self::connect_inner(connection_str.as_str(), initial_sql, #[cfg(feature = "pg-embed")] None).await + } /// Connect to the project database. @@ -1046,7 +1044,7 @@ impl ProjectDB { /// - `is_persistent`: Whether the embedded postgres database should be /// persistent or removed on shutdown. /// - `port`: The port to use for the embedded Postgres database to run on. - async fn connect_inner(connection_str: &str, initial_sql: &Option) -> AnyResult { + async fn connect_inner(connection_str: &str, initial_sql: &Option, #[cfg(feature = "pg-embed")] pg_inst: Option) -> AnyResult { if !connection_str.starts_with("postgres") { panic!("Unsupported connection string {}", connection_str) } @@ -1162,7 +1160,10 @@ impl ProjectDB { } } - Ok(Self { conn: client }) + #[cfg(feature = "pg-embed")] + return Ok(Self { conn: client, pg_inst }); + #[cfg(not(feature = "pg-embed"))] + return Ok(Self { conn: client }); } /// Attach connector to the config. diff --git a/crates/pipeline_manager/src/db/test.rs b/crates/pipeline_manager/src/db/test.rs index 95cb0a5f..1d7a9727 100644 --- a/crates/pipeline_manager/src/db/test.rs +++ b/crates/pipeline_manager/src/db/test.rs @@ -7,7 +7,6 @@ use crate::db::{pg_setup, DBError}; use anyhow::Result as AnyResult; use async_trait::async_trait; use chrono::DateTime; -use pg_embed::postgres::PgEmbed; use pretty_assertions::assert_eq; use proptest::prelude::*; use proptest::test_runner::{Config, TestRunner}; @@ -21,7 +20,6 @@ use tokio::sync::Mutex; struct DbHandle { db: ProjectDB, - pg: PgEmbed, _temp_dir: TempDir, } @@ -31,9 +29,11 @@ impl Drop for DbHandle { // shutdown postgres). Otherwise postgres log an error that the // directory is already gone during shutdown which could be // confusing for a developer. - let _r = async { - self.pg.stop_db().await.unwrap(); - }; + if let Some(pg) = self.db.pg_inst.as_mut() { + let _r = async { + pg.stop_db().await.unwrap(); + }; + } } } @@ -58,14 +58,13 @@ async fn test_setup() -> DbHandle { let pg = pg_setup::install(temp_path.into(), false, Some(port)) .await .unwrap(); - - let conn = ProjectDB::connect_inner(&pg.db_uri, &Some("".to_string())) + let db_uri = pg.db_uri.clone(); + let conn = ProjectDB::connect_inner(&db_uri, &Some("".to_string()), Some(pg)) .await .unwrap(); DbHandle { db: conn, - pg: pg, _temp_dir, } } diff --git a/scripts/start_manager.sh b/scripts/start_manager.sh index c00f8959..266520f0 100755 --- a/scripts/start_manager.sh +++ b/scripts/start_manager.sh @@ -30,8 +30,8 @@ pkill -9 dbsp_pipeline_ set -e -cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo build --release -cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo run --release -- \ +cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo build --release --features pg-embed +cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo run --release --features pg-embed -- \ --bind-address="${BIND_ADDRESS}" \ --working-directory="${WORKING_DIR_ABS}" \ --sql-compiler-home="${SQL_COMPILER_DIR}" \ From 1ba6712b337fce571703077a889b6ff7a98997d3 Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Mon, 1 May 2023 13:47:29 -0700 Subject: [PATCH 2/8] Re-create the constraint if we restart to ensure we can stop/start the manager. --- crates/pipeline_manager/src/db/mod.rs | 31 +++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/crates/pipeline_manager/src/db/mod.rs b/crates/pipeline_manager/src/db/mod.rs index 34879aad..2b095d60 100644 --- a/crates/pipeline_manager/src/db/mod.rs +++ b/crates/pipeline_manager/src/db/mod.rs @@ -36,7 +36,6 @@ pub(crate) struct ProjectDB { pg_inst: Option, } - /// Unique project id. #[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, ToSchema)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] @@ -1026,11 +1025,17 @@ impl ProjectDB { let database_dir = config.postgres_embed_data_dir(); let pg_inst = pg_setup::install(database_dir, true, Some(8082)).await?; let connection_string = pg_inst.db_uri.to_string(); - return Self::connect_inner(connection_string.as_str(), initial_sql, Some(pg_inst)).await + return Self::connect_inner(connection_string.as_str(), initial_sql, Some(pg_inst)) + .await; }; - Self::connect_inner(connection_str.as_str(), initial_sql, #[cfg(feature = "pg-embed")] None).await - + Self::connect_inner( + connection_str.as_str(), + initial_sql, + #[cfg(feature = "pg-embed")] + None, + ) + .await } /// Connect to the project database. @@ -1044,7 +1049,11 @@ impl ProjectDB { /// - `is_persistent`: Whether the embedded postgres database should be /// persistent or removed on shutdown. /// - `port`: The port to use for the embedded Postgres database to run on. - async fn connect_inner(connection_str: &str, initial_sql: &Option, #[cfg(feature = "pg-embed")] pg_inst: Option) -> AnyResult { + async fn connect_inner( + connection_str: &str, + initial_sql: &Option, + #[cfg(feature = "pg-embed")] pg_inst: Option, + ) -> AnyResult { if !connection_str.starts_with("postgres") { panic!("Unsupported connection string {}", connection_str) } @@ -1110,6 +1119,13 @@ impl ProjectDB { ) .await?; + client + .execute( + "ALTER TABLE pipeline DROP CONSTRAINT IF EXISTS pipeline_config_id_fkey CASCADE; + ", + &[], + ) + .await?; client .execute( " @@ -1161,7 +1177,10 @@ impl ProjectDB { } #[cfg(feature = "pg-embed")] - return Ok(Self { conn: client, pg_inst }); + return Ok(Self { + conn: client, + pg_inst, + }); #[cfg(not(feature = "pg-embed"))] return Ok(Self { conn: client }); } From e9bf1a8ed7b6f4547957ec697d575fa2c4e45a95 Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Mon, 1 May 2023 17:28:28 -0700 Subject: [PATCH 3/8] Do not reload server state while saving. If we do it anyways we'll potentially overwrite the changes done by the user if they happen in parallel. --- web-ui/src/pages/streaming/builder/index.tsx | 1 + 1 file changed, 1 insertion(+) diff --git a/web-ui/src/pages/streaming/builder/index.tsx b/web-ui/src/pages/streaming/builder/index.tsx index 46891153..2f94f5aa 100644 --- a/web-ui/src/pages/streaming/builder/index.tsx +++ b/web-ui/src/pages/streaming/builder/index.tsx @@ -94,6 +94,7 @@ export const PipelineWithProvider = (props: { }) useEffect(() => { if ( + saveState !== 'isSaving' && saveState !== 'isModified' && saveState !== 'isDebouncing' && !configQuery.isLoading && !configQuery.isError && !projects.isLoading && From b0e1b426b390f329bb521caf9332b9fa03e511ad Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Mon, 1 May 2023 22:14:20 -0700 Subject: [PATCH 4/8] Fix regression for connector updates. We reset the form after a connector is edited/added and switch back to the initial tab. We also update the form with the current connector for edit by using useEffect which guarantees it will re-render with the correct form values. --- .../connectors/dialogs/CsvFileConnector.tsx | 47 ++++++---- .../dialogs/GenericEditorConnector.tsx | 66 ++++++++------ .../dialogs/KafkaInputConnector.tsx | 50 ++++++---- .../dialogs/KafkaOutputConnector.tsx | 50 ++++++---- .../src/connectors/dialogs/SubmitHandler.ts | 3 +- web-ui/src/types/connectors.tsx | 91 ++++++++++--------- 6 files changed, 175 insertions(+), 132 deletions(-) diff --git a/web-ui/src/connectors/dialogs/CsvFileConnector.tsx b/web-ui/src/connectors/dialogs/CsvFileConnector.tsx index c63d747f..908e63b3 100644 --- a/web-ui/src/connectors/dialogs/CsvFileConnector.tsx +++ b/web-ui/src/connectors/dialogs/CsvFileConnector.tsx @@ -22,10 +22,11 @@ import { Controller, useForm } from 'react-hook-form' import Transition from './tabs/Transition' import { ConnectorDescr, ConnectorType, NewConnectorRequest, UpdateConnectorRequest } from 'src/types/manager' import { ConnectorFormNewRequest, ConnectorFormUpdateRequest } from './SubmitHandler' -import { connectorToFormSchema, connectorTypeToConfig } from 'src/types/connectors' +import { connectorTypeToConfig, parseCsvFileSchema } from 'src/types/connectors' import { AddConnectorCard } from './AddConnectorCard' import ConnectorDialogProps from './ConnectorDialogProps' import { PLACEHOLDER_VALUES } from 'src/utils' +import { useEffect, useState } from 'react' const schema = yup .object({ @@ -39,34 +40,42 @@ const schema = yup export type CsvFileSchema = yup.InferType export const CsvFileConnectorDialog = (props: ConnectorDialogProps) => { - const handleClose = () => { - props.setShow(false) - } - const onFormSubmitted = (connector: ConnectorDescr | undefined) => { - handleClose() - if (connector !== undefined && props.onSuccess !== undefined) { - props.onSuccess(connector) - } - } + const [curValues, setCurValues] = useState(undefined) // Initialize the form either with default or values from the passed in connector - const defaultValues = props.connector - ? connectorToFormSchema(props.connector) - : { - name: '', - description: '', - url: '', - has_headers: true - } + useEffect(() => { + if (props.connector) { + setCurValues(parseCsvFileSchema(props.connector)) + } + }, [props.connector]) + const { control, + reset, handleSubmit, formState: { errors } } = useForm({ resolver: yupResolver(schema), - defaultValues + defaultValues: { + name: '', + description: '', + url: '', + has_headers: true + }, + values: curValues }) + const handleClose = () => { + reset() + props.setShow(false) + } + const onFormSubmitted = (connector: ConnectorDescr | undefined) => { + handleClose() + if (connector !== undefined && props.onSuccess !== undefined) { + props.onSuccess(connector) + } + } + // Define what should happen when the form is submitted const genericRequest = (data: CsvFileSchema, connector_id?: number): NewConnectorRequest | UpdateConnectorRequest => { return { diff --git a/web-ui/src/connectors/dialogs/GenericEditorConnector.tsx b/web-ui/src/connectors/dialogs/GenericEditorConnector.tsx index 3135b72f..056523d1 100644 --- a/web-ui/src/connectors/dialogs/GenericEditorConnector.tsx +++ b/web-ui/src/connectors/dialogs/GenericEditorConnector.tsx @@ -2,6 +2,7 @@ // // It just has an editor for the YAML config. +import { useEffect, useState } from 'react' import Box from '@mui/material/Box' import Dialog from '@mui/material/Dialog' import Button from '@mui/material/Button' @@ -24,10 +25,10 @@ import { Editor } from '@monaco-editor/react' import Transition from './tabs/Transition' import { ConnectorDescr, ConnectorType, NewConnectorRequest, UpdateConnectorRequest } from 'src/types/manager' import { ConnectorFormNewRequest, ConnectorFormUpdateRequest } from './SubmitHandler' -import { connectorToFormSchema } from 'src/types/connectors' import { AddConnectorCard } from './AddConnectorCard' import ConnectorDialogProps from './ConnectorDialogProps' import { PLACEHOLDER_VALUES } from 'src/utils' +import { parseEditorSchema } from 'src/types/connectors' const schema = yup .object({ @@ -40,46 +41,53 @@ const schema = yup export type EditorSchema = yup.InferType export const ConfigEditorDialog = (props: ConnectorDialogProps) => { - const handleClose = () => { - props.setShow(false) - } - const onFormSubmitted = (connector: ConnectorDescr | undefined) => { - handleClose() - if (connector !== undefined && props.onSuccess !== undefined) { - props.onSuccess(connector) - } - } - const theme = useTheme() const vscodeTheme = theme.palette.mode === 'dark' ? 'vs-dark' : 'vs' + const [curValues, setCurValues] = useState(undefined) // Initialize the form either with default or values from the passed in connector - const defaultValues = props.connector - ? connectorToFormSchema(props.connector) - : { - name: '', - description: '', - config: YAML.stringify({ - transport: { - name: 'transport-name', - config: { - property: 'value' - } - }, - format: { - name: 'csv' - } - }) - } + useEffect(() => { + if (props.connector) { + setCurValues(parseEditorSchema(props.connector)) + } + }, [props.connector]) + const { control, + reset, handleSubmit, formState: { errors } } = useForm({ resolver: yupResolver(schema), - defaultValues + defaultValues: { + name: '', + description: '', + config: YAML.stringify({ + transport: { + name: 'transport-name', + config: { + property: 'value' + } + }, + format: { + name: 'csv' + } + }) + }, + values: curValues }) + const handleClose = () => { + reset() + props.setShow(false) + } + const onFormSubmitted = (connector: ConnectorDescr | undefined) => { + handleClose() + if (connector !== undefined && props.onSuccess !== undefined) { + props.onSuccess(connector) + } + } + // Define what should happen when the form is submitted const genericRequest = (data: EditorSchema, connector_id?: number): NewConnectorRequest | UpdateConnectorRequest => { return { diff --git a/web-ui/src/connectors/dialogs/KafkaInputConnector.tsx b/web-ui/src/connectors/dialogs/KafkaInputConnector.tsx index 7d5387a4..a92fd381 100644 --- a/web-ui/src/connectors/dialogs/KafkaInputConnector.tsx +++ b/web-ui/src/connectors/dialogs/KafkaInputConnector.tsx @@ -23,7 +23,7 @@ import TabLabel from 'src/connectors/dialogs/tabs/TabLabel' import { ConnectorDescr, ConnectorType, NewConnectorRequest, UpdateConnectorRequest } from 'src/types/manager' import Transition from './tabs/Transition' import { ConnectorFormUpdateRequest, ConnectorFormNewRequest } from './SubmitHandler' -import { connectorTypeToConfig, connectorToFormSchema } from 'src/types/connectors' +import { connectorTypeToConfig, parseKafkaInputSchema } from 'src/types/connectors' import { AddConnectorCard } from './AddConnectorCard' import ConnectorDialogProps from './ConnectorDialogProps' @@ -39,35 +39,45 @@ export type KafkaInputSchema = yup.InferType export const KafkaInputConnectorDialog = (props: ConnectorDialogProps) => { const [activeTab, setActiveTab] = useState('detailsTab') - const handleClose = () => { - props.setShow(false) - } - const onFormSubmitted = (connector: ConnectorDescr | undefined) => { - handleClose() - if (connector !== undefined && props.onSuccess !== undefined) { - props.onSuccess(connector) - } - } + const [curValues, setCurValues] = useState(undefined) // Initialize the form either with default or values from the passed in connector - const defaultValues = props.connector - ? connectorToFormSchema(props.connector) - : { - name: '', - description: '', - host: '', - auto_offset: 'earliest', - topics: [] - } + useEffect(() => { + if (props.connector) { + setCurValues(parseKafkaInputSchema(props.connector)) + } + }, [props.connector]) + const { control, + reset, handleSubmit, formState: { errors } } = useForm({ resolver: yupResolver(schema), - defaultValues + defaultValues: { + name: '', + description: '', + host: '', + auto_offset: 'earliest', + topics: [] + }, + values: curValues }) + const handleClose = () => { + reset() + setActiveTab('detailsTab') + props.setShow(false) + } + + const onFormSubmitted = (connector: ConnectorDescr | undefined) => { + handleClose() + if (connector !== undefined && props.onSuccess !== undefined) { + props.onSuccess(connector) + } + } + // Define what should happen when the form is submitted const genericRequest = ( data: KafkaInputSchema, diff --git a/web-ui/src/connectors/dialogs/KafkaOutputConnector.tsx b/web-ui/src/connectors/dialogs/KafkaOutputConnector.tsx index fa864198..431b4fb8 100644 --- a/web-ui/src/connectors/dialogs/KafkaOutputConnector.tsx +++ b/web-ui/src/connectors/dialogs/KafkaOutputConnector.tsx @@ -22,7 +22,7 @@ import TabLabel from 'src/connectors/dialogs/tabs/TabLabel' import { ConnectorDescr, ConnectorType, NewConnectorRequest, UpdateConnectorRequest } from 'src/types/manager' import Transition from './tabs/Transition' import { ConnectorFormNewRequest, ConnectorFormUpdateRequest } from './SubmitHandler' -import { connectorToFormSchema, connectorTypeToConfig } from 'src/types/connectors' +import { connectorTypeToConfig, parseKafkaOutputSchema } from 'src/types/connectors' import TabkafkaOutputDetails from './tabs/TabKafkaOutputDetails' import { AddConnectorCard } from './AddConnectorCard' import ConnectorDialogProps from './ConnectorDialogProps' @@ -41,9 +41,38 @@ export type KafkaOutputSchema = yup.InferType export const KafkaOutputConnectorDialog = (props: ConnectorDialogProps) => { const [activeTab, setActiveTab] = useState('detailsTab') + const [curValues, setCurValues] = useState(undefined) + + // Initialize the form either with values from the passed in connector + useEffect(() => { + if (props.connector) { + setCurValues(parseKafkaOutputSchema(props.connector)) + } + }, [props.connector]) + + const { + control, + handleSubmit, + reset, + formState: { errors } + } = useForm({ + resolver: yupResolver(schema), + defaultValues: { + name: '', + description: '', + host: '', + auto_offset: 'earliest', + topic: '' + }, + values: curValues + }) + const handleClose = () => { + reset() + setActiveTab('detailsTab') props.setShow(false) } + const onFormSubmitted = (connector: ConnectorDescr | undefined) => { handleClose() console.log('onFormSubmitted', connector) @@ -52,25 +81,6 @@ export const KafkaOutputConnectorDialog = (props: ConnectorDialogProps) => { } } - // Initialize the form either with default or values from the passed in connector - const defaultValues = props.connector - ? connectorToFormSchema(props.connector) - : { - name: '', - description: '', - host: '', - auto_offset: 'earliest', - topic: '' - } - const { - control, - handleSubmit, - formState: { errors } - } = useForm({ - resolver: yupResolver(schema), - defaultValues - }) - // Define what should happen when the form is submitted const genericRequest = ( data: KafkaOutputSchema, diff --git a/web-ui/src/connectors/dialogs/SubmitHandler.ts b/web-ui/src/connectors/dialogs/SubmitHandler.ts index 6060481a..f94db53c 100644 --- a/web-ui/src/connectors/dialogs/SubmitHandler.ts +++ b/web-ui/src/connectors/dialogs/SubmitHandler.ts @@ -98,7 +98,8 @@ export const ConnectorFormUpdateRequest = ( name: source_desc.name, config: data.config, description: data.description, - direction: connectorTypeToDirection(data.typ), + // @ts-ignore + direction: connectorTypeToDirection(source_desc.typ), typ: data.typ }) }, diff --git a/web-ui/src/types/connectors.tsx b/web-ui/src/types/connectors.tsx index 53f3b4dd..d3bdb73f 100644 --- a/web-ui/src/types/connectors.tsx +++ b/web-ui/src/types/connectors.tsx @@ -1,6 +1,7 @@ import { Dispatch, SetStateAction } from 'react' import { match, P } from 'ts-pattern' import YAML from 'yaml' +import assert from 'assert' import { Direction, ConnectorDescr } from './manager' import { @@ -60,50 +61,54 @@ export const ConnectorDialog = (props: { }) .exhaustive() -// Given an existing ConnectorDescr return an object with the right values for -// the dialog form. -// -// e.g., The ConnectorType.FILE will return an object corresponding to the yup -// schema defined in CsvFileConnector.tsx. -export const connectorToFormSchema = ( - connector: ConnectorDescr -): KafkaInputSchema | KafkaOutputSchema | CsvFileSchema | EditorSchema => { +// Given an existing ConnectorDescr return the KafkaInputSchema +// if connector is of type KAFKA_IN. +export const parseKafkaInputSchema = (connector: ConnectorDescr): KafkaInputSchema => { + assert(connectorDescrToType(connector) === ConnectorType.KAFKA_IN) const config = YAML.parse(connector.config) - return match(connectorDescrToType(connector)) - .with(ConnectorType.KAFKA_IN, () => { - return { - name: connector.name, - description: connector.description, - host: config.transport.config['bootstrap.servers'], - auto_offset: config.transport.config['auto.offset.reset'], - topics: config.transport.config.topics - } as KafkaInputSchema - }) - .with(ConnectorType.KAFKA_OUT, () => { - return { - name: connector.name, - description: connector.description, - host: config.transport.config['bootstrap.servers'], - auto_offset: config.transport.config['auto.offset.reset'], - topic: config.transport.config.topic - } as KafkaOutputSchema - }) - .with(ConnectorType.FILE, () => { - return { - name: connector.name, - description: connector.description, - url: config.transport.config.path, - has_headers: true // TODO: this isn't represented by the connector - } as CsvFileSchema - }) - .with(ConnectorType.UNKNOWN, () => { - return { - name: connector.name, - description: connector.description, - config: connector.config - } as EditorSchema - }) - .exhaustive() + return { + name: connector.name, + description: connector.description, + host: config.transport.config['bootstrap.servers'], + auto_offset: config.transport.config['auto.offset.reset'], + topics: config.transport.config.topics + } +} + +// Given an existing ConnectorDescr return the KafkaOutputSchema +// if connector is of type KAFKA_OUT. +export const parseKafkaOutputSchema = (connector: ConnectorDescr): KafkaOutputSchema => { + assert(connectorDescrToType(connector) === ConnectorType.KAFKA_OUT) + const config = YAML.parse(connector.config) + return { + name: connector.name, + description: connector.description, + host: config.transport.config['bootstrap.servers'], + auto_offset: config.transport.config['auto.offset.reset'], + topic: config.transport.config.topic + } +} + +// Given an existing ConnectorDescr return the CsvFileSchema +// if connector is of type FILE. +export const parseCsvFileSchema = (connector: ConnectorDescr): CsvFileSchema => { + assert(connectorDescrToType(connector) === ConnectorType.FILE) + const config = YAML.parse(connector.config) + return { + name: connector.name, + description: connector.description, + url: config.transport.config.path, + has_headers: true // TODO: this isn't represented by the connector + } +} + +// Given an existing ConnectorDescr return EditorSchema for it. +export const parseEditorSchema = (connector: ConnectorDescr): EditorSchema => { + return { + name: connector.name, + description: connector.description, + config: connector.config + } } // Given a ConnectorType determine for what it can be used, inputs, outputs or From 2762fafe4f9fb6ca6b8435832fdb145f08d9bdba Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Mon, 1 May 2023 22:15:20 -0700 Subject: [PATCH 5/8] Fix regression for pipeline stats. pipeline stats werent displyaed anymore because endpoint_name semantics has changed. --- web-ui/src/streaming/PipelineTable.tsx | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/web-ui/src/streaming/PipelineTable.tsx b/web-ui/src/streaming/PipelineTable.tsx index 43a40cba..4b7bf38b 100644 --- a/web-ui/src/streaming/PipelineTable.tsx +++ b/web-ui/src/streaming/PipelineTable.tsx @@ -100,13 +100,15 @@ const DetailPanelContent = (props: { row: ConfigDescr }) => { const newInputMetrics = new Map() pipelineStatusQuery.data['inputs'].forEach((cs: ConnectorStatus) => { - newInputMetrics.set(cs.endpoint_name, cs.metrics as InputConnectorMetrics) + // @ts-ignore (config is untyped needs backend fix) + newInputMetrics.set(cs.config['stream'], cs.metrics as InputConnectorMetrics) }) setInputMetrics(newInputMetrics) const newOutputMetrics = new Map() pipelineStatusQuery.data['outputs'].forEach((cs: ConnectorStatus) => { - newOutputMetrics.set(cs.endpoint_name, cs.metrics as OutputConnectorMetrics) + // @ts-ignore (config is untyped needs backend fix) + newOutputMetrics.set(cs.config['stream'], cs.metrics as OutputConnectorMetrics) }) setOutputMetrics(newOutputMetrics) } @@ -181,8 +183,12 @@ const DetailPanelContent = (props: { row: ConfigDescr }) => { field: 'records', headerName: 'Records', flex: 0.15, - renderCell: params => - format('.1s')(inputMetrics?.get(params.row.ac.config.trim())?.total_records || 0) + renderCell: params => { + console.log(params.row.ac.config.trim()) + console.log(inputMetrics) + + return format('.1s')(inputMetrics?.get(params.row.ac.config.trim())?.total_records || 0) + } }, { field: 'traffic', From 294b922fd90ed9d67cee709e1000c1820d4cb6b6 Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Mon, 1 May 2023 22:15:57 -0700 Subject: [PATCH 6/8] Fix regression when saving pipeline config. Disallow backend fetches to override local state that's been modified. --- web-ui/src/pages/streaming/builder/index.tsx | 136 ++++++++++--------- 1 file changed, 69 insertions(+), 67 deletions(-) diff --git a/web-ui/src/pages/streaming/builder/index.tsx b/web-ui/src/pages/streaming/builder/index.tsx index 2f94f5aa..b886757d 100644 --- a/web-ui/src/pages/streaming/builder/index.tsx +++ b/web-ui/src/pages/streaming/builder/index.tsx @@ -93,81 +93,82 @@ export const PipelineWithProvider = (props: { configId !== undefined && saveState !== 'isSaving' && saveState !== 'isModified' && saveState !== 'isDebouncing' }) useEffect(() => { - if ( - saveState !== 'isSaving' && saveState !== 'isModified' && saveState !== 'isDebouncing' && - !configQuery.isLoading && - !configQuery.isError && - !projects.isLoading && - !projects.isError && - !connectorQuery.isLoading && - !connectorQuery.isError - ) { - setConfigId(() => configQuery.data.config_id) - setName(configQuery.data.name) - setDescription(configQuery.data.description) - setConfig(configQuery.data.config) - setSaveState('isUpToDate') + if (saveState !== 'isSaving' && saveState !== 'isModified' && saveState !== 'isDebouncing') { + if ( + !configQuery.isLoading && + !configQuery.isError && + !projects.isLoading && + !projects.isError && + !connectorQuery.isLoading && + !connectorQuery.isError + ) { + setConfigId(() => configQuery.data.config_id) + setName(configQuery.data.name) + setDescription(configQuery.data.description) + setConfig(configQuery.data.config) + setSaveState('isUpToDate') - const attachedConnectors = configQuery.data.attached_connectors - let invalidConnections: AttachedConnector[] = [] - let validConnections: AttachedConnector[] = attachedConnectors - console.log(attachedConnectors) + const attachedConnectors = configQuery.data.attached_connectors + let invalidConnections: AttachedConnector[] = [] + let validConnections: AttachedConnector[] = attachedConnectors + console.log(attachedConnectors) - // We don't set so `setSaveState` here because we don't want to override - // the saveState every time the backend returns some result. Because it - // could cancel potentially in-progress saves (started by client action). + // We don't set so `setSaveState` here because we don't want to override + // the saveState every time the backend returns some result. Because it + // could cancel potentially in-progress saves (started by client action). - if (configQuery.data.project_id) { - const foundProject = projects.data.find(p => p.project_id === configQuery.data.project_id) - if (foundProject) { - if (foundProject.schema == null) { - setMissingSchemaDialog(true) - } else { - setMissingSchemaDialog(false) - } + if (configQuery.data.project_id) { + const foundProject = projects.data.find(p => p.project_id === configQuery.data.project_id) + if (foundProject) { + if (foundProject.schema == null) { + setMissingSchemaDialog(true) + } else { + setMissingSchemaDialog(false) + } - const programWithSchema = parseProjectSchema(foundProject) - if (attachedConnectors) { - invalidConnections = attachedConnectors.filter(attached_connector => { - return !connectorConnects(attached_connector, programWithSchema.schema) - }) - validConnections = attachedConnectors.filter(attached_connector => { - return connectorConnects(attached_connector, programWithSchema.schema) - }) - } + const programWithSchema = parseProjectSchema(foundProject) + if (attachedConnectors) { + invalidConnections = attachedConnectors.filter(attached_connector => { + return !connectorConnects(attached_connector, programWithSchema.schema) + }) + validConnections = attachedConnectors.filter(attached_connector => { + return connectorConnects(attached_connector, programWithSchema.schema) + }) + } - setProject(programWithSchema) - replacePlaceholder(programWithSchema) + setProject(programWithSchema) + replacePlaceholder(programWithSchema) + } } - } - if (invalidConnections.length > 0) { - pushMessage({ - key: new Date().getTime(), - color: 'warning', - message: `Could not attach ${ - invalidConnections.length - } connector(s): No tables/views named ${invalidConnections.map(c => c.config).join(', ')} found.` - }) - } + if (invalidConnections.length > 0) { + pushMessage({ + key: new Date().getTime(), + color: 'warning', + message: `Could not attach ${ + invalidConnections.length + } connector(s): No tables/views named ${invalidConnections.map(c => c.config).join(', ')} found.` + }) + } - if (validConnections) { - validConnections.forEach(attached_connector => { - const connector = connectorQuery.data.find( - connector => connector.connector_id === attached_connector.connector_id - ) - if (connector) { - addConnector(connector, attached_connector) - } - }) + if (validConnections) { + validConnections.forEach(attached_connector => { + const connector = connectorQuery.data.find( + connector => connector.connector_id === attached_connector.connector_id + ) + if (connector) { + addConnector(connector, attached_connector) + } + }) + } + } else if (configId === undefined) { + setProject(undefined) + setSaveState('isNew') + setName('') + setDescription('') + // TODO: Set to 8 for now, needs to be configurable eventually + setConfig('workers: 8\n') } - } else if (configId === undefined) { - setProject(undefined) - setSaveState('isNew') - setName('') - setDescription('') - // TODO: Set to 8 for now, needs to be configurable eventually - setConfig('workers: 8\n') } }, [ connectorQuery.isLoading, @@ -188,7 +189,8 @@ export const PipelineWithProvider = (props: { replacePlaceholder, addConnector, configId, - pushMessage + pushMessage, + saveState ]) const debouncedSave = useDebouncedCallback(() => { From e5b51bc16e58fdc4a641dcb6ed11e035f62dc116 Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Mon, 1 May 2023 22:51:03 -0700 Subject: [PATCH 7/8] Fix regression that displays results again. Another PR changed how we named endpoints. --- .../pages/streaming/introspection/[config]/[view].tsx | 10 +++++++--- web-ui/src/streaming/AnalyticsPipelineTput.tsx | 1 - web-ui/src/streaming/PipelineTable.tsx | 10 ++-------- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/web-ui/src/pages/streaming/introspection/[config]/[view].tsx b/web-ui/src/pages/streaming/introspection/[config]/[view].tsx index 91dcb92e..12572777 100644 --- a/web-ui/src/pages/streaming/introspection/[config]/[view].tsx +++ b/web-ui/src/pages/streaming/introspection/[config]/[view].tsx @@ -10,7 +10,7 @@ import { useQuery } from '@tanstack/react-query' import { useRouter } from 'next/router' import { useEffect, useRef, useState } from 'react' import PageHeader from 'src/layouts/components/page-header' -import { ConfigDescr, ConfigId, ProjectDescr } from 'src/types/manager' +import { ConfigDescr, ConfigId, Direction, ProjectDescr } from 'src/types/manager' import { parse } from 'csv-parse' import { parseProjectSchema } from 'src/types/program' @@ -68,7 +68,12 @@ const IntrospectInputOutput = () => { const ws = useRef(null) useEffect(() => { if (configDescr && configDescr.pipeline && view !== undefined && headers !== undefined && apiRef.current) { - const socket = new WebSocket('ws://localhost:' + configDescr.pipeline.port + '/output_endpoint/debug-' + view) + const endpoint = configDescr.attached_connectors.find(ac => ac.config == view) + const direction = endpoint?.direction === Direction.INPUT ? '/input_endpoint/' : '/output_endpoint/' + const socket = new WebSocket( + 'ws://localhost:' + configDescr.pipeline.port + direction + 'debug-' + endpoint?.uuid + ) + socket.onopen = () => { console.log('opened') } @@ -78,7 +83,6 @@ const IntrospectInputOutput = () => { } socket.onmessage = event => { - console.log('got message') event.data.text().then((txt: string) => { parse( txt, diff --git a/web-ui/src/streaming/AnalyticsPipelineTput.tsx b/web-ui/src/streaming/AnalyticsPipelineTput.tsx index aaedef3a..3bf84d96 100644 --- a/web-ui/src/streaming/AnalyticsPipelineTput.tsx +++ b/web-ui/src/streaming/AnalyticsPipelineTput.tsx @@ -19,7 +19,6 @@ const AnalyticsPipelineTput = (props: { metrics: GlobalMetrics[] }) => { .slice(1) .map(m => m.total_processed_records) .filter(x => x != 0) - console.log(totalProcessed) const throughput = totalProcessed.slice(1).map((x, i) => x - totalProcessed[i]) const smoothTput = throughput.slice(1).map((x, i) => x * 0.6 + 0.4 * throughput[i]) diff --git a/web-ui/src/streaming/PipelineTable.tsx b/web-ui/src/streaming/PipelineTable.tsx index 4b7bf38b..59f7896f 100644 --- a/web-ui/src/streaming/PipelineTable.tsx +++ b/web-ui/src/streaming/PipelineTable.tsx @@ -94,7 +94,6 @@ const DetailPanelContent = (props: { row: ConfigDescr }) => { useEffect(() => { if (!pipelineStatusQuery.isLoading && !pipelineStatusQuery.isError) { - console.log(pipelineStatusQuery.data) const metrics = pipelineStatusQuery.data['global_metrics'] setGlobalMetrics(oldMetrics => [...oldMetrics, metrics]) @@ -183,12 +182,8 @@ const DetailPanelContent = (props: { row: ConfigDescr }) => { field: 'records', headerName: 'Records', flex: 0.15, - renderCell: params => { - console.log(params.row.ac.config.trim()) - console.log(inputMetrics) - - return format('.1s')(inputMetrics?.get(params.row.ac.config.trim())?.total_records || 0) - } + renderCell: params => + format('.1s')(inputMetrics?.get(params.row.ac.config.trim())?.total_records || 0) }, { field: 'traffic', @@ -359,7 +354,6 @@ export default function PipelineTable() { useEffect(() => { if (!isLoading && !isError) { setRows(data) - console.log(data) } }, [isLoading, isError, data, setRows]) From 4fb73209f8c565d66a7c6685504546c5411dd33c Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Tue, 2 May 2023 09:10:45 -0700 Subject: [PATCH 8/8] Fix race when deleting connectors. If we don't update the query cache the UI will re-create the just deleted from the query cache one savestatus goes to isUptoDate. --- web-ui/src/pages/streaming/builder/index.tsx | 29 +++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/web-ui/src/pages/streaming/builder/index.tsx b/web-ui/src/pages/streaming/builder/index.tsx index b886757d..b9b91f78 100644 --- a/web-ui/src/pages/streaming/builder/index.tsx +++ b/web-ui/src/pages/streaming/builder/index.tsx @@ -1,14 +1,14 @@ +import assert from 'assert' +import { Dispatch, SetStateAction, useEffect, useState } from 'react' import Grid from '@mui/material/Grid' import Typography from '@mui/material/Typography' import PageHeader from 'src/layouts/components/page-header' - import { Card, CardContent } from '@mui/material' import PipelineGraph from 'src/streaming/builder/PipelineBuilder' import SaveIndicator, { SaveIndicatorState } from 'src/components/SaveIndicator' import { match } from 'ts-pattern' import Metadata from 'src/streaming/builder/Metadata' import { useBuilderState } from 'src/streaming/builder/useBuilderState' -import { Dispatch, SetStateAction, useEffect, useState } from 'react' import { AttachedConnector, CancelError, @@ -23,7 +23,7 @@ import { UpdateConfigRequest, UpdateConfigResponse } from 'src/types/manager' -import { useMutation, useQuery } from '@tanstack/react-query' +import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query' import { ReactFlowProvider, useReactFlow } from 'reactflow' import { useDebouncedCallback } from 'use-debounce' import { removePrefix } from 'src/utils' @@ -56,6 +56,7 @@ export const PipelineWithProvider = (props: { configId: ConfigId | undefined setConfigId: Dispatch> }) => { + const queryClient = useQueryClient() const [missingSchemaDialog, setMissingSchemaDialog] = useState(false) const { configId, setConfigId } = props @@ -260,11 +261,30 @@ export const PipelineWithProvider = (props: { } updateConfigMutate(updateRequest, { + onSettled: () => { + assert(configId !== undefined) + queryClient.invalidateQueries(['configStatus', { config_id: configId }]) + }, onError: (error: CancelError) => { pushMessage({ message: error.message, key: new Date().getTime(), color: 'error' }) setSaveState('isUpToDate') }, onSuccess: () => { + // It's important to update the query cache here because otherwise + // sometimes the query cache will be out of date and the UI will + // show the old connectors again after deletion. + queryClient.setQueryData(['configStatus', { config_id: configId }], (oldData: ConfigDescr | undefined) => { + return oldData + ? { + ...oldData, + name, + description, + project_id: project?.project_id, + config, + attached_connectors: connectors + } + : oldData + }) setSaveState('isUpToDate') } }) @@ -284,7 +304,8 @@ export const PipelineWithProvider = (props: { config, getNode, getEdges, - pushMessage + pushMessage, + queryClient ]) return (