diff --git a/CHANGELOG.md b/CHANGELOG.md index 56b918ead..5d6459917 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased +### Changed +- Rename `compacting` entities and names to `compaction` + ## [0.180.0] - 2024-05-08 ### Added - GraphQL account flows endpoints: @@ -34,7 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - account ID resolutions are no longer mocked - REST API to login remotely using password and GitHub methods ### Fixed -- Compacting datasets stored in an S3 bucket +- Compaction of datasets stored in an S3 bucket ## [0.177.0] - 2024-04-25 ### Added diff --git a/resources/cli-reference.md b/resources/cli-reference.md index e11911eff..4e9407afc 100644 --- a/resources/cli-reference.md +++ b/resources/cli-reference.md @@ -1238,20 +1238,20 @@ Compact a dataset * `--max-slice-records ` — Maximum amount of records in a single data slice file Default value: `10000` -* `--hard` — Perform 'hard' compacting that rewrites the history of a dataset -* `--verify` — Perform verification of the dataset before running a compacting +* `--hard` — Perform 'hard' compaction that rewrites the history of a dataset +* `--verify` — Perform verification of the dataset before running a compaction For datasets that get frequent small appends the number of data slices can grow over time and affect the performance of querying. This command allows to merge multiple small data slices into a few large files, which can be beneficial in terms of size from more compact encoding, and in query performance, as data engines will have to scan through far fewer file headers. -There are two types of compactings: soft and hard. +There are two types of compactions: soft and hard. -Soft compactings produce new files while leaving the old blocks intact. This allows for faster queries, while still preserving the accurate history of how dataset evolved over time. +Soft compactions produce new files while leaving the old blocks intact. This allows for faster queries, while still preserving the accurate history of how dataset evolved over time. -Hard compactings rewrite the history of the dataset as if data was originally written in big batches. They allow to shrink the history of a dataset to just a few blocks, reclaim the space used by old data files, but at the expense of history loss. Hard compactings will rewrite the metadata chain, changing block hashes. Therefore, they will **break all downstream datasets** that depend on them. +Hard compactions rewrite the history of the dataset as if data was originally written in big batches. They allow to shrink the history of a dataset to just a few blocks, reclaim the space used by old data files, but at the expense of history loss. Hard compactions will rewrite the metadata chain, changing block hashes. Therefore, they will **break all downstream datasets** that depend on them. **Examples:** -Perform a history-altering hard compacting: +Perform a history-altering hard compaction: kamu system compact --hard my.dataset diff --git a/resources/schema.gql b/resources/schema.gql index 0db9762e1..ef7c60a7f 100644 --- a/resources/schema.gql +++ b/resources/schema.gql @@ -189,7 +189,7 @@ type CommitResultSuccess implements CommitResult & UpdateReadmeResult { message: String!
} -input CompactingConditionInput { +input CompactionConditionInput { maxSliceSize: Int! maxSliceRecords: Int! } @@ -427,7 +427,7 @@ type DatasetFlowConfigs { type DatasetFlowConfigsMut { setConfigSchedule(datasetFlowType: DatasetFlowType!, paused: Boolean!, schedule: ScheduleInput!): SetFlowConfigResult! setConfigBatching(datasetFlowType: DatasetFlowType!, paused: Boolean!, batching: BatchingConditionInput!): SetFlowBatchingConfigResult! - setConfigCompacting(datasetFlowType: DatasetFlowType!, compactingArgs: CompactingConditionInput!): SetFlowCompactingConfigResult! + setConfigCompaction(datasetFlowType: DatasetFlowType!, compactionArgs: CompactionConditionInput!): SetFlowCompactionConfigResult! pauseFlows(datasetFlowType: DatasetFlowType): Boolean! resumeFlows(datasetFlowType: DatasetFlowType): Boolean! } @@ -451,7 +451,7 @@ type DatasetFlowRunsMut { enum DatasetFlowType { INGEST EXECUTE_TRANSFORM - HARD_COMPACTING + HARD_COMPACTION } type DatasetFlows { @@ -775,7 +775,7 @@ type FlowConfiguration { paused: Boolean! schedule: FlowConfigurationSchedule batching: FlowConfigurationBatching - compacting: FlowConfigurationCompacting + compaction: FlowConfigurationCompaction } type FlowConfigurationBatching { @@ -783,7 +783,7 @@ type FlowConfigurationBatching { maxBatchingInterval: TimeDelta! } -type FlowConfigurationCompacting { +type FlowConfigurationCompaction { maxSliceSize: Int! maxSliceRecords: Int! } @@ -811,19 +811,19 @@ type FlowDatasetCompactedFailedError { message: String! } -union FlowDescription = FlowDescriptionDatasetPollingIngest | FlowDescriptionDatasetPushIngest | FlowDescriptionDatasetExecuteTransform | FlowDescriptionDatasetHardCompacting | FlowDescriptionSystemGC +union FlowDescription = FlowDescriptionDatasetPollingIngest | FlowDescriptionDatasetPushIngest | FlowDescriptionDatasetExecuteTransform | FlowDescriptionDatasetHardCompaction | FlowDescriptionSystemGC type FlowDescriptionDatasetExecuteTransform { datasetId: DatasetID! transformResult: FlowDescriptionUpdateResult } -type FlowDescriptionDatasetHardCompacting { +type FlowDescriptionDatasetHardCompaction { datasetId: DatasetID! - compactingResult: FlowDescriptionDatasetHardCompactingResult + compactionResult: FlowDescriptionDatasetHardCompactionResult } -union FlowDescriptionDatasetHardCompactingResult = FlowDescriptionHardCompactingNothingToDo | FlowDescriptionHardCompactingSuccess +union FlowDescriptionDatasetHardCompactionResult = FlowDescriptionHardCompactionNothingToDo | FlowDescriptionHardCompactionSuccess type FlowDescriptionDatasetPollingIngest { datasetId: DatasetID! @@ -837,12 +837,12 @@ type FlowDescriptionDatasetPushIngest { ingestResult: FlowDescriptionUpdateResult } -type FlowDescriptionHardCompactingNothingToDo { +type FlowDescriptionHardCompactionNothingToDo { dummy: String! message: String! } -type FlowDescriptionHardCompactingSuccess { +type FlowDescriptionHardCompactionSuccess { originalBlocksCount: Int! resultingBlocksCount: Int! newHead: Multihash! @@ -910,7 +910,7 @@ union FlowFailedReason = FlowFailedMessage | FlowDatasetCompactedFailedError scalar FlowID -type FlowIncompatibleDatasetKind implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactingConfigResult & TriggerFlowResult { +type FlowIncompatibleDatasetKind implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactionConfigResult & TriggerFlowResult { expectedDatasetKind: DatasetKind! actualDatasetKind: DatasetKind! message: String! 
@@ -921,7 +921,7 @@ type FlowInvalidBatchingConfig implements SetFlowBatchingConfigResult { message: String! } -type FlowInvalidCompactingConfig implements SetFlowCompactingConfigResult { +type FlowInvalidCompactionConfig implements SetFlowCompactionConfigResult { reason: String! message: String! } @@ -1007,7 +1007,7 @@ type FlowTriggerPush { dummy: Boolean! } -type FlowTypeIsNotSupported implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactingConfigResult { +type FlowTypeIsNotSupported implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactionConfigResult { message: String! } @@ -1389,7 +1389,7 @@ interface SetFlowBatchingConfigResult { message: String! } -interface SetFlowCompactingConfigResult { +interface SetFlowCompactionConfigResult { message: String! } @@ -1397,7 +1397,7 @@ interface SetFlowConfigResult { message: String! } -type SetFlowConfigSuccess implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactingConfigResult { +type SetFlowConfigSuccess implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactionConfigResult { config: FlowConfiguration! message: String! } diff --git a/src/adapter/graphql/src/mutations/flows_mut/dataset_flow_configs_mut.rs b/src/adapter/graphql/src/mutations/flows_mut/dataset_flow_configs_mut.rs index 51168167e..157b975c2 100644 --- a/src/adapter/graphql/src/mutations/flows_mut/dataset_flow_configs_mut.rs +++ b/src/adapter/graphql/src/mutations/flows_mut/dataset_flow_configs_mut.rs @@ -10,7 +10,7 @@ use chrono::Utc; use kamu_flow_system::{ BatchingRule, - CompactingRule, + CompactionRule, FlowConfigurationRule, FlowConfigurationService, FlowKeyDataset, @@ -163,28 +163,28 @@ impl DatasetFlowConfigsMut { } #[graphql(guard = "LoggedInGuard::new()")] - async fn set_config_compacting( + async fn set_config_compaction( &self, ctx: &Context<'_>, dataset_flow_type: DatasetFlowType, - compacting_args: CompactingConditionInput, - ) -> Result { + compaction_args: CompactionConditionInput, + ) -> Result { if !ensure_set_config_flow_supported( dataset_flow_type, - std::any::type_name::(), + std::any::type_name::(), ) { - return Ok(SetFlowCompactingConfigResult::TypeIsNotSupported( + return Ok(SetFlowCompactionConfigResult::TypeIsNotSupported( FlowTypeIsNotSupported, )); } - let compacting_rule = match CompactingRule::new_checked( - compacting_args.max_slice_size, - compacting_args.max_slice_records, + let compaction_rule = match CompactionRule::new_checked( + compaction_args.max_slice_size, + compaction_args.max_slice_records, ) { Ok(rule) => rule, Err(e) => { - return Ok(SetFlowCompactingConfigResult::InvalidCompactingConfig( - FlowInvalidCompactingConfig { + return Ok(SetFlowCompactionConfigResult::InvalidCompactionConfig( + FlowInvalidCompactionConfig { reason: e.to_string(), }, )) @@ -194,7 +194,7 @@ impl DatasetFlowConfigsMut { if let Some(e) = ensure_expected_dataset_kind(ctx, &self.dataset_handle, dataset_flow_type).await? 
{ - return Ok(SetFlowCompactingConfigResult::IncompatibleDatasetKind(e)); + return Ok(SetFlowCompactionConfigResult::IncompatibleDatasetKind(e)); } ensure_scheduling_permission(ctx, &self.dataset_handle).await?; @@ -206,14 +206,14 @@ impl DatasetFlowConfigsMut { FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into()) .into(), false, - FlowConfigurationRule::CompactingRule(compacting_rule), + FlowConfigurationRule::CompactionRule(compaction_rule), ) .await .map_err(|e| match e { SetFlowConfigurationError::Internal(e) => GqlError::Internal(e), })?; - Ok(SetFlowCompactingConfigResult::Success( + Ok(SetFlowCompactionConfigResult::Success( SetFlowConfigSuccess { config: res.into() }, )) } @@ -301,7 +301,7 @@ struct BatchingConditionInput { /////////////////////////////////////////////////////////////////////////////// #[derive(InputObject)] -struct CompactingConditionInput { +struct CompactionConditionInput { pub max_slice_size: u64, pub max_slice_records: u64, } @@ -355,12 +355,12 @@ impl FlowInvalidBatchingConfig { #[derive(SimpleObject, Debug, Clone)] #[graphql(complex)] -pub(crate) struct FlowInvalidCompactingConfig { +pub(crate) struct FlowInvalidCompactionConfig { reason: String, } #[ComplexObject] -impl FlowInvalidCompactingConfig { +impl FlowInvalidCompactionConfig { pub async fn message(&self) -> String { self.reason.clone() } @@ -368,10 +368,10 @@ impl FlowInvalidCompactingConfig { #[derive(Interface)] #[graphql(field(name = "message", ty = "String"))] -enum SetFlowCompactingConfigResult { +enum SetFlowCompactionConfigResult { Success(SetFlowConfigSuccess), IncompatibleDatasetKind(FlowIncompatibleDatasetKind), - InvalidCompactingConfig(FlowInvalidCompactingConfig), + InvalidCompactionConfig(FlowInvalidCompactionConfig), TypeIsNotSupported(FlowTypeIsNotSupported), } diff --git a/src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs b/src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs index 19e713f33..4dd191d66 100644 --- a/src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs +++ b/src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs @@ -127,7 +127,7 @@ pub(crate) async fn ensure_flow_preconditions( })); }; } - DatasetFlowType::HardCompacting => (), + DatasetFlowType::HardCompaction => (), } Ok(None) } diff --git a/src/adapter/graphql/src/queries/flows/flow.rs b/src/adapter/graphql/src/queries/flows/flow.rs index ede5c6db9..bb9c8968d 100644 --- a/src/adapter/graphql/src/queries/flows/flow.rs +++ b/src/adapter/graphql/src/queries/flows/flow.rs @@ -103,11 +103,11 @@ impl Flow { .int_err()?, }) } - fs::DatasetFlowType::HardCompacting => { - FlowDescriptionDataset::HardCompacting(FlowDescriptionDatasetHardCompacting { + fs::DatasetFlowType::HardCompaction => { + FlowDescriptionDataset::HardCompaction(FlowDescriptionDatasetHardCompaction { dataset_id: dataset_key.dataset_id.clone().into(), - compacting_result: - FlowDescriptionDatasetHardCompactingResult::from_maybe_flow_outcome( + compaction_result: + FlowDescriptionDatasetHardCompactionResult::from_maybe_flow_outcome( self.flow_state.outcome.as_ref(), ), }) @@ -234,7 +234,7 @@ enum FlowDescriptionDataset { PollingIngest(FlowDescriptionDatasetPollingIngest), PushIngest(FlowDescriptionDatasetPushIngest), ExecuteTransform(FlowDescriptionDatasetExecuteTransform), - HardCompacting(FlowDescriptionDatasetHardCompacting), + HardCompaction(FlowDescriptionDatasetHardCompaction), } #[derive(SimpleObject)] @@ -258,9 +258,9 @@ struct FlowDescriptionDatasetExecuteTransform { } 
#[derive(SimpleObject)] -struct FlowDescriptionDatasetHardCompacting { +struct FlowDescriptionDatasetHardCompaction { dataset_id: DatasetID, - compacting_result: Option, + compaction_result: Option, } /////////////////////////////////////////////////////////////////////////////// @@ -310,13 +310,13 @@ impl FlowDescriptionUpdateResult { /////////////////////////////////////////////////////////////////////////////// #[derive(Union, Debug, Clone)] -enum FlowDescriptionDatasetHardCompactingResult { - NothingToDo(FlowDescriptionHardCompactingNothingToDo), - Success(FlowDescriptionHardCompactingSuccess), +enum FlowDescriptionDatasetHardCompactionResult { + NothingToDo(FlowDescriptionHardCompactionNothingToDo), + Success(FlowDescriptionHardCompactionSuccess), } #[derive(SimpleObject, Debug, Clone)] -struct FlowDescriptionHardCompactingSuccess { +struct FlowDescriptionHardCompactionSuccess { original_blocks_count: u64, resulting_blocks_count: u64, new_head: Multihash, @@ -324,30 +324,30 @@ struct FlowDescriptionHardCompactingSuccess { #[derive(SimpleObject, Debug, Clone)] #[graphql(complex)] -pub struct FlowDescriptionHardCompactingNothingToDo { +pub struct FlowDescriptionHardCompactionNothingToDo { pub _dummy: String, } #[ComplexObject] -impl FlowDescriptionHardCompactingNothingToDo { +impl FlowDescriptionHardCompactionNothingToDo { async fn message(&self) -> String { "Nothing to do".to_string() } } -impl FlowDescriptionDatasetHardCompactingResult { +impl FlowDescriptionDatasetHardCompactionResult { fn from_maybe_flow_outcome(maybe_outcome: Option<&fs::FlowOutcome>) -> Option { if let Some(outcome) = maybe_outcome { match outcome { fs::FlowOutcome::Success(result) => match result { fs::FlowResult::DatasetUpdate(_) => None, fs::FlowResult::Empty => Some(Self::NothingToDo( - FlowDescriptionHardCompactingNothingToDo { + FlowDescriptionHardCompactionNothingToDo { _dummy: "Nothing to do".to_string(), }, )), fs::FlowResult::DatasetCompact(compact) => { - Some(Self::Success(FlowDescriptionHardCompactingSuccess { + Some(Self::Success(FlowDescriptionHardCompactionSuccess { original_blocks_count: compact.old_num_blocks as u64, resulting_blocks_count: compact.new_num_blocks as u64, new_head: compact.new_head.clone().into(), diff --git a/src/adapter/graphql/src/scalars/flow_configuration.rs b/src/adapter/graphql/src/scalars/flow_configuration.rs index aeb9ab000..b65604819 100644 --- a/src/adapter/graphql/src/scalars/flow_configuration.rs +++ b/src/adapter/graphql/src/scalars/flow_configuration.rs @@ -9,7 +9,7 @@ use kamu_flow_system::{ BatchingRule, - CompactingRule, + CompactionRule, FlowConfigurationRule, Schedule, ScheduleCron, @@ -24,7 +24,7 @@ pub struct FlowConfiguration { pub paused: bool, pub schedule: Option, pub batching: Option, - pub compacting: Option, + pub compaction: Option, } impl From for FlowConfiguration { @@ -48,7 +48,7 @@ impl From for FlowConfiguration { } else { None }, - compacting: if let FlowConfigurationRule::CompactingRule(args) = value.rule { + compaction: if let FlowConfigurationRule::CompactionRule(args) = value.rule { Some(args.into()) } else { None @@ -79,13 +79,13 @@ impl From for FlowConfigurationBatching { } #[derive(SimpleObject, Clone, PartialEq, Eq)] -pub struct FlowConfigurationCompacting { +pub struct FlowConfigurationCompaction { pub max_slice_size: u64, pub max_slice_records: u64, } -impl From for FlowConfigurationCompacting { - fn from(value: CompactingRule) -> Self { +impl From for FlowConfigurationCompaction { + fn from(value: CompactionRule) -> Self { Self { 
max_slice_size: value.max_slice_size(), max_slice_records: value.max_slice_records(), diff --git a/src/adapter/graphql/src/scalars/flow_scalars.rs b/src/adapter/graphql/src/scalars/flow_scalars.rs index 3f6c9d64d..0b8481ef9 100644 --- a/src/adapter/graphql/src/scalars/flow_scalars.rs +++ b/src/adapter/graphql/src/scalars/flow_scalars.rs @@ -105,7 +105,7 @@ pub enum FlowStatus { pub enum DatasetFlowType { Ingest, ExecuteTransform, - HardCompacting, + HardCompaction, } ///////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/graphql/tests/tests/test_gql_account_flows_configs.rs b/src/adapter/graphql/tests/tests/test_gql_account_flows_configs.rs index 3ead44f84..5b195c604 100644 --- a/src/adapter/graphql/tests/tests/test_gql_account_flows_configs.rs +++ b/src/adapter/graphql/tests/tests/test_gql_account_flows_configs.rs @@ -444,15 +444,15 @@ impl FlowConfigHarness { flowId description { __typename - ... on FlowDescriptionDatasetHardCompacting { + ... on FlowDescriptionDatasetHardCompaction { datasetId - compactingResult { - ... on FlowDescriptionHardCompactingSuccess { + compactionResult { + ... on FlowDescriptionHardCompactionSuccess { originalBlocksCount resultingBlocksCount newHead } - ... on FlowDescriptionHardCompactingNothingToDo { + ... on FlowDescriptionHardCompactionNothingToDo { message } } @@ -715,7 +715,7 @@ impl FlowConfigHarness { batching { __typename } - compacting { + compaction { __typename } } diff --git a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs index 0949ac602..a7ddcabb7 100644 --- a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs +++ b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs @@ -60,7 +60,7 @@ async fn test_crud_time_delta_root_dataset() { batching { __typename } - compacting { + compaction { __typename } } @@ -132,7 +132,7 @@ async fn test_crud_time_delta_root_dataset() { "unit": "DAYS" }, "batching": null, - "compacting": null + "compaction": null } } } @@ -177,7 +177,7 @@ async fn test_crud_time_delta_root_dataset() { "unit": "HOURS" }, "batching": null, - "compacting": null + "compaction": null } } } @@ -290,7 +290,7 @@ async fn test_crud_cron_root_dataset() { batching { __typename } - compacting { + compaction { __typename } } @@ -360,7 +360,7 @@ async fn test_crud_cron_root_dataset() { "cron5ComponentExpression": "*/2 * * * *", }, "batching": null, - "compacting": null + "compaction": null } } } @@ -403,7 +403,7 @@ async fn test_crud_cron_root_dataset() { "cron5ComponentExpression": "0 */1 * * *", }, "batching": null, - "compacting": null + "compaction": null } } } @@ -488,7 +488,7 @@ async fn test_crud_batching_derived_dataset() { unit } } - compacting { + compaction { __typename } } @@ -563,7 +563,7 @@ async fn test_crud_batching_derived_dataset() { "unit": "MINUTES" } }, - "compacting": null + "compaction": null } } } @@ -577,7 +577,7 @@ async fn test_crud_batching_derived_dataset() { //////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] -async fn test_crud_compacting_root_dataset() { +async fn test_crud_compaction_root_dataset() { let harness = FlowConfigHarness::with_overrides(FlowRunsHarnessOverrides { transform_service_mock: Some(MockTransformService::with_set_transform()), polling_service_mock: Some(MockPollingIngestService::with_active_polling_source()), @@ -591,7 +591,7 @@ async fn 
test_crud_compacting_root_dataset() { byId (datasetId: "") { flows { configs { - byType (datasetFlowType: "HARD_COMPACTING") { + byType (datasetFlowType: "HARD_COMPACTION") { __typename paused schedule { @@ -600,7 +600,7 @@ async fn test_crud_compacting_root_dataset() { batching { __typename } - compacting { + compaction { __typename maxSliceSize maxSliceRecords @@ -639,9 +639,9 @@ async fn test_crud_compacting_root_dataset() { }) ); - let mutation_code = FlowConfigHarness::set_config_compacting_mutation( + let mutation_code = FlowConfigHarness::set_config_compaction_mutation( &create_result.dataset_handle.id, - "HARD_COMPACTING", + "HARD_COMPACTION", 1_000_000, 10000, ); @@ -661,7 +661,7 @@ async fn test_crud_compacting_root_dataset() { "byId": { "flows": { "configs": { - "setConfigCompacting": { + "setConfigCompaction": { "__typename": "SetFlowConfigSuccess", "message": "Success", "config": { @@ -669,8 +669,8 @@ async fn test_crud_compacting_root_dataset() { "paused": false, "schedule": null, "batching": null, - "compacting": { - "__typename": "FlowConfigurationCompacting", + "compaction": { + "__typename": "FlowConfigurationCompaction", "maxSliceSize": 1_000_000, "maxSliceRecords": 10000 } @@ -755,7 +755,7 @@ async fn test_batching_config_validation() { //////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] -async fn test_compacting_config_validation() { +async fn test_compaction_config_validation() { let harness = FlowConfigHarness::with_overrides(FlowRunsHarnessOverrides { transform_service_mock: Some(MockTransformService::with_set_transform()), polling_service_mock: Some(MockPollingIngestService::with_active_polling_source()), @@ -773,9 +773,9 @@ async fn test_compacting_config_validation() { "Maximum slice records must be a positive number", ), ] { - let mutation_code = FlowConfigHarness::set_config_compacting_mutation( + let mutation_code = FlowConfigHarness::set_config_compaction_mutation( &create_derived_result.dataset_handle.id, - "HARD_COMPACTING", + "HARD_COMPACTION", test_case.0, test_case.1, ); @@ -794,8 +794,8 @@ async fn test_compacting_config_validation() { "byId": { "flows": { "configs": { - "setConfigCompacting": { - "__typename": "FlowInvalidCompactingConfig", + "setConfigCompaction": { + "__typename": "FlowInvalidCompactionConfig", "message": test_case.2, } } @@ -944,14 +944,14 @@ async fn test_pause_resume_dataset_flows() { check_dataset_all_configs_status(&harness, &schema, dataset_id, expect_paused).await; } - let mutation_pause_root_compacting = FlowConfigHarness::pause_flows_of_type_mutation( + let mutation_pause_root_compaction = FlowConfigHarness::pause_flows_of_type_mutation( &create_derived_result.dataset_handle.id, "EXECUTE_TRANSFORM", ); let res = schema .execute( - async_graphql::Request::new(mutation_pause_root_compacting) + async_graphql::Request::new(mutation_pause_root_compaction) .data(harness.catalog_authorized.clone()), ) .await; @@ -1301,9 +1301,9 @@ async fn test_incorrect_dataset_kinds_for_flow_type() { //// - let mutation_code = FlowConfigHarness::set_config_compacting_mutation( + let mutation_code = FlowConfigHarness::set_config_compaction_mutation( &create_derived_result.dataset_handle.id, - "HARD_COMPACTING", + "HARD_COMPACTION", 1000, 1000, ); @@ -1323,7 +1323,7 @@ async fn test_incorrect_dataset_kinds_for_flow_type() { "byId": { "flows": { "configs": { - "setConfigCompacting": { + "setConfigCompaction": { "__typename": "FlowIncompatibleDatasetKind", "message": "Expected a Root 
dataset, but a Derivative dataset was provided", } @@ -1338,7 +1338,7 @@ async fn test_incorrect_dataset_kinds_for_flow_type() { //////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] -async fn test_set_config_for_hard_compacting_fails() { +async fn test_set_config_for_hard_compaction_fails() { let harness = FlowConfigHarness::with_overrides(FlowRunsHarnessOverrides { transform_service_mock: Some(MockTransformService::without_set_transform()), polling_service_mock: Some(MockPollingIngestService::without_active_polling_source()), @@ -1349,7 +1349,7 @@ async fn test_set_config_for_hard_compacting_fails() { let mutation_code = FlowConfigHarness::set_config_batching_mutation( &create_root_result.dataset_handle.id, - "HARD_COMPACTING", + "HARD_COMPACTION", false, 1, (30, "MINUTES"), @@ -1387,7 +1387,7 @@ async fn test_set_config_for_hard_compacting_fails() { let mutation_code = FlowConfigHarness::set_config_cron_expression_mutation( &create_root_result.dataset_handle.id, - "HARD_COMPACTING", + "HARD_COMPACTION", false, "0 */2 * * *", ); @@ -1613,7 +1613,7 @@ impl FlowConfigHarness { batching { __typename } - compacting { + compaction { __typename } } @@ -1668,7 +1668,7 @@ impl FlowConfigHarness { batching { __typename } - compacting { + compaction { __typename } } @@ -1729,7 +1729,7 @@ impl FlowConfigHarness { unit } } - compacting { + compaction { __typename } } @@ -1751,7 +1751,7 @@ impl FlowConfigHarness { .replace("", &min_records_to_await.to_string()) } - fn set_config_compacting_mutation( + fn set_config_compaction_mutation( id: &DatasetID, dataset_flow_type: &str, max_slice_size: u64, @@ -1764,9 +1764,9 @@ impl FlowConfigHarness { byId (datasetId: "") { flows { configs { - setConfigCompacting ( + setConfigCompaction ( datasetFlowType: "", - compactingArgs: { + compactionArgs: { maxSliceSize: , maxSliceRecords: } @@ -1786,7 +1786,7 @@ impl FlowConfigHarness { batching { __typename } - compacting { + compaction { __typename maxSliceSize maxSliceRecords diff --git a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs index a87a4f6e3..47bfac49e 100644 --- a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs +++ b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs @@ -34,7 +34,7 @@ use kamu_accounts_inmem::AccountRepositoryInMemory; use kamu_accounts_services::AuthenticationServiceImpl; use kamu_core::{ auth, - CompactingResult, + CompactionResult, CreateDatasetResult, DatasetChangesService, DatasetIntervalIncrement, @@ -636,7 +636,7 @@ async fn test_trigger_execute_transform_derived_dataset() { //////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] -async fn test_trigger_compacting_root_dataset() { +async fn test_trigger_compaction_root_dataset() { let harness = FlowRunsHarness::with_overrides(FlowRunsHarnessOverrides { dependency_graph_mock: Some(MockDependencyGraphRepository::no_dependencies()), dataset_changes_mock: Some(MockDatasetChangesService::with_increment_between( @@ -653,7 +653,7 @@ async fn test_trigger_compacting_root_dataset() { let create_result = harness.create_root_dataset().await; let mutation_code = - FlowRunsHarness::trigger_flow_mutation(&create_result.dataset_handle.id, "HARD_COMPACTING"); + FlowRunsHarness::trigger_flow_mutation(&create_result.dataset_handle.id, "HARD_COMPACTION"); let schema = kamu_adapter_graphql::schema_quiet(); let response = 
schema @@ -709,9 +709,9 @@ async fn test_trigger_compacting_root_dataset() { { "flowId": "0", "description": { - "__typename": "FlowDescriptionDatasetHardCompacting", + "__typename": "FlowDescriptionDatasetHardCompaction", "datasetId": create_result.dataset_handle.id.to_string(), - "compactingResult": null, + "compactionResult": null, }, "status": "WAITING", "outcome": null, @@ -774,9 +774,9 @@ async fn test_trigger_compacting_root_dataset() { { "flowId": "0", "description": { - "__typename": "FlowDescriptionDatasetHardCompacting", + "__typename": "FlowDescriptionDatasetHardCompaction", "datasetId": create_result.dataset_handle.id.to_string(), - "compactingResult": null, + "compactionResult": null, }, "status": "WAITING", "outcome": null, @@ -848,9 +848,9 @@ async fn test_trigger_compacting_root_dataset() { { "flowId": "0", "description": { - "__typename": "FlowDescriptionDatasetHardCompacting", + "__typename": "FlowDescriptionDatasetHardCompaction", "datasetId": create_result.dataset_handle.id.to_string(), - "compactingResult": null, + "compactionResult": null, }, "status": "RUNNING", "outcome": null, @@ -903,9 +903,9 @@ async fn test_trigger_compacting_root_dataset() { .mimic_task_completed( flow_task_id, complete_time, - ts::TaskOutcome::Success(ts::TaskResult::CompactingDatasetResult( - ts::TaskCompactingDatasetResult { - compacting_result: CompactingResult::Success { + ts::TaskOutcome::Success(ts::TaskResult::CompactionDatasetResult( + ts::TaskCompactionDatasetResult { + compaction_result: CompactionResult::Success { old_head: Multihash::from_digest_sha3_256(b"old-slice"), new_head: new_head.clone(), old_num_blocks: 5, @@ -936,9 +936,9 @@ async fn test_trigger_compacting_root_dataset() { { "flowId": "0", "description": { - "__typename": "FlowDescriptionDatasetHardCompacting", + "__typename": "FlowDescriptionDatasetHardCompaction", "datasetId": create_result.dataset_handle.id.to_string(), - "compactingResult": { + "compactionResult": { "originalBlocksCount": 5, "resultingBlocksCount": 4, "newHead": new_head @@ -1003,8 +1003,8 @@ async fn test_list_flows_with_filters_and_pagination() { let ingest_mutation_code = FlowRunsHarness::trigger_flow_mutation(&create_result.dataset_handle.id, "INGEST"); - let compacting_mutation_code = - FlowRunsHarness::trigger_flow_mutation(&create_result.dataset_handle.id, "HARD_COMPACTING"); + let compaction_mutation_code = + FlowRunsHarness::trigger_flow_mutation(&create_result.dataset_handle.id, "HARD_COMPACTION"); let schema = kamu_adapter_graphql::schema_quiet(); @@ -1018,7 +1018,7 @@ async fn test_list_flows_with_filters_and_pagination() { let response = schema .execute( - async_graphql::Request::new(compacting_mutation_code.clone()) + async_graphql::Request::new(compaction_mutation_code.clone()) .data(harness.catalog_authorized.clone()), ) .await; @@ -1152,7 +1152,7 @@ async fn test_list_flows_with_filters_and_pagination() { runs { listFlows( filters: { - byFlowType: "HARD_COMPACTING" + byFlowType: "HARD_COMPACTION" } ) { nodes { @@ -1556,7 +1556,7 @@ async fn test_incorrect_dataset_kinds_for_flow_type() { let mutation_code = FlowRunsHarness::trigger_flow_mutation( &create_derived_result.dataset_handle.id, - "HARD_COMPACTING", + "HARD_COMPACTION", ); let schema = kamu_adapter_graphql::schema_quiet(); @@ -1732,7 +1732,7 @@ async fn test_cancel_running_transform_derived_dataset() { //////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] -async fn test_cancel_hard_compacting_root_dataset() { 
+async fn test_cancel_hard_compaction_root_dataset() { let harness = FlowRunsHarness::with_overrides(FlowRunsHarnessOverrides { dependency_graph_mock: None, dataset_changes_mock: None, @@ -1742,7 +1742,7 @@ async fn test_cancel_hard_compacting_root_dataset() { let create_result = harness.create_root_dataset().await; let mutation_code = - FlowRunsHarness::trigger_flow_mutation(&create_result.dataset_handle.id, "HARD_COMPACTING"); + FlowRunsHarness::trigger_flow_mutation(&create_result.dataset_handle.id, "HARD_COMPACTION"); let schema = kamu_adapter_graphql::schema_quiet(); let response = schema @@ -2528,15 +2528,15 @@ impl FlowRunsHarness { flowId description { __typename - ... on FlowDescriptionDatasetHardCompacting { + ... on FlowDescriptionDatasetHardCompaction { datasetId - compactingResult { - ... on FlowDescriptionHardCompactingSuccess { + compactionResult { + ... on FlowDescriptionHardCompactionSuccess { originalBlocksCount resultingBlocksCount newHead } - ... on FlowDescriptionHardCompactingNothingToDo { + ... on FlowDescriptionHardCompactionNothingToDo { message } } diff --git a/src/app/cli/src/app.rs b/src/app/cli/src/app.rs index 380df0167..aab40260f 100644 --- a/src/app/cli/src/app.rs +++ b/src/app/cli/src/app.rs @@ -270,9 +270,9 @@ pub fn configure_base_catalog( b.add::(); b.add_builder( - CompactingServiceImpl::builder().with_run_info_dir(workspace_layout.run_info_dir.clone()), + CompactionServiceImpl::builder().with_run_info_dir(workspace_layout.run_info_dir.clone()), ); - b.bind::(); + b.bind::(); b.add::(); diff --git a/src/app/cli/src/cli_parser.rs b/src/app/cli/src/cli_parser.rs index 7a761b9cf..8e929668a 100644 --- a/src/app/cli/src/cli_parser.rs +++ b/src/app/cli/src/cli_parser.rs @@ -1322,25 +1322,25 @@ pub fn cli() -> Command { Arg::new("hard") .long("hard") .action(ArgAction::SetTrue) - .help("Perform 'hard' compacting that rewrites the history of a dataset"), + .help("Perform 'hard' compaction that rewrites the history of a dataset"), Arg::new("verify") .long("verify") .action(ArgAction::SetTrue) - .help("Perform verification of the dataset before running a compacting"), + .help("Perform verification of the dataset before running a compaction"), ]) .after_help(indoc::indoc!( r#" For datasets that get frequent small appends the number of data slices can grow over time and affect the performance of querying. This command allows to merge multiple small data slices into a few large files, which can be beneficial in terms of size from more compact encoding, and in query performance, as data engines will have to scan through far fewer file headers. - There are two types of compactings: soft and hard. + There are two types of compactions: soft and hard. - Soft compactings produce new files while leaving the old blocks intact. This allows for faster queries, while still preserving the accurate history of how dataset evolved over time. + Soft compactions produce new files while leaving the old blocks intact. This allows for faster queries, while still preserving the accurate history of how dataset evolved over time. - Hard compactings rewrite the history of the dataset as if data was originally written in big batches. They allow to shrink the history of a dataset to just a few blocks, reclaim the space used by old data files, but at the expense of history loss. Hard compactings will rewrite the metadata chain, changing block hashes. Therefore, they will **break all downstream datasets** that depend on them. 
+ Hard compactions rewrite the history of the dataset as if data was originally written in big batches. They allow to shrink the history of a dataset to just a few blocks, reclaim the space used by old data files, but at the expense of history loss. Hard compactions will rewrite the metadata chain, changing block hashes. Therefore, they will **break all downstream datasets** that depend on them. **Examples:** - Perform a history-altering hard compacting: + Perform a history-altering hard compaction: kamu system compact --hard my.dataset "# diff --git a/src/app/cli/src/commands/compact_command.rs b/src/app/cli/src/commands/compact_command.rs index 156b30451..4cc3294da 100644 --- a/src/app/cli/src/commands/compact_command.rs +++ b/src/app/cli/src/commands/compact_command.rs @@ -10,8 +10,8 @@ use std::sync::Arc; use kamu::domain::{ - CompactingOptions, - CompactingService, + CompactionOptions, + CompactionService, DatasetRepository, VerificationMultiListener, VerificationOptions, @@ -19,12 +19,12 @@ use kamu::domain::{ }; use opendatafabric::{DatasetHandle, DatasetRef}; -use crate::{CLIError, Command, CompactingMultiProgress, VerificationMultiProgress}; +use crate::{CLIError, Command, CompactionMultiProgress, VerificationMultiProgress}; pub struct CompactCommand { dataset_repo: Arc, verification_svc: Arc, - compacting_svc: Arc, + compaction_svc: Arc, dataset_ref: DatasetRef, max_slice_size: u64, max_slice_records: u64, @@ -36,7 +36,7 @@ impl CompactCommand { pub fn new( dataset_repo: Arc, verification_svc: Arc, - compacting_svc: Arc, + compaction_svc: Arc, dataset_ref: DatasetRef, max_slice_size: u64, max_slice_records: u64, @@ -46,7 +46,7 @@ impl CompactCommand { Self { dataset_repo, verification_svc, - compacting_svc, + compaction_svc, dataset_ref, max_slice_size, max_slice_records, @@ -84,7 +84,7 @@ impl Command for CompactCommand { async fn run(&mut self) -> Result<(), CLIError> { if !self.is_hard { return Err(CLIError::usage_error( - "Soft compactings are not yet supported", + "Soft compactions are not yet supported", )); } let dataset_handle = self @@ -97,24 +97,24 @@ impl Command for CompactCommand { if let Err(err) = self.verify_dataset(&dataset_handle).await { eprintln!( "{}", - console::style("Cannot perform compacting, dataset is invalid".to_string()) + console::style("Cannot perform compaction, dataset is invalid".to_string()) .red() ); return Err(err); } } - let progress = CompactingMultiProgress::new(); + let progress = CompactionMultiProgress::new(); let listener = Arc::new(progress.clone()); let draw_thread = std::thread::spawn(move || { progress.draw(); }); - self.compacting_svc + self.compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions { + CompactionOptions { max_slice_size: Some(self.max_slice_size), max_slice_records: Some(self.max_slice_records), }, diff --git a/src/app/cli/src/output/compact_progress.rs b/src/app/cli/src/output/compact_progress.rs index 8e801cded..674935ffd 100644 --- a/src/app/cli/src/output/compact_progress.rs +++ b/src/app/cli/src/output/compact_progress.rs @@ -12,20 +12,20 @@ use std::sync::Arc; use std::time::Duration; use kamu::domain::{ - CompactingListener, - CompactingMultiListener, - CompactingPhase, - CompactingResult, + CompactionListener, + CompactionMultiListener, + CompactionPhase, + CompactionResult, }; use opendatafabric::DatasetHandle; #[derive(Clone)] -pub struct CompactingMultiProgress { +pub struct CompactionMultiProgress { pub multi_progress: Arc, pub finished: Arc, } -impl CompactingMultiProgress { +impl 
CompactionMultiProgress { pub fn new() -> Self { Self { multi_progress: Arc::new(indicatif::MultiProgress::new()), @@ -47,23 +47,23 @@ impl CompactingMultiProgress { } } -impl CompactingMultiListener for CompactingMultiProgress { - fn begin_compact(&self, dataset_handle: &DatasetHandle) -> Option> { - Some(Arc::new(CompactingProgress::new( +impl CompactionMultiListener for CompactionMultiProgress { + fn begin_compact(&self, dataset_handle: &DatasetHandle) -> Option> { + Some(Arc::new(CompactionProgress::new( dataset_handle, &self.multi_progress, ))) } } -pub struct CompactingProgress { +pub struct CompactionProgress { dataset_handle: DatasetHandle, curr_progress: indicatif::ProgressBar, } /////////////////////////////////////////////////////////////////////////////// -impl CompactingProgress { +impl CompactionProgress { pub fn new( dataset_handle: &DatasetHandle, multi_progress: &Arc, @@ -96,20 +96,20 @@ impl CompactingProgress { /////////////////////////////////////////////////////////////////////////////// -impl CompactingListener for CompactingProgress { +impl CompactionListener for CompactionProgress { fn begin(&self) { self.curr_progress - .set_message(self.spinner_message("Compacting dataset")); + .set_message(self.spinner_message("Compaction dataset")); } - fn success(&self, res: &CompactingResult) { + fn success(&self, res: &CompactionResult) { match res { - CompactingResult::NothingToDo => { + CompactionResult::NothingToDo => { self.curr_progress.finish_with_message( self.spinner_message(console::style("Dataset was left intact").green()), ); } - CompactingResult::Success { + CompactionResult::Success { old_head: _, new_head: _, old_num_blocks, @@ -128,14 +128,14 @@ impl CompactingListener for CompactingProgress { } } - fn begin_phase(&self, phase: CompactingPhase) { + fn begin_phase(&self, phase: CompactionPhase) { let message = match phase { - CompactingPhase::GatherChainInfo => "Gathering chain information", - CompactingPhase::MergeDataslices => "Merging dataslices", - CompactingPhase::CommitNewBlocks => "Committing new blocks", + CompactionPhase::GatherChainInfo => "Gathering chain information", + CompactionPhase::MergeDataslices => "Merging dataslices", + CompactionPhase::CommitNewBlocks => "Committing new blocks", }; self.curr_progress.set_message(message); } - fn end_phase(&self, _phase: CompactingPhase) {} + fn end_phase(&self, _phase: CompactionPhase) {} } diff --git a/src/domain/core/src/services/compacting_service.rs b/src/domain/core/src/services/compaction_service.rs similarity index 70% rename from src/domain/core/src/services/compacting_service.rs rename to src/domain/core/src/services/compaction_service.rs index e6b9cf694..06e555311 100644 --- a/src/domain/core/src/services/compacting_service.rs +++ b/src/domain/core/src/services/compaction_service.rs @@ -19,13 +19,13 @@ pub const DEFAULT_MAX_SLICE_SIZE: u64 = 1_073_741_824; pub const DEFAULT_MAX_SLICE_RECORDS: u64 = 10000; #[async_trait::async_trait] -pub trait CompactingService: Send + Sync { +pub trait CompactionService: Send + Sync { async fn compact_dataset( &self, dataset_handle: &DatasetHandle, - options: CompactingOptions, - listener: Option>, - ) -> Result; + options: CompactionOptions, + listener: Option>, + ) -> Result; } /////////////////////////////////////////////////////////////////////////////// @@ -33,7 +33,7 @@ pub trait CompactingService: Send + Sync { /////////////////////////////////////////////////////////////////////////////// #[derive(Debug, Error)] -pub enum CompactingError { +pub enum 
CompactionError { #[error(transparent)] DatasetNotFound( #[from] @@ -60,7 +60,7 @@ pub enum CompactingError { ), } -impl From for CompactingError { +impl From for CompactionError { fn from(v: GetDatasetError) -> Self { match v { GetDatasetError::NotFound(e) => Self::DatasetNotFound(e), @@ -69,7 +69,7 @@ impl From for CompactingError { } } -impl From for CompactingError { +impl From for CompactionError { fn from(v: auth::DatasetActionUnauthorizedError) -> Self { match v { auth::DatasetActionUnauthorizedError::Access(e) => Self::Access(e), @@ -78,7 +78,7 @@ impl From for CompactingError { } } -impl From for CompactingError { +impl From for CompactionError { fn from(v: GetRefError) -> Self { match v { GetRefError::NotFound(e) => Self::Internal(e.int_err()), @@ -88,22 +88,22 @@ impl From for CompactingError { } } -impl From for CompactingError { +impl From for CompactionError { fn from(v: IterBlocksError) -> Self { match v { - IterBlocksError::Access(e) => CompactingError::Access(e), - IterBlocksError::Internal(e) => CompactingError::Internal(e), - _ => CompactingError::Internal(v.int_err()), + IterBlocksError::Access(e) => CompactionError::Access(e), + IterBlocksError::Internal(e) => CompactionError::Internal(e), + _ => CompactionError::Internal(v.int_err()), } } } -impl From for CompactingError { +impl From for CompactionError { fn from(v: SetRefError) -> Self { match v { - SetRefError::Access(e) => CompactingError::Access(e), - SetRefError::Internal(e) => CompactingError::Internal(e), - _ => CompactingError::Internal(v.int_err()), + SetRefError::Access(e) => CompactionError::Access(e), + SetRefError::Internal(e) => CompactionError::Internal(e), + _ => CompactionError::Internal(v.int_err()), } } } @@ -118,38 +118,38 @@ pub struct InvalidDatasetKindError { // Progress bar /////////////////////////////////////////////////////////////////////////////// -pub trait CompactingListener: Send + Sync { +pub trait CompactionListener: Send + Sync { fn begin(&self) {} - fn success(&self, _res: &CompactingResult) {} - fn error(&self, _err: &CompactingError) {} + fn success(&self, _res: &CompactionResult) {} + fn error(&self, _err: &CompactionError) {} - fn begin_phase(&self, _phase: CompactingPhase) {} - fn end_phase(&self, _phase: CompactingPhase) {} + fn begin_phase(&self, _phase: CompactionPhase) {} + fn end_phase(&self, _phase: CompactionPhase) {} } -pub struct NullCompactingListener; -impl CompactingListener for NullCompactingListener {} +pub struct NullCompactionListener; +impl CompactionListener for NullCompactionListener {} /////////////////////////////////////////////////////////////////////////////// -pub trait CompactingMultiListener: Send + Sync { - fn begin_compact(&self, _dataset: &DatasetHandle) -> Option> { +pub trait CompactionMultiListener: Send + Sync { + fn begin_compact(&self, _dataset: &DatasetHandle) -> Option> { None } } -pub struct NullCompactingMultiListener; -impl CompactingMultiListener for NullCompactingMultiListener {} +pub struct NullCompactionMultiListener; +impl CompactionMultiListener for NullCompactionMultiListener {} #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum CompactingPhase { +pub enum CompactionPhase { GatherChainInfo, MergeDataslices, CommitNewBlocks, } #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub enum CompactingResult { +pub enum CompactionResult { NothingToDo, Success { old_head: Multihash, @@ -162,12 +162,12 @@ pub enum CompactingResult { /////////////////////////////////////////////////////////////////////////////// 
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct CompactingOptions { +pub struct CompactionOptions { pub max_slice_size: Option, pub max_slice_records: Option, } -impl Default for CompactingOptions { +impl Default for CompactionOptions { fn default() -> Self { Self { max_slice_size: Some(DEFAULT_MAX_SLICE_SIZE), diff --git a/src/domain/core/src/services/mod.rs b/src/domain/core/src/services/mod.rs index 604aeecca..fcbd9f7ec 100644 --- a/src/domain/core/src/services/mod.rs +++ b/src/domain/core/src/services/mod.rs @@ -10,7 +10,7 @@ // Re-exports pub use container_runtime::{NullPullImageListener, PullImageListener}; -pub mod compacting_service; +pub mod compaction_service; pub mod dataset_changes_service; pub mod dependency_graph_repository; pub mod dependency_graph_service; @@ -31,7 +31,7 @@ pub mod sync_service; pub mod transform_service; pub mod verification_service; -pub use compacting_service::*; +pub use compaction_service::*; pub use dataset_changes_service::*; pub use dependency_graph_repository::*; pub use dependency_graph_service::*; diff --git a/src/domain/flow-system/domain/src/entities/flow/flow_outcome.rs b/src/domain/flow-system/domain/src/entities/flow/flow_outcome.rs index e67bf84a9..dd7c2e7e4 100644 --- a/src/domain/flow-system/domain/src/entities/flow/flow_outcome.rs +++ b/src/domain/flow-system/domain/src/entities/flow/flow_outcome.rs @@ -7,7 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use kamu_core::{CompactingResult, PullResult}; +use kamu_core::{CompactionResult, PullResult}; use kamu_task_system as ts; use opendatafabric::{DatasetID, Multihash}; use ts::TaskError; @@ -100,10 +100,10 @@ impl From for FlowResult { } } } - ts::TaskResult::CompactingDatasetResult(task_compacting_result) => { - match task_compacting_result.compacting_result { - CompactingResult::NothingToDo => Self::Empty, - CompactingResult::Success { + ts::TaskResult::CompactionDatasetResult(task_compaction_result) => { + match task_compaction_result.compaction_result { + CompactionResult::NothingToDo => Self::Empty, + CompactionResult::Success { new_head, old_num_blocks, new_num_blocks, diff --git a/src/domain/flow-system/domain/src/entities/flow/flow_trigger.rs b/src/domain/flow-system/domain/src/entities/flow/flow_trigger.rs index b2a74a95e..425d0a863 100644 --- a/src/domain/flow-system/domain/src/entities/flow/flow_trigger.rs +++ b/src/domain/flow-system/domain/src/entities/flow/flow_trigger.rs @@ -220,7 +220,7 @@ mod tests { FlowTriggerInputDatasetFlow { trigger_time: Utc::now(), dataset_id: TEST_DATASET_ID.clone(), - flow_type: DatasetFlowType::HardCompacting, // unrelated + flow_type: DatasetFlowType::HardCompaction, // unrelated flow_id: FlowID::new(7), flow_result: FlowResult::Empty } diff --git a/src/domain/flow-system/domain/src/entities/flow_configuration/flow_configuration_rule.rs b/src/domain/flow-system/domain/src/entities/flow_configuration/flow_configuration_rule.rs index 0e16dba42..4e3149fd4 100644 --- a/src/domain/flow-system/domain/src/entities/flow_configuration/flow_configuration_rule.rs +++ b/src/domain/flow-system/domain/src/entities/flow_configuration/flow_configuration_rule.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; -use crate::{BatchingRule, CompactingRule, Schedule}; +use crate::{BatchingRule, CompactionRule, Schedule}; ///////////////////////////////////////////////////////////////////////////////////////// @@ -17,7 +17,7 @@ use crate::{BatchingRule, 
CompactingRule, Schedule}; pub enum FlowConfigurationRule { Schedule(Schedule), BatchingRule(BatchingRule), - CompactingRule(CompactingRule), + CompactionRule(CompactionRule), } ///////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/flow-system/domain/src/entities/shared/compacting_rule.rs b/src/domain/flow-system/domain/src/entities/shared/compaction_rule.rs similarity index 71% rename from src/domain/flow-system/domain/src/entities/shared/compacting_rule.rs rename to src/domain/flow-system/domain/src/entities/shared/compaction_rule.rs index 5eb4732d9..77c4912ff 100644 --- a/src/domain/flow-system/domain/src/entities/shared/compacting_rule.rs +++ b/src/domain/flow-system/domain/src/entities/shared/compaction_rule.rs @@ -13,21 +13,21 @@ use thiserror::Error; ///////////////////////////////////////////////////////////////////////////////////////// #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub struct CompactingRule { +pub struct CompactionRule { max_slice_size: u64, max_slice_records: u64, } -impl CompactingRule { +impl CompactionRule { pub fn new_checked( max_slice_size: u64, max_slice_records: u64, - ) -> Result { + ) -> Result { if max_slice_size == 0 { - return Err(CompactingRuleValidationError::MaxSliceSizeNotPositive); + return Err(CompactionRuleValidationError::MaxSliceSizeNotPositive); } if max_slice_records == 0 { - return Err(CompactingRuleValidationError::MaxSliceRecordsNotPositive); + return Err(CompactionRuleValidationError::MaxSliceRecordsNotPositive); } Ok(Self { @@ -50,7 +50,7 @@ impl CompactingRule { ///////////////////////////////////////////////////////////////////////////////////////// #[derive(Error, Debug)] -pub enum CompactingRuleValidationError { +pub enum CompactionRuleValidationError { #[error("Maximum slice records must be a positive number")] MaxSliceRecordsNotPositive, @@ -64,28 +64,28 @@ pub enum CompactingRuleValidationError { mod tests { use std::assert_matches::assert_matches; - use crate::{CompactingRule, CompactingRuleValidationError}; + use crate::{CompactionRule, CompactionRuleValidationError}; #[test] - fn test_valid_compacting_rule() { - assert_matches!(CompactingRule::new_checked(1, 1), Ok(_)); - assert_matches!(CompactingRule::new_checked(1_000_000, 1_000_000), Ok(_)); - assert_matches!(CompactingRule::new_checked(1, 20), Ok(_)); + fn test_valid_compaction_rule() { + assert_matches!(CompactionRule::new_checked(1, 1), Ok(_)); + assert_matches!(CompactionRule::new_checked(1_000_000, 1_000_000), Ok(_)); + assert_matches!(CompactionRule::new_checked(1, 20), Ok(_)); } #[test] fn test_non_positive_max_slice_records() { assert_matches!( - CompactingRule::new_checked(100, 0), - Err(CompactingRuleValidationError::MaxSliceRecordsNotPositive) + CompactionRule::new_checked(100, 0), + Err(CompactionRuleValidationError::MaxSliceRecordsNotPositive) ); } #[test] fn test_non_positive_max_slice_size() { assert_matches!( - CompactingRule::new_checked(0, 100), - Err(CompactingRuleValidationError::MaxSliceSizeNotPositive) + CompactionRule::new_checked(0, 100), + Err(CompactionRuleValidationError::MaxSliceSizeNotPositive) ); } } diff --git a/src/domain/flow-system/domain/src/entities/shared/flow_type.rs b/src/domain/flow-system/domain/src/entities/shared/flow_type.rs index f46957eb8..5ef7d5656 100644 --- a/src/domain/flow-system/domain/src/entities/shared/flow_type.rs +++ b/src/domain/flow-system/domain/src/entities/shared/flow_type.rs @@ -11,24 +11,24 @@ use serde::{Deserialize, 
Serialize}; -use crate::{BatchingRule, CompactingRule, Schedule}; +use crate::{BatchingRule, CompactionRule, Schedule}; #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, sqlx::Type)] #[sqlx(type_name = "dataset_flow_type", rename_all = "snake_case")] pub enum DatasetFlowType { Ingest, ExecuteTransform, - HardCompacting, + HardCompaction, } impl DatasetFlowType { pub fn all() -> &'static [DatasetFlowType] { - &[Self::Ingest, Self::ExecuteTransform, Self::HardCompacting] + &[Self::Ingest, Self::ExecuteTransform, Self::HardCompaction] } pub fn dataset_kind_restriction(&self) -> Option { match self { - DatasetFlowType::Ingest | DatasetFlowType::HardCompacting => { + DatasetFlowType::Ingest | DatasetFlowType::HardCompaction => { Some(opendatafabric::DatasetKind::Root) } DatasetFlowType::ExecuteTransform => Some(opendatafabric::DatasetKind::Derivative), @@ -41,8 +41,8 @@ impl DatasetFlowType { DatasetFlowType::ExecuteTransform => { flow_configuration_type == std::any::type_name::() } - DatasetFlowType::HardCompacting => { - flow_configuration_type == std::any::type_name::() + DatasetFlowType::HardCompaction => { + flow_configuration_type == std::any::type_name::() } } } @@ -77,7 +77,7 @@ impl AnyFlowType { AnyFlowType::Dataset( DatasetFlowType::Ingest | DatasetFlowType::ExecuteTransform - | DatasetFlowType::HardCompacting, + | DatasetFlowType::HardCompaction, ) => FlowSuccessFollowupMethod::TriggerDependent, _ => FlowSuccessFollowupMethod::Ignore, } diff --git a/src/domain/flow-system/domain/src/entities/shared/mod.rs b/src/domain/flow-system/domain/src/entities/shared/mod.rs index 21c7fac23..0e57808bb 100644 --- a/src/domain/flow-system/domain/src/entities/shared/mod.rs +++ b/src/domain/flow-system/domain/src/entities/shared/mod.rs @@ -8,13 +8,13 @@ // by the Apache License, Version 2.0. 
mod batching_rule; -mod compacting_rule; +mod compaction_rule; mod flow_key; mod flow_type; mod schedule; pub use batching_rule::*; -pub use compacting_rule::*; +pub use compaction_rule::*; pub use flow_key::*; pub use flow_type::*; pub use schedule::*; diff --git a/src/domain/flow-system/services/src/flow/active_configs_state.rs b/src/domain/flow-system/services/src/flow/active_configs_state.rs index b7f0a9474..657385ab3 100644 --- a/src/domain/flow-system/services/src/flow/active_configs_state.rs +++ b/src/domain/flow-system/services/src/flow/active_configs_state.rs @@ -19,7 +19,7 @@ pub(crate) struct ActiveConfigsState { dataset_schedules: HashMap, system_schedules: HashMap, dataset_batching_rules: HashMap, - dataset_compacting_rules: HashMap, + dataset_compaction_rules: HashMap, } impl ActiveConfigsState { @@ -36,8 +36,8 @@ impl ActiveConfigsState { FlowConfigurationRule::BatchingRule(batching) => { self.dataset_batching_rules.insert(key, batching); } - FlowConfigurationRule::CompactingRule(compacting) => { - self.dataset_compacting_rules.insert(key, compacting); + FlowConfigurationRule::CompactionRule(compaction) => { + self.dataset_compaction_rules.insert(key, compaction); } } } @@ -66,7 +66,7 @@ impl ActiveConfigsState { fn drop_dataset_flow_config(&mut self, flow_key: BorrowedFlowKeyDataset) { self.dataset_schedules.remove(flow_key.as_trait()); self.dataset_batching_rules.remove(flow_key.as_trait()); - self.dataset_compacting_rules.remove(flow_key.as_trait()); + self.dataset_compaction_rules.remove(flow_key.as_trait()); } pub fn try_get_flow_schedule(&self, flow_key: &FlowKey) -> Option { @@ -92,12 +92,12 @@ impl ActiveConfigsState { .copied() } - pub fn try_get_dataset_compacting_rule( + pub fn try_get_dataset_compaction_rule( &self, dataset_id: &DatasetID, flow_type: DatasetFlowType, - ) -> Option { - self.dataset_compacting_rules + ) -> Option { + self.dataset_compaction_rules .get(BorrowedFlowKeyDataset::new(dataset_id, flow_type).as_trait()) .copied() } diff --git a/src/domain/flow-system/services/src/flow/flow_service_impl.rs b/src/domain/flow-system/services/src/flow/flow_service_impl.rs index dfcf17335..d2a9508c1 100644 --- a/src/domain/flow-system/services/src/flow/flow_service_impl.rs +++ b/src/domain/flow-system/services/src/flow/flow_service_impl.rs @@ -190,9 +190,9 @@ impl FlowServiceImpl { self.enqueue_auto_polling_flow_unconditionally(start_time, &flow_key) .await?; } - // Sucn as compacting is very dangerous operation we + // Since compaction is a very dangerous operation, we // skip running it during activation flow configurations - FlowConfigurationRule::CompactingRule(_) => (), + FlowConfigurationRule::CompactionRule(_) => (), } let mut state = self.state.lock().unwrap(); @@ -745,25 +745,25 @@ impl FlowServiceImpl { dataset_id: flow_key.dataset_id.clone(), }) } - DatasetFlowType::HardCompacting => { + DatasetFlowType::HardCompaction => { let mut max_slice_size: Option = None; let mut max_slice_records: Option = None; - let maybe_compacting_rule = self + let maybe_compaction_rule = self .state .lock() .unwrap() .active_configs - .try_get_dataset_compacting_rule( + .try_get_dataset_compaction_rule( &flow_key.dataset_id, - DatasetFlowType::HardCompacting, + DatasetFlowType::HardCompaction, ); - if let Some(opts) = maybe_compacting_rule { + if let Some(opts) = maybe_compaction_rule { max_slice_size = Some(opts.max_slice_size()); max_slice_records = Some(opts.max_slice_records()); }; - LogicalPlan::HardCompactingDataset(HardCompactingDataset { +
LogicalPlan::HardCompactionDataset(HardCompactionDataset { dataset_id: flow_key.dataset_id.clone(), max_slice_size, max_slice_records, diff --git a/src/domain/flow-system/services/tests/tests/test_flow_configuration_service_impl.rs b/src/domain/flow-system/services/tests/tests/test_flow_configuration_service_impl.rs index 65ab54a3f..a30ef7025 100644 --- a/src/domain/flow-system/services/tests/tests/test_flow_configuration_service_impl.rs +++ b/src/domain/flow-system/services/tests/tests/test_flow_configuration_service_impl.rs @@ -48,12 +48,12 @@ async fn test_visibility() { ) .await; - let foo_compacting_schedule: Schedule = Duration::try_weeks(1).unwrap().into(); + let foo_compaction_schedule: Schedule = Duration::try_weeks(1).unwrap().into(); harness .set_dataset_flow_schedule( foo_id.clone(), - DatasetFlowType::HardCompacting, - foo_compacting_schedule.clone(), + DatasetFlowType::HardCompaction, + foo_compaction_schedule.clone(), ) .await; @@ -79,8 +79,8 @@ async fn test_visibility() { ), ( foo_id, - DatasetFlowType::HardCompacting, - &foo_compacting_schedule, + DatasetFlowType::HardCompaction, + &foo_compaction_schedule, ), (bar_id, DatasetFlowType::Ingest, &bar_ingest_schedule), ] { @@ -169,7 +169,7 @@ async fn test_pause_resume_all_dataset_flows() { assert!(harness.list_enabled_configurations().await.is_empty()); assert_eq!(0, harness.configuration_events_count()); - // Make a dataset and configure ingestion and compacting schedule + // Make a dataset and configure ingestion and compaction schedule let foo_id = harness.create_root_dataset("foo").await; let foo_ingest_schedule: Schedule = Duration::try_days(1).unwrap().into(); harness @@ -179,12 +179,12 @@ async fn test_pause_resume_all_dataset_flows() { foo_ingest_schedule.clone(), ) .await; - let foo_compacting_schedule: Schedule = Duration::try_weeks(1).unwrap().into(); + let foo_compaction_schedule: Schedule = Duration::try_weeks(1).unwrap().into(); harness .set_dataset_flow_schedule( foo_id.clone(), - DatasetFlowType::HardCompacting, - foo_compacting_schedule.clone(), + DatasetFlowType::HardCompaction, + foo_compaction_schedule.clone(), ) .await; @@ -200,8 +200,8 @@ async fn test_pause_resume_all_dataset_flows() { harness.expect_dataset_flow_schedule( &configs, foo_id.clone(), - DatasetFlowType::HardCompacting, - &foo_compacting_schedule, + DatasetFlowType::HardCompaction, + &foo_compaction_schedule, ); assert_eq!(2, harness.configuration_events_count()); @@ -228,16 +228,16 @@ async fn test_pause_resume_all_dataset_flows() { FlowConfigurationRule::Schedule(foo_ingest_schedule.clone()) ); - let flow_config_compacting_state = harness - .get_dataset_flow_config_from_store(foo_id.clone(), DatasetFlowType::HardCompacting) + let flow_config_compaction_state = harness + .get_dataset_flow_config_from_store(foo_id.clone(), DatasetFlowType::HardCompaction) .await; assert_eq!( - flow_config_compacting_state.status, + flow_config_compaction_state.status, FlowConfigurationStatus::PausedTemporarily ); assert_eq!( - flow_config_compacting_state.rule, - FlowConfigurationRule::Schedule(foo_compacting_schedule.clone()) + flow_config_compaction_state.rule, + FlowConfigurationRule::Schedule(foo_compaction_schedule.clone()) ); // Now, resume all configurations @@ -256,8 +256,8 @@ async fn test_pause_resume_all_dataset_flows() { harness.expect_dataset_flow_schedule( &configs, foo_id.clone(), - DatasetFlowType::HardCompacting, - &foo_compacting_schedule, + DatasetFlowType::HardCompaction, + &foo_compaction_schedule, ); assert_eq!(6, 
harness.configuration_events_count()); } diff --git a/src/domain/flow-system/services/tests/tests/test_flow_service_impl.rs b/src/domain/flow-system/services/tests/tests/test_flow_service_impl.rs index d418c3557..a455f27f0 100644 --- a/src/domain/flow-system/services/tests/tests/test_flow_service_impl.rs +++ b/src/domain/flow-system/services/tests/tests/test_flow_service_impl.rs @@ -435,7 +435,7 @@ async fn test_manual_trigger() { ///////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] -async fn test_manual_trigger_compacting() { +async fn test_manual_trigger_compaction() { let harness = FlowHarness::new(); let foo_id = harness.create_root_dataset("foo").await; @@ -444,9 +444,9 @@ async fn test_manual_trigger_compacting() { harness.eager_dependencies_graph_init().await; let foo_flow_key: FlowKey = - FlowKeyDataset::new(foo_id.clone(), DatasetFlowType::HardCompacting).into(); + FlowKeyDataset::new(foo_id.clone(), DatasetFlowType::HardCompaction).into(); let bar_flow_key: FlowKey = - FlowKeyDataset::new(bar_id.clone(), DatasetFlowType::HardCompacting).into(); + FlowKeyDataset::new(bar_id.clone(), DatasetFlowType::HardCompaction).into(); let test_flow_listener = harness.catalog.get_one::().unwrap(); test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string()); @@ -471,7 +471,7 @@ async fn test_manual_trigger_compacting() { dataset_id: Some(foo_id.clone()), run_since_start: Duration::try_milliseconds(10).unwrap(), finish_in_with: Some((Duration::try_milliseconds(20).unwrap(), TaskOutcome::Success(TaskResult::Empty))), - expected_logical_plan: LogicalPlan::HardCompactingDataset(HardCompactingDataset { + expected_logical_plan: LogicalPlan::HardCompactionDataset(HardCompactionDataset { dataset_id: foo_id.clone(), max_slice_size: None, max_slice_records: None, @@ -484,7 +484,7 @@ async fn test_manual_trigger_compacting() { dataset_id: Some(bar_id.clone()), run_since_start: Duration::try_milliseconds(60).unwrap(), finish_in_with: Some((Duration::try_milliseconds(10).unwrap(), TaskOutcome::Success(TaskResult::Empty))), - expected_logical_plan: LogicalPlan::HardCompactingDataset(HardCompactingDataset { + expected_logical_plan: LogicalPlan::HardCompactionDataset(HardCompactionDataset { dataset_id: bar_id.clone(), max_slice_size: None, max_slice_records: None, @@ -530,35 +530,35 @@ async fn test_manual_trigger_compacting() { #0: +0ms: #1: +10ms: - "foo" HardCompacting: + "foo" HardCompaction: Flow ID = 0 Waiting Manual Executor(task=0, since=10ms) #2: +10ms: - "foo" HardCompacting: + "foo" HardCompaction: Flow ID = 0 Running(task=0) #3: +30ms: - "bar" HardCompacting: + "bar" HardCompaction: Flow ID = 1 Waiting Manual - "foo" HardCompacting: + "foo" HardCompaction: Flow ID = 0 Finished Success #4: +50ms: - "bar" HardCompacting: + "bar" HardCompaction: Flow ID = 1 Waiting Manual Executor(task=1, since=50ms) - "foo" HardCompacting: + "foo" HardCompaction: Flow ID = 0 Finished Success #5: +60ms: - "bar" HardCompacting: + "bar" HardCompaction: Flow ID = 1 Running(task=1) - "foo" HardCompacting: + "foo" HardCompaction: Flow ID = 0 Finished Success #6: +70ms: - "bar" HardCompacting: + "bar" HardCompaction: Flow ID = 1 Finished Success - "foo" HardCompacting: + "foo" HardCompaction: Flow ID = 0 Finished Success "# @@ -569,7 +569,7 @@ async fn test_manual_trigger_compacting() { ///////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] -async fn 
test_manual_trigger_compacting_with_config() { +async fn test_manual_trigger_compaction_with_config() { let max_slice_size = 1_000_000u64; let max_slice_records = 1000u64; let harness = FlowHarness::new(); @@ -578,16 +578,16 @@ async fn test_manual_trigger_compacting_with_config() { harness.eager_dependencies_graph_init().await; harness - .set_dataset_flow_compacting_rule( + .set_dataset_flow_compaction_rule( harness.now_datetime(), foo_id.clone(), - DatasetFlowType::HardCompacting, - CompactingRule::new_checked(max_slice_size, max_slice_records).unwrap(), + DatasetFlowType::HardCompaction, + CompactionRule::new_checked(max_slice_size, max_slice_records).unwrap(), ) .await; let foo_flow_key: FlowKey = - FlowKeyDataset::new(foo_id.clone(), DatasetFlowType::HardCompacting).into(); + FlowKeyDataset::new(foo_id.clone(), DatasetFlowType::HardCompaction).into(); let test_flow_listener = harness.catalog.get_one::().unwrap(); test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string()); @@ -611,7 +611,7 @@ async fn test_manual_trigger_compacting_with_config() { dataset_id: Some(foo_id.clone()), run_since_start: Duration::try_milliseconds(30).unwrap(), finish_in_with: Some((Duration::try_milliseconds(10).unwrap(), TaskOutcome::Success(TaskResult::Empty))), - expected_logical_plan: LogicalPlan::HardCompactingDataset(HardCompactingDataset { + expected_logical_plan: LogicalPlan::HardCompactionDataset(HardCompactionDataset { dataset_id: foo_id.clone(), max_slice_size: Some(max_slice_size), max_slice_records: Some(max_slice_records), @@ -644,15 +644,15 @@ async fn test_manual_trigger_compacting_with_config() { #0: +0ms: #1: +20ms: - "foo" HardCompacting: + "foo" HardCompaction: Flow ID = 0 Waiting Manual Executor(task=0, since=20ms) #2: +30ms: - "foo" HardCompacting: + "foo" HardCompaction: Flow ID = 0 Running(task=0) #3: +40ms: - "foo" HardCompacting: + "foo" HardCompaction: Flow ID = 0 Finished Success "# @@ -3752,19 +3752,19 @@ impl FlowHarness { .unwrap(); } - async fn set_dataset_flow_compacting_rule( + async fn set_dataset_flow_compaction_rule( &self, request_time: DateTime, dataset_id: DatasetID, dataset_flow_type: DatasetFlowType, - compacting_rule: CompactingRule, + compaction_rule: CompactionRule, ) { self.flow_configuration_service .set_configuration( request_time, FlowKeyDataset::new(dataset_id, dataset_flow_type).into(), false, - FlowConfigurationRule::CompactingRule(compacting_rule), + FlowConfigurationRule::CompactionRule(compaction_rule), ) .await .unwrap(); diff --git a/src/domain/flow-system/services/tests/tests/utils/task_driver.rs b/src/domain/flow-system/services/tests/tests/utils/task_driver.rs index ddaadb604..65862a917 100644 --- a/src/domain/flow-system/services/tests/tests/utils/task_driver.rs +++ b/src/domain/flow-system/services/tests/tests/utils/task_driver.rs @@ -99,7 +99,7 @@ impl TaskDriver { assert_eq!(&ud.dataset_id, self.args.dataset_id.as_ref().unwrap()); } LogicalPlan::Probe(_) => assert!(self.args.dataset_id.is_none()), - LogicalPlan::HardCompactingDataset(_) => (), + LogicalPlan::HardCompactionDataset(_) => (), } } } diff --git a/src/domain/task-system/domain/src/entities/logical_plan.rs b/src/domain/task-system/domain/src/entities/logical_plan.rs index 38fe17741..a2661c5ba 100644 --- a/src/domain/task-system/domain/src/entities/logical_plan.rs +++ b/src/domain/task-system/domain/src/entities/logical_plan.rs @@ -23,8 +23,8 @@ pub enum LogicalPlan { UpdateDataset(UpdateDataset), /// A task that can be used for testing the scheduling system 
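The renamed test-harness helper above shows the full call path for configuring a compaction rule. A condensed sketch of the same sequence follows; it assumes the surrounding async context already holds a `flow_configuration_service`, a `request_time`, and a `dataset_id`, and that the `false` argument of `set_configuration` is the `paused` flag, as the harness usage suggests:

    // Sketch of configuring a hard-compaction rule; the limit values are illustrative.
    // CompactionRule::new_checked validates (max_slice_size, max_slice_records).
    let compaction_rule = CompactionRule::new_checked(1_000_000, 1000).unwrap();
    flow_configuration_service
        .set_configuration(
            request_time,
            FlowKeyDataset::new(dataset_id, DatasetFlowType::HardCompaction).into(),
            false, // assumed to be the `paused` flag
            FlowConfigurationRule::CompactionRule(compaction_rule),
        )
        .await
        .unwrap();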
Probe(Probe), - /// Perform dataset hard compacting - HardCompactingDataset(HardCompactingDataset), + /// Perform dataset hard compaction + HardCompactionDataset(HardCompactionDataset), } impl LogicalPlan { @@ -33,8 +33,8 @@ impl LogicalPlan { match self { LogicalPlan::UpdateDataset(upd) => Some(&upd.dataset_id), LogicalPlan::Probe(p) => p.dataset_id.as_ref(), - LogicalPlan::HardCompactingDataset(hard_compacting) => { - Some(&hard_compacting.dataset_id) + LogicalPlan::HardCompactionDataset(hard_compaction) => { + Some(&hard_compaction.dataset_id) } } } @@ -63,9 +63,9 @@ pub struct Probe { ///////////////////////////////////////////////////////////////////////////////////////// -/// A task to perform a hard compacting of dataset +/// A task to perform a hard compaction of dataset #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct HardCompactingDataset { +pub struct HardCompactionDataset { pub dataset_id: DatasetID, pub max_slice_size: Option, pub max_slice_records: Option, diff --git a/src/domain/task-system/domain/src/entities/task_status.rs b/src/domain/task-system/domain/src/entities/task_status.rs index d795cded4..e48f3c42a 100644 --- a/src/domain/task-system/domain/src/entities/task_status.rs +++ b/src/domain/task-system/domain/src/entities/task_status.rs @@ -7,7 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use kamu_core::{CompactingResult, PullResult}; +use kamu_core::{CompactionResult, PullResult}; use opendatafabric::DatasetID; use serde::{Deserialize, Serialize}; @@ -49,7 +49,7 @@ impl TaskOutcome { pub enum TaskResult { Empty, UpdateDatasetResult(TaskUpdateDatasetResult), - CompactingDatasetResult(TaskCompactingDatasetResult), + CompactionDatasetResult(TaskCompactionDatasetResult), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -64,14 +64,14 @@ impl From for TaskUpdateDatasetResult { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct TaskCompactingDatasetResult { - pub compacting_result: CompactingResult, +pub struct TaskCompactionDatasetResult { + pub compaction_result: CompactionResult, } -impl From for TaskCompactingDatasetResult { - fn from(value: CompactingResult) -> Self { +impl From for TaskCompactionDatasetResult { + fn from(value: CompactionResult) -> Self { Self { - compacting_result: value, + compaction_result: value, } } } diff --git a/src/domain/task-system/services/src/task_executor_impl.rs b/src/domain/task-system/services/src/task_executor_impl.rs index 93ca26070..2652daea8 100644 --- a/src/domain/task-system/services/src/task_executor_impl.rs +++ b/src/domain/task-system/services/src/task_executor_impl.rs @@ -12,8 +12,8 @@ use std::sync::Arc; use dill::*; use event_bus::EventBus; use kamu_core::{ - CompactingOptions, - CompactingService, + CompactionOptions, + CompactionService, DatasetRepository, PullOptions, PullService, @@ -123,23 +123,23 @@ impl TaskExecutor for TaskExecutorImpl { .clone() .unwrap_or(TaskOutcome::Success(TaskResult::Empty)) } - LogicalPlan::HardCompactingDataset(HardCompactingDataset { + LogicalPlan::HardCompactionDataset(HardCompactionDataset { dataset_id, max_slice_size, max_slice_records, }) => { - let compacting_svc = - self.catalog.get_one::().int_err()?; + let compaction_svc = + self.catalog.get_one::().int_err()?; let dataset_repo = self.catalog.get_one::().int_err()?; let dataset_handle = dataset_repo .resolve_dataset_ref(&dataset_id.as_local_ref()) .await .int_err()?; - let compacting_result = 
compacting_svc + let compaction_result = compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions { + CompactionOptions { max_slice_size: *max_slice_size, max_slice_records: *max_slice_records, }, @@ -147,9 +147,9 @@ impl TaskExecutor for TaskExecutorImpl { ) .await; - match compacting_result { + match compaction_result { Ok(result) => { - TaskOutcome::Success(TaskResult::CompactingDatasetResult(result.into())) + TaskOutcome::Success(TaskResult::CompactionDatasetResult(result.into())) } Err(err) => { tracing::info!( diff --git a/src/infra/core/src/compacting_service_impl.rs b/src/infra/core/src/compaction_service_impl.rs similarity index 90% rename from src/infra/core/src/compacting_service_impl.rs rename to src/infra/core/src/compaction_service_impl.rs index 27164e9e1..aa5c4e684 100644 --- a/src/infra/core/src/compacting_service_impl.rs +++ b/src/infra/core/src/compaction_service_impl.rs @@ -16,15 +16,15 @@ use chrono::{DateTime, Utc}; use datafusion::prelude::*; use dill::{component, interface}; use domain::{ - CompactingError, - CompactingListener, - CompactingMultiListener, - CompactingOptions, - CompactingPhase, - CompactingResult, - CompactingService, + CompactionError, + CompactionListener, + CompactionMultiListener, + CompactionOptions, + CompactionPhase, + CompactionResult, + CompactionService, InvalidDatasetKindError, - NullCompactingListener, + NullCompactionListener, DEFAULT_MAX_SLICE_RECORDS, DEFAULT_MAX_SLICE_SIZE, }; @@ -46,7 +46,7 @@ use url::Url; use crate::*; -pub struct CompactingServiceImpl { +pub struct CompactionServiceImpl { dataset_repo: Arc, dataset_authorizer: Arc, object_store_registry: Arc, @@ -95,8 +95,8 @@ struct ChainFilesInfo { } #[component(pub)] -#[interface(dyn CompactingService)] -impl CompactingServiceImpl { +#[interface(dyn CompactionService)] +impl CompactionServiceImpl { pub fn new( dataset_authorizer: Arc, dataset_repo: Arc, @@ -118,7 +118,7 @@ impl CompactingServiceImpl { dataset: Arc, max_slice_size: u64, max_slice_records: u64, - ) -> Result { + ) -> Result { // Declare mut values for result let mut old_num_blocks: usize = 0; @@ -159,7 +159,7 @@ impl CompactingServiceImpl { || batch_records + current_records > max_slice_records { let is_appended = - CompactingServiceImpl::append_add_data_batch_to_chain_info( + CompactionServiceImpl::append_add_data_batch_to_chain_info( &mut data_slice_batches, ¤t_hash, &mut data_slice_batch_info, @@ -207,7 +207,7 @@ impl CompactingServiceImpl { if let MetadataEvent::SetVocab(set_vocab_event) = event { vocab_event = Some(set_vocab_event); } - let is_appended = CompactingServiceImpl::append_add_data_batch_to_chain_info( + let is_appended = CompactionServiceImpl::append_add_data_batch_to_chain_info( &mut data_slice_batches, ¤t_hash, &mut data_slice_batch_info, @@ -254,8 +254,8 @@ impl CompactingServiceImpl { &self, data_slice_batches: &mut [DataSliceBatch], offset_column: &str, - compacting_dir_path: &Path, - ) -> Result<(), CompactingError> { + compaction_dir_path: &Path, + ) -> Result<(), CompactionError> { let ctx = new_session_context(self.object_store_registry.clone()); for (index, data_slice_batch) in data_slice_batches.iter_mut().enumerate() { @@ -276,7 +276,7 @@ impl CompactingServiceImpl { .int_err()?; let new_file_path = - compacting_dir_path.join(format!("merge-slice-{index}").as_str()); + compaction_dir_path.join(format!("merge-slice-{index}").as_str()); data_frame .write_parquet( @@ -294,19 +294,19 @@ impl CompactingServiceImpl { Ok(()) } - fn create_run_compacting_dir(&self) -> Result { - 
let compacting_dir_path = self + fn create_run_compaction_dir(&self) -> Result { + let compaction_dir_path = self .run_info_dir - .join(get_random_name(Some("compacting-"), 10)); - fs::create_dir_all(&compacting_dir_path).int_err()?; - Ok(compacting_dir_path) + .join(get_random_name(Some("compaction-"), 10)); + fs::create_dir_all(&compaction_dir_path).int_err()?; + Ok(compaction_dir_path) } async fn commit_new_blocks( &self, dataset: Arc, chain_files_info: &ChainFilesInfo, - ) -> Result<(Vec, Multihash, usize), CompactingError> { + ) -> Result<(Vec, Multihash, usize), CompactionError> { let chain = dataset.as_metadata_chain(); let mut current_head = chain_files_info.old_head.clone(); let mut old_data_slices: Vec = vec![]; @@ -389,30 +389,30 @@ impl CompactingServiceImpl { dataset: Arc, max_slice_size: u64, max_slice_records: u64, - listener: Arc, - ) -> Result { - let compacting_dir_path = self.create_run_compacting_dir()?; + listener: Arc, + ) -> Result { + let compaction_dir_path = self.create_run_compaction_dir()?; - listener.begin_phase(CompactingPhase::GatherChainInfo); + listener.begin_phase(CompactionPhase::GatherChainInfo); let mut chain_files_info = self .gather_chain_info(dataset.clone(), max_slice_size, max_slice_records) .await?; // if slices amount +1(seed block) eq to amount of blocks we will not perform - // compacting + // compaction if chain_files_info.data_slice_batches.len() + 1 == chain_files_info.old_num_blocks { - return Ok(CompactingResult::NothingToDo); + return Ok(CompactionResult::NothingToDo); } - listener.begin_phase(CompactingPhase::MergeDataslices); + listener.begin_phase(CompactionPhase::MergeDataslices); self.merge_files( &mut chain_files_info.data_slice_batches, chain_files_info.offset_column.as_str(), - &compacting_dir_path, + &compaction_dir_path, ) .await?; - listener.begin_phase(CompactingPhase::CommitNewBlocks); + listener.begin_phase(CompactionPhase::CommitNewBlocks); let (_old_data_slices, new_head, new_num_blocks) = self .commit_new_blocks(dataset.clone(), &chain_files_info) .await?; @@ -429,7 +429,7 @@ impl CompactingServiceImpl { ) .await?; - let res = CompactingResult::Success { + let res = CompactionResult::Success { old_head: chain_files_info.old_head, new_head, old_num_blocks: chain_files_info.old_num_blocks, @@ -443,14 +443,14 @@ impl CompactingServiceImpl { } #[async_trait::async_trait] -impl CompactingService for CompactingServiceImpl { +impl CompactionService for CompactionServiceImpl { #[tracing::instrument(level = "info", skip_all)] async fn compact_dataset( &self, dataset_handle: &DatasetHandle, - options: CompactingOptions, - multi_listener: Option>, - ) -> Result { + options: CompactionOptions, + multi_listener: Option>, + ) -> Result { self.dataset_authorizer .check_action_allowed(dataset_handle, domain::auth::DatasetAction::Write) .await?; @@ -467,7 +467,7 @@ impl CompactingService for CompactingServiceImpl { .kind; if dataset_kind != DatasetKind::Root { - return Err(CompactingError::InvalidDatasetKind( + return Err(CompactionError::InvalidDatasetKind( InvalidDatasetKindError { dataset_name: dataset_handle.alias.dataset_name.clone(), }, @@ -476,7 +476,7 @@ impl CompactingService for CompactingServiceImpl { let listener = multi_listener .and_then(|l| l.begin_compact(dataset_handle)) - .unwrap_or(Arc::new(NullCompactingListener {})); + .unwrap_or(Arc::new(NullCompactionListener {})); let max_slice_size = options.max_slice_size.unwrap_or(DEFAULT_MAX_SLICE_SIZE); let max_slice_records = options diff --git a/src/infra/core/src/lib.rs 
b/src/infra/core/src/lib.rs index c9e2744e8..3a0799a53 100644 --- a/src/infra/core/src/lib.rs +++ b/src/infra/core/src/lib.rs @@ -25,7 +25,7 @@ mod repos; pub mod testing; // TODO: Put under feature flag pub mod utils; -mod compacting_service_impl; +mod compaction_service_impl; mod dataset_changes_service_impl; mod dataset_config; mod dataset_layout; @@ -44,7 +44,7 @@ mod sync_service_impl; mod transform_service_impl; mod verification_service_impl; -pub use compacting_service_impl::*; +pub use compaction_service_impl::*; pub use dataset_changes_service_impl::*; pub use dataset_config::*; pub use dataset_layout::*; diff --git a/src/infra/core/tests/tests/test_compact_service_impl.rs b/src/infra/core/tests/tests/test_compact_service_impl.rs index ea63761e8..fac130fab 100644 --- a/src/infra/core/tests/tests/test_compact_service_impl.rs +++ b/src/infra/core/tests/tests/test_compact_service_impl.rs @@ -15,11 +15,11 @@ use datafusion::execution::config::SessionConfig; use datafusion::execution::context::SessionContext; use dill::Component; use domain::{ - CompactingError, - CompactingOptions, - CompactingResult, - CompactingService, - NullCompactingMultiListener, + CompactionError, + CompactionOptions, + CompactionResult, + CompactionService, + NullCompactionMultiListener, }; use event_bus::EventBus; use futures::TryStreamExt; @@ -51,7 +51,7 @@ async fn test_dataset_compact() { .await .unwrap(); - // Round 1: Compacting is a no-op + // Round 1: Compaction is a no-op // // Before/after: seed <- add_push_source <- set_vocab <- set_schema <- // set_data_schema <- add_data(3 records) @@ -77,14 +77,14 @@ async fn test_dataset_compact() { assert_matches!( harness - .compacting_svc + .compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions::default(), - Some(Arc::new(NullCompactingMultiListener {})) + CompactionOptions::default(), + Some(Arc::new(NullCompactionMultiListener {})) ) .await, - Ok(CompactingResult::NothingToDo) + Ok(CompactionResult::NothingToDo) ); assert_eq!( @@ -121,14 +121,14 @@ async fn test_dataset_compact() { assert_matches!( harness - .compacting_svc + .compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions::default(), - Some(Arc::new(NullCompactingMultiListener {})) + CompactionOptions::default(), + Some(Arc::new(NullCompactionMultiListener {})) ) .await, - Ok(CompactingResult::Success { + Ok(CompactionResult::Success { new_head, old_head, new_num_blocks: 5, @@ -199,7 +199,7 @@ async fn test_dataset_compact_s3() { .await .unwrap(); - // Round 1: Compacting is a no-op + // Round 1: Compaction is a no-op // // Before/after: seed <- add_push_source <- set_vocab <- set_schema <- // set_data_schema <- add_data(3 records) @@ -225,14 +225,14 @@ async fn test_dataset_compact_s3() { assert_matches!( harness - .compacting_svc + .compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions::default(), - Some(Arc::new(NullCompactingMultiListener {})) + CompactionOptions::default(), + Some(Arc::new(NullCompactionMultiListener {})) ) .await, - Ok(CompactingResult::NothingToDo) + Ok(CompactionResult::NothingToDo) ); assert_eq!( @@ -269,14 +269,14 @@ async fn test_dataset_compact_s3() { assert_matches!( harness - .compacting_svc + .compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions::default(), - Some(Arc::new(NullCompactingMultiListener {})) + CompactionOptions::default(), + Some(Arc::new(NullCompactionMultiListener {})) ) .await, - Ok(CompactingResult::Success { + Ok(CompactionResult::Success { new_head, old_head, new_num_blocks: 5, @@ 
-303,7 +303,7 @@ async fn test_dataset_compact_s3() { #[test_group::group(ingest, datafusion, compact)] #[tokio::test] -async fn test_dataset_compacting_watermark_only_blocks() { +async fn test_dataset_compaction_watermark_only_blocks() { let harness = CompactTestHarness::new(); let created = harness.create_test_dataset().await; @@ -392,16 +392,16 @@ async fn test_dataset_compacting_watermark_only_blocks() { // After: ... <- add_data(6 records, wm2, src2) let res = harness - .compacting_svc + .compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions::default(), - Some(Arc::new(NullCompactingMultiListener {})), + CompactionOptions::default(), + Some(Arc::new(NullCompactionMultiListener {})), ) .await .unwrap(); - let CompactingResult::Success { + let CompactionResult::Success { old_num_blocks, new_num_blocks, new_head, @@ -479,7 +479,7 @@ async fn test_dataset_compacting_watermark_only_blocks() { #[test_group::group(ingest, datafusion, compact)] #[tokio::test] -async fn test_dataset_compacting_limits() { +async fn test_dataset_compaction_limits() { let harness = CompactTestHarness::new(); let created = harness.create_test_dataset().await; @@ -565,17 +565,17 @@ async fn test_dataset_compacting_limits() { assert_matches!( harness - .compacting_svc + .compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions { + CompactionOptions { max_slice_records: Some(6), max_slice_size: None, }, - Some(Arc::new(NullCompactingMultiListener {})) + Some(Arc::new(NullCompactionMultiListener {})) ) .await, - Ok(CompactingResult::Success { + Ok(CompactionResult::Success { new_head, old_head, new_num_blocks: 7, @@ -653,7 +653,7 @@ async fn test_dataset_compacting_limits() { #[test_group::group(ingest, datafusion, compact)] #[tokio::test] -async fn test_dataset_compacting_keep_all_non_data_blocks() { +async fn test_dataset_compaction_keep_all_non_data_blocks() { let harness = CompactTestHarness::new(); let created = harness.create_test_dataset().await; @@ -731,14 +731,14 @@ async fn test_dataset_compacting_keep_all_non_data_blocks() { assert_matches!( harness - .compacting_svc + .compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions::default(), - Some(Arc::new(NullCompactingMultiListener {})) + CompactionOptions::default(), + Some(Arc::new(NullCompactionMultiListener {})) ) .await, - Ok(CompactingResult::Success { + Ok(CompactionResult::Success { new_head, old_head, new_num_blocks: 7, @@ -803,7 +803,7 @@ async fn test_dataset_compacting_keep_all_non_data_blocks() { #[test_group::group(compact)] #[tokio::test] -async fn test_dataset_compacting_derive_error() { +async fn test_dataset_compaction_derive_error() { let harness = CompactTestHarness::new(); let created = harness @@ -818,14 +818,14 @@ async fn test_dataset_compacting_derive_error() { assert_matches!( harness - .compacting_svc + .compaction_svc .compact_dataset( &created.dataset_handle, - CompactingOptions::default(), - Some(Arc::new(NullCompactingMultiListener {})) + CompactionOptions::default(), + Some(Arc::new(NullCompactionMultiListener {})) ) .await, - Err(CompactingError::InvalidDatasetKind(_)), + Err(CompactionError::InvalidDatasetKind(_)), ); } @@ -883,17 +883,17 @@ async fn test_large_dataset_compact() { assert_matches!( harness - .compacting_svc + .compaction_svc .compact_dataset( &dataset_handle, - CompactingOptions { + CompactionOptions { max_slice_records: Some(10), max_slice_size: None, }, - Some(Arc::new(NullCompactingMultiListener {})) + Some(Arc::new(NullCompactionMultiListener {})) ) .await, - 
Ok(CompactingResult::Success { + Ok(CompactionResult::Success { new_head, old_head, new_num_blocks: 24, @@ -951,7 +951,7 @@ async fn test_large_dataset_compact() { struct CompactTestHarness { _temp_dir: tempfile::TempDir, dataset_repo: Arc, - compacting_svc: Arc, + compaction_svc: Arc, push_ingest_svc: Arc, verification_svc: Arc, current_date_time: DateTime, @@ -987,8 +987,8 @@ impl CompactTestHarness { .add::() .add::() .add::() - .add_builder(CompactingServiceImpl::builder().with_run_info_dir(run_info_dir.clone())) - .bind::() + .add_builder(CompactionServiceImpl::builder().with_run_info_dir(run_info_dir.clone())) + .bind::() .add_builder( PushIngestServiceImpl::builder() .with_object_store_registry(Arc::new(ObjectStoreRegistryImpl::new(vec![ @@ -1004,14 +1004,14 @@ impl CompactTestHarness { .build(); let dataset_repo = catalog.get_one::().unwrap(); - let compacting_svc = catalog.get_one::().unwrap(); + let compaction_svc = catalog.get_one::().unwrap(); let push_ingest_svc = catalog.get_one::().unwrap(); let verification_svc = catalog.get_one::().unwrap(); Self { _temp_dir: temp_dir, dataset_repo, - compacting_svc, + compaction_svc, push_ingest_svc, verification_svc, current_date_time, @@ -1051,7 +1051,7 @@ impl CompactTestHarness { let dataset_repo = catalog.get_one::().unwrap(); let object_store_registry = catalog.get_one::().unwrap(); - let compacting_svc = CompactingServiceImpl::new( + let compaction_svc = CompactionServiceImpl::new( catalog.get_one().unwrap(), dataset_repo.clone(), object_store_registry.clone(), @@ -1073,7 +1073,7 @@ impl CompactTestHarness { Self { _temp_dir: temp_dir, dataset_repo, - compacting_svc: Arc::new(compacting_svc), + compaction_svc: Arc::new(compaction_svc), push_ingest_svc: Arc::new(push_ingest_svc), verification_svc, current_date_time, diff --git a/src/infra/flow-system/inmem/tests/tests/test_flow_event_store_inmem.rs b/src/infra/flow-system/inmem/tests/tests/test_flow_event_store_inmem.rs index b19a2aa9f..68593b4bf 100644 --- a/src/infra/flow-system/inmem/tests/tests/test_flow_event_store_inmem.rs +++ b/src/infra/flow-system/inmem/tests/tests/test_flow_event_store_inmem.rs @@ -40,9 +40,9 @@ async fn test_dataset_flow_empty_filters_distingush_dataset() { }, 6, vec![ - foo_cases.compacting_flow_ids.flow_id_finished, - foo_cases.compacting_flow_ids.flow_id_running, - foo_cases.compacting_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_finished, + foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_waiting, foo_cases.ingest_flow_ids.flow_id_finished, foo_cases.ingest_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_waiting, @@ -60,9 +60,9 @@ async fn test_dataset_flow_empty_filters_distingush_dataset() { }, 6, vec![ - bar_cases.compacting_flow_ids.flow_id_finished, - bar_cases.compacting_flow_ids.flow_id_running, - bar_cases.compacting_flow_ids.flow_id_waiting, + bar_cases.compaction_flow_ids.flow_id_finished, + bar_cases.compaction_flow_ids.flow_id_running, + bar_cases.compaction_flow_ids.flow_id_waiting, bar_cases.ingest_flow_ids.flow_id_finished, bar_cases.ingest_flow_ids.flow_id_running, bar_cases.ingest_flow_ids.flow_id_waiting, @@ -87,7 +87,7 @@ async fn test_dataset_flow_filter_by_status() { ..Default::default() }, vec![ - foo_cases.compacting_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_waiting, foo_cases.ingest_flow_ids.flow_id_waiting, ], ), @@ -97,7 +97,7 @@ async fn test_dataset_flow_filter_by_status() { ..Default::default() }, vec![ - 
foo_cases.compacting_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_running, ], ), @@ -107,7 +107,7 @@ async fn test_dataset_flow_filter_by_status() { ..Default::default() }, vec![ - foo_cases.compacting_flow_ids.flow_id_finished, + foo_cases.compaction_flow_ids.flow_id_finished, foo_cases.ingest_flow_ids.flow_id_finished, ], ), @@ -152,13 +152,13 @@ async fn test_dataset_flow_filter_by_flow_type() { ), ( DatasetFlowFilters { - by_flow_type: Some(DatasetFlowType::HardCompacting), + by_flow_type: Some(DatasetFlowType::HardCompaction), ..Default::default() }, vec![ - foo_cases.compacting_flow_ids.flow_id_finished, - foo_cases.compacting_flow_ids.flow_id_running, - foo_cases.compacting_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_finished, + foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_waiting, ], ), ( @@ -204,7 +204,7 @@ async fn test_dataset_flow_filter_by_initiator() { ..Default::default() }, vec![ - foo_cases.compacting_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_running, ], ), @@ -216,7 +216,7 @@ async fn test_dataset_flow_filter_by_initiator() { ..Default::default() }, vec![ - foo_cases.compacting_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_waiting, foo_cases.ingest_flow_ids.flow_id_waiting, ], ), @@ -226,7 +226,7 @@ async fn test_dataset_flow_filter_by_initiator() { ..Default::default() }, vec![ - foo_cases.compacting_flow_ids.flow_id_finished, + foo_cases.compaction_flow_ids.flow_id_finished, foo_cases.ingest_flow_ids.flow_id_finished, ], ), @@ -269,12 +269,12 @@ async fn test_dataset_flow_filter_combinations() { ( DatasetFlowFilters { by_flow_status: Some(FlowStatus::Waiting), - by_flow_type: Some(DatasetFlowType::HardCompacting), + by_flow_type: Some(DatasetFlowType::HardCompaction), by_initiator: Some(InitiatorFilter::Account(AccountID::new_seeded_ed25519( b"petya", ))), }, - vec![foo_cases.compacting_flow_ids.flow_id_waiting], + vec![foo_cases.compaction_flow_ids.flow_id_waiting], ), ( DatasetFlowFilters { @@ -318,8 +318,8 @@ async fn test_dataset_flow_pagination() { limit: 2, }, vec![ - foo_cases.compacting_flow_ids.flow_id_finished, - foo_cases.compacting_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_finished, + foo_cases.compaction_flow_ids.flow_id_running, ], ), ( @@ -328,7 +328,7 @@ async fn test_dataset_flow_pagination() { limit: 3, }, vec![ - foo_cases.compacting_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_waiting, foo_cases.ingest_flow_ids.flow_id_finished, foo_cases.ingest_flow_ids.flow_id_running, ], @@ -786,7 +786,7 @@ fn make_event_stores() -> (Arc, Arc
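Taken together, the renamed service API is exercised as in the tests earlier in this diff. The following is a hedged sketch of a caller, not an authoritative usage: it assumes an async function returning a `Result` whose error covers `CompactionError`, a resolved `dataset_handle`, a `compaction_svc: Arc<dyn CompactionService>`, and that `CompactionResult` has only the `NothingToDo` and `Success` variants seen in the tests. As the service implementation above shows, `None` limits fall back to the built-in defaults.

    use std::sync::Arc;

    // Illustrative limits; None falls back to DEFAULT_MAX_SLICE_SIZE / _RECORDS.
    let options = CompactionOptions {
        max_slice_size: None,
        max_slice_records: Some(10_000),
    };

    let result = compaction_svc
        .compact_dataset(
            &dataset_handle,
            options,
            Some(Arc::new(NullCompactionMultiListener {})),
        )
        .await?;

    match result {
        CompactionResult::NothingToDo => {
            // Nothing was rewritten; the dataset is already compact enough.
        }
        CompactionResult::Success {
            old_head,
            new_head,
            old_num_blocks,
            new_num_blocks,
        } => {
            // The metadata chain was rewritten, so downstream datasets that
            // referenced `old_head` are now broken and must be reset or recreated.
            println!(
                "compaction rewrote {old_num_blocks} -> {new_num_blocks} blocks ({old_head:?} -> {new_head:?})"
            );
        }
    }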