Skip to content

Commit

Permalink
Add --pubsubIdAttribute beam option for dedup when reading pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
relud committed Jan 21, 2021
1 parent 14513c5 commit 5fef5bf
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
Expand All @@ -35,15 +36,18 @@ public abstract class Read extends PTransform<PBegin, PCollection<PubsubMessage>
public static class PubsubInput extends Read {

private final ValueProvider<String> subscription;
private final String idAttribute;

public PubsubInput(ValueProvider<String> subscription) {
public PubsubInput(ValueProvider<String> subscription, @Nullable String idAttribute) {
this.subscription = subscription;
this.idAttribute = idAttribute;
}

@Override
public PCollection<PubsubMessage> expand(PBegin input) {
return input //
.apply(PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscription))
.apply(PubsubIO.readMessagesWithAttributesAndMessageId().withIdAttribute(idAttribute)
.fromSubscription(subscription))
.apply(MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(message -> {
Map<String, String> attributesWithMessageId = new HashMap<>(message.getAttributeMap());
attributesWithMessageId.put(Attribute.MESSAGE_ID, message.getMessageId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public enum InputType {

/**
 * Builds the {@code Read} transform for the Pub/Sub input type.
 *
 * <p>The id attribute is forwarded so Beam can deduplicate messages that were
 * published more than once; it may be unset (null) when deduplication by
 * attribute is not requested.
 */
public Read read(SinkOptions.Parsed options) {
  final ValueProvider<String> subscription = options.getInput();
  final String dedupAttribute = options.getPubsubIdAttribute();
  return new PubsubInput(subscription, dedupAttribute);
}
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ public interface SinkOptions extends PipelineOptions {

void setWindowDuration(String value);

@Description("Attribute for built-in deduplication when reading from Pub/Sub."
+ " Must be an attribute set before the message was last published to Pub/Sub.")
// NOTE(review): presumably returns null when the option is not supplied on the
// command line, and callers (PubsubInput) must tolerate that — confirm against
// how PipelineOptions defaults unset String options.
String getPubsubIdAttribute();

void setPubsubIdAttribute(String value);

/*
* Note: Dataflow templates accept ValueProvider options at runtime, and other options at creation
* time. When running without templates specify all options at once.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.mozilla.telemetry.integration;

import static com.mozilla.telemetry.matchers.Lines.matchesInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
Expand Down Expand Up @@ -104,13 +104,27 @@ public void deletePubsubResources() throws IOException {

// Baseline path: read from the subscription without setting an id attribute,
// so no built-in deduplication is applied.
@Test(timeout = 30000)
public void canReadPubsubInput() throws Exception {
readPubsubInput(false);
}

// Dedup path: the shared helper sets pubsubIdAttribute and publishes the input
// lines a second time, so duplicate messages should be removed on read.
@Test(timeout = 30000)
public void canDedupPubsubInput() throws Exception {
readPubsubInput(true);
}

private void readPubsubInput(boolean provideIdAttribute) throws Exception {
List<String> inputLines = Lines.resources("testdata/basic-messages-nonempty.ndjson");
publishLines(inputLines);

pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);

SinkOptions.Parsed sinkOptions = pipeline.getOptions().as(SinkOptions.Parsed.class);
sinkOptions.setInput(pipeline.newProvider(subscriptionName.toString()));
if (provideIdAttribute) {
sinkOptions.setPubsubIdAttribute("host");
// publish inputLines again so that there are duplicates to remove
publishLines(inputLines);
}

PCollection<String> output = pipeline.apply(InputType.pubsub.read(sinkOptions))
.apply("encodeJson", OutputFileFormat.json.encode());
Expand Down

0 comments on commit 5fef5bf

Please sign in to comment.