Skip to content

Commit

Permalink
KAFKA-15031: Add plugin.discovery to Connect worker configuration (KI…
Browse files Browse the repository at this point in the history
…P-898) (apache#14055)

Reviewers: Chris Egerton <chrise@aiven.io>
  • Loading branch information
gharris1727 committed Aug 8, 2023
1 parent 60a5117 commit ff4fed5
Show file tree
Hide file tree
Showing 27 changed files with 688 additions and 7 deletions.
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.storage.StringConverter
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.storage.SimpleHeaderConverter
org.apache.kafka.connect.storage.StringConverter
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.sink.SinkConnectorTest$TestSinkConnector
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.source.SourceConnectorTest$TestSourceConnector
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.file.FileStreamSinkConnector
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.file.FileStreamSourceConnector
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.json.JsonConverter
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.json.JsonConverter
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.mirror.MirrorCheckpointConnector
org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
org.apache.kafka.connect.mirror.MirrorSourceConnector
Expand Up @@ -25,7 +25,9 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.slf4j.Logger;
Expand All @@ -35,13 +37,18 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.ONLY_SCAN;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_FAIL;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.SERVICE_LOAD;

/**
* Common base class providing configuration for Kafka Connect workers, whether standalone or distributed.
Expand Down Expand Up @@ -122,6 +129,18 @@ public class WorkerConfig extends AbstractConfig {
+ "by the worker's scanner before config providers are initialized and used to "
+ "replace variables.";

public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to discover plugins present in the classpath "
+ "and plugin.path configuration. This can be one of multiple values with the following meanings:\n"
+ "* " + ONLY_SCAN + ": Discover plugins only by reflection. "
+ "Plugins which are not discoverable by ServiceLoader will not impact worker startup.\n"
+ "* " + HYBRID_WARN + ": Discover plugins reflectively and by ServiceLoader. "
+ "Plugins which are not discoverable by ServiceLoader will print warnings during worker startup.\n"
+ "* " + HYBRID_FAIL + ": Discover plugins reflectively and by ServiceLoader. "
+ "Plugins which are not discoverable by ServiceLoader will cause worker startup to fail.\n"
+ "* " + SERVICE_LOAD + ": Discover plugins only by ServiceLoader. Faster startup than other modes. "
+ "Plugins which are not discoverable by ServiceLoader may not be usable.";

public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
protected static final String CONFIG_PROVIDERS_DOC =
"Comma-separated names of <code>ConfigProvider</code> classes, loaded and used "
Expand Down Expand Up @@ -199,6 +218,12 @@ protected static ConfigDef baseConfigDef() {
null,
Importance.LOW,
PLUGIN_PATH_DOC)
.define(PLUGIN_DISCOVERY_CONFIG,
Type.STRING,
PluginDiscoveryMode.HYBRID_WARN.toString(),
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(PluginDiscoveryMode.class)),
Importance.LOW,
PLUGIN_DISCOVERY_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG,
30000, atLeast(0), Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
Expand Down Expand Up @@ -401,6 +426,16 @@ public static String pluginPath(Map<String, String> props) {
return props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
}

public static PluginDiscoveryMode pluginDiscovery(Map<String, String> props) {
String value = props.getOrDefault(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN.toString());
try {
return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new ConnectException("Invalid " + PLUGIN_DISCOVERY_CONFIG + " value, must be one of "
+ Arrays.toString(Utils.enumOptions(PluginDiscoveryMode.class)));
}
}

public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props);
logInternalConverterRemovalWarnings(props);
Expand Down
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;

import java.util.Locale;

/**
* Strategy to use to discover plugins usable on a Connect worker.
* @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery">KIP-898</a>
*/
public enum PluginDiscoveryMode {

/**
* Scan for plugins reflectively. This corresponds to the legacy behavior of Connect prior to KIP-898.
* <p>Note: the following plugins are still loaded using {@link java.util.ServiceLoader} in this mode:
* <ul>
* <li>{@link org.apache.kafka.common.config.provider.ConfigProvider}</li>
* <li>{@link org.apache.kafka.connect.rest.ConnectRestExtension}</li>
* <li>{@link org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}</li>
* </ul>
*/
ONLY_SCAN,
/**
* Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
* Emit warnings if one or more plugins is not available via {@link java.util.ServiceLoader}
*/
HYBRID_WARN,
/**
* Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
* Fail worker during startup if one or more plugins is not available via {@link java.util.ServiceLoader}
*/
HYBRID_FAIL,
/**
* Discover plugins via {@link java.util.ServiceLoader} only.
* Plugins may not be usable if they are not available via {@link java.util.ServiceLoader}
*/
SERVICE_LOAD;

public boolean reflectivelyScan() {
return this != SERVICE_LOAD;
}

public boolean serviceLoad() {
return this != ONLY_SCAN;
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}

0 comments on commit ff4fed5

Please sign in to comment.