Skip to content

Commit

Permalink
[LYFT] Switch to LyftFlinkKinesisConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
tweise committed Oct 17, 2018
1 parent db79195 commit 168a533
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
4 changes: 2 additions & 2 deletions runners/flink/build.gradle
Expand Up @@ -85,8 +85,8 @@ dependencies {
validatesRunner project(path: ":beam-runners-core-java", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration: "shadow")
//LYFT CUSTOM
shadow "org.apache.flink:flink-connector-kafka-0.11_2.11:1.5-lyft20180815"
shadow "org.apache.flink:flink-connector-kinesis_2.11:1.5-lyft20180815"
shadow "org.apache.flink:flink-connector-kafka-0.11_2.11:1.5-lyft20180920"
shadow "com.lyft:streamingplatform-kinesis:1.2.20181003"
}

class ValidatesRunnerConfig {
Expand Down
Expand Up @@ -22,6 +22,8 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.lyft.streamingplatform.flink.FlinkLyftKinesisConsumer;
import com.lyft.streamingplatform.flink.InitialRoundRobinKinesisShardAssigner;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Map;
Expand All @@ -38,8 +40,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -153,26 +154,29 @@ private void translateKinesisInput(
Preconditions.checkNotNull(consumerProps, "'properties' need to be set");
properties.putAll(consumerProps);
} catch (IOException e) {
throw new RuntimeException("Could not parse KafkaConsumer properties.", e);
throw new RuntimeException("Could not parse Kinesis consumer properties.", e);
}

logger.info("Kinesis consumer for stream {} with properties {}", stream, properties);

DataStreamSource<WindowedValue<byte[]>> source =
context
.getExecutionEnvironment()
.addSource(
new FlinkKinesisConsumer<>(
stream, new KinesisByteArrayWindowedValueSchema(), properties));
context.addDataStream(Iterables.getOnlyElement(pTransform.getOutputsMap().values()), source);
FlinkLyftKinesisConsumer<WindowedValue<byte[]>> source =
FlinkLyftKinesisConsumer.create(
stream, new KinesisByteArrayWindowedValueSchema(), properties);
source.setShardAssigner(
InitialRoundRobinKinesisShardAssigner.fromInitialShards(
properties, stream, context.getExecutionEnvironment().getConfig().getParallelism()));
context.addDataStream(
Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
context.getExecutionEnvironment().addSource(source));
}

/**
* Deserializer for native Flink Kafka source that produces {@link WindowedValue} expected by Beam
* operators.
*/
// TODO: switch to KinesisDeserializationSchema once FlinkLyftKinesisConsumer.create supports it
private static class KinesisByteArrayWindowedValueSchema
implements KinesisDeserializationSchema<WindowedValue<byte[]>> {
implements DeserializationSchema<WindowedValue<byte[]>> {
private static final long serialVersionUID = -1L;

private final TypeInformation<WindowedValue<byte[]>> ti;
Expand All @@ -189,15 +193,13 @@ public TypeInformation<WindowedValue<byte[]>> getProducedType() {
}

@Override
public WindowedValue<byte[]> deserialize(
byte[] recordValue,
String partitionKey,
String seqNum,
long approxArrivalTimestamp,
String stream,
String shardId)
throws IOException {
public WindowedValue<byte[]> deserialize(byte[] recordValue) {
return WindowedValue.valueInGlobalWindow(recordValue);
}

@Override
public boolean isEndOfStream(WindowedValue<byte[]> nextElement) {
return false;
}
}
}

0 comments on commit 168a533

Please sign in to comment.