Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events notification system for Nessie - Quarkus #6870

Merged
merged 14 commits into from
Jun 1, 2023
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ jobs:
- name: Gradle / test
uses: gradle/gradle-build-action@v2
with:
arguments: test :nessie-client:check -x :nessie-client:intTest -x :nessie-quarkus:test --scan
arguments: test :nessie-client:check -x :nessie-client:intTest -x :nessie-quarkus:test -x :nessie-events-quarkus:test --scan

- name: Capture Test Reports
uses: actions/upload-artifact@v3
Expand Down Expand Up @@ -170,7 +170,7 @@ jobs:
- name: Gradle / Test Quarkus
uses: gradle/gradle-build-action@v2
with:
arguments: :nessie-quarkus:test --scan
arguments: :nessie-quarkus:test :nessie-events-quarkus:test --scan

- name: Dump quarkus.log
if: ${{ failure() }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
* @see ReferenceDeletedEvent
* @see ContentStoredEvent
* @see ContentRemovedEvent
* @see GenericEvent
*/
public interface Event {

Expand All @@ -46,6 +45,12 @@ public interface Event {
*/
UUID getId();

/** The id of the event, as a string for convenience. */
@Value.Lazy
snazy marked this conversation as resolved.
Show resolved Hide resolved
default String getIdAsText() {
return getId().toString();
}

/**
* The id of the repository. This is configured on a per-instance basis.
*
Expand Down
10 changes: 10 additions & 0 deletions events/quarkus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Nessie Events Quarkus

This module contains the Quarkus-specific implementation of the Nessie events notification system.

To improve isolation and facilitate testing, this module is completely independent of other
Quarkus modules; it contains:

1. Quarkus-specific implementations of nessie-events-service classes;
2. Asynchronous delivery based on Vert.x, both non-blocking and blocking;
3. Delivery with optional logging, tracing and metrics.
74 changes: 74 additions & 0 deletions events/quarkus/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2022 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
`java-library`
jacoco
`maven-publish`
signing
alias(libs.plugins.quarkus)
`nessie-conventions`
}

extra["maven.name"] = "Nessie - Events - Quarkus"

dependencies {
implementation(project(":nessie-versioned-spi"))
implementation(project(":nessie-events-api"))
implementation(project(":nessie-events-spi"))
implementation(project(":nessie-events-service"))

// Quarkus
implementation(enforcedPlatform(libs.quarkus.bom))
implementation("io.quarkus:quarkus-vertx")

// Metrics
implementation("io.micrometer:micrometer-core")

// OpenTelemetry
implementation("io.opentelemetry:opentelemetry-api")
implementation("io.opentelemetry:opentelemetry-semconv")

testImplementation(project(":nessie-model"))
snazy marked this conversation as resolved.
Show resolved Hide resolved

testImplementation(enforcedPlatform(libs.quarkus.bom))
testImplementation("io.quarkus:quarkus-opentelemetry")
testImplementation("io.quarkus:quarkus-micrometer")
testImplementation("io.quarkus:quarkus-micrometer-registry-prometheus")
testImplementation("io.quarkus:quarkus-junit5")
testImplementation("io.quarkus:quarkus-junit5-mockito")
testImplementation("io.quarkus:quarkus-jacoco")

testImplementation("io.opentelemetry:opentelemetry-sdk-trace")

testImplementation(platform(libs.junit.bom))
testImplementation(libs.bundles.junit.testing)
testImplementation(libs.awaitility)

testCompileOnly(platform(libs.jackson.bom))
testCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
testCompileOnly(libs.microprofile.openapi)
}

buildForJava11()

listOf("javadoc", "sourcesJar").forEach { name ->
tasks.named(name) { dependsOn(tasks.named("compileQuarkusGeneratedSourcesJava")) }
}

listOf("checkstyleTest", "compileTestJava").forEach { name ->
tasks.named(name) { dependsOn(tasks.named("compileQuarkusTestGeneratedSourcesJava")) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.events.quarkus;

import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import org.projectnessie.events.service.EventConfig;
import org.projectnessie.events.service.EventFactory;

@Dependent
public class QuarkusEventFactory extends EventFactory {

@Inject
public QuarkusEventFactory(EventConfig config) {
super(config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.events.quarkus;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import java.util.Map;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Named;
import org.projectnessie.events.api.Event;
import org.projectnessie.events.api.EventType;
import org.projectnessie.events.quarkus.config.EventBusConfigurer;
import org.projectnessie.events.quarkus.delivery.EventDelivery;
import org.projectnessie.events.quarkus.delivery.EventDeliveryFactory;
import org.projectnessie.events.service.EventConfig;
import org.projectnessie.events.service.EventFactory;
import org.projectnessie.events.service.EventService;
import org.projectnessie.events.service.EventSubscribers;
import org.projectnessie.events.service.VersionStoreEvent;
import org.projectnessie.events.spi.EventSubscriber;
import org.projectnessie.events.spi.EventSubscription;

@ApplicationScoped
public class QuarkusEventService extends EventService {

/**
* The local event bus address used to exchange messages of type {@link VersionStoreEvent} between
* {@link org.projectnessie.events.quarkus.collector.QuarkusResultCollector} and {@link
* QuarkusEventService}.
*/
public static final String NESSIE_EVENTS_SERVICE_ADDR = "nessie.events.service";

/**
* The prefix for all local event bus addresses used to exchange messages of type {@link Event}
* between {@link QuarkusEventService} and {@link EventSubscriber}s.
*
* <p>Addresses are of the form {@code nessie.events.<event-type>}, where {@code <event-type>} is
* the {@link EventType} name, e.g. {@code nessie.events.COMMIT}.
*/
public static final String NESSIE_EVENTS_SUBSCRIBERS_ADDR_PREFIX = "nessie.events.subscribers.";

private final EventBus bus;
private final EventDeliveryFactory deliveryFactory;
private final DeliveryOptions deliveryOptions;

// Mandatory for CDI.
@SuppressWarnings("unused")
public QuarkusEventService() {
this(null, null, null, null, null, null);
}

@Inject
public QuarkusEventService(
EventConfig config,
EventFactory factory,
EventSubscribers subscribers,
EventBus bus,
EventDeliveryFactory deliveryFactory,
@Named(EventBusConfigurer.EVENTS_DELIVERY_OPTIONS_BEAN_NAME)
DeliveryOptions deliveryOptions) {
super(config, factory, subscribers);
this.bus = bus;
this.deliveryFactory = deliveryFactory;
this.deliveryOptions = deliveryOptions;
}

public void onStartup(@Observes StartupEvent event) {
start();
for (Map.Entry<EventSubscription, EventSubscriber> entry :
subscribers.getSubscriptions().entrySet()) {
EventSubscription subscription = entry.getKey();
EventSubscriber subscriber = entry.getValue();
Handler<Message<Event>> handler = e -> deliverEvent(e.body(), subscriber, subscription);
for (EventType eventType : EventType.values()) {
if (subscriber.accepts(eventType)) {
String address = NESSIE_EVENTS_SUBSCRIBERS_ADDR_PREFIX + eventType;
MessageConsumer<Event> consumer = bus.localConsumer(address);
consumer.handler(handler);
}
}
}
}

public void onShutdown(@Observes ShutdownEvent event) {
close();
}

@ConsumeEvent(NESSIE_EVENTS_SERVICE_ADDR)
@Override
public void onVersionStoreEvent(VersionStoreEvent event) {
super.onVersionStoreEvent(event);
}

@Override
protected void fireEvent(Event event) {
// Publish the event to all interested subscribers that are listening to this address.
String address = NESSIE_EVENTS_SUBSCRIBERS_ADDR_PREFIX + event.getType();
bus.publish(address, event, deliveryOptions);
}

@Override
protected void deliverEvent(
Event event, EventSubscriber subscriber, EventSubscription subscription) {
EventDelivery delivery = deliveryFactory.create(event, subscriber, subscription);
delivery.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.events.quarkus;

import java.util.List;
import javax.enterprise.context.ApplicationScoped;
import org.projectnessie.events.service.EventSubscribers;
import org.projectnessie.events.spi.EventSubscriber;

@ApplicationScoped
public class QuarkusEventSubscribers extends EventSubscribers {

private static final List<EventSubscriber> SUBSCRIBERS = loadSubscribers();

public QuarkusEventSubscribers() {
super(SUBSCRIBERS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.events.quarkus.collector;

import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import java.security.Principal;
import org.projectnessie.events.service.EventSubscribers;
import org.projectnessie.versioned.Result;

public class QuarkusMetricsResultCollector extends QuarkusResultCollector {

/** The total number of results collected from the Version Store, exposed as a counter. */
public static final String NESSIE_RESULTS_TOTAL = "nessie.results.total";

/**
* The total number of results rejected by the collector, based on the event type filters exposed
* by the subscribers.
*/
public static final String NESSIE_RESULTS_REJECTED = "nessie.results.rejected";

private final MeterRegistry registry;

public QuarkusMetricsResultCollector(
EventSubscribers subscribers,
String repositoryId,
Principal principal,
EventBus bus,
DeliveryOptions options,
MeterRegistry registry) {
super(subscribers, repositoryId, principal, bus, options);
this.registry = registry;
}

@Override
public void accept(Result result) {
registry.counter(NESSIE_RESULTS_TOTAL).increment();
super.accept(result);
}

@Override
protected boolean shouldProcess(Result result) {
boolean shouldProcess = super.shouldProcess(result);
if (!shouldProcess) {
registry.counter(NESSIE_RESULTS_REJECTED).increment();
}
return shouldProcess;
}
}