-
Notifications
You must be signed in to change notification settings - Fork 116
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Events notification system for Nessie - Quarkus
This commit introduces a new module, nessie-events-quarkus. To improve isolation and facilitate testing, this new module is completely independent of other Quarkus modules; it contains: 1. Quarkus-specific implementations of nessie-events-service classes; 2. Asynchronous delivery based on Vert.x, both non-blocking and blocking; 3. Delivery with optional logging, tracing and metrics. Changes outside this module are minimal: 1. A few changes were introduced in nessie-events-api, nessie-events-spi and nessie-events-service. 2. nessie-quarkus-common: 1. ConfigurableVersionStoreFactory changed to detect when events are enabled; 2. New RepositoryIdProvider. 3. nessie-quarkus (server): 1. Compile-time dependency on nessie-events-quarkus; 2. Smoke tests.
- Loading branch information
Showing
74 changed files
with
6,182 additions
and
215 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
# Nessie Events Quarkus | ||
|
||
This module contains the Quarkus-specific implementation of the Nessie events notification system. | ||
|
||
To improve isolation and facilitate testing, this module is completely independent of other | ||
Quarkus modules; it contains: | ||
|
||
1. Quarkus-specific implementations of nessie-events-service classes; | ||
2. Asynchronous delivery based on Vert.x, both non-blocking and blocking; | ||
3. Delivery with optional logging, tracing and metrics. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright (C) 2022 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
plugins { | ||
`java-library` | ||
jacoco | ||
`maven-publish` | ||
signing | ||
alias(libs.plugins.quarkus) | ||
`nessie-conventions` | ||
} | ||
|
||
extra["maven.name"] = "Nessie - Events - Quarkus" | ||
|
||
dependencies { | ||
implementation(project(":nessie-versioned-spi")) | ||
implementation(project(":nessie-events-api")) | ||
implementation(project(":nessie-events-spi")) | ||
implementation(project(":nessie-events-service")) | ||
|
||
// Quarkus | ||
implementation(enforcedPlatform(libs.quarkus.bom)) | ||
implementation("io.quarkus:quarkus-vertx") | ||
|
||
// Metrics | ||
implementation("io.micrometer:micrometer-core") | ||
|
||
// OpenTelemetry | ||
implementation(platform(libs.opentelemetry.bom)) | ||
implementation(libs.opentelemetry.api) | ||
implementation(libs.opentelemetry.semconv) | ||
|
||
// Jackson | ||
compileOnly(platform(libs.jackson.bom)) | ||
compileOnly(libs.jackson.annotations) | ||
|
||
testImplementation(project(":nessie-model")) | ||
|
||
testImplementation(enforcedPlatform(libs.quarkus.bom)) | ||
testImplementation("io.quarkus:quarkus-opentelemetry") | ||
testImplementation("io.quarkus:quarkus-micrometer") | ||
testImplementation("io.quarkus:quarkus-micrometer-registry-prometheus") | ||
testImplementation("io.quarkus:quarkus-junit5") | ||
testImplementation("io.quarkus:quarkus-junit5-mockito") | ||
testImplementation("io.quarkus:quarkus-jacoco") | ||
|
||
testImplementation(libs.opentelemetry.sdk.trace) | ||
|
||
testImplementation(platform(libs.junit.bom)) | ||
testImplementation(libs.bundles.junit.testing) | ||
testImplementation(libs.awaitility) | ||
|
||
testCompileOnly(platform(libs.jackson.bom)) | ||
testCompileOnly(libs.jackson.annotations) | ||
testCompileOnly(libs.microprofile.openapi) | ||
} | ||
|
||
buildForJava11() | ||
|
||
listOf("javadoc", "sourcesJar").forEach { name -> | ||
tasks.named(name) { dependsOn(tasks.named("compileQuarkusGeneratedSourcesJava")) } | ||
} | ||
|
||
listOf("checkstyleTest", "compileTestJava").forEach { name -> | ||
tasks.named(name) { dependsOn(tasks.named("compileQuarkusTestGeneratedSourcesJava")) } | ||
} |
30 changes: 30 additions & 0 deletions
30
events/quarkus/src/main/java/org/projectnessie/events/quarkus/QuarkusEventFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright (C) 2023 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.projectnessie.events.quarkus; | ||
|
||
import javax.enterprise.context.Dependent; | ||
import javax.inject.Inject; | ||
import org.projectnessie.events.service.EventConfig; | ||
import org.projectnessie.events.service.EventFactory; | ||
|
||
@Dependent | ||
public class QuarkusEventFactory extends EventFactory { | ||
|
||
@Inject | ||
public QuarkusEventFactory(EventConfig config) { | ||
super(config); | ||
} | ||
} |
129 changes: 129 additions & 0 deletions
129
events/quarkus/src/main/java/org/projectnessie/events/quarkus/QuarkusEventService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/* | ||
* Copyright (C) 2023 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.projectnessie.events.quarkus; | ||
|
||
import static org.projectnessie.events.quarkus.config.VersionStoreConfigConstants.NESSIE_VERSION_STORE_EVENTS_ENABLE; | ||
|
||
import io.quarkus.arc.properties.IfBuildProperty; | ||
import io.quarkus.runtime.ShutdownEvent; | ||
import io.quarkus.runtime.StartupEvent; | ||
import io.quarkus.vertx.ConsumeEvent; | ||
import io.vertx.core.Handler; | ||
import io.vertx.core.eventbus.DeliveryOptions; | ||
import io.vertx.core.eventbus.EventBus; | ||
import io.vertx.core.eventbus.Message; | ||
import io.vertx.core.eventbus.MessageConsumer; | ||
import java.util.Map; | ||
import javax.enterprise.context.ApplicationScoped; | ||
import javax.enterprise.event.Observes; | ||
import javax.inject.Inject; | ||
import org.projectnessie.events.api.Event; | ||
import org.projectnessie.events.api.EventType; | ||
import org.projectnessie.events.quarkus.delivery.EventDelivery; | ||
import org.projectnessie.events.quarkus.delivery.EventDeliveryFactory; | ||
import org.projectnessie.events.service.EventConfig; | ||
import org.projectnessie.events.service.EventFactory; | ||
import org.projectnessie.events.service.EventService; | ||
import org.projectnessie.events.service.EventSubscribers; | ||
import org.projectnessie.events.service.VersionStoreEvent; | ||
import org.projectnessie.events.spi.EventSubscriber; | ||
import org.projectnessie.events.spi.EventSubscription; | ||
|
||
@ApplicationScoped | ||
@IfBuildProperty(name = NESSIE_VERSION_STORE_EVENTS_ENABLE, stringValue = "true") | ||
public class QuarkusEventService extends EventService { | ||
|
||
/** | ||
* The local event bus address used to exchange messages of type {@link VersionStoreEvent} between | ||
* {@link org.projectnessie.events.quarkus.collector.QuarkusResultCollector} and {@link | ||
* QuarkusEventService}. | ||
*/ | ||
public static final String NESSIE_EVENTS_SERVICE_ADDR = "nessie.events.service"; | ||
|
||
/** | ||
* The prefix for all local event bus addresses used to exchange messages of type {@link Event} | ||
* between {@link QuarkusEventService} and {@link EventSubscriber}s. | ||
* | ||
* <p>Addresses are of the form {@code nessie.events.<event-type>}, where {@code <event-type>} is | ||
* the {@link EventType} name, e.g. {@code nessie.events.COMMIT}. | ||
*/ | ||
public static final String NESSIE_EVENTS_SUBSCRIBERS_ADDR_PREFIX = "nessie.events.subscribers."; | ||
|
||
private final EventBus bus; | ||
private final EventDeliveryFactory deliveryFactory; | ||
private final DeliveryOptions deliveryOptions; | ||
|
||
// Mandatory for CDI. | ||
@SuppressWarnings("unused") | ||
public QuarkusEventService() { | ||
this(null, null, null, null, null, null); | ||
} | ||
|
||
@Inject | ||
public QuarkusEventService( | ||
EventConfig config, | ||
EventFactory factory, | ||
EventSubscribers subscribers, | ||
EventBus bus, | ||
EventDeliveryFactory deliveryFactory, | ||
DeliveryOptions deliveryOptions) { | ||
super(config, factory, subscribers); | ||
this.bus = bus; | ||
this.deliveryFactory = deliveryFactory; | ||
this.deliveryOptions = deliveryOptions; | ||
} | ||
|
||
public void onStartup(@Observes StartupEvent event) { | ||
start(); | ||
for (Map.Entry<EventSubscription, EventSubscriber> entry : | ||
subscribers.getSubscriptions().entrySet()) { | ||
EventSubscription subscription = entry.getKey(); | ||
EventSubscriber subscriber = entry.getValue(); | ||
Handler<Message<Event>> handler = e -> deliverEvent(e.body(), subscriber, subscription); | ||
for (EventType eventType : EventType.values()) { | ||
if (subscriber.accepts(eventType)) { | ||
String address = NESSIE_EVENTS_SUBSCRIBERS_ADDR_PREFIX + eventType; | ||
MessageConsumer<Event> consumer = bus.localConsumer(address); | ||
consumer.handler(handler); | ||
} | ||
} | ||
} | ||
} | ||
|
||
public void onShutdown(@Observes ShutdownEvent event) { | ||
close(); | ||
} | ||
|
||
@ConsumeEvent(NESSIE_EVENTS_SERVICE_ADDR) | ||
@Override | ||
public void onVersionStoreEvent(VersionStoreEvent event) { | ||
super.onVersionStoreEvent(event); | ||
} | ||
|
||
@Override | ||
protected void fireEvent(Event event) { | ||
// Publish the event to all interested subscribers that are listening to this address. | ||
String address = NESSIE_EVENTS_SUBSCRIBERS_ADDR_PREFIX + event.getType(); | ||
bus.publish(address, event, deliveryOptions); | ||
} | ||
|
||
@Override | ||
protected void deliverEvent( | ||
Event event, EventSubscriber subscriber, EventSubscription subscription) { | ||
EventDelivery delivery = deliveryFactory.create(event, subscriber, subscription); | ||
delivery.start(); | ||
} | ||
} |
35 changes: 35 additions & 0 deletions
35
events/quarkus/src/main/java/org/projectnessie/events/quarkus/QuarkusEventSubscribers.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Copyright (C) 2023 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.projectnessie.events.quarkus; | ||
|
||
import static org.projectnessie.events.quarkus.config.VersionStoreConfigConstants.NESSIE_VERSION_STORE_EVENTS_ENABLE; | ||
|
||
import io.quarkus.arc.properties.IfBuildProperty; | ||
import java.util.List; | ||
import javax.enterprise.context.ApplicationScoped; | ||
import org.projectnessie.events.service.EventSubscribers; | ||
import org.projectnessie.events.spi.EventSubscriber; | ||
|
||
@ApplicationScoped | ||
@IfBuildProperty(name = NESSIE_VERSION_STORE_EVENTS_ENABLE, stringValue = "true") | ||
public class QuarkusEventSubscribers extends EventSubscribers { | ||
|
||
private static final List<EventSubscriber> SUBSCRIBERS = loadSubscribers(); | ||
|
||
public QuarkusEventSubscribers() { | ||
super(SUBSCRIBERS); | ||
} | ||
} |
63 changes: 63 additions & 0 deletions
63
...c/main/java/org/projectnessie/events/quarkus/collector/QuarkusMetricsResultCollector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Copyright (C) 2023 Dremio | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.projectnessie.events.quarkus.collector; | ||
|
||
import io.micrometer.core.instrument.MeterRegistry; | ||
import io.vertx.core.eventbus.DeliveryOptions; | ||
import io.vertx.core.eventbus.EventBus; | ||
import java.security.Principal; | ||
import org.projectnessie.events.service.EventSubscribers; | ||
import org.projectnessie.versioned.Result; | ||
|
||
public class QuarkusMetricsResultCollector extends QuarkusResultCollector { | ||
|
||
/** The total number of results collected from the Version Store, exposed as a counter. */ | ||
public static final String NESSIE_RESULTS_TOTAL = "nessie.results.total"; | ||
|
||
/** | ||
* The total number of results rejected by the collector, based on the event type filters exposed | ||
* by the subscribers. | ||
*/ | ||
public static final String NESSIE_RESULTS_REJECTED = "nessie.results.rejected"; | ||
|
||
private final MeterRegistry registry; | ||
|
||
public QuarkusMetricsResultCollector( | ||
EventSubscribers subscribers, | ||
String repositoryId, | ||
Principal principal, | ||
EventBus bus, | ||
DeliveryOptions options, | ||
MeterRegistry registry) { | ||
super(subscribers, repositoryId, principal, bus, options); | ||
this.registry = registry; | ||
} | ||
|
||
@Override | ||
public void accept(Result result) { | ||
registry.counter(NESSIE_RESULTS_TOTAL).increment(); | ||
super.accept(result); | ||
} | ||
|
||
@Override | ||
protected boolean shouldProcess(Result result) { | ||
boolean shouldProcess = super.shouldProcess(result); | ||
if (!shouldProcess) { | ||
registry.counter(NESSIE_RESULTS_REJECTED).increment(); | ||
} | ||
return shouldProcess; | ||
} | ||
} |
Oops, something went wrong.