diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala index 1a4ffb89661906..5bb390596ead46 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala @@ -38,7 +38,10 @@ import java.util.{Collection => JCollection} import scala.collection.JavaConversions._ @RunWith(classOf[Parameterized]) -class AsyncLookupJoinITCase(legacyTableSource: Boolean, backend: StateBackendMode) +class AsyncLookupJoinITCase( + legacyTableSource: Boolean, + backend: StateBackendMode, + objectReuse: Boolean) extends StreamingWithStateTestBase(backend) { val data = List( @@ -53,9 +56,11 @@ class AsyncLookupJoinITCase(legacyTableSource: Boolean, backend: StateBackendMod @Before override def before(): Unit = { super.before() - // TODO: remove this until [FLINK-12351] is fixed. - // currently AsyncWaitOperator doesn't copy input element which is a bug - env.getConfig.disableObjectReuse() + if (objectReuse) { + env.getConfig.enableObjectReuse() + } else { + env.getConfig.disableObjectReuse() + } createScanTable("src", data) createLookupTable("user_table", userData) @@ -298,13 +303,13 @@ class AsyncLookupJoinITCase(legacyTableSource: Boolean, backend: StateBackendMod } object AsyncLookupJoinITCase { - @Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}") + @Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}") def parameters(): JCollection[Array[Object]] = { Seq[Array[AnyRef]]( - Array(JBoolean.TRUE, HEAP_BACKEND), - Array(JBoolean.TRUE, ROCKSDB_BACKEND), - Array(JBoolean.FALSE, HEAP_BACKEND), - Array(JBoolean.FALSE, ROCKSDB_BACKEND) + Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE), + Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE), + Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE), + Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE) ) } }