Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/chubaostream/joyqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
yenniechen committed Sep 14, 2020
2 parents 306546d + f0b8c62 commit b1e1e7c
Show file tree
Hide file tree
Showing 433 changed files with 8,387 additions and 2,442 deletions.
2 changes: 1 addition & 1 deletion docker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<artifactId>joyqueue</artifactId>
<groupId>org.joyqueue</groupId>
<version>4.2.5.4-SNAPSHOT</version>
<version>4.2.7.RC3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>joyqueue-docker</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion docker/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>joyqueue-docker</artifactId>
<groupId>org.joyqueue</groupId>
<version>4.2.5.4-SNAPSHOT</version>
<version>4.2.7.RC3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>docker-server</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion docker/web/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>joyqueue-docker</artifactId>
<groupId>org.joyqueue</groupId>
<version>4.2.5.4-SNAPSHOT</version>
<version>4.2.7.RC3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>docker-web</artifactId>
Expand Down
47 changes: 37 additions & 10 deletions docs/cn/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@ broker.frontend-server.transport.server.port | 50088 | JoyQueue Server与客户
broker.opts.memory | -Xms2G -Xmx2G -server -Xss256K -XX:SurvivorRatio=8 -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:CMSMaxAbortablePrecleanTime=20 -XX:-OmitStackTraceInFastThrow -XX:MaxDirectMemorySize=2G | jvm 参数。
store.message.file.size | 128 MB | 消息文件大小
store.index.file.size | 512 KB | 索引文件大小
store.preload.buffer.core.count | 3 | 预加载DirectBuffer的核心数量
store.preload.buffer.max.count | 10 | 预加载DirectBuffer的最大数量
store.max.message.length | 4 MB | 每条消息的最大长度
store.write.request.cache.size | 1024 | 写入请求缓存中,最多缓存的请求数量
store.write.timeout | 3000 ms | 存储写入超时时间
store.flush.interval | 20 ms | 存储刷盘时间间隔
store.max.dirty.size | 10 MB| 脏数据的最大长度,如果内存中未刷盘的脏数据长度超过这个值,将阻塞消息写入。
store.index.file.load | false | 读取消息文件时,如果没有命中缓存,是否在内存中加载整个文件。设置为true时,整个文件就会被缓存到内存中,后续连续读取这个文件时性能更好。但加载文件太大时,会导致消费时延抖动。如果内存不够大,频繁的发生换页,也会导致文件缓存页被反复卸载再加载,反而引起不必要的磁盘IO,拖慢读取性能。<br/>设置为false时,由操作系统控制的PageCache作为读缓存。
store.index.file.load | true | 读取索引文件时,是否在内存中加载整个文件。
store.preload.buffer.core.count | 3 | 在写入时,数据先写入文件对应的缓存页,然后异步刷盘到文件中。每次更换一个新文件时,都需要申请一块儿和文件大小一致的内存作为文件的缓存页。为了提升写入性能,系统维护一个缓存页池。系统预先申请一些缓存页放入池中,需要申请时直接从池中获取,释放的缓存页则被还回缓存页池中,避免频繁的申请和释放内存。store.preload.buffer.core.count预加载DirectBuffer的缓存页的核心数量,缓存页池尽量维持池中可用的缓存页不少于这个数量。
store.preload.buffer.max.count | 10 | 缓存页池中缓存页最大数量,超过这个数量的缓存页将被释放。
store.max.message.length | 4 MB | 消息的最大长度,包含消息头。超过这个长度的消息将被拒绝写入。
store.write.request.cache.size | 128 | 每个Partition Group最多缓存的写入请求数量。
store.write.timeout | 3000 ms | 存储写入超时时间。
store.flush.interval | 50 ms | 存储刷盘最大时间间隔。
store.flush.force | true | 在写完每个文件之后,是否调用fsync刷盘。开启后,可以避免服务器宕机掉电而导致数据文件损坏,但会降低写入性能。
store.max.dirty.size | 10 MB | 脏数据的最大长度,如果内存中未刷盘的脏数据长度超过这个值,将阻塞消息写入。
store.disk.full.ratio | 90 | 磁盘空间使用率上限,超过这个上限将拒绝写入,默认为90%。
print.metric.interval | 0 ms | 打印存储监控信息的时间间隔,默认为0, 不打印。
store.max.store.size | 10 GB | 每个分区组最多保留消息的大小,超过这个大小之后,旧的消息将被自动删除。
store.max.store.time | 7 天 | 每个分区组最长保留消息的时长,超时的消息将被自动删除。
store.clean.donot.delete.consumed | true | 不删除已订阅未消费的消息。默认对于已经订阅但还未消费的消息,即使满足删除条件,也不会自动删除。
store.clean.strategy.class | GlobalStorageLimitCleaningStrategy | 存储清理策略。存储清理策略决定JoyQueue如何去删除旧数据。默认清理策略下,系统首先尝试删除所有过期的数据,如果磁盘占用率仍高于清理上限store.disk.usage.max,则继续删除未过期的数据,直到磁盘利用率降低到清理上限或者没有任何数据可以删除。
store.max.store.size | 10 GB | 每个Partition Group最多保留数据大小。
store.max.store.time | 7天 | 数据最长保留时间。
store.clean.keep.unconsumed | true | 是否保留未消费的数据。开启后,清理策略将不会自动删除已订阅未消费(也就是积压的)数据。
store.disk.usage.max | 80(80%) | 磁盘使用率上限,超过这个上限开始清理。
store.disk.usage.safe | 75(%75) | 磁盘使用率下限,每次清理都会尽量将磁盘使用率清理至下限以下。
store.force.restore | true | 系统启动时,如果Broker磁盘上存储的Partition Group与NameServer上的元数据不一致时的处理方式。true: 以NameServer上的元数据为准,强制恢复Broker上数据。false: 如果不一致抛出异常,停止恢复。
nameserver.nsr.name | server | NameServer的启动方式:<br/> server: 默认的启动方式,存储元数据。<br/> thin: 不存储元数据,远程去其它Server读写元数据。
nameservice.serverAddress | 127.0.0.1:50092 | thin模式时,需要连接其它Server获取元数据,在这里配置其它Server的地址。这里配置的Server中,NameServer的启动方式必须是server模式。支持配置多个地址,用英文逗号隔开。例如:192.168.1.1:50092,192.168.1.2:50092。
nameserver.ignite.discoverySpi.localPort | 48500| Ignite服务发现本地端口
Expand All @@ -36,6 +44,25 @@ nameserver.ignite.discoverySpi.networkTimeout | 5000 ms | Ignite服务发现超
nameserver.ignite.discoverySpi.ipFinder.address | 127.0.0.1 | Ignite本地服务发现地址范围,支持多个地址,例如:1.2.3.4,1.2.3.5:47500..47509
nameserver.ignite.communicationSpi.localPort | 48100 | Ignite使用的通信端口号

