Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions etl-api/src/configs/destination.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use crate::configs::encryption::{
Decrypt, DecryptionError, Encrypt, EncryptedValue, EncryptionError, EncryptionKey,
decrypt_text, encrypt_text,
};
use etl_config::SerializableSecretString;
use etl_config::shared::DestinationConfig;
use secrecy::ExposeSecret;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::configs::encryption::{
Decrypt, DecryptionError, Encrypt, EncryptedValue, EncryptionError, EncryptionKey,
decrypt_text, encrypt_text,
};
use crate::configs::store::Store;

const DEFAULT_MAX_CONCURRENT_STREAMS: usize = 8;

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
Expand Down Expand Up @@ -149,6 +151,8 @@ pub enum EncryptedStoredDestinationConfig {
},
}

impl Store for EncryptedStoredDestinationConfig {}

impl Decrypt<StoredDestinationConfig> for EncryptedStoredDestinationConfig {
fn decrypt(
self,
Expand Down
1 change: 1 addition & 0 deletions etl-api/src/configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod encryption;
pub mod pipeline;
pub mod serde;
pub mod source;
pub mod store;
3 changes: 3 additions & 0 deletions etl-api/src/configs/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::configs::store::Store;
use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
Expand Down Expand Up @@ -83,6 +84,8 @@ impl StoredPipelineConfig {
}
}

impl Store for StoredPipelineConfig {}

impl From<FullApiPipelineConfig> for StoredPipelineConfig {
fn from(value: FullApiPipelineConfig) -> Self {
Self {
Expand Down
8 changes: 4 additions & 4 deletions etl-api/src/configs/serde.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use serde::Serialize;
use serde::de::DeserializeOwned;
use thiserror::Error;

use crate::configs::encryption::{
Decrypt, DecryptionError, Encrypt, EncryptionError, EncryptionKey,
};
use crate::configs::store::Store;

/// Errors that can occur during serialization or encryption for database storage.
#[derive(Debug, Error)]
Expand Down Expand Up @@ -35,7 +35,7 @@ pub enum DbDeserializationError {
/// Returns an error if serialization fails.
pub fn serialize<S>(value: S) -> Result<serde_json::Value, DbSerializationError>
where
S: Serialize,
S: Store,
{
let serialized_value = serde_json::to_value(value)?;

Expand Down Expand Up @@ -65,7 +65,7 @@ where
/// Returns an error if deserialization fails.
pub fn deserialize_from_value<S>(value: serde_json::Value) -> Result<S, DbDeserializationError>
where
S: DeserializeOwned,
S: Store,
{
let deserialized_value = serde_json::from_value(value)?;

Expand All @@ -84,7 +84,7 @@ pub fn decrypt_and_deserialize_from_value<T, S>(
) -> Result<S, DbDeserializationError>
where
T: Decrypt<S>,
T: DeserializeOwned,
T: Store,
{
let deserialized_value: T = serde_json::from_value(value)?;
let value = deserialized_value.decrypt(encryption_key)?;
Expand Down
3 changes: 3 additions & 0 deletions etl-api/src/configs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::configs::encryption::{
Decrypt, DecryptionError, Encrypt, EncryptedValue, EncryptionError, EncryptionKey,
decrypt_text, encrypt_text,
};
use crate::configs::store::Store;

const DEFAULT_TLS_TRUSTED_ROOT_CERTS: &str = "";
const DEFAULT_TLS_ENABLED: bool = false;
Expand Down Expand Up @@ -144,6 +145,8 @@ pub struct EncryptedStoredSourceConfig {
password: Option<EncryptedValue>,
}

impl Store for EncryptedStoredSourceConfig {}

impl Decrypt<StoredSourceConfig> for EncryptedStoredSourceConfig {
fn decrypt(
self,
Expand Down
8 changes: 8 additions & 0 deletions etl-api/src/configs/store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use serde::Serialize;
use serde::de::DeserializeOwned;

/// Market trait that has to be implemented by configs that can be stored in the database.
///
/// With this trait we can enforce at compile time which structs can actually be stored and avoid
/// storing the wrong struct.
pub trait Store: Serialize + DeserializeOwned {}
2 changes: 1 addition & 1 deletion etl-api/src/db/destinations_pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub async fn update_destination_and_pipeline(
pipeline_id,
source_id,
destination_id,
&pipeline_config,
pipeline_config,
)
.await?;

Expand Down
6 changes: 3 additions & 3 deletions etl-api/src/db/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn create_pipeline(
image_id: i64,
config: FullApiPipelineConfig,
) -> Result<i64, PipelinesDbError> {
let config = serialize(&config)?;
let config = serialize(StoredPipelineConfig::from(config))?;

let replicator_id = create_replicator(txn.deref_mut(), tenant_id, image_id).await?;
let record = sqlx::query!(
Expand Down Expand Up @@ -134,12 +134,12 @@ pub async fn update_pipeline<'c, E>(
pipeline_id: i64,
source_id: i64,
destination_id: i64,
config: &FullApiPipelineConfig,
config: FullApiPipelineConfig,
) -> Result<Option<i64>, PipelinesDbError>
where
E: PgExecutor<'c>,
{
let pipeline_config = serialize(config)?;
let pipeline_config = serialize(StoredPipelineConfig::from(config))?;

let record = sqlx::query!(
r#"
Expand Down
2 changes: 1 addition & 1 deletion etl-api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ pub async fn update_pipeline(
pipeline_id,
pipeline.source_id,
pipeline.destination_id,
&pipeline.config,
pipeline.config,
)
.await?
.ok_or(PipelineError::PipelineNotFound(pipeline_id))?;
Expand Down