Skip to content

Commit

Permalink
feat: implements ARRAY_JOIN as requested in (confluentinc#5028) (conf…
Browse files Browse the repository at this point in the history
  • Loading branch information
hpgrahsl authored and stevenpyzhang committed Jun 17, 2020
1 parent e4b9971 commit eb29a98
Show file tree
Hide file tree
Showing 13 changed files with 971 additions and 0 deletions.
11 changes: 11 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,17 @@ SLICE(col1, from, to)
Slices a list based on the supplied indices. The indices start at 1 and
include both endpoints.

### `ARRAY_JOIN`

```sql
ARRAY_JOIN(col1, delimiter)
```

Creates a flat string representation of all the elements contained in the given array.
The elements in the resulting string are separated by the chosen `delimiter`,
which is an optional parameter that falls back to a comma `,`. The current implementation only
allows for array elements of primitive ksqlDB types.

## Strings

### `CHR`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.math.BigDecimal;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;

@SuppressWarnings("MethodMayBeStatic") // UDF methods can not be static.
@UdfDescription(
name = "ARRAY_JOIN",
description = "joins the array elements into a flat string representation",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class ArrayJoin {

private static final String DEFAULT_DELIMITER = ",";
private static final Set<Class> KSQL_PRIMITIVES = ImmutableSet.of(
Boolean.class,Integer.class,Long.class,Double.class,BigDecimal.class,String.class
);

@Udf
public <T> String join(
@UdfParameter(description = "the array to join using the default delimiter '"
+ DEFAULT_DELIMITER + "'") final List<T> array
) {
return join(array, DEFAULT_DELIMITER);
}

@Udf
public <T> String join(
@UdfParameter(description = "the array to join using the specified delimiter")
final List<T> array,
@UdfParameter(description = "the string to be used as element delimiter")
final String delimiter
) {

if (array == null) {
return null;
}

final StringJoiner sj = new StringJoiner(delimiter == null ? "" : delimiter);
array.forEach(e -> processElement(e, sj));
return sj.toString();

}

@SuppressWarnings("unchecked")
private static <T> void processElement(final T element, final StringJoiner joiner) {

if (element == null || KSQL_PRIMITIVES.contains(element.getClass())) {
handlePrimitiveType(element, joiner);
} else {
throw new KsqlFunctionException("error: hit element of type "
+ element.getClass().getTypeName() + " which is currently not supported");
}

}

private static void handlePrimitiveType(final Object element, final StringJoiner joiner) {
joiner.add(element != null ? element.toString() : null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThrows;

import io.confluent.ksql.function.KsqlFunctionException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import org.junit.Test;

public class ArrayJoinTest {

private static final String CUSTOM_DELIMITER = "|";

private final ArrayJoin arrayJoinUDF = new ArrayJoin();

@Test
public void shouldReturnNullForNullInput() {
assertThat(arrayJoinUDF.join(null), nullValue());
assertThat(arrayJoinUDF.join(null,CUSTOM_DELIMITER), nullValue());
}

@Test
public void shouldReturnEmptyStringForEmptyArrays() {
assertThat(arrayJoinUDF.join(Collections.emptyList()).isEmpty(),is(true));
assertThat(arrayJoinUDF.join(Collections.emptyList(),CUSTOM_DELIMITER).isEmpty(),is(true));
}

@Test
public void shouldReturnCorrectStringForFlatArraysWithPrimitiveTypes() {

assertThat(arrayJoinUDF.join(Arrays.asList(true, null, false),""),
is("truenullfalse")
);
assertThat(arrayJoinUDF.join(Arrays.asList(true, null, false)),
is("true,null,false")
);
assertThat(arrayJoinUDF.join(Arrays.asList(true,null,false),CUSTOM_DELIMITER),
is("true"+CUSTOM_DELIMITER+"null"+CUSTOM_DELIMITER+"false")
);

assertThat(arrayJoinUDF.join(Arrays.asList(1,23,-42,0),null), is("123-420"));
assertThat(arrayJoinUDF.join(Arrays.asList(1,23,-42,0)), is("1,23,-42,0"));
assertThat(arrayJoinUDF.join(Arrays.asList(1,23,-42,0),CUSTOM_DELIMITER),
is("1"+CUSTOM_DELIMITER+"23"+CUSTOM_DELIMITER+"-42"+CUSTOM_DELIMITER+"0")
);

assertThat(arrayJoinUDF.join(Arrays.asList(-4294967297L, 8589934592L),""),
is("-42949672978589934592")
);
assertThat(arrayJoinUDF.join(Arrays.asList(-4294967297L, 8589934592L)),
is("-4294967297,8589934592")
);
assertThat(arrayJoinUDF.join(Arrays.asList(-4294967297L, 8589934592L), CUSTOM_DELIMITER),
is("-4294967297"+CUSTOM_DELIMITER+"8589934592")
);

assertThat(arrayJoinUDF.join(Arrays.asList(1.23,-23.42,0.0),null),
is("1.23-23.420.0")
);
assertThat(arrayJoinUDF.join(Arrays.asList(1.23,-23.42,0.0)),
is("1.23,-23.42,0.0")
);
assertThat(arrayJoinUDF.join(Arrays.asList(1.23,-23.42,0.0),CUSTOM_DELIMITER),
is("1.23"+CUSTOM_DELIMITER+"-23.42"+CUSTOM_DELIMITER+"0.0")
);

assertThat(arrayJoinUDF.join(
Arrays.asList(new BigDecimal("123.45"), new BigDecimal("987.65")),null
),
is("123.45987.65")
);
assertThat(arrayJoinUDF.join(Arrays.asList(new BigDecimal("123.45"), new BigDecimal("987.65"))),
is("123.45,987.65")
);
assertThat(arrayJoinUDF.join(
Arrays.asList(new BigDecimal("123.45"), new BigDecimal("987.65")),CUSTOM_DELIMITER),
is("123.45"+CUSTOM_DELIMITER+"987.65")
);

assertThat(arrayJoinUDF.join(Arrays.asList("Hello","From","","Ksqldb","Udf"),""),
is("HelloFromKsqldbUdf")
);
assertThat(arrayJoinUDF.join(Arrays.asList("Hello","From","","Ksqldb","Udf")),
is("Hello,From,,Ksqldb,Udf")
);
assertThat(
arrayJoinUDF.join(Arrays.asList("hello","from","","ksqldb","udf",null),CUSTOM_DELIMITER),
is("hello"+CUSTOM_DELIMITER+"from"+CUSTOM_DELIMITER+CUSTOM_DELIMITER
+"ksqldb"+CUSTOM_DELIMITER+"udf"+CUSTOM_DELIMITER+"null")
);

}

@Test
public void shouldThrowExceptionForExamplesOfUnsupportedElementTypes() {
assertThrows(KsqlFunctionException.class,
() -> arrayJoinUDF.join(Arrays.asList('a','b')));
assertThrows(KsqlFunctionException.class,
() -> arrayJoinUDF.join(Arrays.asList(BigInteger.ONE,BigInteger.ZERO)));
assertThrows(KsqlFunctionException.class,
() -> arrayJoinUDF.join(Arrays.asList(-23.0f,42.42f,0.0f)));
assertThrows(KsqlFunctionException.class,
() -> arrayJoinUDF.join(Arrays.asList(
new HashSet<>(Arrays.asList("foo", "blah")),
new HashSet<>(Arrays.asList("ksqlDB", "UDF"))
))
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID STRING KEY, BOOLEANARRAY ARRAY<BOOLEAN>, INTARRAY ARRAY<INTEGER>, BIGINTARRAY ARRAY<BIGINT>, DOUBLEARRAY ARRAY<DOUBLE>, DECIMALARRAY ARRAY<DECIMAL(5, 2)>, STRINGARRAY ARRAY<STRING>) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` STRING KEY, `BOOLEANARRAY` ARRAY<BOOLEAN>, `INTARRAY` ARRAY<INTEGER>, `BIGINTARRAY` ARRAY<BIGINT>, `DOUBLEARRAY` ARRAY<DOUBLE>, `DECIMALARRAY` ARRAY<DECIMAL(5, 2)>, `STRINGARRAY` ARRAY<STRING>",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.ID ID,\n ARRAY_JOIN(TEST.BOOLEANARRAY, '|') KSQL_COL_0,\n ARRAY_JOIN(TEST.INTARRAY, '?') KSQL_COL_1,\n ARRAY_JOIN(TEST.BIGINTARRAY, ';') KSQL_COL_2,\n ARRAY_JOIN(TEST.DOUBLEARRAY, ' ') KSQL_COL_3,\n ARRAY_JOIN(TEST.DECIMALARRAY, '#') KSQL_COL_4,\n ARRAY_JOIN(TEST.STRINGARRAY, '_') KSQL_COL_5\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` STRING KEY, `KSQL_COL_0` STRING, `KSQL_COL_1` STRING, `KSQL_COL_2` STRING, `KSQL_COL_3` STRING, `KSQL_COL_4` STRING, `KSQL_COL_5` STRING",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` STRING KEY, `BOOLEANARRAY` ARRAY<BOOLEAN>, `INTARRAY` ARRAY<INTEGER>, `BIGINTARRAY` ARRAY<BIGINT>, `DOUBLEARRAY` ARRAY<DOUBLE>, `DECIMALARRAY` ARRAY<DECIMAL(5, 2)>, `STRINGARRAY` ARRAY<STRING>"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "ARRAY_JOIN(BOOLEANARRAY, '|') AS KSQL_COL_0", "ARRAY_JOIN(INTARRAY, '?') AS KSQL_COL_1", "ARRAY_JOIN(BIGINTARRAY, ';') AS KSQL_COL_2", "ARRAY_JOIN(DOUBLEARRAY, ' ') AS KSQL_COL_3", "ARRAY_JOIN(DECIMALARRAY, '#') AS KSQL_COL_4", "ARRAY_JOIN(STRINGARRAY, '_') AS KSQL_COL_5" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
Loading

0 comments on commit eb29a98

Please sign in to comment.