Skip to content

Commit

Permalink
修复并行消费问题 (#304)
Browse files Browse the repository at this point in the history
* 修复并行消费问题
  • Loading branch information
llIlll committed Sep 9, 2020
1 parent dc59c6b commit 55bc444
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -867,15 +867,7 @@ public List<String> getLocalSubscribeAppByTopic(TopicName topic) {
}

public List<Producer> getLocalProducersByTopic(TopicName topic) {
Map<String, MetaDataLocalCache.CacheProducer> producers = localCache.getTopicProducers(topic);
if (MapUtils.isEmpty(producers)) {
return Collections.emptyList();
}
List<Producer> result = Lists.newLinkedList();
for (Map.Entry<String, MetaDataLocalCache.CacheProducer> entry : producers.entrySet()) {
result.add(entry.getValue().getProducer());
}
return result;
return Lists.newArrayList(nameService.getProducerByTopic(topic));
}

public List<Consumer> getLocalConsumersByTopic(TopicName topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,10 @@ private boolean extendSlideWindowAndUpdatePullIndex(
if (logger.isDebugEnabled()) {
logger.debug("set new pull index:{}, topic:{}, app:{}, partition:{}", newPullIndex, consumer.getTopic(), consumer.getApp(), partition);
}
positionManager.updateLastMsgPullIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, newPullIndex);
if (consumedMessages.isReset()) {
positionManager.updateLastMsgAckIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, consumedMessages.getStartIndex(), false);
positionManager.updateLastMsgAckIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, pullIndex);
}
positionManager.updateLastMsgPullIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, newPullIndex);
return true;
} else {
return false;
Expand Down Expand Up @@ -566,6 +566,9 @@ private int count(List<ByteBuffer> buffers) {
* @return 消息序号
*/
private long getPullIndex(Consumer consumer, short partition) throws JoyQueueException {
if (partition == Partition.RETRY_PARTITION_ID) {
return 0;
}
// 本次拉取消息的位置,默认从0开始ack
long pullIndex = 0;
String topic = consumer.getTopic();
Expand Down Expand Up @@ -930,7 +933,7 @@ boolean isAcked() {
return status.get() == ACKED;
}

public boolean isReset() {
boolean isReset() {
return reset;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private FilterResult doFilter(List<ByteBuffer> messages, Pattern pattern) throws
}

// 是否匹配
boolean isMatch = pattern.matcher("" + flag).matches();
boolean isMatch = (flag == 0 || pattern.matcher("" + flag).matches());

if (isMatch) {
if (i == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.joyqueue.broker.monitor.stat.BrokerStat;
import org.joyqueue.broker.monitor.stat.ConsumerStat;
import org.joyqueue.broker.monitor.stat.PartitionGroupStat;
import org.joyqueue.broker.monitor.stat.TopicStat;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.model.Pager;
import org.joyqueue.monitor.ConsumerMonitorInfo;
Expand All @@ -39,8 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* ConsumerMonitorService
Expand Down Expand Up @@ -74,15 +74,18 @@ public Pager<ConsumerMonitorInfo> getConsumerInfos(int page, int pageSize) {
int index = 0;
List<ConsumerMonitorInfo> data = Lists.newArrayListWithCapacity(pageSize);

for (Map.Entry<String, TopicStat> topicStatEntry : brokerStat.getTopicStats().entrySet()) {
for (Map.Entry<String, AppStat> appStatEntry : topicStatEntry.getValue().getAppStats().entrySet()) {
if (index >= startIndex && index < endIndex) {
data.add(convertConsumerMonitorInfo(appStatEntry.getValue().getConsumerStat()));
}
index ++;
for (TopicConfig topic : clusterManager.getTopics()) {
List<org.joyqueue.domain.Consumer> consumers = clusterManager.getLocalConsumersByTopic(topic.getName());
for (org.joyqueue.domain.Consumer consumer : consumers) {
AppStat appStat = brokerStat.getOrCreateTopicStat(topic.getName().getFullName()).getOrCreateAppStat(consumer.getApp());
data.add(convertConsumerMonitorInfo(appStat.getConsumerStat()));
}
total += topicStatEntry.getValue().getAppStats().size();
}

Collections.sort(data, (o1, o2) -> {
return Long.compare(o2.getPending().getCount(), o1.getPending().getCount());
});

return new Pager<>(page, pageSize, total, data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
import org.joyqueue.broker.monitor.stat.PartitionGroupStat;
import org.joyqueue.broker.monitor.stat.PartitionStat;
import org.joyqueue.broker.monitor.stat.ProducerStat;
import org.joyqueue.broker.monitor.stat.TopicStat;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.model.Pager;
import org.joyqueue.monitor.ProducerMonitorInfo;
import org.joyqueue.monitor.ProducerPartitionGroupMonitorInfo;
import org.joyqueue.monitor.ProducerPartitionMonitorInfo;
import org.joyqueue.store.StoreManagementService;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* ProducerMonitorService
Expand Down Expand Up @@ -60,15 +60,18 @@ public Pager<ProducerMonitorInfo> getProduceInfos(int page, int pageSize) {
int index = 0;
List<ProducerMonitorInfo> data = Lists.newArrayListWithCapacity(pageSize);

for (Map.Entry<String, TopicStat> topicStatEntry : brokerStat.getTopicStats().entrySet()) {
for (Map.Entry<String, AppStat> appStatEntry : topicStatEntry.getValue().getAppStats().entrySet()) {
if (index >= startIndex && index < endIndex) {
data.add(convertProducerMonitorInfo(appStatEntry.getValue().getProducerStat()));
}
index ++;
for (TopicConfig topic : clusterManager.getTopics()) {
List<org.joyqueue.domain.Producer> producers = clusterManager.getLocalProducersByTopic(topic.getName());
for (org.joyqueue.domain.Producer producer : producers) {
AppStat appStat = brokerStat.getOrCreateTopicStat(topic.getName().getFullName()).getOrCreateAppStat(producer.getApp());
data.add(convertProducerMonitorInfo(appStat.getProducerStat()));
}
total += topicStatEntry.getValue().getAppStats().size();
}

Collections.sort(data, (o1, o2) -> {
return Long.compare(o2.getEnQueue().getCount(), o1.getEnQueue().getCount());
});

return new Pager<>(page, pageSize, total, data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ public void callback(List<ByteBuffer> list) throws JoyQueueException {
}
});

Assert.assertEquals(1, filter.size());
Assert.assertEquals(3, filter.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void filter2() throws JoyQueueException {
flagFilter.setRule("[9]");

List<ByteBuffer> byteBufferList = new LinkedList<>();
for (int i = 0; i < 10; i++) {
for (int i = 1; i < 10; i++) {
ByteBuffer allocate = ByteBuffer.allocate(100);
allocate.position(59);
allocate.putShort((short) i);
Expand All @@ -75,9 +75,9 @@ public void filter2() throws JoyQueueException {
List<ByteBuffer> filter = flagFilter.filter(byteBufferList, new FilterCallback() {
@Override
public void callback(List<ByteBuffer> list) throws JoyQueueException {
Assert.assertEquals(9, list.size());
for (int i = 0; i < 9; i++) {
Assert.assertEquals(i, Serializer.readFlag(list.get(i)));
Assert.assertEquals(8, list.size());
for (int i = 0; i < 8; i++) {
Assert.assertEquals(i + 1, Serializer.readFlag(list.get(i)));
}
inCallback[0] = true;
}
Expand All @@ -92,7 +92,7 @@ public void filter3() throws JoyQueueException {
flagFilter.setRule("[5, 9]");

List<ByteBuffer> byteBufferList = new LinkedList<>();
for (int i = 0; i < 10; i++) {
for (int i = 1; i < 10; i++) {
ByteBuffer allocate = ByteBuffer.allocate(100);
allocate.position(59);
allocate.putShort((short) i);
Expand All @@ -106,9 +106,9 @@ public void filter3() throws JoyQueueException {
@Override
public void callback(List<ByteBuffer> list) throws JoyQueueException {
inCallback[0]++;
Assert.assertEquals(5, list.size());
for (int i = 0; i < 5; i++) {
Assert.assertEquals(i, Serializer.readFlag(list.get(i)));
Assert.assertEquals(4, list.size());
for (int i = 0; i < 4; i++) {
Assert.assertEquals(i + 1, Serializer.readFlag(list.get(i)));
}
}
});
Expand All @@ -117,13 +117,13 @@ public void callback(List<ByteBuffer> list) throws JoyQueueException {
Assert.assertEquals(5, Serializer.readFlag(filter1.get(0)));
Assert.assertEquals(1, inCallback[0]);

List<ByteBuffer> filter2 = flagFilter.filter(byteBufferList.subList(6, 10), new FilterCallback() {
List<ByteBuffer> filter2 = flagFilter.filter(byteBufferList.subList(6, 9), new FilterCallback() {
@Override
public void callback(List<ByteBuffer> list) throws JoyQueueException {
inCallback[0]++;
Assert.assertEquals(3, list.size());
for (int i = 0; i < 3; i++) {
Assert.assertEquals(i + 6, Serializer.readFlag(list.get(i)));
Assert.assertEquals(2, list.size());
for (int i = 0; i < 2; i++) {
Assert.assertEquals(i + 7, Serializer.readFlag(list.get(i)));
}
}
});
Expand Down Expand Up @@ -156,6 +156,6 @@ public void callback(List<ByteBuffer> list) throws JoyQueueException {
}
});

Assert.assertEquals(0, filter.size());
Assert.assertEquals(1, filter.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ public static short convertProduceCode(JoyQueueCode code) {
// return KafkaErrorCode.TOPIC_AUTHORIZATION_FAILED.getCode();
return KafkaErrorCode.UNKNOWN_TOPIC_OR_PARTITION.getCode();
}
case FW_PUT_MESSAGE_TOPIC_NOT_WRITE:
case FW_BROKER_NOT_WRITABLE: {
case FW_PUT_MESSAGE_TOPIC_NOT_WRITE: {
return KafkaErrorCode.LEADER_NOT_AVAILABLE.getCode();
}
case FW_BROKER_NOT_WRITABLE: {
return KafkaErrorCode.KAFKA_STORAGE_ERROR.getCode();
}
case FW_PRODUCE_MESSAGE_BROKER_NOT_LEADER:
case FW_TOPIC_NO_PARTITIONGROUP: {
return KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.joyqueue.domain.QosLevel;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Producer;
Expand Down Expand Up @@ -241,7 +242,7 @@ protected short checkPartitionRequest(Transport transport, ProduceRequest produc
}

BooleanResponse checkResult = clusterManager.checkWritable(topic, producer.getApp(), clientIp, (short) partitionRequest.getPartition());
if (!checkResult.isSuccess()) {
if (!checkResult.isSuccess() && !checkResult.getJoyQueueCode().equals(JoyQueueCode.FW_BROKER_NOT_WRITABLE)) {
logger.warn("checkWritable failed, transport: {}, topic: {}, partition: {}, app: {}, code: {}",
transport, topic, partitionRequest.getPartition(), producer.getApp(), checkResult.getJoyQueueCode());
return CheckResultConverter.convertProduceCode(checkResult.getJoyQueueCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package org.joyqueue.store;

import org.apache.commons.lang3.ArrayUtils;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.store.file.PositioningStore;
import org.joyqueue.store.message.MessageParser;
import org.joyqueue.toolkit.concurrent.EventFuture;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.slf4j.Logger;
Expand Down Expand Up @@ -113,7 +115,20 @@ public void asyncWrite(EventListener<WriteResult> eventListener, WriteRequest...

@Override
public ReadResult read(short partition, long index, int count, long maxSize) throws IOException {

return store.read(partition, index, count, maxSize);
// TODO 临时重试
int retry = 0;
long readIndex = 0;
ReadResult readResult = store.read(partition, index, count, maxSize);

while (retry < 3 && readResult != null && ArrayUtils.isNotEmpty(readResult.getMessages())
&& (readIndex = MessageParser.getLong(readResult.getMessages()[0], MessageParser.INDEX)) != index) {

retry++;
readResult = store.read(partition, index, count, maxSize);
if (logger.isDebugEnabled()) {
logger.debug("retry read store, partition: {}, index: {}, readIndex: {}", partition, index, readIndex);
}
}
return readResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ public static List<ByteBuffer> build(int count, int bodyLength) {
}).collect(Collectors.toList());
}

public static List<ByteBuffer> buildBatch(short count, int bodyLength) {
byte[] body = new byte[bodyLength];
return IntStream.range(0, count).mapToObj(i -> {
Arrays.fill(body, (byte) (i % Byte.MAX_VALUE));
byte[][] varAtts = {body, bid, prop, expand, app};
ByteBuffer byteBuffer = MessageParser.build(varAtts);
CRC32 crc32 = new CRC32();
crc32.update(body);
MessageParser.setLong(byteBuffer, MessageParser.CRC, crc32.getValue());
MessageParser.setShort(byteBuffer, MessageParser.FLAG, count);
return byteBuffer;
}).collect(Collectors.toList());
}

public static ByteBuffer[] build1024(int count) {

return IntStream.range(0, count).parallel().mapToObj(i -> ByteBuffer.wrap(Arrays.copyOf(b1024, b1024.length))).toArray(ByteBuffer[]::new);
Expand Down
Loading

0 comments on commit 55bc444

Please sign in to comment.