
rptest: Refactor consumer validation and tune table idle settings #16250

Merged
merged 5 commits into dev on Jan 29, 2024

Conversation

savex
Contributor

@savex savex commented Jan 23, 2024

Since the Table API consumer waits for the index indefinitely, update the validation to

  • first wait for the data file to be created
  • then wait for the data files to reach the proper index value (sketched below)

This PR also fixes log copying for docker envs by properly getting the hostname.

Fixes: redpanda-data/devprod#1031
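
A minimal sketch of the two-stage wait described above, using ducktape's wait_until helper. The consumer accessors (data_files, current_index) are hypothetical stand-ins for whatever the rptest consumer wrapper actually exposes, not the PR's real code:

from ducktape.utils.util import wait_until


def wait_for_consumer_index(consumer, min_index, timeout_sec=120):
    # Stage 1: the Table API consumer creates its data file lazily, so first
    # wait for the file to appear at all.
    wait_until(lambda: len(consumer.data_files()) > 0,
               timeout_sec=timeout_sec,
               backoff_sec=5,
               err_msg="consumer never created a data file")
    # Stage 2: only then wait for the data file(s) to reach the expected index.
    wait_until(lambda: consumer.current_index() >= min_index,
               timeout_sec=timeout_sec,
               backoff_sec=5,
               err_msg="consumer data never reached the expected index")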

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v23.3.x
  • v23.2.x
  • v23.1.x

Release Notes

  • none

@savex savex requested a review from bharathv January 23, 2024 19:07
@savex savex marked this pull request as ready for review January 23, 2024 19:08
@savex savex marked this pull request as draft January 24, 2024 00:12
@savex savex force-pushed the dp-1031-fix-transaction-flackyness branch 5 times, most recently from 93cbf3d to 36adda4 on January 24, 2024 18:19
@savex savex marked this pull request as ready for review January 24, 2024 18:19
@savex savex force-pushed the dp-1031-fix-transaction-flackyness branch from 36adda4 to 7c37a4f on January 24, 2024 21:56
@@ -128,6 +128,12 @@ def setup(self):
        table_env = StreamTableEnvironment.create(
            stream_execution_environment=env, environment_settings=settings)

        # Tune table idle state handling
        # Clear the state if it has not changed
Contributor

Just curious: are these needed for this fix, or are they just nice to have?

Contributor Author

This is a safeguard to trigger cleanups/aborts for ongoing actions/transformations. We do not want Flink to wait indefinitely (the default) for the other party to answer if the Table API is querying something in RP.
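
For context, a minimal PyFlink sketch of what this kind of idle-state tuning can look like. The option name, value, and setter are illustrative (and assume a Flink version whose TableConfig exposes set(); older releases go through get_configuration().set_string()); they are not necessarily the settings the PR actually applies:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.in_streaming_mode()
table_env = StreamTableEnvironment.create(
    stream_execution_environment=env, environment_settings=settings)

# Tune table idle state handling: clear per-key state that has not changed
# for a while instead of retaining it forever (a TTL of 0, the default,
# means "never expire"). The 30 s value is illustrative.
table_env.get_config().set("table.exec.state.ttl", "30 s")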

tests/rptest/tests/flink_basic_test.py (review thread resolved)
@bharathv
Contributor

/ci-repeat 1
skip-units
dt-repeat=20
tests/rptest/tests/flink_basic_test.py

@savex
Contributor Author

savex commented Jan 25, 2024

/ci-repeat 1
skip-units
dt-repeat=10
tests/rptest/tests/flink_basic_test.py

   Use internal jobmanager metrics to detect if a job's
   subtasks (vertices) have been idle for 30 sec. Key metrics
   used are accumulated-idle-time and accumulated-busy-time
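
A hedged sketch of how such an idle check could be implemented against the jobmanager REST API. It assumes a Flink version whose /jobs/<job-id> response exposes the per-vertex accumulated-busy-time metric named in the commit message; the function and parameter names are illustrative, not the PR's code:

import time

import requests


def job_is_idle(rest_url, job_id, window_sec=30.0):
    """Sample each vertex's accumulated-busy-time twice, window_sec apart;
    if no vertex accumulated any busy time in between, treat the job as idle."""

    def busy_times():
        job = requests.get(f"{rest_url}/jobs/{job_id}").json()
        return {
            v["id"]: v.get("metrics", {}).get("accumulated-busy-time", 0)
            for v in job.get("vertices", [])
        }

    before = busy_times()
    time.sleep(window_sec)
    after = busy_times()
    # Idle means no subtask (vertex) got any busier during the window.
    return all(after.get(vid, 0) == busy for vid, busy in before.items())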
@savex savex force-pushed the dp-1031-fix-transaction-flackyness branch from 10a1656 to bd25808 on January 25, 2024 21:21
@redpanda-data redpanda-data deleted a comment from CLAassistant Jan 25, 2024
@redpanda-data redpanda-data deleted a comment from CLAassistant Jan 25, 2024
@savex
Contributor Author

savex commented Jan 25, 2024

EC2 check

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_id:    rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload
status:     PASS
run time:   1 minute 46.423 seconds
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
==============================================================================================================================================================================================================================================================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.8.18
session_id:       2024-01-25--025
run time:         48 minutes 55.396 seconds
tests run:        20
passed:           20
flaky:            0
failed:           0
ignored:          0
opassed:          0
ofailed:          0
==============================================================================================================================================================================================================================================================================================================================

@savex
Contributor Author

savex commented Jan 25, 2024

/ci-repeat 1
skip-units
dt-repeat=20
tests/rptest/tests/flink_basic_test.py

@savex savex requested a review from bharathv January 25, 2024 22:11
@vbotbuildovich
Collaborator

vbotbuildovich commented Jan 25, 2024

new failures in https://buildkite.com/redpanda/redpanda/builds/44317#018d42dc-79df-4324-8242-621b6f201002:

"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"
"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"
"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"
"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"
"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"
"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"
"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"
"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"
"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"

new failures in https://buildkite.com/redpanda/redpanda/builds/44312#018d4672-d00c-4fc6-84f7-dd4cd71c7e0c:

"rptest.tests.flink_basic_test.FlinkBasicTests.test_transaction_workload"

@savex
Contributor Author

savex commented Jan 26, 2024

Caught the reason for the above errors:

        Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka flink_workload_topic-0@-1 with FlinkKafkaInternalProducer{transactionalId='flink_transaction_test_1-0-1', inTransaction=true, closed=false}
        at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.decorateException(KafkaWriter.java:477) ~[?:?]
        at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.onCompletion(KafkaWriter.java:451) ~[?:?]
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1418) ~[?:?]
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273) ~[?:?]
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:161) ~[?:?]
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:794) ~[?:?]
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:498) ~[?:?]
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:307) ~[?:?]
        at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) ~[?:?]

When Flink parallelizes INSERT operators into different jobs, it uses the same transactionalId.

@savex
Contributor Author

savex commented Jan 26, 2024

This is the result of that issue:

    When Flink parallelizes jobs based on different INSERT operators, it uses the same sink.transactional-id-prefix that was set for the single temporary table. This causes fencing on the RP side and, as a result, flakiness when run in slower environments (docker). This is fixed by creating/deleting a temporary table for each batch.
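
A minimal PyFlink sketch of the per-batch fix described above: create a temporary Kafka sink table with a batch-unique sink.transactional-id-prefix, run the INSERT, then drop the table. Table, topic, and column names are illustrative, not the PR's actual code:

def run_batch(table_env, batch_idx, topic, brokers):
    # A unique prefix per batch keeps two producers from sharing a
    # transactional id, which is what triggers ProducerFencedException.
    prefix = f"flink_transaction_test_{batch_idx}"
    table_env.execute_sql(f"""
        CREATE TEMPORARY TABLE sink_{batch_idx} (
            k STRING,
            v STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{topic}',
            'properties.bootstrap.servers' = '{brokers}',
            'format' = 'json',
            'sink.delivery-guarantee' = 'exactly-once',
            'sink.transactional-id-prefix' = '{prefix}'
        )
    """)
    # Blocking insert for the sketch; the source table is a placeholder.
    table_env.execute_sql(
        f"INSERT INTO sink_{batch_idx} SELECT k, v FROM source_{batch_idx}"
    ).wait()
    table_env.execute_sql(f"DROP TEMPORARY TABLE sink_{batch_idx}")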
@savex savex self-assigned this Jan 26, 2024
@savex
Contributor Author

savex commented Jan 29, 2024

/ci-repeat 1
skip-units
dt-repeat=20
tests/rptest/tests/flink_basic_test.py

   The copy operation sometimes does not finish copying and/or
   does not copy the file at all. Rely on ssh_output instead,
   in the scope of the basic test
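
A sketch of the kind of change that commit describes, assuming ducktape's RemoteAccount.ssh_output; the helper name and path handling are illustrative:

def read_remote_file(node, path):
    # Instead of copying the file off the node (which sometimes produced a
    # partial or missing copy), read its contents over SSH in one shot.
    return node.account.ssh_output(f"cat {path}")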
@savex savex force-pushed the dp-1031-fix-transaction-flackyness branch from abbd201 to 7506cef on January 29, 2024 19:19
@savex
Contributor Author

savex commented Jan 29, 2024

/ci-repeat 1
skip-units
dt-repeat=20
tests/rptest/tests/flink_basic_test.py

        active = [
            job for job in jobs['jobs']
            if job['status'] in self.job_active_statuses
        ]
        return active

    def _has_active_jobs(self):
    def is_job_idle(self, jobid) -> bool:
Contributor

This is way too complicated lol, I can't imagine the Flink API is this horrible; we are probably missing something.

@savex savex merged commit bc76650 into dev Jan 29, 2024
17 checks passed
@savex savex deleted the dp-1031-fix-transaction-flackyness branch January 29, 2024 23:13