Skip to content

Commit

Permalink
fix: Removes config check to insert SPQ Processor (confluentinc#8062)
Browse files Browse the repository at this point in the history
* fix: Adds a new config check to insert SPQ Processor
  • Loading branch information
AlanConfluent committed Sep 1, 2021
1 parent ccb8936 commit a0be1d5
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 2 deletions.
14 changes: 14 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Expand Up @@ -285,6 +285,13 @@ public class KsqlConfig extends AbstractConfig {
+ "functions, aggregations, or joins, but may include projections and filters.";
public static final boolean KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT = false;

public static final String KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED
= "ksql.query.push.scalable.registry.installed";
public static final String KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED_DOC =
"Enables whether scalable push registry should be installed. This is a requirement of "
+ "enabling scalable push queries using ksql.query.push.scalable.enabled.";
public static final boolean KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED_DEFAULT = false;

public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY
= "ksql.query.push.scalable.new.node.continuity";
public static final String KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY_DOC =
Expand Down Expand Up @@ -923,6 +930,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC
)
.define(
KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED,
Type.BOOLEAN,
KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED_DEFAULT,
Importance.LOW,
KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED_DOC
)
.define(
KSQL_QUERY_PUSH_SCALABLE_NEW_NODE_CONTINUITY,
Type.BOOLEAN,
Expand Down
Expand Up @@ -30,7 +30,7 @@
*/
public class ProcessingQueue {

static final int BLOCKING_QUEUE_CAPACITY = 100;
static final int BLOCKING_QUEUE_CAPACITY = 1000;

private final Deque<TableRow> rowQueue;
private final QueryId queryId;
Expand Down
Expand Up @@ -225,7 +225,7 @@ private static Optional<ScalablePushRegistry> applyScalablePushProcessor(
final Map<String, Object> streamsProperties,
final KsqlConfig ksqlConfig
) {
if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED)) {
if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED)) {
return Optional.empty();
}
final KStream<?, GenericRow> stream;
Expand Down
Expand Up @@ -102,6 +102,7 @@ public class RestQueryTranslationTest {
.withProperty(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY, "set")
.withProperty(KsqlConfig.KSQL_QUERY_PULL_TABLE_SCAN_ENABLED, true)
.withProperty(KsqlConfig.KSQL_QUERY_PULL_INTERPRETER_ENABLED, true)
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED, true)
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED, true)
.withStaticServiceContext(TEST_HARNESS::getServiceContext)
.build();
Expand Down
Expand Up @@ -215,6 +215,7 @@ public class RestApiTest {
.withProperty("sasl.mechanism", "PLAIN")
.withProperty("sasl.jaas.config", SecureKafkaHelper.buildJaasConfig(NORMAL_USER))
.withProperties(ClientTrustStore.trustStoreProps())
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED, true)
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED, true)
.build();

Expand Down
Expand Up @@ -98,6 +98,7 @@ public class ScalablePushQueryFunctionalTest {
.withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8088")
.withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8088")
.withProperty(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS, true)
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED, true)
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED, true)
// Make rebalances happen quicker for the sake of the test
.withProperty(KSQL_STREAMS_PREFIX + "max.poll.interval.ms", 5000)
Expand All @@ -112,6 +113,7 @@ public class ScalablePushQueryFunctionalTest {
.withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8089")
.withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8089")
.withProperty(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS, true)
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_REGISTRY_INSTALLED, true)
.withProperty(KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED, true)
// Make rebalances happen quicker for the sake of the test
.withProperty(KSQL_STREAMS_PREFIX + "max.poll.interval.ms", 5000)
Expand Down

0 comments on commit a0be1d5

Please sign in to comment.