Skip to content

Commit

Permalink
Events notification system for Nessie - Quarkus (#6870)
Browse files Browse the repository at this point in the history
This commit introduces a new module, nessie-events-quarkus.
To improve isolation and facilitate testing, this new module
is completely independent of other Quarkus modules; it
contains:

1. Quarkus-specific implementations of nessie-events-service
   classes;
2. Asynchronous delivery based on Vert.x, both non-blocking
   and blocking;
3. Delivery with optional logging, tracing and metrics.
  • Loading branch information
adutra committed Jun 1, 2023
1 parent 8f44feb commit d17a9ed
Show file tree
Hide file tree
Showing 73 changed files with 5,989 additions and 230 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ jobs:
- name: Gradle / test
uses: gradle/gradle-build-action@v2
with:
arguments: test :nessie-client:check -x :nessie-client:intTest -x :nessie-quarkus:test --scan
arguments: test :nessie-client:check -x :nessie-client:intTest -x :nessie-quarkus:test -x :nessie-events-quarkus:test --scan

- name: Capture Test Reports
uses: actions/upload-artifact@v3
Expand Down Expand Up @@ -170,7 +170,7 @@ jobs:
- name: Gradle / Test Quarkus
uses: gradle/gradle-build-action@v2
with:
arguments: :nessie-quarkus:test --scan
arguments: :nessie-quarkus:test :nessie-events-quarkus:test --scan

- name: Dump quarkus.log
if: ${{ failure() }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
* @see ReferenceDeletedEvent
* @see ContentStoredEvent
* @see ContentRemovedEvent
* @see GenericEvent
*/
public interface Event {

Expand All @@ -46,6 +45,12 @@ public interface Event {
*/
UUID getId();

/** The id of the event, as a string for convenience. */
@Value.Lazy
default String getIdAsText() {
return getId().toString();
}

/**
* The id of the repository. This is configured on a per-instance basis.
*
Expand Down
10 changes: 10 additions & 0 deletions events/quarkus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Nessie Events Quarkus

This module contains the Quarkus-specific implementation of the Nessie events notification system.

To improve isolation and facilitate testing, this module is completely independent of other
Quarkus modules; it contains:

1. Quarkus-specific implementations of nessie-events-service classes;
2. Asynchronous delivery based on Vert.x, both non-blocking and blocking;
3. Delivery with optional logging, tracing and metrics.
74 changes: 74 additions & 0 deletions events/quarkus/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2022 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
`java-library`
jacoco
`maven-publish`
signing
alias(libs.plugins.quarkus)
`nessie-conventions`
}

extra["maven.name"] = "Nessie - Events - Quarkus"

dependencies {
implementation(project(":nessie-versioned-spi"))
implementation(project(":nessie-events-api"))
implementation(project(":nessie-events-spi"))
implementation(project(":nessie-events-service"))

// Quarkus
implementation(enforcedPlatform(libs.quarkus.bom))
implementation("io.quarkus:quarkus-vertx")

// Metrics
implementation("io.micrometer:micrometer-core")

// OpenTelemetry
implementation("io.opentelemetry:opentelemetry-api")
implementation("io.opentelemetry:opentelemetry-semconv")

testImplementation(project(":nessie-model"))

testImplementation(enforcedPlatform(libs.quarkus.bom))
testImplementation("io.quarkus:quarkus-opentelemetry")
testImplementation("io.quarkus:quarkus-micrometer")
testImplementation("io.quarkus:quarkus-micrometer-registry-prometheus")
testImplementation("io.quarkus:quarkus-junit5")
testImplementation("io.quarkus:quarkus-junit5-mockito")
testImplementation("io.quarkus:quarkus-jacoco")

testImplementation("io.opentelemetry:opentelemetry-sdk-trace")

testImplementation(platform(libs.junit.bom))
testImplementation(libs.bundles.junit.testing)
testImplementation(libs.awaitility)

testCompileOnly(platform(libs.jackson.bom))
testCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
testCompileOnly(libs.microprofile.openapi)
}

buildForJava11()

listOf("javadoc", "sourcesJar").forEach { name ->
tasks.named(name) { dependsOn(tasks.named("compileQuarkusGeneratedSourcesJava")) }
}

listOf("checkstyleTest", "compileTestJava").forEach { name ->
tasks.named(name) { dependsOn(tasks.named("compileQuarkusTestGeneratedSourcesJava")) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.events.quarkus;

import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import org.projectnessie.events.service.EventConfig;
import org.projectnessie.events.service.EventFactory;

@Dependent
public class QuarkusEventFactory extends EventFactory {

@Inject
public QuarkusEventFactory(EventConfig config) {
super(config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.events.quarkus;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import java.util.Map;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Named;
import org.projectnessie.events.api.Event;
import org.projectnessie.events.api.EventType;
import org.projectnessie.events.quarkus.config.EventBusConfigurer;
import org.projectnessie.events.quarkus.delivery.EventDelivery;
import org.projectnessie.events.quarkus.delivery.EventDeliveryFactory;
import org.projectnessie.events.service.EventConfig;
import org.projectnessie.events.service.EventFactory;
import org.projectnessie.events.service.EventService;
import org.projectnessie.events.service.EventSubscribers;
import org.projectnessie.events.service.VersionStoreEvent;
import org.projectnessie.events.spi.EventSubscriber;
import org.projectnessie.events.spi.EventSubscription;

@ApplicationScoped
public class QuarkusEventService extends EventService {

/**
* The local event bus address used to exchange messages of type {@link VersionStoreEvent} between
* {@link org.projectnessie.events.quarkus.collector.QuarkusResultCollector} and {@link
* QuarkusEventService}.
*/
public static final String NESSIE_EVENTS_SERVICE_ADDR = "nessie.events.service";

/**
* The prefix for all local event bus addresses used to exchange messages of type {@link Event}
* between {@link QuarkusEventService} and {@link EventSubscriber}s.
*
* <p>Addresses are of the form {@code nessie.events.<event-type>}, where {@code <event-type>} is
* the {@link EventType} name, e.g. {@code nessie.events.COMMIT}.
*/
public static final String NESSIE_EVENTS_SUBSCRIBERS_ADDR_PREFIX = "nessie.events.subscribers.";

private final EventBus bus;
private final EventDeliveryFactory deliveryFactory;
private final DeliveryOptions deliveryOptions;

// Mandatory for CDI.
@SuppressWarnings("unused")
public QuarkusEventService() {
this(null, null, null, null, null, null);
}

@Inject
public QuarkusEventService(
EventConfig config,
EventFactory factory,
EventSubscribers subscribers,
EventBus bus,
EventDeliveryFactory deliveryFactory,
@Named(EventBusConfigurer.EVENTS_DELIVERY_OPTIONS_BEAN_NAME)
DeliveryOptions deliveryOptions) {
super(config, factory, subscribers);
this.bus = bus;
this.deliveryFactory = deliveryFactory;
this.deliveryOptions = deliveryOptions;
}

public void onStartup(@Observes StartupEvent event) {
start();
for (Map.Entry<EventSubscription, EventSubscriber> entry :
subscribers.getSubscriptions().entrySet()) {
EventSubscription subscription = entry.getKey();
EventSubscriber subscriber = entry.getValue();
Handler<Message<Event>> handler = e -> deliverEvent(e.body(), subscriber, subscription);
for (EventType eventType : EventType.values()) {
if (subscriber.accepts(eventType)) {
String address = NESSIE_EVENTS_SUBSCRIBERS_ADDR_PREFIX + eventType;
MessageConsumer<Event> consumer = bus.localConsumer(address);
consumer.handler(handler);
}
}
}
}

public void onShutdown(@Observes ShutdownEvent event) {
close();
}

@ConsumeEvent(NESSIE_EVENTS_SERVICE_ADDR)
@Override
public void onVersionStoreEvent(VersionStoreEvent event) {
super.onVersionStoreEvent(event);
}

@Override
protected void fireEvent(Event event) {
// Publish the event to all interested subscribers that are listening to this address.
String address = NESSIE_EVENTS_SUBSCRIBERS_ADDR_PREFIX + event.getType();
bus.publish(address, event, deliveryOptions);
}

@Override
protected void deliverEvent(
Event event, EventSubscriber subscriber, EventSubscription subscription) {
EventDelivery delivery = deliveryFactory.create(event, subscriber, subscription);
delivery.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.events.quarkus;

import java.util.List;
import javax.enterprise.context.ApplicationScoped;
import org.projectnessie.events.service.EventSubscribers;
import org.projectnessie.events.spi.EventSubscriber;

@ApplicationScoped
public class QuarkusEventSubscribers extends EventSubscribers {

private static final List<EventSubscriber> SUBSCRIBERS = loadSubscribers();

public QuarkusEventSubscribers() {
super(SUBSCRIBERS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.events.quarkus.collector;

import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import java.security.Principal;
import org.projectnessie.events.service.EventSubscribers;
import org.projectnessie.versioned.Result;

public class QuarkusMetricsResultCollector extends QuarkusResultCollector {

/** The total number of results collected from the Version Store, exposed as a counter. */
public static final String NESSIE_RESULTS_TOTAL = "nessie.results.total";

/**
* The total number of results rejected by the collector, based on the event type filters exposed
* by the subscribers.
*/
public static final String NESSIE_RESULTS_REJECTED = "nessie.results.rejected";

private final MeterRegistry registry;

public QuarkusMetricsResultCollector(
EventSubscribers subscribers,
String repositoryId,
Principal principal,
EventBus bus,
DeliveryOptions options,
MeterRegistry registry) {
super(subscribers, repositoryId, principal, bus, options);
this.registry = registry;
}

@Override
public void accept(Result result) {
registry.counter(NESSIE_RESULTS_TOTAL).increment();
super.accept(result);
}

@Override
protected boolean shouldProcess(Result result) {
boolean shouldProcess = super.shouldProcess(result);
if (!shouldProcess) {
registry.counter(NESSIE_RESULTS_REJECTED).increment();
}
return shouldProcess;
}
}

0 comments on commit d17a9ed

Please sign in to comment.