## JoyQueue Server 内存参数配置

内存相关参数在broker.opts.memory中配置,JoyQueue会尽可能的使用物理内存(堆外内存)作为数据文件的缓存,以提升消息的收发性能,所以即使Broker相对空闲,也会一直保持比较高的内存占用率。

Broker占用的内存包括堆内存和堆外内存二部分。其中,堆外内存占用主要包括:

* JoyQueue管理的缓存页(绝大部分);
* Socket缓冲区(少量);
* 其它堆外内存(非常少);

可以通过jvm参数控制JoyQueue内存使用,这两部分内存都可以通过如下参数来控制:

jvm参数 | 建议值 | 说明
-- | -- | --
-Xms -Xmx | 分配给JoyQueue物理内存*30%,不要低于256MB | 堆内存的最大最小值,建议配置成一样。
-XX:MaxDirectMemorySize | 分配给JoyQueue物理内存 - 堆内存最大值 | 最大堆外内存大小。JoyQueue使用的堆外内存如果超过这个上限,会触发FullGC。建议这个值设置的尽可能大,然后用PreloadBufferPool.MaxMemory来控制实际占用的堆外内存大小。如果未设置-XX:MaxDirectMemorySize,取值为JVM参数-Xmx
-DPreloadBufferPool.MaxMemory | MaxDirectMemorySize * 90% | 可供缓存使用的最大堆外内存。<br/> 1. 如果PreloadBufferPool.MaxMemory设置为数值,直接使用设置值。<br/> 2. 如果PreloadBufferPool.MaxMemory设置为百分比,比如:90%,最大堆外内存 = 物理内存 * 90% - 最大堆内存(由JVM参数-Xmx配置)<br/> 3. 如果PreloadBufferPool.MaxMemory未设置或者设置了非法值,最大堆外内存 = MaxDirectMemorySize * 90%。<br/> 4. 如果设置的最大堆外内存值超过了MaxDirectMemorySize * 90%,则最大堆外内存为:MaxDirectMemorySize * 90%。
-DPreloadBufferPool.WritePageExtraWeightMs | 默认值(1分钟)| 系统在内存满需要换页时,正在写入的页在置换时有额外的权重,这个权重用时间Ms体现。默认是60秒。置换权重 = 上次访问时间戳 + 额外权重,优先从内存中驱逐权重小的页。例如:一个只读的页,上次访问时间戳是T,一个读写页,上次访问时间是T - 60秒,这两个页在置换时有同样的权重。<br/>当Broker上活动Broker数量较多时(可用的最大堆外内存 < 活动的PartitionGroup数量 * 消息文件大小),这个值设置越大,写入页优先级越高,相应的写入性能会更好,但读取性能变差。

