Skip to content

Commit

Permalink
Merge pull request #294 from llIlll/nameserver-sql
Browse files Browse the repository at this point in the history
Nameserver sql
  • Loading branch information
llIlll committed Aug 25, 2020
2 parents f146638 + 64ae173 commit 92f5f4a
Show file tree
Hide file tree
Showing 103 changed files with 2,089 additions and 916 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public TopicPolicy getPolicy() {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (o == null || !(o instanceof Topic)) return false;

Topic topic = (Topic) o;
return partitions == topic.partitions &&
Objects.equals(name, topic.name) &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.joyqueue.toolkit.util;

/**
* ConvertUtils
* author: gaohaoxiang
* date: 2020/8/13
*/
public class ConvertUtils {

public static <T> T convert(Object value, Class<T> type) {
if (Integer.class.equals(type) || int.class.equals(type)) {
return (T) Integer.valueOf(String.valueOf(value));
} else if (Short.class.equals(type) || short.class.equals(type)) {
return (T) Short.valueOf(String.valueOf(value));
} else if (Byte.class.equals(type) || byte.class.equals(type)) {
return (T) Byte.valueOf(String.valueOf(value));
} else if (Float.class.equals(type) || float.class.equals(type)) {
return (T) Float.valueOf(String.valueOf(value));
} else if (Double.class.equals(type) || double.class.equals(type)) {
return (T) Double.valueOf(String.valueOf(value));
} else if (Long.class.equals(type) || long.class.equals(type)) {
return (T) Long.valueOf(String.valueOf(value));
} else if (Boolean.class.equals(type) || boolean.class.equals(type)) {
return (T) Boolean.valueOf(String.valueOf(value));
} else if (String.class.equals(type)) {
return (T) String.valueOf(value);
}
throw new UnsupportedOperationException(type.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,11 @@ public TopicConfig getTopicConfig(TopicName topic) {
* @return
*/
public List<TopicConfig> getTopics() {
List<TopicConfig> result = Lists.newLinkedList();
for (Map.Entry<String, TopicConfig> entry : localCache.getTopicConfigCache().entrySet()) {
if (entry.getValue().isReplica(getBrokerId())) {
result.add(entry.getValue());
}
Map<TopicName, TopicConfig> topics = nameService.getTopicConfigByBroker(getBrokerId());
if (MapUtils.isEmpty(topics)) {
return Collections.emptyList();
}
return result;
return Lists.newArrayList(topics.values());
}

/**
Expand Down Expand Up @@ -881,15 +879,7 @@ public List<Producer> getLocalProducersByTopic(TopicName topic) {
}

public List<Consumer> getLocalConsumersByTopic(TopicName topic) {
Map<String, MetaDataLocalCache.CacheConsumer> consumers = localCache.getTopicConsumers(topic);
if (MapUtils.isEmpty(consumers)) {
return Collections.emptyList();
}
List<Consumer> result = Lists.newLinkedList();
for (Map.Entry<String, MetaDataLocalCache.CacheConsumer> entry : consumers.entrySet()) {
result.add(entry.getValue().getConsumer());
}
return result;
return Lists.newArrayList(nameService.getConsumerByTopic(topic));
}

public AppToken getAppToken(String app, String token) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ public interface ConsumerManageService {

/**
* 初始化消费ack
* @param right
* @return
* @throws JoyQueueException
*/
String initConsumerAckIndexes() throws JoyQueueException;
String initConsumerAckIndexes(boolean right) throws JoyQueueException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ public long getAckIndexByTime(String topic, String app, short partition, long ti
}

@Override
public String initConsumerAckIndexes() throws JoyQueueException {
return consumerManageService.initConsumerAckIndexes();
public String initConsumerAckIndexes(boolean right) throws JoyQueueException {
return consumerManageService.initConsumerAckIndexes(right);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public boolean setAckIndexByTime(String topic, String app, short partition, long
for (StoreManagementService.PartitionGroupMetric partitionGroupMetric : topicMetric.getPartitionGroupMetrics()) {
for (StoreManagementService.PartitionMetric partitionMetric : partitionGroupMetric.getPartitionMetrics()) {
if (partitionMetric.getPartition() == partition) {
if (!clusterManager.isLeader(topic, partitionMetric.getPartition())) {
continue;
}
return setPartitionAckIndexByTime(topic, app, partitionGroupMetric.getPartitionGroup(), partitionMetric.getPartition(), timestamp);
}
}
Expand All @@ -143,6 +146,9 @@ public boolean setAckIndexesByTime(String topic, String app, long timestamp) thr
StoreManagementService.TopicMetric topicMetric = storeManagementService.topicMetric(topic);
for (StoreManagementService.PartitionGroupMetric partitionGroupMetric : topicMetric.getPartitionGroupMetrics()) {
for (StoreManagementService.PartitionMetric partitionMetric : partitionGroupMetric.getPartitionMetrics()) {
if (!clusterManager.isLeader(topic, partitionMetric.getPartition())) {
continue;
}
setPartitionAckIndexByTime(topic, app, partitionGroupMetric.getPartitionGroup(), partitionMetric.getPartition(), timestamp);
}
}
Expand Down Expand Up @@ -180,7 +186,7 @@ public List<PartitionAckMonitorInfo> getTopicAckIndexByTime(String topic, String
}

@Override
public String initConsumerAckIndexes() throws JoyQueueException {
public String initConsumerAckIndexes(boolean right) throws JoyQueueException {
Map<String, List<String>> result = Maps.newHashMap();
for (TopicConfig topicConfig : clusterManager.getTopics()) {
List<String> apps = Lists.newLinkedList();
Expand All @@ -192,10 +198,18 @@ public String initConsumerAckIndexes() throws JoyQueueException {
}

for (PartitionGroup partitionGroup : clusterManager.getLocalPartitionGroups(topicConfig)) {
if (!clusterManager.isLeader(partitionGroup.getTopic(), partitionGroup.getGroup())) {
continue;
}
PartitionGroupStore store = storeService.getStore(partitionGroup.getTopic().getFullName(), partitionGroup.getGroup());
if (store == null) {
continue;
}
for (Short partition : partitionGroup.getPartitions()) {
for (org.joyqueue.domain.Consumer consumer : consumers) {
apps.add(consumer.getApp());
consume.setAckIndex(new Consumer(consumer.getTopic().getFullName(), consumer.getApp()), partition, 0);
consume.setAckIndex(new Consumer(consumer.getTopic().getFullName(), consumer.getApp()), partition,
(right ? store.getRightIndex(partition) : store.getLeftIndex(partition)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@
handlers="brokerManageService.getTopicAckIndexByTime"/>
<route path="/manage/topic/:topic/app/:app/ackByTime" inherit="put"
handlers="brokerManageService.setAckIndexesByTime"/>
<route path="/manage/consumer/ack/init" inherit="post"
<route path="/manage/consumers/ack/init" inherit="post"
handlers="brokerManageService.initConsumerAckIndexes"/>

<!-- store -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>joyqueue-nsr-composition</artifactId>
<name>JoyQueue-NamingService-Composition</name>

<dependencies>
<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-nsr-ignite</artifactId>
<artifactId>joyqueue-nsr-sql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public class CompositionInternalServiceManager extends Service {

private CompositionConfig config;
private InternalServiceProvider serviceProvider;
private InternalServiceProvider igniteServiceProvider;
private InternalServiceProvider journalkeeperServiceProvider;
private InternalServiceProvider sourceServiceProvider;
private InternalServiceProvider targetServiceProvider;

private CompositionAppTokenInternalService compositionAppTokenInternalService;
private CompositionBrokerInternalService compositionBrokerInternalService;
Expand All @@ -67,40 +67,40 @@ public class CompositionInternalServiceManager extends Service {
private CompositionTransactionInternalService compositionTransactionInternalService;
private CompositionClusterInternalService compositionClusterInternalService;

public CompositionInternalServiceManager(CompositionConfig config, InternalServiceProvider serviceProvider, InternalServiceProvider igniteServiceProvider,
InternalServiceProvider journalkeeperServiceProvider) {
public CompositionInternalServiceManager(CompositionConfig config, InternalServiceProvider serviceProvider, InternalServiceProvider sourceServiceProvider,
InternalServiceProvider targetServiceProvider) {
this.config = config;
this.serviceProvider = serviceProvider;
this.igniteServiceProvider = igniteServiceProvider;
this.journalkeeperServiceProvider = journalkeeperServiceProvider;
this.sourceServiceProvider = sourceServiceProvider;
this.targetServiceProvider = targetServiceProvider;
}

@Override
protected void validate() throws Exception {
compositionAppTokenInternalService = new CompositionAppTokenInternalService(config, igniteServiceProvider.getService(AppTokenInternalService.class),
journalkeeperServiceProvider.getService(AppTokenInternalService.class));
compositionBrokerInternalService = new CompositionBrokerInternalService(config, igniteServiceProvider.getService(BrokerInternalService.class),
journalkeeperServiceProvider.getService(BrokerInternalService.class));
compositionConfigInternalService = new CompositionConfigInternalService(config, igniteServiceProvider.getService(ConfigInternalService.class),
journalkeeperServiceProvider.getService(ConfigInternalService.class));
compositionConsumerInternalService = new CompositionConsumerInternalService(config, igniteServiceProvider.getService(ConsumerInternalService.class),
journalkeeperServiceProvider.getService(ConsumerInternalService.class));
compositionDataCenterInternalService = new CompositionDataCenterInternalService(config, igniteServiceProvider.getService(DataCenterInternalService.class),
journalkeeperServiceProvider.getService(DataCenterInternalService.class));
compositionNamespaceInternalService = new CompositionNamespaceInternalService(config, igniteServiceProvider.getService(NamespaceInternalService.class),
journalkeeperServiceProvider.getService(NamespaceInternalService.class));
compositionPartitionGroupInternalService = new CompositionPartitionGroupInternalService(config, igniteServiceProvider.getService(PartitionGroupInternalService.class),
journalkeeperServiceProvider.getService(PartitionGroupInternalService.class));
compositionPartitionGroupReplicaInternalService = new CompositionPartitionGroupReplicaInternalService(config, igniteServiceProvider.getService(PartitionGroupReplicaInternalService.class),
journalkeeperServiceProvider.getService(PartitionGroupReplicaInternalService.class));
compositionProducerInternalService = new CompositionProducerInternalService(config, igniteServiceProvider.getService(ProducerInternalService.class),
journalkeeperServiceProvider.getService(ProducerInternalService.class));
compositionTopicInternalService = new CompositionTopicInternalService(config, igniteServiceProvider.getService(TopicInternalService.class),
journalkeeperServiceProvider.getService(TopicInternalService.class));
compositionTransactionInternalService = new CompositionTransactionInternalService(config, igniteServiceProvider.getService(TransactionInternalService.class),
journalkeeperServiceProvider.getService(TransactionInternalService.class));
compositionAppTokenInternalService = new CompositionAppTokenInternalService(config, sourceServiceProvider.getService(AppTokenInternalService.class),
targetServiceProvider.getService(AppTokenInternalService.class));
compositionBrokerInternalService = new CompositionBrokerInternalService(config, sourceServiceProvider.getService(BrokerInternalService.class),
targetServiceProvider.getService(BrokerInternalService.class));
compositionConfigInternalService = new CompositionConfigInternalService(config, sourceServiceProvider.getService(ConfigInternalService.class),
targetServiceProvider.getService(ConfigInternalService.class));
compositionConsumerInternalService = new CompositionConsumerInternalService(config, sourceServiceProvider.getService(ConsumerInternalService.class),
targetServiceProvider.getService(ConsumerInternalService.class));
compositionDataCenterInternalService = new CompositionDataCenterInternalService(config, sourceServiceProvider.getService(DataCenterInternalService.class),
targetServiceProvider.getService(DataCenterInternalService.class));
compositionNamespaceInternalService = new CompositionNamespaceInternalService(config, sourceServiceProvider.getService(NamespaceInternalService.class),
targetServiceProvider.getService(NamespaceInternalService.class));
compositionPartitionGroupInternalService = new CompositionPartitionGroupInternalService(config, sourceServiceProvider.getService(PartitionGroupInternalService.class),
targetServiceProvider.getService(PartitionGroupInternalService.class));
compositionPartitionGroupReplicaInternalService = new CompositionPartitionGroupReplicaInternalService(config, sourceServiceProvider.getService(PartitionGroupReplicaInternalService.class),
targetServiceProvider.getService(PartitionGroupReplicaInternalService.class));
compositionProducerInternalService = new CompositionProducerInternalService(config, sourceServiceProvider.getService(ProducerInternalService.class),
targetServiceProvider.getService(ProducerInternalService.class));
compositionTopicInternalService = new CompositionTopicInternalService(config, sourceServiceProvider.getService(TopicInternalService.class),
targetServiceProvider.getService(TopicInternalService.class));
compositionTransactionInternalService = new CompositionTransactionInternalService(config, sourceServiceProvider.getService(TransactionInternalService.class),
targetServiceProvider.getService(TransactionInternalService.class));
compositionClusterInternalService = new CompositionClusterInternalService(config, null,
journalkeeperServiceProvider.getService(ClusterInternalService.class));
targetServiceProvider.getService(ClusterInternalService.class));
}

public <T> T getService(Class<T> service) {
Expand Down
Loading

0 comments on commit 92f5f4a

Please sign in to comment.