Skip to content

Commit

Permalink
Implement FlinkPravegaInputFormat for batch support
Browse files Browse the repository at this point in the history
  • Loading branch information
tzulitai committed Sep 26, 2017
1 parent 2fe0e92 commit 12a2fe3
Show file tree
Hide file tree
Showing 7 changed files with 413 additions and 73 deletions.
14 changes: 7 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ dependencies {
compile group: 'io.pravega', name: 'pravega-shared-protocol', version: pravegaVersion
compile group: 'io.pravega', name: 'pravega-shared-controller-api', version: pravegaVersion
compile group: 'org.slf4j', name: 'slf4j-api', version: slf4jApiVersion
compileOnly group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: flinkVersion
compileOnly group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: flinkVersion
testCompile group: 'junit', name: 'junit', version: junitVersion
testCompile group: 'org.mockito', name: 'mockito-all', version: mockitoVersion
testCompile group: 'org.apache.flink', name: 'flink-tests_2.11', version: flinkVersion
testCompile group: 'org.apache.flink', name: 'flink-test-utils_2.11', version: flinkVersion
testCompile group: 'org.apache.flink', name: 'flink-streaming-contrib_2.11', version: flinkVersion
testCompile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', classifier: 'tests', version: flinkVersion
testCompile group: 'org.apache.flink', name: 'flink-runtime_2.11', classifier: 'tests', version: flinkVersion
testCompile group: 'org.apache.flink', name: 'flink-tests_2.10', version: flinkVersion
testCompile group: 'org.apache.flink', name: 'flink-test-utils_2.10', version: flinkVersion
testCompile group: 'org.apache.flink', name: 'flink-streaming-contrib_2.10', version: flinkVersion
testCompile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', classifier: 'tests', version: flinkVersion
testCompile group: 'org.apache.flink', name: 'flink-runtime_2.10', classifier: 'tests', version: flinkVersion
testCompile group: 'io.pravega', name: 'pravega-standalone', version: pravegaVersion

// configuring the shaded pom dependencies
shadow group: 'org.slf4j', name: 'slf4j-api', version: slf4jApiVersion
shadow group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: flinkVersion
shadow group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: flinkVersion
}

