diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 8be9221d..45316a4d 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -9,6 +9,8 @@ name: Operator CI
on:
workflow_dispatch:
+ schedule:
+ - cron: '0 */2 * * *'
push:
branches-ignore:
- gh-pages
@@ -101,10 +103,10 @@ jobs:
sudo chown -R runner:runner /run/systemd/resolve/stub-resolv.conf
sudo echo nameserver 8.8.8.8 > /run/systemd/resolve/stub-resolv.conf
- - name: Code Review
- shell: bash
- run: |
- make code-review
+# - name: Code Review
+# shell: bash
+# run: |
+# make code-review
- name: Start KinD Cluster
# Start a KinD K8s cluster with single worker node
@@ -128,58 +130,62 @@ jobs:
run: |
make kind-load
- - name: Unit Tests
+ - name: IDRS Remote Tests
shell: bash
- run: make test-all
-
- - name: E2E Local Tests
- shell: bash
- run: make e2e-local-test
-
- - name: E2E Remote Tests
- shell: bash
- run: make e2e-test
-
- - name: Helm Chart Tests
- shell: bash
- run: |
- make e2e-helm-test
-
- - name: Upload Manifests
- uses: actions/upload-artifact@v1
- if: success()
- with:
- name: coherence-operator-manifests.tar.gz
- path: build/_output/coherence-operator-manifests.tar.gz
-
- - name: Upload Yaml
- uses: actions/upload-artifact@v1
- if: success()
- with:
- name: coherence-operator.yaml
- path: build/_output/coherence-operator.yaml
-
- - name: Upload CRD
- uses: actions/upload-artifact@v1
- if: success()
- with:
- name: coherence.oracle.com_coherence.yaml
- path: build/_output/manifests/crd/coherence.oracle.com_coherence.yaml
+ run: make idrs-test
+
+# - name: Unit Tests
+# shell: bash
+# run: make test-all
+#
+# - name: E2E Local Tests
+# shell: bash
+# run: make e2e-local-test
+#
+# - name: E2E Remote Tests
+# shell: bash
+# run: make e2e-test
+#
+# - name: Helm Chart Tests
+# shell: bash
+# run: |
+# make e2e-helm-test
+#
+# - name: Upload Manifests
+# uses: actions/upload-artifact@v1
+# if: success()
+# with:
+# name: coherence-operator-manifests.tar.gz
+# path: build/_output/coherence-operator-manifests.tar.gz
+#
+# - name: Upload Yaml
+# uses: actions/upload-artifact@v1
+# if: success()
+# with:
+# name: coherence-operator.yaml
+# path: build/_output/coherence-operator.yaml
+#
+# - name: Upload CRD
+# uses: actions/upload-artifact@v1
+# if: success()
+# with:
+# name: coherence.oracle.com_coherence.yaml
+# path: build/_output/manifests/crd/coherence.oracle.com_coherence.yaml
- uses: actions/upload-artifact@v1
if: failure()
with:
name: test-output
path: build/_output/test-logs
-
- - name: Deploy Snapshots & Docs
- if: ${{ github.ref == 'refs/heads/main' && success() }}
- env:
- MAVEN_SONATYPE_USERNAME: ${{ secrets.MAVEN_SONATYPE_USERNAME }}
- MAVEN_SONATYPE_TOKEN: ${{ secrets.MAVEN_SONATYPE_TOKEN }}
- shell: bash
- run: |
- git config --local user.name "Github Action"
- git config --local user.email "$GITHUB_ACTOR@users.noreply.github.com"
- export NO_DAEMON=false
- make push-snapshot-docs || true
+#
+# - name: Deploy Snapshots & Docs
+# if: ${{ github.ref == 'refs/heads/main' && success() }}
+# env:
+# MAVEN_SONATYPE_USERNAME: ${{ secrets.MAVEN_SONATYPE_USERNAME }}
+# MAVEN_SONATYPE_TOKEN: ${{ secrets.MAVEN_SONATYPE_TOKEN }}
+# shell: bash
+# run: |
+# git config --local user.name "Github Action"
+# git config --local user.email "$GITHUB_ACTOR@users.noreply.github.com"
+# export NO_DAEMON=false
+# make push-snapshot-docs || true
diff --git a/Makefile b/Makefile
index 8517fa29..49a7d463 100644
--- a/Makefile
+++ b/Makefile
@@ -967,6 +967,49 @@ run-e2e-test: gotestsum ## Run the Operator 'remote' end-to-end functional test
-- $(GO_TEST_FLAGS_E2E) ./test/e2e/remote/...
+# ----------------------------------------------------------------------------------------------------------------------
+# Executes the Go end-to-end tests that require a k8s cluster using
+# a DEPLOYED operator instance (i.e. the operator Docker image is
+# deployed to k8s). These tests will use whichever k8s cluster the
+# local environment is pointing to.
+# ----------------------------------------------------------------------------------------------------------------------
+.PHONY: idrs-test
+idrs-test: export MF = $(MAKEFLAGS)
+idrs-test: prepare-idrs-test ## Run the Operator end-to-end 'IDRS' functional tests using an Operator deployed in k8s
+ $(MAKE) run-idrs-test $${MF} \
+ ; rc=$$? \
+ ; $(MAKE) undeploy $${MF} \
+ ; $(MAKE) delete-namespace $${MF} \
+ ; exit $$rc
+
+.PHONY: prepare-idrs-test
+prepare-idrs-test: $(BUILD_TARGETS)/build-operator reset-namespace create-ssl-secrets install-crds deploy-and-wait
+
+.PHONY: run-idrs-test
+run-idrs-test: export CGO_ENABLED = 0
+run-idrs-test: export TEST_SSL_SECRET := $(TEST_SSL_SECRET)
+run-idrs-test: export OPERATOR_NAMESPACE := $(OPERATOR_NAMESPACE)
+run-idrs-test: export CLUSTER_NAMESPACE := $(CLUSTER_NAMESPACE)
+run-idrs-test: export OPERATOR_NAMESPACE_CLIENT := $(OPERATOR_NAMESPACE_CLIENT)
+run-idrs-test: export BUILD_OUTPUT := $(BUILD_OUTPUT)
+run-idrs-test: export TEST_COHERENCE_IMAGE := $(TEST_COHERENCE_IMAGE)
+run-idrs-test: export TEST_IMAGE_PULL_POLICY := $(IMAGE_PULL_POLICY)
+run-idrs-test: export TEST_STORAGE_CLASS := $(TEST_STORAGE_CLASS)
+run-idrs-test: export TEST_ASSET_KUBECTL := $(TEST_ASSET_KUBECTL)
+run-idrs-test: export VERSION := $(VERSION)
+run-idrs-test: export OPERATOR_IMAGE := $(OPERATOR_IMAGE)
+run-idrs-test: export COHERENCE_IMAGE := $(COHERENCE_IMAGE)
+run-idrs-test: export TEST_APPLICATION_IMAGE := $(TEST_APPLICATION_IMAGE)
+run-idrs-test: export TEST_APPLICATION_IMAGE_CLIENT := $(TEST_APPLICATION_IMAGE_CLIENT)
+run-idrs-test: export TEST_APPLICATION_IMAGE_HELIDON := $(TEST_APPLICATION_IMAGE_HELIDON)
+run-idrs-test: export TEST_APPLICATION_IMAGE_SPRING := $(TEST_APPLICATION_IMAGE_SPRING)
+run-idrs-test: export TEST_APPLICATION_IMAGE_SPRING_FAT := $(TEST_APPLICATION_IMAGE_SPRING_FAT)
+run-idrs-test: export TEST_APPLICATION_IMAGE_SPRING_CNBP := $(TEST_APPLICATION_IMAGE_SPRING_CNBP)
+run-idrs-test: gotestsum ## Run the Operator 'IDRS' end-to-end functional tests using an ALREADY DEPLOYED Operator
+ $(GOTESTSUM) --format standard-verbose --junitfile $(TEST_LOGS_DIR)/operator-idrs-test.xml \
+ -- $(GO_TEST_FLAGS_E2E) ./test/e2e/idrs/...
+
+
# ----------------------------------------------------------------------------------------------------------------------
# Run the end-to-end Coherence client tests.
# ----------------------------------------------------------------------------------------------------------------------
diff --git a/java/operator-test/pom.xml b/java/operator-test/pom.xml
index e338af6e..65ab23ea 100644
--- a/java/operator-test/pom.xml
+++ b/java/operator-test/pom.xml
@@ -28,20 +28,41 @@
com.oracle.coherence.ce
coherence
+ ${coherence.version.2206}
com.oracle.coherence.ce
coherence-grpc-proxy
+ ${coherence.version.2206}
com.oracle.coherence.ce
coherence-management
- ${coherence.version}
+ ${coherence.version.2206}
com.oracle.coherence.ce
coherence-metrics
- ${coherence.version}
+ ${coherence.version.2206}
+
+
+
+
+ com.google.cloud.tools
+ jib-maven-plugin
+ ${version.plugin.jib}
+
+
+ docker://${coherence.test.base.image}
+
+
+ com.tangosol.net.Coherence
+ OCI
+
+
+
+
+
diff --git a/java/operator-test/src/main/java/com/oracle/coherence/idrs/ChannelPosition.java b/java/operator-test/src/main/java/com/oracle/coherence/idrs/ChannelPosition.java
new file mode 100644
index 00000000..e6a8b7ea
--- /dev/null
+++ b/java/operator-test/src/main/java/com/oracle/coherence/idrs/ChannelPosition.java
@@ -0,0 +1,101 @@
+package com.oracle.coherence.idrs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition;
+import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
+import com.tangosol.io.ExternalizableLite;
+import com.tangosol.util.ExternalizableHelper;
+
+public class ChannelPosition
+ implements ExternalizableLite
+ {
+ public ChannelPosition()
+ {
+ }
+
+ public ChannelPosition(int nChannel, PagedPosition position, SubscriberId subscriberId, int[] anChannel)
+ {
+ m_nChannel = nChannel;
+ m_position = position;
+ m_subscriberId = subscriberId;
+ m_anChannel = anChannel;
+ }
+
+ public int getChannel()
+ {
+ return m_nChannel;
+ }
+
+ public PagedPosition getPosition()
+ {
+ return m_position;
+ }
+
+ @Override
+ public void readExternal(DataInput in) throws IOException
+ {
+ m_nChannel = in.readInt();
+ m_position = ExternalizableHelper.readObject(in);
+ m_subscriberId = ExternalizableHelper.readObject(in);
+ int c = in.readInt();
+ m_anChannel = new int[c];
+ for (int i = 0; i < c; i++)
+ {
+ m_anChannel[i] = in.readInt();
+ }
+ }
+
+ @Override
+ public void writeExternal(DataOutput out) throws IOException
+ {
+ out.writeInt(m_nChannel);
+ ExternalizableHelper.writeObject(out, m_position);
+ ExternalizableHelper.writeObject(out, m_subscriberId);
+ out.writeInt(m_anChannel.length);
+ for (int i : m_anChannel)
+ {
+ out.writeInt(i);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ChannelPosition that = (ChannelPosition) o;
+ return m_nChannel == that.m_nChannel && Objects.equals(m_position, that.m_position);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(m_nChannel, m_position);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ChannelPosition{" +
+ "channel=" + m_nChannel +
+ ", position=" + m_position +
+ ", subscriber=" + m_subscriberId +
+ ", channels=" + Arrays.toString(m_anChannel) +
+ '}';
+ }
+
+ // ----- data members ---------------------------------------------------
+
+ private int m_nChannel;
+
+ private PagedPosition m_position;
+
+ private SubscriberId m_subscriberId;
+
+ private int[] m_anChannel;
+ }
diff --git a/java/operator-test/src/main/java/com/oracle/coherence/idrs/Constants.java b/java/operator-test/src/main/java/com/oracle/coherence/idrs/Constants.java
new file mode 100644
index 00000000..f9e552c9
--- /dev/null
+++ b/java/operator-test/src/main/java/com/oracle/coherence/idrs/Constants.java
@@ -0,0 +1,32 @@
+package com.oracle.coherence.idrs;
+
+public interface Constants
+ {
+ String PROP_TOPIC = "coherence.test.topic";
+
+ String PROP_GROUP = "coherence.test.group";
+
+ String PROP_STATE_TOPIC = "coherence.test.state.topic";
+
+ String PROP_STATE_GROUP = "coherence.test.state.group";
+
+ String PROP_PUBLISH_DELAY = "coherence.test.publish.delay";
+
+ String PROP_SUBSCRIBER_MIN_TTL = "coherence.test.subscriber.min.ttl";
+
+ String PROP_SUBSCRIBER_EXTRA_TTL = "coherence.test.subscriber.extra.ttl";
+
+ String DEFAULT_TOPIC = "test";
+
+ String DEFAULT_GROUP = "test-group";
+
+ String DEFAULT_STATE_TOPIC = "test-state";
+
+ String DEFAULT_STATE_GROUP= "test-state-group";
+
+ long DEFAULT_PUBLISH_DELAY = 10L;
+
+ int DEFAULT_SUBSCRIBER_MIN_TTL = 60;
+
+ int DEFAULT_SUBSCRIBER_EXTRA_TTL = 60;
+ }
diff --git a/java/operator-test/src/main/java/com/oracle/coherence/idrs/PublisherMain.java b/java/operator-test/src/main/java/com/oracle/coherence/idrs/PublisherMain.java
new file mode 100644
index 00000000..918d2736
--- /dev/null
+++ b/java/operator-test/src/main/java/com/oracle/coherence/idrs/PublisherMain.java
@@ -0,0 +1,68 @@
+package com.oracle.coherence.idrs;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import com.oracle.coherence.common.base.Blocking;
+import com.oracle.coherence.common.base.Logger;
+import com.oracle.coherence.common.base.Randoms;
+
+import com.tangosol.coherence.config.Config;
+import com.tangosol.net.Coherence;
+import com.tangosol.net.Session;
+import com.tangosol.net.topic.NamedTopic;
+import com.tangosol.net.topic.Publisher;
+
+@SuppressWarnings("unchecked")
+public class PublisherMain
+ implements Constants
+ {
+
+ @SuppressWarnings("InfiniteLoopStatement")
+ public static void main(String[] args) throws Exception
+ {
+ try (Coherence coherence = Coherence.clusterMember())
+ {
+ coherence.start().get(5, TimeUnit.MINUTES);
+
+ Session session = coherence.getSession();
+ String sTopicMsg = Config.getProperty(PROP_TOPIC, DEFAULT_TOPIC);
+ NamedTopic topicMsg = session.getTopic(sTopicMsg);
+ String sMsg = Randoms.getRandomString(100, 100, true);
+ long nDelay = Config.getLong(PROP_PUBLISH_DELAY, DEFAULT_PUBLISH_DELAY);
+ Random rnd = new Random(System.currentTimeMillis());
+
+ try (Publisher publisher = topicMsg.createPublisher(Publisher.OrderBy.value(v -> rnd.nextInt(17))))
+ {
+ m_fRunning = true;
+ Logger.info("Entering Publisher loop");
+ while (true)
+ {
+ Publisher.Status status = publisher.publish(sMsg).get();
+ int nChannel = status.getChannel();
+ long count = m_mapCounts.getOrDefault(nChannel, 0L);
+ m_mapCounts.put(nChannel, count + 1);
+ m_cMessage++;
+ if (m_cMessage % 100 == 0)
+ {
+ Logger.info("Published " + m_cMessage + "messages. " + m_mapCounts);
+ }
+ if (nDelay > 5)
+ {
+ Blocking.sleep(nDelay);
+ }
+ }
+ }
+ }
+ }
+
+ // ----- data members ---------------------------------------------------
+
+ private static long m_cMessage;
+
+ public static final Map m_mapCounts = new HashMap<>();
+
+ private static boolean m_fRunning;
+ }
diff --git a/java/operator-test/src/main/java/com/oracle/coherence/idrs/SubscriberMain.java b/java/operator-test/src/main/java/com/oracle/coherence/idrs/SubscriberMain.java
new file mode 100644
index 00000000..1f35d832
--- /dev/null
+++ b/java/operator-test/src/main/java/com/oracle/coherence/idrs/SubscriberMain.java
@@ -0,0 +1,70 @@
+package com.oracle.coherence.idrs;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+
+import com.oracle.coherence.common.base.Logger;
+
+import com.tangosol.coherence.config.Config;
+import com.tangosol.internal.net.topic.impl.paged.PagedTopicSubscriber;
+import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition;
+import com.tangosol.net.Coherence;
+import com.tangosol.net.Session;
+import com.tangosol.net.topic.NamedTopic;
+import com.tangosol.net.topic.Publisher;
+import com.tangosol.net.topic.Subscriber;
+
+@SuppressWarnings({"unchecked"})
+public class SubscriberMain
+ implements Constants
+ {
+
+ @SuppressWarnings("InfiniteLoopStatement")
+ public static void main(String[] args) throws Exception
+ {
+ try (Coherence coherence = Coherence.clusterMember())
+ {
+ coherence.start().get(5, TimeUnit.MINUTES);
+
+ Session session = coherence.getSession();
+ String sTopicMsg = Config.getProperty(PROP_TOPIC, DEFAULT_TOPIC);
+ String sTopicState = Config.getProperty(PROP_STATE_TOPIC, DEFAULT_STATE_TOPIC);
+ String sGroup = Config.getProperty(PROP_GROUP, DEFAULT_GROUP);
+ NamedTopic topicMsg = session.getTopic(sTopicMsg);
+ NamedTopic topicState = session.getTopic(sTopicState);
+
+ Logger.info("Creating subscriber");
+ try (Subscriber subscriber = topicMsg.createSubscriber(Subscriber.inGroup(sGroup));
+ Publisher publisher = topicState.createPublisher(Publisher.OrderBy.value(ChannelPosition::getChannel)))
+ {
+ PagedTopicSubscriber pagedTopicSubscriber = (PagedTopicSubscriber) subscriber;
+
+ Logger.info("Entering subscriber loop");
+ m_fRunning = true;
+ while (true)
+ {
+ Subscriber.Element element = subscriber.receive().get();
+ PagedPosition position = (PagedPosition) element.getPosition();
+ int[] anChannel = subscriber.getChannels();
+ Publisher.Status status = publisher.publish(new ChannelPosition(element.getChannel(), position, pagedTopicSubscriber.getSubscriberId(), anChannel)).get();
+ Subscriber.CommitResult result = element.commit();
+ Logger.info("Processed channel=" + element.getChannel() + " " + position + " result=" + result + " status=" + status);
+ m_cMessage++;
+// if (m_cMessage % 100 == 0)
+// {
+// Logger.info("Received " + m_cMessage + "messages");
+// }
+ }
+ }
+ }
+ }
+
+ // ----- data members ---------------------------------------------------
+
+ private static long m_cMessage;
+
+ private static boolean m_fRunning;
+
+ private static final Set m_setChannel = new ConcurrentSkipListSet<>();
+ }
diff --git a/java/operator-test/src/main/java/com/oracle/coherence/idrs/TrackerMain.java b/java/operator-test/src/main/java/com/oracle/coherence/idrs/TrackerMain.java
new file mode 100644
index 00000000..90a97bcc
--- /dev/null
+++ b/java/operator-test/src/main/java/com/oracle/coherence/idrs/TrackerMain.java
@@ -0,0 +1,159 @@
+package com.oracle.coherence.idrs;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.oracle.coherence.common.base.Logger;
+
+import com.tangosol.coherence.config.Config;
+import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition;
+import com.tangosol.net.Coherence;
+import com.tangosol.net.Session;
+import com.tangosol.net.topic.NamedTopic;
+import com.tangosol.net.topic.Subscriber;
+
+@SuppressWarnings("unchecked")
+public class TrackerMain
+ implements Constants
+ {
+ @SuppressWarnings("InfiniteLoopStatement")
+ public static void main(String[] args) throws Exception
+ {
+ try (Coherence coherence = Coherence.clusterMember())
+ {
+ coherence.start().get(5, TimeUnit.MINUTES);
+
+ Session session = coherence.getSession();
+ String sTopicState = Config.getProperty(PROP_STATE_TOPIC, DEFAULT_STATE_TOPIC);
+ String sGroup = Config.getProperty(PROP_STATE_GROUP);
+ NamedTopic topicState = session.getTopic(sTopicState);
+
+ try (Subscriber subscriber = topicState.createSubscriber(Subscriber.inGroup(sGroup)))
+ {
+ m_fRunning = true;
+ Logger.info("Entering Tracker loop");
+ while (true)
+ {
+ Subscriber.Element element = subscriber.receive().get();
+ ChannelPosition position = element.getValue();
+ int nChannel = position.getChannel();
+ ChannelPosition lastChanPos = m_aReceived[nChannel];
+ PagedPosition lastPos = lastChanPos == null ? null : lastChanPos.getPosition();
+ PagedPosition thisPos = position.getPosition();
+
+// Logger.info("Received " + element);
+ if (lastPos != null)
+ {
+ long nLastPage = lastPos.getPage();
+ int nLastOffset = lastPos.getOffset();
+ long nThisPage = thisPos.getPage();
+ int nThisOffset = thisPos.getOffset();
+ if (nThisPage == nLastPage)
+ {
+ if (nThisOffset - nLastOffset > 1)
+ {
+ Logger.err("Missed message for channel " + position.getChannel() + " received=" + position + " last=" + lastChanPos);
+ m_listMissed.add(new MissedInfo(nChannel, lastChanPos, position));
+ }
+ if (nThisOffset <= nLastOffset)
+ {
+ m_nRepeated++;
+ }
+ }
+ else if (nThisPage > nLastPage)
+ {
+ if (nThisPage - nLastPage > 1)
+ {
+ Logger.err("Missed message for channel " + position.getChannel() + " received=" + position + " last=" + lastChanPos);
+ m_listMissed.add(new MissedInfo(nChannel, lastChanPos, position));
+ }
+ if (nThisOffset != 0)
+ {
+ Logger.err("Missed message for channel " + position.getChannel() + " received=" + position + " last=" + lastChanPos);
+ m_listMissed.add(new MissedInfo(nChannel, lastChanPos, position));
+ }
+ }
+ else // nThisPage < nLastPage
+ {
+ m_nRepeated++;
+ }
+ }
+ m_aReceived[nChannel] = position;
+ m_nReceived++;
+ if (m_nReceived % 100 == 0)
+ {
+ int cMissed = m_listMissed.size();
+ Logger.info("Received " + m_nReceived + " messages (missed=" + cMissed + ", repeated=" + m_nRepeated + ")");
+// for (int i = 0; i < m_aReceived.length; i++)
+// {
+// Logger.info(" Channel " + i + " " + m_aReceived[i]);
+// }
+// if (cMissed > 0)
+// {
+// m_listMissed.forEach(p -> Logger.info(p.toString()));
+// }
+ }
+ }
+ }
+ }
+ }
+
+ // ----- inner class MissedInfo -----------------------------------------
+
+ public static class MissedInfo
+ {
+ public MissedInfo(int nChannel, ChannelPosition lastPosition, ChannelPosition receivedPosition)
+ {
+ m_nChannel = nChannel;
+ m_lastPosition = lastPosition;
+ m_receivedPosition = receivedPosition;
+ }
+
+ public int getChannel()
+ {
+ return m_nChannel;
+ }
+
+ public ChannelPosition getLastPosition()
+ {
+ return m_lastPosition;
+ }
+
+ public ChannelPosition getReceivedPosition()
+ {
+ return m_receivedPosition;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Missed(" +
+ "channel=" + m_nChannel +
+ ", lastPosition=" + m_lastPosition +
+ ", receivedPosition=" + m_receivedPosition +
+ ')';
+ }
+
+ private final int m_nChannel;
+
+ private final ChannelPosition m_lastPosition;
+
+ private final ChannelPosition m_receivedPosition;
+ }
+
+
+ // ----- data members ---------------------------------------------------
+
+ private static final ChannelPosition[] m_aReceived = new ChannelPosition[34];
+
+ private static final List m_listMissed = new ArrayList<>();
+
+ private static long m_nLastReceive = 0;
+
+ private static long m_nReceived = 0;
+
+ private static long m_nRepeated = 0;
+
+ private static boolean m_fRunning;
+ }
diff --git a/java/pom.xml b/java/pom.xml
index 9ec1027f..56a55afd 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -40,7 +40,7 @@
21.12.4
- 22.06.7
+ 22.06.8
com.oracle.coherence.ce
${coherence.version}
diff --git a/test/e2e/idrs/idrs.yaml b/test/e2e/idrs/idrs.yaml
new file mode 100644
index 00000000..5559bf89
--- /dev/null
+++ b/test/e2e/idrs/idrs.yaml
@@ -0,0 +1,124 @@
+apiVersion: coherence.oracle.com/v1
+kind: Coherence
+metadata:
+ name: storage
+spec:
+ replicas: 3
+ image: ${TEST_APPLICATION_IMAGE}
+ imagePullPolicy: ${TEST_IMAGE_PULL_POLICY}
+ cluster: idrs-test
+ coherence:
+ management:
+ enabled: true
+ jvm:
+ args:
+ - "-Dcoherence.management=local-only"
+ memory:
+ maxRAM: 2g
+ maxHeapSize: 1g
+ resources:
+ requests:
+ cpu: 250m
+ limits:
+ cpu: 1
+ readinessProbe:
+ initialDelaySeconds: 30
+ periodSeconds: 15
+---
+apiVersion: coherence.oracle.com/v1
+kind: Coherence
+metadata:
+ name: subscriber
+spec:
+ replicas: 3
+ image: ${TEST_APPLICATION_IMAGE}
+ imagePullPolicy: ${TEST_IMAGE_PULL_POLICY}
+ cluster: idrs-test
+ coherence:
+ storageEnabled: false
+ management:
+ enabled: true
+ application:
+ main: com.oracle.coherence.idrs.SubscriberMain
+ jvm:
+ args:
+ - "-Dcoherence.management=local-only"
+ memory:
+ maxRAM: 2g
+ maxHeapSize: 1g
+ resources:
+ requests:
+ cpu: 250m
+ limits:
+ cpu: 1
+ readinessProbe:
+ initialDelaySeconds: 30
+ periodSeconds: 15
+ startQuorum:
+ - deployment: storage
+---
+apiVersion: coherence.oracle.com/v1
+kind: Coherence
+metadata:
+ name: tracker
+spec:
+ replicas: 1
+ image: ${TEST_APPLICATION_IMAGE}
+ imagePullPolicy: ${TEST_IMAGE_PULL_POLICY}
+ cluster: idrs-test
+ coherence:
+ storageEnabled: false
+ metrics:
+ enabled: true
+ management:
+ enabled: true
+ application:
+ main: com.oracle.coherence.idrs.TrackerMain
+ jvm:
+ args:
+ - "-Dcoherence.management=all"
+ memory:
+ maxRAM: 2g
+ maxHeapSize: 1g
+ env:
+ - name: COHERENCE_M
+ readinessProbe:
+ initialDelaySeconds: 30
+ periodSeconds: 15
+ ports:
+ - name: management
+ startQuorum:
+ - deployment: storage
+---
+apiVersion: coherence.oracle.com/v1
+kind: Coherence
+metadata:
+ name: publisher
+spec:
+ replicas: 1
+ image: ${TEST_APPLICATION_IMAGE}
+ imagePullPolicy: ${TEST_IMAGE_PULL_POLICY}
+ cluster: idrs-test
+ coherence:
+ storageEnabled: false
+ management:
+ enabled: true
+ application:
+ main: com.oracle.coherence.idrs.PublisherMain
+ jvm:
+ args:
+ - "-Dcoherence.management=local-only"
+ memory:
+ maxRAM: 2g
+ maxHeapSize: 1g
+ readinessProbe:
+ initialDelaySeconds: 30
+ periodSeconds: 15
+ ports:
+ - name: management
+ env:
+ - name: COHERENCE_TEST_PUBLISH_DELAY
+ value: "5000"
+ startQuorum:
+ - deployment: subscriber
+
diff --git a/test/e2e/idrs/idrs_suite_test.go b/test/e2e/idrs/idrs_suite_test.go
new file mode 100644
index 00000000..e10b8aae
--- /dev/null
+++ b/test/e2e/idrs/idrs_suite_test.go
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2019, 2023, Oracle and/or its affiliates.
+ * Licensed under the Universal Permissive License v 1.0 as shown at
+ * http://oss.oracle.com/licenses/upl.
+ */
+
+package idrs
+
+import (
+ "context"
+ "fmt"
+ . "github.com/onsi/gomega"
+ cohv1 "github.com/oracle/coherence-operator/api/v1"
+ "github.com/oracle/coherence-operator/test/e2e/helper"
+ "os"
+ "testing"
+ "time"
+)
+
+var testContext helper.TestContext
+
+// The entry point for the test suite
+func TestMain(m *testing.M) {
+ var err error
+
+ helper.EnsureTestEnvVars()
+
+ // Create a new TestContext - DO NOT start any controllers.
+ if testContext, err = helper.NewContext(false); err != nil {
+ fmt.Printf("Error: %+v", err)
+ os.Exit(1)
+ }
+
+ // Ensure that the Operator has been deployed to the test namespace
+ namespace := helper.GetTestNamespace()
+ pods, err := helper.ListOperatorPods(testContext, namespace)
+ if err != nil {
+ fmt.Printf("Error looking for Operator Pods in namespace %s : %+v", namespace, err)
+ os.Exit(1)
+ }
+ if len(pods) == 0 {
+ fmt.Printf("Cannot find any Operator Pods in namespace %s. "+
+ "This test suite requires an Operator is already deployed", namespace)
+ os.Exit(1)
+ }
+
+ fmt.Printf("Waiting for Operator Pod %s to be ready in namespace %s.", pods[0].Name, namespace)
+ err = helper.WaitForPodReady(testContext, namespace, pods[0].Name, 10*time.Second, 5*time.Minute)
+ if err != nil {
+ fmt.Printf("Failed waiting for Operator Pod %s to be ready in namespace %s.", pods[0].Name, namespace)
+ os.Exit(1)
+ }
+
+ exitCode := m.Run()
+ testContext.Logf("Tests completed with return code %d", exitCode)
+ testContext.Close()
+ os.Exit(exitCode)
+}
+
+// installSimpleDeployment installs a deployment and asserts that the underlying
+// StatefulSet resources reach the correct state.
+func installSimpleDeployment(t *testing.T, d cohv1.Coherence) {
+ g := NewGomegaWithT(t)
+ err := testContext.Client.Create(context.TODO(), &d)
+ g.Expect(err).NotTo(HaveOccurred())
+ assertDeploymentEventuallyInDesiredState(t, d, d.GetReplicas())
+}
+
+// assertDeploymentEventuallyInDesiredState asserts that a Coherence resource exists and has the correct spec and that the
+// underlying StatefulSet exists with the correct status and ready replicas.
+func assertDeploymentEventuallyInDesiredState(t *testing.T, d cohv1.Coherence, replicas int32) {
+ g := NewGomegaWithT(t)
+
+ testContext.Logf("Asserting Coherence resource %s exists with %d replicas", d.Name, replicas)
+
+ // create a DeploymentStateCondition that checks a deployment's replica count
+ condition := helper.ReplicaCountCondition(replicas)
+
+ // wait for the deployment to match the condition
+ _, err := helper.WaitForCoherenceCondition(testContext, d.Namespace, d.Name, condition, time.Second*10, time.Minute*5)
+ g.Expect(err).NotTo(HaveOccurred())
+
+ testContext.Logf("Asserting StatefulSet %s exists with %d replicas", d.Name, replicas)
+
+ // wait for the StatefulSet to have the required ready replicas
+ sts, err := helper.WaitForStatefulSet(testContext, d.Namespace, d.Name, replicas, time.Second*10, time.Minute*5)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(sts.Status.ReadyReplicas).To(Equal(replicas))
+
+ testContext.Logf("Asserting StatefulSet %s exist with %d replicas - Done!", d.Name, replicas)
+}
diff --git a/test/e2e/idrs/idrs_test.go b/test/e2e/idrs/idrs_test.go
new file mode 100644
index 00000000..41b66e8e
--- /dev/null
+++ b/test/e2e/idrs/idrs_test.go
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2020, 2024, Oracle and/or its affiliates.
+ * Licensed under the Universal Permissive License v 1.0 as shown at
+ * http://oss.oracle.com/licenses/upl.
+ */
+
+package idrs
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ . "github.com/onsi/gomega"
+ "github.com/oracle/coherence-operator/test/e2e/helper"
+ "github.com/pkg/errors"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "os/exec"
+ "testing"
+ "time"
+)
+
+func TestTopics(t *testing.T) {
+ // Make sure we defer clean-up when we're done!!
+ testContext.CleanupAfterTest(t)
+ namespace := helper.GetTestNamespace()
+
+ g := NewGomegaWithT(t)
+
+ t.Log("Deploying initial version of Coherence cluster")
+
+ // Do the initial deployments
+ helper.AssertDeployments(testContext, t, "idrs.yaml")
+
+ counts, err := GetCounts()
+ g.Expect(err).NotTo(HaveOccurred())
+
+ err = WaitForCounts(testContext, counts, time.Second*2, time.Minute*5)
+ g.Expect(err).NotTo(HaveOccurred())
+
+ for i := 0; i < 10; i++ {
+ for p := 0; p < 3; p++ {
+ for j := 0; j < 2; j++ {
+ var pod string
+ var sts string
+ if j == 0 {
+ sts = "storage"
+ pod = fmt.Sprintf("storage-%d", p)
+ } else {
+ sts = "subscriber"
+ pod = fmt.Sprintf("subscriber-%d", p)
+ }
+
+ t.Logf("Killing Pod %s", pod)
+ _, err := kubectl("delete", "pod", pod)
+ g.Expect(err).NotTo(HaveOccurred())
+ t.Log("Sleeping...")
+ time.Sleep(30 * time.Second)
+ t.Logf("Waiting for StatefulSet %s to be ready...", sts)
+ _, err = helper.WaitForStatefulSet(testContext, namespace, sts, 3, 10*time.Second, 5*time.Minute)
+ g.Expect(err).NotTo(HaveOccurred())
+
+ t.Log("Waiting for counts to increase...")
+ counts, err = GetCounts()
+ g.Expect(err).NotTo(HaveOccurred())
+ err = WaitForCounts(testContext, counts, time.Second*2, time.Minute*5)
+ g.Expect(err).NotTo(HaveOccurred())
+ }
+ }
+ }
+}
+
+func GetCounts() (map[float64]float64, error) {
+
+ m, err := cohctl("get", "subscribers", "test")
+ if err != nil {
+ return nil, err
+ }
+
+ var items []interface{}
+ data, ok := m["items"]
+ if !ok {
+ return nil, errors.Errorf("failed to get items from json")
+ }
+
+ items = data.([]interface{})
+ if len(items) != 3 {
+ return nil, errors.Errorf("failed to get three items from json")
+ }
+
+ counts := make(map[float64]float64, 3)
+
+ for _, item := range items {
+ sub := item.(map[string]interface{})
+ id := sub["id"].(float64)
+ count := sub["receivedCount"].(float64)
+ counts[id] = count
+ }
+
+ return counts, nil
+}
+
+func cohctl(args ...string) (map[string]interface{}, error) {
+ args = append([]string{"exec", "tracker-0", "-c", "coherence", "--", "/coherence-operator/utils/cohctl"}, args...)
+ args = append(args, "-o", "json")
+
+ b, err := kubectl(args...)
+ if err != nil {
+ return nil, err
+ }
+ m := make(map[string]interface{})
+ if err = json.Unmarshal(b, &m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func kubectl(args ...string) ([]byte, error) {
+ namespace := helper.GetTestNamespace()
+
+ args = append([]string{"-n", namespace}, args...)
+
+ cmd := exec.Command("kubectl", args...)
+ return cmd.CombinedOutput()
+}
+
+func WaitForCounts(ctx helper.TestContext, initial map[float64]float64, retryInterval, timeout time.Duration) error {
+ err := wait.PollUntilContextTimeout(ctx.Context, retryInterval, timeout, true, func(context.Context) (done bool, err error) {
+ counts, err := GetCounts()
+ if err != nil {
+ ctx.Logf("Error waiting for counts to increase - %s", err.Error())
+ return false, err
+ }
+
+ for id, count := range initial {
+ c, ok := counts[id]
+ if !ok || count == c {
+ ctx.Logf("Waiting for counts to increase - %f is still %f", id, count)
+ return false, nil
+ }
+ ctx.Logf("Waiting for counts to increase - %f was %f is now %f", id, count, c)
+ }
+ return true, nil
+ })
+
+ return err
+}
diff --git a/test/e2e/idrs/publisher.yaml b/test/e2e/idrs/publisher.yaml
new file mode 100644
index 00000000..89ea4df3
--- /dev/null
+++ b/test/e2e/idrs/publisher.yaml
@@ -0,0 +1,32 @@
+apiVersion: coherence.oracle.com/v1
+kind: Coherence
+metadata:
+ name: publisher
+spec:
+ replicas: 1
+ image: ${TEST_APPLICATION_IMAGE}
+ imagePullPolicy: ${TEST_IMAGE_PULL_POLICY}
+ cluster: idrs-test
+ coherence:
+ storageEnabled: false
+ management:
+ enabled: true
+ application:
+ main: com.oracle.coherence.idrs.PublisherMain
+ jvm:
+ memory:
+ maxRAM: 2g
+ maxHeapSize: 1g
+ readinessProbe:
+ initialDelaySeconds: 30
+ periodSeconds: 15
+ ports:
+ - name: management
+ env:
+ - name: COHERENCE_TEST_PUBLISH_DELAY
+ value: "5000"
+
+
+
+
+
diff --git a/test/e2e/idrs/storage.yaml b/test/e2e/idrs/storage.yaml
new file mode 100644
index 00000000..365d78d9
--- /dev/null
+++ b/test/e2e/idrs/storage.yaml
@@ -0,0 +1,29 @@
+apiVersion: coherence.oracle.com/v1
+kind: Coherence
+metadata:
+ name: storage
+spec:
+ replicas: 3
+ image: ${TEST_APPLICATION_IMAGE}
+ imagePullPolicy: ${TEST_IMAGE_PULL_POLICY}
+ cluster: idrs-test
+ coherence:
+ management:
+ enabled: true
+ jvm:
+ memory:
+ maxRAM: 2g
+ maxHeapSize: 1g
+ resources:
+ requests:
+ cpu: 250m
+ limits:
+ cpu: 1
+ readinessProbe:
+ initialDelaySeconds: 30
+ periodSeconds: 15
+
+
+
+
+
diff --git a/test/e2e/idrs/subscriber.yaml b/test/e2e/idrs/subscriber.yaml
new file mode 100644
index 00000000..55cab4b6
--- /dev/null
+++ b/test/e2e/idrs/subscriber.yaml
@@ -0,0 +1,33 @@
+apiVersion: coherence.oracle.com/v1
+kind: Coherence
+metadata:
+ name: subscriber
+spec:
+ replicas: 3
+ image: ${TEST_APPLICATION_IMAGE}
+ imagePullPolicy: ${TEST_IMAGE_PULL_POLICY}
+ cluster: idrs-test
+ coherence:
+ storageEnabled: false
+ management:
+ enabled: true
+ application:
+ main: com.oracle.coherence.idrs.SubscriberMain
+ jvm:
+ memory:
+ maxRAM: 2g
+ maxHeapSize: 1g
+ resources:
+ requests:
+ cpu: 250m
+ limits:
+ cpu: 1
+ readinessProbe:
+ initialDelaySeconds: 30
+ periodSeconds: 15
+
+
+
+
+
+
diff --git a/test/e2e/idrs/tracker.yaml b/test/e2e/idrs/tracker.yaml
new file mode 100644
index 00000000..ce971fe0
--- /dev/null
+++ b/test/e2e/idrs/tracker.yaml
@@ -0,0 +1,33 @@
+apiVersion: coherence.oracle.com/v1
+kind: Coherence
+metadata:
+ name: tracker
+spec:
+ replicas: 1
+ image: ${TEST_APPLICATION_IMAGE}
+ imagePullPolicy: ${TEST_IMAGE_PULL_POLICY}
+ cluster: idrs-test
+ coherence:
+ storageEnabled: false
+ metrics:
+ enabled: true
+ management:
+ enabled: true
+ application:
+ main: com.oracle.coherence.idrs.TrackerMain
+ jvm:
+ memory:
+ maxRAM: 2g
+ maxHeapSize: 1g
+ env:
+ - name: COHERENCE_M
+ readinessProbe:
+ initialDelaySeconds: 30
+ periodSeconds: 15
+ ports:
+ - name: management
+
+
+
+
+