[NAE-2136] Speed up Elasticsearch reindex #327
Walkthrough

Centralizes Jackson versioning and adds the JitPack repository; introduces CaseField and custom LocalDateTime (de)serializers; refactors the Elasticsearch configuration and adds an elasticCaseObjectMapper; implements a new ElasticIndexService with bulk reindexing, a controller endpoint, and IndexParams; updates the reindexing flow, properties, and a related repository method.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant ElasticController
    participant IElasticIndexService
    participant ElasticIndexService
    participant MongoTemplate
    participant ElasticCaseRepo
    participant ElasticTaskRepo
    participant ESClient
    Client->>ElasticController: POST /api/elastic/reindex/bulk (IndexParams)
    ElasticController->>IElasticIndexService: bulkIndex(indexAll, after=null, caseBatchSize, taskBatchSize)
    IElasticIndexService->>ElasticIndexService: bulkIndex(...)
    ElasticIndexService->>MongoTemplate: Query Cases (all or lastModified > after)
    MongoTemplate-->>ElasticIndexService: Case batches
    loop per case batch
        ElasticIndexService->>ElasticCaseRepo: load/prepare existing
        ElasticIndexService->>ESClient: Bulk UpdateRequest upsert (cases)
        ESClient-->>ElasticIndexService: BulkResponse
        ElasticIndexService->>ElasticTaskRepo: load/prepare tasks
        ElasticIndexService->>ESClient: Bulk UpdateRequest upsert (tasks)
        ESClient-->>ElasticIndexService: BulkResponse
    end
    ElasticIndexService-->>ElasticController: complete
    ElasticController-->>Client: MessageResource success
```
```mermaid
sequenceDiagram
    autonumber
    participant Scheduler
    participant ReindexingTask
    participant IElasticIndexService
    Scheduler->>ReindexingTask: trigger reindex()
    ReindexingTask->>IElasticIndexService: bulkIndex(false, lastRun, null, null)
    IElasticIndexService-->>ReindexingTask: complete
    ReindexingTask->>ReindexingTask: update lastRun
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Actionable comments posted: 10
🔭 Outside diff range comments (1)
pom.xml (1)
358-361: Update QRGen dependency coordinates or add repository
The artifact `com.github.kenglxn.qrgen:javase:2.6.0` is not published to Maven Central, which is why your pipeline can't resolve it. To fix the build failure, choose one of the following:

• Add JitPack (or the repository where QRGen 2.6.0 lives) to your POM's `<repositories>` section:

```xml
<repository>
    <id>jitpack.io</id>
    <url>https://jitpack.io</url>
</repository>
```

• Switch to the Central-hosted core artifact:

```xml
<dependency>
    <groupId>com.github.kenglxn</groupId>
    <artifactId>qrgen</artifactId>
    <version>2.6.0</version>
</dependency>
```

• Or revert to the latest version available in Central (e.g. `javase:2.5.0`).

Please update the POM accordingly so the dependency can be resolved.
🧹 Nitpick comments (7)
src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java (1)
91-101: Add API documentation for consistency.

The existing `reindex` method has comprehensive OpenAPI documentation with `@Operation` and `@ApiResponses` annotations. Consider adding similar documentation for this new endpoint to maintain consistency.

```diff
+ @Operation(summary = "Reindex all cases using cursor-based approach",
+         description = "Caller must have the ADMIN role",
+         security = {@SecurityRequirement(name = "BasicAuth")})
+ @ApiResponses(value = {
+         @ApiResponse(responseCode = "200", description = "OK"),
+         @ApiResponse(responseCode = "403", description = "Caller doesn't fulfill the authorisation requirements"),
+ })
  public MessageResource cursorAllReindex() {
```

src/main/java/com/netgrif/application/engine/elastic/service/ElasticsearchConfig.java (3)
16-18: Add public access modifier to constructor.

The constructor should be explicitly marked as public for clarity and consistency with Spring best practices.

```diff
- ElasticsearchConfig(ElasticsearchProperties elasticsearchProperties) {
+ public ElasticsearchConfig(ElasticsearchProperties elasticsearchProperties) {
```
20-26: Consider adding connection resilience and authentication configuration.

The current configuration creates a basic REST client without:
- Connection pooling
- Retry mechanisms
- Authentication (if required)
- Multiple hosts for high availability
Consider enhancing the configuration for production environments.
25-26: Remove unnecessary empty line.

```diff
     return new ElasticsearchClient(transport);
-
 }
```

src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java (2)
44-50: Add public access modifier to constructor.

The constructor should be explicitly marked as public for consistency.

```diff
- BulkService(@Qualifier("elasticsearchClient") ElasticsearchClient elasticsearchClient,
+ public BulkService(@Qualifier("elasticsearchClient") ElasticsearchClient elasticsearchClient,
```
123-130: Consider more specific exception handling.

The generic `Exception` catch might hide specific issues. Consider catching more specific exceptions.

```diff
 private void executeAndValidate(BulkRequest request) {
     try {
         BulkResponse response = esClient.bulk(request);
         checkForBulkUpdateFailure(response);
+    } catch (IOException e) {
+        log.error("IO error during bulk indexing: {}", e.getMessage(), e);
+        throw new ElasticsearchException("Failed to execute bulk request", e);
     } catch (Exception e) {
-        log.error("Failed to index bulk " + e.getMessage(), e);
+        log.error("Unexpected error during bulk indexing: {}", e.getMessage(), e);
+        throw new ElasticsearchException("Unexpected error during bulk request", e);
     }
 }
```

src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (1)
131-135: Consider batch loading PetriNets to avoid N+1 queries.

Loading PetriNets individually in a loop can cause performance issues with many cases.

```diff
- casesToIndex.forEach(c -> {
-     if (c.getPetriNet() == null) {
-         c.setPetriNet(petriNetService.get(c.getPetriNetObjectId()));
-     }
- });
+ // Collect all PetriNet IDs that need loading
+ Set<ObjectId> petriNetIds = casesToIndex.stream()
+         .filter(c -> c.getPetriNet() == null && c.getPetriNetObjectId() != null)
+         .map(Case::getPetriNetObjectId)
+         .collect(Collectors.toSet());
+
+ // Batch load PetriNets
+ Map<ObjectId, PetriNet> petriNets = petriNetService.findAllById(petriNetIds);
+
+ // Assign PetriNets to cases
+ casesToIndex.forEach(c -> {
+     if (c.getPetriNet() == null && c.getPetriNetObjectId() != null) {
+         c.setPetriNet(petriNets.get(c.getPetriNetObjectId()));
+     }
+ });
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
- pom.xml (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/domain/ElasticCase.java (2 hunks)
- src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonSerializer.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseMappingService.java (3 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticSearchJsonpMapper.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticsearchConfig.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (6 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IBulkService.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java (1 hunks)
- src/main/java/com/netgrif/application/engine/workflow/domain/repositories/TaskRepository.java (1 hunks)
- src/main/resources/application.properties (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
src/main/java/com/netgrif/application/engine/elastic/service/ElasticSearchJsonpMapper.java (2)
- src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java (1): LocalDateTimeJsonDeserializer (13-25)
- src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonSerializer.java (1): LocalDateTimeJsonSerializer (11-18)

src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java (1)
- src/main/java/com/netgrif/application/engine/elastic/domain/ElasticCase.java (1): Data (26-156)

src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (1)
- src/main/java/com/netgrif/application/engine/elastic/service/executors/Executor.java (1): Component (15-65)
🪛 GitHub Actions: Pull Request
pom.xml
[error] 1-1: Maven build failed due to unresolved dependency: com.github.kenglxn.qrgen:javase:jar:2.6.0 not found in central repository.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: task-list-completed
🔇 Additional comments (11)
pom.xml (1)
369-374: LGTM - Jackson JSR310 dependency addition is appropriate.

The addition of the `jackson-datatype-jsr310` dependency properly supports Java 8 Date/Time serialization, which aligns with the custom serializers mentioned in the AI summary.

src/main/java/com/netgrif/application/engine/workflow/domain/repositories/TaskRepository.java (1)
19-19: LGTM - Well-designed bulk retrieval method.

The new `findAllByCaseIdIn` method follows Spring Data repository conventions and supports the bulk indexing functionality mentioned in the AI summary. The method signature is consistent with existing bulk methods in the repository.
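For context, a minimal sketch of what such a derived query declaration looks like; the `MongoRepository` base type and generics are assumptions based on the file path, not the project's exact code:

```java
import org.springframework.data.mongodb.repository.MongoRepository;

import java.util.Collection;
import java.util.List;

// Hypothetical shape of the bulk-retrieval method discussed above.
// Spring Data derives the query from the method name: it matches every
// Task whose caseId is contained in the given collection ("$in" in MongoDB).
public interface TaskRepository extends MongoRepository<Task, String> {

    List<Task> findAllByCaseIdIn(Collection<String> caseIds);
}
```

One derived method like this replaces N single-case lookups with a single query, which is what enables the batched task reindexing described later in the review.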
src/main/resources/application.properties (1)

44-44: Significant thread pool increase - ensure adequate system resources.

The executor size increased from 20 to 150 threads (7.5x increase), which should improve reindexing performance but may impact system resources.
Please verify that:
- The system has sufficient memory and CPU resources for 150 concurrent threads
- The Elasticsearch cluster can handle the increased concurrent load
- Monitoring is in place to track thread pool utilization and performance
You may want to consider a gradual increase (e.g., 50-100) initially and monitor performance before reaching 150.
src/main/java/com/netgrif/application/engine/elastic/domain/ElasticCase.java (2)
17-17: LGTM - Added import for ChronoUnit.

The import is appropriately added to support the truncation operation in the constructor.

119-119: Date truncation matches Elasticsearch mapping

In ElasticCase.java:

- The `@Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis)` annotation (line 56) already enforces millisecond precision.
- The new `truncatedTo(ChronoUnit.MILLIS)` call (line 119) now exactly matches that format.
- `creationDateSortable` (line 120) continues to store epoch-millis without change.

No further code updates are required; a field-level sketch of this pattern follows after the checklist. As a next step:

- Verify your live Elasticsearch index uses `date_hour_minute_second_millis`.
- If any existing documents were indexed with sub-millisecond precision, perform a one-time reindex to ensure consistency.
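A minimal sketch of the truncation pattern described above; the `creationDate` field name and constructor shape are assumptions, only `creationDateSortable` and the annotation come from the review:

```java
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Sketch only: the mapping enforces millisecond precision, and the
// constructor truncates the value so stored data matches the mapping exactly.
public class ElasticCaseSketch {

    @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis)
    private LocalDateTime creationDate;

    // epoch-millis companion field for sorting, unchanged by the truncation fix
    private long creationDateSortable;

    public ElasticCaseSketch(LocalDateTime created) {
        this.creationDate = created.truncatedTo(ChronoUnit.MILLIS); // matches the date format above
        this.creationDateSortable = created.toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}
```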
src/main/java/com/netgrif/application/engine/elastic/service/ElasticSearchJsonpMapper.java (1)
12-28: LGTM! Clean implementation of custom JSON mapping for Elasticsearch.

The implementation correctly:

- Extends JacksonJsonpMapper for Elasticsearch integration
- Configures custom LocalDateTime serialization with millisecond precision
- Uses a clean static factory method pattern
- Properly disables timestamp writing for better search compatibility
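Based on those bullets, the pattern can be sketched as follows; everything beyond the class name and the two serializer types referenced elsewhere in this PR is an assumption about the implementation:

```java
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;

import java.time.LocalDateTime;

// Rough reconstruction of the mapper described above, not the actual source.
public class ElasticSearchJsonpMapper extends JacksonJsonpMapper {

    private ElasticSearchJsonpMapper(ObjectMapper mapper) {
        super(mapper);
    }

    // static factory method, as the review notes
    public static ElasticSearchJsonpMapper create() {
        SimpleModule module = new SimpleModule();
        module.addSerializer(LocalDateTime.class, new LocalDateTimeJsonSerializer());
        module.addDeserializer(LocalDateTime.class, new LocalDateTimeJsonDeserializer());

        ObjectMapper mapper = new ObjectMapper()
                .registerModule(module)
                // emit ISO strings instead of numeric timestamps, which plays
                // better with the date-formatted Elasticsearch mapping
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return new ElasticSearchJsonpMapper(mapper);
    }
}
```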
src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonSerializer.java (1)
11-18: LGTM! Well-implemented custom LocalDateTime serializer.

The implementation correctly:

- Uses ISO-8601 format with millisecond precision
- Follows Jackson serializer patterns
- Handles the serialization cleanly with appropriate exception handling
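A minimal sketch matching this description; the exact pattern string in the project may differ:

```java
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Sketch of a LocalDateTime serializer with ISO-8601 millisecond output.
public class LocalDateTimeJsonSerializer extends JsonSerializer<LocalDateTime> {

    // ISO-8601 local date-time with exactly three fractional digits
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");

    @Override
    public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers)
            throws IOException {
        gen.writeString(value.format(FORMATTER));
    }
}
```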
src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseMappingService.java (1)
81-82: LGTM! Consistent pattern for field type checking.

The conditional logic follows the same pattern as other field type transformations in the method.
src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java (1)
14-19: LGTM! Robust flexible formatter implementation.

The DateTimeFormatterBuilder approach correctly handles optional milliseconds with 1-3 digit precision, making it compatible with various input formats while maintaining consistency with the serializer output.
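A sketch of such a formatter, assuming the 1-3 digit fractional range described above:

```java
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

// Sketch: seconds are mandatory, a fractional part of 1-3 digits is optional,
// so both "2024-01-02T03:04:05" and "2024-01-02T03:04:05.123" parse cleanly.
public class LocalDateTimeJsonDeserializer extends JsonDeserializer<LocalDateTime> {

    private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
            .appendPattern("yyyy-MM-dd'T'HH:mm:ss")
            .optionalStart()
            .appendFraction(ChronoField.MILLI_OF_SECOND, 1, 3, true) // ".S" to ".SSS"
            .optionalEnd()
            .toFormatter();

    @Override
    public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
        return LocalDateTime.parse(p.getValueAsString(), FORMATTER);
    }
}
```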
src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java (1)
21-24: Consider the string representation format.

Using `value.toString()` on a List will produce a format like `[item1, item2, item3]`. Ensure this is the intended format for the parent class. Consider using a more explicit string conversion if a specific format is required:

```java
public CaseField(List<String> value) {
    super(String.join(", ", value)); // or another delimiter
    this.caseValue = value;
}
```

src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (1)
139-142: TaskRepository method verified

The `findAllByCaseIdIn(Collection<String>)` method is present in src/main/java/com/netgrif/application/engine/workflow/domain/repositories/TaskRepository.java. No further action required.
…f bulk indexes set
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (1)
97-99: Remove unused predicate variable.

The `predicate` variable is declared but never used in the refactored implementation. This matches the previous review feedback.

```diff
- BooleanExpression predicate = QCase.case$.lastModified.before(now).and(QCase.case$.lastModified.after(lastRun.minusMinutes(2)));
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- pom.xml (1 hunks)
- src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java (2 hunks)
- src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (5 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IBulkService.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java (2 hunks)
- src/main/resources/application.properties (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- pom.xml
- src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IBulkService.java
- src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java
- src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java
- src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java
- src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: task-list-completed (×7)
🔇 Additional comments (11)
src/main/resources/application.properties (1)
44-45: Consider validating the default bulk size values.

The configurable approach for separate case and task bulk sizes is excellent and addresses the previous feedback about making bulk sizes configurable. However, the default values (5100 for cases, 20000 for tasks) seem quite high and may need validation.
Consider:
- Testing these values under different load conditions to ensure optimal performance
- Adding validation or documentation about the 100MB Elasticsearch bulk limit
- Starting with more conservative defaults and allowing configuration based on deployment needs
src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (9)
4-5: LGTM on new import additions.

The new imports for `interfaces.*` and `PetriNetService` are properly added to support the refactored functionality.

22-25: LGTM on MongoDB streaming imports.

The new MongoDB-related imports (`MongoTemplate`, `Criteria`, `Query`, `CloseableIterator`) are correctly added to support the streaming approach.

32-35: LGTM on utility imports.

The additional imports for `ArrayList`, `AtomicInteger`, and `Collectors` are appropriate for the new batch processing logic.

43-55: LGTM on field declarations with final modifiers.

Good practice making the injected dependencies `final` to ensure immutability. The addition of `MongoTemplate`, `PetriNetService`, and `IBulkService` fields properly supports the refactored functionality.

69-72: LGTM on constructor parameter additions.

The new constructor parameters are properly added, and the configuration property name correctly references the case size property from the configuration file.

82-85: LGTM on field initialization.

The new fields are properly initialized in the constructor.

95-95: LGTM on improved logging.

The parameterized logging message is a good improvement for performance and readability.

141-160: LGTM on the new batch processing method.

The `reindexCasesBatch` method is well-structured:

- Properly filters cases that need reindexing
- Handles PetriNet loading for cases that need it
- Uses the new bulk service for efficient indexing
- Processes related tasks in bulk as well

This addresses the previous feedback about using bulk operations and is a significant performance improvement; a sketch of the flow follows below the list.
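As a rough illustration, a sketch of what such a batch method can look like; the collaborator names (`bulkService`, `petriNetService`, `taskRepository`) are taken from the review discussion, while the exact signatures and field wiring are assumptions:

```java
// Sketch of the batch flow, assumed to live inside ReindexingTask where
// bulkService, petriNetService and taskRepository are injected fields.
private void reindexCasesBatch(List<Case> casesBatch) {
    // ensure every case carries its PetriNet before mapping to an Elastic document
    casesBatch.forEach(c -> {
        if (c.getPetriNet() == null) {
            c.setPetriNet(petriNetService.get(c.getPetriNetObjectId()));
        }
    });

    // one bulk request for the whole case batch instead of per-case indexing
    bulkService.bulkIndexCases(casesBatch);

    // fetch all related tasks in a single query, then bulk-index them too
    List<String> caseIds = casesBatch.stream()
            .map(Case::getStringId)
            .collect(java.util.stream.Collectors.toList());
    bulkService.bulkIndexTasks(taskRepository.findAllByCaseIdIn(caseIds));
}
```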
163-163: LGTM on improved logging.

The parameterized logging message is consistent with other improvements in the file.
src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java (1)
3-8: LGTM! Imports are necessary for the new Elasticsearch client.

The new imports correctly support the ElasticsearchClient implementation and follow standard Java conventions.
Actionable comments posted: 1
♻️ Duplicate comments (3)
src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java (2)
63-89: Add input validation and avoid modifying input parameters.

The method still has the issues flagged in previous reviews:

- Missing null/empty check for the input list
- Modifying input `Case` objects by setting `lastModified`, causing side effects

Apply this fix:

```diff
 @Override
 public void bulkIndexCases(List<Case> cases) {
+    if (cases == null || cases.isEmpty()) return;
+
     BulkRequest.Builder builder = new BulkRequest.Builder();
     for (Case c : cases) {
         try {
-            if (c.getLastModified() == null)
-                c.setLastModified(LocalDateTime.now());
+            // Use a local variable instead of modifying the input object
+            LocalDateTime lastModified = c.getLastModified() != null ?
+                    c.getLastModified() : LocalDateTime.now();
+
+            // Handle this in the mapping service or create a copy if needed
             ElasticCase doc = elasticCaseMappingService.transform(c);
```
107-107: Use parameterized logging instead of string concatenation.

As mentioned in previous reviews, avoid string concatenation in log messages. Apply this fix:

```diff
-log.info("Reindexing task page {} / {}", i / taskBatchSize, totalSize / taskBatchSize);
+log.info("Reindexing task page {} / {}", (i / taskBatchSize) + 1, (totalSize + taskBatchSize - 1) / taskBatchSize);
```

Also fix the division logic to show correct page numbers (1-based instead of 0-based).
src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (1)
97-99: Remove unused predicate variable.

The `predicate` variable is declared but never used in the refactored MongoDB streaming implementation. Apply this fix:

```diff
 LocalDateTime now = LocalDateTime.now();
-BooleanExpression predicate = QCase.case$.lastModified.before(now).and(QCase.case$.lastModified.after(lastRun.minusMinutes(2)));
 LocalDateTime lastRunOld = lastRun;
```

Also update the count query to use MongoDB instead of QueryDSL:

```diff
-long count = caseRepository.count(predicate);
+Query countQuery = new Query();
+countQuery.addCriteria(Criteria.where("lastModified").lt(now).gt(lastRunOld.minusMinutes(2)));
+long count = mongoTemplate.count(countQuery, Case.class);
```
🧹 Nitpick comments (2)
src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java (1)
119-119: Use parameterized logging for error messages.

Avoid string concatenation in log statements for better performance and consistency. Apply this fix:

```diff
-log.error("Failed to index bulk {}", e.getMessage(), e);
+log.error("Failed to index bulk: {}", e.getMessage(), e);
```

src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (1)
95-95: Use parameterized logging consistently.

Apply the same logging format consistency as mentioned in previous reviews. Apply this fix:

```diff
-log.info("Reindexing stale cases: started reindexing after {}", lastRun);
+log.info("Reindexing stale cases: started reindexing after: {}", lastRun);
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- pom.xml (1 hunks)
- src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java (2 hunks)
- src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (5 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IBulkService.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java (2 hunks)
- src/main/resources/application.properties (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- pom.xml
- src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java
- src/main/java/com/netgrif/application/engine/elastic/domain/CaseField.java
- src/main/java/com/netgrif/application/engine/elastic/serializer/LocalDateTimeJsonDeserializer.java
- src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IBulkService.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: task-list-completed (×9)
🔇 Additional comments (2)
src/main/resources/application.properties (1)
44-45: LGTM! Configuration aligns with bulk indexing architecture.

The split configuration for case and task batch sizes is a good architectural decision that allows separate optimization for different entity types. The significantly larger batch sizes (5100 for cases, 20000 for tasks) should improve reindexing performance compared to the previous single size of 20.
src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java (1)
88-94: LGTM! Well-structured Elasticsearch client configuration.

The new `ElasticsearchClient` bean is properly configured with:

- Consistent use of configured URL and port from application properties
- Proper abstraction with `RestClientTransport`
- Custom JSON mapper for handling date/time serialization
- Good integration alongside the existing `RestHighLevelClient`
Actionable comments posted: 2
♻️ Duplicate comments (4)
src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java (3)
64-89: Add null/empty check and avoid modifying input parameters.

The method still lacks null/empty checks and modifies input `Case` objects by setting `lastModified`, causing side effects as previously noted.

```diff
 @Override
 public void bulkIndexCases(List<Case> cases) {
+    if (cases == null || cases.isEmpty()) {
+        log.info("No cases to index");
+        return;
+    }
+
+    log.info("Indexing {} cases", cases.size());
+
     List<BulkOperation> operations = new ArrayList<>();
     for (Case c : cases) {
         try {
-            if (c.getLastModified() == null)
-                c.setLastModified(LocalDateTime.now());
+            // Don't modify the original case object
+            LocalDateTime lastModified = c.getLastModified() != null ?
+                    c.getLastModified() : LocalDateTime.now();
             ElasticCase doc = elasticCaseMappingService.transform(c);
+            // Handle lastModified in the mapping service or create a copy
```
95-96: Use parameterized logging instead of string concatenation.

Follow the logging format suggestion from the past review to use parameterized messages consistently.

```diff
- log.info("Reindexing task page {} / {}", i / taskBatchSize, totalSize / taskBatchSize);
+ log.info("Reindexing task page {} / {}", (i / taskBatchSize) + 1, (totalSize + taskBatchSize - 1) / taskBatchSize);
```
36-40: Consider increasing default batch sizes for better performance.

The default batch sizes of 20 for both cases and tasks seem quite small. Given Elasticsearch's 100MB bulk limit, these values could be significantly increased for better performance. Consider increasing the defaults to more reasonable values like 500-1000, and make them configurable as suggested in past reviews.

```diff
- @Value("${spring.data.elasticsearch.reindexExecutor.caseSize:20}")
+ @Value("${spring.data.elasticsearch.reindexExecutor.caseSize:500}")
  private int caseBatchSize;

- @Value("${spring.data.elasticsearch.reindexExecutor.taskSize:20}")
+ @Value("${spring.data.elasticsearch.reindexExecutor.taskSize:1000}")
  private int taskBatchSize;
```

src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (1)
121-135: Critical: Handle final incomplete batch processing.

The streaming implementation doesn't process the final batch if it contains fewer than `caseBatchSize` items, potentially leaving cases unindexed.

```diff
 List<Case> batch = new ArrayList<>(caseBatchSize);
 try (CloseableIterator<Case> cursor = mongoTemplate.stream(query, Case.class)) {
     cursor.stream().forEach(aCase -> {
         batch.add(aCase);
         if (batch.size() == caseBatchSize) {
             page.getAndIncrement();
             log.info("Reindexing {} / {}", page, numOfPages);
             reindexCasesBatch(batch);
             batch.clear();
         }
     });
+
+    // Process remaining partial batch
+    if (!batch.isEmpty()) {
+        page.getAndIncrement();
+        log.info("Reindexing {} / {} (final batch)", page, numOfPages);
+        reindexCasesBatch(batch);
+    }
 }
```
🧹 Nitpick comments (3)
src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (3)
95-95: Use parameterized logging consistently.

Replace string concatenation with parameterized logging for better performance and consistency.

```diff
- log.info("Reindexing stale cases: started reindexing after {}" + lastRun);
+ log.info("Reindexing stale cases: started reindexing after {}", lastRun);
```
119-119: Remove commented code.

Clean up the commented criteria line that's no longer needed.

```diff
- //query.addCriteria(Criteria.where("lastModified").lt(now).gt(lastRunOld.minusMinutes(2)));
```
143-144: Remove commented filtering logic.

The commented filtering logic should be removed as it's no longer used.

```diff
- //List<Case> casesToIndex = casesBatch.stream().filter(it -> elasticCaseRepository.countByStringIdAndLastModified(it.getStringId(), Timestamp.valueOf(it.getLastModified()).getTime()) == 0).collect(Collectors.toList());
  List<Case> casesToIndex = casesBatch;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java (1 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: task-list-completed
- GitHub Check: task-list-completed
🔇 Additional comments (4)
src/main/java/com/netgrif/application/engine/elastic/service/BulkService.java (2)
144-157: Collection usage is appropriate despite past concern.

The `failedDocuments` collection serves a valid purpose for logging failed document details, which provides valuable debugging information beyond just checking if failures exist. The implementation correctly collects failed document IDs and error reasons for comprehensive error reporting, which is essential for troubleshooting bulk indexing failures.
113-142: Excellent recursive retry mechanism with proper error handling.

The recursive splitting approach for handling bulk failures is well-implemented. It properly handles single operation failures and provides good logging for troubleshooting. The logic correctly divides failed batches in half and retries, with appropriate base-case handling for single operations.
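In outline, the divide-and-conquer retry looks like the following sketch; `executeBulk` is a hypothetical stand-in for the actual bulk call, not the project's method:

```java
import java.util.List;

// Sketch only: executeBulk(...) represents whatever client call sends one
// bulk request and throws on failure.
private <T> void executeWithSplitRetry(List<T> operations) {
    if (operations.isEmpty()) {
        return;
    }
    try {
        executeBulk(operations); // assumed helper; throws on bulk failure
    } catch (Exception e) {
        if (operations.size() == 1) {
            // base case: a single poisoned document is logged and skipped,
            // so one bad record cannot sink the whole batch
            System.err.println("Single operation failed, skipping: " + operations.get(0));
            return;
        }
        // divide and conquer: retry each half independently
        int mid = operations.size() / 2;
        executeWithSplitRetry(operations.subList(0, mid));
        executeWithSplitRetry(operations.subList(mid, operations.size()));
    }
}
```

The cost is O(log n) extra round-trips in the worst case, in exchange for isolating exactly which documents fail.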
src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (2)
142-162: Excellent refactoring to use BulkService.

The method now properly delegates bulk indexing to the dedicated `BulkService`, improving separation of concerns and maintainability. The PetriNet loading logic ensures data consistency.

The implementation correctly:
- Ensures PetriNet is loaded for each case
- Uses the new bulk service for efficient indexing
- Fetches related tasks in batch and indexes them together
69-72: Unused `MongoTemplate` in ReindexingTask

We've confirmed that:

- `IBulkService` is used (`bulkIndexCases` at L156, `bulkIndexTasks` at L161)
- `PetriNetService` is used (`get(...)` at L152)
- `MongoTemplate` has no references in this class

Please remove the unused dependency (or add the necessary logic if it was intended).

Location: src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java
- Constructor parameters (around L69-72)
- The corresponding `private final MongoTemplate mongoTemplate;` field

Suggested diff:

```diff
--- a/src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java
+++ b/src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java
@@ constructor parameters
-        MongoTemplate mongoTemplate,
```

Likely an incorrect or invalid review comment.
- **Add bulk reindexing support**:
- Introduced the `bulkIndex` method in `IElasticIndexService`, enabling reindexing of all or stale cases and tasks.
- Created a dedicated `IndexParams` class to encapsulate parameters for the reindexing process (e.g., `indexAll`, `caseBatchSize`, `taskBatchSize`).
- Optimized the index batching process for cases and tasks by leveraging configurable batch sizes.
- **Remove deprecated methods**:
- Removed the old `bulkIndex` method from `IElasticIndexService`.
- Eliminated the obsolete `IBulkService` interface.
- **Update ElasticController**:
- Changed the endpoint to `/reindex/bulk` for initiating bulk reindexing with configurable parameters.
- Updated controller logic to leverage the new `bulkIndex` method for improved performance and scalability.
- **Configuration enhancements**:
- Extended `ElasticsearchProperties` to include `IndexProperties`, allowing batch sizes for cases and tasks to be configured via properties.
- **Code cleanup**:
- Removed unused imports and annotated services with `@RequiredArgsConstructor`.
- Streamlined reindexing tasks by integrating advanced filtering and improved logging.
This commit enhances the maintainability and scalability of Elasticsearch reindexing while introducing configurability and improved documentation for reindexing processes.
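A plausible shape for the `IndexParams` request body described above; the field names come from the commit message, while defaults and the Lombok annotation are assumptions:

```java
import lombok.Data;

// Sketch of the request body for POST /api/elastic/reindex/bulk.
@Data
public class IndexParams {

    // when true, reindex everything regardless of lastModified
    private boolean indexAll = false;

    // optional overrides; null falls back to the configured batch sizes
    private Integer caseBatchSize;
    private Integer taskBatchSize;
}
```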
- updated request param
Added detailed Javadoc comments to improve the clarity of key methods and fields in `ElasticIndexService` and `IndexParams`. This documentation provides insights into parameter usage, default values, and functionalities, facilitating better understanding and maintenance.
Actionable comments posted: 7
🧹 Nitpick comments (5)
src/main/java/com/netgrif/application/engine/elastic/web/requestbodies/IndexParams.java (1)
3-4: Add bean validation import for constraints

You're introducing parameter-level validation in the next suggestion; ensure the `@Min` annotation is available. Apply this diff:

```diff
 import lombok.Data;
+import javax.validation.constraints.Min;
```

src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticIndexService.java (1)
46-46: Clarify contract of bulkIndex with Javadoc and null-handling

The new API is clear, but lacks documentation on:
- Semantics of indexAll vs lastRun (e.g., when indexAll=true, is lastRun ignored?).
- How null batch sizes are handled (fallback to configuration?).
- Expected behavior (synchronous/async), error handling, and guarantees.
Apply this diff to add concise Javadoc:
```diff
- void bulkIndex(boolean indexAll, LocalDateTime lastRun, Integer caseBatchSize, Integer taskBatchSize);
+ /**
+  * Performs bulk indexing of cases and tasks.
+  *
+  * @param indexAll      if true, index all available data; otherwise index only stale data.
+  * @param lastRun       the reference timestamp for stale detection; ignored when indexAll is true. Nullable.
+  * @param caseBatchSize optional batch size for cases; if null, implementation should fall back to configured defaults.
+  * @param taskBatchSize optional batch size for tasks; if null, implementation should fall back to configured defaults.
+  */
+ void bulkIndex(boolean indexAll, LocalDateTime lastRun, Integer caseBatchSize, Integer taskBatchSize);
```

Consider returning a result object (e.g., counts, durations, failures) to enable callers to provide actionable feedback and telemetry instead of relying solely on exceptions.
src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (1)
81-81: Prefer parameterized logging

Use placeholders instead of string concatenation for structured logging and to avoid unnecessary string creation. Apply this diff:

```diff
- elasticIndexService.bulkIndex(false, lastRun, null, null);
+ elasticIndexService.bulkIndex(false, lastRun, null, null);
```

And (outside this range) adjust the surrounding log statements, for example:

```java
log.info("Reindexing stale cases: started reindexing after {}", lastRun);
```

src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (2)
361-396: Resource management looks good, but consider adding error handling for cursor operations

The method properly uses try-with-resources for the CloseableIterator, which ensures proper cleanup. However, consider what happens if an exception occurs during cursor iteration: currently it would bubble up and potentially leave some cases unprocessed.

Consider adding error recovery logic to track progress and allow resumption:

```diff
 try (CloseableIterator<Case> cursor = mongoTemplate.stream(query, Case.class)) {
     while (cursor.hasNext()) {
-        Case aCase = cursor.next();
+        Case aCase;
+        try {
+            aCase = cursor.next();
+        } catch (Exception e) {
+            log.error("Failed to fetch case from cursor, skipping: {}", e.getMessage());
+            continue;
+        }
         prepareCase(aCase);
```
535-546: Consider adding more details to the ElasticsearchException

While the error handling aggregates all failures nicely, the exception message could be improved to show the count of failures for better observability.

```diff
 if (!failedDocuments.isEmpty()) {
-    throw new ElasticsearchException("Bulk indexing has failures. Use ElasticsearchException.getFailedDocuments() for details [{}]", failedDocuments);
+    throw new ElasticsearchException("Bulk indexing failed for {} documents. Failed IDs: {}",
+            failedDocuments.size(), failedDocuments.keySet());
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java (2 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (3 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ReindexingTask.java (3 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/interfaces/IElasticIndexService.java (2 hunks)
- src/main/java/com/netgrif/application/engine/elastic/web/ElasticController.java (5 hunks)
- src/main/java/com/netgrif/application/engine/elastic/web/requestbodies/IndexParams.java (1 hunks)
- src/main/java/com/netgrif/application/engine/workflow/service/TaskService.java (0 hunks)
💤 Files with no reviewable changes (1)
- src/main/java/com/netgrif/application/engine/workflow/service/TaskService.java
🧰 Additional context used
🧬 Code Graph Analysis (2)
src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java (2)
- src/main/java/com/netgrif/application/engine/elastic/web/requestbodies/IndexParams.java (1): Data (11-28)
- src/main/java/com/netgrif/application/engine/configuration/properties/ExecutorProperties.java (1): Data (5-10)

src/main/java/com/netgrif/application/engine/elastic/web/requestbodies/IndexParams.java (1)
- src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java (2): Data (15-84), Data (78-83)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: task-list-completed (×7)
🔇 Additional comments (7)
src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java (1)
56-56: Expose sane defaults and avoid ambiguous 0-sized batches

`batch` currently initializes to a new `BatchProperties` instance whose fields default to 0 (primitive int). This makes it ambiguous whether 0 means "unset" or an actual configured value and could trigger ineffective bulk operations.

Consider either:

- Using nullable `Integer` with explicit defaults, or
- Keeping `int` but assigning explicit defaults.

Apply this diff to define clear defaults and enable validation in the nested class (see next comment for field changes):

```diff
- private BatchProperties batch = new BatchProperties();
+ private BatchProperties batch = new BatchProperties();
```

Additionally, enable bean validation for this `@ConfigurationProperties` class by annotating it with `@Validated` (outside of this range):

```java
import org.springframework.validation.annotation.Validated;

@Data
@Component
@ConfigurationProperties(prefix = "spring.data.elasticsearch")
@Validated
public class ElasticsearchProperties {
```

src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (6)
4-8: LGTM! Clean integration of the new Elasticsearch Java API client

The new imports for the Elasticsearch Java API client (ElasticsearchClient, BulkRequest, BulkResponse, BulkOperation) align well with the PR's objective to speed up Elasticsearch reindexing through bulk operations.

19-79: LGTM! Well-structured constructor-based dependency injection

The use of `@RequiredArgsConstructor` with final fields is a clean approach for dependency injection. All the new dependencies (ElasticsearchClient, CaseRepository, PetriNetService, MongoTemplate, mapping services) are properly declared and support the new bulk indexing functionality.
336-342: Potential time window gap in the predicate logic

The predicate uses `after.minusMinutes(2)`, which creates a 2-minute buffer, but this could lead to cases being missed if the `after` parameter is precisely set. Consider documenting this behavior or making the buffer configurable.

Could you clarify the rationale for the 2-minute buffer in the query predicate? This might need to be documented or made configurable to avoid confusion.
437-444: Good defensive programming in prepareCase method

The method properly handles null checks for PetriNet and lastModified fields. The PetriNet retrieval from the service is a good safeguard against incomplete data.
496-527: Excellent recursive retry mechanism for bulk failures

The divide-and-conquer approach to handle bulk failures is well-implemented. It recursively splits failed batches until individual operations are isolated, which helps identify problematic documents without losing the entire batch.
404-430: Ensure `caseId` is indexed for bulk queries

The `bulkIndexTasks` method filters tasks by `caseId`, which without an index will trigger a full collection scan. I didn't detect any `@Indexed` or `@Index` annotation on the `caseId` field in your domain model. Please verify or add an index in src/main/java/com/netgrif/application/engine/workflow/domain/Task.java:

- Annotate the `caseId` field with `@Indexed` (Spring Data), or
- Define a class-level `@CompoundIndex` including `caseId` (a sketch follows below).
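A minimal sketch of the suggested mapping, assuming a heavily simplified `Task` document:

```java
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

// Sketch only: the real Task class has many more fields.
@Document
public class Task {

    // indexing caseId lets findAllByCaseIdIn(...) use an index scan
    // instead of a full collection scan during bulk reindexing
    @Indexed
    private String caseId;
}
```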
Enable and configure JitPack repository for dependency resolution. Update QRGen dependency to version 3.0.1 with updated groupId and artifactId for compatibility.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
pom.xml (1)
69-104: Document and isolate JitPack repository for QRGen

JitPack is only needed to pull `com.github.kenglxn:QRGen:3.0.1` (not on Maven Central) and should be tightly constrained to avoid supply-chain/reproducibility risks.

Action items:

- Above the `<repository>` entry, add a comment: "JitPack required only for com.github.kenglxn:QRGen:3.0.1 (unavailable on Maven Central). Pin to an immutable tag/commit and enable only when building QRGen."
- Pin the version to an immutable Git tag or SHA (e.g., `<version>v3.0.1</version>` or `<version>1a2b3c4d</version>`), not a floating branch.
- Scope JitPack under a dedicated Maven profile (e.g. `<profile>qrgen</profile>`) so it's activated only for QRGen builds.

Example diff:

```diff
 <repositories>
+    <!-- JitPack required only for com.github.kenglxn:QRGen:3.0.1 (not on Maven Central).
+         Pin to the immutable tag 'v3.0.1' (or commit SHA) and activate via the 'qrgen'
+         profile to minimize blast radius. -->
     <repository>
         <id>jitpack.io</id>
         <url>https://jitpack.io</url>
         <releases><enabled>true</enabled></releases>
         <snapshots><enabled>false</enabled></snapshots>
     </repository>
 </repositories>
+<profiles>
+    <profile>
+        <id>qrgen</id>
+        <activation>
+            <activeByDefault>false</activeByDefault>
+        </activation>
+        <repositories>
+            <repository>
+                <id>jitpack.io</id>
+                <url>https://jitpack.io</url>
+            </repository>
+        </repositories>
+    </profile>
+</profiles>
```

Would you like me to draft a follow-up PR that adds these comments, pins the version, and wraps JitPack in its own profile?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- pom.xml (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: task-list-completed
- GitHub Check: task-list-completed
- GitHub Check: Build
Removed ElasticSearchJsonpMapper and transitioned to using RestHighLevelClient with configurable credentials support. Refactored bulk operation handling and mapped serializer setup to ObjectMapper bean. Adjusted LocalDateTime handling in date field transformations.
37f2023
Actionable comments posted: 2
♻️ Duplicate comments (7)
src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (7)
388-390: Pagination formula accepted as per design

Using `((count / caseBatchSize) + 1)` to guarantee at least one page is intentional according to prior discussion and team preference.
441-442: Task pagination formula accepted as per design

Same rationale as case pagination: always report at least one page.
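To make the trade-off concrete, a small self-contained comparison of the two formulas discussed (assuming a batch size of 100):

```java
public class PaginationDemo {
    public static void main(String[] args) {
        long batchSize = 100;
        for (long count : new long[]{0, 50, 100, 150, 200}) {
            long lenient = (count / batchSize) + 1;           // team's formula: never 0 pages
            long exact = (count + batchSize - 1) / batchSize; // ceiling division
            System.out.printf("count=%d lenient=%d exact=%d%n", count, lenient, exact);
        }
        // lenient exceeds exact by one whenever count is 0 or an exact
        // multiple of batchSize; the team accepted the occasional empty
        // final page in exchange for a guaranteed minimum of one pass.
    }
}
```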
358-363: Validate provided batch sizes (> 0) to avoid invalid pagination and batching

Guard against zero/negative values to prevent division issues and infinite loops. This was raised previously and is still applicable.

```diff
 if (caseBatchSize == null) {
     caseBatchSize = elasticsearchProperties.getBatch().getCaseBatchSize();
+} else if (caseBatchSize <= 0) {
+    throw new IllegalArgumentException("Case batch size must be positive, got: " + caseBatchSize);
 }
 if (taskBatchSize == null) {
     taskBatchSize = elasticsearchProperties.getBatch().getTaskBatchSize();
+} else if (taskBatchSize <= 0) {
+    throw new IllegalArgumentException("Task batch size must be positive, got: " + taskBatchSize);
 }
```
579-581: Fix exception message formatting; include failed documents in the message

The "{}" placeholder won't be replaced; embed the map into the message string.

```diff
- if (!failedDocuments.isEmpty()) {
-     throw new ElasticsearchException("Bulk indexing has failures. Use ElasticsearchException.getFailedDocuments() for details [{}]", failedDocuments);
- }
+ if (!failedDocuments.isEmpty()) {
+     throw new ElasticsearchException("Bulk indexing has failures. Failed documents: " + failedDocuments);
+ }
```
541-563: Handle IO exceptions with the same divide-and-conquer retry path

RestHighLevelClient.bulk throws IOException. Treat it like ElasticsearchException to trigger the split-and-retry logic instead of just logging and aborting.

```diff
- } catch (ElasticsearchException e) {
+ } catch (ElasticsearchException | java.io.IOException e) {
     log.warn("Failed for {} ops to index bulk {}", operations.size(), e.getMessage(), e);
     if (operations.size() == 1) {
         log.error("Single operation failed. Skipping. {}", operations.get(0), e);
         return;
     }
     log.warn("Dividing the requirement.");
     int mid = operations.size() / 2;
     List<UpdateRequest> left = operations.subList(0, mid);
     List<UpdateRequest> right = operations.subList(mid, operations.size());
     executeAndValidate(new ArrayList<>(left));
     executeAndValidate(new ArrayList<>(right));
- } catch (Exception e) {
-     log.error("Failed to index bulk: {}", e.getMessage(), e);
- }
+ } catch (Exception e) {
+     log.error("Failed to index bulk: {}", e.getMessage(), e);
+ }
```
495-506: Guard against missing document IDs and avoid double serialization (cases)

- If `doc.getId()` is null, the bulk op will fail. Fall back to `stringId` or skip.
- Serialize once and use `docAsUpsert(true)` to avoid sending the payload twice.

```diff
 private void prepareCaseBulkOperation(ElasticCase doc, List<UpdateRequest> operations) {
     try {
-        UpdateRequest updateRequest = new UpdateRequest()
-                .id(doc.getId())
-                .doc(objectMapper.writeValueAsString(doc), XContentType.JSON)
-                .upsert(objectMapper.writeValueAsString(doc), XContentType.JSON)
-                .index(elasticsearchProperties.getIndex().get("case"));
+        final String id = doc.getId() != null ? doc.getId() : doc.getStringId();
+        if (id == null || id.isEmpty()) {
+            log.warn("Skipping case bulk op due to missing ID for [{}]", doc.getStringId());
+            return;
+        }
+        final String json = objectMapper.writeValueAsString(doc);
+        UpdateRequest updateRequest = new UpdateRequest()
+                .id(id)
+                .doc(json, XContentType.JSON)
+                .docAsUpsert(true)
+                .index(elasticsearchProperties.getIndex().get("case"));
         operations.add(updateRequest);
     } catch (Exception e) {
         log.error("Failed to prepare bulk operation for case [{}]: {}", doc.getStringId(), e.getMessage());
     }
 }
```
514-525: Guard against missing document IDs and avoid double serialization (tasks)

Mirror the same resilience as for cases.

```diff
 private void prepareTaskBulkOperation(ElasticTask doc, List<UpdateRequest> operations) {
     try {
-        UpdateRequest updateRequest = new UpdateRequest()
-                .id(doc.getId())
-                .doc(objectMapper.writeValueAsString(doc), XContentType.JSON)
-                .upsert(objectMapper.writeValueAsString(doc), XContentType.JSON)
-                .index(elasticsearchProperties.getIndex().get("task"));
+        final String id = doc.getId() != null ? doc.getId() : doc.getStringId();
+        if (id == null || id.isEmpty()) {
+            log.warn("Skipping task bulk op due to missing ID for [{}]", doc.getStringId());
+            return;
+        }
+        final String json = objectMapper.writeValueAsString(doc);
+        UpdateRequest updateRequest = new UpdateRequest()
+                .id(id)
+                .doc(json, XContentType.JSON)
+                .docAsUpsert(true)
+                .index(elasticsearchProperties.getIndex().get("task"));
         operations.add(updateRequest);
     } catch (Exception e) {
         log.error("Failed to prepare bulk operation for task [{}]: {}", doc.getStringId(), e.getMessage());
     }
 }
```
🧹 Nitpick comments (3)
src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (3)
355-357: Clarify log to reflect indexAll mode

When indexAll is true, the log "Reindexing stale cases: started reindexing after {}" is misleading. Make the intent explicit.

```diff
- log.info("Reindexing stale cases: started reindexing after {}", after);
+ log.info("Reindexing: {} (after={})", indexAll ? "index all cases" : "index stale cases", after);
```
392-392: Use int for currentBatchSize to match caseBatchSize type

Avoid mixing long and int for batch counters.

```diff
- long page = 1, currentBatchSize = 0;
+ long page = 1;
+ int currentBatchSize = 0;
```
443-443: Use int for currentBatchSize (tasks) to match taskBatchSize type

Consistency with the case loop and type safety.

```diff
- long page = 1, currentBatchSize = 0;
+ long page = 1;
+ int currentBatchSize = 0;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these settings in your CodeRabbit configuration.
📒 Files selected for processing (4)
- src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java (4 hunks)
- src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java (4 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseMappingService.java (4 hunks)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseMappingService.java
- src/main/java/com/netgrif/application/engine/configuration/properties/ElasticsearchProperties.java
- src/main/java/com/netgrif/application/engine/configuration/ElasticsearchConfiguration.java
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-13T09:38:25.795Z
Learnt from: renczesstefan
PR: netgrif/application-engine#327
File: src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java:362-362
Timestamp: 2025-08-13T09:38:25.795Z
Learning: In ElasticIndexService pagination calculations, the formula `((count / batchSize) + 1)` is intentionally used instead of the mathematically precise `(count + batchSize - 1) / batchSize` to ensure at least 1 page is always returned, even when count=0. This design choice prioritizes simplicity and guaranteeing minimum processing over exact page counts.
Applied to files:
src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java
🧬 Code Graph Analysis (1)
src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (5)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticCaseMappingService.java (1): Slf4j (29-297)
- src/main/java/com/netgrif/application/engine/petrinet/service/PetriNetService.java (1): Slf4j (72-584)
- src/main/java/com/netgrif/application/engine/workflow/service/WorkflowService.java (1): Service (57-599)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticPetriNetService.java (1): Service (18-92)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticPetriNetMappingService.java (1): Service (8-15)
Updated `prepareCaseBulkOperation` and `prepareTaskBulkOperation` methods to handle cases where `doc.getId()` is null by using `doc.getStringId()` as a fallback. This ensures robust ID assignment during bulk operations and prevents potential null pointer issues.
Extract the JSON serialization of documents into a variable to improve code readability and reduce redundancy. This change applies to both case and task bulk operations.
Actionable comments posted: 0
♻️ Duplicate comments (8)
src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (8)
388-390: Pagination calculation accepted as per agreed design
Using `((count / caseBatchSize) + 1)` is intentionally kept to ensure at least one page, per team decision. No change requested.
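For context, a tiny standalone sketch of that trade-off (class and method names here are invented for the demo, not taken from this PR), comparing the chosen formula with exact ceiling division:

```java
// Demo of the two page-count formulas discussed above.
public class PageCountDemo {

    // Formula used by the service: always at least one page, at the cost of a
    // possibly empty trailing page when count is an exact multiple of batchSize.
    static long simplePages(long count, long batchSize) {
        return (count / batchSize) + 1;
    }

    // Exact ceiling division: precise page count, but 0 pages when count == 0.
    static long exactPages(long count, long batchSize) {
        return (count + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        System.out.println(simplePages(0, 500));    // 1 -> the indexing loop still runs once
        System.out.println(exactPages(0, 500));     // 0 -> the loop would be skipped entirely
        System.out.println(simplePages(1000, 500)); // 3 -> last page comes back empty
        System.out.println(exactPages(1000, 500));  // 2
    }
}
```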
358-363: Validate batch sizes to prevent invalid pagination and memory pressure
Reject non-positive values early; defaulting only covers nulls.
Apply this diff:
```diff
         if (caseBatchSize == null) {
             caseBatchSize = elasticsearchProperties.getBatch().getCaseBatchSize();
+        } else if (caseBatchSize <= 0) {
+            throw new IllegalArgumentException("Case batch size must be positive, got: " + caseBatchSize);
         }
         if (taskBatchSize == null) {
             taskBatchSize = elasticsearchProperties.getBatch().getTaskBatchSize();
+        } else if (taskBatchSize <= 0) {
+            throw new IllegalArgumentException("Task batch size must be positive, got: " + taskBatchSize);
         }
```
365-371: Index-all should include documents with null/missing lastModified
When indexAll is true, filtering on `lastModified < now` excludes nulls. For a true reindex-all, use a match-all query.
Apply this diff:
```diff
-        org.springframework.data.mongodb.core.query.Query query;
-        if (indexAll || after == null) {
-            query = org.springframework.data.mongodb.core.query.Query.query(Criteria.where("lastModified").lt(now));
-            log.info("Reindexing stale cases: force all");
-        } else {
-            query = org.springframework.data.mongodb.core.query.Query.query(Criteria.where("lastModified").lt(now).gt(after.minusMinutes(2)));
-        }
+        org.springframework.data.mongodb.core.query.Query query;
+        if (indexAll) {
+            // Index absolutely all cases (including those without lastModified)
+            query = new org.springframework.data.mongodb.core.query.Query();
+            log.info("Reindexing: indexing all cases");
+        } else if (after == null) {
+            query = org.springframework.data.mongodb.core.query.Query.query(Criteria.where("lastModified").lt(now));
+            log.info("Reindexing stale cases: force all before now");
+        } else {
+            query = org.springframework.data.mongodb.core.query.Query.query(Criteria.where("lastModified").lt(now).gt(after.minusMinutes(2)));
+        }
```
397-415: Harden per-case processing to avoid aborting the whole batch
Any exception in prepareCase or transform currently aborts the batch. Catch per-item and continue.
Apply this diff:
```diff
-        while (cursor.hasNext()) {
-            Case aCase = cursor.next();
-            prepareCase(aCase);
-            ElasticCase elasticCase = caseMappingService.transform(aCase);
-            ElasticCase existingCase = null;
-            try {
-                existingCase = elasticCaseRepository.findByStringId(aCase.getStringId());
-            } catch (InvalidDataAccessApiUsageException ignored) {
-                log.debug("[{}]: Case \"{}\" has duplicates, will reindex.", aCase.getStringId(), aCase.getTitle());
-                elasticCaseRepository.deleteAllByStringId(aCase.getStringId());
-            }
-            if (existingCase == null) {
-                existingCase = elasticCase;
-            } else {
-                existingCase.update(elasticCase);
-            }
-            prepareCaseBulkOperation(existingCase, caseOperations);
-            caseIds.add(aCase.getStringId());
+        while (cursor.hasNext()) {
+            Case aCase = cursor.next();
+            try {
+                prepareCase(aCase);
+                ElasticCase elasticCase = caseMappingService.transform(aCase);
+                ElasticCase existingCase = null;
+                try {
+                    existingCase = elasticCaseRepository.findByStringId(aCase.getStringId());
+                } catch (InvalidDataAccessApiUsageException ignored) {
+                    log.debug("[{}]: Case \"{}\" has duplicates, will reindex.", aCase.getStringId(), aCase.getTitle());
+                    elasticCaseRepository.deleteAllByStringId(aCase.getStringId());
+                }
+                if (existingCase == null) {
+                    existingCase = elasticCase;
+                } else {
+                    existingCase.update(elasticCase);
+                }
+                prepareCaseBulkOperation(existingCase, caseOperations);
+                caseIds.add(aCase.getStringId());
+            } catch (Exception ex) {
+                log.error("Skipping case [{}] due to processing error: {}", aCase.getStringId(), ex.getMessage(), ex);
+                continue;
+            }
```
495-507: Guard against missing IDs in case bulk ops
If both id and stringId are null/empty, the UpdateRequest will fail at execution.
Apply this diff:
```diff
     private void prepareCaseBulkOperation(ElasticCase doc, List<UpdateRequest> operations) {
         try {
             String json = objectMapper.writeValueAsString(doc);
-            UpdateRequest updateRequest = new UpdateRequest()
-                    .id(doc.getId() == null ? doc.getStringId() : doc.getId())
+            final String id = doc.getId() != null ? doc.getId() : doc.getStringId();
+            if (id == null || id.isEmpty()) {
+                log.warn("Skipping case bulk op due to missing ID for [{}]", doc.getStringId());
+                return;
+            }
+            UpdateRequest updateRequest = new UpdateRequest()
+                    .id(id)
                     .doc(json, XContentType.JSON)
                     .upsert(json, XContentType.JSON)
                     .index(elasticsearchProperties.getIndex().get("case"));
             operations.add(updateRequest);
         } catch (Exception e) {
             log.error("Failed to prepare bulk operation for case [{}]: {}", doc.getStringId(), e.getMessage());
         }
     }
```
515-527: Guard against missing IDs in task bulk ops
Same null/empty risk as with cases; add a fallback and skip when unresolved.
Apply this diff:
```diff
     private void prepareTaskBulkOperation(ElasticTask doc, List<UpdateRequest> operations) {
         try {
             String json = objectMapper.writeValueAsString(doc);
-            UpdateRequest updateRequest = new UpdateRequest()
-                    .id(doc.getId() == null ? doc.getStringId() : doc.getId())
+            final String id = doc.getId() != null ? doc.getId() : doc.getStringId();
+            if (id == null || id.isEmpty()) {
+                log.warn("Skipping task bulk op due to missing ID for [{}]", doc.getStringId());
+                return;
+            }
+            UpdateRequest updateRequest = new UpdateRequest()
+                    .id(id)
                     .doc(json, XContentType.JSON)
                     .upsert(json, XContentType.JSON)
                     .index(elasticsearchProperties.getIndex().get("task"));
             operations.add(updateRequest);
         } catch (Exception e) {
             log.error("Failed to prepare bulk operation for task [{}]: {}", doc.getStringId(), e.getMessage());
         }
     }
```
581-583: Fix failed-docs message formatting
The "{}" placeholder isn’t interpolated; the message won’t show failedDocuments.
Apply this diff:
```diff
-        if (!failedDocuments.isEmpty()) {
-            throw new ElasticsearchException("Bulk indexing has failures. Use ElasticsearchException.getFailedDocuments() for details [{}]", failedDocuments);
-        }
+        if (!failedDocuments.isEmpty()) {
+            throw new ElasticsearchException("Bulk indexing has failures. Failed documents: " + failedDocuments);
+        }
```
542-564: Include IOException in split-and-retry; refine log message
bulk(...) can throw IOException; include it in the retryable branch. Also, the log "Dividing the requirement." is unclear.
Apply this diff:
```diff
-        try {
+        try {
             BulkResponse response = elasticsearchClient.bulk(request, RequestOptions.DEFAULT);
             checkForBulkUpdateFailure(response);
             log.info("Batch indexed successfully with {} ops", operations.size());
-        } catch (ElasticsearchException e) {
+        } catch (org.elasticsearch.ElasticsearchException | java.io.IOException e) {
             log.warn("Failed for {} ops to index bulk {}", operations.size(), e.getMessage(), e);
             if (operations.size() == 1) {
                 log.error("Single operation failed. Skipping. {}", operations.get(0), e);
                 return;
             }
-            log.warn("Dividing the requirement.");
+            log.warn("Dividing the batch and retrying.");
             int mid = operations.size() / 2;
             List<UpdateRequest> left = operations.subList(0, mid);
             List<UpdateRequest> right = operations.subList(mid, operations.size());
             executeAndValidate(new ArrayList<>(left));
             executeAndValidate(new ArrayList<>(right));
         } catch (Exception e) {
             log.error("Failed to index bulk: {}", e.getMessage(), e);
         }
```
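The halving in this diff is a general divide-and-retry pattern; a minimal, client-agnostic sketch of it (names and the Consumer stand-in are illustrative, not taken from the PR):

```java
import java.util.List;
import java.util.function.Consumer;

public final class BisectRetry {

    // Tries the whole batch first; on failure, splits it in half and recurses,
    // so a single poison document only ever fails a batch of size one.
    public static <T> void process(List<T> batch, Consumer<List<T>> bulkCall) {
        if (batch.isEmpty()) {
            return;
        }
        try {
            bulkCall.accept(batch);
        } catch (RuntimeException e) {
            if (batch.size() == 1) {
                System.err.println("Skipping poison item: " + batch.get(0));
                return;
            }
            int mid = batch.size() / 2;
            process(batch.subList(0, mid), bulkCall);
            process(batch.subList(mid, batch.size()), bulkCall);
        }
    }
}
```

A single bad document costs only O(log n) extra bulk calls this way, while the healthy majority of the batch still gets indexed.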
🧹 Nitpick comments (1)
src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (1)
355-355: Clarify start log when indexing all cases
The current message implies "stale after X" even when indexAll=true.
Apply this diff to make the message reflect the mode:
- log.info("Reindexing stale cases: started reindexing after {}", after); + if (indexAll) { + log.info("Reindexing: started reindexing all cases"); + } else { + log.info("Reindexing stale cases: started reindexing after {}", after); + }
📜 Review details
📒 Files selected for processing (1)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (1)
- src/main/java/com/netgrif/application/engine/elastic/service/ElasticPetriNetMappingService.java (1): Service (8-15)
🔇 Additional comments (4)
src/main/java/com/netgrif/application/engine/elastic/service/ElasticIndexService.java (4)
83-108: Constructor-based DI with final fields is solid
Clean wiring, immutability via final fields, and qualified ObjectMapper injection are all good choices here.
452-456: Correct repository used for task duplicate cleanup
Good fix: deleting duplicates from elasticTaskRepository instead of elasticCaseRepository.
497-503: Nice: single serialization reused for doc and upsert
Storing the objectMapper result in a variable avoids double serialization.
517-523: Nice: single serialization reused for task doc and upsert
Same benefit here, avoiding duplicate writeValueAsString calls.
Update `DataSearchRequestTest` to use `MIDNIGHT` instead of `NOON` for `LocalTime` in timestamp creation
6f102aa
Actionable comments posted: 1
📜 Review details
📒 Files selected for processing (1)
- src/test/groovy/com/netgrif/application/engine/elastic/DataSearchRequestTest.groovy (1 hunks)
Description
Implements bulk indexing into the Elasticsearch database through a REST endpoint and a service method. The new endpoint accepts a configurable index request that specifies whether to index all cases or only stale ones, and it can set a case batch size and a task batch size that tell the algorithm how many cases and tasks to index in one bulk operation.
Implements NAE-2136
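For example, a client might trigger a full bulk reindex as sketched below. This is a minimal illustration: the endpoint path and the IndexParams field names (indexAll, caseBatchSize, taskBatchSize) follow this PR's description and may differ in the final API, and authentication, which a real deployment will require, is omitted.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BulkReindexClient {
    public static void main(String[] args) throws Exception {
        // indexAll=true forces a full reindex; the batch sizes override the
        // configured defaults for this one request.
        String body = "{\"indexAll\": true, \"caseBatchSize\": 500, \"taskBatchSize\": 1000}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/elastic/reindex/bulk"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```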
Dependencies
No new dependencies were introduced.
Third party dependencies
No new dependencies were introduced.
Blocking Pull requests
There are no dependencies on other PRs.
How Has This Been Tested?
This was tested on projects and with unit tests.
Test Configuration
Checklist:
Summary by CodeRabbit
New Features
Configuration
Bug Fixes
Chores