shadowJar {
Expand Down
179 changes: 179 additions & 0 deletions src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

package io.pravega.connectors.flink;

import com.google.common.base.Preconditions;

import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import java.io.IOException;
import java.net.URI;
import java.util.Set;

import static io.pravega.connectors.flink.util.FlinkPravegaUtils.createPravegaReader;
import static io.pravega.connectors.flink.util.FlinkPravegaUtils.generateRandomReaderGroupName;

/**
 * A Flink {@link InputFormat} that can be added as a source to read from Pravega in a Flink batch job.
 *
 * <p>A reader group is created once, when the format is constructed. Every opened input split then
 * creates its own Pravega reader inside that reader group. A split is considered exhausted as soon as
 * a read attempt returns an {@link EventRead} whose event is {@code null}.
 */
public class FlinkPravegaInputFormat<T> extends GenericInputFormat<T> {

    private static final long serialVersionUID = 1L;

    // Default value, in milliseconds, for the event read timeout.
    private static final long DEFAULT_EVENT_READ_TIMEOUT = 1000;

    // The supplied event deserializer.
    private final DeserializationSchema<T> deserializationSchema;

    // The pravega controller endpoint.
    private final URI controllerURI;

    // The scope name of the destination stream.
    private final String scopeName;

    // The readergroup name to coordinate the parallel readers. This should be unique for a Flink job.
    private final String readerGroupName;

    // The names of Pravega streams to read
    private final Set<String> streamNames;

    // The configured start time for the reader
    private final long startTime;

    // The event read timeout, in milliseconds
    private long eventReadTimeout = DEFAULT_EVENT_READ_TIMEOUT;

    // The Pravega reader; a new reader will be opened for each input split
    private transient EventStreamReader<T> pravegaReader;

    // Read-ahead result fetched by reachedEnd(); end of input is signalled by its getEvent() being null
    private transient EventRead<T> lastReadAheadEvent;

    /**
     * Creates a new Flink Pravega {@link InputFormat} which can be added as a source to a Flink batch job.
     *
     * <p>The number of created input splits is equivalent to the parallelism of the source. For each input split,
     * a Pravega reader will be created to read from the specified Pravega streams. Each input split is closed when
     * the next read event returns {@code null} on {@link EventRead#getEvent()}.
     *
     * <p>Note that the reader group is created eagerly inside this constructor, so the code constructing the
     * format must be able to reach the Pravega controller.
     *
     * @param controllerURI         The pravega controller endpoint address.
     * @param scope                 The destination stream's scope name.
     * @param streamNames           The list of stream names to read events from.
     * @param startTime             The start time from when to read events from.
     *                              Use 0 to read all stream events from the beginning.
     * @param deserializationSchema The implementation to deserialize events from pravega streams.
     * @throws NullPointerException     if any of the reference arguments is {@code null}.
     * @throws IllegalArgumentException if {@code startTime} is negative.
     */
    public FlinkPravegaInputFormat(
            final URI controllerURI,
            final String scope,
            final Set<String> streamNames,
            final long startTime,
            final DeserializationSchema<T> deserializationSchema) {

        Preconditions.checkNotNull(controllerURI, "controllerURI");
        Preconditions.checkNotNull(scope, "scope");
        Preconditions.checkNotNull(streamNames, "streamNames");
        Preconditions.checkArgument(startTime >= 0, "start time must be >= 0");
        Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");

        this.controllerURI = controllerURI;
        this.scopeName = scope;
        this.deserializationSchema = deserializationSchema;
        this.streamNames = streamNames;
        this.startTime = startTime;
        this.readerGroupName = generateRandomReaderGroupName();

        // TODO: This will require the client to have access to the pravega controller and handle any temporary errors.
        ReaderGroupManager.withScope(scopeName, controllerURI)
                .createReaderGroup(
                        this.readerGroupName,
                        ReaderGroupConfig.builder().startingTime(startTime).build(),
                        streamNames);
    }

    // ------------------------------------------------------------------------
    //  User configurations
    // ------------------------------------------------------------------------

    /**
     * Sets the timeout for the call to read events from Pravega. Because an expired timeout
     * (no event returned) is what {@link #reachedEnd()} treats as end of input, this value
     * also determines how long a split waits before it is considered exhausted.
     *
     * <p>This timeout is passed to {@link EventStreamReader#readNextEvent(long)}.
     *
     * @param eventReadTimeout The timeout, in milliseconds; must be strictly positive.
     * @throws IllegalArgumentException if {@code eventReadTimeout} is not greater than zero.
     */
    public void setEventReadTimeout(long eventReadTimeout) {
        // The message now matches the strict check (it previously claimed ">= 0").
        Preconditions.checkArgument(eventReadTimeout > 0, "timeout must be > 0");
        this.eventReadTimeout = eventReadTimeout;
    }

    /**
     * Gets the timeout for the call to read events from Pravega.
     *
     * <p>This timeout is the value passed to {@link EventStreamReader#readNextEvent(long)}.
     *
     * @return The timeout, in milliseconds
     */
    public long getEventReadTimeout() {
        return eventReadTimeout;
    }

    // ------------------------------------------------------------------------
    //  Input split life cycle methods
    // ------------------------------------------------------------------------

    /**
     * Opens the given split by creating a fresh Pravega reader in this format's reader group.
     * The reader id is the task name (with subtask information) of the executing task.
     *
     * @param split The input split to open.
     * @throws IOException if the parent implementation fails to open the split.
     */
    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);

        // build a new reader for each input split
        this.pravegaReader = createPravegaReader(
                this.scopeName,
                this.controllerURI,
                getRuntimeContext().getTaskNameWithSubtasks(),
                this.readerGroupName,
                this.deserializationSchema,
                ReaderConfig.builder().build());
    }

    /**
     * Reads one event ahead and reports whether the end of input has been reached.
     *
     * <p>The fetched event is kept and handed out by the following {@link #nextRecord(Object)} call,
     * so every call to this method consumes one read from the underlying reader.
     *
     * @return {@code true} if the read-ahead returned no event, {@code false} otherwise.
     * @throws IOException if reading the next event from Pravega fails; the original failure is the cause.
     */
    @Override
    public boolean reachedEnd() throws IOException {
        // look ahead to see if we have reached the end of input
        try {
            this.lastReadAheadEvent = pravegaReader.readNextEvent(eventReadTimeout);
        } catch (Exception e) {
            throw new IOException("Failed to read next event.", e);
        }

        // TODO this "end of input" marker is too brittle, as the timeout could easily be a temporary hiccup;
        // TODO to make this more robust, we could loop and try to fetch a few more times before concluding end of input
        // NOTE(review): getEvent() presumably is also null for checkpoint events - confirm they cannot occur here.
        return lastReadAheadEvent.getEvent() == null;
    }

    /**
     * Returns the event that was fetched by the preceding {@link #reachedEnd()} call.
     *
     * @param t Reuse object; ignored by this implementation.
     * @return The read-ahead event.
     * @throws IOException declared by the interface; not thrown by this implementation.
     */
    @Override
    public T nextRecord(T t) throws IOException {
        // reachedEnd() will be checked first, so lastReadAheadEvent should never be null
        return lastReadAheadEvent.getEvent();
    }

    /**
     * Closes the Pravega reader of the current split, if one was created.
     *
     * @throws IOException declared by the interface; not thrown by this implementation.
     */
    @Override
    public void close() throws IOException {
        // The reader is still null when open() failed before creating it; avoid masking that failure with an NPE.
        if (this.pravegaReader != null) {
            this.pravegaReader.close();
        }
    }
}
77 changes: 12 additions & 65 deletions src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,17 @@
*/
package io.pravega.connectors.flink;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import io.pravega.client.ClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.serialization.WrappingSerializer;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.RandomStringUtils;

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
Expand All @@ -38,10 +31,12 @@
import org.apache.flink.util.FlinkException;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static io.pravega.connectors.flink.util.FlinkPravegaUtils.createPravegaReader;
import static io.pravega.connectors.flink.util.FlinkPravegaUtils.generateRandomReaderGroupName;
import static io.pravega.connectors.flink.util.FlinkPravegaUtils.getDefaultReaderName;

