Skip to content

Commit

Permalink
fix: add new types to udaf functions (confluentinc#8081)
Browse files Browse the repository at this point in the history
* fix: add new types to udaf functions

* checkstyle

* typos

* indent
  • Loading branch information
Zara Lim committed Sep 1, 2021
1 parent d0e2ea7 commit a3ea6a4
Show file tree
Hide file tree
Showing 82 changed files with 10,766 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.util.KsqlConstants;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Configurable;
Expand Down Expand Up @@ -67,6 +71,27 @@ public static TableUdaf<Boolean, List<Boolean>, List<Boolean>> createCollectList
return new Collect<>();
}

@UdafFactory(description = "collect values of a Timestamp field into a single Array")
public static TableUdaf<Timestamp, List<Timestamp>, List<Timestamp>>
    createCollectListTimestamp() {
  // Timestamp values are gathered by the shared bounded Collect implementation;
  // the explicit type argument documents the element type at the call site.
  return new Collect<Timestamp>();
}

@UdafFactory(description = "collect values of a Time field into a single Array")
public static TableUdaf<Time, List<Time>, List<Time>> createCollectListTime() {
  // Time values are gathered by the shared bounded Collect implementation.
  return new Collect<Time>();
}

@UdafFactory(description = "collect values of a Date field into a single Array")
public static TableUdaf<Date, List<Date>, List<Date>> createCollectListDate() {
  // Date values are gathered by the shared bounded Collect implementation.
  return new Collect<Date>();
}

@UdafFactory(description = "collect values of a Bytes field into a single Array")
public static TableUdaf<ByteBuffer, List<ByteBuffer>, List<ByteBuffer>>
    createCollectListBytes() {
  // BYTES columns arrive as ByteBuffer; defer to the shared bounded Collect
  // implementation. Signature wrapped to match the sibling factories.
  return new Collect<ByteBuffer>();
}

private static final class Collect<T> implements TableUdaf<T, List<T>, List<T>>, Configurable {

private int limit = Integer.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.util.KsqlConstants;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Configurable;
Expand Down Expand Up @@ -65,6 +69,26 @@ public static Udaf<Boolean, List<Boolean>, List<Boolean>> createCollectSetBool()
return new Collect<>();
}

@UdafFactory(description = "collect distinct values of a Timestamp field into a single Array")
public static Udaf<Timestamp, List<Timestamp>, List<Timestamp>> createCollectSetTimestamp() {
  // Distinct Timestamp values are gathered by the shared Collect implementation;
  // the explicit type argument documents the element type at the call site.
  return new Collect<Timestamp>();
}

@UdafFactory(description = "collect distinct values of a Time field into a single Array")
public static Udaf<Time, List<Time>, List<Time>> createCollectSetTime() {
  // Distinct Time values are gathered by the shared Collect implementation.
  return new Collect<Time>();
}

@UdafFactory(description = "collect distinct values of a Date field into a single Array")
public static Udaf<Date, List<Date>, List<Date>> createCollectSetDate() {
  // Distinct Date values are gathered by the shared Collect implementation.
  return new Collect<Date>();
}

@UdafFactory(description = "collect distinct values of a Bytes field into a single Array")
public static Udaf<ByteBuffer, List<ByteBuffer>, List<ByteBuffer>> createCollectSetBytes() {
  // BYTES columns arrive as ByteBuffer; distinct values are gathered by the
  // shared Collect implementation.
  return new Collect<ByteBuffer>();
}

