Skip to content

Commit

Permalink
[FLINK-8977] [e2e] Extend externalized checkpoint e2e to simulate job…
Browse files Browse the repository at this point in the history
… failures

This closes apache#6004.
  • Loading branch information
tzulitai committed May 18, 2018
1 parent 87f76a6 commit be002ba
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 20 deletions.
21 changes: 18 additions & 3 deletions flink-end-to-end-tests/run-nightly-tests.sh
Expand Up @@ -113,17 +113,32 @@ if [ $EXIT_CODE == 0 ]; then
fi

if [ $EXIT_CODE == 0 ]; then
run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true"
run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true false"
EXIT_CODE=$?
fi

if [ $EXIT_CODE == 0 ]; then
run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false"
run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false false"
EXIT_CODE=$?
fi

if [ $EXIT_CODE == 0 ]; then
run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks"
run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks false"
EXIT_CODE=$?
fi

if [ $EXIT_CODE == 0 ]; then
run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
EXIT_CODE=$?
fi

if [ $EXIT_CODE == 0 ]; then
run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
EXIT_CODE=$?
fi

if [ $EXIT_CODE == 0 ]; then
run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true"
EXIT_CODE=$?
fi

Expand Down
17 changes: 17 additions & 0 deletions flink-end-to-end-tests/test-scripts/common.sh
Expand Up @@ -239,6 +239,23 @@ function wait_job_running {
done
}

function wait_job_terminal_state {
local job=$1
local terminal_state=$2

echo "Waiting for job ($job) to reach terminal state $terminal_state ..."

while : ; do
N=$(grep -o "Job $job reached globally terminal state $terminal_state" $FLINK_DIR/log/*standalonesession*.log | tail -1)

if [[ -z $N ]]; then
sleep 1
else
break
fi
done
}

function take_savepoint {
"$FLINK_DIR"/bin/flink savepoint $1 $2
}
Expand Down
Expand Up @@ -21,6 +21,7 @@ source "$(dirname "$0")"/common.sh

STATE_BACKEND_TYPE=${1:-file}
STATE_BACKEND_FILE_ASYNC=${2:-true}
SIMULATE_FAILURE=${3:-false}

setup_flink_slf4j_metric_reporter
start_cluster
Expand All @@ -43,24 +44,47 @@ CHECKPOINT_DIR="$TEST_DATA_DIR/externalized-chckpt-e2e-backend-dir"
CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"

# run the DataStream allroundjob

echo "Running externalized checkpoints test, with STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC SIMULATE_FAILURE=$SIMULATE_FAILURE ..."

TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
BASE_JOB_CMD="$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
--test.semantics exactly-once \
--environment.externalize_checkpoint true \
--environment.externalize_checkpoint.cleanup retain \
--state_backend $STATE_BACKEND_TYPE \
--state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
--state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
--sequence_generator_source.sleep_time 15 \
--sequence_generator_source.sleep_after_elements 1 \
| grep "Job has been submitted with JobID" | sed 's/.* //g')
--sequence_generator_source.sleep_after_elements 1"

JOB_CMD=""
if [[ $SIMULATE_FAILURE == "true" ]]; then
# the submitted job should fail after at least 1 complete checkpoint.
# When simulating failures with the general purpose DataStream job,
# we disable restarting because we want to manually do that after the job fails.
JOB_CMD="$BASE_JOB_CMD \
--test.simulate_failure true \
--test.simulate_failure.num_records 200 \
--test.simulate_failure.num_checkpoints 1 \
--test.simulate_failure.max_failures 1 \
--environment.restart_strategy no_restart"
else
JOB_CMD=$BASE_JOB_CMD
fi

DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')

wait_job_running $DATASTREAM_JOB

wait_num_checkpoints $DATASTREAM_JOB 1
wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
if [[ $SIMULATE_FAILURE == "true" ]]; then
wait_job_terminal_state $DATASTREAM_JOB FAILED
else
wait_num_checkpoints $DATASTREAM_JOB 1
wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200

cancel_job $DATASTREAM_JOB
cancel_job $DATASTREAM_JOB
fi

CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]*)

Expand All @@ -78,19 +102,9 @@ if (( $NUM_CHECKPOINTS > 1 )); then
fi

echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $CHECKPOINT_PATH -d $TEST_PROGRAM_JAR \
--test.semantics exactly-once \
--environment.externalize_checkpoint true \
--environment.externalize_checkpoint.cleanup retain \
--state_backend $STATE_BACKEND_TYPE \
--state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
--state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
--sequence_generator_source.sleep_time 15 \
--sequence_generator_source.sleep_after_elements 1 \
| grep "Job has been submitted with JobID" | sed 's/.* //g')
DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')

wait_job_running $DATASTREAM_JOB

wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200

# if state is errorneous and the general purpose DataStream job produces alerting messages,
Expand Down

0 comments on commit be002ba

Please sign in to comment.