Rename compacting names to compaction #629

Closed
wants to merge 1 commit
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
### Changed
- Rename `compacting` entities and names to `compaction`

## [0.180.0] - 2024-05-08
### Added
- GraphQL account flows endpoints:
@@ -34,7 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
- account ID resolutions are no longer mocked
- REST API to login remotely using password and GitHub methods
### Fixed
- Compacting datasets stored in an S3 bucket
- Compaction datasets stored in an S3 bucket

## [0.177.0] - 2024-04-25
### Added
12 changes: 6 additions & 6 deletions resources/cli-reference.md
@@ -1238,20 +1238,20 @@ Compact a dataset
* `--max-slice-records <RECORDS>` — Maximum amount of records in a single data slice file

Default value: `10000`
* `--hard` — Perform 'hard' compacting that rewrites the history of a dataset
* `--verify` — Perform verification of the dataset before running a compacting
* `--hard` — Perform 'hard' compaction that rewrites the history of a dataset
* `--verify` — Perform verification of the dataset before running a compaction

For datasets that get frequent small appends the number of data slices can grow over time and affect the performance of querying. This command allows to merge multiple small data slices into a few large files, which can be beneficial in terms of size from more compact encoding, and in query performance, as data engines will have to scan through far fewer file headers.

There are two types of compactings: soft and hard.
There are two types of compactions: soft and hard.

Soft compactings produce new files while leaving the old blocks intact. This allows for faster queries, while still preserving the accurate history of how dataset evolved over time.
Soft compactions produce new files while leaving the old blocks intact. This allows for faster queries, while still preserving the accurate history of how dataset evolved over time.

Hard compactings rewrite the history of the dataset as if data was originally written in big batches. They allow to shrink the history of a dataset to just a few blocks, reclaim the space used by old data files, but at the expense of history loss. Hard compactings will rewrite the metadata chain, changing block hashes. Therefore, they will **break all downstream datasets** that depend on them.
Hard compactions rewrite the history of the dataset as if data was originally written in big batches. They allow to shrink the history of a dataset to just a few blocks, reclaim the space used by old data files, but at the expense of history loss. Hard compactions will rewrite the metadata chain, changing block hashes. Therefore, they will **break all downstream datasets** that depend on them.

**Examples:**

Perform a history-altering hard compacting:
Perform a history-altering hard compaction:

kamu system compact --hard my.dataset

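The same compaction limits can also be configured through the GraphQL API renamed in this PR. The fragment below is an illustrative sketch, not part of this diff: it targets the renamed setConfigCompaction field on DatasetFlowConfigsMut (defined in schema.gql below) and uses placeholder argument values; the maxSliceSize figure is an arbitrary assumption, while maxSliceRecords mirrors the CLI default of 10000.

    # Illustrative only: configure compaction limits via the renamed mutation field.
    # Argument values are placeholders, not defaults from this PR.
    fragment ConfigureCompaction on DatasetFlowConfigsMut {
      setConfigCompaction(
        datasetFlowType: HARD_COMPACTION
        compactionArgs: { maxSliceSize: 300000000, maxSliceRecords: 10000 }
      ) {
        message
      }
    }
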
32 changes: 16 additions & 16 deletions resources/schema.gql
@@ -189,7 +189,7 @@ type CommitResultSuccess implements CommitResult & UpdateReadmeResult {
message: String!
}

input CompactingConditionInput {
input CompactionConditionInput {
maxSliceSize: Int!
maxSliceRecords: Int!
}
@@ -427,7 +427,7 @@ type DatasetFlowConfigs {
type DatasetFlowConfigsMut {
setConfigSchedule(datasetFlowType: DatasetFlowType!, paused: Boolean!, schedule: ScheduleInput!): SetFlowConfigResult!
setConfigBatching(datasetFlowType: DatasetFlowType!, paused: Boolean!, batching: BatchingConditionInput!): SetFlowBatchingConfigResult!
setConfigCompacting(datasetFlowType: DatasetFlowType!, compactingArgs: CompactingConditionInput!): SetFlowCompactingConfigResult!
setConfigCompaction(datasetFlowType: DatasetFlowType!, compactionArgs: CompactionConditionInput!): SetFlowCompactionConfigResult!
pauseFlows(datasetFlowType: DatasetFlowType): Boolean!
resumeFlows(datasetFlowType: DatasetFlowType): Boolean!
}
@@ -451,7 +451,7 @@ type DatasetFlowRunsMut {
enum DatasetFlowType {
INGEST
EXECUTE_TRANSFORM
HARD_COMPACTING
HARD_COMPACTION
}

type DatasetFlows {
@@ -775,15 +775,15 @@ type FlowConfiguration {
paused: Boolean!
schedule: FlowConfigurationSchedule
batching: FlowConfigurationBatching
compacting: FlowConfigurationCompacting
compaction: FlowConfigurationCompaction
}

type FlowConfigurationBatching {
minRecordsToAwait: Int!
maxBatchingInterval: TimeDelta!
}

type FlowConfigurationCompacting {
type FlowConfigurationCompaction {
maxSliceSize: Int!
maxSliceRecords: Int!
}
@@ -811,19 +811,19 @@ type FlowDatasetCompactedFailedError {
message: String!
}

union FlowDescription = FlowDescriptionDatasetPollingIngest | FlowDescriptionDatasetPushIngest | FlowDescriptionDatasetExecuteTransform | FlowDescriptionDatasetHardCompacting | FlowDescriptionSystemGC
union FlowDescription = FlowDescriptionDatasetPollingIngest | FlowDescriptionDatasetPushIngest | FlowDescriptionDatasetExecuteTransform | FlowDescriptionDatasetHardCompaction | FlowDescriptionSystemGC

type FlowDescriptionDatasetExecuteTransform {
datasetId: DatasetID!
transformResult: FlowDescriptionUpdateResult
}

type FlowDescriptionDatasetHardCompacting {
type FlowDescriptionDatasetHardCompaction {
datasetId: DatasetID!
compactingResult: FlowDescriptionDatasetHardCompactingResult
compactionResult: FlowDescriptionDatasetHardCompactionResult
}

union FlowDescriptionDatasetHardCompactingResult = FlowDescriptionHardCompactingNothingToDo | FlowDescriptionHardCompactingSuccess
union FlowDescriptionDatasetHardCompactionResult = FlowDescriptionHardCompactionNothingToDo | FlowDescriptionHardCompactionSuccess

type FlowDescriptionDatasetPollingIngest {
datasetId: DatasetID!
@@ -837,12 +837,12 @@ type FlowDescriptionDatasetPushIngest {
ingestResult: FlowDescriptionUpdateResult
}

type FlowDescriptionHardCompactingNothingToDo {
type FlowDescriptionHardCompactionNothingToDo {
dummy: String!
message: String!
}

type FlowDescriptionHardCompactingSuccess {
type FlowDescriptionHardCompactionSuccess {
originalBlocksCount: Int!
resultingBlocksCount: Int!
newHead: Multihash!
@@ -910,7 +910,7 @@ union FlowFailedReason = FlowFailedMessage | FlowDatasetCompactedFailedError

scalar FlowID

type FlowIncompatibleDatasetKind implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactingConfigResult & TriggerFlowResult {
type FlowIncompatibleDatasetKind implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactionConfigResult & TriggerFlowResult {
expectedDatasetKind: DatasetKind!
actualDatasetKind: DatasetKind!
message: String!
@@ -921,7 +921,7 @@ type FlowInvalidBatchingConfig implements SetFlowBatchingConfigResult {
message: String!
}

type FlowInvalidCompactingConfig implements SetFlowCompactingConfigResult {
type FlowInvalidCompactionConfig implements SetFlowCompactionConfigResult {
reason: String!
message: String!
}
@@ -1007,7 +1007,7 @@ type FlowTriggerPush {
dummy: Boolean!
}

type FlowTypeIsNotSupported implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactingConfigResult {
type FlowTypeIsNotSupported implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactionConfigResult {
message: String!
}

@@ -1389,15 +1389,15 @@ interface SetFlowBatchingConfigResult {
message: String!
}

interface SetFlowCompactingConfigResult {
interface SetFlowCompactionConfigResult {
message: String!
}

interface SetFlowConfigResult {
message: String!
}

type SetFlowConfigSuccess implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactingConfigResult {
type SetFlowConfigSuccess implements SetFlowConfigResult & SetFlowBatchingConfigResult & SetFlowCompactionConfigResult {
config: FlowConfiguration!
message: String!
}
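
As a usage sketch (not part of the diff), a client can discriminate the renamed result interface above with inline fragments; only types and fields visible in this schema diff are referenced:

    # Illustrative only: inspect the outcome of setConfigCompaction.
    fragment CompactionConfigOutcome on SetFlowCompactionConfigResult {
      message
      ... on FlowInvalidCompactionConfig {
        reason
      }
      ... on SetFlowConfigSuccess {
        config {
          paused
          compaction {
            maxSliceSize
            maxSliceRecords
          }
        }
      }
    }
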
@@ -10,7 +10,7 @@
use chrono::Utc;
use kamu_flow_system::{
BatchingRule,
CompactingRule,
CompactionRule,
FlowConfigurationRule,
FlowConfigurationService,
FlowKeyDataset,
@@ -163,28 +163,28 @@ impl DatasetFlowConfigsMut {
}

#[graphql(guard = "LoggedInGuard::new()")]
async fn set_config_compacting(
async fn set_config_compaction(
&self,
ctx: &Context<'_>,
dataset_flow_type: DatasetFlowType,
compacting_args: CompactingConditionInput,
) -> Result<SetFlowCompactingConfigResult> {
compaction_args: CompactionConditionInput,
) -> Result<SetFlowCompactionConfigResult> {
if !ensure_set_config_flow_supported(
dataset_flow_type,
std::any::type_name::<CompactingRule>(),
std::any::type_name::<CompactionRule>(),
) {
return Ok(SetFlowCompactingConfigResult::TypeIsNotSupported(
return Ok(SetFlowCompactionConfigResult::TypeIsNotSupported(
FlowTypeIsNotSupported,
));
}
let compacting_rule = match CompactingRule::new_checked(
compacting_args.max_slice_size,
compacting_args.max_slice_records,
let compaction_rule = match CompactionRule::new_checked(
compaction_args.max_slice_size,
compaction_args.max_slice_records,
) {
Ok(rule) => rule,
Err(e) => {
return Ok(SetFlowCompactingConfigResult::InvalidCompactingConfig(
FlowInvalidCompactingConfig {
return Ok(SetFlowCompactionConfigResult::InvalidCompactionConfig(
FlowInvalidCompactionConfig {
reason: e.to_string(),
},
))
@@ -194,7 +194,7 @@ impl DatasetFlowConfigsMut {
if let Some(e) =
ensure_expected_dataset_kind(ctx, &self.dataset_handle, dataset_flow_type).await?
{
return Ok(SetFlowCompactingConfigResult::IncompatibleDatasetKind(e));
return Ok(SetFlowCompactionConfigResult::IncompatibleDatasetKind(e));
}
ensure_scheduling_permission(ctx, &self.dataset_handle).await?;

@@ -206,14 +206,14 @@
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
false,
FlowConfigurationRule::CompactingRule(compacting_rule),
FlowConfigurationRule::CompactionRule(compaction_rule),
)
.await
.map_err(|e| match e {
SetFlowConfigurationError::Internal(e) => GqlError::Internal(e),
})?;

Ok(SetFlowCompactingConfigResult::Success(
Ok(SetFlowCompactionConfigResult::Success(
SetFlowConfigSuccess { config: res.into() },
))
}
@@ -301,7 +301,7 @@ struct BatchingConditionInput {
///////////////////////////////////////////////////////////////////////////////

#[derive(InputObject)]
struct CompactingConditionInput {
struct CompactionConditionInput {
pub max_slice_size: u64,
pub max_slice_records: u64,
}
@@ -355,23 +355,23 @@ impl FlowInvalidBatchingConfig {

#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub(crate) struct FlowInvalidCompactingConfig {
pub(crate) struct FlowInvalidCompactionConfig {
reason: String,
}

#[ComplexObject]
impl FlowInvalidCompactingConfig {
impl FlowInvalidCompactionConfig {
pub async fn message(&self) -> String {
self.reason.clone()
}
}

#[derive(Interface)]
#[graphql(field(name = "message", ty = "String"))]
enum SetFlowCompactingConfigResult {
enum SetFlowCompactionConfigResult {
Success(SetFlowConfigSuccess),
IncompatibleDatasetKind(FlowIncompatibleDatasetKind),
InvalidCompactingConfig(FlowInvalidCompactingConfig),
InvalidCompactionConfig(FlowInvalidCompactionConfig),
TypeIsNotSupported(FlowTypeIsNotSupported),
}

@@ -127,7 +127,7 @@ pub(crate) async fn ensure_flow_preconditions(
}));
};
}
DatasetFlowType::HardCompacting => (),
DatasetFlowType::HardCompaction => (),
}
Ok(None)
}
32 changes: 16 additions & 16 deletions src/adapter/graphql/src/queries/flows/flow.rs
@@ -103,11 +103,11 @@ impl Flow {
.int_err()?,
})
}
fs::DatasetFlowType::HardCompacting => {
FlowDescriptionDataset::HardCompacting(FlowDescriptionDatasetHardCompacting {
fs::DatasetFlowType::HardCompaction => {
FlowDescriptionDataset::HardCompaction(FlowDescriptionDatasetHardCompaction {
dataset_id: dataset_key.dataset_id.clone().into(),
compacting_result:
FlowDescriptionDatasetHardCompactingResult::from_maybe_flow_outcome(
compaction_result:
FlowDescriptionDatasetHardCompactionResult::from_maybe_flow_outcome(
self.flow_state.outcome.as_ref(),
),
})
@@ -234,7 +234,7 @@ enum FlowDescriptionDataset {
PollingIngest(FlowDescriptionDatasetPollingIngest),
PushIngest(FlowDescriptionDatasetPushIngest),
ExecuteTransform(FlowDescriptionDatasetExecuteTransform),
HardCompacting(FlowDescriptionDatasetHardCompacting),
HardCompaction(FlowDescriptionDatasetHardCompaction),
}

#[derive(SimpleObject)]
@@ -258,9 +258,9 @@ struct FlowDescriptionDatasetExecuteTransform {
}

#[derive(SimpleObject)]
struct FlowDescriptionDatasetHardCompacting {
struct FlowDescriptionDatasetHardCompaction {
dataset_id: DatasetID,
compacting_result: Option<FlowDescriptionDatasetHardCompactingResult>,
compaction_result: Option<FlowDescriptionDatasetHardCompactionResult>,
}

///////////////////////////////////////////////////////////////////////////////
@@ -310,44 +310,44 @@ impl FlowDescriptionUpdateResult {
///////////////////////////////////////////////////////////////////////////////

#[derive(Union, Debug, Clone)]
enum FlowDescriptionDatasetHardCompactingResult {
NothingToDo(FlowDescriptionHardCompactingNothingToDo),
Success(FlowDescriptionHardCompactingSuccess),
enum FlowDescriptionDatasetHardCompactionResult {
NothingToDo(FlowDescriptionHardCompactionNothingToDo),
Success(FlowDescriptionHardCompactionSuccess),
}

#[derive(SimpleObject, Debug, Clone)]
struct FlowDescriptionHardCompactingSuccess {
struct FlowDescriptionHardCompactionSuccess {
original_blocks_count: u64,
resulting_blocks_count: u64,
new_head: Multihash,
}

#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub struct FlowDescriptionHardCompactingNothingToDo {
pub struct FlowDescriptionHardCompactionNothingToDo {
pub _dummy: String,
}

#[ComplexObject]
impl FlowDescriptionHardCompactingNothingToDo {
impl FlowDescriptionHardCompactionNothingToDo {
async fn message(&self) -> String {
"Nothing to do".to_string()
}
}

impl FlowDescriptionDatasetHardCompactingResult {
impl FlowDescriptionDatasetHardCompactionResult {
fn from_maybe_flow_outcome(maybe_outcome: Option<&fs::FlowOutcome>) -> Option<Self> {
if let Some(outcome) = maybe_outcome {
match outcome {
fs::FlowOutcome::Success(result) => match result {
fs::FlowResult::DatasetUpdate(_) => None,
fs::FlowResult::Empty => Some(Self::NothingToDo(
FlowDescriptionHardCompactingNothingToDo {
FlowDescriptionHardCompactionNothingToDo {
_dummy: "Nothing to do".to_string(),
},
)),
fs::FlowResult::DatasetCompact(compact) => {
Some(Self::Success(FlowDescriptionHardCompactingSuccess {
Some(Self::Success(FlowDescriptionHardCompactionSuccess {
original_blocks_count: compact.old_num_blocks as u64,
resulting_blocks_count: compact.new_num_blocks as u64,
new_head: compact.new_head.clone().into(),
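
Finally, a hypothetical client-side selection over the renamed hard-compaction description types that flow.rs constructs above (illustrative only; fields are taken from the schema diff in this PR):

    # Illustrative only: read the result of a hard-compaction flow.
    fragment HardCompactionDescription on FlowDescriptionDatasetHardCompaction {
      datasetId
      compactionResult {
        ... on FlowDescriptionHardCompactionSuccess {
          originalBlocksCount
          resultingBlocksCount
          newHead
        }
        ... on FlowDescriptionHardCompactionNothingToDo {
          message
        }
      }
    }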