Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/chubaostream/joyqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohaoxiang committed Sep 9, 2020
2 parents be0d597 + 55bc444 commit 5f1c9a3
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -867,15 +867,7 @@ public List<String> getLocalSubscribeAppByTopic(TopicName topic) {
}

public List<Producer> getLocalProducersByTopic(TopicName topic) {
Map<String, MetaDataLocalCache.CacheProducer> producers = localCache.getTopicProducers(topic);
if (MapUtils.isEmpty(producers)) {
return Collections.emptyList();
}
List<Producer> result = Lists.newLinkedList();
for (Map.Entry<String, MetaDataLocalCache.CacheProducer> entry : producers.entrySet()) {
result.add(entry.getValue().getProducer());
}
return result;
return Lists.newArrayList(nameService.getProducerByTopic(topic));
}

public List<Consumer> getLocalConsumersByTopic(TopicName topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,10 @@ private boolean extendSlideWindowAndUpdatePullIndex(
if (logger.isDebugEnabled()) {
logger.debug("set new pull index:{}, topic:{}, app:{}, partition:{}", newPullIndex, consumer.getTopic(), consumer.getApp(), partition);
}
positionManager.updateLastMsgPullIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, newPullIndex);
if (consumedMessages.isReset()) {
positionManager.updateLastMsgAckIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, consumedMessages.getStartIndex(), false);
positionManager.updateLastMsgAckIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, pullIndex);
}
positionManager.updateLastMsgPullIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, newPullIndex);
return true;
} else {
return false;
Expand Down Expand Up @@ -566,6 +566,9 @@ private int count(List<ByteBuffer> buffers) {
* @return 消息序号
*/
private long getPullIndex(Consumer consumer, short partition) throws JoyQueueException {
if (partition == Partition.RETRY_PARTITION_ID) {
return 0;
}
// 本次拉取消息的位置,默认从0开始ack
long pullIndex = 0;
String topic = consumer.getTopic();
Expand Down Expand Up @@ -930,7 +933,7 @@ boolean isAcked() {
return status.get() == ACKED;
}

public boolean isReset() {
boolean isReset() {
return reset;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private FilterResult doFilter(List<ByteBuffer> messages, Pattern pattern) throws
}

// 是否匹配
boolean isMatch = pattern.matcher("" + flag).matches();
boolean isMatch = (flag == 0 || pattern.matcher("" + flag).matches());

if (isMatch) {
if (i == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.joyqueue.broker.monitor.stat.BrokerStat;
import org.joyqueue.broker.monitor.stat.ConsumerStat;
import org.joyqueue.broker.monitor.stat.PartitionGroupStat;
import org.joyqueue.broker.monitor.stat.TopicStat;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.model.Pager;
import org.joyqueue.monitor.ConsumerMonitorInfo;
Expand All @@ -39,8 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* ConsumerMonitorService
Expand Down Expand Up @@ -74,15 +74,18 @@ public Pager<ConsumerMonitorInfo> getConsumerInfos(int page, int pageSize) {
int index = 0;
List<ConsumerMonitorInfo> data = Lists.newArrayListWithCapacity(pageSize);

for (Map.Entry<String, TopicStat> topicStatEntry : brokerStat.getTopicStats().entrySet()) {
for (Map.Entry<String, AppStat> appStatEntry : topicStatEntry.getValue().getAppStats().entrySet()) {
if (index >= startIndex && index < endIndex) {
data.add(convertConsumerMonitorInfo(appStatEntry.getValue().getConsumerStat()));
}
index ++;
for (TopicConfig topic : clusterManager.getTopics()) {
List<org.joyqueue.domain.Consumer> consumers = clusterManager.getLocalConsumersByTopic(topic.getName());
for (org.joyqueue.domain.Consumer consumer : consumers) {
AppStat appStat = brokerStat.getOrCreateTopicStat(topic.getName().getFullName()).getOrCreateAppStat(consumer.getApp());
data.add(convertConsumerMonitorInfo(appStat.getConsumerStat()));
}
total += topicStatEntry.getValue().getAppStats().size();
}

Collections.sort(data, (o1, o2) -> {
return Long.compare(o2.getPending().getCount(), o1.getPending().getCount());
});

return new Pager<>(page, pageSize, total, data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
import org.joyqueue.broker.monitor.stat.PartitionGroupStat;
import org.joyqueue.broker.monitor.stat.PartitionStat;
import org.joyqueue.broker.monitor.stat.ProducerStat;
import org.joyqueue.broker.monitor.stat.TopicStat;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.model.Pager;
import org.joyqueue.monitor.ProducerMonitorInfo;
import org.joyqueue.monitor.ProducerPartitionGroupMonitorInfo;
import org.joyqueue.monitor.ProducerPartitionMonitorInfo;
import org.joyqueue.store.StoreManagementService;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* ProducerMonitorService
Expand Down Expand Up @@ -60,15 +60,18 @@ public Pager<ProducerMonitorInfo> getProduceInfos(int page, int pageSize) {
int index = 0;
List<ProducerMonitorInfo> data = Lists.newArrayListWithCapacity(pageSize);

for (Map.Entry<String, TopicStat> topicStatEntry : brokerStat.getTopicStats().entrySet()) {
for (Map.Entry<String, AppStat> appStatEntry : topicStatEntry.getValue().getAppStats().entrySet()) {
if (index >= startIndex && index < endIndex) {
data.add(convertProducerMonitorInfo(appStatEntry.getValue().getProducerStat()));
}
index ++;
for (TopicConfig topic : clusterManager.getTopics()) {
List<org.joyqueue.domain.Producer> producers = clusterManager.getLocalProducersByTopic(topic.getName());
for (org.joyqueue.domain.Producer producer : producers) {
AppStat appStat = brokerStat.getOrCreateTopicStat(topic.getName().getFullName()).getOrCreateAppStat(producer.getApp());
data.add(convertProducerMonitorInfo(appStat.getProducerStat()));
}
total += topicStatEntry.getValue().getAppStats().size();
}

Collections.sort(data, (o1, o2) -> {
return Long.compare(o2.getEnQueue().getCount(), o1.getEnQueue().getCount());
});

return new Pager<>(page, pageSize, total, data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ public void callback(List<ByteBuffer> list) throws JoyQueueException {
}
});

Assert.assertEquals(1, filter.size());
Assert.assertEquals(3, filter.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ public void filter2() throws JoyQueueException {
List<ByteBuffer> filter = flagFilter.filter(byteBufferList, new FilterCallback() {
@Override
public void callback(List<ByteBuffer> list) throws JoyQueueException {
Assert.assertEquals(9, list.size());
for (int i = 0; i < 9; i++) {
Assert.assertEquals(i, Serializer.readFlag(list.get(i)));
Assert.assertEquals(8, list.size());
for (int i = 0; i < 8; i++) {
Assert.assertEquals(i + 1, Serializer.readFlag(list.get(i)));
}
inCallback[0] = true;
}
Expand Down Expand Up @@ -106,9 +106,9 @@ public void filter3() throws JoyQueueException {
@Override
public void callback(List<ByteBuffer> list) throws JoyQueueException {
inCallback[0]++;
Assert.assertEquals(5, list.size());
for (int i = 0; i < 5; i++) {
Assert.assertEquals(i, Serializer.readFlag(list.get(i)));
Assert.assertEquals(4, list.size());
for (int i = 0; i < 4; i++) {
Assert.assertEquals(i + 1, Serializer.readFlag(list.get(i)));
}
}
});
Expand All @@ -117,13 +117,13 @@ public void callback(List<ByteBuffer> list) throws JoyQueueException {
Assert.assertEquals(5, Serializer.readFlag(filter1.get(0)));
Assert.assertEquals(1, inCallback[0]);

List<ByteBuffer> filter2 = flagFilter.filter(byteBufferList.subList(6, 10), new FilterCallback() {
List<ByteBuffer> filter2 = flagFilter.filter(byteBufferList.subList(6, 9), new FilterCallback() {
@Override
public void callback(List<ByteBuffer> list) throws JoyQueueException {
inCallback[0]++;
Assert.assertEquals(3, list.size());
for (int i = 0; i < 3; i++) {
Assert.assertEquals(i + 6, Serializer.readFlag(list.get(i)));
Assert.assertEquals(2, list.size());
for (int i = 0; i < 2; i++) {
Assert.assertEquals(i + 7, Serializer.readFlag(list.get(i)));
}
}
});
Expand Down Expand Up @@ -156,6 +156,6 @@ public void callback(List<ByteBuffer> list) throws JoyQueueException {
}
});

Assert.assertEquals(0, filter.size());
Assert.assertEquals(1, filter.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ public static short convertProduceCode(JoyQueueCode code) {
// return KafkaErrorCode.TOPIC_AUTHORIZATION_FAILED.getCode();
return KafkaErrorCode.UNKNOWN_TOPIC_OR_PARTITION.getCode();
}
case FW_PUT_MESSAGE_TOPIC_NOT_WRITE:
case FW_BROKER_NOT_WRITABLE: {
case FW_PUT_MESSAGE_TOPIC_NOT_WRITE: {
return KafkaErrorCode.LEADER_NOT_AVAILABLE.getCode();
}
case FW_BROKER_NOT_WRITABLE: {
return KafkaErrorCode.KAFKA_STORAGE_ERROR.getCode();
}
case FW_PRODUCE_MESSAGE_BROKER_NOT_LEADER:
case FW_TOPIC_NO_PARTITIONGROUP: {
return KafkaErrorCode.NOT_LEADER_FOR_PARTITION.getCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.joyqueue.domain.QosLevel;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.network.session.Connection;
import org.joyqueue.network.session.Producer;
Expand Down Expand Up @@ -241,7 +242,7 @@ protected short checkPartitionRequest(Transport transport, ProduceRequest produc
}

BooleanResponse checkResult = clusterManager.checkWritable(topic, producer.getApp(), clientIp, (short) partitionRequest.getPartition());
if (!checkResult.isSuccess()) {
if (!checkResult.isSuccess() && !checkResult.getJoyQueueCode().equals(JoyQueueCode.FW_BROKER_NOT_WRITABLE)) {
logger.warn("checkWritable failed, transport: {}, topic: {}, partition: {}, app: {}, code: {}",
transport, topic, partitionRequest.getPartition(), producer.getApp(), checkResult.getJoyQueueCode());
return CheckResultConverter.convertProduceCode(checkResult.getJoyQueueCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package org.joyqueue.store;

import org.apache.commons.lang3.ArrayUtils;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.store.file.PositioningStore;
import org.joyqueue.store.message.MessageParser;
import org.joyqueue.toolkit.concurrent.EventFuture;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.slf4j.Logger;
Expand Down Expand Up @@ -113,7 +115,20 @@ public void asyncWrite(EventListener<WriteResult> eventListener, WriteRequest...

@Override
public ReadResult read(short partition, long index, int count, long maxSize) throws IOException {

return store.read(partition, index, count, maxSize);
// TODO 临时重试
int retry = 0;
long readIndex = 0;
ReadResult readResult = store.read(partition, index, count, maxSize);

while (retry < 3 && readResult != null && ArrayUtils.isNotEmpty(readResult.getMessages())
&& (readIndex = MessageParser.getLong(readResult.getMessages()[0], MessageParser.INDEX)) != index) {

retry++;
readResult = store.read(partition, index, count, maxSize);
if (logger.isDebugEnabled()) {
logger.debug("retry read store, partition: {}, index: {}, readIndex: {}", partition, index, readIndex);
}
}
return readResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ public static List<ByteBuffer> build(int count, int bodyLength) {
}).collect(Collectors.toList());
}

public static List<ByteBuffer> buildBatch(short count, int bodyLength) {
byte[] body = new byte[bodyLength];
return IntStream.range(0, count).mapToObj(i -> {
Arrays.fill(body, (byte) (i % Byte.MAX_VALUE));
byte[][] varAtts = {body, bid, prop, expand, app};
ByteBuffer byteBuffer = MessageParser.build(varAtts);
CRC32 crc32 = new CRC32();
crc32.update(body);
MessageParser.setLong(byteBuffer, MessageParser.CRC, crc32.getValue());
MessageParser.setShort(byteBuffer, MessageParser.FLAG, count);
return byteBuffer;
}).collect(Collectors.toList());
}

public static ByteBuffer[] build1024(int count) {

return IntStream.range(0, count).parallel().mapToObj(i -> ByteBuffer.wrap(Arrays.copyOf(b1024, b1024.length))).toArray(ByteBuffer[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.RandomUtils;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.store.file.Checkpoint;
Expand Down Expand Up @@ -93,6 +95,83 @@ public void writeReadTest() throws Exception {

}

// @Test
public void batchWriteReadTest() throws Exception {
short partition = partitions[0];
int timeout = 1000 * 60 * 10;

LoopThread commitThread = LoopThread.builder()
.name(String.format("CommitThread-%s-%d", topic, partitionGroup))
.doWork(()-> store.commit(store.rightPosition()))
.sleepTime(0L, 10L)
.onException(e -> logger.warn("Commit Exception: ", e))
.build();
commitThread.start();

try {
new Thread(() -> {
while (true) {
List<ByteBuffer> messages = null;
if (RandomUtils.nextInt(0, 100) >= 50) {
messages = MessageUtils.build(RandomUtils.nextInt(1, 30), RandomUtils.nextInt(100, 2048));
} else {
messages = MessageUtils.buildBatch((short) RandomUtils.nextInt(1, 30), RandomUtils.nextInt(100, 2048));
}

EventFuture<WriteResult> future = new EventFuture<>();
store.asyncWrite(QosLevel.RECEIVE, future, new WriteRequest(partition, messages.get(0)));
WriteResult writeResult = null;
try {
writeResult = future.get();
} catch (InterruptedException e) {
}
Assert.assertEquals(JoyQueueCode.SUCCESS, writeResult.getCode());
}
}).start();

new Thread(() -> {
long index = 0;
while (true) {
try {
ReadResult readResult = null;
try {
readResult = store.read(partition, index, 10, 0);
} catch (IOException e) {
}
Assert.assertEquals(JoyQueueCode.SUCCESS, readResult.getCode());

if (ArrayUtils.isNotEmpty(readResult.getMessages())) {
Assert.assertEquals(index, MessageParser.getLong(readResult.getMessages()[0], MessageParser.INDEX));

for (ByteBuffer readBuffer : readResult.getMessages()) {
if (MessageParser.getShort(readBuffer, MessageParser.FLAG) == 0) {
index += 1;
} else {
index += MessageParser.getShort(readBuffer, MessageParser.FLAG);
}
}
}
} catch (Exception e) {
System.out.println(e.toString());
try {
Thread.currentThread().sleep(1000 * 1);
} catch (InterruptedException ex) {
}
}
}
}).start();

long startTime = SystemClock.now();
while (true) {
if (SystemClock.now() - startTime > timeout) {
Thread.currentThread().sleep(1000 * 1);
}
}
} finally {
commitThread.stop();
}
}


@Test
public void indexLengthTest() throws Exception {
Expand Down Expand Up @@ -361,6 +440,7 @@ public void rePartitionTest() throws Exception {
this.store.start();
this.store.enable();
}

@Test
public void changeFileSizeTest() throws Exception {
int count = 1024;
Expand Down

0 comments on commit 5f1c9a3

Please sign in to comment.