Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event bus optimizations #6980

Merged
merged 1 commit into from Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,6 +22,7 @@
import de.metas.event.Topic;
import de.metas.event.log.EventLogUserService;
import de.metas.event.log.EventLogUserService.InvokeHandlerAndLogRequest;
import de.metas.event.remote.RabbitMQEventBusConfiguration;
import de.metas.logging.LogManager;
import de.metas.logging.TableRecordMDC;
import lombok.NonNull;
Expand Down Expand Up @@ -52,7 +53,7 @@
@DependsOn(Adempiere.BEAN_NAME) // needs database
public class DocumentPostingBusService
{
private static final Topic TOPIC = Topic.remote("de.metas.acct.handler.DocumentPostRequest");
private static final Topic TOPIC = RabbitMQEventBusConfiguration.AccountingQueueConfiguration.EVENTBUS_TOPIC;
private static final String PROPERTY_DocumentPostRequest = "DocumentPostRequest";

// services
Expand Down
Expand Up @@ -17,8 +17,8 @@
import de.metas.event.IEventBusFactory;
import de.metas.event.IEventListener;
import de.metas.event.Topic;
import de.metas.event.Type;
import de.metas.event.impl.EventMDC;
import de.metas.event.remote.RabbitMQEventBusConfiguration;
import de.metas.logging.LogManager;
import de.metas.util.Check;
import de.metas.util.Services;
Expand Down Expand Up @@ -53,10 +53,7 @@ final class CacheInvalidationRemoteHandler implements IEventListener

private static final Logger logger = LogManager.getLogger(CacheInvalidationRemoteHandler.class);

private static final Topic TOPIC_CacheInvalidation = Topic.builder()
.name("de.metas.cache.CacheInvalidationRemoteHandler")
.type(Type.REMOTE)
.build();
private static final Topic TOPIC_CacheInvalidation = RabbitMQEventBusConfiguration.CacheInvalidationQueueConfiguration.EVENTBUS_TOPIC;

private static final String EVENT_PROPERTY = CacheInvalidateRequest.class.getSimpleName();

Expand Down
Expand Up @@ -6,6 +6,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.Nullable;

import org.adempiere.exceptions.AdempiereException;
import org.adempiere.util.concurrent.CustomizableThreadFactory;
import org.adempiere.util.jmx.JMXRegistry;
Expand All @@ -17,7 +19,6 @@
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.SetMultimap;
Expand All @@ -33,8 +34,6 @@
import de.metas.logging.LogManager;
import lombok.NonNull;

import javax.annotation.Nullable;

@Service
public class EventBusFactory implements IEventBusFactory
{
Expand Down Expand Up @@ -74,6 +73,8 @@ public EventBusFactory(@NonNull final IEventBusRemoteEndpoint remoteEndpoint)
// Setup default user notification topics
addAvailableUserNotificationsTopic(EventBusConfig.TOPIC_GeneralUserNotifications);
addAvailableUserNotificationsTopic(EventBusConfig.TOPIC_GeneralUserNotificationsLocal);

remoteEndpoint.setEventBusFactory(this);
}

@Override
Expand Down
Expand Up @@ -2,13 +2,18 @@

import de.metas.event.Event;
import de.metas.event.IEventBus;
import de.metas.event.IEventBusFactory;
import de.metas.event.IEventListener;
import lombok.NonNull;

/**
* Defines an integration point between a remote endpoint and {@link IEventBus}. Binding the endpoint to the event bus is done by registering a forwarding {@link IEventListener}.
*/
public interface IEventBusRemoteEndpoint
{
/** Called by API when this endpoint is registered to event bus factory */
void setEventBusFactory(@NonNull IEventBusFactory eventBusFactory);

/**
* Set the given <code>event</code> to the event bus identified by the given <code>topicName</code>.
*/
Expand Down
@@ -1,5 +1,7 @@
package de.metas.event.remote;

import java.util.Optional;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
Expand All @@ -17,6 +19,9 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import de.metas.event.Topic;
import de.metas.monitoring.adapter.NoopPerformanceMonitoringService;
import de.metas.monitoring.adapter.PerformanceMonitoringService;
import lombok.NonNull;

/*
Expand Down Expand Up @@ -45,39 +50,10 @@
@EnableRabbit // needed for @RabbitListener to be considered
public class RabbitMQEventBusConfiguration
{
private static final String EVENTS_QUEUE_BEAN_NAME = "metasfreshEventsQueue";
public static final String EVENTS_QUEUE_NAME_SPEL = "#{metasfreshEventsQueue.name}";
public static final String EVENTS_EXCHANGE_NAME = "metasfresh-events";

@Value("${spring.application.name:spring.application.name-not-set}")
private static final String APPLICATION_NAME_SPEL = "${spring.application.name:spring.application.name-not-set}";
@Value(APPLICATION_NAME_SPEL)
private String appName;

@Bean
public AnonymousQueue.NamingStrategy namingStrategy()
{
return new AnonymousQueue.Base64UrlNamingStrategy("metasfresh.events." + appName + "-");
}

@Bean(EVENTS_QUEUE_BEAN_NAME)
public AnonymousQueue eventsQueue(AnonymousQueue.NamingStrategy eventQueueNamingStrategy)
{
return new AnonymousQueue(eventQueueNamingStrategy);
}

@Bean
public FanoutExchange eventsExchange()
{
return new FanoutExchange(EVENTS_EXCHANGE_NAME);
}

@Bean
public Binding eventsBinding(AnonymousQueue.NamingStrategy eventQueueNamingStrategy)
{
return BindingBuilder
.bind(eventsQueue(eventQueueNamingStrategy))
.to(eventsExchange());
}

@Bean
public org.springframework.amqp.support.converter.MessageConverter amqpMessageConverter(final ObjectMapper jsonObjectMapper)
{
Expand Down Expand Up @@ -111,8 +87,121 @@ public ConnectionNameStrategy connectionNameStrategy()
}

@Bean
public RabbitMQEventBusRemoteEndpoint eventBusRemoteEndpoint(@NonNull final AmqpTemplate amqpTemplate)
public RabbitMQEventBusRemoteEndpoint eventBusRemoteEndpoint(
@NonNull final AmqpTemplate amqpTemplate,
@NonNull final Optional<PerformanceMonitoringService> performanceMonitoringService)
{
return new RabbitMQEventBusRemoteEndpoint(
amqpTemplate,
performanceMonitoringService.orElse(NoopPerformanceMonitoringService.INSTANCE));
}

public static String getAMQPExchangeNameByTopicName(final String topicName)
{
if (AccountingQueueConfiguration.EVENTBUS_TOPIC.getName().equals(topicName))
{
return AccountingQueueConfiguration.EXCHANGE_NAME;
}
else if (CacheInvalidationQueueConfiguration.EVENTBUS_TOPIC.getName().equals(topicName))
{
return CacheInvalidationQueueConfiguration.EXCHANGE_NAME;
}
else
{
return DefaultQueueConfiguration.EXCHANGE_NAME;
}
}

@Configuration
public static class DefaultQueueConfiguration
{
return new RabbitMQEventBusRemoteEndpoint(amqpTemplate);
private static final String QUEUE_BEAN_NAME = "metasfreshEventsQueue";
public static final String QUEUE_NAME_SPEL = "#{metasfreshEventsQueue.name}";
private static final String EXCHANGE_NAME = "metasfresh-events";

@Value(APPLICATION_NAME_SPEL)
private String appName;

@Bean(QUEUE_BEAN_NAME)
public AnonymousQueue eventsQueue()
{
final AnonymousQueue.NamingStrategy eventQueueNamingStrategy = new AnonymousQueue.Base64UrlNamingStrategy("metasfresh.events." + appName + "-");
return new AnonymousQueue(eventQueueNamingStrategy);
}

@Bean
public FanoutExchange eventsExchange()
{
return new FanoutExchange(EXCHANGE_NAME);
}

@Bean
public Binding eventsBinding()
{
return BindingBuilder.bind(eventsQueue()).to(eventsExchange());
}
}

@Configuration
public static class CacheInvalidationQueueConfiguration
{
public static final Topic EVENTBUS_TOPIC = Topic.remote("de.metas.cache.CacheInvalidationRemoteHandler");
private static final String QUEUE_BEAN_NAME = "metasfreshCacheInvalidationEventsQueue";
public static final String QUEUE_NAME_SPEL = "#{metasfreshCacheInvalidationEventsQueue.name}";
private static final String EXCHANGE_NAME = "metasfresh-cache-events";

@Value(APPLICATION_NAME_SPEL)
private String appName;

@Bean(QUEUE_BEAN_NAME)
public AnonymousQueue cacheInvalidationQueue()
{
final AnonymousQueue.NamingStrategy eventQueueNamingStrategy = new AnonymousQueue.Base64UrlNamingStrategy(EVENTBUS_TOPIC.getName() + "." + appName + "-");
return new AnonymousQueue(eventQueueNamingStrategy);
}

@Bean
public FanoutExchange cacheInvalidationExchange()
{
return new FanoutExchange(EXCHANGE_NAME);
}

@Bean
public Binding cacheInvalidationBinding()
{
return BindingBuilder.bind(cacheInvalidationQueue()).to(cacheInvalidationExchange());
}

}

@Configuration
public static class AccountingQueueConfiguration
{
public static final Topic EVENTBUS_TOPIC = Topic.remote("de.metas.acct.handler.DocumentPostRequest");
private static final String QUEUE_BEAN_NAME = "metasfreshAccountingEventsQueue";
public static final String QUEUE_NAME_SPEL = "#{metasfreshAccountingEventsQueue.name}";
private static final String EXCHANGE_NAME = "metasfresh-accounting-events";

@Value(APPLICATION_NAME_SPEL)
private String appName;

@Bean(QUEUE_BEAN_NAME)
public AnonymousQueue accountingQueue()
{
final AnonymousQueue.NamingStrategy eventQueueNamingStrategy = new AnonymousQueue.Base64UrlNamingStrategy(EVENTBUS_TOPIC.getName() + "." + appName + "-");
return new AnonymousQueue(eventQueueNamingStrategy);
}

@Bean
public FanoutExchange accountingExchange()
{
return new FanoutExchange(EXCHANGE_NAME);
}

@Bean
public Binding accountingBinding()
{
return BindingBuilder.bind(accountingQueue()).to(accountingExchange());
}
}
}