Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Remove the ON DELETE CASCADE behavior from the pipelines -> destinations and
-- pipelines -> sources foreign keys, so deleting a destination or a source can
-- never silently delete pipelines. Resource deletion is now handled explicitly
-- by the application; the only remaining automatic cascade is tenant deletion.
--
-- NOTE(review): the re-added constraint keeps the legacy name
-- `pipelines_sink_id_fkey` even though the column is `destination_id` —
-- presumably kept so the DROP matches the existing constraint; confirm nothing
-- depends on the old name before renaming it in a follow-up migration.

alter table app.pipelines
drop constraint pipelines_sink_id_fkey,
add constraint pipelines_sink_id_fkey
foreign key (destination_id)
references app.destinations (id);

alter table app.pipelines
drop constraint pipelines_source_id_fkey,
add constraint pipelines_source_id_fkey
foreign key (source_id)
references app.sources (id);
47 changes: 47 additions & 0 deletions etl-api/src/db/pipelines.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use etl_config::shared::BatchConfig;
use etl_postgres::replication::{schema, state};
use serde::{Deserialize, Serialize};
use sqlx::{PgExecutor, PgTransaction};
use std::ops::DerefMut;
use thiserror::Error;
use utoipa::ToSchema;

use crate::db;
use crate::db::destinations::{Destination, DestinationsDbError};
use crate::db::replicators::{ReplicatorsDbError, create_replicator};
use crate::db::serde::{
DbDeserializationError, DbSerializationError, deserialize_from_value, serialize,
};
use crate::db::sources::Source;
use crate::routes::connect_to_source_database_with_defaults;

