forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-9311] [pubsub] Add PubSubSource and PubSubSink connectors
This closes apache#6594
- Loading branch information
Showing
16 changed files
with
1,562 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> | ||
|
||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<parent> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-connectors</artifactId> | ||
<version>1.7-SNAPSHOT</version> | ||
<relativePath>..</relativePath> | ||
</parent> | ||
|
||
<artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId> | ||
<name>flink-connector-pubsub</name> | ||
|
||
<packaging>jar</packaging> | ||
|
||
<properties> | ||
<pubsub.version>1.37.1</pubsub.version> | ||
</properties> | ||
|
||
<dependencies> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.google.cloud</groupId> | ||
<artifactId>google-cloud-pubsub</artifactId> | ||
<version>${pubsub.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<scope>test</scope> | ||
<type>test-jar</type> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-runtime_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<type>test-jar</type> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-shade-plugin</artifactId> | ||
<executions> | ||
<execution> | ||
<id>shade-flink</id> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>shade</goal> | ||
</goals> | ||
<configuration> | ||
<shadeTestJar>false</shadeTestJar> | ||
<artifactSet> | ||
<includes> | ||
<include>*:*</include> | ||
</includes> | ||
</artifactSet> | ||
<relocations> | ||
<relocation> | ||
<pattern>com.google.guava</pattern> | ||
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern> | ||
</relocation> | ||
<relocation> | ||
<pattern>com.google.common.base</pattern> | ||
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern> | ||
</relocation> | ||
<relocation> | ||
<pattern>io.grpc.auth</pattern> | ||
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern> | ||
</relocation> | ||
<relocation> | ||
<pattern>io.grpc.protobuf</pattern> | ||
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern> | ||
</relocation> | ||
</relocations> | ||
</configuration> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
</project> |
99 changes: 99 additions & 0 deletions
99
...nk-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package org.apache.flink.streaming.connectors.pubsub; | ||
|
||
import org.apache.flink.streaming.api.functions.source.SourceFunction; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.Serializable; | ||
import java.util.Timer; | ||
import java.util.TimerTask; | ||
|
||
class Bound<OUT> implements Serializable { | ||
private static final Logger LOG = LoggerFactory.getLogger(Bound.class); | ||
|
||
private final Bound.Mode mode; | ||
private final long maxMessagedReceived; | ||
private final long maxTimeBetweenMessages; | ||
|
||
private SourceFunction<OUT> sourceFunction; | ||
private transient Timer timer; | ||
private long messagesReceived; | ||
private long lastReceivedMessage; | ||
private boolean cancelled = false; | ||
|
||
private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { | ||
this.mode = mode; | ||
this.maxMessagedReceived = maxMessagedReceived; | ||
this.maxTimeBetweenMessages = maxTimeBetweenMessages; | ||
this.messagesReceived = 0L; | ||
} | ||
|
||
static <OUT> Bound<OUT> boundByAmountOfMessages(long maxMessagedReceived) { | ||
return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); | ||
} | ||
|
||
static <OUT> Bound<OUT> boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { | ||
return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); | ||
} | ||
|
||
static <OUT> Bound<OUT> boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { | ||
return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); | ||
} | ||
|
||
private TimerTask shutdownPubSubSource() { | ||
return new TimerTask() { | ||
@Override | ||
public void run() { | ||
if (maxTimeBetweenMessagesElapsed()) { | ||
cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); | ||
timer.cancel(); | ||
} | ||
} | ||
}; | ||
} | ||
|
||
private synchronized boolean maxTimeBetweenMessagesElapsed() { | ||
return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; | ||
} | ||
|
||
private synchronized void cancelPubSubSource(String logMessage) { | ||
if (!cancelled) { | ||
cancelled = true; | ||
sourceFunction.cancel(); | ||
LOG.info(logMessage); | ||
} | ||
} | ||
|
||
void start(SourceFunction<OUT> sourceFunction) { | ||
if (this.sourceFunction != null) { | ||
throw new IllegalStateException("start() already called"); | ||
} | ||
|
||
this.sourceFunction = sourceFunction; | ||
messagesReceived = 0; | ||
|
||
if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { | ||
lastReceivedMessage = System.currentTimeMillis(); | ||
timer = new Timer(); | ||
timer.schedule(shutdownPubSubSource(), 0, 100); | ||
} | ||
} | ||
|
||
synchronized void receivedMessage() { | ||
if (sourceFunction == null) { | ||
throw new IllegalStateException("start() not called"); | ||
} | ||
|
||
lastReceivedMessage = System.currentTimeMillis(); | ||
messagesReceived++; | ||
|
||
if ((mode == Mode.COUNTER || mode == Mode.COUNTER_OR_TIMER) && messagesReceived >= maxMessagedReceived) { | ||
cancelPubSubSource("BoundedSourceFunction: Max received messages --> canceling source"); | ||
} | ||
} | ||
|
||
private enum Mode { | ||
COUNTER, TIMER, COUNTER_OR_TIMER | ||
} | ||
} |
79 changes: 79 additions & 0 deletions
79
...ubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package org.apache.flink.streaming.connectors.pubsub; | ||
|
||
import com.google.cloud.pubsub.v1.AckReplyConsumer; | ||
import com.google.pubsub.v1.PubsubMessage; | ||
|
||
import java.io.IOException; | ||
|
||
class BoundedPubSubSource<OUT> extends PubSubSource<OUT> { | ||
private Bound<OUT> bound; | ||
|
||
private BoundedPubSubSource() { | ||
super(); | ||
} | ||
|
||
protected void setBound(Bound<OUT> bound) { | ||
this.bound = bound; | ||
} | ||
|
||
@Override | ||
public void run(SourceContext<OUT> sourceContext) { | ||
bound.start(this); | ||
super.run(sourceContext); | ||
} | ||
|
||
@Override | ||
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { | ||
super.receiveMessage(message, consumer); | ||
bound.receivedMessage(); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public static <OUT> BoundedPubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends BoundedPubSubSourceBuilder> newBuilder() { | ||
return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource<OUT>()); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public static class BoundedPubSubSourceBuilder<OUT, PSS extends BoundedPubSubSource<OUT>, BUILDER extends BoundedPubSubSourceBuilder<OUT, PSS, BUILDER>> extends PubSubSourceBuilder<OUT, PSS, BUILDER> { | ||
private Long boundedByAmountOfMessages; | ||
private Long boundedByTimeSinceLastMessage; | ||
|
||
BoundedPubSubSourceBuilder(PSS sourceUnderConstruction) { | ||
super(sourceUnderConstruction); | ||
} | ||
|
||
public BUILDER boundedByAmountOfMessages(long maxAmountOfMessages) { | ||
boundedByAmountOfMessages = maxAmountOfMessages; | ||
return (BUILDER) this; | ||
} | ||
|
||
public BUILDER boundedByTimeSinceLastMessage(long timeSinceLastMessage) { | ||
boundedByTimeSinceLastMessage = timeSinceLastMessage; | ||
return (BUILDER) this; | ||
} | ||
|
||
private Bound <OUT> createBound() { | ||
if (boundedByAmountOfMessages != null && boundedByTimeSinceLastMessage != null) { | ||
return Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(boundedByAmountOfMessages, boundedByTimeSinceLastMessage); | ||
} | ||
|
||
if (boundedByAmountOfMessages != null) { | ||
return Bound.boundByAmountOfMessages(boundedByAmountOfMessages); | ||
} | ||
|
||
if (boundedByTimeSinceLastMessage != null) { | ||
return Bound.boundByTimeSinceLastMessage(boundedByTimeSinceLastMessage); | ||
} | ||
|
||
// This is functionally speaking no bound. | ||
return Bound.boundByAmountOfMessages(Long.MAX_VALUE); | ||
} | ||
|
||
@Override | ||
public PSS build() throws IOException { | ||
sourceUnderConstruction.setBound(createBound()); | ||
return super.build(); | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.