Skip to content

Commit

Permalink
INT-2062 - Fixed GemfireMessageGroupStoreTest (changed Set to List to…
Browse files Browse the repository at this point in the history
… avoid equals() collisions affecting size())
  • Loading branch information
dturanski authored and markfisher committed Aug 18, 2011
1 parent 180cce3 commit 6d40df2
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 56 deletions.
1 change: 1 addition & 0 deletions spring-integration-gemfire/.gitignore
@@ -0,0 +1 @@
*.cfg
Expand Up @@ -24,7 +24,6 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* Provides an implementation of {@link org.springframework.integration.store.MessageGroupStore} that delegates to a backend Gemfire instance.
Expand Down
@@ -1,5 +1,10 @@
package org.springframework.integration.gemfire.store.messagegroupstore;

import static org.junit.Assert.assertEquals;

import java.util.Collection;
import java.util.List;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -8,50 +13,48 @@
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.AnnotationConfigContextLoader;

import java.util.Collection;
import java.util.List;
import java.util.Set;

/**
* Tests the Gemfire {@link org.springframework.integration.store.MessageGroupStore} implementation,
* {@link org.springframework.integration.gemfire.store.GemfireMessageGroupStore}.
* Tests the Gemfire
* {@link org.springframework.integration.store.MessageGroupStore}
* implementation,
* {@link org.springframework.integration.gemfire.store.GemfireMessageGroupStore}
* .
* <p/>
* It tests the {@link org.springframework.integration.store.MessageGroupStore} by sending 10 batches of letters (all of the same width),
* and then counting on the other end that indeed all 10 batches arrived and that all letters expected are there.
* *
*
* It tests the {@link org.springframework.integration.store.MessageGroupStore}
* by sending 10 batches of letters (all of the same width), and then counting
* on the other end that indeed all 10 batches arrived and that all letters
* expected are there. *
*
* @author Josh Long
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(loader = AnnotationConfigContextLoader.class, classes = {GemfireMessageGroupStoreTestConfiguration.class})
@ContextConfiguration(loader = AnnotationConfigContextLoader.class, classes = { GemfireMessageGroupStoreTestConfiguration.class })
public class GemfireMessageGroupStoreTest {

@Autowired private GemfireMessageGroupStoreTestConfiguration.FakeMessageConsumer consumer;
@Autowired
private GemfireMessageGroupStoreTestConfiguration.FakeMessageConsumer consumer;
@Autowired
private GemfireMessageGroupStoreTestConfiguration.FakeMessageProducer producer;

private List<String> letters = GemfireMessageGroupStoreTestConfiguration.LIST_OF_STRINGS;

private int maxSize = 10;
private long totalTimeSleeping = 10 * 1000; // give it 10s to send the messages

@Test
public void testGemfireMessageGroupStore() throws Throwable {
long counter = 0;
int delay = 1000;
Set<Collection<Object>> batches = consumer.getBatches();

while (batches.size() < maxSize && counter < totalTimeSleeping) {
counter += delay;
Thread.sleep(delay);
}

Assert.assertTrue(batches.size() == maxSize);
for (Collection<Object> collection : batches) {
Assert.assertTrue(letters.size() == collection.size());
for (String c : this.letters) {
Assert.assertTrue(collection.contains(c));
}
for (Object o : collection) {
Assert.assertTrue(o instanceof String);
public void testGemfireMessageGroupStore() throws Exception {
producer.afterPropertiesSet();
producer.start();
List<Collection<Object>> batches = consumer.getBatches();
assertEquals(maxSize, batches.size());
for (Collection<Object> collection : batches) {
Assert.assertTrue(letters.size() == collection.size());
for (String c : this.letters) {
Assert.assertTrue(collection.contains(c));
}
for (Object o : collection) {
Assert.assertTrue(o instanceof String);
}
}
}
producer.stop();
}
}
Expand Up @@ -16,8 +16,11 @@

package org.springframework.integration.gemfire.store.messagegroupstore;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.Region;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -43,12 +46,14 @@
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;

import java.util.*;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.Region;

/**
* Our aggregator needs a {@link org.springframework.integration.gemfire.store.KeyValueMessageGroupStore}.
* This handles configuration of the ancillary objects.
*
* Our aggregator needs a
* {@link org.springframework.integration.gemfire.store.KeyValueMessageGroupStore}
* . This handles configuration of the ancillary objects.
*
* @author Josh Long
* @since 2.1
*/
Expand All @@ -70,7 +75,6 @@ public Cache cache() throws Throwable {
return cacheFactoryBean.getObject();
}


@Bean
public Region<Object, KeyValueMessageGroup> messageGroupRegion() throws Throwable {
RegionFactoryBean<Object, KeyValueMessageGroup> regionFactoryBean = new RegionFactoryBean<Object, KeyValueMessageGroup>();
Expand Down Expand Up @@ -98,7 +102,6 @@ public Region<String, Message<?>> markedRegion() throws Throwable {
return regionFactoryBean.getObject();
}


@Bean(name = "messageGroupStoreActivator")
public FakeMessageConsumer serviceActivator() {
return new FakeMessageConsumer();
Expand Down Expand Up @@ -126,16 +129,17 @@ public FakeMessageProducer producer() {

static public class FakeMessageConsumer {

private Set<Collection<Object>> batches = new HashSet<Collection<Object>>();
private List<Collection<Object>> batches = new ArrayList<Collection<Object>>();

public Set<Collection<Object>> getBatches() {
public List<Collection<Object>> getBatches() {
return this.batches;
}

@ServiceActivator
public void activateAsMessagesArriveInBatches(Message<Collection<Object>> msg) throws Throwable {
Collection<Object> payloads = msg.getPayload();
batches.add(payloads);

if (log.isDebugEnabled()) {
log.debug(payloads);
}
Expand All @@ -146,17 +150,20 @@ public void activateAsMessagesArriveInBatches(Message<Collection<Object>> msg) t

static public class FakeMessageProducer implements InitializingBean, SmartLifecycle {
public boolean isAutoStartup() {
return true;
return false;
}

public void stop(Runnable callback) {
stop();
callback.run();
}

public int getPhase() {
return 0;
}

@Autowired @Qualifier("i")
@Autowired
@Qualifier("i")
private MessageChannel messageChannel;

private MessagingTemplate messagingTemplate = new MessagingTemplate();
Expand All @@ -172,35 +179,32 @@ public void sendManyMessages(int correlationValue, Collection<String> lines) thr
int ctr = 0;
int size = lines.size();
for (String l : lines) {
Message<?> msg = MessageBuilder.withPayload(l)
.setCorrelationId(this.correlationHeader)
.setHeader(this.correlationHeader, correlationValue)
.setSequenceNumber(++ctr)
.setSequenceSize(size)
.build();
Message<?> msg = MessageBuilder.withPayload(l).setCorrelationId(this.correlationHeader)
.setHeader(this.correlationHeader, correlationValue).setSequenceNumber(++ctr)
.setSequenceSize(size).build();
this.messagingTemplate.send(msg);
}
}


public void afterPropertiesSet() throws Exception {
this.messagingTemplate.setDefaultChannel(this.messageChannel);
}


public void start() {
running = true;
for (int i = 0; i < 10; i++) {
try {
running = true;
sendManyMessages(i, LIST_OF_STRINGS);
running = false;
} catch (Throwable throwable) {
}
catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}

}

public void stop() {
running = false;
}

public boolean isRunning() {
Expand Down

0 comments on commit 6d40df2

Please sign in to comment.