Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #42] Add initial batch read support for Flink Pravega Connectors #54

Merged
merged 2 commits into from
Oct 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

package io.pravega.connectors.flink;

import com.google.common.base.Preconditions;

import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import java.io.IOException;
import java.net.URI;
import java.util.Set;

import static io.pravega.connectors.flink.util.FlinkPravegaUtils.createPravegaReader;
import static io.pravega.connectors.flink.util.FlinkPravegaUtils.generateRandomReaderGroupName;

/**
* A Flink {@link InputFormat} that can be added as a source to read from Pravega in a Flink batch job.
*/
public class FlinkPravegaInputFormat<T> extends GenericInputFormat<T> {

private static final long serialVersionUID = 1L;

private static final long DEFAULT_EVENT_READ_TIMEOUT = 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine, but on the optimistic side, perhaps a value like 10s would suit better. If this is a batch read and all the stream data is there, then we should in the absence of any glitches never time out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout also affects job completion, since we don't have a true EOF. Elsewhere a code comment suggests using a retry to disambiguate EOF, which would be better than an increased timeout.


// The supplied event deserializer.
private final DeserializationSchema<T> deserializationSchema;

// The pravega controller endpoint.
private final URI controllerURI;

// The scope name of the destination stream.
private final String scopeName;

// The readergroup name to coordinate the parallel readers. This should be unique for a Flink job.
private final String readerGroupName;

// The names of Pravega streams to read
private final Set<String> streamNames;

// The configured start time for the reader
private final long startTime;

// The event read timeout
private long eventReadTimeout = DEFAULT_EVENT_READ_TIMEOUT;

// The Pravega reader; a new reader will be opened for each input split
private transient EventStreamReader<T> pravegaReader;

// Read-ahead event; null indicates that end of input is reached
private transient EventRead<T> lastReadAheadEvent;

/**
* Creates a new Flink Pravega {@link InputFormat} which can be added as a source to a Flink batch job.
*
* <p>The number of created input splits is equivalent to the parallelism of the source. For each input split,
* a Pravega reader will be created to read from the specified Pravega streams. Each input split is closed when
* the next read event returns {@code null} on {@link EventRead#getEvent()}.
*
* @param controllerURI The pravega controller endpoint address.
* @param scope The destination stream's scope name.
* @param streamNames The list of stream names to read events from.
* @param startTime The start time from when to read events from.
* Use 0 to read all stream events from the beginning.
* @param deserializationSchema The implementation to deserialize events from pravega streams.
*/
public FlinkPravegaInputFormat(
final URI controllerURI,
final String scope,
final Set<String> streamNames,
final long startTime,
final DeserializationSchema<T> deserializationSchema) {

Preconditions.checkNotNull(controllerURI, "controllerURI");
Preconditions.checkNotNull(scope, "scope");
Preconditions.checkNotNull(streamNames, "streamNames");
Preconditions.checkArgument(startTime >= 0, "start time must be >= 0");
Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");

this.controllerURI = controllerURI;
this.scopeName = scope;
this.deserializationSchema = deserializationSchema;
this.streamNames = streamNames;
this.startTime = startTime;
this.readerGroupName = generateRandomReaderGroupName();

// TODO: This will require the client to have access to the pravega controller and handle any temporary errors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general suggestion, I like the idea of creating issues for TODOs in the code so that we can track what we need to fix in the future.

ReaderGroupManager.withScope(scopeName, controllerURI)
.createReaderGroup(this.readerGroupName, ReaderGroupConfig.builder().startingTime(startTime).build(),
streamNames);
}

// ------------------------------------------------------------------------
// User configurations
// ------------------------------------------------------------------------

/**
* Gets the timeout for the call to read events from Pravega. After the timeout
* expires (without an event being returned), another call will be made.
*
* <p>This timeout is passed to {@link EventStreamReader#readNextEvent(long)}.
*
* @param eventReadTimeout The timeout, in milliseconds
*/
public void setEventReadTimeout(long eventReadTimeout) {
Preconditions.checkArgument(eventReadTimeout > 0, "timeout must be >= 0");
this.eventReadTimeout = eventReadTimeout;
}

/**
* Gets the timeout for the call to read events from Pravega.
*
* <p>This timeout is the value passed to {@link EventStreamReader#readNextEvent(long)}.
*
* @return The timeout, in milliseconds
*/
public long getEventReadTimeout() {
return eventReadTimeout;
}

// ------------------------------------------------------------------------
// Input split life cycle methods
// ------------------------------------------------------------------------

@Override
public void open(GenericInputSplit split) throws IOException {
super.open(split);

// build a new reader for each input split
this.pravegaReader = createPravegaReader(
this.scopeName,
this.controllerURI,
getRuntimeContext().getTaskNameWithSubtasks(),
this.readerGroupName,
this.deserializationSchema,
ReaderConfig.builder().build());
}

@Override
public boolean reachedEnd() throws IOException {
// look ahead to see if we have reached the end of input
try {
this.lastReadAheadEvent = pravegaReader.readNextEvent(eventReadTimeout);
} catch (Exception e) {
throw new IOException("Failed to read next event.", e);
}

// TODO this "end of input" marker is too brittle, as the timeout could easily be a temporary hiccup;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See pravega/pravega#1916. We can fix this here once we complete the work of that issue.

// TODO to make this more robust, we could loop and try to fetch a few more times before concluding end of input
return lastReadAheadEvent.getEvent() == null;
}

@Override
public T nextRecord(T t) throws IOException {
// reachedEnd() will be checked first, so lastReadAheadEvent should never be null
return lastReadAheadEvent.getEvent();
}

@Override
public void close() throws IOException {
this.pravegaReader.close();
}
}
77 changes: 12 additions & 65 deletions src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,17 @@
*/
package io.pravega.connectors.flink;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import io.pravega.client.ClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.serialization.WrappingSerializer;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.RandomStringUtils;

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
Expand All @@ -38,10 +31,12 @@
import org.apache.flink.util.FlinkException;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static io.pravega.connectors.flink.util.FlinkPravegaUtils.createPravegaReader;
import static io.pravega.connectors.flink.util.FlinkPravegaUtils.generateRandomReaderGroupName;
import static io.pravega.connectors.flink.util.FlinkPravegaUtils.getDefaultReaderName;

