Skip to content

Commit

Permalink
Abstract FlintIndex client (#2755) (#2771)
Browse files Browse the repository at this point in the history
* Abstract FlintIndex client



* Fix log



* Fix test function name



---------


(cherry picked from commit b2403ca)

Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent f273301 commit 93588c8
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

/** Interface to abstract access to the FlintIndex */
public interface FlintIndexClient {
void deleteIndex(String indexName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
package org.opensearch.sql.spark.flint.operation;

import lombok.RequiredArgsConstructor;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

@RequiredArgsConstructor
public class FlintIndexOpFactory {
private final FlintIndexStateModelService flintIndexStateModelService;
private final Client client;
private final FlintIndexClient flintIndexClient;
private final FlintIndexMetadataService flintIndexMetadataService;
private final EMRServerlessClientFactory emrServerlessClientFactory;

Expand All @@ -35,7 +35,7 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(
flintIndexStateModelService, datasource, client, emrServerlessClientFactory);
flintIndexStateModelService, datasource, flintIndexClient, emrServerlessClientFactory);
}

public FlintIndexOpCancel getCancel(String datasource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
Expand All @@ -22,15 +20,15 @@ public class FlintIndexOpVacuum extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

/** OpenSearch client. */
private final Client client;
private final FlintIndexClient flintIndexClient;

public FlintIndexOpVacuum(
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
Client client,
FlintIndexClient flintIndexClient,
EMRServerlessClientFactory emrServerlessClientFactory) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.client = client;
this.flintIndexClient = flintIndexClient;
}

@Override
Expand All @@ -46,10 +44,7 @@ FlintIndexState transitioningState() {
@Override
public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) {
LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
DeleteIndexRequest request =
new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName());
AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
LOG.info("OpenSearch index delete result: {}", response.isAcknowledged());
flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint.operation;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

@ExtendWith(MockitoExtension.class)
class FlintIndexOpFactoryTest {
public static final String DATASOURCE_NAME = "DATASOURCE_NAME";

@Mock private FlintIndexStateModelService flintIndexStateModelService;
@Mock private FlintIndexClient flintIndexClient;
@Mock private FlintIndexMetadataService flintIndexMetadataService;
@Mock private EMRServerlessClientFactory emrServerlessClientFactory;

@InjectMocks FlintIndexOpFactory flintIndexOpFactory;

@Test
void getDrop() {
assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME));
}

@Test
void getAlter() {
assertNotNull(flintIndexOpFactory.getAlter(new FlintIndexOptions(), DATASOURCE_NAME));
}

@Test
void getVacuum() {
assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME));
}

@Test
void getCancel() {
assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint.operation;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

@ExtendWith(MockitoExtension.class)
class FlintIndexOpVacuumTest {

public static final String DATASOURCE_NAME = "DATASOURCE_NAME";
public static final String LATEST_ID = "LATEST_ID";
public static final String INDEX_NAME = "INDEX_NAME";
public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITH_LATEST_ID =
FlintIndexMetadata.builder().latestId(LATEST_ID).opensearchIndexName(INDEX_NAME).build();
public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITHOUT_LATEST_ID =
FlintIndexMetadata.builder().opensearchIndexName(INDEX_NAME).build();
@Mock FlintIndexClient flintIndexClient;
@Mock FlintIndexStateModelService flintIndexStateModelService;
@Mock EMRServerlessClientFactory emrServerlessClientFactory;
@Mock FlintIndexStateModel flintIndexStateModel;
@Mock FlintIndexStateModel transitionedFlintIndexStateModel;

RuntimeException testException = new RuntimeException("Test Exception");

FlintIndexOpVacuum flintIndexOpVacuum;

@BeforeEach
public void setUp() {
flintIndexOpVacuum =
new FlintIndexOpVacuum(
flintIndexStateModelService,
DATASOURCE_NAME,
flintIndexClient,
emrServerlessClientFactory);
}

@Test
public void testApplyWithEmptyLatestId() {
flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITHOUT_LATEST_ID);

verify(flintIndexClient).deleteIndex(INDEX_NAME);
}

@Test
public void testApplyWithFlintIndexStateNotFound() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.empty());

assertThrows(
IllegalStateException.class,
() -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyWithNotDeletedState() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.ACTIVE);

assertThrows(
IllegalStateException.class,
() -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyWithUpdateFlintIndexStateThrow() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenThrow(testException);

assertThrows(
IllegalStateException.class,
() -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyWithRunOpThrow() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenReturn(transitionedFlintIndexStateModel);
doThrow(testException).when(flintIndexClient).deleteIndex(INDEX_NAME);

assertThrows(
Exception.class, () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));

verify(flintIndexStateModelService)
.updateFlintIndexState(
transitionedFlintIndexStateModel, FlintIndexState.DELETED, DATASOURCE_NAME);
}

@Test
public void testApplyWithRunOpThrowAndRollbackThrow() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenReturn(transitionedFlintIndexStateModel);
doThrow(testException).when(flintIndexClient).deleteIndex(INDEX_NAME);
when(flintIndexStateModelService.updateFlintIndexState(
transitionedFlintIndexStateModel, FlintIndexState.DELETED, DATASOURCE_NAME))
.thenThrow(testException);

assertThrows(
Exception.class, () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyWithDeleteFlintIndexStateModelThrow() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenReturn(transitionedFlintIndexStateModel);
when(flintIndexStateModelService.deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenThrow(testException);

assertThrows(
IllegalStateException.class,
() -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyHappyPath() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenReturn(transitionedFlintIndexStateModel);
when(transitionedFlintIndexStateModel.getLatestId()).thenReturn(LATEST_ID);

flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID);

verify(flintIndexStateModelService).deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME);
verify(flintIndexClient).deleteIndex(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;

@RequiredArgsConstructor
public class OpenSearchFlintIndexClient implements FlintIndexClient {
private static final Logger LOG = LogManager.getLogger();

private final Client client;

@Override
public void deleteIndex(String indexName) {
DeleteIndexRequest request = new DeleteIndexRequest().indices(indexName);
AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
LOG.info("OpenSearch index delete result: {}", response.isAcknowledged());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.OpenSearchFlintIndexClient;
import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService;
import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
Expand Down Expand Up @@ -124,11 +126,19 @@ public QueryHandlerFactory queryhandlerFactory(
@Provides
public FlintIndexOpFactory flintIndexOpFactory(
FlintIndexStateModelService flintIndexStateModelService,
NodeClient client,
FlintIndexClient flintIndexClient,
FlintIndexMetadataServiceImpl flintIndexMetadataService,
EMRServerlessClientFactory emrServerlessClientFactory) {
return new FlintIndexOpFactory(
flintIndexStateModelService, client, flintIndexMetadataService, emrServerlessClientFactory);
flintIndexStateModelService,
flintIndexClient,
flintIndexMetadataService,
emrServerlessClientFactory);
}

@Provides
public FlintIndexClient flintIndexClient(NodeClient nodeClient) {
return new OpenSearchFlintIndexClient(nodeClient);
}

@Provides
Expand Down
Loading

0 comments on commit 93588c8

Please sign in to comment.