Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Abstract FlintIndex client #2771

Merged
merged 1 commit into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

/** Interface to abstract access to the FlintIndex */
public interface FlintIndexClient {
void deleteIndex(String indexName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
package org.opensearch.sql.spark.flint.operation;

import lombok.RequiredArgsConstructor;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

@RequiredArgsConstructor
public class FlintIndexOpFactory {
private final FlintIndexStateModelService flintIndexStateModelService;
private final Client client;
private final FlintIndexClient flintIndexClient;
private final FlintIndexMetadataService flintIndexMetadataService;
private final EMRServerlessClientFactory emrServerlessClientFactory;

Expand All @@ -35,7 +35,7 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(
flintIndexStateModelService, datasource, client, emrServerlessClientFactory);
flintIndexStateModelService, datasource, flintIndexClient, emrServerlessClientFactory);
}

public FlintIndexOpCancel getCancel(String datasource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
Expand All @@ -22,15 +20,15 @@ public class FlintIndexOpVacuum extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

/** OpenSearch client. */
private final Client client;
private final FlintIndexClient flintIndexClient;

public FlintIndexOpVacuum(
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
Client client,
FlintIndexClient flintIndexClient,
EMRServerlessClientFactory emrServerlessClientFactory) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.client = client;
this.flintIndexClient = flintIndexClient;
}

@Override
Expand All @@ -46,10 +44,7 @@ FlintIndexState transitioningState() {
@Override
public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) {
LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
DeleteIndexRequest request =
new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName());
AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
LOG.info("OpenSearch index delete result: {}", response.isAcknowledged());
flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint.operation;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

@ExtendWith(MockitoExtension.class)
class FlintIndexOpFactoryTest {
public static final String DATASOURCE_NAME = "DATASOURCE_NAME";

@Mock private FlintIndexStateModelService flintIndexStateModelService;
@Mock private FlintIndexClient flintIndexClient;
@Mock private FlintIndexMetadataService flintIndexMetadataService;
@Mock private EMRServerlessClientFactory emrServerlessClientFactory;

@InjectMocks FlintIndexOpFactory flintIndexOpFactory;

@Test
void getDrop() {
assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME));
}

@Test
void getAlter() {
assertNotNull(flintIndexOpFactory.getAlter(new FlintIndexOptions(), DATASOURCE_NAME));
}

@Test
void getVacuum() {
assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME));
}

@Test
void getCancel() {
assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint.operation;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

@ExtendWith(MockitoExtension.class)
class FlintIndexOpVacuumTest {

public static final String DATASOURCE_NAME = "DATASOURCE_NAME";
public static final String LATEST_ID = "LATEST_ID";
public static final String INDEX_NAME = "INDEX_NAME";
public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITH_LATEST_ID =
FlintIndexMetadata.builder().latestId(LATEST_ID).opensearchIndexName(INDEX_NAME).build();
public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITHOUT_LATEST_ID =
FlintIndexMetadata.builder().opensearchIndexName(INDEX_NAME).build();
@Mock FlintIndexClient flintIndexClient;
@Mock FlintIndexStateModelService flintIndexStateModelService;
@Mock EMRServerlessClientFactory emrServerlessClientFactory;
@Mock FlintIndexStateModel flintIndexStateModel;
@Mock FlintIndexStateModel transitionedFlintIndexStateModel;

RuntimeException testException = new RuntimeException("Test Exception");

FlintIndexOpVacuum flintIndexOpVacuum;

@BeforeEach
public void setUp() {
flintIndexOpVacuum =
new FlintIndexOpVacuum(
flintIndexStateModelService,
DATASOURCE_NAME,
flintIndexClient,
emrServerlessClientFactory);
}

@Test
public void testApplyWithEmptyLatestId() {
flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITHOUT_LATEST_ID);

verify(flintIndexClient).deleteIndex(INDEX_NAME);
}

@Test
public void testApplyWithFlintIndexStateNotFound() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.empty());

assertThrows(
IllegalStateException.class,
() -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyWithNotDeletedState() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.ACTIVE);

assertThrows(
IllegalStateException.class,
() -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyWithUpdateFlintIndexStateThrow() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenThrow(testException);

assertThrows(
IllegalStateException.class,
() -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyWithRunOpThrow() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenReturn(transitionedFlintIndexStateModel);
doThrow(testException).when(flintIndexClient).deleteIndex(INDEX_NAME);

assertThrows(
Exception.class, () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));

verify(flintIndexStateModelService)
.updateFlintIndexState(
transitionedFlintIndexStateModel, FlintIndexState.DELETED, DATASOURCE_NAME);
}

@Test
public void testApplyWithRunOpThrowAndRollbackThrow() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenReturn(transitionedFlintIndexStateModel);
doThrow(testException).when(flintIndexClient).deleteIndex(INDEX_NAME);
when(flintIndexStateModelService.updateFlintIndexState(
transitionedFlintIndexStateModel, FlintIndexState.DELETED, DATASOURCE_NAME))
.thenThrow(testException);

assertThrows(
Exception.class, () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyWithDeleteFlintIndexStateModelThrow() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenReturn(transitionedFlintIndexStateModel);
when(flintIndexStateModelService.deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenThrow(testException);

assertThrows(
IllegalStateException.class,
() -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID));
}

@Test
public void testApplyHappyPath() {
when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME))
.thenReturn(Optional.of(flintIndexStateModel));
when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED);
when(flintIndexStateModelService.updateFlintIndexState(
flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME))
.thenReturn(transitionedFlintIndexStateModel);
when(transitionedFlintIndexStateModel.getLatestId()).thenReturn(LATEST_ID);

flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID);

verify(flintIndexStateModelService).deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME);
verify(flintIndexClient).deleteIndex(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;

@RequiredArgsConstructor
public class OpenSearchFlintIndexClient implements FlintIndexClient {
private static final Logger LOG = LogManager.getLogger();

private final Client client;

@Override
public void deleteIndex(String indexName) {
DeleteIndexRequest request = new DeleteIndexRequest().indices(indexName);
AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
LOG.info("OpenSearch index delete result: {}", response.isAcknowledged());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer;
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.OpenSearchFlintIndexClient;
import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService;
import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
Expand Down Expand Up @@ -124,11 +126,19 @@ public QueryHandlerFactory queryhandlerFactory(
@Provides
public FlintIndexOpFactory flintIndexOpFactory(
FlintIndexStateModelService flintIndexStateModelService,
NodeClient client,
FlintIndexClient flintIndexClient,
FlintIndexMetadataServiceImpl flintIndexMetadataService,
EMRServerlessClientFactory emrServerlessClientFactory) {
return new FlintIndexOpFactory(
flintIndexStateModelService, client, flintIndexMetadataService, emrServerlessClientFactory);
flintIndexStateModelService,
flintIndexClient,
flintIndexMetadataService,
emrServerlessClientFactory);
}

@Provides
public FlintIndexClient flintIndexClient(NodeClient nodeClient) {
return new OpenSearchFlintIndexClient(nodeClient);
}

@Provides
Expand Down
Loading
Loading