Skip to content

Commit

Permalink
AMBARI-25063. Restarting Ambari Server Fails Due to Recursive Injecti…
Browse files Browse the repository at this point in the history
…on of STOMPUpdatePublisher. (mpapirkovskyy) (apache#2826)
  • Loading branch information
mpapirkovskyy committed Feb 20, 2019
1 parent a6d9116 commit 77faf73
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,29 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.ambari.server.events.STOMPEvent;

import com.google.common.eventbus.EventBus;
import com.google.inject.Singleton;

@Singleton
public abstract class BufferedUpdateEventPublisher<T> {

private static final long TIMEOUT = 1000L;
private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();

public abstract STOMPEvent.Type getType();

private ScheduledExecutorService scheduledExecutorService;

public BufferedUpdateEventPublisher(STOMPUpdatePublisher stompUpdatePublisher) {
stompUpdatePublisher.registerPublisher(this);
}

public void publish(T event, EventBus m_eventBus) {
if (scheduledExecutorService == null) {
scheduledExecutorService =
Expand Down Expand Up @@ -77,4 +84,17 @@ public final void run() {
mergeBufferAndPost(events, m_eventBus);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BufferedUpdateEventPublisher<?> that = (BufferedUpdateEventPublisher<?>) o;
return Objects.equals(getType(), that.getType());
}

@Override
public int hashCode() {
return Objects.hash(getType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,27 @@
import java.util.List;
import java.util.stream.Collectors;

import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.events.HostComponentUpdate;
import org.apache.ambari.server.events.HostComponentsUpdateEvent;
import org.apache.ambari.server.events.STOMPEvent;

import com.google.common.eventbus.EventBus;
import com.google.inject.Singleton;
import com.google.inject.Inject;

@Singleton
@EagerSingleton
public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentsUpdateEvent> {

@Inject
public HostComponentUpdateEventPublisher(STOMPUpdatePublisher stompUpdatePublisher) {
super(stompUpdatePublisher);
}

@Override
public STOMPEvent.Type getType() {
return STOMPEvent.Type.HOSTCOMPONENT;
}

@Override
public void mergeBufferAndPost(List<HostComponentsUpdateEvent> events, EventBus m_eventBus) {
List<HostComponentUpdate> hostComponentUpdates = events.stream().flatMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.List;
import java.util.Map;

import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.events.RequestUpdateEvent;
import org.apache.ambari.server.events.STOMPEvent;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.RequestDAO;
Expand All @@ -32,9 +34,8 @@

import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
import com.google.inject.Singleton;

@Singleton
@EagerSingleton
public class RequestUpdateEventPublisher extends BufferedUpdateEventPublisher<RequestUpdateEvent> {

@Inject
Expand All @@ -49,6 +50,16 @@ public class RequestUpdateEventPublisher extends BufferedUpdateEventPublisher<Re
@Inject
private ClusterDAO clusterDAO;

@Inject
public RequestUpdateEventPublisher(STOMPUpdatePublisher stompUpdatePublisher) {
super(stompUpdatePublisher);
}

@Override
public STOMPEvent.Type getType() {
return STOMPEvent.Type.REQUEST;
}

@Override
public void mergeBufferAndPost(List<RequestUpdateEvent> events, EventBus m_eventBus) {
Map<Long, RequestUpdateEvent> filteredRequests = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,29 @@
*/
package org.apache.ambari.server.events.publishers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.ambari.server.AmbariRuntimeException;
import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.events.HostComponentsUpdateEvent;
import org.apache.ambari.server.events.RequestUpdateEvent;
import org.apache.ambari.server.events.STOMPEvent;
import org.apache.ambari.server.events.ServiceUpdateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;

@Singleton
public class STOMPUpdatePublisher {
private static final Logger LOG = LoggerFactory.getLogger(STOMPUpdatePublisher.class);

private final EventBus agentEventBus;
private final EventBus apiEventBus;

@Inject
private RequestUpdateEventPublisher requestUpdateEventPublisher;

@Inject
private HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher;

@Inject
private ServiceUpdateEventPublisher serviceUpdateEventPublisher;

private final ExecutorService threadPoolExecutorAgent = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("stomp-agent-bus-%d").build());
private final ExecutorService threadPoolExecutorAPI = Executors.newSingleThreadExecutor(
Expand All @@ -61,6 +53,16 @@ public STOMPUpdatePublisher() throws NoSuchFieldException, IllegalAccessExceptio
threadPoolExecutorAPI);
}

private List<BufferedUpdateEventPublisher> publishers = new ArrayList<>();

public void registerPublisher(BufferedUpdateEventPublisher publisher) {
if (publishers.contains(publisher)) {
LOG.error("Publisher for type {} is already in use", publisher.getType());
} else {
publishers.add(publisher);
}
}

public void publish(STOMPEvent event) {
if (DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES.contains(event.getType())) {
publishAgent(event);
Expand All @@ -73,13 +75,14 @@ public void publish(STOMPEvent event) {
}

private void publishAPI(STOMPEvent event) {
if (event.getType().equals(STOMPEvent.Type.REQUEST)) {
requestUpdateEventPublisher.publish((RequestUpdateEvent) event, apiEventBus);
} else if (event.getType().equals(STOMPEvent.Type.HOSTCOMPONENT)) {
hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, apiEventBus);
} else if (event.getType().equals(STOMPEvent.Type.SERVICE)) {
serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, apiEventBus);
} else {
boolean published = false;
for (BufferedUpdateEventPublisher publisher : publishers) {
if (publisher.getType().equals(event.getType())) {
publisher.publish(event, apiEventBus);
published = true;
}
}
if (!published) {
apiEventBus.post(event);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,30 @@
import java.util.List;
import java.util.Map;

import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
import org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
import org.apache.ambari.server.events.STOMPEvent;
import org.apache.ambari.server.events.ServiceUpdateEvent;
import org.apache.ambari.server.state.State;

import com.google.common.eventbus.EventBus;
import com.google.inject.Singleton;
import com.google.inject.Inject;

@Singleton
@EagerSingleton
public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<ServiceUpdateEvent> {
private Map<String, Map<String, State>> states = new HashMap<>();

@Inject
public ServiceUpdateEventPublisher(STOMPUpdatePublisher stompUpdatePublisher) {
super(stompUpdatePublisher);
}


@Override
public STOMPEvent.Type getType() {
return STOMPEvent.Type.SERVICE;
}

@Override
public void mergeBufferAndPost(List<ServiceUpdateEvent> events, EventBus eventBus) {
Expand Down

0 comments on commit 77faf73

Please sign in to comment.