Skip to content

Commit

Permalink
improve: ensure unique name on event sources (#2370)
Browse files Browse the repository at this point in the history
* improve: ensure unique name on event sources

Signed-off-by: Attila Mészáros <csviri@gmail.com>

* fix

Signed-off-by: Attila Mészáros <csviri@gmail.com>

---------

Signed-off-by: Attila Mészáros <csviri@gmail.com>
  • Loading branch information
csviri committed May 17, 2024
1 parent 2342192 commit 90f79a9
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public <R> List<EventSource<R, P>> getEventSourcesFor(Class<R> dependentType) {
@Override
public <R> EventSource<R, P> dynamicallyRegisterEventSource(EventSource<R, P> eventSource) {
synchronized (this) {
var actual = eventSources.existingEventSourceOfSameNameAndType(eventSource);
var actual = eventSources.existingEventSourceByName(eventSource.name());
if (actual != null) {
eventSource = actual;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,37 @@ class EventSources<P extends HasMetadata> {

private final ConcurrentNavigableMap<String, Map<String, EventSource<?, P>>> sources =
new ConcurrentSkipListMap<>();
private final Map<String, EventSource> sourceByName = new HashMap<>();

private final TimerEventSource<P> retryAndRescheduleTimerEventSource =
new TimerEventSource<>("RetryAndRescheduleTimerEventSource");
private ControllerEventSource<P> controllerEventSource;

public void add(EventSource eventSource) {
final var name = eventSource.name();
var existing = sourceByName.get(name);
if (existing != null) {
throw new IllegalArgumentException("Event source " + existing
+ " is already registered with name: " + name);
}
sourceByName.put(name, eventSource);
sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource);
}

public EventSource remove(String name) {
var optionalMap = sources.values().stream().filter(m -> m.containsKey(name)).findFirst();
sourceByName.remove(name);
return optionalMap.map(m -> m.remove(name)).orElse(null);
}

public void clear() {
sources.clear();
sourceByName.clear();
}

public EventSource existingEventSourceByName(String name) {
return sourceByName.get(name);
}

void createControllerEventSource(Controller<P> controller) {
controllerEventSource = new ControllerEventSource<>(controller);
Expand Down Expand Up @@ -54,30 +81,7 @@ Stream<EventSource<?, P>> flatMappedSources() {
return sources.values().stream().flatMap(c -> c.values().stream());
}

public void clear() {
sources.clear();
}

@SuppressWarnings("unchecked")
public <R> EventSource<R, P> existingEventSourceOfSameNameAndType(EventSource<R, P> source) {
return (EventSource<R, P>) existingEventSourcesOfSameType(source).get(source.name());
}

private <R> Map<String, EventSource<?, P>> existingEventSourcesOfSameType(
EventSource<R, P> source) {
return sources.getOrDefault(keyFor(source), Collections.emptyMap());
}

public <R> void add(EventSource<R, P> eventSource) {
final var name = eventSource.name();
final var existing = existingEventSourcesOfSameType(eventSource);
if (existing.get(name) != null) {
throw new IllegalArgumentException("Event source " + existing
+ " is already registered with name: " + name);
}

sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource);
}

private <R> String keyFor(EventSource<R, P> source) {
return keyFor(source.resourceType());
Expand Down Expand Up @@ -145,10 +149,4 @@ public <S> List<EventSource<S, P>> getEventSources(Class<S> dependentType) {
return sourcesForType.values().stream()
.map(es -> (EventSource<S, P>) es).toList();
}

@SuppressWarnings("rawtypes")
public EventSource remove(String name) {
var optionalMap = sources.values().stream().filter(m -> m.containsKey(name)).findFirst();
return optionalMap.map(m -> m.remove(name)).orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ public void registersEventSource() {
@Test
public void closeShouldCascadeToEventSources() {
EventSource eventSource = mock(EventSource.class);
when(eventSource.name()).thenReturn("name1");
when(eventSource.resourceType()).thenReturn(EventSource.class);

EventSource eventSource2 = mock(TimerEventSource.class);
when(eventSource2.name()).thenReturn("name2");
when(eventSource2.resourceType()).thenReturn(AbstractEventSource.class);

eventSourceManager.registerEventSource(eventSource);
Expand All @@ -65,11 +68,12 @@ public void closeShouldCascadeToEventSources() {
public void startCascadesToEventSources() {
EventSource eventSource = mock(EventSource.class);
when(eventSource.priority()).thenReturn(EventSourceStartPriority.DEFAULT);
when(eventSource.name()).thenReturn("name1");
when(eventSource.resourceType()).thenReturn(EventSource.class);
EventSource eventSource2 = mock(TimerEventSource.class);
when(eventSource2.priority()).thenReturn(EventSourceStartPriority.DEFAULT);
when(eventSource2.name()).thenReturn("name2");
when(eventSource2.resourceType()).thenReturn(AbstractEventSource.class);

eventSourceManager.registerEventSource(eventSource);
eventSourceManager.registerEventSource(eventSource2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ void getShouldWork() {
eventSourceMockWithName(EventSource.class, "name1", HasMetadata.class);
final var mock2 =
eventSourceMockWithName(EventSource.class, "name2", HasMetadata.class);
final var mock3 = eventSourceMockWithName(EventSource.class, "name2", ConfigMap.class);
final var mock3 = eventSourceMockWithName(EventSource.class, "name3", ConfigMap.class);

eventSources.add(mock1);
eventSources.add(mock2);
eventSources.add(mock3);

assertEquals(mock1, eventSources.get(HasMetadata.class, "name1"));
assertEquals(mock2, eventSources.get(HasMetadata.class, "name2"));
assertEquals(mock3, eventSources.get(ConfigMap.class, "name2"));
assertEquals(mock3, eventSources.get(ConfigMap.class, "name3"));
assertEquals(mock3, eventSources.get(ConfigMap.class, null));


Expand Down

0 comments on commit 90f79a9

Please sign in to comment.