Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions etl-api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::k8s_client::{RESTARTED_AT_ANNOTATION_KEY, TRUSTED_ROOT_CERT_KEY_NAME}
use crate::routes::{
ErrorMessage, TenantIdError, connect_to_source_database_with_defaults, extract_tenant_id,
};
use crate::utils::parse_docker_image_tag;

#[derive(Debug, Error)]
pub enum PipelineError {
Expand Down Expand Up @@ -368,6 +369,23 @@ pub enum PipelineStatus {
Failed,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct PipelineVersion {
#[schema(example = 1)]
pub id: i64,
#[schema(example = "1.2.3")]
pub name: String,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct GetPipelineVersionResponse {
#[schema(example = 1)]
pub pipeline_id: i64,
pub version: PipelineVersion,
#[serde(skip_serializing_if = "Option::is_none")]
pub new_version: Option<PipelineVersion>,
}

#[utoipa::path(
summary = "Create a pipeline",
description = "Creates a pipeline linking a source to a destination.",
Expand Down Expand Up @@ -710,6 +728,66 @@ pub async fn stop_all_pipelines(
Ok(HttpResponse::Ok().finish())
}

#[utoipa::path(
summary = "Get pipeline version",
description = "Returns the current version for the pipeline and an optional new default version.",
params(
("pipeline_id" = i64, Path, description = "Unique ID of the pipeline"),
("tenant_id" = String, Header, description = "Tenant ID used to scope the request")
),
responses(
(status = 200, description = "Pipeline version retrieved successfully", body = GetPipelineVersionResponse),
(status = 404, description = "Pipeline not found", body = ErrorMessage),
(status = 500, description = "Internal server error", body = ErrorMessage)
),
tag = "Pipelines"
)]
#[get("/pipelines/{pipeline_id}/version")]
pub async fn get_pipeline_version(
req: HttpRequest,
pool: Data<PgPool>,
pipeline_id: Path<i64>,
) -> Result<impl Responder, PipelineError> {
let tenant_id = extract_tenant_id(&req)?;
let pipeline_id = pipeline_id.into_inner();

let mut txn = pool.begin().await?;

let replicator =
db::replicators::read_replicator_by_pipeline_id(txn.deref_mut(), tenant_id, pipeline_id)
.await?
.ok_or(PipelineError::ReplicatorNotFound(pipeline_id))?;

let current_image = db::images::read_image_by_replicator_id(txn.deref_mut(), replicator.id)
.await?
.ok_or(PipelineError::ImageNotFound(replicator.id))?;

let default_image = db::images::read_default_image(txn.deref_mut()).await?;

txn.commit().await?;

let current_version = PipelineVersion {
id: current_image.id,
name: parse_docker_image_tag(&current_image.name),
};

let new_version = match default_image {
Some(default_image) if default_image.id != current_image.id => Some(PipelineVersion {
id: default_image.id,
name: parse_docker_image_tag(&default_image.name),
}),
_ => None,
};

let response = GetPipelineVersionResponse {
pipeline_id,
version: current_version,
new_version,
};

Ok(Json(response))
}

#[utoipa::path(
summary = "Check pipeline status",
description = "Returns the current status of the pipeline's replicator.",
Expand Down
16 changes: 10 additions & 6 deletions etl-api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ use crate::{
metrics::metrics,
pipelines::{
CreatePipelineRequest, CreatePipelineResponse, GetPipelineReplicationStatusResponse,
GetPipelineStatusResponse, ReadPipelineResponse, ReadPipelinesResponse,
SimpleTableReplicationState, TableReplicationStatus, UpdatePipelineImageRequest,
UpdatePipelineRequest, create_pipeline, delete_pipeline,
get_pipeline_replication_status, get_pipeline_status, read_all_pipelines,
read_pipeline, rollback_table_state, start_pipeline, stop_all_pipelines, stop_pipeline,
update_pipeline, update_pipeline_config, update_pipeline_image,
GetPipelineStatusResponse, GetPipelineVersionResponse, ReadPipelineResponse,
ReadPipelinesResponse, SimpleTableReplicationState, TableReplicationStatus,
UpdatePipelineImageRequest, UpdatePipelineRequest, create_pipeline, delete_pipeline,
get_pipeline_replication_status, get_pipeline_status, get_pipeline_version,
read_all_pipelines, read_pipeline, rollback_table_state, start_pipeline,
stop_all_pipelines, stop_pipeline, update_pipeline, update_pipeline_config,
update_pipeline_image,
},
sources::{
CreateSourceRequest, CreateSourceResponse, ReadSourceResponse, ReadSourcesResponse,
Expand Down Expand Up @@ -181,6 +182,7 @@ pub async fn run(
UpdatePipelineRequest,
ReadPipelineResponse,
ReadPipelinesResponse,
GetPipelineVersionResponse,
UpdatePipelineImageRequest,
GetPipelineStatusResponse,
GetPipelineReplicationStatusResponse,
Expand Down Expand Up @@ -231,6 +233,7 @@ pub async fn run(
crate::routes::pipelines::delete_pipeline,
crate::routes::pipelines::read_all_pipelines,
crate::routes::pipelines::get_pipeline_status,
crate::routes::pipelines::get_pipeline_version,
crate::routes::pipelines::get_pipeline_replication_status,
crate::routes::pipelines::rollback_table_state,
crate::routes::pipelines::update_pipeline_image,
Expand Down Expand Up @@ -315,6 +318,7 @@ pub async fn run(
.service(stop_pipeline)
.service(stop_all_pipelines)
.service(get_pipeline_status)
.service(get_pipeline_version)
.service(get_pipeline_replication_status)
.service(rollback_table_state)
.service(update_pipeline_image)
Expand Down
85 changes: 85 additions & 0 deletions etl-api/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,88 @@ pub fn generate_random_alpha_str(len: usize) -> String {
.map(|_| chars[rng.random_range(0..chars.len())])
.collect()
}

/// Parses a Docker image reference to extract the tag to be used as a version name.
///
/// Expected formats: `HOST[:PORT]/NAMESPACE/REPOSITORY[:TAG][@DIGEST]`.
/// - If a tag is present, returns it (ignoring any trailing digest part).
/// - If no tag is present and also no digest, defaults to `latest`.
/// - If parsing fails or only a digest is present, returns `unavailable`.
pub fn parse_docker_image_tag(image: &str) -> String {
// Work on the last path segment only
let last_slash = image.rfind('/').map(|i| i + 1).unwrap_or(0);
let segment = &image[last_slash..];

// Identify optional digest marker within the segment
let at_pos = segment.find('@');

// Search for ':' in the segment, but if a digest '@' exists, ignore ':' that occur after it
let colon_pos_in_segment = match at_pos {
Some(at_idx) => segment[..at_idx].find(':'),
None => segment.find(':'),
};

if let Some(col_idx) = colon_pos_in_segment {
// Extract tag between ':' and optional '@'
let after_colon = &segment[col_idx + 1..];
let tag = match at_pos {
Some(at_idx) => &segment[col_idx + 1..at_idx],
None => after_colon,
};

if tag.is_empty() {
return "unavailable".to_string();
}

return tag.to_string();
}

// No tag in the segment. If there's a digest in the segment, we can't infer a tag.
if at_pos.is_some() {
return "unavailable".to_string();
}

// No tag and no digest in the segment -> default docker tag is latest
"latest".to_string()
}

#[cfg(test)]
mod tests {
use crate::utils::parse_docker_image_tag;

#[test]
fn parse_with_tag() {
assert_eq!(parse_docker_image_tag("supabase/replicator:1.2.3"), "1.2.3");
assert_eq!(
parse_docker_image_tag("example.com:5000/team/my-app:2.0"),
"2.0"
);
assert_eq!(
parse_docker_image_tag("ghcr.io/dockersamples/example-app:pr-311"),
"pr-311"
);
}

#[test]
fn parse_with_tag_and_digest() {
assert_eq!(
parse_docker_image_tag("example.com:5000/team/my-app:2.0@sha256:abcdef0123456789"),
"2.0"
);
}

#[test]
fn parse_without_tag_defaults_to_latest() {
assert_eq!(parse_docker_image_tag("alpine"), "latest");
assert_eq!(parse_docker_image_tag("library/alpine"), "latest");
assert_eq!(parse_docker_image_tag("docker.io/library/alpine"), "latest");
}

#[test]
fn parse_with_only_digest_unavailable() {
assert_eq!(
parse_docker_image_tag("repo/name@sha256:abcdef0123456789"),
"unavailable"
);
}
}
116 changes: 102 additions & 14 deletions etl-api/tests/pipelines.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
use crate::support::database::{
create_test_source_database, run_etl_migrations_on_source_database,
};
use crate::{
support::mocks::create_default_image,
support::mocks::destinations::create_destination,
support::mocks::sources::create_source,
support::mocks::tenants::{create_tenant, create_tenant_with_id_and_name},
support::test_app::{TestApp, spawn_test_app},
};
use etl_api::routes::pipelines::{
CreatePipelineRequest, CreatePipelineResponse, GetPipelineReplicationStatusResponse,
ReadPipelineResponse, ReadPipelinesResponse, RollbackTableStateRequest,
RollbackTableStateResponse, RollbackType, SimpleTableReplicationState,
UpdatePipelineConfigRequest, UpdatePipelineConfigResponse, UpdatePipelineImageRequest,
UpdatePipelineRequest,
GetPipelineVersionResponse, ReadPipelineResponse, ReadPipelinesResponse,
RollbackTableStateRequest, RollbackTableStateResponse, RollbackType,
SimpleTableReplicationState, UpdatePipelineConfigRequest, UpdatePipelineConfigResponse,
UpdatePipelineImageRequest, UpdatePipelineRequest,
};
use etl_config::shared::{BatchConfig, PgConnectionConfig};
use etl_postgres::sqlx::test_utils::drop_pg_database;
Expand All @@ -22,11 +12,22 @@ use reqwest::StatusCode;
use sqlx::PgPool;
use sqlx::postgres::types::Oid;

use crate::support::database::{
create_test_source_database, run_etl_migrations_on_source_database,
};
use crate::support::mocks::create_image_with_name;
use crate::support::mocks::pipelines::{
ConfigUpdateType, create_pipeline_with_config, new_pipeline_config,
partially_updated_optional_pipeline_config, updated_optional_pipeline_config,
updated_pipeline_config,
};
use crate::{
support::mocks::create_default_image,
support::mocks::destinations::create_destination,
support::mocks::sources::create_source,
support::mocks::tenants::{create_tenant, create_tenant_with_id_and_name},
support::test_app::{TestApp, spawn_test_app},
};

mod support;

Expand Down Expand Up @@ -1280,3 +1281,90 @@ async fn deleting_pipeline_removes_table_schemas_from_source_database() {

drop_pg_database(&source_db_config).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn pipeline_version_returns_current_version_and_no_new_version_when_default_matches() {
init_test_tracing();
// Arrange
let app = spawn_test_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, &tenant_id).await;
let destination_id = create_destination(&app, &tenant_id).await;

// Create a default image without a tag -> should parse to "latest".
create_image_with_name(&app, "some/image".to_string(), true).await;

let pipeline_id = {
let req = CreatePipelineRequest {
source_id,
destination_id,
config: new_pipeline_config(),
};
let resp = app.create_pipeline(&tenant_id, &req).await;
let resp: CreatePipelineResponse =
resp.json().await.expect("failed to deserialize response");
resp.id
};

// Act
let response = app.get_pipeline_version(&tenant_id, pipeline_id).await;

// Assert
assert!(response.status().is_success());
let version: GetPipelineVersionResponse = response
.json()
.await
.expect("failed to deserialize response");
assert_eq!(version.version.name, "latest");
assert!(version.new_version.is_none());
}

#[tokio::test(flavor = "multi_thread")]
async fn pipeline_version_includes_new_default_version_when_available() {
init_test_tracing();
// Arrange
let app = spawn_test_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, &tenant_id).await;
let destination_id = create_destination(&app, &tenant_id).await;

// Initial default image for pipeline creation
let old_default_image_id =
create_image_with_name(&app, "supabase/replicator:1.2.3".to_string(), true).await;

let pipeline_id = {
let req = CreatePipelineRequest {
source_id,
destination_id,
config: new_pipeline_config(),
};
let resp = app.create_pipeline(&tenant_id, &req).await;
let resp: CreatePipelineResponse =
resp.json().await.expect("failed to deserialize response");
resp.id
};

// Create a new default image (should flip default)
let default_image_id =
create_image_with_name(&app, "supabase/replicator:1.3.0".to_string(), true).await;

// Act
let response = app.get_pipeline_version(&tenant_id, pipeline_id).await;

// Assert
assert!(response.status().is_success());
let version: GetPipelineVersionResponse = response
.json()
.await
.expect("failed to deserialize response");

let current_version = version.version;
assert_eq!(current_version.id, old_default_image_id);
assert_eq!(current_version.name, "1.2.3");

let new_version = version
.new_version
.expect("expected new_version to be present");
assert_eq!(new_version.id, default_image_id);
assert_eq!(new_version.name, "1.3.0");
}
15 changes: 15 additions & 0 deletions etl-api/tests/support/test_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,21 @@ impl TestApp {
.expect("failed to execute request")
}

pub async fn get_pipeline_version(
&self,
tenant_id: &str,
pipeline_id: i64,
) -> reqwest::Response {
self.get_authenticated(format!(
"{}/v1/pipelines/{}/version",
&self.address, pipeline_id
))
.header("tenant_id", tenant_id)
.send()
.await
.expect("failed to execute request")
}

pub async fn rollback_table_state(
&self,
tenant_id: &str,
Expand Down