forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-4582] [kinesis] Consumer for DynamoDB streams via Kinesis API (a…
…pache#6968) Introduces a new Flink source to consume from DynamoDB streams. This new source is built on top of the existing Kinesis connector. It interacts with the DynamoDB streams via a dynamodb-streams-kinesis-adapter client.
- Loading branch information
1 parent
f4e2d57
commit 83dce8b
Showing
12 changed files
with
725 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
84 changes: 84 additions & 0 deletions
84
...main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.streaming.connectors.kinesis; | ||
|
||
import org.apache.flink.api.common.functions.RuntimeContext; | ||
import org.apache.flink.api.common.serialization.DeserializationSchema; | ||
import org.apache.flink.streaming.api.functions.source.SourceFunction; | ||
import org.apache.flink.streaming.connectors.kinesis.internals.DynamoDBStreamsDataFetcher; | ||
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; | ||
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.List; | ||
import java.util.Properties; | ||
|
||
/** | ||
* Consume events from DynamoDB streams. | ||
* | ||
* @param <T> the type of data emitted | ||
*/ | ||
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> { | ||
private static final Logger LOG = LoggerFactory.getLogger(FlinkDynamoDBStreamsConsumer.class); | ||
|
||
/** | ||
* Constructor of FlinkDynamoDBStreamsConsumer. | ||
* | ||
* @param stream stream to consume | ||
* @param deserializer deserialization schema | ||
* @param config config properties | ||
*/ | ||
public FlinkDynamoDBStreamsConsumer( | ||
String stream, | ||
DeserializationSchema<T> deserializer, | ||
Properties config) { | ||
super(stream, deserializer, config); | ||
} | ||
|
||
/** | ||
* Constructor of FlinkDynamodbStreamConsumer. | ||
* | ||
* @param streams list of streams to consume | ||
* @param deserializer deserialization schema | ||
* @param config config properties | ||
*/ | ||
public FlinkDynamoDBStreamsConsumer( | ||
List<String> streams, | ||
KinesisDeserializationSchema deserializer, | ||
Properties config) { | ||
super(streams, deserializer, config); | ||
} | ||
|
||
@Override | ||
protected KinesisDataFetcher<T> createFetcher( | ||
List<String> streams, | ||
SourceFunction.SourceContext<T> sourceContext, | ||
RuntimeContext runtimeContext, | ||
Properties configProps, | ||
KinesisDeserializationSchema<T> deserializationSchema) { | ||
return new DynamoDBStreamsDataFetcher<T>( | ||
streams, | ||
sourceContext, | ||
runtimeContext, | ||
configProps, | ||
deserializationSchema, | ||
getShardAssigner()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
109 changes: 109 additions & 0 deletions
109
...a/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.streaming.connectors.kinesis.internals; | ||
|
||
import org.apache.flink.api.common.functions.RuntimeContext; | ||
import org.apache.flink.streaming.api.functions.source.SourceFunction; | ||
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner; | ||
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter; | ||
import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle; | ||
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; | ||
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; | ||
import org.apache.flink.streaming.connectors.kinesis.proxy.DynamoDBStreamsProxy; | ||
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Properties; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
/** | ||
* Dynamodb streams data fetcher. | ||
* @param <T> type of fetched data. | ||
*/ | ||
public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> { | ||
private boolean shardIdFormatCheck = false; | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param streams list of streams to fetch data | ||
* @param sourceContext source context | ||
* @param runtimeContext runtime context | ||
* @param configProps config properties | ||
* @param deserializationSchema deserialization schema | ||
* @param shardAssigner shard assigner | ||
*/ | ||
public DynamoDBStreamsDataFetcher(List<String> streams, | ||
SourceFunction.SourceContext<T> sourceContext, | ||
RuntimeContext runtimeContext, | ||
Properties configProps, | ||
KinesisDeserializationSchema<T> deserializationSchema, | ||
KinesisShardAssigner shardAssigner) { | ||
|
||
super(streams, | ||
sourceContext, | ||
sourceContext.getCheckpointLock(), | ||
runtimeContext, | ||
configProps, | ||
deserializationSchema, | ||
shardAssigner, | ||
null, | ||
new AtomicReference<>(), | ||
new ArrayList<>(), | ||
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), | ||
// use DynamoDBStreamsProxy | ||
DynamoDBStreamsProxy::create); | ||
} | ||
|
||
@Override | ||
protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream) { | ||
if (DynamoDBStreamsShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) <= 0) { | ||
// shardID update is valid only if the given shard id is greater | ||
// than the previous last seen shard id of the stream. | ||
return false; | ||
} | ||
|
||
return true; | ||
} | ||
|
||
/** | ||
* Create a new DynamoDB streams shard consumer. | ||
* | ||
* @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to | ||
* @param handle stream handle | ||
* @param lastSeqNum last sequence number | ||
* @param shardMetricsReporter the reporter to report metrics to | ||
* @return | ||
*/ | ||
@Override | ||
protected ShardConsumer createShardConsumer( | ||
Integer subscribedShardStateIndex, | ||
StreamShardHandle handle, | ||
SequenceNumber lastSeqNum, | ||
ShardMetricsReporter shardMetricsReporter) { | ||
|
||
return new ShardConsumer( | ||
this, | ||
subscribedShardStateIndex, | ||
handle, | ||
lastSeqNum, | ||
DynamoDBStreamsProxy.create(getConsumerConfiguration()), | ||
shardMetricsReporter); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
62 changes: 62 additions & 0 deletions
62
.../java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandle.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.streaming.connectors.kinesis.model; | ||
|
||
import com.amazonaws.services.kinesis.model.Shard; | ||
|
||
/** | ||
* DynamoDB streams shard handle format and utilities. | ||
*/ | ||
public class DynamoDBStreamsShardHandle extends StreamShardHandle{ | ||
public static final String SHARDID_PREFIX = "shardId-"; | ||
public static final int SHARDID_PREFIX_LEN = SHARDID_PREFIX.length(); | ||
|
||
public DynamoDBStreamsShardHandle(String streamName, Shard shard) { | ||
super(streamName, shard); | ||
} | ||
|
||
public static int compareShardIds(String firstShardId, String secondShardId) { | ||
if (!isValidShardId(firstShardId)) { | ||
throw new IllegalArgumentException( | ||
String.format("The first shard id %s has invalid format.", firstShardId)); | ||
} else if (!isValidShardId(secondShardId)) { | ||
throw new IllegalArgumentException( | ||
String.format("The second shard id %s has invalid format.", secondShardId)); | ||
} | ||
|
||
return firstShardId.substring(SHARDID_PREFIX_LEN).compareTo( | ||
secondShardId.substring(SHARDID_PREFIX_LEN)); | ||
} | ||
|
||
/** | ||
* <p> | ||
* Dynamodb streams shard ID is a char string ranging from 28 characters to 65 characters. | ||
* (See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Shard.html) | ||
* | ||
* The shardId observed usually takes the format of: "shardId-00000001536805703746-69688cb1", | ||
* where "shardId-" is a prefix, followed by a 20-digit timestamp string and 0-36 or more | ||
* characters, separated by '-'. Following this format, it is expected the child shards created | ||
* during a re-sharding event have shardIds bigger than their parents. | ||
* </p> | ||
* @param shardId shard Id | ||
* @return boolean indicate if the given shard Id is valid | ||
*/ | ||
public static boolean isValidShardId(String shardId) { | ||
return shardId == null ? false : shardId.matches("^shardId-\\d{20}-{0,1}\\w{0,36}"); | ||
} | ||
} |
Oops, something went wrong.