Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit. Hold shift + click to select a range.
75abf37
Add enhanced bulk indexing for Elasticsearch
renczesstefan Aug 19, 2025
ef45e76
Refactor cursor handling and improve error reporting.
renczesstefan Aug 19, 2025
97254f0
Refactor repository methods to replace deprecated identifiers
renczesstefan Aug 19, 2025
82ca36a
Refactor Elasticsearch configuration and object mapping setup
renczesstefan Aug 19, 2025
30ca813
Add custom JsonpMapper for Elasticsearch configuration
renczesstefan Aug 19, 2025
27d0da8
Refactor IDs by replacing `stringId` with `id`
renczesstefan Aug 19, 2025
945b4cb
Update validation import to use Jakarta package
renczesstefan Aug 19, 2025
cc9ebea
Add @ToString.Exclude to sensitive properties
renczesstefan Aug 19, 2025
d1f209c
Refactor reindexing to use try-with-resources and Stream API.
renczesstefan Aug 19, 2025
0345344
Add support for reindexing from a specific timestamp
renczesstefan Aug 19, 2025
90c1112
Fix incorrect method parameter in task removal
renczesstefan Aug 19, 2025
7ee5e89
Fix inconsistent task ID usage in task removal process
renczesstefan Aug 20, 2025
fbaafc3
Refactor case ID field access in test assertions
renczesstefan Aug 20, 2025
4850288
Remove explicit constructors and add @NoArgsConstructor annotations
renczesstefan Aug 20, 2025
108c089
Fix null check for task ID in ElasticTaskQueueManager
renczesstefan Aug 20, 2025
f141ee9
Refactor task ID handling in ElasticTask operations
renczesstefan Aug 20, 2025
30def80
Update logging and tests for task indexing in Elastic service
renczesstefan Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
package com.netgrif.application.engine.configuration;

import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.netgrif.application.engine.configuration.properties.DataConfigurationProperties;
import com.netgrif.application.engine.objects.elastic.serializer.LocalDateTimeJsonDeserializer;
import com.netgrif.application.engine.objects.elastic.serializer.LocalDateTimeJsonSerializer;
import com.netgrif.application.engine.workflow.service.CaseEventHandler;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.*;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.elasticsearch.support.HttpHeaders;

@Configuration
import java.time.LocalDateTime;
import java.util.List;

