Skip to content

Commit

Permalink
Add tests for transactions package
Browse files Browse the repository at this point in the history
See #661
  • Loading branch information
onobc committed Apr 23, 2024
1 parent 825d0c2 commit 7e2109d
Show file tree
Hide file tree
Showing 6 changed files with 456 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.test.util.ReflectionTestUtils;

/**
Expand Down Expand Up @@ -405,6 +406,7 @@ void batchListenerWithRecordAckModeNotSupported() {
var containerProps = new PulsarContainerProperties();
containerProps.setSchema(Schema.STRING);
containerProps.transactions().setEnabled(true);
containerProps.transactions().setTransactionManager(mock(PulsarAwareTransactionManager.class));
containerProps.setBatchListener(true);
containerProps.setAckMode(AckMode.RECORD);
containerProps.setMessageListener((PulsarBatchMessageListener<?>) (consumer, msgs) -> {
Expand All @@ -413,7 +415,7 @@ void batchListenerWithRecordAckModeNotSupported() {
var consumerFactory = new DefaultPulsarConsumerFactory<String>(mock(PulsarClient.class), List.of());
var container = new DefaultPulsarMessageListenerContainer<>(consumerFactory, containerProps);
assertThatIllegalStateException().isThrownBy(() -> container.start())
.withMessage("Batch record listeners do not support AckMode.RECORD");
.withMessage("Transactional batch listeners do not support AckMode.RECORD");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.listener;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;

import org.junit.jupiter.api.Test;

import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionDefinition;

/**
* Unit tests for {@link TransactionSettings}.
*
* @author Chris Bono
*/
class TransactionSettingsTests {

@Test
void whenTimeoutNotSetThenReturnsConfiguredDefinition() {
var txnSettings = new TransactionSettings();
var txnDefinition = new DefaultTransactionDefinition();
txnSettings.setTransactionDefinition(txnDefinition);
assertThat(txnSettings.determineTransactionDefinition()).isSameAs(txnDefinition);
}

@Test
void whenTimeoutSetButDefinitionNotSetThenReturnsNewDefinitionWithTimeout() {
var txnSettings = new TransactionSettings();
txnSettings.setTimeout(Duration.ofSeconds(100));
assertThat(txnSettings.determineTransactionDefinition()).extracting(TransactionDefinition::getTimeout)
.isEqualTo(100);
}

@Test
void whenTimeoutSetAndDefinitionSetThenReturnsCloneDefinitionUpdatedWithTimeout() {
var txnSettings = new TransactionSettings();
txnSettings.setTimeout(Duration.ofSeconds(200));
var txnDefinition = new DefaultTransactionDefinition();
txnDefinition.setTimeout(100);
txnSettings.setTransactionDefinition(txnDefinition);
assertThat(txnSettings.determineTransactionDefinition()).extracting(TransactionDefinition::getTimeout)
.isEqualTo(200);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.transaction;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.transaction.Transaction;
import org.junit.jupiter.api.Test;

/**
* Tests for {@link PulsarResourceHolder}.
*
* @author Chris Bono
*/
class PulsarResourceHolderTests {

@Test
void rollbackAbortsTransaction() {
var txn = mock(Transaction.class);
when(txn.abort()).thenReturn(CompletableFuture.completedFuture(null));
var holder = new PulsarResourceHolder(txn);
holder.rollback();
verify(txn).abort();
}

@Test
void multipleCommitCallsCommitsTransactionOnce() {
var txn = mock(Transaction.class);
when(txn.commit()).thenReturn(CompletableFuture.completedFuture(null));
var holder = new PulsarResourceHolder(txn);
holder.commit();
holder.commit();
holder.commit();
verify(txn).commit();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.transaction;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.apache.pulsar.client.api.PulsarClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.springframework.transaction.support.TransactionSynchronization;

/**
* Tests for {@link PulsarResourceSynchronization}.
*
* @author Chris Bono
*/
class PulsarResourceSynchronizationTests {

private final PulsarClient pulsarClient = mock(PulsarClient.class);

@Test
void processResourceAfterCommitDoesCommitOnResourceHolder() {
var holder = mock(PulsarResourceHolder.class);
var sync = new PulsarResourceSynchronization(holder, pulsarClient);
sync.processResourceAfterCommit(holder);
verify(holder).commit();
}

@Test
void afterCompletionDoesCommitOnHolderWhenTxnStatusIsCommitted() {
var holder = mock(PulsarResourceHolder.class);
var sync = new PulsarResourceSynchronization(holder, pulsarClient);
sync.afterCompletion(TransactionSynchronization.STATUS_COMMITTED);
verify(holder).commit();
}

@ParameterizedTest
@ValueSource(ints = { TransactionSynchronization.STATUS_ROLLED_BACK, TransactionSynchronization.STATUS_UNKNOWN })
void afterCompletionDoesRollbackOnHolderWhenTxnStatusIsNotCommitted(int status) {
var holder = mock(PulsarResourceHolder.class);
var sync = new PulsarResourceSynchronization(holder, pulsarClient);
sync.afterCompletion(status);
verify(holder).rollback();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.transaction;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.pulsar.client.api.PulsarClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/**
* Tests for {@link PlatformTransactionManager}.
*
* @author Chris Bono
*/
class PulsarTransactionManagerTests {

private PulsarClient pulsarClient = mock(PulsarClient.class);

private PulsarTransactionManager transactionManager;

private PulsarResourceHolder resourceHolder;

private PulsarTransactionObject transactionObject;

private DefaultTransactionStatus transactionStatus;

@BeforeEach
void prepareForTest() {
transactionManager = new PulsarTransactionManager(pulsarClient);
resourceHolder = mock(PulsarResourceHolder.class);
transactionObject = new PulsarTransactionObject();
transactionObject.setResourceHolder(resourceHolder);
transactionStatus = mock(DefaultTransactionStatus.class);
when(transactionStatus.getTransaction()).thenReturn(transactionObject);
}

@Test
void doGetTransactionReturnsPulsarTxnObject() {
TransactionSynchronizationManager.bindResource(this.pulsarClient, resourceHolder);
assertThat(transactionManager.doGetTransaction()).isInstanceOf(PulsarTransactionObject.class)
.hasFieldOrPropertyWithValue("resourceHolder", resourceHolder);
}

@Test
void isExistingTransactionReturnsTrueWhenTxnObjectHasResourceHolder() {
var txnObject = new PulsarTransactionObject();
txnObject.setResourceHolder(resourceHolder);
assertThat(transactionManager.isExistingTransaction(txnObject)).isTrue();
}

@Test
void isExistingTransactionReturnsFalseWhenTxnObjectHasNoResourceHolder() {
var txnObject = new PulsarTransactionObject();
assertThat(transactionManager.isExistingTransaction(txnObject)).isFalse();
}

@Test
void doSuspendUnbindsAndNullsOutResourceHolder() {
TransactionSynchronizationManager.bindResource(this.pulsarClient, resourceHolder);
transactionManager.doSuspend(transactionObject);
assertThat(transactionObject.getResourceHolder()).isNull();
assertThat(TransactionSynchronizationManager.getResource(this.pulsarClient)).isNull();
}

@Test
void doResumeBindsResourceHolder() {
transactionManager.doResume("unused", resourceHolder);
assertThat(TransactionSynchronizationManager.getResource(this.pulsarClient)).isSameAs(resourceHolder);
}

@Test
void doCommitDoesCommitOnResourceHolder() {
transactionManager.doCommit(transactionStatus);
verify(resourceHolder).commit();
}

@Test
void doRollbackDoesRollbackOnResourceHolder() {
transactionManager.doRollback(transactionStatus);
verify(resourceHolder).rollback();
}

@Test
void doSetRollbackOnlyDoesSetRollbackOnlyOnResourceHolder() {
transactionManager.doSetRollbackOnly(transactionStatus);
verify(resourceHolder).setRollbackOnly();
}

@Test
void doCleanupDoesUnbindAndClearResourceHolder() {
TransactionSynchronizationManager.bindResource(this.pulsarClient, resourceHolder);
transactionManager.doCleanupAfterCompletion(transactionObject);
assertThat(TransactionSynchronizationManager.getResource(this.pulsarClient)).isNull();
verify(transactionObject.getResourceHolder()).clear();
}

}

0 comments on commit 7e2109d

Please sign in to comment.