private static final class Collect<T> implements Udaf<T, List<T>, List<T>>, Configurable {

private int limit = Integer.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.INTERMEDIATE_STRUCT_COMPARATOR;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_BOOLEAN;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_BYTES;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_DATE;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_DOUBLE;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_INTEGER;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_LONG;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_STRING;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_TIME;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_TIMESTAMP;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.VAL_FIELD;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -29,6 +33,10 @@
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.util.KsqlConstants;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -186,6 +194,116 @@ public static Udaf<String, List<Struct>, List<String>> earliestStrings(
return earliestN(STRUCT_STRING, earliestN, ignoreNulls);
}

@UdafFactory(description = "return the earliest value of a timestamp column",
    aggregateSchema = "STRUCT<SEQ BIGINT, VAL TIMESTAMP>")
public static Udaf<Timestamp, Struct, Timestamp> earliestTimestamp() {
  // Nulls are skipped by default, mirroring the other earliest_* factories.
  final boolean ignoreNulls = true;
  return earliestTimestamp(ignoreNulls);
}

@UdafFactory(description = "return the earliest value of a timestamp column",
    aggregateSchema = "STRUCT<SEQ BIGINT, VAL TIMESTAMP>")
public static Udaf<Timestamp, Struct, Timestamp> earliestTimestamp(final boolean ignoreNulls) {
  // Delegate to the generic single-value helper, binding the timestamp schema.
  final Udaf<Timestamp, Struct, Timestamp> udaf = earliest(STRUCT_TIMESTAMP, ignoreNulls);
  return udaf;
}

@UdafFactory(description = "return the earliest N values of a timestamp column",
    aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL TIMESTAMP>>")
public static Udaf<Timestamp, List<Struct>, List<Timestamp>> earliestTimestamps(
    final int earliestN) {
  // Nulls are skipped unless the caller opts out via the two-arg overload.
  final boolean ignoreNulls = true;
  return earliestTimestamps(earliestN, ignoreNulls);
}

@UdafFactory(description = "return the earliest N values of a timestamp column",
    aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL TIMESTAMP>>")
public static Udaf<Timestamp, List<Struct>, List<Timestamp>> earliestTimestamps(
    final int earliestN,
    final boolean ignoreNulls
) {
  // Delegate to the generic N-value helper, binding the timestamp schema.
  final Udaf<Timestamp, List<Struct>, List<Timestamp>> udaf =
      earliestN(STRUCT_TIMESTAMP, earliestN, ignoreNulls);
  return udaf;
}

@UdafFactory(description = "return the earliest value of a time column",
    aggregateSchema = "STRUCT<SEQ BIGINT, VAL TIME>")
public static Udaf<Time, Struct, Time> earliestTime() {
  // Nulls are skipped by default, mirroring the other earliest_* factories.
  final boolean ignoreNulls = true;
  return earliestTime(ignoreNulls);
}

@UdafFactory(description = "return the earliest value of a time column",
    aggregateSchema = "STRUCT<SEQ BIGINT, VAL TIME>")
public static Udaf<Time, Struct, Time> earliestTime(final boolean ignoreNulls) {
  // Delegate to the generic single-value helper, binding the time schema.
  final Udaf<Time, Struct, Time> udaf = earliest(STRUCT_TIME, ignoreNulls);
  return udaf;
}

@UdafFactory(description = "return the earliest N values of a time column",
    aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL TIME>>")
public static Udaf<Time, List<Struct>, List<Time>> earliestTimes(final int earliestN) {
  // Nulls are skipped unless the caller opts out via the two-arg overload.
  final boolean ignoreNulls = true;
  return earliestTimes(earliestN, ignoreNulls);
}

@UdafFactory(description = "return the earliest N values of a time column",
    aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL TIME>>")
public static Udaf<Time, List<Struct>, List<Time>> earliestTimes(
    final int earliestN,
    final boolean ignoreNulls
) {
  // Delegate to the generic N-value helper, binding the time schema.
  final Udaf<Time, List<Struct>, List<Time>> udaf =
      earliestN(STRUCT_TIME, earliestN, ignoreNulls);
  return udaf;
}

@UdafFactory(description = "return the earliest value of a date column",
    aggregateSchema = "STRUCT<SEQ BIGINT, VAL DATE>")
public static Udaf<Date, Struct, Date> earliestDate() {
  // Nulls are skipped by default, mirroring the other earliest_* factories.
  final boolean ignoreNulls = true;
  return earliestDate(ignoreNulls);
}

@UdafFactory(description = "return the earliest value of a date column",
    aggregateSchema = "STRUCT<SEQ BIGINT, VAL DATE>")
public static Udaf<Date, Struct, Date> earliestDate(final boolean ignoreNulls) {
  // Delegate to the generic single-value helper, binding the date schema.
  final Udaf<Date, Struct, Date> udaf = earliest(STRUCT_DATE, ignoreNulls);
  return udaf;
}

@UdafFactory(description = "return the earliest N values of a date column",
    aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL DATE>>")
public static Udaf<Date, List<Struct>, List<Date>> earliestDates(final int earliestN) {
  // Nulls are skipped unless the caller opts out via the two-arg overload.
  final boolean ignoreNulls = true;
  return earliestDates(earliestN, ignoreNulls);
}

@UdafFactory(description = "return the earliest N values of a date column",
    aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL DATE>>")
public static Udaf<Date, List<Struct>, List<Date>> earliestDates(
    final int earliestN,
    final boolean ignoreNulls
) {
  // Delegate to the generic N-value helper, binding the date schema.
  final Udaf<Date, List<Struct>, List<Date>> udaf =
      earliestN(STRUCT_DATE, earliestN, ignoreNulls);
  return udaf;
}

@UdafFactory(description = "return the earliest value of a bytes column",
    aggregateSchema = "STRUCT<SEQ BIGINT, VAL BYTES>")
public static Udaf<ByteBuffer, Struct, ByteBuffer> earliestBytes() {
  // Nulls are skipped by default, mirroring the other earliest_* factories.
  final boolean ignoreNulls = true;
  return earliestBytes(ignoreNulls);
}

@UdafFactory(description = "return the earliest value of a bytes column",
    aggregateSchema = "STRUCT<SEQ BIGINT, VAL BYTES>")
public static Udaf<ByteBuffer, Struct, ByteBuffer> earliestBytes(final boolean ignoreNulls) {
  // Delegate to the generic single-value helper, binding the bytes schema.
  final Udaf<ByteBuffer, Struct, ByteBuffer> udaf = earliest(STRUCT_BYTES, ignoreNulls);
  return udaf;
}

@UdafFactory(description = "return the earliest N values of a bytes column",
    aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL BYTES>>")
public static Udaf<ByteBuffer, List<Struct>, List<ByteBuffer>> earliestBytes(
    final int earliestN) {
  // Nulls are skipped unless the caller opts out via the two-arg overload.
  final boolean ignoreNulls = true;
  return earliestBytes(earliestN, ignoreNulls);
}

@UdafFactory(description = "return the earliest N values of a bytes column",
    aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL BYTES>>")
public static Udaf<ByteBuffer, List<Struct>, List<ByteBuffer>> earliestBytes(
    final int earliestN,
    final boolean ignoreNulls
) {
  // Delegate to the generic N-value helper, binding the bytes schema.
  final Udaf<ByteBuffer, List<Struct>, List<ByteBuffer>> udaf =
      earliestN(STRUCT_BYTES, earliestN, ignoreNulls);
  return udaf;
}

@VisibleForTesting
static <T> Struct createStruct(final Schema schema, final T val) {
return KudafByOffsetUtils.createStruct(schema, generateSequence(), val);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

package io.confluent.ksql.function.udaf.offset;

import static io.confluent.ksql.serde.connect.ConnectSchemaUtil.OPTIONAL_DATE_SCHEMA;
import static io.confluent.ksql.serde.connect.ConnectSchemaUtil.OPTIONAL_TIMESTAMP_SCHEMA;
import static io.confluent.ksql.serde.connect.ConnectSchemaUtil.OPTIONAL_TIME_SCHEMA;

import java.util.Comparator;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand Down Expand Up @@ -49,6 +53,26 @@ final class KudafByOffsetUtils {
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
.build();

// Intermediate aggregate schema for TIMESTAMP columns: pairs a sequence number
// (SEQ, used by INTERMEDIATE_STRUCT_COMPARATOR to order values) with the
// captured value (VAL, connect Timestamp logical type via ConnectSchemaUtil).
static final Schema STRUCT_TIMESTAMP = SchemaBuilder.struct().optional()
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, OPTIONAL_TIMESTAMP_SCHEMA)
.build();

// Intermediate aggregate schema for TIME columns (connect Time logical type).
static final Schema STRUCT_TIME = SchemaBuilder.struct().optional()
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, OPTIONAL_TIME_SCHEMA)
.build();

// Intermediate aggregate schema for DATE columns (connect Date logical type).
static final Schema STRUCT_DATE = SchemaBuilder.struct().optional()
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, OPTIONAL_DATE_SCHEMA)
.build();

// Intermediate aggregate schema for BYTES columns (plain optional bytes).
static final Schema STRUCT_BYTES = SchemaBuilder.struct().optional()
.field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
.field(VAL_FIELD, Schema.OPTIONAL_BYTES_SCHEMA)
.build();

static final Comparator<Struct> INTERMEDIATE_STRUCT_COMPARATOR = (struct1, struct2) -> {
// Deal with overflow - we assume if one is positive and the other negative then the sequence
Expand Down
Loading

0 comments on commit a3ea6a4

Please sign in to comment.