Skip to content

Commit

Permalink
Merge pull request #850 from jobrunr/feature/GH-499
Browse files Browse the repository at this point in the history
Feature/GH-499
  • Loading branch information
rdehuyss committed Oct 16, 2023
2 parents f00c8c6 + f4abdf2 commit c14f896
Show file tree
Hide file tree
Showing 32 changed files with 944 additions and 836 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ dependencies {
compileOnly 'redis.clients:jedis'
compileOnly 'io.lettuce:lettuce-core'
compileOnly 'org.mongodb:mongodb-driver-sync'
compileOnly 'org.elasticsearch.client:elasticsearch-rest-high-level-client'
compileOnly 'co.elastic.clients:elasticsearch-java'

testImplementation 'org.ow2.asm:asm-util'
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
Expand Down Expand Up @@ -78,7 +78,7 @@ dependencies {
testImplementation 'redis.clients:jedis'
testImplementation 'io.lettuce:lettuce-core'
testImplementation 'org.mongodb:mongodb-driver-sync'
testImplementation 'org.elasticsearch.client:elasticsearch-rest-high-level-client'
testImplementation 'co.elastic.clients:elasticsearch-java'

testFixturesApi 'org.junit.jupiter:junit-jupiter'
testFixturesApi 'org.assertj:assertj-core'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
package org.jobrunr.storage.nosql.elasticsearch;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import org.jobrunr.JobRunrException;
import org.jobrunr.storage.StorageException;
import org.jobrunr.storage.StorageProviderUtils.BackgroundJobServers;
Expand All @@ -23,27 +16,29 @@

import java.io.IOException;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static co.elastic.clients.elasticsearch._types.Refresh.True;
import static java.util.Arrays.asList;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
import static org.apache.http.HttpStatus.SC_NOT_FOUND;
import static org.jobrunr.storage.StorageProviderUtils.Migrations;
import static org.jobrunr.storage.StorageProviderUtils.elementPrefixer;
import static org.jobrunr.storage.nosql.elasticsearch.ElasticSearchUtils.JOBRUNR_PREFIX;
import static org.jobrunr.storage.nosql.elasticsearch.ElasticSearchUtils.sleep;
import static org.jobrunr.storage.nosql.elasticsearch.migrations.ElasticSearchMigration.*;
import static org.jobrunr.utils.CollectionUtils.asSet;
import static org.jobrunr.utils.StringUtils.substringBefore;

public class ElasticSearchDBCreator extends NoSqlDatabaseCreator<ElasticSearchMigration> {

public static final String JOBRUNR_MIGRATIONS_INDEX_NAME = JOBRUNR_PREFIX + Migrations.NAME;
private final RestHighLevelClient client;
private final ElasticsearchClient client;
private final String indexPrefix;
private final String migrationIndexName;

public ElasticSearchDBCreator(NoSqlStorageProvider noSqlStorageProvider, RestHighLevelClient client, String indexPrefix) {
public ElasticSearchDBCreator(NoSqlStorageProvider noSqlStorageProvider, ElasticsearchClient client, String indexPrefix) {
super(noSqlStorageProvider);
this.client = client;
this.indexPrefix = indexPrefix;
Expand All @@ -63,14 +58,14 @@ public void validateIndices() {
waitForHealthyCluster(client);

final List<String> requiredIndexNames = asList(Jobs.NAME, RecurringJobs.NAME, BackgroundJobServers.NAME, Metadata.NAME);
final Set<String> availableIndexNames = asSet(client.indices().get(new GetIndexRequest("*"), RequestOptions.DEFAULT).getIndices());
final Set<String> availableIndexNames = client.indices().get(r -> r.index("*")).result().keySet();
for (String requiredIndexName : requiredIndexNames) {
if (!availableIndexNames.contains(elementPrefixer(indexPrefix, elementPrefixer(JOBRUNR_PREFIX, requiredIndexName)))) {
throw new JobRunrException("Not all required indices are available by JobRunr!");
}
}
} catch (ElasticsearchStatusException e) {
if (e.status() == NOT_FOUND) {
} catch (ElasticsearchException e) {
if (e.status() == SC_NOT_FOUND) {
throw new JobRunrException("Not all required indices are available by JobRunr!");
}
throw new StorageException(e);
Expand All @@ -96,39 +91,40 @@ protected void runMigration(ElasticSearchMigration noSqlMigration) throws IOExce
}

@Override
protected boolean markMigrationAsDone(NoSqlMigration noSqlMigration) {
protected boolean markMigrationAsDone(final NoSqlMigration noSqlMigration) {
try {
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
builder.startObject();
builder.field(Migrations.FIELD_NAME, noSqlMigration.getClassName());
builder.field(Migrations.FIELD_DATE, Instant.now());
builder.endObject();
IndexRequest indexRequest = new IndexRequest(migrationIndexName)
.id(substringBefore(noSqlMigration.getClassName(), "_"))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.source(builder);
return client.index(indexRequest, RequestOptions.DEFAULT) != null;
final Map<String, Object> map = new LinkedHashMap<>();
map.put(Migrations.FIELD_NAME, noSqlMigration.getClassName());
map.put(Migrations.FIELD_DATE, Instant.now().toEpochMilli());
return client.index(r ->
r.index(migrationIndexName)
.id(substringBefore(noSqlMigration.getClassName(), "_"))
.refresh(True)
.document(map)
).result() != null;
} catch (IOException e) {
throw new StorageException(e);
}
}

private boolean isNewMigration(NoSqlMigration noSqlMigration, int retry) {
private boolean isNewMigration(final NoSqlMigration noSqlMigration, final int retry) {
sleep(retry * 500L);
try {
waitForHealthyCluster(client);
GetResponse migration = client.get(new GetRequest(migrationIndexName, substringBefore(noSqlMigration.getClassName(), "_")), RequestOptions.DEFAULT);
return !migration.isExists();
} catch (ElasticsearchStatusException e) {
final BooleanResponse migration = client.exists(
r -> r.index(migrationIndexName).id(substringBefore(noSqlMigration.getClassName(), "_"))
);
return !migration.value();
} catch (ElasticsearchException e) {
if (retry < 5) {
return isNewMigration(noSqlMigration, retry + 1);
}
if (e.status() == NOT_FOUND) {
if (e.status() == SC_NOT_FOUND) {
return true;
}
throw e;
} catch (IOException e) {
throw new StorageException(e);
}
}
}
}

0 comments on commit c14f896

Please sign in to comment.