Skip to content

Commit

Permalink
Merge pull request #292 from iamazy/master
Browse files Browse the repository at this point in the history
解决生产消费详情broker页面显示问题
  • Loading branch information
llIlll committed Aug 25, 2020
2 parents 92f5f4a + f8a703e commit e42a47e
Show file tree
Hide file tree
Showing 37 changed files with 620 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,20 @@ public static class Builder {
//消息过滤规则
private Map</*类型*/String, /*规则*/String> filters = new HashMap<>();

private String region;

private Map<String, String> params;


public static Builder build() {
return new Builder();
}

public Builder region(String region) {
this.region = region;
return this;
}

public Builder nearby(Boolean nearby) {
this.nearby = nearby;
return this;
Expand Down Expand Up @@ -526,9 +535,19 @@ public Builder filters(Map<String, String> filters) {
return this;
}

public Builder params(Map<String, String> params) {
this.params = params;
return this;
}

public ConsumerPolicy create() {
return new ConsumerPolicy(nearby, paused, archive, retry, seq, ackTimeout, batchSize,concurrent, delay,
ConsumerPolicy consumerPolicy = new ConsumerPolicy(nearby, paused, archive, retry, seq, ackTimeout, batchSize, concurrent, delay,
blackList, errTimes, maxPartitionNum, retryReadProbability, filters);
consumerPolicy.setRegion(region);
if (this.params!=null && this.params.size() > 0) {
consumerPolicy.setParams(this.params);
}
return consumerPolicy;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ public static class Builder {
// 默认生产超时时间 2秒钟
private Integer timeOut = 1000 * 2;

private Integer qosLevel;
private String region;

public static Builder build() {
return new Builder();
}
Expand Down Expand Up @@ -339,14 +342,27 @@ public Builder weight(String weight) {
return this;
}

public Builder qosLevel(Integer qosLevel) {
this.qosLevel = qosLevel;
return this;
}

public Builder region(String region) {
this.region = region;
return this;
}

public Builder timeout(Integer timeOut) {
this.timeOut = timeOut;
return this;
}

public ProducerPolicy create() {
return new ProducerPolicy(nearby, single, archive, weight,
ProducerPolicy producerPolicy = new ProducerPolicy(nearby, single, archive, weight,
blackList, timeOut);
producerPolicy.setQosLevel(qosLevel);
producerPolicy.setRegion(region);
return producerPolicy;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.joyqueue.model.domain;

import java.util.Map;

/**
* Created by cyy on 16-9-19.
*/
Expand All @@ -24,6 +26,8 @@ public class BrokerGroup extends LabelBaseModel {
private String name;
private String description;

private Map<String, String> policies;

public BrokerGroup() {
}

Expand Down Expand Up @@ -61,4 +65,11 @@ public void setDescription(String description) {
this.description = description;
}

public Map<String, String> getPolicies() {
return policies;
}

public void setPolicies(Map<String, String> policies) {
this.policies = policies;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import java.util.Map;

public class ConsumerConfig extends BaseNsrModel {
/**
Expand Down Expand Up @@ -88,6 +89,8 @@ public class ConsumerConfig extends BaseNsrModel {
@Min(0)
private int retryDelay;

private String region;

/**
* 指数增加间隔时间
**/
Expand Down Expand Up @@ -121,6 +124,8 @@ public class ConsumerConfig extends BaseNsrModel {

private int limitTraffic;

private Map<String, String> params;


public boolean isNearBy() {
return nearBy;
Expand Down Expand Up @@ -274,6 +279,14 @@ public int getLimitTraffic() {
return limitTraffic;
}

public String getRegion() {
return region;
}

public void setRegion(String region) {
this.region = region;
}

public OffsetMode getOffsetMode() {
return offsetMode;
}
Expand All @@ -282,4 +295,11 @@ public void setOffsetMode(OffsetMode offsetMode) {
this.offsetMode = offsetMode;
}

public Map<String, String> getParams() {
return params;
}

public void setParams(Map<String, String> params) {
this.params = params;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public class ProducerConfig extends BaseNsrModel {

private Integer timeout;

private Integer qosLevel;
private String region;

private Map<String, String> params;

public String getProducerId() {
return producerId;
}
Expand Down Expand Up @@ -125,6 +130,30 @@ public void setTimeout(Integer timeout) {
this.timeout = timeout;
}

public Map<String, String> getParams() {
return params;
}

public void setParams(Map<String, String> params) {
this.params = params;
}

public Integer getQosLevel() {
return qosLevel;
}

public void setQosLevel(Integer qosLevel) {
this.qosLevel = qosLevel;
}

public String getRegion() {
return region;
}

public void setRegion(String region) {
this.region = region;
}

/**
* 获取权重
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class Topic extends BaseNsrModel {
*/
private int replica = 3;

private org.joyqueue.domain.Topic.TopicPolicy policy;

public Topic() {

}
Expand Down Expand Up @@ -214,6 +216,14 @@ public void setDataCenters(List<String> dataCenters) {
this.dataCenters = dataCenters;
}

public org.joyqueue.domain.Topic.TopicPolicy getPolicy() {
return policy;
}

public void setPolicy(org.joyqueue.domain.Topic.TopicPolicy policy) {
this.policy = policy;
}

@Override
public String toString() {
return "Topic{" +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.joyqueue.repository.mybatis.handler;


import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;

/**
* @author jiangnan53
* @date 2020/6/22
**/
public class MapTypeHandler extends BaseTypeHandler<Map<String, String>> {

private Class<Map<String, String>> clazz;

public MapTypeHandler(Class<Map<String, String>> clazz) {
this.clazz = clazz;
}

public MapTypeHandler() {}

@Override
public void setNonNullParameter(PreparedStatement preparedStatement, int i, Map o, JdbcType jdbcType) throws SQLException {
if (o != null) {
preparedStatement.setString(i, JSON.toJSONString(o));
} else {
preparedStatement.setString(i, null);
}
}

@Override
public Map<String, String> getNullableResult(ResultSet resultSet, String s) throws SQLException {
if (StringUtils.isNotBlank(s)) {
return JSON.parseObject(resultSet.getString(s), clazz);
}
return Collections.emptyMap();
}

@Override
public Map<String, String> getNullableResult(ResultSet resultSet, int i) throws SQLException {
String result = resultSet.getString(i);
if (StringUtils.isNotBlank(result)) {
return JSON.parseObject(result, clazz);
}
return Collections.emptyMap();
}

@Override
public Map<String, String> getNullableResult(CallableStatement callableStatement, int i) throws SQLException {
String result = callableStatement.getString(i);
if (StringUtils.isNotBlank(result)) {
return JSON.parseObject(result, clazz);
}
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="org.joyqueue.repository.BrokerGroupRepository">

<resultMap id="baseResultMap" type="BrokerGroup">
<result property="id" column="id" jdbcType="BIGINT"/>
<result property="name" column="name" jdbcType="VARCHAR"/>
<result property="policies" column="policies" jdbcType="VARCHAR" typeHandler="org.joyqueue.repository.mybatis.handler.MapTypeHandler"/>
<result property="code" column="code" jdbcType="VARCHAR"/>
<result property="labels" column="labels" jdbcType="VARCHAR"/>
<result property="createTime" column="create_time" jdbcType="TIMESTAMP"/>
<result property="createBy.id" column="create_by" jdbcType="BIGINT"/>
<result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/>
<result property="updateBy.id" column="update_by" jdbcType="BIGINT"/>
</resultMap>

<sql id="select">
SELECT
id,
`name`,
code,
labels,
policies,
create_time,
create_by as `createBy.id`,
update_time,
Expand All @@ -24,6 +38,7 @@
code,
`name`,
labels,
policies,
create_time,
create_by,
update_time,
Expand All @@ -33,6 +48,7 @@
#{code},
#{name},
#{labels},
#{policies},
#{createTime},
#{createBy.id},
#{updateTime},
Expand Down Expand Up @@ -70,6 +86,7 @@
SET
code=#{code},
`name`=#{name},
`policies`=#{policies},
update_time=#{updateTime},
update_by=#{updateBy.id},
status=#{status}
Expand All @@ -84,7 +101,8 @@
SET
update_time=#{updateTime},
update_by=#{updateBy.id},
labels=#{labels}
labels=#{labels},
policies =#{policies},
WHERE
id=#{id}
AND status !=-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
<package name="org.joyqueue.model.query" />
<package name="org.joyqueue.model" />
</typeAliases>
<typeHandlers>
<typeHandler handler="org.joyqueue.repository.mybatis.handler.MapTypeHandler"/>
</typeHandlers>
<plugins>
<plugin interceptor="org.joyqueue.repository.mybatis.interceptor.PageStatementInterceptor"/>
<plugin interceptor="org.joyqueue.repository.mybatis.interceptor.PageResultInterceptor"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
<package name="org.joyqueue.model.query" />
<package name="org.joyqueue.model" />
</typeAliases>
<typeHandlers>
<typeHandler handler="org.joyqueue.repository.mybatis.handler.MapTypeHandler"/>
</typeHandlers>
<plugins>
<plugin interceptor="org.joyqueue.repository.mybatis.interceptor.PageStatementInterceptor"/>
<plugin interceptor="org.joyqueue.repository.mybatis.interceptor.PageResultInterceptor"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ protected org.joyqueue.domain.Consumer forward(Consumer consumer) {
.paused(consumer.getConfig().isPaused())
.retry(consumer.getConfig().isRetry())
.filters(consumer.getConfig().getFilters())
.region(consumer.getConfig().getRegion())
.create());
nsrConsumer.setRetryPolicy(RetryPolicy.Builder.build()
.maxRetrys(consumer.getConfig().getMaxRetrys())
Expand Down Expand Up @@ -103,6 +104,7 @@ protected Consumer backward(org.joyqueue.domain.Consumer nsrConsumer) {
consumerConfig.setDelay(consumerPolicy.getDelay());
consumerConfig.setPaused(consumerPolicy.getPaused());
consumerConfig.setRetry(consumerPolicy.getRetry());
consumerConfig.setRegion(consumerPolicy.getRegion());
Map<String,String> map = consumerPolicy.getFilters();
if (map !=null) {
List<String> filterList = map.entrySet().stream().map(entry -> (entry.getKey() + ":" + entry.getValue())).collect(Collectors.toList());
Expand Down
Loading

0 comments on commit e42a47e

Please sign in to comment.