From b080244ad3255c0de4e015c8415c7b1c50736f66 Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Thu, 6 Feb 2020 14:50:59 -0800 Subject: [PATCH] Producer Listener support refactored for code reuse (#448) * Producer Listener support refactored * Address PR feedback --- .../hollow/api/common/EventListener.java | 7 ++ .../hollow/api/common/ListenerSupport.java | 30 +++++++++ .../netflix/hollow/api/common/Listeners.java | 44 ++++++++++++ .../api/producer/AbstractHollowProducer.java | 29 ++++---- .../producer/HollowIncrementalProducer.java | 4 +- .../hollow/api/producer/HollowProducer.java | 4 +- ...port.java => ProducerListenerSupport.java} | 67 ++++--------------- .../listener/HollowProducerEventListener.java | 7 +- .../api/producer/HollowProducerTest.java | 2 +- ....java => ProducerListenerSupportTest.java} | 18 ++--- 10 files changed, 128 insertions(+), 84 deletions(-) create mode 100644 hollow/src/main/java/com/netflix/hollow/api/common/EventListener.java create mode 100644 hollow/src/main/java/com/netflix/hollow/api/common/ListenerSupport.java create mode 100644 hollow/src/main/java/com/netflix/hollow/api/common/Listeners.java rename hollow/src/main/java/com/netflix/hollow/api/producer/{ListenerSupport.java => ProducerListenerSupport.java} (85%) rename hollow/src/test/java/com/netflix/hollow/api/producer/{ListenerSupportTest.java => ProducerListenerSupportTest.java} (95%) diff --git a/hollow/src/main/java/com/netflix/hollow/api/common/EventListener.java b/hollow/src/main/java/com/netflix/hollow/api/common/EventListener.java new file mode 100644 index 0000000000..354ad77ad6 --- /dev/null +++ b/hollow/src/main/java/com/netflix/hollow/api/common/EventListener.java @@ -0,0 +1,7 @@ +package com.netflix.hollow.api.common; + +/** + * The top-level type for all listeners. + */ +public interface EventListener { +} diff --git a/hollow/src/main/java/com/netflix/hollow/api/common/ListenerSupport.java b/hollow/src/main/java/com/netflix/hollow/api/common/ListenerSupport.java new file mode 100644 index 0000000000..0fa6e3e1c7 --- /dev/null +++ b/hollow/src/main/java/com/netflix/hollow/api/common/ListenerSupport.java @@ -0,0 +1,30 @@ +package com.netflix.hollow.api.common; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class ListenerSupport { + + protected final CopyOnWriteArrayList eventListeners; + + public ListenerSupport() { + eventListeners = new CopyOnWriteArrayList<>(); + } + + public ListenerSupport(List listeners) { + eventListeners = new CopyOnWriteArrayList<>(listeners); + } + + public ListenerSupport(ListenerSupport that) { + eventListeners = new CopyOnWriteArrayList<>(that.eventListeners); + } + + public void addListener(EventListener listener) { + eventListeners.addIfAbsent(listener); + } + + public void removeListener(EventListener listener) { + eventListeners.remove(listener); + } + +} diff --git a/hollow/src/main/java/com/netflix/hollow/api/common/Listeners.java b/hollow/src/main/java/com/netflix/hollow/api/common/Listeners.java new file mode 100644 index 0000000000..e96743c407 --- /dev/null +++ b/hollow/src/main/java/com/netflix/hollow/api/common/Listeners.java @@ -0,0 +1,44 @@ +package com.netflix.hollow.api.common; + +import com.netflix.hollow.api.producer.listener.VetoableListener; +import java.util.Arrays; +import java.util.function.Consumer; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Stream; + +public abstract class Listeners { + + private static final Logger LOG = Logger.getLogger(Listeners.class.getName()); + + protected final EventListener[] listeners; + + protected Listeners(EventListener[] listeners) { + this.listeners = listeners; + } + + public Stream getListeners(Class c) { + return Arrays.stream(listeners).filter(c::isInstance).map(c::cast); + } + + protected void fire( + Class c, Consumer r) { + fireStream(getListeners(c), r); + } + + protected void fireStream( + Stream s, Consumer r) { + s.forEach(l -> { + try { + r.accept(l); + } catch (VetoableListener.ListenerVetoException e) { + throw e; + } catch (RuntimeException e) { + if (l instanceof VetoableListener) { + throw e; + } + LOG.log(Level.WARNING, "Error executing listener", e); + } + }); + } +} diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 89cea207fe..9f5e9338d5 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -16,6 +16,7 @@ */ package com.netflix.hollow.api.producer; +import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners; import static java.lang.System.currentTimeMillis; import static java.util.stream.Collectors.toList; @@ -69,7 +70,7 @@ abstract class AbstractHollowProducer { final HollowProducer.BlobStorageCleaner blobStorageCleaner; HollowObjectMapper objectMapper; final HollowProducer.VersionMinter versionMinter; - final ListenerSupport listeners; + final ProducerListenerSupport listeners; ReadStateHelper readStates; final Executor snapshotPublishExecutor; final int numStatesBetweenSnapshots; @@ -143,7 +144,7 @@ private AbstractHollowProducer( this.readStates = ReadStateHelper.newDeltaChain(); this.blobStorageCleaner = blobStorageCleaner; - this.listeners = new ListenerSupport(eventListeners.stream().distinct().collect(toList())); + this.listeners = new ProducerListenerSupport(eventListeners.stream().distinct().collect(toList())); this.metrics = new HollowProducerMetrics(); this.metricsCollector = metricsCollector; @@ -258,7 +259,7 @@ private HollowProducer.ReadState restore( } HollowProducer.ReadState readState = null; - ListenerSupport.Listeners localListeners = listeners.listeners(); + ProducerListeners localListeners = listeners.listeners(); Status.RestoreStageBuilder status = localListeners.fireProducerRestoreStart(versionDesired); try { if (versionDesired != HollowConstants.VERSION_NONE) { @@ -318,7 +319,7 @@ public boolean enablePrimaryProducer(boolean doEnable) { } long runCycle(HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator) { - ListenerSupport.Listeners localListeners = listeners.listeners(); + ProducerListeners localListeners = listeners.listeners(); if (!singleProducerEnforcer.isPrimary()) { // TODO: minimum time spacing between cycles @@ -346,7 +347,7 @@ long runCycle(HollowProducer.Incremental.IncrementalPopulator incrementalPopulat } long runCycle( - ListenerSupport.Listeners listeners, + ProducerListeners listeners, HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator, Status.StageWithStateBuilder cycleStatus, long toVersion) { // 1. Begin a new cycle @@ -480,7 +481,7 @@ public void removeListener(HollowProducerEventListener listener) { } void populate( - ListenerSupport.Listeners listeners, + ProducerListeners listeners, HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator, long toVersion) throws Exception { assert incrementalPopulator != null ^ populator != null; @@ -508,7 +509,7 @@ void populate( } HollowProducer.Populator incrementalPopulate( - ListenerSupport.Listeners listeners, + ProducerListeners listeners, HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, long toVersion) throws Exception { ConcurrentHashMap events = new ConcurrentHashMap<>(); @@ -534,7 +535,7 @@ HollowProducer.Populator incrementalPopulate( /* * Publish the write state, storing the artifacts in the provided object. Visible for testing. */ - void publish(ListenerSupport.Listeners listeners, long toVersion, Artifacts artifacts) throws IOException { + void publish(ProducerListeners listeners, long toVersion, Artifacts artifacts) throws IOException { Status.StageBuilder psb = listeners.firePublishStart(toVersion); try { if(!readStates.hasCurrent() || doIntegrityCheck || numStatesUntilNextSnapshot <= 0) @@ -576,7 +577,7 @@ void publish(ListenerSupport.Listeners listeners, long toVersion, Artifacts arti } } - private HollowProducer.Blob stageBlob(ListenerSupport.Listeners listeners, HollowProducer.Blob blob) + private HollowProducer.Blob stageBlob(ProducerListeners listeners, HollowProducer.Blob blob) throws IOException { Status.PublishBuilder builder = new Status.PublishBuilder(); HollowBlobWriter writer = new HollowBlobWriter(getWriteEngine()); @@ -593,7 +594,7 @@ private HollowProducer.Blob stageBlob(ListenerSupport.Listeners listeners, Hollo } } - private void publishBlob(ListenerSupport.Listeners listeners, HollowProducer.Blob blob) { + private void publishBlob(ProducerListeners listeners, HollowProducer.Blob blob) { Status.PublishBuilder builder = new Status.PublishBuilder(); try { builder.blob(blob); @@ -611,7 +612,7 @@ private void publishBlob(ListenerSupport.Listeners listeners, HollowProducer.Blo } } - private void publishSnapshotBlobAsync(ListenerSupport.Listeners listeners, Artifacts artifacts) { + private void publishSnapshotBlobAsync(ProducerListeners listeners, Artifacts artifacts) { HollowProducer.Blob blob = artifacts.snapshot; CompletableFuture cf = new CompletableFuture<>(); try { @@ -669,7 +670,7 @@ private void publishBlob(HollowProducer.Blob b) { * @return S(cur) and S(pnd) */ private ReadStateHelper checkIntegrity( - ListenerSupport.Listeners listeners, ReadStateHelper readStates, Artifacts artifacts, + ProducerListeners listeners, ReadStateHelper readStates, Artifacts artifacts, boolean schemaChangedFromPriorVersion) throws Exception { Status.StageWithStateBuilder status = listeners.fireIntegrityCheckStart(readStates.pending()); try { @@ -764,7 +765,7 @@ private void applyDelta(HollowProducer.Blob blob, HollowReadStateEngine stateEng } } - private void validate(ListenerSupport.Listeners listeners, HollowProducer.ReadState readState) { + private void validate(ProducerListeners listeners, HollowProducer.ReadState readState) { Status.StageWithStateBuilder psb = listeners.fireValidationStart(readState); ValidationStatus status = null; @@ -796,7 +797,7 @@ private void validate(ListenerSupport.Listeners listeners, HollowProducer.ReadSt } - private void announce(ListenerSupport.Listeners listeners, HollowProducer.ReadState readState) { + private void announce(ProducerListeners listeners, HollowProducer.ReadState readState) { if (announcer != null) { Status.StageWithStateBuilder status = listeners.fireAnnouncementStart(readState); try { diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/HollowIncrementalProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/HollowIncrementalProducer.java index 9c2a759e4e..c242ebaec9 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/HollowIncrementalProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/HollowIncrementalProducer.java @@ -46,7 +46,7 @@ public class HollowIncrementalProducer { private final HollowProducer producer; private final ConcurrentHashMap mutations; private final HollowProducer.Populator populator; - private final ListenerSupport listeners; + private final ProducerListenerSupport listeners; private final Map cycleMetadata; private final Class[] dataModel; private final HollowConsumer.AnnouncementWatcher announcementWatcher; @@ -70,7 +70,7 @@ protected HollowIncrementalProducer(HollowProducer producer, double threadsPerCp this.dataModel = classes; this.announcementWatcher = announcementWatcher; this.blobRetriever = blobRetriever; - this.listeners = new ListenerSupport(); + this.listeners = new ProducerListenerSupport(); this.cycleMetadata = new HashMap(); this.threadsPerCpu = threadsPerCpu; diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/HollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/HollowProducer.java index 549036e717..7e977dc21e 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/HollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/HollowProducer.java @@ -693,7 +693,7 @@ public B withAnnouncer(HollowProducer.Announcer announcer) { * @throws IllegalArgumentException if the listener does not implement a recognized event listener type */ public B withListener(HollowProducerEventListener listener) { - if (!ListenerSupport.isValidListener(listener)) { + if (!ProducerListenerSupport.isValidListener(listener)) { throw new IllegalArgumentException( "Listener does not implement a recognized event listener type: " + listener); } @@ -711,7 +711,7 @@ public B withListener(HollowProducerEventListener listener) { */ public B withListeners(HollowProducerEventListener... listeners) { for (HollowProducerEventListener listener : listeners) { - if (!ListenerSupport.isValidListener(listener)) { + if (!ProducerListenerSupport.isValidListener(listener)) { throw new IllegalArgumentException( "Listener does not implement a recognized event listener type: " + listener); } diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/ListenerSupport.java b/hollow/src/main/java/com/netflix/hollow/api/producer/ProducerListenerSupport.java similarity index 85% rename from hollow/src/main/java/com/netflix/hollow/api/producer/ListenerSupport.java rename to hollow/src/main/java/com/netflix/hollow/api/producer/ProducerListenerSupport.java index 6a87e88aeb..62e87aa9f8 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/ListenerSupport.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/ProducerListenerSupport.java @@ -19,6 +19,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.Collectors.toList; +import com.netflix.hollow.api.common.ListenerSupport; +import com.netflix.hollow.api.common.Listeners; import com.netflix.hollow.api.producer.HollowProducerListener.ProducerStatus; import com.netflix.hollow.api.producer.IncrementalCycleListener.IncrementalCycleStatus; import com.netflix.hollow.api.producer.listener.AnnouncementListener; @@ -30,27 +32,24 @@ import com.netflix.hollow.api.producer.listener.PopulateListener; import com.netflix.hollow.api.producer.listener.PublishListener; import com.netflix.hollow.api.producer.listener.RestoreListener; -import com.netflix.hollow.api.producer.listener.VetoableListener; import com.netflix.hollow.api.producer.validation.ValidationStatus; import com.netflix.hollow.api.producer.validation.ValidationStatusListener; import com.netflix.hollow.api.producer.validation.ValidatorListener; import java.time.Duration; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; -final class ListenerSupport { +final class ProducerListenerSupport extends ListenerSupport { - private static final Logger LOG = Logger.getLogger(ListenerSupport.class.getName()); + private static final Logger LOG = Logger.getLogger(ProducerListenerSupport.class.getName()); private static final Collection> LISTENERS = Stream.of(DataModelInitializationListener.class, @@ -68,40 +67,28 @@ static boolean isValidListener(HollowProducerEventListener l) { return LISTENERS.stream().anyMatch(c -> c.isInstance(l)); } - private final CopyOnWriteArrayList eventListeners; - - ListenerSupport() { - eventListeners = new CopyOnWriteArrayList<>(); - + ProducerListenerSupport() { // @@@ This is used only by HollowIncrementalProducer, and should be // separated out incrementalCycleListeners = new CopyOnWriteArraySet<>(); } - ListenerSupport(List listeners) { - eventListeners = new CopyOnWriteArrayList<>(listeners); + ProducerListenerSupport(List listeners) { + super(listeners); // @@@ This is used only by HollowIncrementalProducer, and should be // separated out incrementalCycleListeners = new CopyOnWriteArraySet<>(); } - ListenerSupport(ListenerSupport that) { - eventListeners = new CopyOnWriteArrayList<>(that.eventListeners); + ProducerListenerSupport(ProducerListenerSupport that) { + super(that); // @@@ This is used only by HollowIncrementalProducer, and should be // separated out incrementalCycleListeners = new CopyOnWriteArraySet<>(that.incrementalCycleListeners); } - void addListener(HollowProducerEventListener listener) { - eventListeners.addIfAbsent(listener); - } - - void removeListener(HollowProducerEventListener listener) { - eventListeners.remove(listener); - } - // /** @@ -109,40 +96,14 @@ void removeListener(HollowProducerEventListener listener) { * From the returned copy events may be fired. * Any addition or removal of listeners will take effect on the next cycle. */ - Listeners listeners() { - return new Listeners(eventListeners.toArray(new HollowProducerEventListener[0])); + ProducerListeners listeners() { + return new ProducerListeners(eventListeners.toArray(new HollowProducerEventListener[0])); } - static final class Listeners { - final HollowProducerEventListener[] listeners; - - Listeners(HollowProducerEventListener[] listeners) { - this.listeners = listeners; - } - - Stream getListeners(Class c) { - return Arrays.stream(listeners).filter(c::isInstance).map(c::cast); - } - - private void fire( - Class c, Consumer r) { - fireStream(getListeners(c), r); - } + static final class ProducerListeners extends Listeners { - private void fireStream( - Stream s, Consumer r) { - s.forEach(l -> { - try { - r.accept(l); - } catch (VetoableListener.ListenerVetoException e) { - throw e; - } catch (RuntimeException e) { - if (l instanceof VetoableListener) { - throw e; - } - LOG.log(Level.WARNING, "Error executing listener", e); - } - }); + ProducerListeners(HollowProducerEventListener[] listeners) { + super(listeners); } void fireProducerInit(long elapsedMillis) { diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/listener/HollowProducerEventListener.java b/hollow/src/main/java/com/netflix/hollow/api/producer/listener/HollowProducerEventListener.java index 32b0c36d25..d072feeffb 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/listener/HollowProducerEventListener.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/listener/HollowProducerEventListener.java @@ -16,9 +16,10 @@ */ package com.netflix.hollow.api.producer.listener; +import com.netflix.hollow.api.common.EventListener; + /** - * The top-level type for all producer listeners. + * The top-level type for all producer-specific listeners. */ -public interface HollowProducerEventListener { - +public interface HollowProducerEventListener extends EventListener { } diff --git a/hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java b/hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java index 73a924a371..ddbdb3a907 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java @@ -275,7 +275,7 @@ public void testRollsBackStateEngineOnPublishFailure() throws Exception { Assert.assertEquals("Should have no populated ordinals", 0, producer.getWriteEngine().getTypeState("TestPojo").getPopulatedBitSet().cardinality()); doThrow(new RuntimeException("Publish failed")).when(producer).publish( - any(ListenerSupport.Listeners.class), any(Long.class), any(AbstractHollowProducer.Artifacts.class)); + any(ProducerListenerSupport.ProducerListeners.class), any(Long.class), any(AbstractHollowProducer.Artifacts.class)); try { producer.runCycle(newState -> newState.add(new TestPojoV1(1, 1))); } catch (RuntimeException e) { // expected diff --git a/hollow/src/test/java/com/netflix/hollow/api/producer/ListenerSupportTest.java b/hollow/src/test/java/com/netflix/hollow/api/producer/ProducerListenerSupportTest.java similarity index 95% rename from hollow/src/test/java/com/netflix/hollow/api/producer/ListenerSupportTest.java rename to hollow/src/test/java/com/netflix/hollow/api/producer/ProducerListenerSupportTest.java index 6710a0b2fc..23c8e36d72 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/producer/ListenerSupportTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/producer/ProducerListenerSupportTest.java @@ -29,12 +29,12 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -public class ListenerSupportTest { +public class ProducerListenerSupportTest { interface ProducerAndValidationStatusListener extends HollowProducerListener, ValidationStatusListener { } - private ListenerSupport listenerSupport; + private ProducerListenerSupport listenerSupport; @Mock private HollowProducerListener listener; @@ -46,7 +46,7 @@ interface ProducerAndValidationStatusListener @Before public void setUp() { MockitoAnnotations.initMocks(this); - listenerSupport = new ListenerSupport(); + listenerSupport = new ProducerListenerSupport(); listenerSupport.addListener(listener); listenerSupport.addListener(validationStatusListener); listenerSupport.addListener(producerAndValidationStatusListener); @@ -54,12 +54,12 @@ public void setUp() { @Test public void testDuplicates() { - ListenerSupport ls = new ListenerSupport(); + ProducerListenerSupport ls = new ProducerListenerSupport(); CycleListener l = Mockito.mock(CycleListener.class); ls.addListener(l); ls.addListener(l); - ListenerSupport.Listeners s = ls.listeners(); + ProducerListenerSupport.ProducerListeners s = ls.listeners(); s.fireCycleStart(1); Mockito.verify(l, Mockito.times(1)).onCycleStart(1); @@ -67,7 +67,7 @@ public void testDuplicates() { @Test public void testAddDuringCycle() { - ListenerSupport ls = new ListenerSupport(); + ProducerListenerSupport ls = new ProducerListenerSupport(); class SecondCycleListener implements CycleListener { int cycleStart; @@ -100,7 +100,7 @@ class FirstCycleListener extends SecondCycleListener { FirstCycleListener fcl = new FirstCycleListener(); ls.addListener(fcl); - ListenerSupport.Listeners s = ls.listeners(); + ProducerListenerSupport.ProducerListeners s = ls.listeners(); s.fireCycleStart(1); s.fireCycleComplete(new Status.StageWithStateBuilder()); @@ -121,7 +121,7 @@ class FirstCycleListener extends SecondCycleListener { @Test public void testRemoveDuringCycle() { - ListenerSupport ls = new ListenerSupport(); + ProducerListenerSupport ls = new ProducerListenerSupport(); class SecondCycleListener implements CycleListener { int cycleStart; @@ -160,7 +160,7 @@ private FirstCycleListener(SecondCycleListener scl) { ls.addListener(fcl); ls.addListener(scl); - ListenerSupport.Listeners s = ls.listeners(); + ProducerListenerSupport.ProducerListeners s = ls.listeners(); s.fireCycleStart(1); s.fireCycleComplete(new Status.StageWithStateBuilder());