Skip to content

Commit

Permalink
Producer Listener support refactored for code reuse (Netflix#448)
Browse files Browse the repository at this point in the history
* Producer Listener support refactored

* Address PR feedback
  • Loading branch information
Sunjeet committed Feb 6, 2020
1 parent 94cc7db commit b080244
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.netflix.hollow.api.common;

/**
* The top-level type for all listeners.
*/
public interface EventListener {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.netflix.hollow.api.common;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListenerSupport {

protected final CopyOnWriteArrayList<EventListener> eventListeners;

public ListenerSupport() {
eventListeners = new CopyOnWriteArrayList<>();
}

public ListenerSupport(List<? extends EventListener> listeners) {
eventListeners = new CopyOnWriteArrayList<>(listeners);
}

public ListenerSupport(ListenerSupport that) {
eventListeners = new CopyOnWriteArrayList<>(that.eventListeners);
}

public void addListener(EventListener listener) {
eventListeners.addIfAbsent(listener);
}

public void removeListener(EventListener listener) {
eventListeners.remove(listener);
}

}
44 changes: 44 additions & 0 deletions hollow/src/main/java/com/netflix/hollow/api/common/Listeners.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.netflix.hollow.api.common;

import com.netflix.hollow.api.producer.listener.VetoableListener;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

public abstract class Listeners {

private static final Logger LOG = Logger.getLogger(Listeners.class.getName());

protected final EventListener[] listeners;

protected Listeners(EventListener[] listeners) {
this.listeners = listeners;
}

public <T extends EventListener> Stream<T> getListeners(Class<T> c) {
return Arrays.stream(listeners).filter(c::isInstance).map(c::cast);
}

protected <T extends EventListener> void fire(
Class<T> c, Consumer<? super T> r) {
fireStream(getListeners(c), r);
}

protected <T extends EventListener> void fireStream(
Stream<T> s, Consumer<? super T> r) {
s.forEach(l -> {
try {
r.accept(l);
} catch (VetoableListener.ListenerVetoException e) {
throw e;
} catch (RuntimeException e) {
if (l instanceof VetoableListener) {
throw e;
}
LOG.log(Level.WARNING, "Error executing listener", e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.netflix.hollow.api.producer;

import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners;
import static java.lang.System.currentTimeMillis;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -69,7 +70,7 @@ abstract class AbstractHollowProducer {
final HollowProducer.BlobStorageCleaner blobStorageCleaner;
HollowObjectMapper objectMapper;
final HollowProducer.VersionMinter versionMinter;
final ListenerSupport listeners;
final ProducerListenerSupport listeners;
ReadStateHelper readStates;
final Executor snapshotPublishExecutor;
final int numStatesBetweenSnapshots;
Expand Down Expand Up @@ -143,7 +144,7 @@ private AbstractHollowProducer(
this.readStates = ReadStateHelper.newDeltaChain();
this.blobStorageCleaner = blobStorageCleaner;

this.listeners = new ListenerSupport(eventListeners.stream().distinct().collect(toList()));
this.listeners = new ProducerListenerSupport(eventListeners.stream().distinct().collect(toList()));

this.metrics = new HollowProducerMetrics();
this.metricsCollector = metricsCollector;
Expand Down Expand Up @@ -258,7 +259,7 @@ private HollowProducer.ReadState restore(
}

HollowProducer.ReadState readState = null;
ListenerSupport.Listeners localListeners = listeners.listeners();
ProducerListeners localListeners = listeners.listeners();
Status.RestoreStageBuilder status = localListeners.fireProducerRestoreStart(versionDesired);
try {
if (versionDesired != HollowConstants.VERSION_NONE) {
Expand Down Expand Up @@ -318,7 +319,7 @@ public boolean enablePrimaryProducer(boolean doEnable) {
}

long runCycle(HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator) {
ListenerSupport.Listeners localListeners = listeners.listeners();
ProducerListeners localListeners = listeners.listeners();

if (!singleProducerEnforcer.isPrimary()) {
// TODO: minimum time spacing between cycles
Expand Down Expand Up @@ -346,7 +347,7 @@ long runCycle(HollowProducer.Incremental.IncrementalPopulator incrementalPopulat
}

long runCycle(
ListenerSupport.Listeners listeners,
ProducerListeners listeners,
HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator,
Status.StageWithStateBuilder cycleStatus, long toVersion) {
// 1. Begin a new cycle
Expand Down Expand Up @@ -480,7 +481,7 @@ public void removeListener(HollowProducerEventListener listener) {
}

void populate(
ListenerSupport.Listeners listeners,
ProducerListeners listeners,
HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator,
long toVersion) throws Exception {
assert incrementalPopulator != null ^ populator != null;
Expand Down Expand Up @@ -508,7 +509,7 @@ void populate(
}

HollowProducer.Populator incrementalPopulate(
ListenerSupport.Listeners listeners,
ProducerListeners listeners,
HollowProducer.Incremental.IncrementalPopulator incrementalPopulator,
long toVersion) throws Exception {
ConcurrentHashMap<RecordPrimaryKey, Object> events = new ConcurrentHashMap<>();
Expand All @@ -534,7 +535,7 @@ HollowProducer.Populator incrementalPopulate(
/*
* Publish the write state, storing the artifacts in the provided object. Visible for testing.
*/
void publish(ListenerSupport.Listeners listeners, long toVersion, Artifacts artifacts) throws IOException {
void publish(ProducerListeners listeners, long toVersion, Artifacts artifacts) throws IOException {
Status.StageBuilder psb = listeners.firePublishStart(toVersion);
try {
if(!readStates.hasCurrent() || doIntegrityCheck || numStatesUntilNextSnapshot <= 0)
Expand Down Expand Up @@ -576,7 +577,7 @@ void publish(ListenerSupport.Listeners listeners, long toVersion, Artifacts arti
}
}

private HollowProducer.Blob stageBlob(ListenerSupport.Listeners listeners, HollowProducer.Blob blob)
private HollowProducer.Blob stageBlob(ProducerListeners listeners, HollowProducer.Blob blob)
throws IOException {
Status.PublishBuilder builder = new Status.PublishBuilder();
HollowBlobWriter writer = new HollowBlobWriter(getWriteEngine());
Expand All @@ -593,7 +594,7 @@ private HollowProducer.Blob stageBlob(ListenerSupport.Listeners listeners, Hollo
}
}

private void publishBlob(ListenerSupport.Listeners listeners, HollowProducer.Blob blob) {
private void publishBlob(ProducerListeners listeners, HollowProducer.Blob blob) {
Status.PublishBuilder builder = new Status.PublishBuilder();
try {
builder.blob(blob);
Expand All @@ -611,7 +612,7 @@ private void publishBlob(ListenerSupport.Listeners listeners, HollowProducer.Blo
}
}

private void publishSnapshotBlobAsync(ListenerSupport.Listeners listeners, Artifacts artifacts) {
private void publishSnapshotBlobAsync(ProducerListeners listeners, Artifacts artifacts) {
HollowProducer.Blob blob = artifacts.snapshot;
CompletableFuture<HollowProducer.Blob> cf = new CompletableFuture<>();
try {
Expand Down Expand Up @@ -669,7 +670,7 @@ private void publishBlob(HollowProducer.Blob b) {
* @return S(cur) and S(pnd)
*/
private ReadStateHelper checkIntegrity(
ListenerSupport.Listeners listeners, ReadStateHelper readStates, Artifacts artifacts,
ProducerListeners listeners, ReadStateHelper readStates, Artifacts artifacts,
boolean schemaChangedFromPriorVersion) throws Exception {
Status.StageWithStateBuilder status = listeners.fireIntegrityCheckStart(readStates.pending());
try {
Expand Down Expand Up @@ -764,7 +765,7 @@ private void applyDelta(HollowProducer.Blob blob, HollowReadStateEngine stateEng
}
}

private void validate(ListenerSupport.Listeners listeners, HollowProducer.ReadState readState) {
private void validate(ProducerListeners listeners, HollowProducer.ReadState readState) {
Status.StageWithStateBuilder psb = listeners.fireValidationStart(readState);

ValidationStatus status = null;
Expand Down Expand Up @@ -796,7 +797,7 @@ private void validate(ListenerSupport.Listeners listeners, HollowProducer.ReadSt
}


private void announce(ListenerSupport.Listeners listeners, HollowProducer.ReadState readState) {
private void announce(ProducerListeners listeners, HollowProducer.ReadState readState) {
if (announcer != null) {
Status.StageWithStateBuilder status = listeners.fireAnnouncementStart(readState);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class HollowIncrementalProducer {
private final HollowProducer producer;
private final ConcurrentHashMap<RecordPrimaryKey, Object> mutations;
private final HollowProducer.Populator populator;
private final ListenerSupport listeners;
private final ProducerListenerSupport listeners;
private final Map<String, Object> cycleMetadata;
private final Class<?>[] dataModel;
private final HollowConsumer.AnnouncementWatcher announcementWatcher;
Expand All @@ -70,7 +70,7 @@ protected HollowIncrementalProducer(HollowProducer producer, double threadsPerCp
this.dataModel = classes;
this.announcementWatcher = announcementWatcher;
this.blobRetriever = blobRetriever;
this.listeners = new ListenerSupport();
this.listeners = new ProducerListenerSupport();
this.cycleMetadata = new HashMap<String, Object>();
this.threadsPerCpu = threadsPerCpu;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ public B withAnnouncer(HollowProducer.Announcer announcer) {
* @throws IllegalArgumentException if the listener does not implement a recognized event listener type
*/
public B withListener(HollowProducerEventListener listener) {
if (!ListenerSupport.isValidListener(listener)) {
if (!ProducerListenerSupport.isValidListener(listener)) {
throw new IllegalArgumentException(
"Listener does not implement a recognized event listener type: " + listener);
}
Expand All @@ -711,7 +711,7 @@ public B withListener(HollowProducerEventListener listener) {
*/
public B withListeners(HollowProducerEventListener... listeners) {
for (HollowProducerEventListener listener : listeners) {
if (!ListenerSupport.isValidListener(listener)) {
if (!ProducerListenerSupport.isValidListener(listener)) {
throw new IllegalArgumentException(
"Listener does not implement a recognized event listener type: " + listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;

import com.netflix.hollow.api.common.ListenerSupport;
import com.netflix.hollow.api.common.Listeners;
import com.netflix.hollow.api.producer.HollowProducerListener.ProducerStatus;
import com.netflix.hollow.api.producer.IncrementalCycleListener.IncrementalCycleStatus;
import com.netflix.hollow.api.producer.listener.AnnouncementListener;
Expand All @@ -30,27 +32,24 @@
import com.netflix.hollow.api.producer.listener.PopulateListener;
import com.netflix.hollow.api.producer.listener.PublishListener;
import com.netflix.hollow.api.producer.listener.RestoreListener;
import com.netflix.hollow.api.producer.listener.VetoableListener;
import com.netflix.hollow.api.producer.validation.ValidationStatus;
import com.netflix.hollow.api.producer.validation.ValidationStatusListener;
import com.netflix.hollow.api.producer.validation.ValidatorListener;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

final class ListenerSupport {
final class ProducerListenerSupport extends ListenerSupport {

private static final Logger LOG = Logger.getLogger(ListenerSupport.class.getName());
private static final Logger LOG = Logger.getLogger(ProducerListenerSupport.class.getName());

private static final Collection<Class<? extends HollowProducerEventListener>> LISTENERS =
Stream.of(DataModelInitializationListener.class,
Expand All @@ -68,81 +67,43 @@ static boolean isValidListener(HollowProducerEventListener l) {
return LISTENERS.stream().anyMatch(c -> c.isInstance(l));
}

private final CopyOnWriteArrayList<HollowProducerEventListener> eventListeners;

ListenerSupport() {
eventListeners = new CopyOnWriteArrayList<>();

ProducerListenerSupport() {
// @@@ This is used only by HollowIncrementalProducer, and should be
// separated out
incrementalCycleListeners = new CopyOnWriteArraySet<>();
}

ListenerSupport(List<? extends HollowProducerEventListener> listeners) {
eventListeners = new CopyOnWriteArrayList<>(listeners);
ProducerListenerSupport(List<? extends HollowProducerEventListener> listeners) {
super(listeners);

// @@@ This is used only by HollowIncrementalProducer, and should be
// separated out
incrementalCycleListeners = new CopyOnWriteArraySet<>();
}

ListenerSupport(ListenerSupport that) {
eventListeners = new CopyOnWriteArrayList<>(that.eventListeners);
ProducerListenerSupport(ProducerListenerSupport that) {
super(that);

// @@@ This is used only by HollowIncrementalProducer, and should be
// separated out
incrementalCycleListeners = new CopyOnWriteArraySet<>(that.incrementalCycleListeners);
}

void addListener(HollowProducerEventListener listener) {
eventListeners.addIfAbsent(listener);
}

void removeListener(HollowProducerEventListener listener) {
eventListeners.remove(listener);
}

//

/**
* Copies the collection of listeners so they can be iterated on without changing.
* From the returned copy events may be fired.
* Any addition or removal of listeners will take effect on the next cycle.
*/
Listeners listeners() {
return new Listeners(eventListeners.toArray(new HollowProducerEventListener[0]));
ProducerListeners listeners() {
return new ProducerListeners(eventListeners.toArray(new HollowProducerEventListener[0]));
}

static final class Listeners {
final HollowProducerEventListener[] listeners;

Listeners(HollowProducerEventListener[] listeners) {
this.listeners = listeners;
}

<T extends HollowProducerEventListener> Stream<T> getListeners(Class<T> c) {
return Arrays.stream(listeners).filter(c::isInstance).map(c::cast);
}

private <T extends HollowProducerEventListener> void fire(
Class<T> c, Consumer<? super T> r) {
fireStream(getListeners(c), r);
}
static final class ProducerListeners extends Listeners {

private <T extends HollowProducerEventListener> void fireStream(
Stream<T> s, Consumer<? super T> r) {
s.forEach(l -> {
try {
r.accept(l);
} catch (VetoableListener.ListenerVetoException e) {
throw e;
} catch (RuntimeException e) {
if (l instanceof VetoableListener) {
throw e;
}
LOG.log(Level.WARNING, "Error executing listener", e);
}
});
ProducerListeners(HollowProducerEventListener[] listeners) {
super(listeners);
}

void fireProducerInit(long elapsedMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/
package com.netflix.hollow.api.producer.listener;

import com.netflix.hollow.api.common.EventListener;

/**
* The top-level type for all producer listeners.
* The top-level type for all producer-specific listeners.
*/
public interface HollowProducerEventListener {

public interface HollowProducerEventListener extends EventListener {
}

0 comments on commit b080244

Please sign in to comment.