Skip to content

Commit

Permalink
fixed issues [#333]
Browse files Browse the repository at this point in the history
  • Loading branch information
smartloli committed Apr 25, 2020
1 parent 0b30fa1 commit 1c0db7d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 1 deletion.
Expand Up @@ -46,6 +46,12 @@ public interface TopicDao {
/** Read topic rank data. */
public List<TopicRank> readTopicRank(Map<String, Object> params);

/** Read all topic rank data. */
public List<TopicRank> getAllTopicRank(Map<String, Object> params);

/** Clean topic rank by logsize or capacity. */
public void removeTopicRank(Map<String, Object> params);

/** Get clean topic state. */
public List<TopicRank> getCleanTopicState(Map<String, Object> params);

Expand Down
Expand Up @@ -70,6 +70,16 @@
where `cluster`=#{cluster} and `tkey`=#{tkey}
order by `tvalue` desc limit 10
</select>

<!-- Get all topic rank -->
<select id="getAllTopicRank" parameterType="map" resultMap="trank">
select * from ke_topic_rank where `cluster`=#{cluster} and `tkey`=#{tkey}
</select>

<!-- Remove topic rank -->
<delete id="removeTopicRank" parameterType="map">
delete from ke_topic_rank where `cluster`=#{cluster} and `topic`=#{topic} and `tkey`=#{tkey}
</delete>

<!-- Get clean topic state -->
<select id="getCleanTopicState" parameterType="map" resultMap="trank">
Expand Down
Expand Up @@ -191,6 +191,27 @@ private void topicCapacityStats() {
String[] clusterAliass = SystemConfigUtils.getPropertyArray("kafka.eagle.zk.cluster.alias", ",");
for (String clusterAlias : clusterAliass) {
List<String> topics = brokerService.topicList(clusterAlias);

// clean up nonexistent topic
Map<String, Object> params = new HashMap<>();
params.put("cluster", clusterAlias);
params.put("tkey", Topic.CAPACITY);
List<TopicRank> trs = dashboardServiceImpl.getAllTopicRank(params);
for (TopicRank tr : trs) {
try {
if (!topics.contains(tr.getTopic())) {
Map<String, Object> clean = new HashMap<>();
clean.put("cluster", clusterAlias);
clean.put("topic", tr.getTopic());
clean.put("tkey", Topic.CAPACITY);
dashboardServiceImpl.removeTopicRank(clean);
}
} catch (Exception e) {
e.printStackTrace();
LOG.error("Failed to clean up nonexistent topic, msg is ", e);
}
}

for (String topic : topics) {
long capacity = kafkaMetricsService.topicCapacity(clusterAlias, topic);
TopicRank topicRank = new TopicRank();
Expand Down Expand Up @@ -235,6 +256,27 @@ private void topicLogsizeStats() {
String[] clusterAliass = SystemConfigUtils.getPropertyArray("kafka.eagle.zk.cluster.alias", ",");
for (String clusterAlias : clusterAliass) {
List<String> topics = brokerService.topicList(clusterAlias);

// clean up nonexistent topic
Map<String, Object> params = new HashMap<>();
params.put("cluster", clusterAlias);
params.put("tkey", Topic.LOGSIZE);
List<TopicRank> trs = dashboardServiceImpl.getAllTopicRank(params);
for (TopicRank tr : trs) {
try {
if (!topics.contains(tr.getTopic())) {
Map<String, Object> clean = new HashMap<>();
clean.put("cluster", clusterAlias);
clean.put("topic", tr.getTopic());
clean.put("tkey", Topic.LOGSIZE);
dashboardServiceImpl.removeTopicRank(clean);
}
} catch (Exception e) {
e.printStackTrace();
LOG.error("Failed to clean up nonexistent topic, msg is ", e);
}
}

for (String topic : topics) {
long logsize = brokerService.getTopicRealLogSize(clusterAlias, topic);
TopicRank topicRank = new TopicRank();
Expand Down
Expand Up @@ -42,7 +42,13 @@ public interface DashboardService {
/** Get topic logsize & capacity. */
public JSONArray getTopicRank(Map<String, Object> params);

/** Get all clean topic list. */
/** Clean up topic metadata that does not exist in zookeeper. */
public List<TopicRank> getAllTopicRank(Map<String, Object> params);

/** Clean topic rank by logsize or capacity. */
public void removeTopicRank(Map<String, Object> params);

/** Get and clean all topic tasks. */
public List<TopicRank> getCleanTopicList(Map<String, Object> params);

/** Write statistics topic rank data from kafka jmx & insert into table. */
Expand Down
Expand Up @@ -205,4 +205,14 @@ public List<TopicRank> getCleanTopicList(Map<String, Object> params) {
return topicDao.getCleanTopicList(params);
}

@Override
public List<TopicRank> getAllTopicRank(Map<String, Object> params) {
return topicDao.getAllTopicRank(params);
}

@Override
public void removeTopicRank(Map<String, Object> params) {
topicDao.removeTopicRank(params);
}

}

0 comments on commit 1c0db7d

Please sign in to comment.