Skip to content

Commit

Permalink
Beam SQL UDF cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsbasjes committed Sep 26, 2021
1 parent 1450e08 commit 0f37571
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/main/docs/UDF-ApacheBeamSql.md
Expand Up @@ -30,7 +30,7 @@ to give

Phone

### Getting several values as a Map (requires Apache Beam 2.34.0 or newer)
### Getting several values as a Map (requires Apache Beam 2.30.0 or newer)
You can ask for a all fields and return the full map with all of them in there.

ParseUserAgent(userAgent) AS allFields
Expand Down
Expand Up @@ -24,6 +24,9 @@ abstract class BaseParseUserAgentUDF implements BeamSqlUdf {
private static transient UserAgentAnalyzer userAgentAnalyzer = null;

protected static UserAgentAnalyzer getInstance() {
// NOTE: We currently do NOT make an instance with only the wanted fields.
// We only know the required parameters the moment the call is done.
// At that point it is too late to create an optimized instance.
if (userAgentAnalyzer == null) {
userAgentAnalyzer = UserAgentAnalyzer
.newBuilder()
Expand Down
Expand Up @@ -43,6 +43,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

@Category(ValidatesRunner.class)
public class TestParseUserAgentSQL implements Serializable {
Expand Down Expand Up @@ -308,9 +309,10 @@ public void testUserAgentAnalysisSQL() { // NOSONAR java:S2699 Tests should incl
.apply(Create.of(useragents))
.setCoder(ListCoder.of(StringUtf8Coder.of()))
.apply(ParDo.of(new DoFn<List<String>, Row>() {
@SuppressWarnings("unused") // Called via the annotation
@ProcessElement
public void processElement(ProcessContext c) {
c.output(listToRow(c.element(), inputSchema));
c.output(listToRow(Objects.requireNonNull(c.element()), inputSchema));
}
}))
.setCoder(RowCoder.of(inputSchema));
Expand Down Expand Up @@ -431,7 +433,7 @@ public void processElement(ProcessContext c) {
);

// Just to see the output of the query while debugging
// result.apply(ParDo.of(new RowPrinter()));
result.apply(ParDo.of(new RowPrinter()));

// Assert on the results.
PAssert.that(result)
Expand All @@ -441,7 +443,7 @@ public void processElement(ProcessContext c) {
}

public static class RowPrinter extends DoFn<Row, Void> {
@SuppressWarnings("unused") // Used via reflection
@SuppressWarnings("unused") // Called via the annotation
@ProcessElement
public void processElement(ProcessContext c) {
final Row row = c.element();
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.apache.beam.sdk.values.Row;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -47,7 +46,6 @@

import static nl.basjes.parse.useragent.UserAgent.USERAGENT_FIELDNAME;

@Ignore("This test will fail on Beam before 2.34.0 because of a needed fix.")
@Category(ValidatesRunner.class)
public class TestParseUserAgentSQLMap implements Serializable {

Expand Down Expand Up @@ -236,7 +234,7 @@ public void testUserAgentAnalysisSQL() { // NOSONAR java:S2699 Tests should incl
}

public static class RowPrinter extends DoFn<Row, Void> {
@SuppressWarnings("unused") // Used via reflection
@SuppressWarnings("unused") // Called via the annotation
@ProcessElement
public void processElement(ProcessContext c) {
final Row row = c.element();
Expand Down

0 comments on commit 0f37571

Please sign in to comment.