/// Pipeline configuration used during replication. This struct's fields
/// should be kept in sync with [`OptionalPipelineConfig`]. If a new optional
Expand Down Expand Up @@ -87,6 +92,9 @@ pub enum PipelinesDbError {

#[error(transparent)]
ReplicatorsDb(#[from] ReplicatorsDbError),

#[error(transparent)]
DestinationsDb(#[from] DestinationsDbError),
}

pub async fn create_pipeline(
Expand Down Expand Up @@ -225,6 +233,45 @@ where
Ok(record.map(|r| r.id))
}

/// Deletes `pipeline` together with its replicator (and, optionally, its
/// destination) from the API database, and removes the pipeline's replication
/// state and table schemas from the source database.
///
/// Two transactions are involved: `txn` on the API database (passed in by the
/// caller and committed here) and a second one opened on the source database.
/// The API-side transaction is committed first so the pipeline row is
/// guaranteed gone before the source-side state deletions become durable; any
/// failure before that point rolls back both sides.
pub async fn delete_pipeline_cascading(
    mut txn: PgTransaction<'_>,
    tenant_id: &str,
    pipeline: &Pipeline,
    source: &Source,
    destination: Option<&Destination>,
) -> Result<(), PipelinesDbError> {
    // Connect to the source database and open its transaction up front, so a
    // connection failure aborts the whole operation before any row is touched.
    let connection_config = source.config.clone().into_connection_config();
    let source_pool = connect_to_source_database_with_defaults(&connection_config).await?;
    let mut source_txn = source_pool.begin().await?;

    // Remove the pipeline row itself. This does NOT cascade to the replicator,
    // since there is no cascade constraint between the two tables.
    delete_pipeline(txn.deref_mut(), tenant_id, pipeline.id).await?;

    // The replicator therefore has to be removed explicitly.
    db::replicators::delete_replicator(txn.deref_mut(), tenant_id, pipeline.replicator_id).await?;

    // When the caller also wants the destination gone, delete it within the
    // same API-side transaction.
    if let Some(destination) = destination {
        db::destinations::delete_destination(txn.deref_mut(), tenant_id, destination.id).await?;
    }

    // Clean up the pipeline's replication state and cached table schemas held
    // in the source database.
    state::delete_pipeline_replication_state(source_txn.deref_mut(), pipeline.id).await?;
    schema::delete_pipeline_table_schemas(source_txn.deref_mut(), pipeline.id).await?;

    // Commit order matters: the API transaction first, so the pipeline deletion
    // is durable before the source-side deletions are committed.
    txn.commit().await?;
    source_txn.commit().await?;

    Ok(())
}

pub async fn read_all_pipelines<'c, E>(
executor: E,
tenant_id: &str,
Expand Down
23 changes: 23 additions & 0 deletions etl-api/src/db/replicators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,26 @@ where

Ok(record.map(|r| r.id))
}

/// Deletes the replicator with `replicator_id` belonging to `tenant_id`.
///
/// Returns `Some(id)` of the deleted row, or `None` when no replicator with
/// that id exists for the tenant.
pub async fn delete_replicator<'c, E>(
    executor: E,
    tenant_id: &str,
    replicator_id: i64,
) -> Result<Option<i64>, ReplicatorsDbError>
where
    E: PgExecutor<'c>,
{
    // `returning id` lets us distinguish "deleted" from "nothing matched"
    // without issuing a separate lookup.
    let deleted_id = sqlx::query!(
        r#"
        delete from app.replicators
        where tenant_id = $1 and id = $2
        returning id
        "#,
        tenant_id,
        replicator_id
    )
    .fetch_optional(executor)
    .await?
    .map(|row| row.id);

    Ok(deleted_id)
}
101 changes: 98 additions & 3 deletions etl-api/src/routes/destinations_pipelines.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_web::{
HttpRequest, HttpResponse, Responder, ResponseError,
HttpRequest, HttpResponse, Responder, ResponseError, delete,
http::{StatusCode, header::ContentType},
post,
web::{Data, Json, Path},
Expand All @@ -15,7 +15,7 @@ use crate::db;
use crate::db::destinations::{DestinationsDbError, destination_exists};
use crate::db::destinations_pipelines::DestinationPipelinesDbError;
use crate::db::images::ImagesDbError;
use crate::db::pipelines::PipelineConfig;
use crate::db::pipelines::{PipelineConfig, PipelinesDbError, read_pipeline};
use crate::db::sources::{SourcesDbError, source_exists};
use crate::encryption::EncryptionKey;

Expand All @@ -38,6 +38,9 @@ enum DestinationPipelineError {
#[error("The pipeline with id {0} was not found")]
PipelineNotFound(i64),

#[error("The pipeline with id {0} is not connected to destination with id {1}")]
PipelineDestinationMismatch(i64, i64),

#[error(transparent)]
Destination(#[from] DestinationError),

Expand All @@ -56,6 +59,9 @@ enum DestinationPipelineError {
#[error(transparent)]
SourcesDb(#[from] SourcesDbError),

#[error(transparent)]
PipelinesDb(#[from] PipelinesDbError),

#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
}
Expand Down Expand Up @@ -83,6 +89,7 @@ impl DestinationPipelineError {
| DestinationPipelineError::DestinationsDb(DestinationsDbError::Database(_))
| DestinationPipelineError::ImagesDb(ImagesDbError::Database(_))
| DestinationPipelineError::SourcesDb(SourcesDbError::Database(_))
| DestinationPipelineError::PipelinesDb(PipelinesDbError::Database(_))
| DestinationPipelineError::Database(_) => "internal server error".to_string(),
// Every other message is ok, as they do not divulge sensitive information
e => e.to_string(),
Expand All @@ -99,11 +106,15 @@ impl ResponseError for DestinationPipelineError {
| DestinationPipelineError::DestinationsDb(_)
| DestinationPipelineError::ImagesDb(_)
| DestinationPipelineError::SourcesDb(_)
| DestinationPipelineError::PipelinesDb(_)
| DestinationPipelineError::Database(_) => StatusCode::INTERNAL_SERVER_ERROR,
DestinationPipelineError::TenantId(_)
| DestinationPipelineError::SourceNotFound(_)
| DestinationPipelineError::DestinationNotFound(_)
| DestinationPipelineError::PipelineNotFound(_) => StatusCode::BAD_REQUEST,
| DestinationPipelineError::PipelineNotFound(_)
| DestinationPipelineError::PipelineDestinationMismatch(_, _) => {
StatusCode::BAD_REQUEST
}
DestinationPipelineError::DuplicatePipeline => StatusCode::CONFLICT,
}
}
Expand Down Expand Up @@ -176,6 +187,7 @@ pub async fn create_destination_and_pipeline(
let destination_and_pipeline = destination_and_pipeline.into_inner();

let mut txn = pool.begin().await?;

if !source_exists(
txn.deref_mut(),
tenant_id,
Expand All @@ -191,6 +203,7 @@ pub async fn create_destination_and_pipeline(
let image = db::images::read_default_image(&**pool)
.await?
.ok_or(DestinationPipelineError::NoDefaultImageFound)?;

let (destination_id, pipeline_id) =
db::destinations_pipelines::create_destination_and_pipeline(
&mut txn,
Expand All @@ -203,6 +216,7 @@ pub async fn create_destination_and_pipeline(
&encryption_key,
)
.await?;

txn.commit().await?;

let response = CreateDestinationPipelineResponse {
Expand Down Expand Up @@ -241,6 +255,7 @@ pub async fn update_destination_and_pipeline(
let destination_and_pipeline = destination_and_pipeline.into_inner();

let mut txn = pool.begin().await?;

if !source_exists(
txn.deref_mut(),
tenant_id,
Expand All @@ -259,6 +274,17 @@ pub async fn update_destination_and_pipeline(
));
}

let pipeline = read_pipeline(txn.deref_mut(), tenant_id, pipeline_id)
.await?
.ok_or(DestinationPipelineError::PipelineNotFound(pipeline_id))?;

if pipeline.destination_id != destination_id {
return Err(DestinationPipelineError::PipelineDestinationMismatch(
pipeline_id,
destination_id,
));
}

db::destinations_pipelines::update_destination_and_pipeline(
txn,
tenant_id,
Expand All @@ -283,3 +309,72 @@ pub async fn update_destination_and_pipeline(

Ok(HttpResponse::Ok().finish())
}

/// Deletes a destination and its pipeline in a single operation.
///
/// The pipeline must exist and be connected to the destination given in the
/// path; otherwise the request is rejected before anything is deleted.
#[utoipa::path(
    params(
        ("destination_id" = i64, Path, description = "ID of the destination to delete"),
        ("pipeline_id" = i64, Path, description = "ID of the pipeline to delete"),
        ("tenant_id" = String, Header, description = "The tenant ID")
    ),
    responses(
        (status = 200, description = "Delete destination and pipeline"),
        // NOTE: not-found conditions are reported as 400, matching the
        // `ResponseError` impl which maps `PipelineNotFound`,
        // `DestinationNotFound` and `SourceNotFound` to BAD_REQUEST; this
        // handler never returns a 404.
        (status = 400, description = "Pipeline, destination or source not found, or pipeline not connected to the destination", body = ErrorMessage),
        (status = 500, description = "Internal server error", body = ErrorMessage)
    ),
    tag = "Destinations and Pipelines"
)]
#[delete("/destinations-pipelines/{destination_id}/{pipeline_id}")]
pub async fn delete_destination_and_pipeline(
    req: HttpRequest,
    pool: Data<PgPool>,
    encryption_key: Data<EncryptionKey>,
    destination_and_pipeline_ids: Path<(i64, i64)>,
) -> Result<impl Responder, DestinationPipelineError> {
    let tenant_id = extract_tenant_id(&req)?;
    let (destination_id, pipeline_id) = destination_and_pipeline_ids.into_inner();

    let mut txn = pool.begin().await?;

    // The pipeline must exist and actually reference the destination from the
    // path, otherwise we would delete a destination unrelated to the pipeline.
    let pipeline = read_pipeline(txn.deref_mut(), tenant_id, pipeline_id)
        .await?
        .ok_or(DestinationPipelineError::PipelineNotFound(pipeline_id))?;

    if pipeline.destination_id != destination_id {
        return Err(DestinationPipelineError::PipelineDestinationMismatch(
            pipeline_id,
            destination_id,
        ));
    }

    // Load the destination (so it can be removed in the same transaction) and
    // the source (the cascading delete needs its decrypted config to connect
    // to the source database for state cleanup).
    let destination = db::destinations::read_destination(
        txn.deref_mut(),
        tenant_id,
        destination_id,
        &encryption_key,
    )
    .await?
    .ok_or(DestinationPipelineError::DestinationNotFound(
        destination_id,
    ))?;

    let source = db::sources::read_source(
        txn.deref_mut(),
        tenant_id,
        pipeline.source_id,
        &encryption_key,
    )
    .await?
    .ok_or(DestinationPipelineError::SourceNotFound(pipeline.source_id))?;

    // Delete the pipeline, its replicator and the destination, plus the
    // replication state/schemas stored on the source database. Takes ownership
    // of `txn` and commits it on success.
    db::pipelines::delete_pipeline_cascading(
        txn,
        tenant_id,
        &pipeline,
        &source,
        Some(&destination),
    )
    .await?;

    Ok(HttpResponse::Ok().finish())
}
26 changes: 3 additions & 23 deletions etl-api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use etl_config::shared::{
DestinationConfig, PgConnectionConfig, PipelineConfig as SharedPipelineConfig,
ReplicatorConfig, SupabaseConfig, TlsConfig,
};
use etl_postgres::replication::{TableLookupError, get_table_name_from_oid, schema, state};
use etl_postgres::replication::{TableLookupError, get_table_name_from_oid, state};
use etl_postgres::schema::TableId;
use secrecy::ExposeSecret;
use serde::{Deserialize, Serialize};
Expand All @@ -31,7 +31,7 @@ use crate::routes::{
};

#[derive(Debug, Error)]
enum PipelineError {
pub enum PipelineError {
#[error("The pipeline with id {0} was not found")]
PipelineNotFound(i64),

Expand Down Expand Up @@ -527,7 +527,6 @@ pub async fn delete_pipeline(

let mut txn = pool.begin().await?;

// First, verify the pipeline exists and get source info for cleanup
let pipeline = db::pipelines::read_pipeline(txn.deref_mut(), tenant_id, pipeline_id)
.await?
.ok_or(PipelineError::PipelineNotFound(pipeline_id))?;
Expand All @@ -541,26 +540,7 @@ pub async fn delete_pipeline(
.await?
.ok_or(PipelineError::SourceNotFound(pipeline.source_id))?;

// Delete the pipeline from the main database (this will cascade delete the replicator)
db::pipelines::delete_pipeline(txn.deref_mut(), tenant_id, pipeline_id)
.await?
.ok_or(PipelineError::PipelineNotFound(pipeline_id))?;

let source_pool =
connect_to_source_database_with_defaults(&source.config.into_connection_config()).await?;

// We start a transaction in the source database while the other transaction is active in the
// api database so that in case of failures when deleting the state, we also rollback the transaction
// in the api database.
let mut source_txn = source_pool.begin().await?;

state::delete_pipeline_replication_state(source_txn.deref_mut(), pipeline_id).await?;
schema::delete_pipeline_table_schemas(source_txn.deref_mut(), pipeline_id).await?;

// Here we finish `txn` before `source_txn` since we want the guarantee that the pipeline has
// been deleted before committing the state deletions.
txn.commit().await?;
source_txn.commit().await?;
db::pipelines::delete_pipeline_cascading(txn, tenant_id, &pipeline, &source, None).await?;

Ok(HttpResponse::Ok().finish())
}
Expand Down
Loading
Loading