diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 4cfd778960ddde..289868282ec1a2 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -113,17 +113,32 @@ if [ $EXIT_CODE == 0 ]; then fi if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true" + run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true false" EXIT_CODE=$? fi if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false" + run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false false" EXIT_CODE=$? fi if [ $EXIT_CODE == 0 ]; then - run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks" + run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks false" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true" EXIT_CODE=$? fi diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 94e179e6449912..18833cc85e8934 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -239,6 +239,23 @@ function wait_job_running { done } +function wait_job_terminal_state { + local job=$1 + local terminal_state=$2 + + echo "Waiting for job ($job) to reach terminal state $terminal_state ..." + + while : ; do + N=$(grep -o "Job $job reached globally terminal state $terminal_state" $FLINK_DIR/log/*standalonesession*.log | tail -1) + + if [[ -z $N ]]; then + sleep 1 + else + break + fi + done +} + function take_savepoint { "$FLINK_DIR"/bin/flink savepoint $1 $2 } diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh index 3dc990965696f7..3994e3054e5af9 100755 --- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh +++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh @@ -21,6 +21,7 @@ source "$(dirname "$0")"/common.sh STATE_BACKEND_TYPE=${1:-file} STATE_BACKEND_FILE_ASYNC=${2:-true} +SIMULATE_FAILURE=${3:-false} setup_flink_slf4j_metric_reporter start_cluster @@ -43,8 +44,11 @@ CHECKPOINT_DIR="$TEST_DATA_DIR/externalized-chckpt-e2e-backend-dir" CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR" # run the DataStream allroundjob + +echo "Running externalized checkpoints test, with STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC SIMULATE_FAILURE=$SIMULATE_FAILURE ..." + TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar -DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \ +BASE_JOB_CMD="$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \ --test.semantics exactly-once \ --environment.externalize_checkpoint true \ --environment.externalize_checkpoint.cleanup retain \ @@ -52,15 +56,35 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \ --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \ --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \ --sequence_generator_source.sleep_time 15 \ - --sequence_generator_source.sleep_after_elements 1 \ - | grep "Job has been submitted with JobID" | sed 's/.* //g') + --sequence_generator_source.sleep_after_elements 1" + +JOB_CMD="" +if [[ $SIMULATE_FAILURE == "true" ]]; then + # the submitted job should fail after at least 1 complete checkpoint. + # When simulating failures with the general purpose DataStream job, + # we disable restarting because we want to manually do that after the job fails. + JOB_CMD="$BASE_JOB_CMD \ + --test.simulate_failure true \ + --test.simulate_failure.num_records 200 \ + --test.simulate_failure.num_checkpoints 1 \ + --test.simulate_failure.max_failures 1 \ + --environment.restart_strategy no_restart" +else + JOB_CMD=$BASE_JOB_CMD +fi + +DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g') wait_job_running $DATASTREAM_JOB -wait_num_checkpoints $DATASTREAM_JOB 1 -wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200 +if [[ $SIMULATE_FAILURE == "true" ]]; then + wait_job_terminal_state $DATASTREAM_JOB FAILED +else + wait_num_checkpoints $DATASTREAM_JOB 1 + wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200 -cancel_job $DATASTREAM_JOB + cancel_job $DATASTREAM_JOB +fi CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]*) @@ -78,19 +102,9 @@ if (( $NUM_CHECKPOINTS > 1 )); then fi echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..." -DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $CHECKPOINT_PATH -d $TEST_PROGRAM_JAR \ - --test.semantics exactly-once \ - --environment.externalize_checkpoint true \ - --environment.externalize_checkpoint.cleanup retain \ - --state_backend $STATE_BACKEND_TYPE \ - --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \ - --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \ - --sequence_generator_source.sleep_time 15 \ - --sequence_generator_source.sleep_after_elements 1 \ - | grep "Job has been submitted with JobID" | sed 's/.* //g') +DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g') wait_job_running $DATASTREAM_JOB - wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200 # if state is errorneous and the general purpose DataStream job produces alerting messages,