## JoyQueue Web 配置

JoyQueue Web的配置文件位于joyqueue-web/conf/application.properties。
Expand Down
2 changes: 1 addition & 1 deletion joyqueue-client/joyqueue-client-all-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>joyqueue-client</artifactId>
<groupId>org.joyqueue</groupId>
<version>4.2.5.4-SNAPSHOT</version>
<version>4.2.7.RC3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion joyqueue-client/joyqueue-client-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>joyqueue-client</artifactId>
<groupId>org.joyqueue</groupId>
<version>4.2.5.4-SNAPSHOT</version>
<version>4.2.7.RC3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion joyqueue-client/joyqueue-client-core-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>joyqueue-client</artifactId>
<groupId>org.joyqueue</groupId>
<version>4.2.5.4-SNAPSHOT</version>
<version>4.2.7.RC3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
6 changes: 5 additions & 1 deletion joyqueue-client/joyqueue-client-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>joyqueue-client</artifactId>
<groupId>org.joyqueue</groupId>
<version>4.2.5.4-SNAPSHOT</version>
<version>4.2.7.RC3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand All @@ -40,6 +40,10 @@
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-network</artifactId>
</dependency>
<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-client-loadbalance-adaptive</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
*/
package io.openmessaging.joyqueue.config;

import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.producer.config.ProducerConfig;
import org.joyqueue.client.internal.producer.feedback.config.TxFeedbackConfig;
import org.joyqueue.client.internal.transport.config.TransportConfig;
import org.joyqueue.domain.QosLevel;
import io.openmessaging.KeyValue;
import io.openmessaging.joyqueue.domain.JoyQueueConsumerBuiltinKeys;
import io.openmessaging.joyqueue.domain.JoyQueueNameServerBuiltinKeys;
import io.openmessaging.joyqueue.domain.JoyQueueProducerBuiltinKeys;
import io.openmessaging.joyqueue.domain.JoyQueueTransportBuiltinKeys;
import io.openmessaging.joyqueue.domain.JoyQueueTxFeedbackBuiltinKeys;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.producer.config.ProducerConfig;
import org.joyqueue.client.internal.producer.feedback.config.TxFeedbackConfig;
import org.joyqueue.client.internal.transport.config.TransportConfig;
import org.joyqueue.domain.QosLevel;