@Configuration
@EnableElasticsearchRepositories(excludeFilters = {
@ComponentScan.Filter(
type = FilterType.REGEX,
Expand Down Expand Up @@ -46,10 +58,53 @@ public CaseEventHandler caseEventHandler() {
return new CaseEventHandler();
}

@NotNull
@Override
public ClientConfiguration clientConfiguration() {
    // Make sure every configured node address carries an explicit port
    // before handing the list to the client builder.
    List<String> urls = sanitizeUrls(elasticsearchProperties.getUrl());

    ClientConfiguration.MaybeSecureClientConfigurationBuilder client = ClientConfiguration.builder()
            .connectedTo(urls.toArray(String[]::new));
    ClientConfiguration.TerminalClientConfigurationBuilder clientBuilder = client;

    if (elasticsearchProperties.isSsl()) {
        clientBuilder = client.usingSsl();
    }
    // Basic auth takes precedence; the bearer token is only used when no
    // username/password pair is configured.
    if (hasCredentials()) {
        clientBuilder = clientBuilder.withBasicAuth(elasticsearchProperties.getUsername(), elasticsearchProperties.getPassword());
    } else if (hasToken()) {
        // Assign the returned builder for consistency with the branch above.
        clientBuilder = clientBuilder.withHeaders(() -> {
            HttpHeaders headers = new HttpHeaders();
            headers.add(HttpHeaders.AUTHORIZATION, "Bearer " + elasticsearchProperties.getToken());
            return headers;
        });
    }
    return clientBuilder.build();
}

@NotNull
@Override
public JsonpMapper jsonpMapper() {
    // Route LocalDateTime through the engine's custom (de)serializers and
    // emit dates as ISO strings rather than epoch timestamps, so documents
    // stored in Elasticsearch match the application's date format.
    JavaTimeModule timeModule = new JavaTimeModule();
    timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeJsonSerializer());
    timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeJsonDeserializer());

    ObjectMapper objectMapper = new ObjectMapper()
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            .registerModule(timeModule);
    return new JacksonJsonpMapper(objectMapper);
}

/**
 * @return {@code true} when both a non-blank username and a non-blank
 * password are configured for the Elasticsearch connection
 */
private boolean hasCredentials() {
    String username = elasticsearchProperties.getUsername();
    String password = elasticsearchProperties.getPassword();
    return username != null && !username.isBlank()
            && password != null && !password.isBlank();
}

/**
 * @return {@code true} when a non-blank bearer token is configured for the
 * Elasticsearch connection
 */
private boolean hasToken() {
    String token = elasticsearchProperties.getToken();
    return token != null && !token.isBlank();
}

/**
 * Ensures every configured Elasticsearch address carries an explicit port,
 * appending the default search port where none is present.
 *
 * @param urls the configured node addresses (host or host:port)
 * @return a new list where every entry ends in an explicit port
 */
private List<String> sanitizeUrls(List<String> urls) {
    return urls.stream()
            .map(url -> hasExplicitPort(url) ? url : url + ":" + elasticsearchProperties.getSearchPort())
            .toList();
}

/**
 * Treats only a trailing {@code :<digits>} suffix as an explicit port.
 * Unlike a plain {@code contains(":")} check, this does not mistake a URI
 * scheme separator ({@code http://host}) or the closing colon groups of a
 * bracketed IPv6 literal ({@code [::1]}) for a port specification.
 */
private boolean hasExplicitPort(String url) {
    int colon = url.lastIndexOf(':');
    if (colon < 0 || colon == url.length() - 1 || colon < url.lastIndexOf(']')) {
        return false;
    }
    return url.substring(colon + 1).chars().allMatch(Character::isDigit);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.netgrif.application.engine.configuration.properties;

import jakarta.annotation.PostConstruct;
import jakarta.validation.constraints.Min;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.boot.autoconfigure.data.rest.RepositoryRestProperties;
import org.springframework.boot.autoconfigure.session.RedisSessionProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand All @@ -14,6 +16,7 @@
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import jakarta.validation.Valid;
import java.time.Duration;
import java.util.*;

Expand Down Expand Up @@ -155,7 +158,12 @@ public static class ElasticsearchProperties {
/**
* Hostname for the Elasticsearch server.
*/
private String url = "localhost";
private List<String> url = List.of("localhost");

/**
* Indicates if SSL is enabled for Elasticsearch communication.
*/
private boolean ssl = false;

/**
* Port for connecting to Elasticsearch transport client.
Expand All @@ -167,6 +175,24 @@ public static class ElasticsearchProperties {
*/
private int searchPort = 9200;

/**
* The username used for authenticating with the Elasticsearch server.
*/
@ToString.Exclude
private String username = null;

/**
* The password used for authenticating with the Elasticsearch server.
*/
@ToString.Exclude
private String password = null;

/**
* The authentication token for the Elasticsearch server, when using token-based authentication.
*/
@ToString.Exclude
private String token = null;

/**
* Command to trigger a reindexing job.
*/
Expand Down Expand Up @@ -242,6 +268,15 @@ public static class ElasticsearchProperties {
*/
private PriorityProperties priority = new PriorityProperties();


/**
* Batch-related configuration properties for Elasticsearch operations.
* These properties control the batch size for cases and tasks during
* bulk operations to optimize performance and resource usage.
*/
@Valid
private BatchProperties batch = new BatchProperties();

public static final String PETRI_NET_INDEX = "petriNet";

public static final String CASE_INDEX = "case";
Expand Down Expand Up @@ -334,6 +369,30 @@ public static class PriorityProperties {
"visualId.keyword^2"
);
}

/**
 * Configuration properties for batch operations in Elasticsearch.
 * This class specifies the batch sizes for cases and tasks when performing
 * bulk operations like indexing or updating. These values are used to
 * control and optimize resource consumption during high-load processes.
 * <p>
 * Lombok's {@code @Data} generates the getters/setters used by Spring Boot
 * configuration binding. NOTE(review): the {@code @Min} constraints are only
 * enforced because the enclosing {@code batch} field is annotated with
 * {@code @Valid} — keep both annotations in sync.
 */
@Data
public static class BatchProperties {

    /**
     * Default batch size for cases during Elasticsearch bulk operations.
     * This value must be at least 1. The default is 5000.
     */
    @Min(1)
    private int caseBatchSize = 5000;

    /**
     * Default batch size for tasks during Elasticsearch bulk operations.
     * This value must be at least 1. The default is 20000.
     */
    @Min(1)
    private int taskBatchSize = 20000;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
/**
 * Spring Data Elasticsearch repository for {@code ElasticCase} documents.
 * Lookups by id use the inherited {@code CrudRepository#findById(Object)}.
 */
@Repository
public interface ElasticCaseRepository extends ElasticsearchRepository<ElasticCase, String> {

    /**
     * Counts case documents matching both the given document id and
     * last-modified timestamp.
     *
     * @param stringId    the document id of the case
     * @param lastUpdated the last-modified timestamp to match
     * @return the number of matching documents
     */
    long countByIdAndLastModified(String stringId, long lastUpdated);

    /**
     * Deletes every case document with the given id.
     * NOTE(review): this overloads {@code CrudRepository#deleteAllById(Iterable)};
     * the single-{@code String} variant resolves as a derived delete query — confirm
     * the intended overload is picked at each call site.
     *
     * @param id the document id of the cases to delete
     */
    void deleteAllById(String id);

    /**
     * Deletes all case documents belonging to the given process.
     *
     * @param processId the id of the process whose cases are removed
     */
    void deleteAllByProcessId(String processId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,17 @@
import org.springframework.stereotype.Repository;
import com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticPetriNet;

import java.util.List;

/**
 * Repository interface for managing {@link ElasticPetriNet} entities in Elasticsearch.
 * Extends {@link ElasticsearchRepository} to provide CRUD operations and additional query methods.
 * Lookups by id use the inherited {@code CrudRepository#findById(Object)}.
 */
@Repository
public interface ElasticPetriNetRepository extends ElasticsearchRepository<ElasticPetriNet, String> {

    /**
     * Deletes all {@link ElasticPetriNet} documents with the given id.
     * NOTE(review): this overloads {@code CrudRepository#deleteAllById(Iterable)};
     * the single-{@code String} variant resolves as a derived delete query — confirm
     * the intended overload is picked at each call site.
     *
     * @param id the id of the {@link ElasticPetriNet} documents to delete
     */
    void deleteAllById(String id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
@Repository
public interface ElasticTaskRepository extends ElasticsearchRepository<ElasticTask, String> {

ElasticTask findByStringId(String stringId);

ElasticTask findByTaskId(String taskId);

void deleteAllByStringId(String taskId);
void deleteAllById(String taskId);

ElasticTask deleteAllByTaskId(String taskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,11 @@ private StringBuilder buildFullName(String name, String surname) {
dateField, com.netgrif.application.engine.objects.petrinet.domain.dataset.DateField netField) {
if (dateField.getValue() instanceof LocalDate) {
LocalDate date = (LocalDate) dateField.getValue();
return formatDateField(LocalDateTime.of(date, LocalTime.NOON));
return formatDateField(LocalDateTime.of(date, LocalTime.MIDNIGHT));
} else if (dateField.getValue() instanceof Date) {
// log.warn(String.format("DateFields should have LocalDate values! DateField (%s) with Date value found! Value will be converted for indexation.", netField.getImportId()));
LocalDateTime transformed = this.transformDateValueField(dateField);
return formatDateField(LocalDateTime.of(transformed.toLocalDate(), LocalTime.NOON));
return formatDateField(LocalDateTime.of(transformed.toLocalDate(), LocalTime.MIDNIGHT));
} else {
// TODO throw error?
log.error(String.format("Unsupported DateField value type (%s)! Skipping indexation...", dateField.getValue().getClass().getCanonicalName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.autoconfigure.metrics.export.elastic.ElasticProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Lazy;
import org.springframework.dao.InvalidDataAccessApiUsageException;
Expand Down Expand Up @@ -88,7 +86,7 @@ public void setElasticProperties(DataConfigurationProperties.ElasticsearchProper
@Override
public void remove(String caseId) {
    // Executor is keyed by caseId — presumably to serialise Elasticsearch
    // operations for the same case; TODO confirm against the executor service.
    executors.execute(caseId, () -> {
        repository.deleteAllById(caseId);
        // Parameterized logging avoids eager string concatenation.
        log.info("[{}]: Case \"{}\" deleted", caseId, caseId);
    });
}
Expand All @@ -103,22 +101,23 @@ public void removeByPetriNetId(String processId) {

@Override
public void index(ElasticCase useCase) {
executors.execute(useCase.getStringId(), () -> {
executors.execute(useCase.getId(), () -> {
try {
com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = repository.findByStringId(useCase.getStringId());
if (elasticCase == null) {
Optional<com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase> elasticCaseOptional = repository.findById(useCase.getId());
if (elasticCaseOptional.isEmpty()) {
repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase);
} else {
com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase elasticCase = elasticCaseOptional.get();
elasticCase.update(useCase);
repository.save(elasticCase);
}
log.debug("[" + useCase.getStringId() + "]: Case \"" + useCase.getTitle() + "\" indexed");
log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed");
publisher.publishEvent(new IndexCaseEvent(useCase));
} catch (InvalidDataAccessApiUsageException ignored) {
log.debug("[" + useCase.getStringId() + "]: Case \"" + useCase.getTitle() + "\" has duplicates, will be reindexed");
repository.deleteAllByStringId(useCase.getStringId());
log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" has duplicates, will be reindexed");
repository.deleteAllById(useCase.getId());
repository.save((com.netgrif.application.engine.adapter.spring.elastic.domain.ElasticCase) useCase);
log.debug("[" + useCase.getStringId() + "]: Case \"" + useCase.getTitle() + "\" indexed");
log.debug("[" + useCase.getId() + "]: Case \"" + useCase.getTitle() + "\" indexed");
}
});
}
Expand All @@ -144,7 +143,7 @@ public Page<Case> search(List<CaseSearchRequest> requests, LoggedUser user, Page
if (query != null) {
SearchHits<ElasticCase> hits = template.search(query, ElasticCase.class, IndexCoordinates.of(elasticProperties.getIndex().get(DataConfigurationProperties.ElasticsearchProperties.CASE_INDEX)));
Page<ElasticCase> indexedCases = (Page) SearchHitSupport.unwrapSearchHits(SearchHitSupport.searchPageFor(hits, query.getPageable()));
casePage = workflowService.findAllById(indexedCases.get().map(ElasticCase::getStringId).collect(Collectors.toList()));
casePage = workflowService.findAllById(indexedCases.get().map(ElasticCase::getId).collect(Collectors.toList()));
total = indexedCases.getTotalElements();
log.debug("Found [{}] total elements of page [{}]", casePage.size(), pageable.getPageNumber());
} else {
Expand Down
Loading
Loading