Set Indexing related executor threads priority to LOW #27153
mohityadav766 wants to merge 7 commits into main
Conversation
...e/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java
Pull request overview
This PR aims to reduce memory/CPU overhead during search reindexing by avoiding repeated JSON parsing/metric registration and by lowering thread priority for reindex-related executors, plus adds a script to help quantify GC impact during reindex runs.
Changes:
- Added `toJsonData(Object)` helpers for ES/OS clients and updated bulk sinks to build `JsonData` from POJOs instead of parsing JSON strings.
- Lowered thread priorities for reindex job/producer/consumer pools and bulk flush schedulers; introduced small in-memory caches for context data and Micrometer counters.
- Added `gc-reindex-report.sh` to collect/compare GC and health-probe behavior during a reindex.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OsUtils.java | Adds toJsonData(Object) helper for OpenSearch JSONP serialization. |
| openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/EsUtils.java | Adds toJsonData(Object) helper for Elasticsearch JSONP serialization. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexExecutor.java | Caches per-entityType context maps and lowers executor thread priority. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingMetrics.java | Caches dynamically-tagged counters to avoid repeated meter registration. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java | Switches bulk document creation to POJO-backed JsonData, tweaks size estimation, and lowers flush scheduler priority. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java | Switches bulk document creation to POJO-backed JsonData, tweaks size estimation, and lowers flush scheduler priority. |
| bin/distributed-test/scripts/gc-reindex-report.sh | New script to run reindex and report GC pause / probe behavior vs a baseline. |
```diff
  String docId = entity.getId().toString();
- long estimatedSize =
-     (long) finalJson.getBytes(StandardCharsets.UTF_8).length
-         + BULK_OPERATION_METADATA_OVERHEAD;
+ long estimatedSize = (long) finalJson.length() + BULK_OPERATION_METADATA_OVERHEAD;

+ org.opensearch.client.json.JsonData jsonData =
+     embeddingsEnabled ? OsUtils.toJsonData(finalJson) : OsUtils.toJsonData(searchIndexDoc);
```
estimatedSize is compared against maxPayloadSizeBytes (bytes), but finalJson.length() counts UTF-16 code units, not UTF-8 bytes. This can significantly under-estimate payload size for non-ASCII content and delay flushing past the configured byte limit, increasing memory usage / risk of oversized bulk requests. Consider estimating UTF-8 byte length without allocating a byte[] (or leave estimatedSizeBytes as -1 to fall back to estimateOperationSize(operation)).
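The reviewer's first option (counting UTF-8 bytes without allocating a `byte[]`) can be sketched as below. This is an illustrative helper, not code from the PR; `utf8Length` and the sample values are assumptions.

```java
import java.nio.charset.StandardCharsets;

// Hedged sketch (not from the PR): estimate the UTF-8 byte length of a
// CharSequence without materializing getBytes(), suitable for comparing
// against a byte limit such as maxPayloadSizeBytes.
public class Utf8Estimate {
    static long utf8Length(CharSequence s) {
        long bytes = 0;
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c < 0x80) {
                bytes += 1; // ASCII: 1 byte
            } else if (c < 0x800) {
                bytes += 2; // U+0080..U+07FF: 2 bytes
            } else if (Character.isHighSurrogate(c)
                    && i + 1 < s.length()
                    && Character.isLowSurrogate(s.charAt(i + 1))) {
                bytes += 4; // supplementary-plane code point: 4 bytes
                i++;        // consume the low surrogate
            } else {
                bytes += 3; // rest of the BMP (and lone surrogates as '?'-like 3): 3 bytes
            }
        }
        return bytes;
    }

    public static void main(String[] args) {
        String sample = "caf\u00e9 \u20ac"; // "café €"
        // Matches the allocating version byte-for-byte for well-formed strings:
        assert utf8Length(sample)
                == sample.getBytes(StandardCharsets.UTF_8).length;
        System.out.println(utf8Length(sample));
    }
}
```

For well-formed strings this agrees with `getBytes(StandardCharsets.UTF_8).length` while doing no allocation per document.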
```diff
+ org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(searchIndexDoc);
  String docId = entity.getId().toString();
  long estimatedSize =
-     (long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
+     (long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
```
estimatedSize is intended to be bytes (used vs maxPayloadSizeBytes), but JsonUtils.pojoToJson(searchIndexDoc).length() returns character count. For entities with non-ASCII text this will undercount, potentially causing the buffer to exceed the byte limit before flushing.
```diff
          .docAsUpsert(true)));
  }
  long estimatedSize =
-     (long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
+     (long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
  columnBulkProcessor.add(operation, docId, Entity.TABLE_COLUMN, null, estimatedSize);
```
Same sizing issue here: JsonUtils.pojoToJson(searchIndexDoc).length() is not a byte count, but it is used as estimatedSizeBytes against maxPayloadSizeBytes. This can under-estimate payload size and reduce the effectiveness of the bulk payload limiter.
```diff
  Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
- String json = JsonUtils.pojoToJson(searchIndexDoc);
+ es.co.elastic.clients.json.JsonData jsonData = EsUtils.toJsonData(searchIndexDoc);
  String docId = entity.getId().toString();
  long estimatedSize =
-     (long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
+     (long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
```
estimatedSize is treated as bytes (compared against maxPayloadSizeBytes), but JsonUtils.pojoToJson(searchIndexDoc).length() returns character count. This can undercount for non-ASCII content and let the buffer exceed the configured byte limit before flushing.
```diff
  String docId = entity.getId().toString();
  long estimatedSize =
-     (long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
+     (long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
```
Same byte-vs-char sizing issue: JsonUtils.pojoToJson(searchIndexDoc).length() is a character count but is passed as estimatedSizeBytes and compared against maxPayloadSizeBytes.
```diff
- (long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
+ (long)
+         JsonUtils.pojoToJson(searchIndexDoc).getBytes(StandardCharsets.UTF_8).length
+     + BULK_OPERATION_METADATA_OVERHEAD;
```
```diff
          .action(a -> a.doc(jsonData).docAsUpsert(true))));
  }
  long estimatedSize =
-     (long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
+     (long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
  columnBulkProcessor.add(operation, docId, Entity.TABLE_COLUMN, null, estimatedSize);
```
Same sizing issue for column indexing: JsonUtils.pojoToJson(searchIndexDoc).length() returns chars, not UTF-8 bytes, but is used as estimatedSizeBytes for bulk payload limiting.
Reverted: Cherry-pick: Fix: Resolve text fields to `.keyword` for ES/OS sorting and aggregation (#27103)
```diff
- (long) finalJson.getBytes(StandardCharsets.UTF_8).length
-     + BULK_OPERATION_METADATA_OVERHEAD;

+ org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(finalJson);
```
The variable is declared as org.opensearch.client.json.JsonData, but OsUtils.toJsonData(...) returns the relocated OpenSearch client type (os.org.opensearch.client.json.JsonData). This mismatch will not compile and should use the same os.org.opensearch.client.json.JsonData type as the rest of the OpenSearch client usage in this file.
```diff
- org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(finalJson);
+ os.org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(finalJson);
```
```diff
  ColumnSearchIndex columnIndex = new ColumnSearchIndex(column, table);
  Map<String, Object> searchIndexDoc = columnIndex.buildSearchIndexDoc();
  String json = JsonUtils.pojoToJson(searchIndexDoc);
+ org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(json);
```
Same JsonData type mismatch as above: org.opensearch.client.json.JsonData is inconsistent with the relocated OpenSearch client (os.org.opensearch.client.*) used elsewhere in this class and returned by OsUtils.toJsonData(...). This should be the relocated JsonData type to compile.
```diff
- org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(json);
+ os.org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(json);
```
```java
// Sync app_extension_time_series so the UI reflects FAILED instead of RUNNING.
// OmAppJobListener.jobWasExecuted() is bypassed during recovery (no Quartz context),
// so we update the time-series record here directly.
try {
  collectionDAO
      .appExtensionTimeSeriesDao()
      .markRunningEntriesFailedByName(SEARCH_INDEX_APP_NAME);
} catch (Exception e) {
  LOG.warn("Failed to update app_extension_time_series for failed job {}", job.getId(), e);
}
```
The comment/log refer to app_extension_time_series, but the actual table/DAO is apps_extension_time_series (plural). Please align the wording to the real table name to avoid confusion when debugging.
```java
  }
}

public static JsonData toJsonData(Object pojo) {
```
The new toJsonData(Object pojo) overload can accidentally serialize a JSON string as a JSON string literal (e.g., when the value is typed as Object but contains a String), instead of parsing it like toJsonData(String) does. Consider guarding instanceof String and delegating to the String overload, or renaming the POJO method to avoid ambiguous overload resolution.
```diff
  public static JsonData toJsonData(Object pojo) {
+   if (pojo instanceof String) {
+     return toJsonData((String) pojo);
+   }
```
```java
  }
}

public static JsonData toJsonData(Object pojo) {
```
The new toJsonData(Object pojo) overload is easy to misuse when callers have an Object that happens to hold a JSON String: it will serialize the string as a JSON scalar rather than parsing it (unlike toJsonData(String)). To avoid subtle indexing bugs, consider delegating to the String overload when pojo instanceof String, or rename the POJO conversion method to something non-overloaded.
```diff
  public static JsonData toJsonData(Object pojo) {
+   if (pojo instanceof String) {
+     return toJsonData((String) pojo);
+   }
```
```java
@ConnectionAwareSqlUpdate(
    value =
        "UPDATE apps_extension_time_series SET json = jsonb_set(json, '{status}', '\"failed\"') WHERE appName = :appName AND json->>'status' = 'running' AND extension = 'status'",
    connectionType = POSTGRES)
```
This introduces another jsonb_set(..., '\"failed\"') literal for Postgres status updates. That style is fairly hard to reason about and easy to break when editing (double-escaping across Java+SQL), so it would be safer/clearer to use an explicit jsonb value in the SQL (e.g., casting a JSON string to jsonb or using to_jsonb) instead of relying on embedded quote/backslash escaping.
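One way to act on this, sketched as a hypothetical alternative (not part of this PR), is to let Postgres build the jsonb value with `to_jsonb`, so the Java string literal carries no escaped quotes:

```java
// Hypothetical alternative annotation (not the committed change):
// to_jsonb('failed'::text) produces the jsonb string value "failed"
// server-side, avoiding the '\"failed\"' double-escaping in Java source.
@ConnectionAwareSqlUpdate(
    value =
        "UPDATE apps_extension_time_series "
            + "SET json = jsonb_set(json, '{status}', to_jsonb('failed'::text)) "
            + "WHERE appName = :appName AND json->>'status' = 'running' AND extension = 'status'",
    connectionType = POSTGRES)
```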
… case search indexing

Commit 2839bc2 cherry-picked PR #27153 ("Improve memory usage for reindex") to 1.12.5, bundled with the revert of PR #27103. PR #27153 is still open/unmerged on main and introduced 6 extra file changes that were never meant for 1.12.5.

The bulk sink change from `toJsonData(json_string)` to `toJsonData(map_object)` bypassed Jackson's `WRITE_DATES_AS_TIMESTAMPS=false` setting. During reindex, `java.util.Date` fields (like `tags.appliedAt`) were sent as raw epoch Longs instead of ISO strings. OpenSearch dynamically mapped `tags.appliedAt` as "long". Later, real-time indexing (test case creation via API) sent `appliedAt` as an ISO string — OpenSearch rejected it with `mapper_parsing_exception`. Test cases with tags were created in the DB but silently never indexed in search.

This restores all 6 files to their pre-2839bc259f state, matching main. Files restored:
- ElasticSearchBulkSink.java — back to pojoToJson → string → toJsonData(string)
- OpenSearchBulkSink.java — same
- EsUtils.java — removed toJsonData(Object) overload
- OsUtils.java — same
- SearchIndexExecutor.java — removed contextDataCache, Thread.MIN_PRIORITY
- ReindexingMetrics.java — removed counter caching

Requires reindex after deployment.
…e test case search indexing (#27202)

Commit 2839bc2 cherry-picked PR #27153 ("Improve memory usage for reindex") to 1.12.5, bundled with the revert of PR #27103. PR #27153 is still open/unmerged on main and introduced extra file changes that were never meant for 1.12.5.

The bulk sink change from `toJsonData(json_string)` to `toJsonData(map_object)` bypassed Jackson's `WRITE_DATES_AS_TIMESTAMPS=false` setting. During reindex, `java.util.Date` fields (like `tags.appliedAt`) were sent as raw epoch Longs instead of ISO strings. OpenSearch dynamically mapped `tags.appliedAt` as "long". Later, real-time indexing (test case creation via API) sent `appliedAt` as an ISO string — OpenSearch rejected it with `mapper_parsing_exception`. Test cases with tags were created in the DB but silently never indexed in search.

Changes:
- ElasticSearchBulkSink/OpenSearchBulkSink: reverted serialization back to pojoToJson → string → toJsonData(string), kept Thread.MIN_PRIORITY
- EsUtils/OsUtils: removed toJsonData(Object) overload
- SearchIndexExecutor: removed contextDataCache, kept Thread.MIN_PRIORITY
- ReindexingMetrics: removed counter caching

Requires reindex after deployment.
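The root cause described in the commit message can be reproduced with plain Jackson. This is a minimal sketch: it uses a bare `ObjectMapper` rather than OpenMetadata's configured `JsonUtils`, and the `json(...)` helper is illustrative.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.util.Date;
import java.util.Map;

// Hedged sketch: shows how WRITE_DATES_AS_TIMESTAMPS changes the wire format
// of java.util.Date fields, which is what drove the OpenSearch mapping conflict.
public class DateSerDemo {
    static String json(boolean datesAsTimestamps) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            if (!datesAsTimestamps) {
                // OpenMetadata's JsonUtils disables this feature, so Dates
                // become ISO-8601 strings rather than epoch numbers.
                mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
            }
            return mapper.writeValueAsString(Map.of("appliedAt", new Date(0)));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Default Jackson (what the ES/OS client's own mapper effectively did
        // when handed the raw POJO): epoch millis, a JSON number.
        System.out.println(json(true));
        // Feature disabled (the pojoToJson path): an ISO-8601 date string.
        System.out.println(json(false));
    }
}
```

Once a reindex wrote the numeric form, OpenSearch's dynamic mapping locked `tags.appliedAt` to `long`, so later ISO-string writes from the API path failed.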
Code Review ✅ Approved (1 resolved / 1 finding)

Improves memory usage for the reindex process by eliminating double serialization during size estimation and lowering thread priority. No issues found.

✅ 1 resolved: Performance: Double serialization negates memory optimization for size estimation
```java
// Sync app_extension_time_series so the UI reflects FAILED instead of RUNNING.
// OmAppJobListener.jobWasExecuted() is bypassed during recovery (no Quartz context),
// so we update the time-series record here directly.
try {
  collectionDAO
      .appExtensionTimeSeriesDao()
      .markRunningEntriesFailedByName(SEARCH_INDEX_APP_NAME);
} catch (Exception e) {
  LOG.warn("Failed to update app_extension_time_series for failed job {}", job.getId(), e);
}
```
JobRecoveryManager now updates apps_extension_time_series via markRunningEntriesFailedByName() when failing an orphaned job, but the existing JobRecoveryManager tests don’t cover this new side-effect. Add/extend a unit test to verify the DAO method is invoked on failJob() and that exceptions from this update are swallowed as intended (since the call is wrapped in a try/catch).
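A test along the lines this comment asks for could use a hand-rolled fake instead of a mocking framework. Everything here except `markRunningEntriesFailedByName` is a hypothetical stand-in (the interface, `RecordingDao`, `failJob`, and the app-name constant's value are assumptions, not the real classes):

```java
// Hedged sketch of a unit test for the recovery side-effect: verify the DAO
// method is invoked and that a DAO exception is swallowed, not propagated.
interface AppExtensionTimeSeriesDao {
    void markRunningEntriesFailedByName(String appName);
}

public class JobRecoveryTestSketch {
    // Hypothetical value; the real constant lives in the service code.
    static final String SEARCH_INDEX_APP_NAME = "SearchIndexingApplication";

    static class RecordingDao implements AppExtensionTimeSeriesDao {
        String calledWith;
        boolean throwOnCall;

        public void markRunningEntriesFailedByName(String appName) {
            calledWith = appName;
            if (throwOnCall) throw new RuntimeException("db down");
        }
    }

    // Stand-in for the recovery code path under test.
    static void failJob(AppExtensionTimeSeriesDao dao) {
        try {
            dao.markRunningEntriesFailedByName(SEARCH_INDEX_APP_NAME);
        } catch (Exception e) {
            // swallowed by design: recovery must not fail on a status update
        }
    }

    public static void main(String[] args) {
        RecordingDao dao = new RecordingDao();
        failJob(dao);
        assert SEARCH_INDEX_APP_NAME.equals(dao.calledWith);

        RecordingDao failing = new RecordingDao();
        failing.throwOnCall = true;
        failJob(failing); // must not throw
        assert failing.calledWith != null;
        System.out.println("ok");
    }
}
```

In the real suite the same two assertions would target `JobRecoveryManager.failJob()` with a mocked `CollectionDAO`.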
🔴 Playwright Results — 1 failure, 25 flaky

✅ 3598 passed · ❌ 1 failed · 🟡 25 flaky · ⏭️ 207 skipped

Genuine failures (failed on all attempts): ❌



Summary by Gitar
- Cached stage and promotion counters in `ReindexingMetrics` to avoid creating duplicate metric instances
- Added `contextDataCache` in `SearchIndexExecutor` to reuse context data across entity types
- Updated `ElasticSearchBulkSink` and `OpenSearchBulkSink` to prevent redundant conversions
- Set `Thread.MIN_PRIORITY` for job, consumer, and producer executor threads in the reindex process
- Set `Thread.MIN_PRIORITY` for bulk flush scheduler threads in both Elasticsearch and OpenSearch sinks
- Updated `app_extension_time_series` in `JobRecoveryManager`
- Added `markRunningEntriesFailedByName()` method to `CollectionDAO` for updating job status
- Added `toJsonData(Object pojo)` overload methods in `EsUtils` and `OsUtils` for flexible JSON conversion
- Added `gc-reindex-report.sh` script for monitoring GC pause statistics during reindex runs

This will update automatically on new commits.