/**
* KeyValueConverter
Expand Down Expand Up @@ -60,9 +60,10 @@ public static TransportConfig convertTransportConfig(KeyValue attributes) {
transportConfig.setHeartbeatInterval(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.HEARTBEAT_INTERVAL, transportConfig.getHeartbeatInterval()));
transportConfig.setHeartbeatTimeout(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.HEARTBEAT_TIMEOUT, transportConfig.getHeartbeatTimeout()));
transportConfig.setSoLinger(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SO_LINGER, transportConfig.getSoLinger()));
transportConfig.setTcpNoDelay(attributes.getBoolean(JoyQueueTransportBuiltinKeys.CONNECTIONS, transportConfig.isTcpNoDelay()));
transportConfig.setTcpNoDelay(attributes.getBoolean(JoyQueueTransportBuiltinKeys.TCP_NO_DELAY, transportConfig.isTcpNoDelay()));
transportConfig.setKeepAlive(attributes.getBoolean(JoyQueueTransportBuiltinKeys.KEEPALIVE, transportConfig.isKeepAlive()));
transportConfig.setSoTimeout(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SO_TIMEOUT, transportConfig.getSoTimeout()));
transportConfig.setSendTimeout(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SEND_TIMEOUT, transportConfig.getSoTimeout()));
transportConfig.setSocketBufferSize(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SOCKET_BUFFER_SIZE, transportConfig.getSocketBufferSize()));
transportConfig.setMaxOneway(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.MAX_ONEWAY, transportConfig.getMaxOneway()));
transportConfig.setMaxAsync(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.MAX_ASYNC, transportConfig.getMaxAsync()));
Expand Down Expand Up @@ -109,12 +110,14 @@ public static ConsumerConfig convertConsumerConfig(KeyValue attributes) {
consumerConfig.setSessionTimeout(attributes.getLong(JoyQueueConsumerBuiltinKeys.SESSION_TIMEOUT, consumerConfig.getSessionTimeout()));
consumerConfig.setThread(KeyValueHelper.getInt(attributes, JoyQueueConsumerBuiltinKeys.THREAD, consumerConfig.getThread()));
consumerConfig.setFailover(attributes.getBoolean(JoyQueueConsumerBuiltinKeys.FAILOVER, consumerConfig.isFailover()));
consumerConfig.setForceAck(attributes.getBoolean(JoyQueueConsumerBuiltinKeys.FORCE_ACK, consumerConfig.isForceAck()));
consumerConfig.setLoadBalance(attributes.getBoolean(JoyQueueConsumerBuiltinKeys.LOADBALANCE, consumerConfig.isLoadBalance()));
consumerConfig.setLoadBalanceType(KeyValueHelper.getString(attributes, JoyQueueConsumerBuiltinKeys.LOADBALANCE_TYPE, consumerConfig.getLoadBalanceType()));
consumerConfig.setBroadcastGroup(KeyValueHelper.getString(attributes, JoyQueueConsumerBuiltinKeys.BROADCAST_GROUP, consumerConfig.getBroadcastGroup()));
consumerConfig.setBroadcastLocalPath(KeyValueHelper.getString(attributes, JoyQueueConsumerBuiltinKeys.BROADCAST_LOCAL_PATH, consumerConfig.getBroadcastLocalPath()));
consumerConfig.setBroadcastPersistInterval(KeyValueHelper.getInt(attributes, JoyQueueConsumerBuiltinKeys.BROADCAST_PERSIST_INTERVAL, consumerConfig.getBroadcastPersistInterval()));
consumerConfig.setBroadcastIndexExpireTime(KeyValueHelper.getInt(attributes, JoyQueueConsumerBuiltinKeys.BROADCAST_INDEX_EXPIRE_TIME, consumerConfig.getBroadcastIndexExpireTime()));
consumerConfig.setBroadcastIndexAutoReset(KeyValueHelper.getInt(attributes, JoyQueueConsumerBuiltinKeys.BROADCAST_INDEX_AUTO_RESET, consumerConfig.getBroadcastIndexAutoReset()));
return consumerConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
public interface ExtensionConsumer extends Consumer {

// receive
Message receive(short partition, long timeout);

List<Message> batchReceive(short partition, long timeout);
Expand All @@ -37,7 +38,20 @@ public interface ExtensionConsumer extends Consumer {

List<Message> batchReceive(short partition, long index, long timeout);

// ack
void batchAck(List<MessageReceipt> receiptList);

// commit
void commitIndex(short partition, long index);

void commitMaxIndex(short partition);

void commitMaxIndex();

void commitMinIndex(short partition);

void commitMinIndex();

// index
ConsumerIndex getIndex(short partition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,51 @@ public void batchAck(List<MessageReceipt> receiptList) {
}
}

@Override
public void commitIndex(short partition, long index) {
try {
messageConsumer.commitIndex(partition, index);
} catch (Throwable cause) {
throw handleConsumeException(cause);
}
}

@Override
public void commitMaxIndex(short partition) {
try {
messageConsumer.commitMaxIndex(partition);
} catch (Throwable cause) {
throw handleConsumeException(cause);
}
}

@Override
public void commitMaxIndex() {
try {
messageConsumer.commitMaxIndex();
} catch (Throwable cause) {
throw handleConsumeException(cause);
}
}

@Override
public void commitMinIndex(short partition) {
try {
messageConsumer.commitMinIndex(partition);
} catch (Throwable cause) {
throw handleConsumeException(cause);
}
}

@Override
public void commitMinIndex() {
try {
messageConsumer.commitMinIndex();
} catch (Throwable cause) {
throw handleConsumeException(cause);
}
}

protected OMSRuntimeException handleConsumeException(Throwable cause) {
throw ExceptionConverter.convertConsumeException(cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public interface JoyQueueConsumerBuiltinKeys extends OMSBuiltinKeys {

String FAILOVER = "CONSUMER_FAILOVER";

String FORCE_ACK = "CONSUMER_FORCE_ACK";

String LOADBALANCE = "CONSUMER_LOADBALANCE";

String LOADBALANCE_TYPE = "CONSUMER_LOADBALANCE_TYPE";
Expand All @@ -58,4 +60,6 @@ public interface JoyQueueConsumerBuiltinKeys extends OMSBuiltinKeys {
String BROADCAST_PERSIST_INTERVAL = "CONSUMER_BROADCAST_PERSIST_INTERVAL";

String BROADCAST_INDEX_EXPIRE_TIME = "CONSUMER_BROADCAST_INDEX_EXPIRE_TIME";

String BROADCAST_INDEX_AUTO_RESET = "CONSUMER_BROADCAST_INDEX_AUTO_RESET";
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,29 @@ public ConsumerIndex getIndex(short partition) {
public void batchAck(List<MessageReceipt> receiptList) {
delegate.batchAck(receiptList);
}

@Override
public void commitIndex(short partition, long index) {
delegate.commitIndex(partition, index);
}

@Override
public void commitMaxIndex(short partition) {
delegate.commitMaxIndex(partition);
}

@Override
public void commitMaxIndex() {
delegate.commitMaxIndex();
}

@Override
public void commitMinIndex(short partition) {
delegate.commitMinIndex(partition);
}

@Override
public void commitMinIndex() {
delegate.commitMinIndex();
}
}
Loading

0 comments on commit b1e1e7c

Please sign in to comment.