Skip to content

Commit

Permalink
Change property from max_writer_tasks_count to max_writer_task_count
Browse files Browse the repository at this point in the history
  • Loading branch information
radek committed Nov 17, 2023
1 parent 85d4b6a commit b6eebec
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public final class SystemSessionProperties
public static final String USE_PREFERRED_WRITE_PARTITIONING = "use_preferred_write_partitioning";
public static final String SCALE_WRITERS = "scale_writers";
public static final String TASK_SCALE_WRITERS_ENABLED = "task_scale_writers_enabled";
public static final String MAX_WRITER_TASKS_COUNT = "max_writer_tasks_count";
public static final String MAX_WRITER_TASK_COUNT = "max_writer_task_count";
public static final String WRITER_SCALING_MIN_DATA_PROCESSED = "writer_scaling_min_data_processed";
public static final String SKEWED_PARTITION_MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = "skewed_partition_min_data_processed_rebalance_threshold";
public static final String MAX_MEMORY_PER_PARTITION_WRITER = "max_memory_per_partition_writer";
Expand Down Expand Up @@ -320,10 +320,10 @@ public SystemSessionProperties(
featuresConfig.isScaleWriters(),
false),
integerProperty(
MAX_WRITER_TASKS_COUNT,
MAX_WRITER_TASK_COUNT,
"Maximum number of tasks that will participate in writing data",
queryManagerConfig.getMaxWriterTasksCount(),
value -> validateIntegerValue(value, MAX_WRITER_TASKS_COUNT, 1, false),
queryManagerConfig.getMaxWriterTaskCount(),
value -> validateIntegerValue(value, MAX_WRITER_TASK_COUNT, 1, false),
false),
booleanProperty(
TASK_SCALE_WRITERS_ENABLED,
Expand Down Expand Up @@ -1149,7 +1149,7 @@ public static boolean isTaskScaleWritersEnabled(Session session)

public static int getMaxWriterTaskCount(Session session)
{
return session.getSystemProperty(MAX_WRITER_TASKS_COUNT, Integer.class);
return session.getSystemProperty(MAX_WRITER_TASK_COUNT, Integer.class);
}

public static DataSize getWriterScalingMinDataProcessed(Session session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class QueryManagerConfig
private int maxHashPartitionCount = 100;
private int minHashPartitionCount = 4;
private int minHashPartitionCountForWrite = 50;
private int maxWriterTasksCount = 100;
private int maxWriterTaskCount = 100;
private Duration minQueryExpireAge = new Duration(15, TimeUnit.MINUTES);
private int maxQueryHistory = 100;
private int maxQueryLength = 1_000_000;
Expand Down Expand Up @@ -260,16 +260,16 @@ public QueryManagerConfig setMinHashPartitionCountForWrite(int minHashPartitionC
}

@Min(1)
public int getMaxWriterTasksCount()
public int getMaxWriterTaskCount()
{
return maxWriterTasksCount;
return maxWriterTaskCount;
}

@Config("query.max-writer-task-count")
@ConfigDescription("Maximum number of tasks that will participate in writing data")
public QueryManagerConfig setMaxWriterTasksCount(int maxWritersNodesCount)
public QueryManagerConfig setMaxWriterTaskCount(int maxWritersNodesCount)
{
this.maxWriterTasksCount = maxWritersNodesCount;
this.maxWriterTaskCount = maxWritersNodesCount;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testDefaults()
.setFaultTolerantExecutionSmallStageSourceSizeMultiplier(1.2)
.setFaultTolerantExecutionSmallStageRequireNoMorePartitions(false)
.setFaultTolerantExecutionStageEstimationForEagerParentEnabled(true)
.setMaxWriterTasksCount(100));
.setMaxWriterTaskCount(100));
}

