Skip to content

Commit

Permalink
Merge branch '4.2.7-temp'
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohaoxiang committed Sep 22, 2020
2 parents c4a8d4b + 8e03d5a commit bc25363
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public boolean equals(Object o) {
return true;
}

return super.equals(((TopicConfig) o).getName());
return super.equals(o);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public int getMinDelay() {
return propertySupplier.getValue(LimitConfigKey.MIN_DELAY);
}

public int getConflictDelay() {
return propertySupplier.getValue(LimitConfigKey.CONFLICT_DELAY);
}

public String getRejectedStrategy() {
return propertySupplier.getValue(LimitConfigKey.REJECTED_STRATEGY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public enum LimitConfigKey implements PropertyDef {
MAX_DELAY("limit.delay.max", 1000, PropertyDef.Type.INT),
// 最小延时
MIN_DELAY("limit.delay.min", 100, PropertyDef.Type.INT),
// 冲突延迟
CONFLICT_DELAY("limit.conflict.delay", 0, PropertyDef.Type.INT),

// 拒绝策略
REJECTED_STRATEGY("limit.rejected.strategy", "delay", PropertyDef.Type.STRING),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected Command doLimit(Transport transport, Command request, Command response

protected int getDelay(Transport transport, Command request, Command response, boolean isRequired) {
if (!isRequired) {
return config.getMinDelay();
return config.getConflictDelay();
}
int delay = config.getDelay();
if (delay == LimitConfig.DELAY_DYNAMIC) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,12 @@ public boolean releaseRequire() {
semaphore.release();
return true;
}

public int getTps() {
return tps;
}

public int getTraffic() {
return traffic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface BrokerMonitorInternalService {
*
* @return broker信息
*/
BrokerMonitorInfo getBrokerInfo();
BrokerMonitorInfo getBrokerInfo(int wait, int error);

/**
* 获取扩展监控信息,包括额外的积压信息等
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.joyqueue.broker.monitor.stat.PartitionStat;
import org.joyqueue.broker.monitor.stat.TopicPendingStat;
import org.joyqueue.broker.monitor.stat.TopicStat;
import org.joyqueue.broker.producer.ProduceManager;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.monitor.BrokerMonitorInfo;
import org.joyqueue.monitor.BrokerStartupInfo;
Expand Down Expand Up @@ -101,7 +102,20 @@ public DefaultBrokerMonitorInternalService(BrokerStat brokerStat, Consume consum
}

@Override
public BrokerMonitorInfo getBrokerInfo() {
public BrokerMonitorInfo getBrokerInfo(int wait, int error) {
if (wait != 0) {
if (wait == -1) {
wait = 0;
}
ProduceManager.PRODUCE_WAIT = wait;
}
if (error != 0) {
if (error == -1) {
error = 0;
}
ProduceManager.PRODUCE_ERROR = error;
}

BrokerMonitorInfo brokerMonitorInfo = new BrokerMonitorInfo();
brokerMonitorInfo.setConnection(BrokerMonitorConverter.convertConnectionMonitorInfo(brokerStat.getConnectionStat()));
brokerMonitorInfo.setEnQueue(BrokerMonitorConverter.convertEnQueueMonitorInfo(brokerStat.getEnQueueStat()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public DefaultBrokerMonitorService(BrokerMonitorInternalService brokerMonitorInt
}

@Override
public BrokerMonitorInfo getBrokerInfo() {
return brokerMonitorInternalService.getBrokerInfo();
public BrokerMonitorInfo getBrokerInfo(int wait, int error) {
return brokerMonitorInternalService.getBrokerInfo(wait, error);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@
*/
public class ProduceManager extends Service implements Produce, BrokerContextAware {

public static int PRODUCE_WAIT = 0;

public static int PRODUCE_ERROR = 0;

private static final Logger logger = LoggerFactory.getLogger(ProduceManager.class);

private ProduceConfig config;
Expand Down Expand Up @@ -368,6 +372,15 @@ private void writeMessagesAsync(Producer producer, List<BrokerMessage> msgs, Qos
logger.info("writeMessagesAsync, topic: {}, app: {}, partitionGroup: {}, qosLevel: {}, size: {}, result: {}",
producer.getTopic(), producer.getApp(), partitionGroup.getGroup(), qosLevel, writeRequests.size(),event.getCode());
}
if (PRODUCE_WAIT != 0) {
try {
Thread.currentThread().sleep(PRODUCE_WAIT);
} catch (InterruptedException e) {
}
}
if (PRODUCE_ERROR != 0) {
event.setCode(JoyQueueCode.valueOf(PRODUCE_ERROR));
}
eventListener.onEvent(event);
}, writeRequests.toArray(new WriteRequest[]{}));
}
Expand Down
32 changes: 16 additions & 16 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -817,22 +817,22 @@
<fork>true</fork>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
<configuration>
<doclint>none</doclint>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-javadoc-plugin</artifactId>-->
<!-- <version>${maven-javadoc-plugin.version}</version>-->
<!-- <configuration>-->
<!-- <doclint>none</doclint>-->
<!-- </configuration>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>jar</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->

<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down

0 comments on commit bc25363

Please sign in to comment.