/**
* Flink source implementation for reading from pravega storage.
Expand Down Expand Up @@ -153,7 +148,7 @@ public FlinkPravegaReader(final URI controllerURI, final String scope, final Set
this.controllerURI = controllerURI;
this.scopeName = scope;
this.deserializationSchema = deserializationSchema;
this.readerGroupName = "flink" + RandomStringUtils.randomAlphanumeric(20).toLowerCase();
this.readerGroupName = generateRandomReaderGroupName();

// TODO: This will require the client to have access to the pravega controller and handle any temporary errors.
// See https://github.com/pravega/pravega/issues/553.
Expand Down Expand Up @@ -229,15 +224,13 @@ public void run(SourceContext<T> ctx) throws Exception {
log.info("{} : Creating Pravega reader with ID '{}' for controller URI: {}",
getRuntimeContext().getTaskNameWithSubtasks(), readerId, this.controllerURI);

// create the adapter between Pravega's serializers and Flink's serializers
@SuppressWarnings("unchecked")
final Serializer<T> deserializer = this.deserializationSchema instanceof WrappingSerializer ?
((WrappingSerializer<T>) this.deserializationSchema).getWrappedSerializer() :
new FlinkDeserializer<>(this.deserializationSchema);

// build the reader
try (EventStreamReader<T> pravegaReader = ClientFactory.withScope(this.scopeName, this.controllerURI)
.createReader(readerId, this.readerGroupName, deserializer, ReaderConfig.builder().build())) {
try (EventStreamReader<T> pravegaReader = createPravegaReader(
this.scopeName,
this.controllerURI,
readerId,
this.readerGroupName,
this.deserializationSchema,
ReaderConfig.builder().build())) {

log.info("Starting Pravega reader '{}' for controller URI {}", readerId, this.controllerURI);

Expand Down Expand Up @@ -323,50 +316,4 @@ private void triggerCheckpoint(String checkpointIdentifier) throws FlinkExceptio

checkpointTrigger.triggerCheckpoint(checkpointId);
}

// ------------------------------------------------------------------------
// serializer
// ------------------------------------------------------------------------

/**
 * Adapter that exposes a Flink {@link DeserializationSchema} through Pravega's {@link Serializer}
 * interface. Only deserialization is supported; {@link #serialize(Object)} always fails.
 */
@VisibleForTesting
static final class FlinkDeserializer<T> implements Serializer<T> {

    // The Flink schema that performs the actual byte[] -> T conversion.
    private final DeserializationSchema<T> deserializationSchema;

    FlinkDeserializer(DeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    /**
     * Not supported by this adapter.
     *
     * @throws IllegalStateException always.
     */
    @Override
    public ByteBuffer serialize(T value) {
        throw new IllegalStateException("serialize() called within a deserializer");
    }

    /**
     * Deserializes the remaining bytes of the given buffer with the wrapped schema.
     *
     * <p>If the buffer exposes a backing array that consists of exactly the buffer's content, that
     * array is passed on without copying (the buffer position is left untouched in this case).
     * Otherwise the remaining bytes are copied out, which advances the buffer position.
     *
     * @param buffer The buffer holding the serialized event.
     * @return The deserialized event.
     */
    @Override
    @SneakyThrows
    public T deserialize(ByteBuffer buffer) {
        final byte[] array;
        if (buffer.hasArray()
                && buffer.arrayOffset() == 0
                && buffer.position() == 0
                && buffer.limit() == buffer.capacity()
                // A sliced buffer can satisfy all checks above while its backing array is longer
                // than the buffer itself; only reuse the array when the lengths match exactly.
                && buffer.array().length == buffer.limit()) {
            array = buffer.array();
        } else {
            array = new byte[buffer.remaining()];
            buffer.get(array);
        }

        return deserializationSchema.deserialize(array);
    }
}

/*
 * Builds the default reader name: the stream names and the scope are joined with "-"
 * (stream names first, in the set's iteration order, scope last) and the hash code of
 * the resulting string is returned in decimal form.
 */
private static String getDefaultReaderName(final String scope, final Set<String> streamNames) {
    final String separator = "-";
    final String joined = String.join(separator, streamNames) + separator + scope;
    // String.hashCode() is specified as the same 31-based polynomial the name was always derived from.
    return Integer.toString(joined.hashCode());
}
}
Loading

0 comments on commit 12a2fe3

Please sign in to comment.