@Test
Expand Down Expand Up @@ -258,7 +258,7 @@ public void testExplicitPropertyMappings()
.setFaultTolerantExecutionSmallStageSourceSizeMultiplier(1.6)
.setFaultTolerantExecutionSmallStageRequireNoMorePartitions(true)
.setFaultTolerantExecutionStageEstimationForEagerParentEnabled(false)
.setMaxWriterTasksCount(101);
.setMaxWriterTaskCount(101);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.Optional;
import java.util.OptionalInt;

import static io.trino.SystemSessionProperties.MAX_WRITER_TASKS_COUNT;
import static io.trino.SystemSessionProperties.MAX_WRITER_TASK_COUNT;
import static io.trino.SystemSessionProperties.REDISTRIBUTE_WRITES;
import static io.trino.SystemSessionProperties.RETRY_POLICY;
import static io.trino.SystemSessionProperties.SCALE_WRITERS;
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testPlanWhenInsertToUnpartitionedTableScaleWritersDisabled()
@Language("SQL") String query = "INSERT INTO unpartitioned_target_table VALUES ('one', 'two')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(SCALE_WRITERS, "false")
.setCatalog(catalogName)
.build();
Expand All @@ -155,7 +155,7 @@ public void testPlanWhenInsertToUnpartitionedTableScaleWritersDisabled()
anyTree(
node(TableWriterNode.class,
exchange(LOCAL, Optional.empty(),
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASKS_COUNT is set to 2
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASK_COUNT is set to 2
exchange(REMOTE, FIXED_ARBITRARY_DISTRIBUTION, Optional.of(2),
values("column_a", "column_b"))))));
}
Expand All @@ -166,7 +166,7 @@ public void testPlanWhenInsertToUnpartitionedTableScaleWritersEnabled()
@Language("SQL") String query = "INSERT INTO unpartitioned_target_table VALUES ('one', 'two')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(SCALE_WRITERS, "true")
.setCatalog(catalogName)
.build();
Expand All @@ -177,7 +177,7 @@ public void testPlanWhenInsertToUnpartitionedTableScaleWritersEnabled()
anyTree(
node(TableWriterNode.class,
exchange(LOCAL, Optional.empty(),
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASKS_COUNT is set to 2
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASK_COUNT is set to 2
exchange(REMOTE, SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION, Optional.of(2),
values("column_a", "column_b"))))));
}
Expand All @@ -188,7 +188,7 @@ public void testPlanWhenInsertToUnpartitionedSourceDistribution()
@Language("SQL") String query = "INSERT INTO unpartitioned_target_table VALUES ('one', 'two')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(SCALE_WRITERS, "false")
.setSystemProperty(REDISTRIBUTE_WRITES, "false")
.setCatalog(catalogName)
Expand All @@ -209,7 +209,7 @@ public void testPlanWhenInsertToPartitionedTablePreferredPartitioningEnabled()
@Language("SQL") String query = "INSERT INTO partitioned_target_table VALUES ('one', 'two'), ('three', 'four')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true")
.setCatalog(catalogName)
.build();
Expand All @@ -220,7 +220,7 @@ public void testPlanWhenInsertToPartitionedTablePreferredPartitioningEnabled()
anyTree(
node(TableWriterNode.class,
exchange(LOCAL,
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASKS_COUNT is set to 2
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASK_COUNT is set to 2
exchange(REMOTE, SCALED_WRITER_HASH_DISTRIBUTION, Optional.of(2),
values("column_a", "column_b"))))));
}
Expand All @@ -231,7 +231,7 @@ public void testPlanWhenInsertToPartitionedAndBucketedTable()
@Language("SQL") String query = "INSERT INTO partitioned_bucketed_target_table VALUES ('one', 'two'), ('three', 'four')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setCatalog(catalogName)
.build();

