[SPARK-30901][DOCS] Fix doc example with deprecated code
### What changes were proposed in this pull request?

The previous example given for spark-streaming-kinesis was valid for Apache Spark < 2.3.0. After that, the method used in the example became deprecated:
deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0")
def initialPositionInStream(initialPosition: InitialPositionInStream)

This PR updates the doc by rewriting the examples in Scala/Java (the Python example remains unchanged) to match Apache Spark 2.4.0+ releases.
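For illustration only (this sketch is not part of the PR itself), the migration looks roughly as follows in Scala; the endpoint, region, stream, and application names, the master, and the batch interval are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val streamingContext = new StreamingContext(
  new SparkConf().setAppName("KinesisMigrationSketch").setMaster("local[2]"), // placeholders
  Seconds(10))

// Before (deprecated since Spark 2.3.0):
//   .initialPositionInStream(InitialPositionInStream.LATEST)
// After: pass a KinesisInitialPosition instance instead.
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(streamingContext)
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com")  // placeholder
  .regionName("us-east-1")                                 // placeholder
  .streamName("my-kinesis-stream")                         // placeholder
  .initialPosition(new KinesisInitialPositions.Latest())
  .checkpointAppName("my-kinesis-app")                     // placeholder
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()
```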

### Why are the changes needed?

The deprecated example introduces confusion for developers testing their spark-streaming-kinesis applications.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

This change only affects documentation, so I did not add any special test.

Closes apache#27652 from supaggregator/SPARK-30901.

Authored-by: XU Duo <Duo.XU@canal-plus.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
duo-xu authored and Seongjin Cho committed Apr 14, 2020
1 parent 0b416de commit cc914a7
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions docs/streaming-kinesis-integration.md
@@ -46,14 +46,14 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

val kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -68,14 +68,14 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;

KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -110,7 +110,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

@@ -119,7 +119,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -133,7 +133,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import scala.collection.JavaConverters;
@@ -143,7 +143,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -170,7 +170,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m

- `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.

- `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see [`Kinesis Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html) for more details).
- `[initial position]`: Can be either `KinesisInitialPositions.TrimHorizon` or `KinesisInitialPositions.Latest` or `KinesisInitialPositions.AtTimestamp` (see [`Kinesis Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html) for more details).

- `[message handler]`: A function that takes a Kinesis `Record` and outputs generic `T`.

@@ -272,9 +272,9 @@ de-aggregate records during consumption.

- Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy.

- If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`). This is configurable.
- `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
- `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
- If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`KinesisInitialPositions.TrimHorizon`), or from the latest tip (`KinesisInitialPositions.Latest`), or (except Python) from the position denoted by the provided UTC timestamp (`KinesisInitialPositions.AtTimestamp(Date timestamp)`). This is configurable.
- `KinesisInitialPositions.Latest` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
- `KinesisInitialPositions.TrimHorizon` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
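
As an illustrative sketch (not part of the committed doc change), the three non-deprecated starting positions can be constructed as follows in Scala; the timestamp below is a placeholder value:

```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// Oldest available record; may reprocess data that was already handled.
val fromOldest = new KinesisInitialPositions.TrimHorizon()

// Latest tip; may miss records added while no input DStream was running.
val fromLatest = new KinesisInitialPositions.Latest()

// A specific UTC timestamp (not exposed through the Python API); placeholder epoch millis.
val fromTimestamp = new KinesisInitialPositions.AtTimestamp(new Date(1586822400000L))

// Any of these values can be passed to KinesisInputDStream.builder.initialPosition(...).
```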

#### Kinesis retry configuration
- `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit `ProvisionedThroughputExceededException`'s, when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MiB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms".
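
As a hedged sketch (not taken from the commit), such a retry setting could be supplied through `SparkConf` or an equivalent `--conf` flag on `spark-submit`; the "500ms" value and names below are purely illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Lengthen the wait between Kinesis fetch retries to reduce
// ProvisionedThroughputExceededException under heavy read load.
val conf = new SparkConf()
  .setAppName("KinesisRetryTuningSketch")  // placeholder app name
  .setMaster("local[2]")                   // placeholder master for local testing
  .set("spark.streaming.kinesis.retry.waitTime", "500ms")

val ssc = new StreamingContext(conf, Seconds(10))
```

The same setting can also be passed on the command line, e.g. `--conf spark.streaming.kinesis.retry.waitTime=500ms`.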
