From 55bc444f8b5153b2920e555a61849a7bf89db8be Mon Sep 17 00:00:00 2001 From: llIlll <10194588+llIlll@users.noreply.github.com> Date: Wed, 9 Sep 2020 11:11:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=B9=B6=E8=A1=8C=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E9=97=AE=E9=A2=98=20(#304)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 修复并行消费问题 --- .../broker/cluster/ClusterManager.java | 10 +-- .../SlideWindowConcurrentConsumer.java | 9 ++- .../broker/consumer/filter/FlagFilter.java | 2 +- .../DefaultConsumerMonitorService.java | 21 ++--- .../DefaultProducerMonitorService.java | 21 ++--- .../consumer/FilterMessageSupportTest.java | 2 +- .../consumer/filter/FlagFilterTest.java | 26 +++--- .../kafka/converter/CheckResultConverter.java | 6 +- .../kafka/handler/ProduceRequestHandler.java | 3 +- .../java/org/joyqueue/store/QosStore.java | 19 ++++- .../joyqueue/store/utils/MessageUtils.java | 14 ++++ .../store/PartitionGroupStoreManagerTest.java | 80 +++++++++++++++++++ 12 files changed, 163 insertions(+), 50 deletions(-) diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/cluster/ClusterManager.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/cluster/ClusterManager.java index bb9ca24ce..e15b0caf6 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/cluster/ClusterManager.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/cluster/ClusterManager.java @@ -867,15 +867,7 @@ public List getLocalSubscribeAppByTopic(TopicName topic) { } public List getLocalProducersByTopic(TopicName topic) { - Map producers = localCache.getTopicProducers(topic); - if (MapUtils.isEmpty(producers)) { - return Collections.emptyList(); - } - List result = Lists.newLinkedList(); - for (Map.Entry entry : producers.entrySet()) { - result.add(entry.getValue().getProducer()); - } - return result; + return Lists.newArrayList(nameService.getProducerByTopic(topic)); } public List getLocalConsumersByTopic(TopicName topic) { diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/SlideWindowConcurrentConsumer.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/SlideWindowConcurrentConsumer.java index 4be80de9c..85f42ce80 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/SlideWindowConcurrentConsumer.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/SlideWindowConcurrentConsumer.java @@ -534,10 +534,10 @@ private boolean extendSlideWindowAndUpdatePullIndex( if (logger.isDebugEnabled()) { logger.debug("set new pull index:{}, topic:{}, app:{}, partition:{}", newPullIndex, consumer.getTopic(), consumer.getApp(), partition); } - positionManager.updateLastMsgPullIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, newPullIndex); if (consumedMessages.isReset()) { - positionManager.updateLastMsgAckIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, consumedMessages.getStartIndex(), false); + positionManager.updateLastMsgAckIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, pullIndex); } + positionManager.updateLastMsgPullIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, newPullIndex); return true; } else { return false; @@ -566,6 +566,9 @@ private int count(List buffers) { * @return 消息序号 */ private long getPullIndex(Consumer consumer, short partition) throws JoyQueueException { + if (partition == Partition.RETRY_PARTITION_ID) { + return 0; + } // 本次拉取消息的位置,默认从0开始ack long pullIndex = 0; String topic = consumer.getTopic(); @@ -930,7 +933,7 @@ boolean isAcked() { return status.get() == ACKED; } - public boolean isReset() { + boolean isReset() { return reset; } } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/filter/FlagFilter.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/filter/FlagFilter.java index e6a1c1866..53b789dfa 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/filter/FlagFilter.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/filter/FlagFilter.java @@ -88,7 +88,7 @@ private FilterResult doFilter(List messages, Pattern pattern) throws } // 是否匹配 - boolean isMatch = pattern.matcher("" + flag).matches(); + boolean isMatch = (flag == 0 || pattern.matcher("" + flag).matches()); if (isMatch) { if (i == 0) { diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultConsumerMonitorService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultConsumerMonitorService.java index 6980c7cf5..024bc7cbe 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultConsumerMonitorService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultConsumerMonitorService.java @@ -25,7 +25,7 @@ import org.joyqueue.broker.monitor.stat.BrokerStat; import org.joyqueue.broker.monitor.stat.ConsumerStat; import org.joyqueue.broker.monitor.stat.PartitionGroupStat; -import org.joyqueue.broker.monitor.stat.TopicStat; +import org.joyqueue.domain.TopicConfig; import org.joyqueue.exception.JoyQueueException; import org.joyqueue.model.Pager; import org.joyqueue.monitor.ConsumerMonitorInfo; @@ -39,8 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.List; -import java.util.Map; /** * ConsumerMonitorService @@ -74,15 +74,18 @@ public Pager getConsumerInfos(int page, int pageSize) { int index = 0; List data = Lists.newArrayListWithCapacity(pageSize); - for (Map.Entry topicStatEntry : brokerStat.getTopicStats().entrySet()) { - for (Map.Entry appStatEntry : topicStatEntry.getValue().getAppStats().entrySet()) { - if (index >= startIndex && index < endIndex) { - data.add(convertConsumerMonitorInfo(appStatEntry.getValue().getConsumerStat())); - } - index ++; + for (TopicConfig topic : clusterManager.getTopics()) { + List consumers = clusterManager.getLocalConsumersByTopic(topic.getName()); + for (org.joyqueue.domain.Consumer consumer : consumers) { + AppStat appStat = brokerStat.getOrCreateTopicStat(topic.getName().getFullName()).getOrCreateAppStat(consumer.getApp()); + data.add(convertConsumerMonitorInfo(appStat.getConsumerStat())); } - total += topicStatEntry.getValue().getAppStats().size(); } + + Collections.sort(data, (o1, o2) -> { + return Long.compare(o2.getPending().getCount(), o1.getPending().getCount()); + }); + return new Pager<>(page, pageSize, total, data); } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultProducerMonitorService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultProducerMonitorService.java index 4d2e4c56d..07f008883 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultProducerMonitorService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultProducerMonitorService.java @@ -24,15 +24,15 @@ import org.joyqueue.broker.monitor.stat.PartitionGroupStat; import org.joyqueue.broker.monitor.stat.PartitionStat; import org.joyqueue.broker.monitor.stat.ProducerStat; -import org.joyqueue.broker.monitor.stat.TopicStat; +import org.joyqueue.domain.TopicConfig; import org.joyqueue.model.Pager; import org.joyqueue.monitor.ProducerMonitorInfo; import org.joyqueue.monitor.ProducerPartitionGroupMonitorInfo; import org.joyqueue.monitor.ProducerPartitionMonitorInfo; import org.joyqueue.store.StoreManagementService; +import java.util.Collections; import java.util.List; -import java.util.Map; /** * ProducerMonitorService @@ -60,15 +60,18 @@ public Pager getProduceInfos(int page, int pageSize) { int index = 0; List data = Lists.newArrayListWithCapacity(pageSize); - for (Map.Entry topicStatEntry : brokerStat.getTopicStats().entrySet()) { - for (Map.Entry appStatEntry : topicStatEntry.getValue().getAppStats().entrySet()) { - if (index >= startIndex && index < endIndex) { - data.add(convertProducerMonitorInfo(appStatEntry.getValue().getProducerStat())); - } - index ++; + for (TopicConfig topic : clusterManager.getTopics()) { + List producers = clusterManager.getLocalProducersByTopic(topic.getName()); + for (org.joyqueue.domain.Producer producer : producers) { + AppStat appStat = brokerStat.getOrCreateTopicStat(topic.getName().getFullName()).getOrCreateAppStat(producer.getApp()); + data.add(convertProducerMonitorInfo(appStat.getProducerStat())); } - total += topicStatEntry.getValue().getAppStats().size(); } + + Collections.sort(data, (o1, o2) -> { + return Long.compare(o2.getEnQueue().getCount(), o1.getEnQueue().getCount()); + }); + return new Pager<>(page, pageSize, total, data); } diff --git a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/consumer/FilterMessageSupportTest.java b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/consumer/FilterMessageSupportTest.java index 7fb325eda..8cfed0895 100644 --- a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/consumer/FilterMessageSupportTest.java +++ b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/consumer/FilterMessageSupportTest.java @@ -68,6 +68,6 @@ public void callback(List list) throws JoyQueueException { } }); - Assert.assertEquals(1, filter.size()); + Assert.assertEquals(3, filter.size()); } } \ No newline at end of file diff --git a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/consumer/filter/FlagFilterTest.java b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/consumer/filter/FlagFilterTest.java index a08bcc0b9..ede986860 100644 --- a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/consumer/filter/FlagFilterTest.java +++ b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/consumer/filter/FlagFilterTest.java @@ -62,7 +62,7 @@ public void filter2() throws JoyQueueException { flagFilter.setRule("[9]"); List byteBufferList = new LinkedList<>(); - for (int i = 0; i < 10; i++) { + for (int i = 1; i < 10; i++) { ByteBuffer allocate = ByteBuffer.allocate(100); allocate.position(59); allocate.putShort((short) i); @@ -75,9 +75,9 @@ public void filter2() throws JoyQueueException { List filter = flagFilter.filter(byteBufferList, new FilterCallback() { @Override public void callback(List list) throws JoyQueueException { - Assert.assertEquals(9, list.size()); - for (int i = 0; i < 9; i++) { - Assert.assertEquals(i, Serializer.readFlag(list.get(i))); + Assert.assertEquals(8, list.size()); + for (int i = 0; i < 8; i++) { + Assert.assertEquals(i + 1, Serializer.readFlag(list.get(i))); } inCallback[0] = true; } @@ -92,7 +92,7 @@ public void filter3() throws JoyQueueException { flagFilter.setRule("[5, 9]"); List byteBufferList = new LinkedList<>(); - for (int i = 0; i < 10; i++) { + for (int i = 1; i < 10; i++) { ByteBuffer allocate = ByteBuffer.allocate(100); allocate.position(59); allocate.putShort((short) i); @@ -106,9 +106,9 @@ public void filter3() throws JoyQueueException { @Override public void callback(List list) throws JoyQueueException { inCallback[0]++; - Assert.assertEquals(5, list.size()); - for (int i = 0; i < 5; i++) { - Assert.assertEquals(i, Serializer.readFlag(list.get(i))); + Assert.assertEquals(4, list.size()); + for (int i = 0; i < 4; i++) { + Assert.assertEquals(i + 1, Serializer.readFlag(list.get(i))); } } }); @@ -117,13 +117,13 @@ public void callback(List list) throws JoyQueueException { Assert.assertEquals(5, Serializer.readFlag(filter1.get(0))); Assert.assertEquals(1, inCallback[0]); - List filter2 = flagFilter.filter(byteBufferList.subList(6, 10), new FilterCallback() { + List filter2 = flagFilter.filter(byteBufferList.subList(6, 9), new FilterCallback() { @Override public void callback(List list) throws JoyQueueException { inCallback[0]++; - Assert.assertEquals(3, list.size()); - for (int i = 0; i < 3; i++) { - Assert.assertEquals(i + 6, Serializer.readFlag(list.get(i))); + Assert.assertEquals(2, list.size()); + for (int i = 0; i < 2; i++) { + Assert.assertEquals(i + 7, Serializer.readFlag(list.get(i))); } } }); @@ -156,6 +156,6 @@ public void callback(List list) throws JoyQueueException { } }); - Assert.assertEquals(0, filter.size()); + Assert.assertEquals(1, filter.size()); } } \ No newline at end of file diff --git a/joyqueue-server/joyqueue-broker-kafka/src/main/java/org/joyqueue/broker/kafka/converter/CheckResultConverter.java b/joyqueue-server/joyqueue-broker-kafka/src/main/java/org/joyqueue/broker/kafka/converter/CheckResultConverter.java index 6545d7981..542a44567 100644 --- a/joyqueue-server/joyqueue-broker-kafka/src/main/java/org/joyqueue/broker/kafka/converter/CheckResultConverter.java +++ b/joyqueue-server/joyqueue-broker-kafka/src/main/java/org/joyqueue/broker/kafka/converter/CheckResultConverter.java @@ -37,10 +37,12 @@ public static short convertProduceCode(JoyQueueCode code) { // return KafkaErrorCode.TOPIC_AUTHORIZATION_FAILED.getCode(); return KafkaErrorCode.UNKNOWN_TOPIC_OR_PARTITION.getCode(); } - case FW_PUT_MESSAGE_TOPIC_NOT_WRITE: - case FW_BROKER_NOT_WRITABLE: { + case FW_PUT_MESSAGE_TOPIC_NOT_WRITE: { return KafkaErrorCode.LEADER_NOT_AVAILABLE.getCode(); } + case FW_BROKER_NOT_WRITABLE: { + return KafkaErrorCode.KAFKA_STORAGE_ERROR.getCode(); + } case FW_PRODUCE_MESSAGE_BROKER_NOT_LEADER: case FW_TOPIC_NO_PARTITIONGROUP: { return KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode(); diff --git a/joyqueue-server/joyqueue-broker-kafka/src/main/java/org/joyqueue/broker/kafka/handler/ProduceRequestHandler.java b/joyqueue-server/joyqueue-broker-kafka/src/main/java/org/joyqueue/broker/kafka/handler/ProduceRequestHandler.java index 70b70b250..e4f7e937a 100644 --- a/joyqueue-server/joyqueue-broker-kafka/src/main/java/org/joyqueue/broker/kafka/handler/ProduceRequestHandler.java +++ b/joyqueue-server/joyqueue-broker-kafka/src/main/java/org/joyqueue/broker/kafka/handler/ProduceRequestHandler.java @@ -42,6 +42,7 @@ import org.joyqueue.domain.QosLevel; import org.joyqueue.domain.TopicConfig; import org.joyqueue.domain.TopicName; +import org.joyqueue.exception.JoyQueueCode; import org.joyqueue.message.BrokerMessage; import org.joyqueue.network.session.Connection; import org.joyqueue.network.session.Producer; @@ -241,7 +242,7 @@ protected short checkPartitionRequest(Transport transport, ProduceRequest produc } BooleanResponse checkResult = clusterManager.checkWritable(topic, producer.getApp(), clientIp, (short) partitionRequest.getPartition()); - if (!checkResult.isSuccess()) { + if (!checkResult.isSuccess() && !checkResult.getJoyQueueCode().equals(JoyQueueCode.FW_BROKER_NOT_WRITABLE)) { logger.warn("checkWritable failed, transport: {}, topic: {}, partition: {}, app: {}, code: {}", transport, topic, partitionRequest.getPartition(), producer.getApp(), checkResult.getJoyQueueCode()); return CheckResultConverter.convertProduceCode(checkResult.getJoyQueueCode()); diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/QosStore.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/QosStore.java index caf42dc4f..56ccf33bd 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/QosStore.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/QosStore.java @@ -15,8 +15,10 @@ */ package org.joyqueue.store; +import org.apache.commons.lang3.ArrayUtils; import org.joyqueue.domain.QosLevel; import org.joyqueue.store.file.PositioningStore; +import org.joyqueue.store.message.MessageParser; import org.joyqueue.toolkit.concurrent.EventFuture; import org.joyqueue.toolkit.concurrent.EventListener; import org.slf4j.Logger; @@ -113,7 +115,20 @@ public void asyncWrite(EventListener eventListener, WriteRequest... @Override public ReadResult read(short partition, long index, int count, long maxSize) throws IOException { - - return store.read(partition, index, count, maxSize); + // TODO 临时重试 + int retry = 0; + long readIndex = 0; + ReadResult readResult = store.read(partition, index, count, maxSize); + + while (retry < 3 && readResult != null && ArrayUtils.isNotEmpty(readResult.getMessages()) + && (readIndex = MessageParser.getLong(readResult.getMessages()[0], MessageParser.INDEX)) != index) { + + retry++; + readResult = store.read(partition, index, count, maxSize); + if (logger.isDebugEnabled()) { + logger.debug("retry read store, partition: {}, index: {}, readIndex: {}", partition, index, readIndex); + } + } + return readResult; } } diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/MessageUtils.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/MessageUtils.java index 1ff6cdcb6..d62c5ac59 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/MessageUtils.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/MessageUtils.java @@ -55,6 +55,20 @@ public static List build(int count, int bodyLength) { }).collect(Collectors.toList()); } + public static List buildBatch(short count, int bodyLength) { + byte[] body = new byte[bodyLength]; + return IntStream.range(0, count).mapToObj(i -> { + Arrays.fill(body, (byte) (i % Byte.MAX_VALUE)); + byte[][] varAtts = {body, bid, prop, expand, app}; + ByteBuffer byteBuffer = MessageParser.build(varAtts); + CRC32 crc32 = new CRC32(); + crc32.update(body); + MessageParser.setLong(byteBuffer, MessageParser.CRC, crc32.getValue()); + MessageParser.setShort(byteBuffer, MessageParser.FLAG, count); + return byteBuffer; + }).collect(Collectors.toList()); + } + public static ByteBuffer[] build1024(int count) { return IntStream.range(0, count).parallel().mapToObj(i -> ByteBuffer.wrap(Arrays.copyOf(b1024, b1024.length))).toArray(ByteBuffer[]::new); diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/test/java/org/joyqueue/store/PartitionGroupStoreManagerTest.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/test/java/org/joyqueue/store/PartitionGroupStoreManagerTest.java index 2154e3dd5..1f4727644 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/test/java/org/joyqueue/store/PartitionGroupStoreManagerTest.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/test/java/org/joyqueue/store/PartitionGroupStoreManagerTest.java @@ -17,6 +17,8 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.RandomUtils; import org.joyqueue.domain.QosLevel; import org.joyqueue.exception.JoyQueueCode; import org.joyqueue.store.file.Checkpoint; @@ -93,6 +95,83 @@ public void writeReadTest() throws Exception { } +// @Test + public void batchWriteReadTest() throws Exception { + short partition = partitions[0]; + int timeout = 1000 * 60 * 10; + + LoopThread commitThread = LoopThread.builder() + .name(String.format("CommitThread-%s-%d", topic, partitionGroup)) + .doWork(()-> store.commit(store.rightPosition())) + .sleepTime(0L, 10L) + .onException(e -> logger.warn("Commit Exception: ", e)) + .build(); + commitThread.start(); + + try { + new Thread(() -> { + while (true) { + List messages = null; + if (RandomUtils.nextInt(0, 100) >= 50) { + messages = MessageUtils.build(RandomUtils.nextInt(1, 30), RandomUtils.nextInt(100, 2048)); + } else { + messages = MessageUtils.buildBatch((short) RandomUtils.nextInt(1, 30), RandomUtils.nextInt(100, 2048)); + } + + EventFuture future = new EventFuture<>(); + store.asyncWrite(QosLevel.RECEIVE, future, new WriteRequest(partition, messages.get(0))); + WriteResult writeResult = null; + try { + writeResult = future.get(); + } catch (InterruptedException e) { + } + Assert.assertEquals(JoyQueueCode.SUCCESS, writeResult.getCode()); + } + }).start(); + + new Thread(() -> { + long index = 0; + while (true) { + try { + ReadResult readResult = null; + try { + readResult = store.read(partition, index, 10, 0); + } catch (IOException e) { + } + Assert.assertEquals(JoyQueueCode.SUCCESS, readResult.getCode()); + + if (ArrayUtils.isNotEmpty(readResult.getMessages())) { + Assert.assertEquals(index, MessageParser.getLong(readResult.getMessages()[0], MessageParser.INDEX)); + + for (ByteBuffer readBuffer : readResult.getMessages()) { + if (MessageParser.getShort(readBuffer, MessageParser.FLAG) == 0) { + index += 1; + } else { + index += MessageParser.getShort(readBuffer, MessageParser.FLAG); + } + } + } + } catch (Exception e) { + System.out.println(e.toString()); + try { + Thread.currentThread().sleep(1000 * 1); + } catch (InterruptedException ex) { + } + } + } + }).start(); + + long startTime = SystemClock.now(); + while (true) { + if (SystemClock.now() - startTime > timeout) { + Thread.currentThread().sleep(1000 * 1); + } + } + } finally { + commitThread.stop(); + } + } + @Test public void indexLengthTest() throws Exception { @@ -361,6 +440,7 @@ public void rePartitionTest() throws Exception { this.store.start(); this.store.enable(); } + @Test public void changeFileSizeTest() throws Exception { int count = 1024;