Expand All @@ -252,7 +252,7 @@ public void testPlanWhenMaxWriterTasksSpecified()
@Language("SQL") String query = "INSERT INTO partitioned_target_table VALUES ('one', 'two'), ('three', 'four')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true")
.setCatalog(catalogNameWithMaxWriterTasksSpecified)
.build();
Expand All @@ -274,7 +274,7 @@ public void testPlanWhenRetryPolicyIsTask()
@Language("SQL") String query = "INSERT INTO partitioned_target_table VALUES ('one', 'two'), ('three', 'four')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true")
.setSystemProperty(RETRY_POLICY, "TASK")
.setCatalog(catalogNameWithMaxWriterTasksSpecified)
Expand All @@ -296,7 +296,7 @@ public void testPlanWhenExecuteOnUnpartitionedTableScaleWritersDisabled()
@Language("SQL") String query = "ALTER TABLE unpartitioned_target_table EXECUTE optimize(file_size_threshold => '10MB')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(SCALE_WRITERS, "false")
.setCatalog(catalogName)
.build();
Expand All @@ -307,7 +307,7 @@ public void testPlanWhenExecuteOnUnpartitionedTableScaleWritersDisabled()
anyTree(
node(TableExecuteNode.class,
exchange(LOCAL, Optional.empty(),
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASKS_COUNT is set to 2
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASK_COUNT is set to 2
exchange(REMOTE, FIXED_ARBITRARY_DISTRIBUTION, Optional.of(2),
tableScan(unPartitionedTable))))));
}
Expand All @@ -318,7 +318,7 @@ public void testPlanWhenTableExecuteToUnpartitionedTableScaleWritersEnabled()
@Language("SQL") String query = "ALTER TABLE unpartitioned_target_table EXECUTE optimize(file_size_threshold => '10MB')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(SCALE_WRITERS, "true")
.setCatalog(catalogName)
.build();
Expand All @@ -329,7 +329,7 @@ public void testPlanWhenTableExecuteToUnpartitionedTableScaleWritersEnabled()
anyTree(
node(TableExecuteNode.class,
exchange(LOCAL, Optional.empty(),
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASKS_COUNT is set to 2
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASK_COUNT is set to 2
exchange(REMOTE, SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION, Optional.of(2),
tableScan(unPartitionedTable))))));
}
Expand All @@ -340,7 +340,7 @@ public void testPlanWhenTableExecuteToUnpartitionedSourceDistribution()
@Language("SQL") String query = "ALTER TABLE unpartitioned_target_table EXECUTE optimize(file_size_threshold => '10MB')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(SCALE_WRITERS, "false")
.setSystemProperty(REDISTRIBUTE_WRITES, "false")
.setCatalog(catalogName)
Expand All @@ -361,7 +361,7 @@ public void testPlanWhenTableExecuteToPartitionedTablePreferredPartitioningEnabl
@Language("SQL") String query = "ALTER TABLE partitioned_target_table EXECUTE optimize(file_size_threshold => '10MB')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true")
.setCatalog(catalogName)
.build();
Expand All @@ -372,7 +372,7 @@ public void testPlanWhenTableExecuteToPartitionedTablePreferredPartitioningEnabl
anyTree(
node(TableExecuteNode.class,
exchange(LOCAL,
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASKS_COUNT is set to 2
// partitionCount for writing stage should be set to 2 because session variable MAX_WRITER_TASK_COUNT is set to 2
exchange(REMOTE, SCALED_WRITER_HASH_DISTRIBUTION, Optional.of(2),
node(TableScanNode.class))))));
}
Expand All @@ -383,7 +383,7 @@ public void testPlanTableExecuteWhenMaxWriterTasksSpecified()
@Language("SQL") String query = "ALTER TABLE partitioned_target_table EXECUTE optimize(file_size_threshold => '10MB')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true")
.setCatalog(catalogNameWithMaxWriterTasksSpecified)
.build();
Expand All @@ -405,7 +405,7 @@ public void testPlanTableExecuteWhenRetryPolicyIsTask()
@Language("SQL") String query = "ALTER TABLE partitioned_target_table EXECUTE optimize(file_size_threshold => '10MB')";