/**
* Flink source implementation for reading from pravega storage.
Expand Down Expand Up @@ -153,7 +148,7 @@ public FlinkPravegaReader(final URI controllerURI, final String scope, final Set
this.controllerURI = controllerURI;
this.scopeName = scope;
this.deserializationSchema = deserializationSchema;
this.readerGroupName = "flink" + RandomStringUtils.randomAlphanumeric(20).toLowerCase();
this.readerGroupName = generateRandomReaderGroupName();

// TODO: This will require the client to have access to the pravega controller and handle any temporary errors.
// See https://github.com/pravega/pravega/issues/553.
Expand Down Expand Up @@ -229,15 +224,13 @@ public void run(SourceContext<T> ctx) throws Exception {
log.info("{} : Creating Pravega reader with ID '{}' for controller URI: {}",
getRuntimeContext().getTaskNameWithSubtasks(), readerId, this.controllerURI);

// create the adapter between Pravega's serializers and Flink's serializers
@SuppressWarnings("unchecked")
final Serializer<T> deserializer = this.deserializationSchema instanceof WrappingSerializer ?
((WrappingSerializer<T>) this.deserializationSchema).getWrappedSerializer() :
new FlinkDeserializer<>(this.deserializationSchema);

// build the reader
try (EventStreamReader<T> pravegaReader = ClientFactory.withScope(this.scopeName, this.controllerURI)
.createReader(readerId, this.readerGroupName, deserializer, ReaderConfig.builder().build())) {
try (EventStreamReader<T> pravegaReader = createPravegaReader(
this.scopeName,
this.controllerURI,
readerId,
this.readerGroupName,
this.deserializationSchema,
ReaderConfig.builder().build())) {

log.info("Starting Pravega reader '{}' for controller URI {}", readerId, this.controllerURI);

Expand Down Expand Up @@ -323,50 +316,4 @@ private void triggerCheckpoint(String checkpointIdentifier) throws FlinkExceptio

checkpointTrigger.triggerCheckpoint(checkpointId);
}

// ------------------------------------------------------------------------
// serializer
// ------------------------------------------------------------------------

@VisibleForTesting
static final class FlinkDeserializer<T> implements Serializer<T> {

private final DeserializationSchema<T> deserializationSchema;

FlinkDeserializer(DeserializationSchema<T> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
}

@Override
public ByteBuffer serialize(T value) {
throw new IllegalStateException("serialize() called within a deserializer");
}

@Override
@SneakyThrows
public T deserialize(ByteBuffer buffer) {
byte[] array;
if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position() == 0 && buffer.limit() == buffer.capacity()) {
array = buffer.array();
} else {
array = new byte[buffer.remaining()];
buffer.get(array);
}

return deserializationSchema.deserialize(array);
}
}

/*
* Helper method that derives default reader name from stream and scope name
*/
private static String getDefaultReaderName(final String scope, final Set<String> streamNames) {
final String delimiter = "-";
final String reader = streamNames.stream().collect(Collectors.joining(delimiter)) + delimiter + scope;
int hash = 0;
for (int i = 0; i < reader.length(); i++) {
hash = reader.charAt(i) + (31 * hash);
}
return Integer.toString(hash);
}
}
Loading