Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.3.3] Queue factories improvements #5749

Merged
merged 1 commit into from Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -102,7 +102,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProd

@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
Expand All @@ -112,7 +112,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {

@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}

@Override
Expand Down Expand Up @@ -182,13 +182,13 @@ public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageSta

@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
Expand All @@ -63,6 +64,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;

private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
Expand All @@ -77,14 +79,16 @@ public AwsSqsTbCoreQueueFactory(TbAwsSqsSettings sqsSettings,
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes) {
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.sqsSettings = sqsSettings;
this.coreSettings = coreSettings;
this.transportApiSettings = transportApiSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;

this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
Expand All @@ -95,7 +99,7 @@ public AwsSqsTbCoreQueueFactory(TbAwsSqsSettings sqsSettings,

@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic());
}

@Override
Expand All @@ -105,7 +109,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProd

@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
Expand All @@ -115,7 +119,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {

@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}

@Override
Expand All @@ -139,7 +143,7 @@ public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportA

@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getResponsesTopic());
}

@Override
Expand Down Expand Up @@ -172,13 +176,13 @@ public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageSta

@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
Expand All @@ -57,6 +58,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbAwsSqsSettings sqsSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;

private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
Expand All @@ -68,13 +70,15 @@ public AwsSqsTbRuleEngineQueueFactory(PartitionService partitionService, TbQueue
TbServiceInfoProvider serviceInfoProvider,
TbAwsSqsSettings sqsSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.partitionService = partitionService;
this.coreSettings = coreSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.ruleEngineSettings = ruleEngineSettings;
this.sqsSettings = sqsSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;

this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
Expand All @@ -84,17 +88,17 @@ public AwsSqsTbRuleEngineQueueFactory(PartitionService partitionService, TbQueue

@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic());
}

@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
Expand All @@ -104,7 +108,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {

@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}

@Override
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
Expand All @@ -51,34 +52,39 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
private final TbAwsSqsSettings sqsSettings;
private final TbQueueCoreSettings coreSettings;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRuleEngineSettings ruleEngineSettings;

private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin ruleEngineAdmin;

public AwsSqsTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbAwsSqsSettings sqsSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes) {
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueRuleEngineSettings ruleEngineSettings) {
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.sqsSettings = sqsSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;

this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes());
this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
}

@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
TbAwsSqsProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic());

TbAwsSqsConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings,
transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
Expand All @@ -96,7 +102,7 @@ public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQu

@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic());
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
Expand Down Expand Up @@ -126,5 +132,8 @@ private void destroy() {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
}
}
Expand Up @@ -131,7 +131,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleE
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}

Expand All @@ -151,7 +151,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotif
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-core-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}

Expand Down Expand Up @@ -328,5 +328,8 @@ private void destroy() {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
}
}