Session session = Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(MAX_WRITER_TASKS_COUNT, "2")
.setSystemProperty(MAX_WRITER_TASK_COUNT, "2")
.setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true")
.setSystemProperty(RETRY_POLICY, "TASK")
.setCatalog(catalogNameWithMaxWriterTasksSpecified)
Expand Down
4 changes: 2 additions & 2 deletions docs/src/main/sphinx/admin/properties-query-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ joins, aggregations, partitioned window functions and others.
The minimum number of partitions to use for processing distributed operations in write queries,
such as joins, aggregations, partitioned window functions and others.

## `query.max-writer-tasks-count`
## `query.max-writer-task-count`

- **Type:** {ref}`prop-type-integer`
- **Default value:** `100`
- **Session property:** `max_writer_tasks_count`
- **Session property:** `max_writer_task_count`

The maximum number of tasks that will take part in writing data during
`INSERT`, `CREATE TABLE AS SELECT` and `EXECUTE` queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MIN;
import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE;
import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_SIZE;
import static io.trino.SystemSessionProperties.MAX_WRITER_TASKS_COUNT;
import static io.trino.SystemSessionProperties.MAX_WRITER_TASK_COUNT;
import static io.trino.SystemSessionProperties.QUERY_MAX_MEMORY_PER_NODE;
import static io.trino.SystemSessionProperties.REDISTRIBUTE_WRITES;
import static io.trino.SystemSessionProperties.SCALE_WRITERS;
Expand Down Expand Up @@ -4227,21 +4227,21 @@ public void testMultipleWritersWhenTaskScaleWritersIsEnabledWithMemoryLimit()
}

@Test
public void testWriterTasksCountLimitUnpartitioned()
public void testWriterTaskCountLimitUnpartitioned()
{
testLimitWriterTasks(2, 2, true, true, false, DataSize.of(1, MEGABYTE));
testLimitWriterTasks(2, 2, false, true, false, DataSize.of(1, MEGABYTE));
testLimitWriterTasks(2, 3, false, false, false, DataSize.of(1, MEGABYTE));
}

@Test
public void testWriterTasksCountLimitPartitionedScaleWritersDisabled()
public void testWriterTaskCountLimitPartitionedScaleWritersDisabled()
{
testLimitWriterTasks(2, 2, false, true, true, DataSize.of(1, MEGABYTE));
}

@Test
public void testWriterTasksCountLimitPartitionedScaleWritersEnabled()
public void testWriterTaskCountLimitPartitionedScaleWritersEnabled()
{
testLimitWriterTasks(2, 4, true, true, true, DataSize.of(1, MEGABYTE));
// Since we track page size for scaling writer instead of actual compressed output file size, we need to have a
Expand All @@ -4253,7 +4253,7 @@ private void testLimitWriterTasks(int maxWriterTasks, int expectedFilesCount, bo
{
Session session = Session.builder(getSession())
.setSystemProperty(SCALE_WRITERS, Boolean.toString(scaleWritersEnabled))
.setSystemProperty(MAX_WRITER_TASKS_COUNT, Integer.toString(maxWriterTasks))
.setSystemProperty(MAX_WRITER_TASK_COUNT, Integer.toString(maxWriterTasks))
.setSystemProperty(REDISTRIBUTE_WRITES, Boolean.toString(redistributeWrites))
.setSystemProperty(TASK_MIN_WRITER_COUNT, "1")
.setSystemProperty(WRITER_SCALING_MIN_DATA_PROCESSED, writerScalingMinDataProcessed.toString())
Expand Down Expand Up @@ -8346,7 +8346,7 @@ private void testOptimizeWithWriterScaling(boolean scaleWriters, boolean taskSca
.setSystemProperty("task_min_writer_count", "1");

if (!scaleWriters) {
writerScalingSessionBuilder.setSystemProperty("max_writer_tasks_count", "1");
writerScalingSessionBuilder.setSystemProperty("max_writer_task_count", "1");
}

assertUpdate(writerScalingSessionBuilder.build(), "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')");
Expand Down

0 comments on commit b6eebec

Please sign in to comment.