Skip to content

Commit

Permalink
added basic sanity tests for single cluster scenario. (#393)
Browse files Browse the repository at this point in the history
Signed-off-by: Naveen Pajjuri <nppajjur@amazon.com>

Co-authored-by: Naveen Pajjuri <nppajjur@amazon.com>
  • Loading branch information
naveenpajjuri and Naveen Pajjuri committed May 9, 2022
1 parent 500a50a commit 9c4f7e2
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 1 deletion.
27 changes: 26 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ integTest {
// We skip BWC test here as those get run as part of separate target `bwcTestSuite`.
filter {
excludeTestsMatching "org.opensearch.replication.bwc.*IT"
excludeTestsMatching "org.opensearch.replication.singleCluster.SingleClusterSanityIT"
}

/*
Expand Down Expand Up @@ -612,7 +613,7 @@ def securityPluginOld = new Callable<RegularFile>() {
}

// We maintain 2 set of clusters here. One for full cluster restart and one for rolling restart + mixed cluster.
List<String> clusters = ["bwcLeader0", "bwcFollower0", "bwcLeader1", "bwcFollower1"]
List<String> clusters = ["bwcLeader0", "bwcFollower0", "bwcLeader1", "bwcFollower1","singleCluster"]
// TODO: Make BWC test work with security plugin
clusters.each { name ->
testClusters {
Expand Down Expand Up @@ -858,3 +859,27 @@ task "bwcTestSuite"(type: RestIntegTestTask) {
dependsOn tasks.named("rollingUpgradeClusterTask")
dependsOn tasks.named("fullRestartClusterTask")
}

task integTestSingleCluster(type: RestIntegTestTask) {
useCluster testClusters.singleCluster
doFirst {
getClusters().forEach { cluster ->
String alltransportSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining(","))
String allHttpSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllHttpSocketURI().stream()
}.collect(Collectors.joining(","))

systemProperty "tests.cluster.${cluster.name}.http_hosts", "${-> allHttpSocketURI}"
systemProperty "tests.cluster.${cluster.name}.transport_hosts", "${-> alltransportSocketURI}"
systemProperty "tests.cluster.${cluster.name}.security_enabled", "${-> securityEnabled.toString()}"
configureCluster(cluster, securityEnabled)
}
}

filter {
setIncludePatterns("org.opensearch.replication.singleCluster.SingleClusterSanityIT")
}
nonInputProperties.systemProperty('tests.sanitySingleCluster', "integTestSingleCluster")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.opensearch.replication.singleCluster


import org.apache.logging.log4j.LogManager
import org.junit.BeforeClass
import org.opensearch.client.ResponseException
import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.startReplication
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Assert
import org.opensearch.replication.stopReplication
import java.util.stream.Collectors




class SingleClusterSanityIT : MultiClusterRestTestCase() {

companion object {
private val log = LogManager.getLogger(SingleClusterSanityIT::class.java)
private const val standaloneClusterName = "singleCluster"
private const val REPLICATION_PLUGIN_NAME = "opensearch-cross-cluster-replication"
private const val NUM_NODES = 3
private const val SAMPLE_INDEX = "sample_test_index"

@BeforeClass
@JvmStatic
fun setupTestClusters() {
val clusters = HashMap<String, TestCluster>()
clusters.put(standaloneClusterName, createTestCluster(standaloneClusterName, true, true, true, false))
testClusters = clusters
}

enum class ClusterState(val value: String) {
SINGLE_CLUSTER_SANITY_SUITE("integTestSingleCluster");

companion object {
fun from(s: String): ClusterState? = values().find { it.value == s }
}
}
}

@Throws(Exception::class)
fun testReplicationPluginWithSingleCluster() {
when(ClusterState.from(System.getProperty("tests.sanitySingleCluster"))) {
ClusterState.SINGLE_CLUSTER_SANITY_SUITE -> basicReplicationSanityWithSingleCluster()
}
}

fun basicReplicationSanityWithSingleCluster() {
verifyReplicationPluginInstallationOnAllNodes(standaloneClusterName)
VerifyReplicationApis(standaloneClusterName)
}

@Throws(java.lang.Exception::class)
private fun verifyReplicationPluginInstallationOnAllNodes(clusterName: String) {
val restClient = getClientForCluster(clusterName)
for (i in 0 until NUM_NODES) {
val responseMap = getAsMap(restClient.lowLevelClient, "_nodes/$clusterName-$i/plugins")["nodes"]
as Map<String, Map<String, Any>>?
Assert.assertTrue(responseMap!!.values.isNotEmpty())
for (response in responseMap!!.values) {
val plugins = response["plugins"] as List<Map<String, Any>>?
val pluginNames: Set<Any?> = plugins!!.stream().map { map: Map<String, Any> ->
map["name"]
}.collect(Collectors.toSet()).orEmpty()
Assert.assertTrue(pluginNames.contains(REPLICATION_PLUGIN_NAME))
}
}
}

@Throws(java.lang.Exception::class)
private fun VerifyReplicationApis(clusterName: String) {
val follower = getClientForCluster(standaloneClusterName)
assertThatThrownBy {
follower.startReplication(
StartReplicationRequest("sample_connection", SAMPLE_INDEX, SAMPLE_INDEX),
waitForRestore = true
)
}.isInstanceOf(ResponseException::class.java).hasMessageContaining("no such remote cluster")
assertThatThrownBy {
follower.stopReplication(standaloneClusterName)
}.isInstanceOf(ResponseException::class.java)
.hasMessageContaining("No replication in progress for index:"+standaloneClusterName)
}

}

0 comments on commit 9c4f7e2

Please sign in to comment.