KAFKA-3209: KIP-66: more single message transforms
Renames `HoistToStruct` SMT to `HoistField`.

Adds the following SMTs:
`ExtractField`
`MaskField`
`RegexRouter`
`ReplaceField`
`SetSchemaMetadata`
`ValueToKey`

Adds HTML doc generation and updates to `connect.html`.

Author: Shikhar Bhushan <shikhar@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#2374 from shikhar/more-smt
Shikhar Bhushan authored and soenkeliebau committed Feb 7, 2017
1 parent a4fb9f5 commit 597d918
Showing 24 changed files with 1,779 additions and 138 deletions.
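
For orientation, transformations like these are applied per-record by configuring them on a connector. A hedged sketch of what that configuration might look like (the alias `hoist` and field name `line` are illustrative):

    transforms=hoist
    transforms.hoist.type=org.apache.kafka.connect.transforms.HoistField$Value
    transforms.hoist.field=line

Each transform alias gets a `type` (one of the concrete Key/Value classes below) plus that transform's own config keys, as defined by its ConfigDef.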
9 changes: 8 additions & 1 deletion build.gradle
@@ -508,7 +508,7 @@ project(':core') {

   task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
                                'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
-                               'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs',
+                               'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
                                ':streams:genStreamsConfigDocs'], type: Tar) {
     classifier = 'site-docs'
     compression = Compression.GZIP
@@ -948,6 +948,13 @@ project(':connect:runtime') {
         if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
         standardOutput = new File(generatedDocsDir, "connect_config.html").newOutputStream()
     }
+
+    task genConnectTransformationDocs(type: JavaExec) {
+        classpath = sourceSets.main.runtimeClasspath
+        main = 'org.apache.kafka.connect.tools.TransformationDoc'
+        if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+        standardOutput = new File(generatedDocsDir, "connect_transforms.html").newOutputStream()
+    }
 }

 project(':connect:file') {
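
With the task wired into `siteDocsTar` above, the transformation docs can also be generated on their own, e.g. `./gradlew :connect:runtime:genConnectTransformationDocs`, which writes `connect_transforms.html` into the generated-docs directory just as `genConnectConfigDocs` does for `connect_config.html`.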
connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java (new file)
@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.tools;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.InsertField;
import org.apache.kafka.connect.transforms.MaskField;
import org.apache.kafka.connect.transforms.RegexRouter;
import org.apache.kafka.connect.transforms.ReplaceField;
import org.apache.kafka.connect.transforms.SetSchemaMetadata;
import org.apache.kafka.connect.transforms.TimestampRouter;
import org.apache.kafka.connect.transforms.ValueToKey;

import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;

public class TransformationDoc {

    private static final class DocInfo {
        final String transformationName;
        final String overview;
        final ConfigDef configDef;

        private DocInfo(String transformationName, String overview, ConfigDef configDef) {
            this.transformationName = transformationName;
            this.overview = overview;
            this.configDef = configDef;
        }
    }

    private static final List<DocInfo> TRANSFORMATIONS = Arrays.asList(
            new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF),
            new DocInfo(ReplaceField.class.getName(), ReplaceField.OVERVIEW_DOC, ReplaceField.CONFIG_DEF),
            new DocInfo(MaskField.class.getName(), MaskField.OVERVIEW_DOC, MaskField.CONFIG_DEF),
            new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF),
            new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF),
            new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF),
            new DocInfo(SetSchemaMetadata.class.getName(), SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF),
            new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF),
            new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF)
    );

    private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {
        out.println("<div id=\"" + docInfo.transformationName + "\">");

        out.print("<h5>");
        out.print(docInfo.transformationName);
        out.println("</h5>");

        out.println(docInfo.overview);

        out.println("<p/>");

        out.println(docInfo.configDef.toHtmlTable());

        out.println("</div>");
    }

    private static void printHtml(PrintStream out) throws NoSuchFieldException, IllegalAccessException, InstantiationException {
        for (final DocInfo docInfo : TRANSFORMATIONS) {
            printTransformationHtml(out, docInfo);
        }
    }

    public static void main(String... args) throws Exception {
        printHtml(System.out);
    }

}
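
For reference, printTransformationHtml emits one fragment per transformation; for ExtractField the output would look roughly like this (the config table is abbreviated here, since it comes from ConfigDef.toHtmlTable()):

    <div id="org.apache.kafka.connect.transforms.ExtractField">
    <h5>org.apache.kafka.connect.transforms.ExtractField</h5>
    Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data. ...
    <p/>
    <table>...</table>
    </div>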
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java (new file)
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Map;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

public abstract class ExtractField<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC =
            "Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data."
                    + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
                    + "or value (<code>" + Value.class.getCanonicalName() + "</code>).";

    private static final String FIELD_CONFIG = "field";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.");

    private static final String PURPOSE = "field extraction";

    private String fieldName;

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        fieldName = config.getString(FIELD_CONFIG);
    }

    @Override
    public R apply(R record) {
        final Schema schema = operatingSchema(record);
        if (schema == null) {
            final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
            return newRecord(record, null, value.get(fieldName));
        } else {
            final Struct value = requireStruct(operatingValue(record), PURPOSE);
            return newRecord(record, schema.field(fieldName).schema(), value.get(fieldName));
        }
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    protected abstract Schema operatingSchema(R record);

    protected abstract Object operatingValue(R record);

    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);

    public static class Key<R extends ConnectRecord<R>> extends ExtractField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return record.keySchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.key();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
        }
    }

    public static class Value<R extends ConnectRecord<R>> extends ExtractField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return record.valueSchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.value();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
        }
    }

}
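
As a quick sanity check, here is a minimal sketch (not part of this commit) exercising ExtractField.Value against a schemaless record; the topic, field name, and value are made up:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.ExtractField;

    public class ExtractFieldExample {
        public static void main(String[] args) {
            final ExtractField.Value<SourceRecord> xform = new ExtractField.Value<>();
            xform.configure(Collections.singletonMap("field", "id"));

            // Schemaless value: a Map, so apply() takes the requireMap branch.
            final Map<String, Object> value = Collections.singletonMap("id", 42);
            final SourceRecord record = new SourceRecord(null, null, "topic", null, value);

            final SourceRecord transformed = xform.apply(record);
            System.out.println(transformed.value());  // prints 42: the "id" field was extracted
        }
    }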
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/{HoistToStruct.java → HoistField.java}
@@ -27,15 +27,21 @@
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;

+import java.util.Collections;
 import java.util.Map;

-public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class HoistField<R extends ConnectRecord<R>> implements Transformation<R> {

-    public static final String FIELD_CONFIG = "field";
+    public static final String OVERVIEW_DOC =
+            "Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data."
+                    + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+                    + "or value (<code>" + Value.class.getCanonicalName() + "</code>).";

-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    private static final String FIELD_CONFIG = "field";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
-                    "Field name for the single field that will be created in the resulting Struct.");
+                    "Field name for the single field that will be created in the resulting Struct or Map.");

     private Cache<Schema, Schema> schemaUpdateCache;

@@ -53,15 +59,19 @@ public R apply(R record) {
         final Schema schema = operatingSchema(record);
         final Object value = operatingValue(record);

-        Schema updatedSchema = schemaUpdateCache.get(schema);
-        if (updatedSchema == null) {
-            updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
-            schemaUpdateCache.put(schema, updatedSchema);
-        }
+        if (schema == null) {
+            return newRecord(record, null, Collections.singletonMap(fieldName, value));
+        } else {
+            Schema updatedSchema = schemaUpdateCache.get(schema);
+            if (updatedSchema == null) {
+                updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
+                schemaUpdateCache.put(schema, updatedSchema);
+            }

-        final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);
+            final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);

-        return newRecord(record, updatedSchema, updatedValue);
+            return newRecord(record, updatedSchema, updatedValue);
+        }
     }

     @Override
@@ -80,11 +90,7 @@ public ConfigDef config() {

     protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);

-    /**
-     * Wraps the record key in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
-     */
-    public static class Key<R extends ConnectRecord<R>> extends HoistToStruct<R> {
-
+    public static class Key<R extends ConnectRecord<R>> extends HoistField<R> {
         @Override
         protected Schema operatingSchema(R record) {
             return record.keySchema();
@@ -99,14 +105,9 @@ protected Object operatingValue(R record) {
         protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
             return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
         }
-
     }

-    /**
-     * Wraps the record value in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
-     */
-    public static class Value<R extends ConnectRecord<R>> extends HoistToStruct<R> {
-
+    public static class Value<R extends ConnectRecord<R>> extends HoistField<R> {
         @Override
         protected Schema operatingSchema(R record) {
             return record.valueSchema();
@@ -121,7 +122,6 @@ protected Object operatingValue(R record) {
         protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
             return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
        }
-
     }

 }
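
And the inverse direction with the renamed HoistField: a hedged sketch on a schemaless record, assuming configure() reads the single "field" config the same way ExtractField does (configure() is outside the diff shown above); names and values are illustrative:

    import java.util.Collections;

    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.HoistField;

    public class HoistFieldExample {
        public static void main(String[] args) {
            final HoistField.Value<SourceRecord> xform = new HoistField.Value<>();
            xform.configure(Collections.singletonMap("field", "line"));

            // No value schema, so apply() takes the new schemaless branch and wraps
            // the raw value in a single-entry Map instead of building a Struct.
            final SourceRecord record = new SourceRecord(null, null, "topic", null, "hello world");
            final SourceRecord transformed = xform.apply(record);
            System.out.println(transformed.value());  // {line=hello world}
        }
    }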
