Skip to content

Commit

Permalink
调整composition模块,可以支持不同数据源
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohaoxiang committed Aug 14, 2020
1 parent d222eba commit a5393a1
Show file tree
Hide file tree
Showing 26 changed files with 697 additions and 501 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public TopicPolicy getPolicy() {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (o == null || !(o instanceof Topic)) return false;

Topic topic = (Topic) o;
return partitions == topic.partitions &&
Objects.equals(name, topic.name) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public class CompositionInternalServiceManager extends Service {

private CompositionConfig config;
private InternalServiceProvider serviceProvider;
private InternalServiceProvider igniteServiceProvider;
private InternalServiceProvider journalkeeperServiceProvider;
private InternalServiceProvider sourceServiceProvider;
private InternalServiceProvider targetServiceProvider;

private CompositionAppTokenInternalService compositionAppTokenInternalService;
private CompositionBrokerInternalService compositionBrokerInternalService;
Expand All @@ -67,40 +67,40 @@ public class CompositionInternalServiceManager extends Service {
private CompositionTransactionInternalService compositionTransactionInternalService;
private CompositionClusterInternalService compositionClusterInternalService;

public CompositionInternalServiceManager(CompositionConfig config, InternalServiceProvider serviceProvider, InternalServiceProvider igniteServiceProvider,
InternalServiceProvider journalkeeperServiceProvider) {
public CompositionInternalServiceManager(CompositionConfig config, InternalServiceProvider serviceProvider, InternalServiceProvider sourceServiceProvider,
InternalServiceProvider targetServiceProvider) {
this.config = config;
this.serviceProvider = serviceProvider;
this.igniteServiceProvider = igniteServiceProvider;
this.journalkeeperServiceProvider = journalkeeperServiceProvider;
this.sourceServiceProvider = sourceServiceProvider;
this.targetServiceProvider = targetServiceProvider;
}

@Override
protected void validate() throws Exception {
compositionAppTokenInternalService = new CompositionAppTokenInternalService(config, igniteServiceProvider.getService(AppTokenInternalService.class),
journalkeeperServiceProvider.getService(AppTokenInternalService.class));
compositionBrokerInternalService = new CompositionBrokerInternalService(config, igniteServiceProvider.getService(BrokerInternalService.class),
journalkeeperServiceProvider.getService(BrokerInternalService.class));
compositionConfigInternalService = new CompositionConfigInternalService(config, igniteServiceProvider.getService(ConfigInternalService.class),
journalkeeperServiceProvider.getService(ConfigInternalService.class));
compositionConsumerInternalService = new CompositionConsumerInternalService(config, igniteServiceProvider.getService(ConsumerInternalService.class),
journalkeeperServiceProvider.getService(ConsumerInternalService.class));
compositionDataCenterInternalService = new CompositionDataCenterInternalService(config, igniteServiceProvider.getService(DataCenterInternalService.class),
journalkeeperServiceProvider.getService(DataCenterInternalService.class));
compositionNamespaceInternalService = new CompositionNamespaceInternalService(config, igniteServiceProvider.getService(NamespaceInternalService.class),
journalkeeperServiceProvider.getService(NamespaceInternalService.class));
compositionPartitionGroupInternalService = new CompositionPartitionGroupInternalService(config, igniteServiceProvider.getService(PartitionGroupInternalService.class),
journalkeeperServiceProvider.getService(PartitionGroupInternalService.class));
compositionPartitionGroupReplicaInternalService = new CompositionPartitionGroupReplicaInternalService(config, igniteServiceProvider.getService(PartitionGroupReplicaInternalService.class),
journalkeeperServiceProvider.getService(PartitionGroupReplicaInternalService.class));
compositionProducerInternalService = new CompositionProducerInternalService(config, igniteServiceProvider.getService(ProducerInternalService.class),
journalkeeperServiceProvider.getService(ProducerInternalService.class));
compositionTopicInternalService = new CompositionTopicInternalService(config, igniteServiceProvider.getService(TopicInternalService.class),
journalkeeperServiceProvider.getService(TopicInternalService.class));
compositionTransactionInternalService = new CompositionTransactionInternalService(config, igniteServiceProvider.getService(TransactionInternalService.class),
journalkeeperServiceProvider.getService(TransactionInternalService.class));
compositionAppTokenInternalService = new CompositionAppTokenInternalService(config, sourceServiceProvider.getService(AppTokenInternalService.class),
targetServiceProvider.getService(AppTokenInternalService.class));
compositionBrokerInternalService = new CompositionBrokerInternalService(config, sourceServiceProvider.getService(BrokerInternalService.class),
targetServiceProvider.getService(BrokerInternalService.class));
compositionConfigInternalService = new CompositionConfigInternalService(config, sourceServiceProvider.getService(ConfigInternalService.class),
targetServiceProvider.getService(ConfigInternalService.class));
compositionConsumerInternalService = new CompositionConsumerInternalService(config, sourceServiceProvider.getService(ConsumerInternalService.class),
targetServiceProvider.getService(ConsumerInternalService.class));
compositionDataCenterInternalService = new CompositionDataCenterInternalService(config, sourceServiceProvider.getService(DataCenterInternalService.class),
targetServiceProvider.getService(DataCenterInternalService.class));
compositionNamespaceInternalService = new CompositionNamespaceInternalService(config, sourceServiceProvider.getService(NamespaceInternalService.class),
targetServiceProvider.getService(NamespaceInternalService.class));
compositionPartitionGroupInternalService = new CompositionPartitionGroupInternalService(config, sourceServiceProvider.getService(PartitionGroupInternalService.class),
targetServiceProvider.getService(PartitionGroupInternalService.class));
compositionPartitionGroupReplicaInternalService = new CompositionPartitionGroupReplicaInternalService(config, sourceServiceProvider.getService(PartitionGroupReplicaInternalService.class),
targetServiceProvider.getService(PartitionGroupReplicaInternalService.class));
compositionProducerInternalService = new CompositionProducerInternalService(config, sourceServiceProvider.getService(ProducerInternalService.class),
targetServiceProvider.getService(ProducerInternalService.class));
compositionTopicInternalService = new CompositionTopicInternalService(config, sourceServiceProvider.getService(TopicInternalService.class),
targetServiceProvider.getService(TopicInternalService.class));
compositionTransactionInternalService = new CompositionTransactionInternalService(config, sourceServiceProvider.getService(TransactionInternalService.class),
targetServiceProvider.getService(TransactionInternalService.class));
compositionClusterInternalService = new CompositionClusterInternalService(config, null,
journalkeeperServiceProvider.getService(ClusterInternalService.class));
targetServiceProvider.getService(ClusterInternalService.class));
}

public <T> T getService(Class<T> service) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class CompositionInternalServiceProvider extends Service implements Inter

private PropertySupplier propertySupplier;
private InternalServiceProvider serviceProvider;
private InternalServiceProvider igniteServiceProvider;
private InternalServiceProvider journalkeeperServiceProvider;
private InternalServiceProvider sourceServiceProvider;
private InternalServiceProvider targetServiceProvider;
private CompositionInternalServiceManager compositionInternalServiceManager;

private CompositionConfig config;
Expand All @@ -56,31 +56,31 @@ protected void validate() {
for (InternalServiceProvider extension : extensions) {
if (extension.getClass().equals(CompositionInternalServiceProvider.class)) {
continue;
} else if (extension.getClass().getName().contains("ignite")) {
igniteServiceProvider = extension;
} else if (extension.getClass().getName().contains("journalkeeper")) {
journalkeeperServiceProvider = extension;
} else if (extension.getClass().getName().contains(config.getSource())) {
sourceServiceProvider = extension;
} else if (extension.getClass().getName().contains(config.getTarget())) {
targetServiceProvider = extension;
}
serviceProvider = extension;
}
Preconditions.checkArgument(serviceProvider != null, "serviceProvider not exist");
this.compositionInternalServiceManager = new CompositionInternalServiceManager(config, serviceProvider, igniteServiceProvider, journalkeeperServiceProvider);
this.compositionInternalServiceManager = new CompositionInternalServiceManager(config, serviceProvider, sourceServiceProvider, targetServiceProvider);
}

@Override
protected void doStart() throws Exception {
if (igniteServiceProvider != null && journalkeeperServiceProvider != null) {
if (journalkeeperServiceProvider instanceof PropertySupplierAware) {
((PropertySupplierAware) journalkeeperServiceProvider).setSupplier(propertySupplier);
if (sourceServiceProvider != null && targetServiceProvider != null) {
if (targetServiceProvider instanceof PropertySupplierAware) {
((PropertySupplierAware) targetServiceProvider).setSupplier(propertySupplier);
}
if (journalkeeperServiceProvider instanceof LifeCycle) {
((LifeCycle) journalkeeperServiceProvider).start();
if (targetServiceProvider instanceof LifeCycle) {
((LifeCycle) targetServiceProvider).start();
}
if (igniteServiceProvider instanceof PropertySupplierAware) {
((PropertySupplierAware) igniteServiceProvider).setSupplier(propertySupplier);
if (sourceServiceProvider instanceof PropertySupplierAware) {
((PropertySupplierAware) sourceServiceProvider).setSupplier(propertySupplier);
}
if (igniteServiceProvider instanceof LifeCycle) {
((LifeCycle) igniteServiceProvider).start();
if (sourceServiceProvider instanceof LifeCycle) {
((LifeCycle) sourceServiceProvider).start();
}
compositionInternalServiceManager.start();
} else {
Expand All @@ -95,12 +95,12 @@ protected void doStart() throws Exception {

@Override
protected void doStop() {
if (igniteServiceProvider != null && journalkeeperServiceProvider != null) {
if (journalkeeperServiceProvider instanceof LifeCycle) {
((LifeCycle) journalkeeperServiceProvider).stop();
if (sourceServiceProvider != null && targetServiceProvider != null) {
if (targetServiceProvider instanceof LifeCycle) {
((LifeCycle) targetServiceProvider).stop();
}
if (igniteServiceProvider instanceof LifeCycle) {
((LifeCycle) igniteServiceProvider).stop();
if (sourceServiceProvider instanceof LifeCycle) {
((LifeCycle) sourceServiceProvider).stop();
}
} else {
if (serviceProvider instanceof LifeCycle) {
Expand All @@ -112,7 +112,7 @@ protected void doStop() {

@Override
public <T> T getService(Class<T> service) {
if (igniteServiceProvider != null && journalkeeperServiceProvider != null) {
if (sourceServiceProvider != null && targetServiceProvider != null) {
return compositionInternalServiceManager.getService(service);
} else {
return serviceProvider.getService(service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,54 +30,54 @@ public class CompositionTransactionInternalService implements TransactionInterna
protected static final Logger logger = LoggerFactory.getLogger(CompositionTransactionInternalService.class);

private CompositionConfig config;
private TransactionInternalService igniteTransactionInternalService;
private TransactionInternalService journalkeeperTransactionInternalService;
private TransactionInternalService sourceTransactionInternalService;
private TransactionInternalService targetTransactionInternalService;

public CompositionTransactionInternalService(CompositionConfig config, TransactionInternalService igniteTransactionInternalService,
TransactionInternalService journalkeeperTransactionInternalService) {
public CompositionTransactionInternalService(CompositionConfig config, TransactionInternalService sourceTransactionInternalService,
TransactionInternalService targetTransactionInternalService) {
this.config = config;
this.igniteTransactionInternalService = igniteTransactionInternalService;
this.journalkeeperTransactionInternalService = journalkeeperTransactionInternalService;
this.sourceTransactionInternalService = sourceTransactionInternalService;
this.targetTransactionInternalService = targetTransactionInternalService;
}

@Override
public void begin() {
if (config.isWriteIgnite()) {
igniteTransactionInternalService.begin();
if (config.isWriteSource()) {
sourceTransactionInternalService.begin();
}
if (config.isWriteJournalkeeper()) {
if (config.isWriteTarget()) {
try {
journalkeeperTransactionInternalService.begin();
targetTransactionInternalService.begin();
} catch (Exception e) {
logger.info("journalkeeper transaction begin exception", e);
logger.info("transaction begin exception", e);
}
}
}

@Override
public void commit() {
if (config.isWriteIgnite()) {
igniteTransactionInternalService.commit();
if (config.isWriteSource()) {
sourceTransactionInternalService.commit();
}
if (config.isWriteJournalkeeper()) {
if (config.isWriteTarget()) {
try {
journalkeeperTransactionInternalService.commit();
targetTransactionInternalService.commit();
} catch (Exception e) {
logger.info("journalkeeper transaction commit exception", e);
logger.info("transaction commit exception", e);
}
}
}

@Override
public void rollback() {
if (config.isWriteIgnite()) {
igniteTransactionInternalService.rollback();
if (config.isWriteSource()) {
sourceTransactionInternalService.rollback();
}
if (config.isWriteJournalkeeper()) {
if (config.isWriteTarget()) {
try {
journalkeeperTransactionInternalService.rollback();
targetTransactionInternalService.rollback();
} catch (Exception e) {
logger.info("journalkeeper transaction rollback exception", e);
logger.info("transaction rollback exception", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ public CompositionConfig(PropertySupplier propertySupplier) {
this.propertySupplier = propertySupplier;
}

public String getSource() {
return PropertySupplier.getValue(propertySupplier, CompositionConfigKey.SOURCE);
}

public String getTarget() {
return PropertySupplier.getValue(propertySupplier, CompositionConfigKey.TARGET);
}

public String getReadSource() {
return PropertySupplier.getValue(propertySupplier, CompositionConfigKey.READ_SOURCE);
}
Expand All @@ -46,19 +54,19 @@ public boolean isWriteAll() {
return getWriteSource().equalsIgnoreCase("all");
}

public boolean isReadIgnite() {
return getReadSource().equalsIgnoreCase("ignite");
public boolean isReadSource() {
return getReadSource().equalsIgnoreCase(getSource());
}

public boolean isWriteIgnite() {
return isWriteAll() || getWriteSource().equalsIgnoreCase("ignite");
public boolean isWriteSource() {
return isWriteAll() || getWriteSource().equalsIgnoreCase(getSource());
}

public boolean isReadJournalkeeper() {
return getReadSource().equalsIgnoreCase("journalkeeper");
public boolean isReadTarget() {
return getReadSource().equalsIgnoreCase(getTarget());
}

public boolean isWriteJournalkeeper() {
return isWriteAll() || getReadSource().equalsIgnoreCase("journalkeeper");
public boolean isWriteTarget() {
return isWriteAll() || getReadSource().equalsIgnoreCase(getTarget());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
*/
public enum CompositionConfigKey implements PropertyDef {

// 数据源
SOURCE("nameserver.composition.source", "ignite", PropertyDef.Type.STRING),

// 目标数据源
TARGET("nameserver.composition.target", "sql", PropertyDef.Type.STRING),

// 读数据源
READ_SOURCE("nameserver.composition.read.source", "ignite", PropertyDef.Type.STRING),

Expand Down
Loading

0 comments on commit a5393a1

Please sign in to comment.